From f318b805571187e287b80d7981421fdb7bab70e8 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Tue, 11 Mar 2025 22:50:55 +0800 Subject: [PATCH] chore: better cache implement for group's getProxies --- adapter/outboundgroup/fallback.go | 4 +- adapter/outboundgroup/groupbase.go | 178 ++++++++++++++------------- adapter/outboundgroup/loadbalance.go | 4 +- adapter/outboundgroup/urltest.go | 70 ++++------- adapter/provider/provider.go | 3 +- constant/adapters.go | 1 - 6 files changed, 127 insertions(+), 133 deletions(-) diff --git a/adapter/outboundgroup/fallback.go b/adapter/outboundgroup/fallback.go index 9387f7de..b8bb458f 100644 --- a/adapter/outboundgroup/fallback.go +++ b/adapter/outboundgroup/fallback.go @@ -37,7 +37,7 @@ func (f *Fallback) DialContext(ctx context.Context, metadata *C.Metadata, opts . if err == nil { c.AppendToChains(f) } else { - f.onDialFailed(proxy.Type(), err) + f.onDialFailed(proxy.Type(), err, f.healthCheck) } if N.NeedHandshake(c) { @@ -45,7 +45,7 @@ func (f *Fallback) DialContext(ctx context.Context, metadata *C.Metadata, opts . if err == nil { f.onDialSuccess() } else { - f.onDialFailed(proxy.Type(), err) + f.onDialFailed(proxy.Type(), err, f.healthCheck) } }) } diff --git a/adapter/outboundgroup/groupbase.go b/adapter/outboundgroup/groupbase.go index f2a567ae..f8910168 100644 --- a/adapter/outboundgroup/groupbase.go +++ b/adapter/outboundgroup/groupbase.go @@ -2,6 +2,7 @@ package outboundgroup import ( "context" + "errors" "fmt" "strings" "sync" @@ -17,22 +18,26 @@ import ( "github.com/metacubex/mihomo/tunnel" "github.com/dlclark/regexp2" + "golang.org/x/exp/slices" ) type GroupBase struct { *outbound.Base - filterRegs []*regexp2.Regexp - excludeFilterReg *regexp2.Regexp - excludeTypeArray []string - providers []provider.ProxyProvider - failedTestMux sync.Mutex - failedTimes int - failedTime time.Time - failedTesting atomic.Bool - proxies [][]C.Proxy - versions []atomic.Uint32 - TestTimeout int - maxFailedTimes int + filterRegs []*regexp2.Regexp + excludeFilterRegs []*regexp2.Regexp + excludeTypeArray []string + providers []provider.ProxyProvider + failedTestMux sync.Mutex + failedTimes int + failedTime time.Time + failedTesting atomic.Bool + TestTimeout int + maxFailedTimes int + + // for GetProxies + getProxiesMutex sync.Mutex + providerVersions []uint32 + providerProxies []C.Proxy } type GroupBaseOption struct { @@ -53,15 +58,19 @@ func NewGroupBase(opt GroupBaseOption) *GroupBase { log.Warnln("The group [%s] with interface-name configuration is deprecated, please set it directly on the proxy instead", opt.Name) } - var excludeFilterReg *regexp2.Regexp - if opt.excludeFilter != "" { - excludeFilterReg = regexp2.MustCompile(opt.excludeFilter, regexp2.None) - } var excludeTypeArray []string if opt.excludeType != "" { excludeTypeArray = strings.Split(opt.excludeType, "|") } + var excludeFilterRegs []*regexp2.Regexp + if opt.excludeFilter != "" { + for _, excludeFilter := range strings.Split(opt.excludeFilter, "`") { + excludeFilterReg := regexp2.MustCompile(excludeFilter, regexp2.None) + excludeFilterRegs = append(excludeFilterRegs, excludeFilterReg) + } + } + var filterRegs []*regexp2.Regexp if opt.filter != "" { for _, filter := range strings.Split(opt.filter, "`") { @@ -71,14 +80,14 @@ func NewGroupBase(opt GroupBaseOption) *GroupBase { } gb := &GroupBase{ - Base: outbound.NewBase(opt.BaseOption), - filterRegs: filterRegs, - excludeFilterReg: excludeFilterReg, - excludeTypeArray: excludeTypeArray, - providers: opt.providers, - failedTesting: atomic.NewBool(false), - TestTimeout: opt.TestTimeout, - maxFailedTimes: opt.maxFailedTimes, + Base: outbound.NewBase(opt.BaseOption), + filterRegs: filterRegs, + excludeFilterRegs: excludeFilterRegs, + excludeTypeArray: excludeTypeArray, + providers: opt.providers, + failedTesting: atomic.NewBool(false), + TestTimeout: opt.TestTimeout, + maxFailedTimes: opt.maxFailedTimes, } if gb.TestTimeout == 0 { @@ -88,9 +97,6 @@ func NewGroupBase(opt GroupBaseOption) *GroupBase { gb.maxFailedTimes = 5 } - gb.proxies = make([][]C.Proxy, len(opt.providers)) - gb.versions = make([]atomic.Uint32, len(opt.providers)) - return gb } @@ -101,56 +107,55 @@ func (gb *GroupBase) Touch() { } func (gb *GroupBase) GetProxies(touch bool) []C.Proxy { + providerVersions := make([]uint32, len(gb.providers)) + for i, pd := range gb.providers { + if touch { // touch first + pd.Touch() + } + providerVersions[i] = pd.Version() + } + + // thread safe + gb.getProxiesMutex.Lock() + defer gb.getProxiesMutex.Unlock() + + // return the cached proxies if version not changed + if slices.Equal(providerVersions, gb.providerVersions) { + return gb.providerProxies + } + var proxies []C.Proxy if len(gb.filterRegs) == 0 { for _, pd := range gb.providers { - if touch { - pd.Touch() - } proxies = append(proxies, pd.Proxies()...) } } else { - for i, pd := range gb.providers { - if touch { - pd.Touch() - } - - if pd.VehicleType() == types.Compatible { - gb.versions[i].Store(pd.Version()) - gb.proxies[i] = pd.Proxies() + for _, pd := range gb.providers { + if pd.VehicleType() == types.Compatible { // compatible provider unneeded filter + proxies = append(proxies, pd.Proxies()...) continue } - version := gb.versions[i].Load() - if version != pd.Version() && gb.versions[i].CompareAndSwap(version, pd.Version()) { - var ( - proxies []C.Proxy - newProxies []C.Proxy - ) - - proxies = pd.Proxies() - proxiesSet := map[string]struct{}{} - for _, filterReg := range gb.filterRegs { - for _, p := range proxies { - name := p.Name() - if mat, _ := filterReg.MatchString(name); mat { - if _, ok := proxiesSet[name]; !ok { - proxiesSet[name] = struct{}{} - newProxies = append(newProxies, p) - } + var newProxies []C.Proxy + proxiesSet := map[string]struct{}{} + for _, filterReg := range gb.filterRegs { + for _, p := range pd.Proxies() { + name := p.Name() + if mat, _ := filterReg.MatchString(name); mat { + if _, ok := proxiesSet[name]; !ok { + proxiesSet[name] = struct{}{} + newProxies = append(newProxies, p) } } } - - gb.proxies[i] = newProxies } - } - - for _, p := range gb.proxies { - proxies = append(proxies, p...) + proxies = append(proxies, newProxies...) } } + // Multiple filers means that proxies are sorted in the order in which the filers appear. + // Although the filter has been performed once in the previous process, + // when there are multiple providers, the array needs to be reordered as a whole. if len(gb.providers) > 1 && len(gb.filterRegs) > 1 { var newProxies []C.Proxy proxiesSet := map[string]struct{}{} @@ -174,32 +179,31 @@ func (gb *GroupBase) GetProxies(touch bool) []C.Proxy { } proxies = newProxies } - if gb.excludeTypeArray != nil { - var newProxies []C.Proxy - for _, p := range proxies { - mType := p.Type().String() - flag := false - for i := range gb.excludeTypeArray { - if strings.EqualFold(mType, gb.excludeTypeArray[i]) { - flag = true - break - } - } - if flag { - continue + if len(gb.excludeFilterRegs) > 0 { + var newProxies []C.Proxy + LOOP1: + for _, p := range proxies { + name := p.Name() + for _, excludeFilterReg := range gb.excludeFilterRegs { + if mat, _ := excludeFilterReg.MatchString(name); mat { + continue LOOP1 + } } newProxies = append(newProxies, p) } proxies = newProxies } - if gb.excludeFilterReg != nil { + if gb.excludeTypeArray != nil { var newProxies []C.Proxy + LOOP2: for _, p := range proxies { - name := p.Name() - if mat, _ := gb.excludeFilterReg.MatchString(name); mat { - continue + mType := p.Type().String() + for _, excludeType := range gb.excludeTypeArray { + if strings.EqualFold(mType, excludeType) { + continue LOOP2 + } } newProxies = append(newProxies, p) } @@ -207,9 +211,13 @@ func (gb *GroupBase) GetProxies(touch bool) []C.Proxy { } if len(proxies) == 0 { - return append(proxies, tunnel.Proxies()["COMPATIBLE"]) + return []C.Proxy{tunnel.Proxies()["COMPATIBLE"]} } + // only cache when proxies not empty + gb.providerVersions = providerVersions + gb.providerProxies = proxies + return proxies } @@ -241,17 +249,21 @@ func (gb *GroupBase) URLTest(ctx context.Context, url string, expectedStatus uti } } -func (gb *GroupBase) onDialFailed(adapterType C.AdapterType, err error) { +func (gb *GroupBase) onDialFailed(adapterType C.AdapterType, err error, fn func()) { if adapterType == C.Direct || adapterType == C.Compatible || adapterType == C.Reject || adapterType == C.Pass || adapterType == C.RejectDrop { return } - if strings.Contains(err.Error(), "connection refused") { - go gb.healthCheck() + if errors.Is(err, C.ErrNotSupport) { return } go func() { + if strings.Contains(err.Error(), "connection refused") { + fn() + return + } + gb.failedTestMux.Lock() defer gb.failedTestMux.Unlock() @@ -268,7 +280,7 @@ func (gb *GroupBase) onDialFailed(adapterType C.AdapterType, err error) { log.Debugln("ProxyGroup: %s failed count: %d", gb.Name(), gb.failedTimes) if gb.failedTimes >= gb.maxFailedTimes { log.Warnln("because %s failed multiple times, active health check", gb.Name()) - gb.healthCheck() + fn() } } }() diff --git a/adapter/outboundgroup/loadbalance.go b/adapter/outboundgroup/loadbalance.go index 048cc34c..c3222b3a 100644 --- a/adapter/outboundgroup/loadbalance.go +++ b/adapter/outboundgroup/loadbalance.go @@ -95,7 +95,7 @@ func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, op if err == nil { c.AppendToChains(lb) } else { - lb.onDialFailed(proxy.Type(), err) + lb.onDialFailed(proxy.Type(), err, lb.healthCheck) } if N.NeedHandshake(c) { @@ -103,7 +103,7 @@ func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, op if err == nil { lb.onDialSuccess() } else { - lb.onDialFailed(proxy.Type(), err) + lb.onDialFailed(proxy.Type(), err, lb.healthCheck) } }) } diff --git a/adapter/outboundgroup/urltest.go b/adapter/outboundgroup/urltest.go index 5da44f38..bdb15734 100644 --- a/adapter/outboundgroup/urltest.go +++ b/adapter/outboundgroup/urltest.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" - "sync" "time" "github.com/metacubex/mihomo/adapter/outbound" @@ -54,13 +52,13 @@ func (u *URLTest) Set(name string) error { if p == nil { return errors.New("proxy not exist") } - u.selected = name - u.fast(false) + u.ForceSet(name) return nil } func (u *URLTest) ForceSet(name string) { u.selected = name + u.fastSingle.Reset() } // DialContext implements C.ProxyAdapter @@ -70,7 +68,7 @@ func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata, opts .. if err == nil { c.AppendToChains(u) } else { - u.onDialFailed(proxy.Type(), err) + u.onDialFailed(proxy.Type(), err, u.healthCheck) } if N.NeedHandshake(c) { @@ -78,7 +76,7 @@ func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata, opts .. if err == nil { u.onDialSuccess() } else { - u.onDialFailed(proxy.Type(), err) + u.onDialFailed(proxy.Type(), err, u.healthCheck) } }) } @@ -88,9 +86,12 @@ func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata, opts .. // ListenPacketContext implements C.ProxyAdapter func (u *URLTest) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) { - pc, err := u.fast(true).ListenPacketContext(ctx, metadata, u.Base.DialOptions(opts...)...) + proxy := u.fast(true) + pc, err := proxy.ListenPacketContext(ctx, metadata, u.Base.DialOptions(opts...)...) if err == nil { pc.AppendToChains(u) + } else { + u.onDialFailed(proxy.Type(), err, u.healthCheck) } return pc, err @@ -101,22 +102,27 @@ func (u *URLTest) Unwrap(metadata *C.Metadata, touch bool) C.Proxy { return u.fast(touch) } -func (u *URLTest) fast(touch bool) C.Proxy { +func (u *URLTest) healthCheck() { + u.fastSingle.Reset() + u.GroupBase.healthCheck() + u.fastSingle.Reset() +} - proxies := u.GetProxies(touch) - if u.selected != "" { - for _, proxy := range proxies { - if !proxy.AliveForTestUrl(u.testUrl) { - continue - } - if proxy.Name() == u.selected { - u.fastNode = proxy - return proxy +func (u *URLTest) fast(touch bool) C.Proxy { + elm, _, shared := u.fastSingle.Do(func() (C.Proxy, error) { + proxies := u.GetProxies(touch) + if u.selected != "" { + for _, proxy := range proxies { + if !proxy.AliveForTestUrl(u.testUrl) { + continue + } + if proxy.Name() == u.selected { + u.fastNode = proxy + return proxy, nil + } } } - } - elm, _, shared := u.fastSingle.Do(func() (C.Proxy, error) { fast := proxies[0] minDelay := fast.LastDelayForTestUrl(u.testUrl) fastNotExist := true @@ -182,31 +188,7 @@ func (u *URLTest) MarshalJSON() ([]byte, error) { } func (u *URLTest) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (map[string]uint16, error) { - var wg sync.WaitGroup - var lock sync.Mutex - mp := map[string]uint16{} - proxies := u.GetProxies(false) - for _, proxy := range proxies { - proxy := proxy - wg.Add(1) - go func() { - delay, err := proxy.URLTest(ctx, u.testUrl, expectedStatus) - if err == nil { - lock.Lock() - mp[proxy.Name()] = delay - lock.Unlock() - } - - wg.Done() - }() - } - wg.Wait() - - if len(mp) == 0 { - return mp, fmt.Errorf("get delay: all proxies timeout") - } else { - return mp, nil - } + return u.GroupBase.URLTest(ctx, u.testUrl, expectedStatus) } func parseURLTestOption(config map[string]any) []urlTestOption { diff --git a/adapter/provider/provider.go b/adapter/provider/provider.go index 64d83b08..2abf9618 100644 --- a/adapter/provider/provider.go +++ b/adapter/provider/provider.go @@ -87,6 +87,7 @@ func (bp *baseProvider) RegisterHealthCheckTask(url string, expectedStatus utils func (bp *baseProvider) setProxies(proxies []C.Proxy) { bp.proxies = proxies + bp.version += 1 bp.healthCheck.setProxy(proxies) if bp.healthCheck.auto() { go bp.healthCheck.check() @@ -173,7 +174,7 @@ func NewProxySetProvider(name string, interval time.Duration, parser resource.Pa }, } - fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, parser, proxiesOnUpdate(pd)) + fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, parser, pd.setProxies) pd.Fetcher = fetcher if httpVehicle, ok := vehicle.(*resource.HTTPVehicle); ok { httpVehicle.SetInRead(func(resp *http.Response) { diff --git a/constant/adapters.go b/constant/adapters.go index b6b104c9..5dc5a717 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -153,7 +153,6 @@ type ProxyAdapter interface { type Group interface { URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (mp map[string]uint16, err error) - GetProxies(touch bool) []Proxy Touch() }