diff --git a/common/net/bufconn.go b/common/net/bufconn.go index 6da2d9d1..b840fefc 100644 --- a/common/net/bufconn.go +++ b/common/net/bufconn.go @@ -22,6 +22,16 @@ func NewBufferedConn(c net.Conn) *BufferedConn { return &BufferedConn{bufio.NewReader(c), NewExtendedConn(c), false} } +func WarpConnWithBioReader(c net.Conn, br *bufio.Reader) net.Conn { + if br != nil && br.Buffered() > 0 { + if bc, ok := c.(*BufferedConn); ok && bc.r == br { + return bc + } + return &BufferedConn{br, NewExtendedConn(c), true} + } + return c +} + // Reader returns the internal bufio.Reader. func (c *BufferedConn) Reader() *bufio.Reader { return c.r diff --git a/transport/vmess/websocket.go b/transport/vmess/websocket.go index 83f5e3c2..1117edaf 100644 --- a/transport/vmess/websocket.go +++ b/transport/vmess/websocket.go @@ -1,7 +1,6 @@ package vmess import ( - "bufio" "bytes" "context" "crypto/tls" @@ -393,7 +392,11 @@ func streamWebsocketConn(ctx context.Context, conn net.Conn, c *WebsocketConfig, return nil, fmt.Errorf("dial %s error: %w", uri.Host, err) } - conn = newWebsocketConn(conn, reader, ws.StateClientSide) + // some bytes which could be written by the peer right after response and be caught by us during buffered read, + // so we need warp Conn with bio.Reader + conn = N.WarpConnWithBioReader(conn, reader) + + conn = newWebsocketConn(conn, ws.StateClientSide) // websocketConn can't correct handle ReadDeadline // so call N.NewDeadlineConn to add a safe wrapper return N.NewDeadlineConn(conn), nil @@ -419,19 +422,13 @@ func StreamWebsocketConn(ctx context.Context, conn net.Conn, c *WebsocketConfig) return streamWebsocketConn(ctx, conn, c, nil) } -func newWebsocketConn(conn net.Conn, br *bufio.Reader, state ws.State) *websocketConn { +func newWebsocketConn(conn net.Conn, state ws.State) *websocketConn { controlHandler := wsutil.ControlFrameHandler(conn, state) - var reader io.Reader - if br != nil && br.Buffered() > 0 { - reader = br - } else { - reader = conn - } return &websocketConn{ Conn: conn, state: state, reader: &wsutil.Reader{ - Source: reader, + Source: conn, State: state, SkipHeaderCheck: true, CheckUTF8: false, @@ -463,7 +460,11 @@ func StreamUpgradedWebsocketConn(w http.ResponseWriter, r *http.Request) (net.Co if err != nil { return nil, err } - conn := newWebsocketConn(wsConn, rw.Reader, ws.StateServerSide) + + // gobwas/ws will flush rw.Writer, so we only need warp rw.Reader + wsConn = N.WarpConnWithBioReader(wsConn, rw.Reader) + + conn := newWebsocketConn(wsConn, ws.StateServerSide) if edBuf := decodeXray0rtt(r.Header); len(edBuf) > 0 { return N.NewDeadlineConn(&websocketWithReaderConn{conn, io.MultiReader(bytes.NewReader(edBuf), conn)}), nil }