From 1ff56ceb69ef71012e3866884c8494428e2536e9 Mon Sep 17 00:00:00 2001 From: anytls Date: Thu, 27 Mar 2025 14:20:18 +0900 Subject: [PATCH] Protocol version 2 --- listener/anytls/server.go | 6 ++++ transport/anytls/session/session.go | 23 +++++++++----- transport/anytls/session/stream.go | 49 +++++++++++++++++++++++++++-- 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/listener/anytls/server.go b/listener/anytls/server.go index 31a7c55a..aa8d946a 100644 --- a/listener/anytls/server.go +++ b/listener/anytls/server.go @@ -174,6 +174,12 @@ func (l *Listener) HandleConn(conn net.Conn, h *sing.ListenerHandler) { return } + // It seems that mihomo does not implement a connection error reporting mechanism, so we report success directly. + err = stream.HandshakeSuccess() + if err != nil { + return + } + h.NewConnection(ctx, stream, M.Metadata{ Source: M.SocksaddrFromNet(conn.RemoteAddr()), Destination: destination, diff --git a/transport/anytls/session/session.go b/transport/anytls/session/session.go index 887a5f6d..c80639ce 100644 --- a/transport/anytls/session/session.go +++ b/transport/anytls/session/session.go @@ -3,6 +3,7 @@ package session import ( "crypto/md5" "encoding/binary" + "fmt" "io" "net" "runtime/debug" @@ -215,13 +216,6 @@ func (s *Session) recvLoop() error { s.streams[sid] = stream go func() { if s.onNewStream != nil { - // report SYNACK to client - if s.peerVersion >= 2 { - if _, err := s.writeFrame(newFrame(cmdSYNACK, sid)); err != nil { - s.Close() - return - } - } s.onNewStream(stream) } else { stream.Close() @@ -236,6 +230,21 @@ func (s *Session) recvLoop() error { s.synDone = nil } s.synDoneLock.Unlock() + if hdr.Length() > 0 { + buffer := pool.Get(int(hdr.Length())) + if _, err := io.ReadFull(s.conn, buffer); err != nil { + pool.Put(buffer) + return err + } + // report error + s.streamLock.RLock() + stream, ok := s.streams[sid] + s.streamLock.RUnlock() + if ok { + stream.CloseWithError(fmt.Errorf("remote: %s", string(buffer))) + } + pool.Put(buffer) + } case cmdFIN: s.streamLock.RLock() stream, ok := s.streams[sid] diff --git a/transport/anytls/session/stream.go b/transport/anytls/session/stream.go index 83edc736..1ec8fbc9 100644 --- a/transport/anytls/session/stream.go +++ b/transport/anytls/session/stream.go @@ -22,6 +22,9 @@ type Stream struct { dieOnce sync.Once dieHook func() + dieErr error + + reportOnce sync.Once } // newStream initiates a Stream struct @@ -36,7 +39,11 @@ func newStream(id uint32, sess *Session) *Stream { // Read implements net.Conn func (s *Stream) Read(b []byte) (n int, err error) { - return s.pipeR.Read(b) + n, err = s.pipeR.Read(b) + if s.dieErr != nil { + err = s.dieErr + } + return } // Write implements net.Conn @@ -54,8 +61,16 @@ func (s *Stream) Write(b []byte) (n int, err error) { // Close implements net.Conn func (s *Stream) Close() error { + return s.CloseWithError(io.ErrClosedPipe) +} + +func (s *Stream) CloseWithError(err error) error { + // if err != io.ErrClosedPipe { + // logrus.Debugln(err) + // } var once bool s.dieOnce.Do(func() { + s.dieErr = err s.pipeR.Close() once = true }) @@ -66,7 +81,7 @@ func (s *Stream) Close() error { } return s.sess.streamClosed(s.id) } else { - return io.ErrClosedPipe + return s.dieErr } } @@ -103,3 +118,33 @@ func (s *Stream) RemoteAddr() net.Addr { } return nil } + +// HandshakeFailure should be called when Server fail to create outbound proxy +func (s *Stream) HandshakeFailure(err error) error { + var once bool + s.reportOnce.Do(func() { + once = true + }) + if once && err != nil && s.sess.peerVersion >= 2 { + f := newFrame(cmdSYNACK, s.id) + f.data = []byte(err.Error()) + if _, err := s.sess.writeFrame(f); err != nil { + return err + } + } + return nil +} + +// HandshakeSuccess should be called when Server success to create outbound proxy +func (s *Stream) HandshakeSuccess() error { + var once bool + s.reportOnce.Do(func() { + once = true + }) + if once && s.sess.peerVersion >= 2 { + if _, err := s.sess.writeFrame(newFrame(cmdSYNACK, s.id)); err != nil { + return err + } + } + return nil +}