From e81f3a97afb4324d2cd977ed9b6167d21da4b3bb Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Fri, 4 Apr 2025 09:08:52 +0800 Subject: [PATCH] fix: correctly implement references to proxies --- adapter/adapter.go | 22 +---------- adapter/outbound/base.go | 85 +++++++++++++++++++++++++++++++++++++++- adapter/parser.go | 1 + common/net/refconn.go | 2 +- 4 files changed, 86 insertions(+), 24 deletions(-) diff --git a/adapter/adapter.go b/adapter/adapter.go index ea528b00..4320702d 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -9,10 +9,8 @@ import ( "net/http" "net/netip" "net/url" - "runtime" "strconv" "strings" - "sync" "time" "github.com/metacubex/mihomo/common/atomic" @@ -41,9 +39,6 @@ type Proxy struct { alive atomic.Bool history *queue.Queue[C.DelayHistory] extra *xsync.MapOf[string, *internalProxyState] - - closeOnce sync.Once - closeErr error } // Adapter implements C.Proxy @@ -296,27 +291,12 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In return } -func (p *Proxy) Close() error { - p.closeOnce.Do(func() { - runtime.SetFinalizer(p, nil) - p.closeErr = p.ProxyAdapter.Close() - }) - return p.closeErr -} - func NewProxy(adapter C.ProxyAdapter) *Proxy { - proxy := &Proxy{ + return &Proxy{ ProxyAdapter: adapter, history: queue.New[C.DelayHistory](defaultHistoriesNum), alive: atomic.NewBool(true), extra: xsync.NewMapOf[string, *internalProxyState]()} - - // auto close ProxyAdapter - runtime.SetFinalizer(proxy, func(p *Proxy) { - log.Debugln("Closing outdated proxy [%s]", p.Name()) - _ = p.Close() - }) - return proxy } func urlToMetadata(rawURL string) (addr C.Metadata, err error) { diff --git a/adapter/outbound/base.go b/adapter/outbound/base.go index 9c3515c0..b2f1ac56 100644 --- a/adapter/outbound/base.go +++ b/adapter/outbound/base.go @@ -4,13 +4,16 @@ import ( "context" "encoding/json" "net" + "runtime" "strings" + "sync" "syscall" N "github.com/metacubex/mihomo/common/net" "github.com/metacubex/mihomo/common/utils" "github.com/metacubex/mihomo/component/dialer" C "github.com/metacubex/mihomo/constant" + "github.com/metacubex/mihomo/log" ) type ProxyAdapter interface { @@ -230,10 +233,13 @@ func (c *conn) ReaderReplaceable() bool { return true } +func (c *conn) AddRef(ref any) { + c.ExtendedConn = N.NewRefConn(c.ExtendedConn, ref) // add ref for autoCloseProxyAdapter +} + func NewConn(c net.Conn, a C.ProxyAdapter) C.Conn { if _, ok := c.(syscall.Conn); !ok { // exclusion system conn like *net.TCPConn c = N.NewDeadlineConn(c) // most conn from outbound can't handle readDeadline correctly - c = N.NewRefConn(c, a) // add ref for autoCloseProxyAdapter } return &conn{N.NewExtendedConn(c), []string{a.Name()}, parseRemoteDestination(a.Addr())} } @@ -277,11 +283,14 @@ func (c *packetConn) ReaderReplaceable() bool { return true } +func (c *packetConn) AddRef(ref any) { + c.EnhancePacketConn = N.NewRefPacketConn(c.EnhancePacketConn, ref) // add ref for autoCloseProxyAdapter +} + func newPacketConn(pc net.PacketConn, a C.ProxyAdapter) C.PacketConn { epc := N.NewEnhancePacketConn(pc) if _, ok := pc.(syscall.Conn); !ok { // exclusion system conn like *net.UDPConn epc = N.NewDeadlineEnhancePacketConn(epc) // most conn from outbound can't handle readDeadline correctly - epc = N.NewRefPacketConn(epc, a) // add ref for autoCloseProxyAdapter } return &packetConn{epc, []string{a.Name()}, a.Name(), utils.NewUUIDV4().String(), parseRemoteDestination(a.Addr())} } @@ -297,3 +306,75 @@ func parseRemoteDestination(addr string) string { } } } + +type AddRef interface { + AddRef(ref any) +} + +type autoCloseProxyAdapter struct { + ProxyAdapter + closeOnce sync.Once + closeErr error +} + +func (p *autoCloseProxyAdapter) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) { + c, err := p.ProxyAdapter.DialContext(ctx, metadata, opts...) + if err != nil { + return nil, err + } + if c, ok := c.(AddRef); ok { + c.AddRef(p) + } + return c, nil +} + +func (p *autoCloseProxyAdapter) DialContextWithDialer(ctx context.Context, dialer C.Dialer, metadata *C.Metadata) (_ C.Conn, err error) { + c, err := p.ProxyAdapter.DialContextWithDialer(ctx, dialer, metadata) + if err != nil { + return nil, err + } + if c, ok := c.(AddRef); ok { + c.AddRef(p) + } + return c, nil +} + +func (p *autoCloseProxyAdapter) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { + pc, err := p.ProxyAdapter.ListenPacketContext(ctx, metadata, opts...) + if err != nil { + return nil, err + } + if pc, ok := pc.(AddRef); ok { + pc.AddRef(p) + } + return pc, nil +} + +func (p *autoCloseProxyAdapter) ListenPacketWithDialer(ctx context.Context, dialer C.Dialer, metadata *C.Metadata) (_ C.PacketConn, err error) { + pc, err := p.ProxyAdapter.ListenPacketWithDialer(ctx, dialer, metadata) + if err != nil { + return nil, err + } + if pc, ok := pc.(AddRef); ok { + pc.AddRef(p) + } + return pc, nil +} + +func (p *autoCloseProxyAdapter) Close() error { + p.closeOnce.Do(func() { + log.Debugln("Closing outdated proxy [%s]", p.Name()) + runtime.SetFinalizer(p, nil) + p.closeErr = p.ProxyAdapter.Close() + }) + return p.closeErr +} + +func NewAutoCloseProxyAdapter(adapter ProxyAdapter) ProxyAdapter { + proxy := &autoCloseProxyAdapter{ + ProxyAdapter: adapter, + } + // auto close ProxyAdapter + runtime.SetFinalizer(proxy, (*autoCloseProxyAdapter).Close) + return proxy +} diff --git a/adapter/parser.go b/adapter/parser.go index 203531c9..48359f70 100644 --- a/adapter/parser.go +++ b/adapter/parser.go @@ -176,5 +176,6 @@ func ParseProxy(mapping map[string]any) (C.Proxy, error) { } } + proxy = outbound.NewAutoCloseProxyAdapter(proxy) return NewProxy(proxy), nil } diff --git a/common/net/refconn.go b/common/net/refconn.go index 6d0dde98..b1e6c9fc 100644 --- a/common/net/refconn.go +++ b/common/net/refconn.go @@ -77,6 +77,6 @@ func (c *refConn) WriterReplaceable() bool { // Relay() will handle reference var _ ExtendedConn = (*refConn)(nil) -func NewRefConn(conn net.Conn, ref any) net.Conn { +func NewRefConn(conn net.Conn, ref any) ExtendedConn { return &refConn{conn: NewExtendedConn(conn), ref: ref} }