chore: using SetupContextForConn to reduce the DialContext cannot be cancelled

This commit is contained in:
wwqgtxx 2025-04-12 11:19:03 +08:00
parent 7a260f7bcf
commit cedb36df5f
9 changed files with 107 additions and 52 deletions

View file

@ -7,11 +7,11 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/ca"
"github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/proxydialer"
@ -51,7 +51,7 @@ func (h *Http) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Me
}
}
if err := h.shakeHand(metadata, c); err != nil {
if err := h.shakeHandContext(ctx, c, metadata); err != nil {
return nil, err
}
return c, nil
@ -99,7 +99,12 @@ func (h *Http) ProxyInfo() C.ProxyInfo {
return info
}
func (h *Http) shakeHand(metadata *C.Metadata, rw io.ReadWriter) error {
func (h *Http) shakeHandContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
addr := metadata.RemoteAddress()
HeaderString := "CONNECT " + addr + " HTTP/1.1\r\n"
tempHeaders := map[string]string{
@ -123,13 +128,13 @@ func (h *Http) shakeHand(metadata *C.Metadata, rw io.ReadWriter) error {
HeaderString += "\r\n"
_, err := rw.Write([]byte(HeaderString))
_, err = c.Write([]byte(HeaderString))
if err != nil {
return err
}
resp, err := http.ReadResponse(bufio.NewReader(rw), nil)
resp, err := http.ReadResponse(bufio.NewReader(c), nil)
if err != nil {
return err

View file

@ -100,7 +100,7 @@ type restlsOption struct {
}
// StreamConnContext implements C.ProxyAdapter
func (ss *ShadowSocks) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (net.Conn, error) {
func (ss *ShadowSocks) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (_ net.Conn, err error) {
useEarly := false
switch ss.obfsMode {
case "tls":
@ -109,7 +109,6 @@ func (ss *ShadowSocks) StreamConnContext(ctx context.Context, c net.Conn, metada
_, port, _ := net.SplitHostPort(ss.addr)
c = obfs.NewHTTPObfs(c, ss.obfsOption.Host, port)
case "websocket":
var err error
if ss.v2rayOption != nil {
c, err = v2rayObfs.NewV2rayObfs(ctx, c, ss.v2rayOption)
} else if ss.gostOption != nil {
@ -121,14 +120,12 @@ func (ss *ShadowSocks) StreamConnContext(ctx context.Context, c net.Conn, metada
return nil, fmt.Errorf("%s connect error: %w", ss.addr, err)
}
case shadowtls.Mode:
var err error
c, err = shadowtls.NewShadowTLS(ctx, c, ss.shadowTLSOption)
if err != nil {
return nil, err
}
useEarly = true
case restls.Mode:
var err error
c, err = restls.NewRestls(ctx, c, ss.restlsConfig)
if err != nil {
return nil, fmt.Errorf("%s (restls) connect error: %w", ss.addr, err)
@ -136,6 +133,12 @@ func (ss *ShadowSocks) StreamConnContext(ctx context.Context, c net.Conn, metada
useEarly = true
}
useEarly = useEarly || N.NeedHandshake(c)
if !useEarly {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
}
if metadata.NetWork == C.UDP && ss.option.UDPOverTCP {
uotDestination := uot.RequestDestination(uint8(ss.option.UDPOverTCPVersion))
if useEarly {

View file

@ -42,12 +42,15 @@ type ShadowSocksROption struct {
}
// StreamConnContext implements C.ProxyAdapter
func (ssr *ShadowSocksR) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (net.Conn, error) {
func (ssr *ShadowSocksR) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (_ net.Conn, err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
c = ssr.obfs.StreamConn(c)
c = ssr.cipher.StreamConn(c)
var (
iv []byte
err error
iv []byte
)
switch conn := c.(type) {
case *shadowstream.Conn:

View file

@ -6,6 +6,7 @@ import (
"net"
"strconv"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/common/structure"
"github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/proxydialer"
@ -41,7 +42,7 @@ type streamOption struct {
obfsOption *simpleObfsOption
}
func streamConn(c net.Conn, option streamOption) *snell.Snell {
func snellStreamConn(c net.Conn, option streamOption) *snell.Snell {
switch option.obfsOption.Mode {
case "tls":
c = obfs.NewTLSObfs(c, option.obfsOption.Host)
@ -54,15 +55,25 @@ func streamConn(c net.Conn, option streamOption) *snell.Snell {
// StreamConnContext implements C.ProxyAdapter
func (s *Snell) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (net.Conn, error) {
c = streamConn(c, streamOption{s.psk, s.version, s.addr, s.obfsOption})
if metadata.NetWork == C.UDP {
err := snell.WriteUDPHeader(c, s.version)
return c, err
}
err := snell.WriteHeader(c, metadata.String(), uint(metadata.DstPort), s.version)
c = snellStreamConn(c, streamOption{s.psk, s.version, s.addr, s.obfsOption})
err := s.writeHeaderContext(ctx, c, metadata)
return c, err
}
func (s *Snell) writeHeaderContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
if metadata.NetWork == C.UDP {
err = snell.WriteUDPHeader(c, s.version)
return
}
err = snell.WriteHeader(c, metadata.String(), uint(metadata.DstPort), s.version)
return
}
// DialContext implements C.ProxyAdapter
func (s *Snell) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) {
if s.version == snell.Version2 && len(opts) == 0 {
@ -71,8 +82,8 @@ func (s *Snell) DialContext(ctx context.Context, metadata *C.Metadata, opts ...d
return nil, err
}
if err = snell.WriteHeader(c, metadata.String(), uint(metadata.DstPort), s.version); err != nil {
c.Close()
if err = s.writeHeaderContext(ctx, c, metadata); err != nil {
_ = c.Close()
return nil, err
}
return NewConn(c, s), err
@ -120,12 +131,8 @@ func (s *Snell) ListenPacketWithDialer(ctx context.Context, dialer C.Dialer, met
if err != nil {
return nil, err
}
c = streamConn(c, streamOption{s.psk, s.version, s.addr, s.obfsOption})
err = snell.WriteUDPHeader(c, s.version)
if err != nil {
return nil, err
}
c, err = s.StreamConnContext(ctx, c, metadata)
pc := snell.PacketConn(c)
return newPacketConn(pc, s), nil
@ -212,7 +219,7 @@ func NewSnell(option SnellOption) (*Snell, error) {
return nil, err
}
return streamConn(c, streamOption{psk, option.Version, addr, obfsOption}), nil
return snellStreamConn(c, streamOption{psk, option.Version, addr, obfsOption}), nil
})
}
return s, nil

View file

@ -10,6 +10,7 @@ import (
"net/netip"
"strconv"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/ca"
"github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/proxydialer"
@ -58,7 +59,7 @@ func (ss *Socks5) StreamConnContext(ctx context.Context, c net.Conn, metadata *C
Password: ss.pass,
}
}
if _, err := socks5.ClientHandshake(c, serializesSocksAddr(metadata), socks5.CmdConnect, user); err != nil {
if _, err := ss.clientHandshakeContext(ctx, c, serializesSocksAddr(metadata), socks5.CmdConnect, user); err != nil {
return nil, err
}
return c, nil
@ -135,7 +136,7 @@ func (ss *Socks5) ListenPacketContext(ctx context.Context, metadata *C.Metadata,
}
udpAssocateAddr := socks5.AddrFromStdAddrPort(netip.AddrPortFrom(netip.IPv4Unspecified(), 0))
bindAddr, err := socks5.ClientHandshake(c, udpAssocateAddr, socks5.CmdUDPAssociate, user)
bindAddr, err := ss.clientHandshakeContext(ctx, c, udpAssocateAddr, socks5.CmdUDPAssociate, user)
if err != nil {
err = fmt.Errorf("client hanshake error: %w", err)
return
@ -178,6 +179,14 @@ func (ss *Socks5) ProxyInfo() C.ProxyInfo {
return info
}
func (ss *Socks5) clientHandshakeContext(ctx context.Context, c net.Conn, addr socks5.Addr, command socks5.Command, user *socks5.User) (_ socks5.Addr, err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
return socks5.ClientHandshake(c, addr, command, user)
}
func NewSocks5(option Socks5Option) (*Socks5, error) {
var tlsConfig *tls.Config
if option.TLS {

View file

@ -9,6 +9,7 @@ import (
"net/http"
"strconv"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/ca"
"github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/proxydialer"
@ -110,14 +111,23 @@ func (t *Trojan) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.
c = t.ssCipher.StreamConn(c)
}
if metadata.NetWork == C.UDP {
err = t.instance.WriteHeader(c, trojan.CommandUDP, serializesSocksAddr(metadata))
return c, err
}
err = t.instance.WriteHeader(c, trojan.CommandTCP, serializesSocksAddr(metadata))
err = t.writeHeaderContext(ctx, c, metadata)
return c, err
}
func (t *Trojan) writeHeaderContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
command := trojan.CommandTCP
if metadata.NetWork == C.UDP {
command = trojan.CommandUDP
}
err = t.instance.WriteHeader(c, command, serializesSocksAddr(metadata))
return err
}
// DialContext implements C.ProxyAdapter
func (t *Trojan) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) {
// gun transport
@ -131,7 +141,7 @@ func (t *Trojan) DialContext(ctx context.Context, metadata *C.Metadata, opts ...
c = t.ssCipher.StreamConn(c)
}
if err = t.instance.WriteHeader(c, trojan.CommandTCP, serializesSocksAddr(metadata)); err != nil {
if err = t.writeHeaderContext(ctx, c, metadata); err != nil {
c.Close()
return nil, err
}
@ -184,7 +194,7 @@ func (t *Trojan) ListenPacketContext(ctx context.Context, metadata *C.Metadata,
c = t.ssCipher.StreamConn(c)
}
err = t.instance.WriteHeader(c, trojan.CommandUDP, serializesSocksAddr(metadata))
err = t.writeHeaderContext(ctx, c, metadata)
if err != nil {
return nil, err
}
@ -219,7 +229,7 @@ func (t *Trojan) ListenPacketWithDialer(ctx context.Context, dialer C.Dialer, me
c = t.ssCipher.StreamConn(c)
}
err = t.instance.WriteHeader(c, trojan.CommandUDP, serializesSocksAddr(metadata))
err = t.writeHeaderContext(ctx, c, metadata)
if err != nil {
return nil, err
}

View file

@ -155,7 +155,7 @@ func (v *Vless) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M
Path: v.option.HTTP2Opts.Path,
}
c, err = vmess.StreamH2Conn(c, h2Opts)
c, err = vmess.StreamH2Conn(ctx, c, h2Opts)
case "grpc":
c, err = gun.StreamGunWithConn(c, v.gunTLSConfig, v.gunConfig, v.realityConfig)
default:
@ -168,10 +168,14 @@ func (v *Vless) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M
return nil, err
}
return v.streamConn(c, metadata)
return v.streamConnContext(ctx, c, metadata)
}
func (v *Vless) streamConn(c net.Conn, metadata *C.Metadata) (conn net.Conn, err error) {
func (v *Vless) streamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (conn net.Conn, err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
if metadata.NetWork == C.UDP {
if v.option.PacketAddr {
metadata = &C.Metadata{
@ -238,7 +242,7 @@ func (v *Vless) DialContext(ctx context.Context, metadata *C.Metadata, opts ...d
safeConnClose(c, err)
}(c)
c, err = v.client.StreamConn(c, parseVlessAddr(metadata, v.option.XUDP))
c, err = v.streamConnContext(ctx, c, metadata)
if err != nil {
return nil, err
}
@ -292,7 +296,7 @@ func (v *Vless) ListenPacketContext(ctx context.Context, metadata *C.Metadata, o
safeConnClose(c, err)
}(c)
c, err = v.streamConn(c, metadata)
c, err = v.streamConnContext(ctx, c, metadata)
if err != nil {
return nil, fmt.Errorf("new vless client error: %v", err)
}

View file

@ -199,7 +199,7 @@ func (v *Vmess) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M
Path: v.option.HTTP2Opts.Path,
}
c, err = mihomoVMess.StreamH2Conn(c, h2Opts)
c, err = mihomoVMess.StreamH2Conn(ctx, c, h2Opts)
case "grpc":
c, err = gun.StreamGunWithConn(c, v.gunTLSConfig, v.gunConfig, v.realityConfig)
default:
@ -226,17 +226,24 @@ func (v *Vmess) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M
if err != nil {
return nil, err
}
return v.streamConn(c, metadata)
return v.streamConnConntext(ctx, c, metadata)
}
func (v *Vmess) streamConn(c net.Conn, metadata *C.Metadata) (conn net.Conn, err error) {
func (v *Vmess) streamConnConntext(ctx context.Context, c net.Conn, metadata *C.Metadata) (conn net.Conn, err error) {
useEarly := N.NeedHandshake(c)
if !useEarly {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, c)
defer done(&err)
}
}
if metadata.NetWork == C.UDP {
if v.option.XUDP {
var globalID [8]byte
if metadata.SourceValid() {
globalID = utils.GlobalID(metadata.SourceAddress())
}
if N.NeedHandshake(c) {
if useEarly {
conn = v.client.DialEarlyXUDPPacketConn(c,
globalID,
M.SocksaddrFromNet(metadata.UDPAddr()))
@ -246,7 +253,7 @@ func (v *Vmess) streamConn(c net.Conn, metadata *C.Metadata) (conn net.Conn, err
M.SocksaddrFromNet(metadata.UDPAddr()))
}
} else if v.option.PacketAddr {
if N.NeedHandshake(c) {
if useEarly {
conn = v.client.DialEarlyPacketConn(c,
M.ParseSocksaddrHostPort(packetaddr.SeqPacketMagicAddress, 443))
} else {
@ -255,7 +262,7 @@ func (v *Vmess) streamConn(c net.Conn, metadata *C.Metadata) (conn net.Conn, err
}
conn = packetaddr.NewBindConn(conn)
} else {
if N.NeedHandshake(c) {
if useEarly {
conn = v.client.DialEarlyPacketConn(c,
M.SocksaddrFromNet(metadata.UDPAddr()))
} else {
@ -264,7 +271,7 @@ func (v *Vmess) streamConn(c net.Conn, metadata *C.Metadata) (conn net.Conn, err
}
}
} else {
if N.NeedHandshake(c) {
if useEarly {
conn = v.client.DialEarlyConn(c,
M.ParseSocksaddrHostPort(metadata.String(), metadata.DstPort))
} else {
@ -290,7 +297,7 @@ func (v *Vmess) DialContext(ctx context.Context, metadata *C.Metadata, opts ...d
safeConnClose(c, err)
}(c)
c, err = v.client.DialConn(c, M.ParseSocksaddrHostPort(metadata.String(), metadata.DstPort))
c, err = v.streamConnConntext(ctx, c, metadata)
if err != nil {
return nil, err
}
@ -341,7 +348,7 @@ func (v *Vmess) ListenPacketContext(ctx context.Context, metadata *C.Metadata, o
safeConnClose(c, err)
}(c)
c, err = v.streamConn(c, metadata)
c, err = v.streamConnConntext(ctx, c, metadata)
if err != nil {
return nil, fmt.Errorf("new vmess client error: %v", err)
}

View file

@ -7,6 +7,8 @@ import (
"net/http"
"net/url"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/randv2"
"golang.org/x/net/http2"
)
@ -100,7 +102,12 @@ func (hc *h2Conn) Close() error {
return hc.Conn.Close()
}
func StreamH2Conn(conn net.Conn, cfg *H2Config) (net.Conn, error) {
func StreamH2Conn(ctx context.Context, conn net.Conn, cfg *H2Config) (_ net.Conn, err error) {
if ctx.Done() != nil {
done := N.SetupContextForConn(ctx, conn)
defer done(&err)
}
transport := &http2.Transport{}
cconn, err := transport.NewClientConn(conn)