package outboundgroup

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/metacubex/mihomo/adapter/outbound"
	"github.com/metacubex/mihomo/common/atomic"
	"github.com/metacubex/mihomo/common/utils"
	C "github.com/metacubex/mihomo/constant"
	"github.com/metacubex/mihomo/constant/provider"
	types "github.com/metacubex/mihomo/constant/provider"
	"github.com/metacubex/mihomo/log"
	"github.com/metacubex/mihomo/tunnel"

	"github.com/dlclark/regexp2"
	"golang.org/x/exp/slices"
)

// GroupBase carries the state shared by proxy groups: the providers the group
// draws proxies from, the include/exclude filters applied to them, a cache of
// the filtered proxy list, and the bookkeeping used to decide when repeated
// dial failures should trigger a health check.
type GroupBase struct {
	*outbound.Base
	filterRegs        []*regexp2.Regexp // include filters, in priority order
	excludeFilterRegs []*regexp2.Regexp // proxies whose name matches any of these are dropped
	excludeTypeArray  []string          // adapter type names to drop (compared case-insensitively)
	providers         []provider.ProxyProvider
	failedTestMux     sync.Mutex // guards failedTimes, failedTime and the failedTesting check-and-set
	failedTimes       int        // failures counted in the current window
	failedTime        time.Time  // start of the current failure window
	failedTesting     atomic.Bool
	TestTimeout       int // length of the failure window, in milliseconds
	maxFailedTimes    int // failures within the window that trigger a health check

	// for GetProxies
	getProxiesMutex  sync.Mutex
	providerVersions []uint32
	providerProxies  []C.Proxy
}

// GroupBaseOption is the configuration consumed by NewGroupBase.
//
// filter and excludeFilter are lists of regexp2 patterns separated by a
// backtick; excludeType is a list of adapter type names separated by "|".
// A zero TestTimeout or maxFailedTimes selects the default (5000 ms / 5).
type GroupBaseOption struct {
	outbound.BaseOption
	filter         string
	excludeFilter  string
	excludeType    string
	TestTimeout    int
	maxFailedTimes int
	providers      []provider.ProxyProvider
}

// NewGroupBase builds a GroupBase from opt, applying defaults for TestTimeout
// and maxFailedTimes and logging a deprecation warning when routing-mark or
// interface-name are set on the group.
//
// The filter patterns are compiled with regexp2.MustCompile, so an invalid
// pattern panics here.
func NewGroupBase(opt GroupBaseOption) *GroupBase {
	if opt.RoutingMark != 0 {
		log.Warnln("The group [%s] with routing-mark configuration is deprecated, please set it directly on the proxy instead", opt.Name)
	}
	if opt.Interface != "" {
		log.Warnln("The group [%s] with interface-name configuration is deprecated, please set it directly on the proxy instead", opt.Name)
	}

	var excludeTypeArray []string
	if opt.excludeType != "" {
		excludeTypeArray = strings.Split(opt.excludeType, "|")
	}

	// Compile in the same order as before (exclude first) so that, when both
	// lists contain a bad pattern, the same one is reported by the panic.
	excludeFilterRegs := compileBacktickPatterns(opt.excludeFilter)
	filterRegs := compileBacktickPatterns(opt.filter)

	gb := &GroupBase{
		Base:              outbound.NewBase(opt.BaseOption),
		filterRegs:        filterRegs,
		excludeFilterRegs: excludeFilterRegs,
		excludeTypeArray:  excludeTypeArray,
		providers:         opt.providers,
		failedTesting:     atomic.NewBool(false),
		TestTimeout:       opt.TestTimeout,
		maxFailedTimes:    opt.maxFailedTimes,
	}

	if gb.TestTimeout == 0 {
		gb.TestTimeout = 5000
	}
	if gb.maxFailedTimes == 0 {
		gb.maxFailedTimes = 5
	}

	return gb
}

// compileBacktickPatterns splits patterns on the backtick separator and
// compiles every piece. It returns nil for an empty input and panics (via
// regexp2.MustCompile) on an invalid pattern.
func compileBacktickPatterns(patterns string) []*regexp2.Regexp {
	if patterns == "" {
		return nil
	}
	var regs []*regexp2.Regexp
	for _, pattern := range strings.Split(patterns, "`") {
		regs = append(regs, regexp2.MustCompile(pattern, regexp2.None))
	}
	return regs
}

// Touch calls Touch on every provider of the group.
func (gb *GroupBase) Touch() {
	for _, pd := range gb.providers {
		pd.Touch()
	}
}

// GetProxies returns the group's proxies after applying the include filters,
// exclude filters and excluded types. When touch is true every provider is
// touched first.
//
// The result is cached and reused for as long as the providers report the
// same versions. If nothing survives filtering, a single-element slice holding
// the tunnel's "COMPATIBLE" proxy is returned and the cache is left untouched.
// The returned slice may be the cached one; callers must not modify it.
func (gb *GroupBase) GetProxies(touch bool) []C.Proxy {
	providerVersions := make([]uint32, len(gb.providers))
	for i, pd := range gb.providers {
		if touch { // touch first
			pd.Touch()
		}
		providerVersions[i] = pd.Version()
	}

	// thread safe
	gb.getProxiesMutex.Lock()
	defer gb.getProxiesMutex.Unlock()

	// Return the cached proxies if the versions did not change. The cache is
	// only ever populated with a non-empty list, so an empty cache means
	// "never filled": without the length check a group with no providers
	// would compare equal to the nil version slice and return nil instead of
	// the COMPATIBLE fallback.
	if len(gb.providerProxies) > 0 && slices.Equal(providerVersions, gb.providerVersions) {
		return gb.providerProxies
	}

	var proxies []C.Proxy
	if len(gb.filterRegs) == 0 {
		for _, pd := range gb.providers {
			proxies = append(proxies, pd.Proxies()...)
		}
	} else {
		for _, pd := range gb.providers {
			if pd.VehicleType() == types.Compatible { // compatible provider unneeded filter
				proxies = append(proxies, pd.Proxies()...)
				continue
			}
			matched, _ := matchProxiesInFilterOrder(pd.Proxies(), gb.filterRegs)
			proxies = append(proxies, matched...)
		}
	}

	// Multiple filters mean that proxies are sorted in the order in which the
	// filters appear. Although filtering has already been done per provider,
	// with multiple providers the list needs to be reordered as a whole.
	if len(gb.providers) > 1 && len(gb.filterRegs) > 1 {
		reordered, seen := matchProxiesInFilterOrder(proxies, gb.filterRegs)
		for _, p := range proxies { // add not matched proxies at the end
			name := p.Name()
			if _, ok := seen[name]; !ok {
				seen[name] = struct{}{}
				reordered = append(reordered, p)
			}
		}
		proxies = reordered
	}

	if len(gb.excludeFilterRegs) > 0 {
		var kept []C.Proxy
		for _, p := range proxies {
			if !nameMatchesAnyReg(p.Name(), gb.excludeFilterRegs) {
				kept = append(kept, p)
			}
		}
		proxies = kept
	}

	if len(gb.excludeTypeArray) > 0 {
		var kept []C.Proxy
		for _, p := range proxies {
			if !equalsAnyFold(p.Type().String(), gb.excludeTypeArray) {
				kept = append(kept, p)
			}
		}
		proxies = kept
	}

	if len(proxies) == 0 {
		return []C.Proxy{tunnel.Proxies()["COMPATIBLE"]}
	}

	// only cache when proxies not empty
	gb.providerVersions = providerVersions
	gb.providerProxies = proxies

	return proxies
}

// matchProxiesInFilterOrder returns the candidates whose name matches at least
// one of regs, ordered by the first regex that matches them and de-duplicated
// by name. The second result is the set of names that were selected.
func matchProxiesInFilterOrder(candidates []C.Proxy, regs []*regexp2.Regexp) ([]C.Proxy, map[string]struct{}) {
	var matched []C.Proxy
	seen := map[string]struct{}{}
	for _, reg := range regs {
		for _, p := range candidates {
			name := p.Name()
			if _, dup := seen[name]; dup {
				// Already selected by an earlier filter; skip the regex.
				continue
			}
			// A match error is treated as "no match".
			if ok, _ := reg.MatchString(name); ok {
				seen[name] = struct{}{}
				matched = append(matched, p)
			}
		}
	}
	return matched, seen
}

// nameMatchesAnyReg reports whether name matches at least one of regs.
// A match error is treated as "no match".
func nameMatchesAnyReg(name string, regs []*regexp2.Regexp) bool {
	for _, reg := range regs {
		if ok, _ := reg.MatchString(name); ok {
			return true
		}
	}
	return false
}

// equalsAnyFold reports whether s equals one of candidates, ignoring case.
func equalsAnyFold(s string, candidates []string) bool {
	for _, candidate := range candidates {
		if strings.EqualFold(s, candidate) {
			return true
		}
	}
	return false
}

// URLTest runs URLTest concurrently on every proxy returned by
// GetProxies(false) and returns the delays of the proxies that succeeded,
// keyed by proxy name. It returns an error when no proxy succeeded; the
// (empty) map is still returned in that case.
func (gb *GroupBase) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (map[string]uint16, error) {
	var wg sync.WaitGroup
	var lock sync.Mutex
	mp := map[string]uint16{}

	proxies := gb.GetProxies(false)
	for _, proxy := range proxies {
		proxy := proxy // per-iteration copy for toolchains older than Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			delay, err := proxy.URLTest(ctx, url, expectedStatus)
			if err != nil {
				return
			}
			lock.Lock()
			mp[proxy.Name()] = delay
			lock.Unlock()
		}()
	}
	wg.Wait()

	if len(mp) == 0 {
		return mp, fmt.Errorf("get delay: all proxies timeout")
	}
	return mp, nil
}

// onDialFailed records a dial failure for the group and, when warranted, runs
// fn (the caller's health check) in a background goroutine.
//
// Failures of Direct, Compatible, Reject, Pass and RejectDrop adapters, nil
// errors and C.ErrNotSupport are ignored. A "connection refused" error runs fn
// immediately; any other error is counted, and fn runs once maxFailedTimes
// failures have accumulated within TestTimeout milliseconds of the first one.
func (gb *GroupBase) onDialFailed(adapterType C.AdapterType, err error, fn func()) {
	switch adapterType {
	case C.Direct, C.Compatible, C.Reject, C.Pass, C.RejectDrop:
		return
	}

	// err.Error() below would panic on a nil error.
	if err == nil || errors.Is(err, C.ErrNotSupport) {
		return
	}

	go func() {
		if strings.Contains(err.Error(), "connection refused") {
			fn()
			return
		}

		// fn is invoked only after recordDialFailure has released
		// failedTestMux, so a long-running health check does not block the
		// bookkeeping of other dials.
		if gb.recordDialFailure() {
			log.Warnln("because %s failed multiple times, active health check", gb.Name())
			fn()
		}
	}()
}

// recordDialFailure counts one failure under failedTestMux and reports whether
// the failure threshold has been reached. The first failure opens a window;
// a later failure arriving after the window has expired resets the counter.
func (gb *GroupBase) recordDialFailure() bool {
	gb.failedTestMux.Lock()
	defer gb.failedTestMux.Unlock()

	gb.failedTimes++
	if gb.failedTimes == 1 {
		log.Debugln("ProxyGroup: %s first failed", gb.Name())
		gb.failedTime = time.Now()
		return false
	}

	if time.Since(gb.failedTime) > time.Duration(gb.TestTimeout)*time.Millisecond {
		gb.failedTimes = 0
		return false
	}

	log.Debugln("ProxyGroup: %s failed count: %d", gb.Name(), gb.failedTimes)
	return gb.failedTimes >= gb.maxFailedTimes
}

// healthCheck runs HealthCheck on every provider concurrently, waits for all
// of them, and then resets the failure counter. It returns immediately when
// another health check started through this method is still running.
func (gb *GroupBase) healthCheck() {
	// Check-and-set under the mutex so that two concurrent callers cannot
	// both observe "not testing" and start duplicate health checks.
	gb.failedTestMux.Lock()
	if gb.failedTesting.Load() {
		gb.failedTestMux.Unlock()
		return
	}
	gb.failedTesting.Store(true)
	gb.failedTestMux.Unlock()

	var wg sync.WaitGroup
	for _, proxyProvider := range gb.providers {
		proxyProvider := proxyProvider // per-iteration copy for toolchains older than Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			proxyProvider.HealthCheck()
		}()
	}
	wg.Wait()

	gb.failedTestMux.Lock()
	gb.failedTesting.Store(false)
	gb.failedTimes = 0
	gb.failedTestMux.Unlock()
}

// onDialSuccess resets the failure counter unless a health check is currently
// running.
func (gb *GroupBase) onDialSuccess() {
	// failedTimes is also read and written by the onDialFailed goroutine, so
	// the reset has to happen under the same mutex.
	gb.failedTestMux.Lock()
	defer gb.failedTestMux.Unlock()

	if !gb.failedTesting.Load() {
		gb.failedTimes = 0
	}
}