From 622d99d000439df85d4b4030937c2cfc280bcb0f Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Thu, 3 Apr 2025 22:42:32 +0800 Subject: [PATCH] chore: rebuild outdated proxy auto close mechanism --- adapter/adapter.go | 23 ++++- adapter/outbound/anytls.go | 13 +-- adapter/outbound/base.go | 11 +++ adapter/outbound/hysteria.go | 18 ++-- adapter/outbound/hysteria2.go | 17 ++-- adapter/outbound/hysteria2_test.go | 38 ------- adapter/outbound/hysteria_test.go | 39 -------- adapter/outbound/mieru.go | 10 +- adapter/outbound/singmux.go | 28 +++--- adapter/outbound/ssh.go | 44 ++++----- adapter/outbound/trojan.go | 8 ++ adapter/outbound/vless.go | 8 ++ adapter/outbound/vmess.go | 8 ++ adapter/outbound/wireguard.go | 154 ++--------------------------- adapter/outbound/wireguard_test.go | 45 --------- adapter/parser.go | 7 +- constant/adapters.go | 3 + 17 files changed, 122 insertions(+), 352 deletions(-) delete mode 100644 adapter/outbound/hysteria2_test.go delete mode 100644 adapter/outbound/hysteria_test.go delete mode 100644 adapter/outbound/wireguard_test.go diff --git a/adapter/adapter.go b/adapter/adapter.go index 31706a2e..ea528b00 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -9,8 +9,10 @@ import ( "net/http" "net/netip" "net/url" + "runtime" "strconv" "strings" + "sync" "time" "github.com/metacubex/mihomo/common/atomic" @@ -39,6 +41,9 @@ type Proxy struct { alive atomic.Bool history *queue.Queue[C.DelayHistory] extra *xsync.MapOf[string, *internalProxyState] + + closeOnce sync.Once + closeErr error } // Adapter implements C.Proxy @@ -290,12 +295,28 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In t = uint16(time.Since(start) / time.Millisecond) return } + +func (p *Proxy) Close() error { + p.closeOnce.Do(func() { + runtime.SetFinalizer(p, nil) + p.closeErr = p.ProxyAdapter.Close() + }) + return p.closeErr +} + func NewProxy(adapter C.ProxyAdapter) *Proxy { - return &Proxy{ + proxy := &Proxy{ ProxyAdapter: adapter, history: queue.New[C.DelayHistory](defaultHistoriesNum), alive: atomic.NewBool(true), extra: xsync.NewMapOf[string, *internalProxyState]()} + + // auto close ProxyAdapter + runtime.SetFinalizer(proxy, func(p *Proxy) { + log.Debugln("Closing outdated proxy [%s]", p.Name()) + _ = p.Close() + }) + return proxy } func urlToMetadata(rawURL string) (addr C.Metadata, err error) { diff --git a/adapter/outbound/anytls.go b/adapter/outbound/anytls.go index 66722261..4727b188 100644 --- a/adapter/outbound/anytls.go +++ b/adapter/outbound/anytls.go @@ -4,7 +4,6 @@ import ( "context" "errors" "net" - "runtime" "strconv" "time" @@ -52,7 +51,7 @@ func (t *AnyTLS) DialContext(ctx context.Context, metadata *C.Metadata, opts ... if err != nil { return nil, err } - return NewConn(CN.NewRefConn(c, t), t), nil + return NewConn(c, t), nil } func (t *AnyTLS) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { @@ -73,7 +72,7 @@ func (t *AnyTLS) ListenPacketContext(ctx context.Context, metadata *C.Metadata, metadata.DstIP = ip } destination := M.SocksaddrFromNet(metadata.UDPAddr()) - return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(uot.NewLazyConn(c, uot.Request{Destination: destination})), t), t), nil + return newPacketConn(CN.NewThreadSafePacketConn(uot.NewLazyConn(c, uot.Request{Destination: destination})), t), nil } // SupportUOT implements C.ProxyAdapter @@ -88,6 +87,11 @@ func (t *AnyTLS) ProxyInfo() C.ProxyInfo { return info } +// Close implements C.ProxyAdapter +func (t *AnyTLS) Close() error { + return t.client.Close() +} + func NewAnyTLS(option AnyTLSOption) (*AnyTLS, error) { addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port)) @@ -132,9 +136,6 @@ func NewAnyTLS(option AnyTLSOption) (*AnyTLS, error) { option: &option, dialer: singDialer, } - runtime.SetFinalizer(outbound, func(o *AnyTLS) { - _ = o.client.Close() - }) return outbound, nil } diff --git a/adapter/outbound/base.go b/adapter/outbound/base.go index 56bed9d5..84e9b7c2 100644 --- a/adapter/outbound/base.go +++ b/adapter/outbound/base.go @@ -13,6 +13,11 @@ import ( C "github.com/metacubex/mihomo/constant" ) +type ProxyAdapter interface { + C.ProxyAdapter + DialOptions(opts ...dialer.Option) []dialer.Option +} + type Base struct { name string addr string @@ -152,6 +157,10 @@ func (b *Base) DialOptions(opts ...dialer.Option) []dialer.Option { return opts } +func (b *Base) Close() error { + return nil +} + type BasicOption struct { TFO bool `proxy:"tfo,omitempty"` MPTCP bool `proxy:"mptcp,omitempty"` @@ -224,6 +233,7 @@ func (c *conn) ReaderReplaceable() bool { func NewConn(c net.Conn, a C.ProxyAdapter) C.Conn { if _, ok := c.(syscall.Conn); !ok { // exclusion system conn like *net.TCPConn c = N.NewDeadlineConn(c) // most conn from outbound can't handle readDeadline correctly + c = N.NewRefConn(c, a) // add ref for autoCloseProxyAdapter } return &conn{N.NewExtendedConn(c), []string{a.Name()}, parseRemoteDestination(a.Addr())} } @@ -271,6 +281,7 @@ func newPacketConn(pc net.PacketConn, a C.ProxyAdapter) C.PacketConn { epc := N.NewEnhancePacketConn(pc) if _, ok := pc.(syscall.Conn); !ok { // exclusion system conn like *net.UDPConn epc = N.NewDeadlineEnhancePacketConn(epc) // most conn from outbound can't handle readDeadline correctly + epc = N.NewRefPacketConn(epc, a) // add ref for autoCloseProxyAdapter } return &packetConn{epc, []string{a.Name()}, a.Name(), utils.NewUUIDV4().String(), parseRemoteDestination(a.Addr())} } diff --git a/adapter/outbound/hysteria.go b/adapter/outbound/hysteria.go index b0edab02..39b5bbdb 100644 --- a/adapter/outbound/hysteria.go +++ b/adapter/outbound/hysteria.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "net/netip" - "runtime" "strconv" "time" @@ -15,7 +14,6 @@ import ( "github.com/metacubex/quic-go/congestion" M "github.com/sagernet/sing/common/metadata" - CN "github.com/metacubex/mihomo/common/net" "github.com/metacubex/mihomo/component/ca" "github.com/metacubex/mihomo/component/dialer" "github.com/metacubex/mihomo/component/proxydialer" @@ -45,8 +43,6 @@ type Hysteria struct { option *HysteriaOption client *core.Client - - closeCh chan struct{} // for test } func (h *Hysteria) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) { @@ -55,7 +51,7 @@ func (h *Hysteria) DialContext(ctx context.Context, metadata *C.Metadata, opts . return nil, err } - return NewConn(CN.NewRefConn(tcpConn, h), h), nil + return NewConn(tcpConn, h), nil } func (h *Hysteria) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) { @@ -63,7 +59,7 @@ func (h *Hysteria) ListenPacketContext(ctx context.Context, metadata *C.Metadata if err != nil { return nil, err } - return newPacketConn(CN.NewRefPacketConn(&hyPacketConn{udpConn}, h), h), nil + return newPacketConn(&hyPacketConn{udpConn}, h), nil } func (h *Hysteria) genHdc(ctx context.Context, opts ...dialer.Option) utils.PacketDialer { @@ -239,18 +235,16 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { option: &option, client: client, } - runtime.SetFinalizer(outbound, closeHysteria) return outbound, nil } -func closeHysteria(h *Hysteria) { +// Close implements C.ProxyAdapter +func (h *Hysteria) Close() error { if h.client != nil { - _ = h.client.Close() - } - if h.closeCh != nil { - close(h.closeCh) + return h.client.Close() } + return nil } type hyPacketConn struct { diff --git a/adapter/outbound/hysteria2.go b/adapter/outbound/hysteria2.go index fa0cebdb..95bbb5ec 100644 --- a/adapter/outbound/hysteria2.go +++ b/adapter/outbound/hysteria2.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "net" - "runtime" "strconv" "time" @@ -39,8 +38,6 @@ type Hysteria2 struct { option *Hysteria2Option client *hysteria2.Client dialer proxydialer.SingDialer - - closeCh chan struct{} // for test } type Hysteria2Option struct { @@ -78,7 +75,7 @@ func (h *Hysteria2) DialContext(ctx context.Context, metadata *C.Metadata, opts if err != nil { return nil, err } - return NewConn(CN.NewRefConn(c, h), h), nil + return NewConn(c, h), nil } func (h *Hysteria2) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { @@ -91,16 +88,15 @@ func (h *Hysteria2) ListenPacketContext(ctx context.Context, metadata *C.Metadat if pc == nil { return nil, errors.New("packetConn is nil") } - return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(pc), h), h), nil + return newPacketConn(CN.NewThreadSafePacketConn(pc), h), nil } -func closeHysteria2(h *Hysteria2) { +// Close implements C.ProxyAdapter +func (h *Hysteria2) Close() error { if h.client != nil { - _ = h.client.CloseWithError(errors.New("proxy removed")) - } - if h.closeCh != nil { - close(h.closeCh) + return h.client.CloseWithError(errors.New("proxy removed")) } + return nil } // ProxyInfo implements C.ProxyAdapter @@ -226,7 +222,6 @@ func NewHysteria2(option Hysteria2Option) (*Hysteria2, error) { client: client, dialer: singDialer, } - runtime.SetFinalizer(outbound, closeHysteria2) return outbound, nil } diff --git a/adapter/outbound/hysteria2_test.go b/adapter/outbound/hysteria2_test.go deleted file mode 100644 index de7d8227..00000000 --- a/adapter/outbound/hysteria2_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package outbound - -import ( - "context" - "runtime" - "testing" - "time" -) - -func TestHysteria2GC(t *testing.T) { - option := Hysteria2Option{} - option.Server = "127.0.0.1" - option.Ports = "200,204,401-429,501-503" - option.HopInterval = 30 - option.Password = "password" - option.Obfs = "salamander" - option.ObfsPassword = "password" - option.SNI = "example.com" - option.ALPN = []string{"h3"} - hy, err := NewHysteria2(option) - if err != nil { - t.Error(err) - return - } - closeCh := make(chan struct{}) - hy.closeCh = closeCh - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - hy = nil - runtime.GC() - select { - case <-closeCh: - return - case <-ctx.Done(): - t.Error("timeout not GC") - } -} diff --git a/adapter/outbound/hysteria_test.go b/adapter/outbound/hysteria_test.go deleted file mode 100644 index f2297c60..00000000 --- a/adapter/outbound/hysteria_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package outbound - -import ( - "context" - "runtime" - "testing" - "time" -) - -func TestHysteriaGC(t *testing.T) { - option := HysteriaOption{} - option.Server = "127.0.0.1" - option.Ports = "200,204,401-429,501-503" - option.Protocol = "udp" - option.Up = "1Mbps" - option.Down = "1Mbps" - option.HopInterval = 30 - option.Obfs = "salamander" - option.SNI = "example.com" - option.ALPN = []string{"h3"} - hy, err := NewHysteria(option) - if err != nil { - t.Error(err) - return - } - closeCh := make(chan struct{}) - hy.closeCh = closeCh - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - hy = nil - runtime.GC() - select { - case <-closeCh: - return - case <-ctx.Done(): - t.Error("timeout not GC") - } -} diff --git a/adapter/outbound/mieru.go b/adapter/outbound/mieru.go index d2209442..b0f24323 100644 --- a/adapter/outbound/mieru.go +++ b/adapter/outbound/mieru.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "runtime" "strconv" "sync" @@ -62,7 +61,7 @@ func (m *Mieru) ListenPacketContext(ctx context.Context, metadata *C.Metadata, o if err != nil { return nil, fmt.Errorf("dial to %s failed: %w", metadata.UDPAddr(), err) } - return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(mierucommon.NewUDPAssociateWrapper(mierucommon.NewPacketOverStreamTunnel(c))), m), m), nil + return newPacketConn(CN.NewThreadSafePacketConn(mierucommon.NewUDPAssociateWrapper(mierucommon.NewPacketOverStreamTunnel(c))), m), nil } // SupportUOT implements C.ProxyAdapter @@ -141,16 +140,17 @@ func NewMieru(option MieruOption) (*Mieru, error) { option: &option, client: c, } - runtime.SetFinalizer(outbound, closeMieru) return outbound, nil } -func closeMieru(m *Mieru) { +// Close implements C.ProxyAdapter +func (m *Mieru) Close() error { m.mu.Lock() defer m.mu.Unlock() if m.client != nil && m.client.IsRunning() { - m.client.Stop() + return m.client.Stop() } + return nil } func metadataToMieruNetAddrSpec(metadata *C.Metadata) mierumodel.NetAddrSpec { diff --git a/adapter/outbound/singmux.go b/adapter/outbound/singmux.go index 26a8d26e..13f9f66c 100644 --- a/adapter/outbound/singmux.go +++ b/adapter/outbound/singmux.go @@ -3,7 +3,6 @@ package outbound import ( "context" "errors" - "runtime" CN "github.com/metacubex/mihomo/common/net" "github.com/metacubex/mihomo/component/dialer" @@ -18,8 +17,7 @@ import ( ) type SingMux struct { - C.ProxyAdapter - base ProxyBase + ProxyAdapter client *mux.Client dialer proxydialer.SingDialer onlyTcp bool @@ -43,25 +41,21 @@ type BrutalOption struct { Down string `proxy:"down,omitempty"` } -type ProxyBase interface { - DialOptions(opts ...dialer.Option) []dialer.Option -} - func (s *SingMux) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) { - options := s.base.DialOptions(opts...) + options := s.ProxyAdapter.DialOptions(opts...) s.dialer.SetDialer(dialer.NewDialer(options...)) c, err := s.client.DialContext(ctx, "tcp", M.ParseSocksaddrHostPort(metadata.String(), metadata.DstPort)) if err != nil { return nil, err } - return NewConn(CN.NewRefConn(c, s), s.ProxyAdapter), err + return NewConn(c, s), err } func (s *SingMux) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { if s.onlyTcp { return s.ProxyAdapter.ListenPacketContext(ctx, metadata, opts...) } - options := s.base.DialOptions(opts...) + options := s.ProxyAdapter.DialOptions(opts...) s.dialer.SetDialer(dialer.NewDialer(options...)) // sing-mux use stream-oriented udp with a special address, so we need a net.UDPAddr @@ -80,7 +74,7 @@ func (s *SingMux) ListenPacketContext(ctx context.Context, metadata *C.Metadata, if pc == nil { return nil, E.New("packetConn is nil") } - return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(pc), s), s.ProxyAdapter), nil + return newPacketConn(CN.NewThreadSafePacketConn(pc), s), nil } func (s *SingMux) SupportUDP() bool { @@ -103,11 +97,15 @@ func (s *SingMux) ProxyInfo() C.ProxyInfo { return info } -func closeSingMux(s *SingMux) { - _ = s.client.Close() +// Close implements C.ProxyAdapter +func (s *SingMux) Close() error { + if s.client != nil { + _ = s.client.Close() + } + return s.ProxyAdapter.Close() } -func NewSingMux(option SingMuxOption, proxy C.ProxyAdapter, base ProxyBase) (C.ProxyAdapter, error) { +func NewSingMux(option SingMuxOption, proxy ProxyAdapter) (ProxyAdapter, error) { // TODO // "TCP Brutal is only supported on Linux-based systems" @@ -131,11 +129,9 @@ func NewSingMux(option SingMuxOption, proxy C.ProxyAdapter, base ProxyBase) (C.P } outbound := &SingMux{ ProxyAdapter: proxy, - base: base, client: client, dialer: singDialer, onlyTcp: option.OnlyTcp, } - runtime.SetFinalizer(outbound, closeSingMux) return outbound, nil } diff --git a/adapter/outbound/ssh.go b/adapter/outbound/ssh.go index d746842d..2f08bdbc 100644 --- a/adapter/outbound/ssh.go +++ b/adapter/outbound/ssh.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "os" - "runtime" "strconv" "strings" "sync" @@ -25,7 +24,10 @@ type Ssh struct { *Base option *SshOption - client *sshClient // using a standalone struct to avoid its inner loop invalidate the Finalizer + + config *ssh.ClientConfig + client *ssh.Client + cMutex sync.Mutex } type SshOption struct { @@ -49,7 +51,7 @@ func (s *Ssh) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dia return nil, err } } - client, err := s.client.connect(ctx, cDialer, s.addr) + client, err := s.connect(ctx, cDialer, s.addr) if err != nil { return nil, err } @@ -58,16 +60,10 @@ func (s *Ssh) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dia return nil, err } - return NewConn(N.NewRefConn(c, s), s), nil + return NewConn(c, s), nil } -type sshClient struct { - config *ssh.ClientConfig - client *ssh.Client - cMutex sync.Mutex -} - -func (s *sshClient) connect(ctx context.Context, cDialer C.Dialer, addr string) (client *ssh.Client, err error) { +func (s *Ssh) connect(ctx context.Context, cDialer C.Dialer, addr string) (client *ssh.Client, err error) { s.cMutex.Lock() defer s.cMutex.Unlock() if s.client != nil { @@ -108,7 +104,15 @@ func (s *sshClient) connect(ctx context.Context, cDialer C.Dialer, addr string) return client, nil } -func (s *sshClient) Close() error { +// ProxyInfo implements C.ProxyAdapter +func (s *Ssh) ProxyInfo() C.ProxyInfo { + info := s.Base.ProxyInfo() + info.DialerProxy = s.option.DialerProxy + return info +} + +// Close implements C.ProxyAdapter +func (s *Ssh) Close() error { s.cMutex.Lock() defer s.cMutex.Unlock() if s.client != nil { @@ -117,17 +121,6 @@ func (s *sshClient) Close() error { return nil } -func closeSsh(s *Ssh) { - _ = s.client.Close() -} - -// ProxyInfo implements C.ProxyAdapter -func (s *Ssh) ProxyInfo() C.ProxyInfo { - info := s.Base.ProxyInfo() - info.DialerProxy = s.option.DialerProxy - return info -} - func NewSsh(option SshOption) (*Ssh, error) { addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port)) @@ -204,11 +197,8 @@ func NewSsh(option SshOption) (*Ssh, error) { prefer: C.NewDNSPrefer(option.IPVersion), }, option: &option, - client: &sshClient{ - config: &config, - }, + config: &config, } - runtime.SetFinalizer(outbound, closeSsh) return outbound, nil } diff --git a/adapter/outbound/trojan.go b/adapter/outbound/trojan.go index b6265439..c62c1eb4 100644 --- a/adapter/outbound/trojan.go +++ b/adapter/outbound/trojan.go @@ -251,6 +251,14 @@ func (t *Trojan) ProxyInfo() C.ProxyInfo { return info } +// Close implements C.ProxyAdapter +func (t *Trojan) Close() error { + if t.transport != nil { + return t.transport.Close() + } + return nil +} + func NewTrojan(option TrojanOption) (*Trojan, error) { addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port)) diff --git a/adapter/outbound/vless.go b/adapter/outbound/vless.go index ab5167bf..6609812e 100644 --- a/adapter/outbound/vless.go +++ b/adapter/outbound/vless.go @@ -385,6 +385,14 @@ func (v *Vless) ProxyInfo() C.ProxyInfo { return info } +// Close implements C.ProxyAdapter +func (v *Vless) Close() error { + if v.transport != nil { + return v.transport.Close() + } + return nil +} + func parseVlessAddr(metadata *C.Metadata, xudp bool) *vless.DstAddr { var addrType byte var addr []byte diff --git a/adapter/outbound/vmess.go b/adapter/outbound/vmess.go index 0ea8d859..54a25711 100644 --- a/adapter/outbound/vmess.go +++ b/adapter/outbound/vmess.go @@ -395,6 +395,14 @@ func (v *Vmess) ProxyInfo() C.ProxyInfo { return info } +// Close implements C.ProxyAdapter +func (v *Vmess) Close() error { + if v.transport != nil { + return v.transport.Close() + } + return nil +} + // ListenPacketOnStreamConn implements C.ProxyAdapter func (v *Vmess) ListenPacketOnStreamConn(ctx context.Context, c net.Conn, metadata *C.Metadata) (_ C.PacketConn, err error) { // vmess use stream-oriented udp with a special address, so we need a net.UDPAddr diff --git a/adapter/outbound/wireguard.go b/adapter/outbound/wireguard.go index 2834d324..57138947 100644 --- a/adapter/outbound/wireguard.go +++ b/adapter/outbound/wireguard.go @@ -8,14 +8,12 @@ import ( "fmt" "net" "net/netip" - "runtime" "strconv" "strings" "sync" "time" "github.com/metacubex/mihomo/common/atomic" - CN "github.com/metacubex/mihomo/common/net" "github.com/metacubex/mihomo/component/dialer" "github.com/metacubex/mihomo/component/proxydialer" "github.com/metacubex/mihomo/component/resolver" @@ -45,7 +43,6 @@ type WireGuard struct { tunDevice wireguard.Device dialer proxydialer.SingDialer resolver resolver.Resolver - refP *refProxyAdapter initOk atomic.Bool initMutex sync.Mutex @@ -57,8 +54,6 @@ type WireGuard struct { serverAddrMap map[M.Socksaddr]netip.AddrPort serverAddrTime atomic.TypedValue[time.Time] serverAddrMutex sync.Mutex - - closeCh chan struct{} // for test } type WireGuardOption struct { @@ -173,7 +168,6 @@ func NewWireGuard(option WireGuardOption) (*WireGuard, error) { }, dialer: proxydialer.NewSlowDownSingDialer(proxydialer.NewByNameSingDialer(option.DialerProxy, dialer.NewDialer()), slowdown.New()), } - runtime.SetFinalizer(outbound, closeWireGuard) var reserved [3]uint8 if len(option.Reserved) > 0 { @@ -286,15 +280,13 @@ func NewWireGuard(option WireGuardOption) (*WireGuard, error) { } } - refP := &refProxyAdapter{} - outbound.refP = refP if option.RemoteDnsResolve && len(option.Dns) > 0 { nss, err := dns.ParseNameServer(option.Dns) if err != nil { return nil, err } for i := range nss { - nss[i].ProxyAdapter = refP + nss[i].ProxyAdapter = outbound } outbound.resolver = dns.NewResolver(dns.Config{ Main: nss, @@ -488,13 +480,12 @@ func (w *WireGuard) genIpcConf(ctx context.Context, updateOnly bool) (string, er return ipcConf, nil } -func closeWireGuard(w *WireGuard) { +// Close implements C.ProxyAdapter +func (w *WireGuard) Close() error { if w.device != nil { w.device.Close() } - if w.closeCh != nil { - close(w.closeCh) - } + return nil } func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) { @@ -507,8 +498,6 @@ func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts if !metadata.Resolved() || w.resolver != nil { r := resolver.DefaultResolver if w.resolver != nil { - w.refP.SetProxyAdapter(w) - defer w.refP.ClearProxyAdapter() r = w.resolver } options = append(options, dialer.WithResolver(r)) @@ -523,7 +512,7 @@ func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts if conn == nil { return nil, E.New("conn is nil") } - return NewConn(CN.NewRefConn(conn, w), w), nil + return NewConn(conn, w), nil } func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { @@ -536,8 +525,6 @@ func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadat if (!metadata.Resolved() || w.resolver != nil) && metadata.Host != "" { r := resolver.DefaultResolver if w.resolver != nil { - w.refP.SetProxyAdapter(w) - defer w.refP.ClearProxyAdapter() r = w.resolver } ip, err := resolver.ResolveIPWithResolver(ctx, metadata.Host, r) @@ -553,139 +540,10 @@ func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadat if pc == nil { return nil, E.New("packetConn is nil") } - return newPacketConn(CN.NewRefPacketConn(pc, w), w), nil + return newPacketConn(pc, w), nil } // IsL3Protocol implements C.ProxyAdapter func (w *WireGuard) IsL3Protocol(metadata *C.Metadata) bool { return true } - -type refProxyAdapter struct { - proxyAdapter C.ProxyAdapter - count int - mutex sync.Mutex -} - -func (r *refProxyAdapter) SetProxyAdapter(proxyAdapter C.ProxyAdapter) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.proxyAdapter = proxyAdapter - r.count++ -} - -func (r *refProxyAdapter) ClearProxyAdapter() { - r.mutex.Lock() - defer r.mutex.Unlock() - r.count-- - if r.count == 0 { - r.proxyAdapter = nil - } -} - -func (r *refProxyAdapter) Name() string { - if r.proxyAdapter != nil { - return r.proxyAdapter.Name() - } - return "" -} - -func (r *refProxyAdapter) Type() C.AdapterType { - if r.proxyAdapter != nil { - return r.proxyAdapter.Type() - } - return C.AdapterType(0) -} - -func (r *refProxyAdapter) Addr() string { - if r.proxyAdapter != nil { - return r.proxyAdapter.Addr() - } - return "" -} - -func (r *refProxyAdapter) SupportUDP() bool { - if r.proxyAdapter != nil { - return r.proxyAdapter.SupportUDP() - } - return false -} - -func (r *refProxyAdapter) ProxyInfo() C.ProxyInfo { - if r.proxyAdapter != nil { - return r.proxyAdapter.ProxyInfo() - } - return C.ProxyInfo{} -} - -func (r *refProxyAdapter) MarshalJSON() ([]byte, error) { - if r.proxyAdapter != nil { - return r.proxyAdapter.MarshalJSON() - } - return nil, C.ErrNotSupport -} - -func (r *refProxyAdapter) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (net.Conn, error) { - if r.proxyAdapter != nil { - return r.proxyAdapter.StreamConnContext(ctx, c, metadata) - } - return nil, C.ErrNotSupport -} - -func (r *refProxyAdapter) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) { - if r.proxyAdapter != nil { - return r.proxyAdapter.DialContext(ctx, metadata, opts...) - } - return nil, C.ErrNotSupport -} - -func (r *refProxyAdapter) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) { - if r.proxyAdapter != nil { - return r.proxyAdapter.ListenPacketContext(ctx, metadata, opts...) - } - return nil, C.ErrNotSupport -} - -func (r *refProxyAdapter) SupportUOT() bool { - if r.proxyAdapter != nil { - return r.proxyAdapter.SupportUOT() - } - return false -} - -func (r *refProxyAdapter) SupportWithDialer() C.NetWork { - if r.proxyAdapter != nil { - return r.proxyAdapter.SupportWithDialer() - } - return C.InvalidNet -} - -func (r *refProxyAdapter) DialContextWithDialer(ctx context.Context, dialer C.Dialer, metadata *C.Metadata) (C.Conn, error) { - if r.proxyAdapter != nil { - return r.proxyAdapter.DialContextWithDialer(ctx, dialer, metadata) - } - return nil, C.ErrNotSupport -} - -func (r *refProxyAdapter) ListenPacketWithDialer(ctx context.Context, dialer C.Dialer, metadata *C.Metadata) (C.PacketConn, error) { - if r.proxyAdapter != nil { - return r.proxyAdapter.ListenPacketWithDialer(ctx, dialer, metadata) - } - return nil, C.ErrNotSupport -} - -func (r *refProxyAdapter) IsL3Protocol(metadata *C.Metadata) bool { - if r.proxyAdapter != nil { - return r.proxyAdapter.IsL3Protocol(metadata) - } - return false -} - -func (r *refProxyAdapter) Unwrap(metadata *C.Metadata, touch bool) C.Proxy { - if r.proxyAdapter != nil { - return r.proxyAdapter.Unwrap(metadata, touch) - } - return nil -} - -var _ C.ProxyAdapter = (*refProxyAdapter)(nil) diff --git a/adapter/outbound/wireguard_test.go b/adapter/outbound/wireguard_test.go deleted file mode 100644 index 2248bb7b..00000000 --- a/adapter/outbound/wireguard_test.go +++ /dev/null @@ -1,45 +0,0 @@ -//go:build with_gvisor - -package outbound - -import ( - "context" - "runtime" - "testing" - "time" -) - -func TestWireGuardGC(t *testing.T) { - option := WireGuardOption{} - option.Server = "162.159.192.1" - option.Port = 2408 - option.PrivateKey = "iOx7749AdqH3IqluG7+0YbGKd0m1mcEXAfGRzpy9rG8=" - option.PublicKey = "bmXOC+F1FxEMF9dyiK2H5/1SUtzH0JuVo51h2wPfgyo=" - option.Ip = "172.16.0.2" - option.Ipv6 = "2606:4700:110:8d29:be92:3a6a:f4:c437" - option.Reserved = []uint8{51, 69, 125} - wg, err := NewWireGuard(option) - if err != nil { - t.Error(err) - } - closeCh := make(chan struct{}) - wg.closeCh = closeCh - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - err = wg.init(ctx) - if err != nil { - t.Error(err) - return - } - // must do a small sleep before test GC - // because it maybe deadlocks if w.device.Close call too fast after w.device.Start - time.Sleep(10 * time.Millisecond) - wg = nil - runtime.GC() - select { - case <-closeCh: - return - case <-ctx.Done(): - t.Error("timeout not GC") - } -} diff --git a/adapter/parser.go b/adapter/parser.go index 9b256e6d..203531c9 100644 --- a/adapter/parser.go +++ b/adapter/parser.go @@ -3,10 +3,9 @@ package adapter import ( "fmt" - tlsC "github.com/metacubex/mihomo/component/tls" - "github.com/metacubex/mihomo/adapter/outbound" "github.com/metacubex/mihomo/common/structure" + tlsC "github.com/metacubex/mihomo/component/tls" C "github.com/metacubex/mihomo/constant" ) @@ -18,7 +17,7 @@ func ParseProxy(mapping map[string]any) (C.Proxy, error) { } var ( - proxy C.ProxyAdapter + proxy outbound.ProxyAdapter err error ) switch proxyType { @@ -170,7 +169,7 @@ func ParseProxy(mapping map[string]any) (C.Proxy, error) { return nil, err } if muxOption.Enabled { - proxy, err = outbound.NewSingMux(*muxOption, proxy, proxy.(outbound.ProxyBase)) + proxy, err = outbound.NewSingMux(*muxOption, proxy) if err != nil { return nil, err } diff --git a/constant/adapters.go b/constant/adapters.go index 5dc5a717..4289dfa7 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -149,6 +149,9 @@ type ProxyAdapter interface { // Unwrap extracts the proxy from a proxy-group. It returns nil when nothing to extract. Unwrap(metadata *Metadata, touch bool) Proxy + + // Close releasing associated resources + Close() error } type Group interface {