diff --git a/adapter/outboundgroup/http2ping.go b/adapter/outboundgroup/http2ping.go new file mode 100644 index 00000000..d8dbb340 --- /dev/null +++ b/adapter/outboundgroup/http2ping.go @@ -0,0 +1,194 @@ +package outboundgroup + +import ( + "context" + "encoding/json" + "fmt" + _ "net/http/pprof" + "net/url" + "reflect" + "time" + + "github.com/metacubex/mihomo/adapter/outbound" + "github.com/metacubex/mihomo/adapter/outboundgroup/http2ping" + "github.com/metacubex/mihomo/common/callback" + N "github.com/metacubex/mihomo/common/net" + "github.com/metacubex/mihomo/component/dialer" + C "github.com/metacubex/mihomo/constant" + "github.com/metacubex/mihomo/constant/provider" + "github.com/metacubex/mihomo/log" +) + +type HTTP2Ping struct { + *GroupBase + g http2ping.PingerGroup + cachedProxies []C.Proxy +} + +func (hp *HTTP2Ping) Now() string { + proxy := hp.getBestProxy() + return proxy.Name() +} + +// DialContext implements C.ProxyAdapter +func (hp *HTTP2Ping) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) { + start := time.Now() + proxy := hp.getBestProxy() + if proxy == nil { + // TODO: fix this + log.Warnln("[htt2ping] no proxy available, dial direct to %v", metadata) + direct := outbound.NewDirect() + return direct.DialContext(ctx, metadata, hp.Base.DialOptions(opts...)...) + } + if cost := time.Since(start).Milliseconds(); cost > 0 { + log.Warnln("[htt2ping] getBestProxy took %d ms to %v", cost, metadata) + } + + c, err := proxy.DialContext(ctx, metadata, hp.Base.DialOptions(opts...)...) + if err == nil { + c.AppendToChains(hp) + } else { + hp.onDialFailed(proxy.Type(), err) + } + + if N.NeedHandshake(c) { + c = callback.NewFirstWriteCallBackConn(c, func(err error) { + if err == nil { + hp.onDialSuccess() + } else { + hp.onDialFailed(proxy.Type(), err) + } + }) + } + + return c, err +} + +// ListenPacketContext implements C.ProxyAdapter +func (hp *HTTP2Ping) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) { + proxy := hp.getBestProxy() + pc, err := proxy.ListenPacketContext(ctx, metadata, hp.Base.DialOptions(opts...)...) + if err == nil { + pc.AppendToChains(hp) + } + + return pc, err +} + +// SupportUDP implements C.ProxyAdapter +func (hp *HTTP2Ping) SupportUDP() bool { + proxy := hp.getBestProxy() + return proxy.SupportUDP() +} + +// IsL3Protocol implements C.ProxyAdapter +func (hp *HTTP2Ping) IsL3Protocol(metadata *C.Metadata) bool { + return hp.getBestProxy().IsL3Protocol(metadata) +} + +// MarshalJSON implements C.ProxyAdapter +func (hp *HTTP2Ping) MarshalJSON() ([]byte, error) { + all := []string{} + for _, proxy := range hp.GetProxies(false) { + all = append(all, proxy.Name()) + } + return json.Marshal(map[string]any{ + "type": hp.Type().String(), + "now": hp.Now(), + "all": all, + }) +} + +// Unwrap implements C.ProxyAdapter +func (hp *HTTP2Ping) Unwrap(metadata *C.Metadata, touch bool) C.Proxy { + proxy := hp.getBestProxy() + return proxy +} + +func (hp *HTTP2Ping) Set(name string) error { + return fmt.Errorf("not implemented") +} + +func (hp *HTTP2Ping) ForceSet(name string) { + log.Warnln("not implemented") +} + +func (hp *HTTP2Ping) getBestProxy() C.Proxy { + return hp.g.GetMinRttProxy(context.TODO()) +} + +func (hp *HTTP2Ping) pollForProviderProxiesUpdate(providers []provider.ProxyProvider) { + // TODO: use dynamic fallback timer + ticker := time.NewTicker(time.Second) + for range ticker.C { + proxies := hp.GetProxies(true) + if reflect.DeepEqual(proxies, hp.cachedProxies) { + continue + } + hp.cachedProxies = proxies + hp.g.SetProxies(proxies) + } +} + +func NewHTTP2Ping(option *GroupCommonOption, providers []provider.ProxyProvider, cfg *http2ping.Config) *HTTP2Ping { + hp := &HTTP2Ping{ + GroupBase: NewGroupBase(GroupBaseOption{ + outbound.BaseOption{ + Name: option.Name, + Type: C.Fallback, + Interface: option.Interface, + RoutingMark: option.RoutingMark, + }, + option.Filter, + option.ExcludeFilter, + option.ExcludeType, + providers, + }), + g: http2ping.NewHTTP2PingGroup(cfg), + } + go hp.pollForProviderProxiesUpdate(providers) + return hp +} + +func parseHTTP2PingOption(m map[string]any) *http2ping.Config { + config := http2ping.Config{} + + interval := 1000 + if v, ok := m["interval"]; ok { + if i, ok := v.(int); ok { + if i <= 0 { + panic("`interval` must be greater than zero") + } + interval = i + } + } + config.Interval = time.Millisecond * time.Duration(interval) + + tolerance := 0 + if v, ok := m["tolerance"]; ok { + if t, ok := v.(int); ok { + if t < 0 { + panic("`tolerance` can't be negative number") + } + tolerance = t + } + } + config.Tolerance = time.Millisecond * time.Duration(tolerance) + + // For testing the usage of http2 server, using cli tool `h2i`: + // + // # h2i google.com + // # ping + server := "https://cloudflare.com" + if v, ok := m["server"]; ok { + if s, ok := v.(string); ok { + server = s + } + } + if u, err := url.Parse(server); err != nil { + panic("invalid http2ping server: " + server) + } else { + config.HTTP2Server = u + } + return &config +} diff --git a/adapter/outboundgroup/http2ping/README.md b/adapter/outboundgroup/http2ping/README.md new file mode 100644 index 00000000..ce56478b --- /dev/null +++ b/adapter/outboundgroup/http2ping/README.md @@ -0,0 +1,54 @@ +# http2ping for Clash.META + +使用 HTTP2 Ping Frame 监测链路 rtt, 并从中选择 rtt 最优的链路 + +## 为什么 + +相比于`url-test`, http2ping 针对每个 endpoint 建立一条 HTTP2 长连接, +避免了频繁建立/断开连接, +因此我们可以使用更低的 interval(1s) 进行接近实时的 rtt 监测. + +相比于使用`ICMP ping`进行延迟检测, 对于某些使用中转服务的网络接入供应商, +ICMP packets 只能检测`用户->中转->落地`这条链路的第一部分而非整条链路的完整 RTT. + +相比于使用`http://www.gstatic.com/generate_204`这类常见的基于 HTTP 的 health check, +如果使用 HTTP 协议, 部分鸡贼的网络接入供应商会在中转服务器进行 MITM 直接返回 HTTP 204 response, 以试图欺骗客户. + +## 配置 + +```YAML +# enable verbose logging for more infomation +log-level: debug +proxy-groups: + - name: min-rtt-group + type: http2-ping + filter: "hk" + use: + - airport_1 + # interval milliseconds for sending Ping frame, default value: 1000ms + interval: 1000 + # tolerance for changing current best route, default value: 0ms + tolerance: 0 + # target server, default server: https://cloudflare.com + server: https://cloudflare.com +``` + +## 测试 + +For debugging: + +```bash +#!/bin/bash + +interface=enp1s0 +ip=1.1.1.1 +delay=100ms + +# add latency to ip address +tc qdisc add dev $interface root handle 1: prio +tc filter add dev $interface parent 1:0 protocol ip prio 1 u32 match ip dst $ip flowid 2:1 +tc qdisc add dev $interface parent 1:1 handle 2: netem delay $delay + +# remove tc rules +tc qdisc del dev $interface root +``` \ No newline at end of file diff --git a/adapter/outboundgroup/http2ping/group.go b/adapter/outboundgroup/http2ping/group.go new file mode 100644 index 00000000..9ba64467 --- /dev/null +++ b/adapter/outboundgroup/http2ping/group.go @@ -0,0 +1,132 @@ +package http2ping + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/metacubex/mihomo/constant" + "github.com/metacubex/mihomo/log" + "golang.org/x/exp/maps" +) + +var _ PingerGroup = (*http2PingGroup)(nil) + +type http2PingGroup struct { + mu sync.RWMutex + pingers map[string]Pinger + resolver *dnsResolver + dieCh chan struct{} + config *Config + best atomic.Value +} + +func NewHTTP2PingGroup(config *Config) PingerGroup { + g := &http2PingGroup{ + pingers: make(map[string]Pinger), + resolver: newDnsResolver(), + dieCh: make(chan struct{}), + config: config, + } + go g.loop(config.Interval) + return g +} + +func (g *http2PingGroup) SetProxies(proxies []constant.Proxy) { + g.mu.Lock() + defer g.mu.Unlock() + + newPingers := make(map[string]Pinger) + oldPingers := g.pingers + for _, proxy := range proxies { + key, err := g.resolver.DomainPortToIpPort(proxy.Addr()) + if err != nil { + log.Errorln("[http2ping] resolve domain error for %s: %v", proxy.Addr(), err) + continue + } + if _, ok := newPingers[key]; ok { + log.Debugln("[http2ping] duplicate proxy [%s] with addr: %s", proxy.Addr(), key) + continue + } + if _, ok := oldPingers[key]; !ok { + log.Infoln("[http2ping] add proxy [%s:%s] with addr: %s", proxy.Name(), proxy.Addr(), key) + newPingers[key] = NewHTTP2Pinger(g.config, proxy) + } else { + newPingers[key] = oldPingers[key] + delete(oldPingers, key) + } + } + for _, deadp := range oldPingers { + deadp.Close() + } + g.pingers = newPingers +} + +func (g *http2PingGroup) getPingersCopy() map[string]Pinger { + g.mu.RLock() + defer g.mu.RUnlock() + + return maps.Clone(g.pingers) +} + +func (g *http2PingGroup) loop(interval time.Duration) { + ticker := time.NewTicker(interval) + for { + select { + case <-g.dieCh: + return + case <-ticker.C: + var newBest Pinger + minRtt := uint32(1<<31 - 1) + for _, p := range g.getPingersCopy() { + if rtt := p.GetSmoothRtt(); rtt > 0 && rtt < minRtt { + minRtt = rtt + newBest = p + } + } + oldBest := g.loadBest() + if g.foundBetterProxy(oldBest, newBest) { + g.best.Store(newBest) + } + } + } +} + +func (g *http2PingGroup) GetMinRttProxy(ctx context.Context) constant.Proxy { + if p := g.loadBest(); p != nil { + return p.GetProxy() + } + return nil +} + +func (g *http2PingGroup) Close() error { + close(g.dieCh) + return nil +} + +func (g *http2PingGroup) loadBest() Pinger { + if ptr := g.best.Load(); ptr != nil { + if p, ok := ptr.(Pinger); ok { + return p + } + } + return nil +} + +func (g *http2PingGroup) foundBetterProxy(oldp, newp Pinger) bool { + if oldp == nil && newp != nil { + return true + } + if oldp == newp || newp == nil { + return false + } + oldRtt := time.Millisecond * time.Duration(oldp.GetSmoothRtt()) + newRtt := time.Millisecond * time.Duration(newp.GetSmoothRtt()) + ok := oldRtt-newRtt > g.config.Tolerance + if ok { + log.Debugln("[http2ping] change best route from [%v][rtt: %v] to [%v][rtt: %v]", + oldp, oldRtt, newp, newRtt) + } + return ok +} diff --git a/adapter/outboundgroup/http2ping/interface.go b/adapter/outboundgroup/http2ping/interface.go new file mode 100644 index 00000000..133d366e --- /dev/null +++ b/adapter/outboundgroup/http2ping/interface.go @@ -0,0 +1,27 @@ +package http2ping + +import ( + "context" + "net/url" + "time" + + "github.com/metacubex/mihomo/constant" +) + +type Pinger interface { + GetProxy() constant.Proxy + GetSmoothRtt() uint32 + String() string + Close() error +} + +type PingerGroup interface { + GetMinRttProxy(ctx context.Context) constant.Proxy + SetProxies(proxies []constant.Proxy) +} + +type Config struct { + Interval time.Duration + Tolerance time.Duration + HTTP2Server *url.URL +} diff --git a/adapter/outboundgroup/http2ping/pinger.go b/adapter/outboundgroup/http2ping/pinger.go new file mode 100644 index 00000000..4d02fc6d --- /dev/null +++ b/adapter/outboundgroup/http2ping/pinger.go @@ -0,0 +1,185 @@ +package http2ping + +import ( + "context" + "crypto/tls" + "fmt" + "sync/atomic" + "time" + + "github.com/metacubex/mihomo/constant" + "github.com/metacubex/mihomo/log" + "golang.org/x/net/http2" +) + +type pingerStatusCode = uint32 + +const ( + PINGER_STATUS_DEAD pingerStatusCode = iota + PINGER_STATUS_PINGING + PINGER_STATUS_IDLE +) + +const ( + rttAlpha = 0.2 + oneMinusAlpha = 1 - rttAlpha + rttBeta = 0.25 + oneMinusBeta = 1 - rttBeta +) + +func updateSRtt(sRtt, rtt uint32) uint32 { + return uint32(float32(sRtt)*oneMinusAlpha + float32(rtt)*rttAlpha) +} + +func updateMeanDeviation(meanDeviation, sRtt, rtt uint32) uint32 { + return uint32(float32(meanDeviation)*oneMinusBeta + float32(Abs(int32(sRtt)-int32(rtt)))*rttBeta) +} + +type pingerSharedStatus struct { + statusCode atomic.Uint32 + latestRtt atomic.Uint32 + sRtt atomic.Uint32 + meanDeviation atomic.Uint32 +} + +type http2Pinger struct { + pingerSharedStatus + config *Config + proxy constant.Proxy + + hasRecordedRtt atomic.Bool + newSRttCh chan uint32 + + ctx context.Context + ctxCancel context.CancelFunc + closed atomic.Bool +} + +func NewHTTP2Pinger(config *Config, proxy constant.Proxy) *http2Pinger { + ctx, cancel := context.WithCancel(context.Background()) + p := &http2Pinger{ + config: config, + proxy: proxy, + newSRttCh: make(chan uint32), + ctx: ctx, + ctxCancel: cancel, + } + p.statusCode.Store(PINGER_STATUS_DEAD) + go p.pingLoop() + return p +} + +func (p *http2Pinger) doPing(tlsConn *tls.Conn, http2Conn *http2.ClientConn) (uint32, error) { + tlsConn.SetDeadline(time.Now().Add(p.config.Interval * 5)) + defer tlsConn.SetDeadline(time.Time{}) + + start := time.Now() + err := http2Conn.Ping(p.ctx) + if err != nil { + return 0, fmt.Errorf("http2 ping: %w", err) + } + return uint32(time.Since(start).Milliseconds()), nil +} + +func (p *http2Pinger) Ping(tlsConn *tls.Conn, http2Conn *http2.ClientConn) error { + p.statusCode.Store(PINGER_STATUS_PINGING) + rtt, err := p.doPing(tlsConn, http2Conn) + if err != nil { + p.statusCode.Store(PINGER_STATUS_DEAD) + return err + } + sRtt := rtt + meanDeviation := rtt / 2 + if p.hasRecordedRtt.Load() { + sRtt = updateSRtt(p.sRtt.Load(), rtt) + meanDeviation = updateMeanDeviation(p.meanDeviation.Load(), sRtt, rtt) + } else { + p.hasRecordedRtt.Store(true) + } + log.Debugln("[http2ping] [%s], rtt: %d, sRtt: %d, meanDeviation: %d", p.proxy.Name(), rtt, sRtt, meanDeviation) + p.sRtt.Store(sRtt) + p.latestRtt.Store(rtt) + p.meanDeviation.Store(meanDeviation) + p.statusCode.Store(PINGER_STATUS_IDLE) + select { + case p.newSRttCh <- sRtt: + default: + } + return nil +} + +func (p *http2Pinger) Dial(ctx context.Context) (*tls.Conn, *http2.ClientConn, error) { + log.Debugln("[http2ping] [%s] dialing conn to %v", p.proxy.Name(), p.config.HTTP2Server) + rawConn, err := dialProxyConn(ctx, p.proxy, p.config.HTTP2Server.String()) + if err != nil { + return nil, nil, fmt.Errorf("dial proxy conn: %w", err) + } + tlsConn := tls.Client(rawConn, &tls.Config{ + ServerName: p.config.HTTP2Server.Hostname(), + NextProtos: []string{"h2"}, + }) + // set deadline for protocol handshake + tlsConn.SetDeadline(time.Now().Add(p.config.Interval * 5)) + defer tlsConn.SetDeadline(time.Time{}) + tr := http2.Transport{} + http2Conn, err := tr.NewClientConn(tlsConn) + if err != nil { + return nil, nil, fmt.Errorf("new client conn: %w", err) + } + return tlsConn, http2Conn, nil +} + +func (p *http2Pinger) pingLoop() { + loopFn := func() (err error) { + tlsConn, http2Conn, err := p.Dial(context.Background()) + if err != nil { + p.statusCode.Store(PINGER_STATUS_DEAD) + return err + } + defer http2Conn.Close() + for { + if p.closed.Load() { + return nil + } + err = p.Ping(tlsConn, http2Conn) + if err != nil { + return err + } + time.Sleep(p.config.Interval) + } + } + for { + if p.closed.Load() { + return + } + err := loopFn() + log.Debugln("[http2ping] [%s] pingLoop err: %v, wait for retry...", p.proxy.Name(), err) + time.Sleep(p.config.Interval * 5) + } +} + +func (p *http2Pinger) GetSmoothRtt() uint32 { + switch p.statusCode.Load() { + case PINGER_STATUS_DEAD: + return 0 + case PINGER_STATUS_PINGING: + fallthrough + case PINGER_STATUS_IDLE: + return p.sRtt.Load() + } + panic("unreachable") +} + +func (p *http2Pinger) GetProxy() constant.Proxy { + return p.proxy +} + +func (p *http2Pinger) Close() error { + p.closed.Store(true) + p.ctxCancel() + return nil +} + +func (p *http2Pinger) String() string { + return p.proxy.Name() +} diff --git a/adapter/outboundgroup/http2ping/server-dedup.go b/adapter/outboundgroup/http2ping/server-dedup.go new file mode 100644 index 00000000..e4cb3fc4 --- /dev/null +++ b/adapter/outboundgroup/http2ping/server-dedup.go @@ -0,0 +1,95 @@ +package http2ping + +import ( + "fmt" + "net" + "regexp" + "strings" + "sync" + "time" + + "github.com/ReneKroon/ttlcache" +) + +var ( + domainPortRegex = regexp.MustCompile(`^(.+):(\d+)$`) +) + +type dnsResolvePromise struct { + ips []net.IP + err error + doneCh chan struct{} +} + +func (d *dnsResolvePromise) Result() ([]net.IP, error) { + <-d.doneCh + return d.ips, d.err +} + +func (d *dnsResolvePromise) Fulfill(ips []net.IP, err error) { + d.ips = ips + d.err = err + close(d.doneCh) +} + +type dnsResolver struct { + mu sync.Mutex + cache *ttlcache.Cache +} + +func newDnsResolver() *dnsResolver { + cache := ttlcache.NewCache() + cache.SetTTL(time.Second * 60) + return &dnsResolver{ + cache: cache, + } +} + +func (d *dnsResolver) getCachedPromise(domain string) (*dnsResolvePromise, bool) { + d.mu.Lock() + defer d.mu.Unlock() + + if v, ok := d.cache.Get(domain); ok { + return v.(*dnsResolvePromise), true + } else { + promise := &dnsResolvePromise{ + doneCh: make(chan struct{}), + } + d.cache.Set(domain, promise) + return promise, false + } +} + +func (d *dnsResolver) resolveDomain(domain string) ([]net.IP, error) { + promise, ok := d.getCachedPromise(domain) + if ok { + return promise.Result() + } + ips, err := net.LookupIP(domain) + promise.Fulfill(ips, err) + if err != nil { + return nil, fmt.Errorf("lookup ip error: %w", err) + } + return ips, nil +} + +func (d *dnsResolver) joinIpPorts(ips []net.IP, port string) string { + var ipPorts []string + for _, ip := range ips { + ipPorts = append(ipPorts, fmt.Sprintf("%s:%s", ip.String(), port)) + } + return strings.Join(ipPorts, ",") +} + +func (d *dnsResolver) DomainPortToIpPort(serverDomainPort string) (string, error) { + matches := domainPortRegex.FindStringSubmatch(serverDomainPort) + if len(matches) != 3 { + return "", fmt.Errorf("invalid server domain: %s", serverDomainPort) + } + domain, port := matches[1], matches[2] + ips, err := d.resolveDomain(domain) + if err != nil { + return "", err + } + return d.joinIpPorts(ips, port), nil +} diff --git a/adapter/outboundgroup/http2ping/utils.go b/adapter/outboundgroup/http2ping/utils.go new file mode 100644 index 00000000..f6d81fc8 --- /dev/null +++ b/adapter/outboundgroup/http2ping/utils.go @@ -0,0 +1,60 @@ +package http2ping + +import ( + "context" + "fmt" + "net" + "net/netip" + "net/url" + "strconv" + + C "github.com/metacubex/mihomo/constant" + "golang.org/x/exp/constraints" +) + +func dialProxyConn(ctx context.Context, p C.Proxy, targetUrlString string) (net.Conn, error) { + addr, err := urlToMetadata(targetUrlString) + if err != nil { + return nil, err + } + return p.DialContext(ctx, &addr) +} + +func urlToMetadata(rawURL string) (addr C.Metadata, err error) { + u, err := url.Parse(rawURL) + if err != nil { + return + } + + port := u.Port() + if port == "" { + switch u.Scheme { + case "https": + port = "443" + case "http": + port = "80" + default: + err = fmt.Errorf("%s scheme not Support", rawURL) + return + } + } + portNum, err := strconv.Atoi(port) + if err != nil || portNum > 65535 || portNum < 0 { + err = fmt.Errorf("invalid port %s ", port) + return + } + + addr = C.Metadata{ + Host: u.Hostname(), + DstIP: netip.Addr{}, + DstPort: uint16(portNum), + } + return +} + +func Abs[T constraints.Signed](a T) T { + if a < 0 { + return -a + } + return a +} diff --git a/adapter/outboundgroup/parser.go b/adapter/outboundgroup/parser.go index 3f7f9770..6a9f5e06 100644 --- a/adapter/outboundgroup/parser.go +++ b/adapter/outboundgroup/parser.go @@ -151,6 +151,9 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide return NewLoadBalance(groupOption, providers, strategy) case "relay": group = NewRelay(groupOption, providers) + case "http2-ping": + cfg := parseHTTP2PingOption(config) + group = NewHTTP2Ping(groupOption, providers, cfg) default: return nil, fmt.Errorf("%w: %s", errType, groupOption.Type) }