diff --git a/adapter/outboundgroup/http2ping.go b/adapter/outboundgroup/http2ping.go index 0c52a004..8fd1d237 100644 --- a/adapter/outboundgroup/http2ping.go +++ b/adapter/outboundgroup/http2ping.go @@ -92,7 +92,8 @@ func (hp *HTTP2Ping) MarshalJSON() ([]byte, error) { all = append(all, proxy.Name()) } return json.Marshal(map[string]any{ - "type": hp.Type().String(), + // "type": hp.Type().String(), + "type": "HTTP2Ping", "now": hp.Now(), "all": all, }) @@ -192,3 +193,30 @@ func parseHTTP2PingOption(m map[string]any) *http2ping.Config { } return &config } + +func (hp *HTTP2Ping) SubscribeCurrentGroupStatus(ctx context.Context, ch chan<- *http2ping.GroupStatus) { + // TODO: cache the result? + getGroupStatus := func() *http2ping.GroupStatus { + groupStatus := &http2ping.GroupStatus{ + Name: hp.Name(), + Proxies: make([]*http2ping.PingerStatus, 0), + } + for _, p := range hp.g.GetPingersCopy() { + groupStatus.Proxies = append(groupStatus.Proxies, p.GetStatus()) + } + return groupStatus + } + + go func() { + // choose a slighter longer interval to wait for all proxies got updated + ticker := time.NewTicker(hp.g.GetConfig().Interval * 2) + defer ticker.Stop() + for range ticker.C { + select { + case <-ctx.Done(): + return + case ch <- getGroupStatus(): + } + } + }() +} diff --git a/adapter/outboundgroup/http2ping/group.go b/adapter/outboundgroup/http2ping/group.go index d1cbfced..9d2286fe 100644 --- a/adapter/outboundgroup/http2ping/group.go +++ b/adapter/outboundgroup/http2ping/group.go @@ -69,7 +69,11 @@ func (g *http2PingGroup) SetProxies(proxies []constant.Proxy) { g.pingers = newPingers } -func (g *http2PingGroup) getPingersCopy() map[string]Pinger { +func (g *http2PingGroup) GetConfig() *Config { + return g.config +} + +func (g *http2PingGroup) GetPingersCopy() map[string]Pinger { g.mu.RLock() defer g.mu.RUnlock() @@ -78,6 +82,7 @@ func (g *http2PingGroup) getPingersCopy() map[string]Pinger { func (g *http2PingGroup) loop(interval time.Duration) { ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { case <-g.dieCh: @@ -85,7 +90,7 @@ func (g *http2PingGroup) loop(interval time.Duration) { case <-ticker.C: var newBest Pinger minRtt := uint32(1<<31 - 1) - for _, p := range g.getPingersCopy() { + for _, p := range g.GetPingersCopy() { if rtt := p.GetSmoothRtt(); rtt > 0 && rtt < minRtt { minRtt = rtt newBest = p diff --git a/adapter/outboundgroup/http2ping/interface.go b/adapter/outboundgroup/http2ping/interface.go index 133d366e..b8e14373 100644 --- a/adapter/outboundgroup/http2ping/interface.go +++ b/adapter/outboundgroup/http2ping/interface.go @@ -11,13 +11,16 @@ import ( type Pinger interface { GetProxy() constant.Proxy GetSmoothRtt() uint32 + GetStatus() *PingerStatus String() string Close() error } type PingerGroup interface { + GetConfig() *Config GetMinRttProxy(ctx context.Context) constant.Proxy SetProxies(proxies []constant.Proxy) + GetPingersCopy() map[string]Pinger } type Config struct { @@ -25,3 +28,16 @@ type Config struct { Tolerance time.Duration HTTP2Server *url.URL } + +type PingerStatus struct { + Name string `json:"name"` + StatusCode uint32 `json:"status-code"` + LatestRtt uint32 `json:"latest-rtt"` + SRtt uint32 `json:"srtt"` + MeanDeviation uint32 `json:"mean-deviation"` +} + +type GroupStatus struct { + Name string `json:"name"` + Proxies []*PingerStatus `json:"proxies"` +} diff --git a/adapter/outboundgroup/http2ping/pinger.go b/adapter/outboundgroup/http2ping/pinger.go index 4d02fc6d..a81fd063 100644 --- a/adapter/outboundgroup/http2ping/pinger.go +++ b/adapter/outboundgroup/http2ping/pinger.go @@ -35,15 +35,12 @@ func updateMeanDeviation(meanDeviation, sRtt, rtt uint32) uint32 { return uint32(float32(meanDeviation)*oneMinusBeta + float32(Abs(int32(sRtt)-int32(rtt)))*rttBeta) } -type pingerSharedStatus struct { +type http2Pinger struct { statusCode atomic.Uint32 latestRtt atomic.Uint32 sRtt atomic.Uint32 meanDeviation atomic.Uint32 -} -type http2Pinger struct { - pingerSharedStatus config *Config proxy constant.Proxy @@ -183,3 +180,13 @@ func (p *http2Pinger) Close() error { func (p *http2Pinger) String() string { return p.proxy.Name() } + +func (p *http2Pinger) GetStatus() *PingerStatus { + return &PingerStatus{ + Name: p.GetProxy().Name(), + StatusCode: p.statusCode.Load(), + LatestRtt: p.latestRtt.Load(), + SRtt: p.GetSmoothRtt(), + MeanDeviation: p.meanDeviation.Load(), + } +} diff --git a/hub/route/server.go b/hub/route/server.go index 8e7f225f..406410b0 100644 --- a/hub/route/server.go +++ b/hub/route/server.go @@ -2,6 +2,7 @@ package route import ( "bytes" + "context" "crypto/subtle" "crypto/tls" "encoding/json" @@ -11,11 +12,15 @@ import ( "strings" "time" + "github.com/metacubex/mihomo/adapter" "github.com/metacubex/mihomo/adapter/inbound" + "github.com/metacubex/mihomo/adapter/outboundgroup" + "github.com/metacubex/mihomo/adapter/outboundgroup/http2ping" CN "github.com/metacubex/mihomo/common/net" "github.com/metacubex/mihomo/common/utils" C "github.com/metacubex/mihomo/constant" "github.com/metacubex/mihomo/log" + "github.com/metacubex/mihomo/tunnel" "github.com/metacubex/mihomo/tunnel/statistic" "github.com/go-chi/chi/v5" @@ -83,6 +88,7 @@ func Start(addr string, tlsAddr string, secret string, r.Get("/traffic", traffic) r.Get("/memory", memory) r.Get("/version", version) + r.Get("/http2ping", http2pingStatus) r.Mount("/configs", configRouter()) r.Mount("/proxies", proxyRouter()) r.Mount("/group", GroupRouter()) @@ -199,6 +205,57 @@ func authentication(next http.Handler) http.Handler { return http.HandlerFunc(fn) } +func http2pingStatus(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var hp *outboundgroup.HTTP2Ping + for _, proxy := range tunnel.Proxies() { + if group, ok := proxy.(*adapter.Proxy).ProxyAdapter.(C.Group); ok { + if v, ok := group.(*outboundgroup.HTTP2Ping); ok { + hp = v + break + } + } + } + ch := make(chan *http2ping.GroupStatus, 1) + hp.SubscribeCurrentGroupStatus(ctx, ch) + + var wsConn net.Conn + if r.Header.Get("Upgrade") == "websocket" { + var err error + wsConn, _, _, err = ws.UpgradeHTTP(r, w) + if err != nil { + return + } + } + + if wsConn == nil { + w.Header().Set("Content-Type", "application/json") + render.Status(r, http.StatusOK) + } + + buf := &bytes.Buffer{} + var err error + for status := range ch { + buf.Reset() + if err := json.NewEncoder(buf).Encode(status); err != nil { + break + } + + if wsConn == nil { + _, err = w.Write(buf.Bytes()) + w.(http.Flusher).Flush() + } else { + err = wsutil.WriteMessage(wsConn, ws.StateServerSide, ws.OpText, buf.Bytes()) + } + + if err != nil { + break + } + } +} + func hello(w http.ResponseWriter, r *http.Request) { render.JSON(w, r, render.M{"hello": "mihomo"}) }