Cherrypick and merge from http2ping

This commit is contained in:
Cong Zuo 2024-01-09 16:33:50 +08:00
parent dc2108c174
commit 3031cac8a9
8 changed files with 750 additions and 0 deletions

View file

@ -0,0 +1,194 @@
package outboundgroup
import (
"context"
"encoding/json"
"fmt"
_ "net/http/pprof"
"net/url"
"reflect"
"time"
"github.com/metacubex/mihomo/adapter/outbound"
"github.com/metacubex/mihomo/adapter/outboundgroup/http2ping"
"github.com/metacubex/mihomo/common/callback"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/dialer"
C "github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/constant/provider"
"github.com/metacubex/mihomo/log"
)
type HTTP2Ping struct {
*GroupBase
g http2ping.PingerGroup
cachedProxies []C.Proxy
}
func (hp *HTTP2Ping) Now() string {
proxy := hp.getBestProxy()
return proxy.Name()
}
// DialContext implements C.ProxyAdapter
func (hp *HTTP2Ping) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
start := time.Now()
proxy := hp.getBestProxy()
if proxy == nil {
// TODO: fix this
log.Warnln("[htt2ping] no proxy available, dial direct to %v", metadata)
direct := outbound.NewDirect()
return direct.DialContext(ctx, metadata, hp.Base.DialOptions(opts...)...)
}
if cost := time.Since(start).Milliseconds(); cost > 0 {
log.Warnln("[htt2ping] getBestProxy took %d ms to %v", cost, metadata)
}
c, err := proxy.DialContext(ctx, metadata, hp.Base.DialOptions(opts...)...)
if err == nil {
c.AppendToChains(hp)
} else {
hp.onDialFailed(proxy.Type(), err)
}
if N.NeedHandshake(c) {
c = callback.NewFirstWriteCallBackConn(c, func(err error) {
if err == nil {
hp.onDialSuccess()
} else {
hp.onDialFailed(proxy.Type(), err)
}
})
}
return c, err
}
// ListenPacketContext implements C.ProxyAdapter
func (hp *HTTP2Ping) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) {
proxy := hp.getBestProxy()
pc, err := proxy.ListenPacketContext(ctx, metadata, hp.Base.DialOptions(opts...)...)
if err == nil {
pc.AppendToChains(hp)
}
return pc, err
}
// SupportUDP implements C.ProxyAdapter
func (hp *HTTP2Ping) SupportUDP() bool {
proxy := hp.getBestProxy()
return proxy.SupportUDP()
}
// IsL3Protocol implements C.ProxyAdapter
func (hp *HTTP2Ping) IsL3Protocol(metadata *C.Metadata) bool {
return hp.getBestProxy().IsL3Protocol(metadata)
}
// MarshalJSON implements C.ProxyAdapter
func (hp *HTTP2Ping) MarshalJSON() ([]byte, error) {
all := []string{}
for _, proxy := range hp.GetProxies(false) {
all = append(all, proxy.Name())
}
return json.Marshal(map[string]any{
"type": hp.Type().String(),
"now": hp.Now(),
"all": all,
})
}
// Unwrap implements C.ProxyAdapter
func (hp *HTTP2Ping) Unwrap(metadata *C.Metadata, touch bool) C.Proxy {
proxy := hp.getBestProxy()
return proxy
}
func (hp *HTTP2Ping) Set(name string) error {
return fmt.Errorf("not implemented")
}
func (hp *HTTP2Ping) ForceSet(name string) {
log.Warnln("not implemented")
}
func (hp *HTTP2Ping) getBestProxy() C.Proxy {
return hp.g.GetMinRttProxy(context.TODO())
}
func (hp *HTTP2Ping) pollForProviderProxiesUpdate(providers []provider.ProxyProvider) {
// TODO: use dynamic fallback timer
ticker := time.NewTicker(time.Second)
for range ticker.C {
proxies := hp.GetProxies(true)
if reflect.DeepEqual(proxies, hp.cachedProxies) {
continue
}
hp.cachedProxies = proxies
hp.g.SetProxies(proxies)
}
}
func NewHTTP2Ping(option *GroupCommonOption, providers []provider.ProxyProvider, cfg *http2ping.Config) *HTTP2Ping {
hp := &HTTP2Ping{
GroupBase: NewGroupBase(GroupBaseOption{
outbound.BaseOption{
Name: option.Name,
Type: C.Fallback,
Interface: option.Interface,
RoutingMark: option.RoutingMark,
},
option.Filter,
option.ExcludeFilter,
option.ExcludeType,
providers,
}),
g: http2ping.NewHTTP2PingGroup(cfg),
}
go hp.pollForProviderProxiesUpdate(providers)
return hp
}
func parseHTTP2PingOption(m map[string]any) *http2ping.Config {
config := http2ping.Config{}
interval := 1000
if v, ok := m["interval"]; ok {
if i, ok := v.(int); ok {
if i <= 0 {
panic("`interval` must be greater than zero")
}
interval = i
}
}
config.Interval = time.Millisecond * time.Duration(interval)
tolerance := 0
if v, ok := m["tolerance"]; ok {
if t, ok := v.(int); ok {
if t < 0 {
panic("`tolerance` can't be negative number")
}
tolerance = t
}
}
config.Tolerance = time.Millisecond * time.Duration(tolerance)
// For testing the usage of http2 server, using cli tool `h2i`:
//
// # h2i google.com
// # ping
server := "https://cloudflare.com"
if v, ok := m["server"]; ok {
if s, ok := v.(string); ok {
server = s
}
}
if u, err := url.Parse(server); err != nil {
panic("invalid http2ping server: " + server)
} else {
config.HTTP2Server = u
}
return &config
}

View file

@ -0,0 +1,54 @@
# http2ping for Clash.META
使用 HTTP2 Ping Frame 监测链路 rtt, 并从中选择 rtt 最优的链路
## 为什么
相比于`url-test`, http2ping 针对每个 endpoint 建立一条 HTTP2 长连接,
避免了频繁建立/断开连接,
因此我们可以使用更低的 interval(1s) 进行接近实时的 rtt 监测.
相比于使用`ICMP ping`进行延迟检测, 对于某些使用中转服务的网络接入供应商,
ICMP packets 只能检测`用户->中转->落地`这条链路的第一部分而非整条链路的完整 RTT.
相比于使用`http://www.gstatic.com/generate_204`这类常见的基于 HTTP 的 health check,
如果使用 HTTP 协议, 部分鸡贼的网络接入供应商会在中转服务器进行 MITM 直接返回 HTTP 204 response, 以试图欺骗客户.
## 配置
```YAML
# enable verbose logging for more infomation
log-level: debug
proxy-groups:
- name: min-rtt-group
type: http2-ping
filter: "hk"
use:
- airport_1
# interval milliseconds for sending Ping frame, default value: 1000ms
interval: 1000
# tolerance for changing current best route, default value: 0ms
tolerance: 0
# target server, default server: https://cloudflare.com
server: https://cloudflare.com
```
## 测试
For debugging:
```bash
#!/bin/bash
interface=enp1s0
ip=1.1.1.1
delay=100ms
# add latency to ip address
tc qdisc add dev $interface root handle 1: prio
tc filter add dev $interface parent 1:0 protocol ip prio 1 u32 match ip dst $ip flowid 2:1
tc qdisc add dev $interface parent 1:1 handle 2: netem delay $delay
# remove tc rules
tc qdisc del dev $interface root
```

View file

@ -0,0 +1,132 @@
package http2ping
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/log"
"golang.org/x/exp/maps"
)
var _ PingerGroup = (*http2PingGroup)(nil)
type http2PingGroup struct {
mu sync.RWMutex
pingers map[string]Pinger
resolver *dnsResolver
dieCh chan struct{}
config *Config
best atomic.Value
}
func NewHTTP2PingGroup(config *Config) PingerGroup {
g := &http2PingGroup{
pingers: make(map[string]Pinger),
resolver: newDnsResolver(),
dieCh: make(chan struct{}),
config: config,
}
go g.loop(config.Interval)
return g
}
func (g *http2PingGroup) SetProxies(proxies []constant.Proxy) {
g.mu.Lock()
defer g.mu.Unlock()
newPingers := make(map[string]Pinger)
oldPingers := g.pingers
for _, proxy := range proxies {
key, err := g.resolver.DomainPortToIpPort(proxy.Addr())
if err != nil {
log.Errorln("[http2ping] resolve domain error for %s: %v", proxy.Addr(), err)
continue
}
if _, ok := newPingers[key]; ok {
log.Debugln("[http2ping] duplicate proxy [%s] with addr: %s", proxy.Addr(), key)
continue
}
if _, ok := oldPingers[key]; !ok {
log.Infoln("[http2ping] add proxy [%s:%s] with addr: %s", proxy.Name(), proxy.Addr(), key)
newPingers[key] = NewHTTP2Pinger(g.config, proxy)
} else {
newPingers[key] = oldPingers[key]
delete(oldPingers, key)
}
}
for _, deadp := range oldPingers {
deadp.Close()
}
g.pingers = newPingers
}
func (g *http2PingGroup) getPingersCopy() map[string]Pinger {
g.mu.RLock()
defer g.mu.RUnlock()
return maps.Clone(g.pingers)
}
func (g *http2PingGroup) loop(interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-g.dieCh:
return
case <-ticker.C:
var newBest Pinger
minRtt := uint32(1<<31 - 1)
for _, p := range g.getPingersCopy() {
if rtt := p.GetSmoothRtt(); rtt > 0 && rtt < minRtt {
minRtt = rtt
newBest = p
}
}
oldBest := g.loadBest()
if g.foundBetterProxy(oldBest, newBest) {
g.best.Store(newBest)
}
}
}
}
func (g *http2PingGroup) GetMinRttProxy(ctx context.Context) constant.Proxy {
if p := g.loadBest(); p != nil {
return p.GetProxy()
}
return nil
}
func (g *http2PingGroup) Close() error {
close(g.dieCh)
return nil
}
func (g *http2PingGroup) loadBest() Pinger {
if ptr := g.best.Load(); ptr != nil {
if p, ok := ptr.(Pinger); ok {
return p
}
}
return nil
}
func (g *http2PingGroup) foundBetterProxy(oldp, newp Pinger) bool {
if oldp == nil && newp != nil {
return true
}
if oldp == newp || newp == nil {
return false
}
oldRtt := time.Millisecond * time.Duration(oldp.GetSmoothRtt())
newRtt := time.Millisecond * time.Duration(newp.GetSmoothRtt())
ok := oldRtt-newRtt > g.config.Tolerance
if ok {
log.Debugln("[http2ping] change best route from [%v][rtt: %v] to [%v][rtt: %v]",
oldp, oldRtt, newp, newRtt)
}
return ok
}

View file

@ -0,0 +1,27 @@
package http2ping
import (
"context"
"net/url"
"time"
"github.com/metacubex/mihomo/constant"
)
type Pinger interface {
GetProxy() constant.Proxy
GetSmoothRtt() uint32
String() string
Close() error
}
type PingerGroup interface {
GetMinRttProxy(ctx context.Context) constant.Proxy
SetProxies(proxies []constant.Proxy)
}
type Config struct {
Interval time.Duration
Tolerance time.Duration
HTTP2Server *url.URL
}

View file

@ -0,0 +1,185 @@
package http2ping
import (
"context"
"crypto/tls"
"fmt"
"sync/atomic"
"time"
"github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/log"
"golang.org/x/net/http2"
)
type pingerStatusCode = uint32
const (
PINGER_STATUS_DEAD pingerStatusCode = iota
PINGER_STATUS_PINGING
PINGER_STATUS_IDLE
)
const (
rttAlpha = 0.2
oneMinusAlpha = 1 - rttAlpha
rttBeta = 0.25
oneMinusBeta = 1 - rttBeta
)
func updateSRtt(sRtt, rtt uint32) uint32 {
return uint32(float32(sRtt)*oneMinusAlpha + float32(rtt)*rttAlpha)
}
func updateMeanDeviation(meanDeviation, sRtt, rtt uint32) uint32 {
return uint32(float32(meanDeviation)*oneMinusBeta + float32(Abs(int32(sRtt)-int32(rtt)))*rttBeta)
}
type pingerSharedStatus struct {
statusCode atomic.Uint32
latestRtt atomic.Uint32
sRtt atomic.Uint32
meanDeviation atomic.Uint32
}
type http2Pinger struct {
pingerSharedStatus
config *Config
proxy constant.Proxy
hasRecordedRtt atomic.Bool
newSRttCh chan uint32
ctx context.Context
ctxCancel context.CancelFunc
closed atomic.Bool
}
func NewHTTP2Pinger(config *Config, proxy constant.Proxy) *http2Pinger {
ctx, cancel := context.WithCancel(context.Background())
p := &http2Pinger{
config: config,
proxy: proxy,
newSRttCh: make(chan uint32),
ctx: ctx,
ctxCancel: cancel,
}
p.statusCode.Store(PINGER_STATUS_DEAD)
go p.pingLoop()
return p
}
func (p *http2Pinger) doPing(tlsConn *tls.Conn, http2Conn *http2.ClientConn) (uint32, error) {
tlsConn.SetDeadline(time.Now().Add(p.config.Interval * 5))
defer tlsConn.SetDeadline(time.Time{})
start := time.Now()
err := http2Conn.Ping(p.ctx)
if err != nil {
return 0, fmt.Errorf("http2 ping: %w", err)
}
return uint32(time.Since(start).Milliseconds()), nil
}
func (p *http2Pinger) Ping(tlsConn *tls.Conn, http2Conn *http2.ClientConn) error {
p.statusCode.Store(PINGER_STATUS_PINGING)
rtt, err := p.doPing(tlsConn, http2Conn)
if err != nil {
p.statusCode.Store(PINGER_STATUS_DEAD)
return err
}
sRtt := rtt
meanDeviation := rtt / 2
if p.hasRecordedRtt.Load() {
sRtt = updateSRtt(p.sRtt.Load(), rtt)
meanDeviation = updateMeanDeviation(p.meanDeviation.Load(), sRtt, rtt)
} else {
p.hasRecordedRtt.Store(true)
}
log.Debugln("[http2ping] [%s], rtt: %d, sRtt: %d, meanDeviation: %d", p.proxy.Name(), rtt, sRtt, meanDeviation)
p.sRtt.Store(sRtt)
p.latestRtt.Store(rtt)
p.meanDeviation.Store(meanDeviation)
p.statusCode.Store(PINGER_STATUS_IDLE)
select {
case p.newSRttCh <- sRtt:
default:
}
return nil
}
func (p *http2Pinger) Dial(ctx context.Context) (*tls.Conn, *http2.ClientConn, error) {
log.Debugln("[http2ping] [%s] dialing conn to %v", p.proxy.Name(), p.config.HTTP2Server)
rawConn, err := dialProxyConn(ctx, p.proxy, p.config.HTTP2Server.String())
if err != nil {
return nil, nil, fmt.Errorf("dial proxy conn: %w", err)
}
tlsConn := tls.Client(rawConn, &tls.Config{
ServerName: p.config.HTTP2Server.Hostname(),
NextProtos: []string{"h2"},
})
// set deadline for protocol handshake
tlsConn.SetDeadline(time.Now().Add(p.config.Interval * 5))
defer tlsConn.SetDeadline(time.Time{})
tr := http2.Transport{}
http2Conn, err := tr.NewClientConn(tlsConn)
if err != nil {
return nil, nil, fmt.Errorf("new client conn: %w", err)
}
return tlsConn, http2Conn, nil
}
func (p *http2Pinger) pingLoop() {
loopFn := func() (err error) {
tlsConn, http2Conn, err := p.Dial(context.Background())
if err != nil {
p.statusCode.Store(PINGER_STATUS_DEAD)
return err
}
defer http2Conn.Close()
for {
if p.closed.Load() {
return nil
}
err = p.Ping(tlsConn, http2Conn)
if err != nil {
return err
}
time.Sleep(p.config.Interval)
}
}
for {
if p.closed.Load() {
return
}
err := loopFn()
log.Debugln("[http2ping] [%s] pingLoop err: %v, wait for retry...", p.proxy.Name(), err)
time.Sleep(p.config.Interval * 5)
}
}
func (p *http2Pinger) GetSmoothRtt() uint32 {
switch p.statusCode.Load() {
case PINGER_STATUS_DEAD:
return 0
case PINGER_STATUS_PINGING:
fallthrough
case PINGER_STATUS_IDLE:
return p.sRtt.Load()
}
panic("unreachable")
}
func (p *http2Pinger) GetProxy() constant.Proxy {
return p.proxy
}
func (p *http2Pinger) Close() error {
p.closed.Store(true)
p.ctxCancel()
return nil
}
func (p *http2Pinger) String() string {
return p.proxy.Name()
}

View file

@ -0,0 +1,95 @@
package http2ping
import (
"fmt"
"net"
"regexp"
"strings"
"sync"
"time"
"github.com/ReneKroon/ttlcache"
)
var (
domainPortRegex = regexp.MustCompile(`^(.+):(\d+)$`)
)
type dnsResolvePromise struct {
ips []net.IP
err error
doneCh chan struct{}
}
func (d *dnsResolvePromise) Result() ([]net.IP, error) {
<-d.doneCh
return d.ips, d.err
}
func (d *dnsResolvePromise) Fulfill(ips []net.IP, err error) {
d.ips = ips
d.err = err
close(d.doneCh)
}
type dnsResolver struct {
mu sync.Mutex
cache *ttlcache.Cache
}
func newDnsResolver() *dnsResolver {
cache := ttlcache.NewCache()
cache.SetTTL(time.Second * 60)
return &dnsResolver{
cache: cache,
}
}
func (d *dnsResolver) getCachedPromise(domain string) (*dnsResolvePromise, bool) {
d.mu.Lock()
defer d.mu.Unlock()
if v, ok := d.cache.Get(domain); ok {
return v.(*dnsResolvePromise), true
} else {
promise := &dnsResolvePromise{
doneCh: make(chan struct{}),
}
d.cache.Set(domain, promise)
return promise, false
}
}
func (d *dnsResolver) resolveDomain(domain string) ([]net.IP, error) {
promise, ok := d.getCachedPromise(domain)
if ok {
return promise.Result()
}
ips, err := net.LookupIP(domain)
promise.Fulfill(ips, err)
if err != nil {
return nil, fmt.Errorf("lookup ip error: %w", err)
}
return ips, nil
}
func (d *dnsResolver) joinIpPorts(ips []net.IP, port string) string {
var ipPorts []string
for _, ip := range ips {
ipPorts = append(ipPorts, fmt.Sprintf("%s:%s", ip.String(), port))
}
return strings.Join(ipPorts, ",")
}
func (d *dnsResolver) DomainPortToIpPort(serverDomainPort string) (string, error) {
matches := domainPortRegex.FindStringSubmatch(serverDomainPort)
if len(matches) != 3 {
return "", fmt.Errorf("invalid server domain: %s", serverDomainPort)
}
domain, port := matches[1], matches[2]
ips, err := d.resolveDomain(domain)
if err != nil {
return "", err
}
return d.joinIpPorts(ips, port), nil
}

View file

@ -0,0 +1,60 @@
package http2ping
import (
"context"
"fmt"
"net"
"net/netip"
"net/url"
"strconv"
C "github.com/metacubex/mihomo/constant"
"golang.org/x/exp/constraints"
)
func dialProxyConn(ctx context.Context, p C.Proxy, targetUrlString string) (net.Conn, error) {
addr, err := urlToMetadata(targetUrlString)
if err != nil {
return nil, err
}
return p.DialContext(ctx, &addr)
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
u, err := url.Parse(rawURL)
if err != nil {
return
}
port := u.Port()
if port == "" {
switch u.Scheme {
case "https":
port = "443"
case "http":
port = "80"
default:
err = fmt.Errorf("%s scheme not Support", rawURL)
return
}
}
portNum, err := strconv.Atoi(port)
if err != nil || portNum > 65535 || portNum < 0 {
err = fmt.Errorf("invalid port %s ", port)
return
}
addr = C.Metadata{
Host: u.Hostname(),
DstIP: netip.Addr{},
DstPort: uint16(portNum),
}
return
}
func Abs[T constraints.Signed](a T) T {
if a < 0 {
return -a
}
return a
}

View file

@ -151,6 +151,9 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
return NewLoadBalance(groupOption, providers, strategy)
case "relay":
group = NewRelay(groupOption, providers)
case "http2-ping":
cfg := parseHTTP2PingOption(config)
group = NewHTTP2Ping(groupOption, providers, cfg)
default:
return nil, fmt.Errorf("%w: %s", errType, groupOption.Type)
}