From 414379ca96b57c2045a77e24cc816b5c797bfbf0 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 12 Jan 2025 12:57:47 -0500 Subject: [PATCH 1/6] Add config proto --- transport/internet/quic/config.pb.go | 137 +++++++++++++++++++++++++++ transport/internet/quic/config.proto | 14 +++ 2 files changed, 151 insertions(+) create mode 100644 transport/internet/quic/config.pb.go create mode 100644 transport/internet/quic/config.proto diff --git a/transport/internet/quic/config.pb.go b/transport/internet/quic/config.pb.go new file mode 100644 index 00000000..5220cd79 --- /dev/null +++ b/transport/internet/quic/config.pb.go @@ -0,0 +1,137 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.1 +// protoc v5.28.2 +// source: transport/internet/quic/config.proto + +package quic + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // string key = 1; + // xray.common.protocol.SecurityConfig security = 2; + // xray.common.serial.TypedMessage header = 3; + Fec bool `protobuf:"varint,4,opt,name=fec,proto3" json:"fec,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + mi := &file_transport_internet_quic_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_quic_config_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_transport_internet_quic_config_proto_rawDescGZIP(), []int{0} +} + +func (x *Config) GetFec() bool { + if x != nil { + return x.Fec + } + return false +} + +var File_transport_internet_quic_config_proto protoreflect.FileDescriptor + +var file_transport_internet_quic_config_proto_rawDesc = []byte{ + 0x0a, 0x24, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x71, 0x75, 0x69, 0x63, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, + 0x71, 0x75, 0x69, 0x63, 0x22, 0x1a, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, + 0x0a, 0x03, 0x66, 0x65, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x66, 0x65, 0x63, + 0x42, 0x76, 0x0a, 0x20, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, + 0x71, 0x75, 0x69, 0x63, 0x50, 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, + 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x71, 0x75, 0x69, 0x63, 0xaa, 0x02, 0x1c, 0x58, 0x72, 0x61, 0x79, + 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x65, 0x74, 0x2e, 0x51, 0x75, 0x69, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_transport_internet_quic_config_proto_rawDescOnce sync.Once + file_transport_internet_quic_config_proto_rawDescData = file_transport_internet_quic_config_proto_rawDesc +) + +func file_transport_internet_quic_config_proto_rawDescGZIP() []byte { + file_transport_internet_quic_config_proto_rawDescOnce.Do(func() { + file_transport_internet_quic_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_internet_quic_config_proto_rawDescData) + }) + return file_transport_internet_quic_config_proto_rawDescData +} + +var file_transport_internet_quic_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_transport_internet_quic_config_proto_goTypes = []any{ + (*Config)(nil), // 0: xray.transport.internet.quic.Config +} +var file_transport_internet_quic_config_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_transport_internet_quic_config_proto_init() } +func file_transport_internet_quic_config_proto_init() { + if File_transport_internet_quic_config_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_transport_internet_quic_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_transport_internet_quic_config_proto_goTypes, + DependencyIndexes: file_transport_internet_quic_config_proto_depIdxs, + MessageInfos: file_transport_internet_quic_config_proto_msgTypes, + }.Build() + File_transport_internet_quic_config_proto = out.File + file_transport_internet_quic_config_proto_rawDesc = nil + file_transport_internet_quic_config_proto_goTypes = nil + file_transport_internet_quic_config_proto_depIdxs = nil +} diff --git a/transport/internet/quic/config.proto b/transport/internet/quic/config.proto new file mode 100644 index 00000000..2393d6b5 --- /dev/null +++ b/transport/internet/quic/config.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package xray.transport.internet.quic; +option csharp_namespace = "Xray.Transport.Internet.Quic"; +option go_package = "github.com/xtls/xray-core/transport/internet/quic"; +option java_package = "com.xray.transport.internet.quic"; +option java_multiple_files = true; + +message Config { +// string key = 1; +// xray.common.protocol.SecurityConfig security = 2; +// xray.common.serial.TypedMessage header = 3; + bool fec = 4; +} \ No newline at end of file From c660bf7e37edbda0c5808108fa70f7520efd1873 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 12 Jan 2025 14:49:34 -0500 Subject: [PATCH 2/6] Add Datagram transport --- infra/conf/transport_internet.go | 32 +++++- main/distro/all/all.go | 1 + transport/internet/quic/conn.go | 61 +++++++++++ transport/internet/quic/dialer.go | 154 +++++++++++++++++++++++++++ transport/internet/quic/hub.go | 113 ++++++++++++++++++++ transport/internet/quic/quic.go | 17 +++ transport/internet/quic/quic_test.go | 92 ++++++++++++++++ 7 files changed, 468 insertions(+), 2 deletions(-) create mode 100644 transport/internet/quic/conn.go create mode 100644 transport/internet/quic/dialer.go create mode 100644 transport/internet/quic/hub.go create mode 100644 transport/internet/quic/quic.go create mode 100644 transport/internet/quic/quic_test.go diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index 78bd76fe..30d5b800 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -18,6 +18,7 @@ import ( "github.com/xtls/xray-core/transport/internet" "github.com/xtls/xray-core/transport/internet/httpupgrade" "github.com/xtls/xray-core/transport/internet/kcp" + "github.com/xtls/xray-core/transport/internet/quic" "github.com/xtls/xray-core/transport/internet/reality" "github.com/xtls/xray-core/transport/internet/splithttp" "github.com/xtls/xray-core/transport/internet/tcp" @@ -332,6 +333,22 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { return config, nil } +type QUICConfig struct { + // Header json.RawMessage `json:"header"` + // Security string `json:"security"` + // Key string `json:"key"` + + Fec bool `json:"fec"` +} + +// Build implements Buildable. +func (c *QUICConfig) Build() (proto.Message, error) { + config := &quic.Config{ + Fec: c.Fec, + } + return config, nil +} + func readFileOrString(f string, s []string) ([]byte, error) { if len(f) > 0 { return filesystem.ReadFile(f) @@ -683,8 +700,8 @@ func (p TransportProtocol) Build() (string, error) { return "httpupgrade", nil case "h2", "h3", "http": return "", errors.PrintRemovedFeatureError("HTTP transport (without header padding, etc.)", "XHTTP stream-one H2 & H3") - case "quic": - return "", errors.PrintRemovedFeatureError("QUIC transport (without web service, etc.)", "XHTTP stream-one H3") + case "quic", "datagram": + return "quic", nil default: return "", errors.New("Config: unknown transport protocol: ", p) } @@ -839,6 +856,7 @@ type StreamConfig struct { XHTTPSettings *SplitHTTPConfig `json:"xhttpSettings"` SplitHTTPSettings *SplitHTTPConfig `json:"splithttpSettings"` KCPSettings *KCPConfig `json:"kcpSettings"` + QUICSettings *QUICConfig `json:"quicSettings"` GRPCSettings *GRPCConfig `json:"grpcSettings"` WSSettings *WebSocketConfig `json:"wsSettings"` HTTPUPGRADESettings *HttpUpgradeConfig `json:"httpupgradeSettings"` @@ -930,6 +948,16 @@ func (c *StreamConfig) Build() (*internet.StreamConfig, error) { Settings: serial.ToTypedMessage(ts), }) } + if c.QUICSettings != nil { + qs, err := c.QUICSettings.Build() + if err != nil { + return nil, errors.New("Failed to build QUIC config").Base(err) + } + config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{ + ProtocolName: "quic", + Settings: serial.ToTypedMessage(qs), + }) + } if c.GRPCSettings != nil { gs, err := c.GRPCSettings.Build() if err != nil { diff --git a/main/distro/all/all.go b/main/distro/all/all.go index 198abb3f..e3b982d7 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -53,6 +53,7 @@ import ( _ "github.com/xtls/xray-core/transport/internet/grpc" _ "github.com/xtls/xray-core/transport/internet/httpupgrade" _ "github.com/xtls/xray-core/transport/internet/kcp" + _ "github.com/xtls/xray-core/transport/internet/quic" _ "github.com/xtls/xray-core/transport/internet/reality" _ "github.com/xtls/xray-core/transport/internet/splithttp" _ "github.com/xtls/xray-core/transport/internet/tcp" diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go new file mode 100644 index 00000000..b82dcff6 --- /dev/null +++ b/transport/internet/quic/conn.go @@ -0,0 +1,61 @@ +package quic + +import ( + "context" + "time" + + "github.com/xtls/quic-go" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net" +) + +type interConn struct { + ctx context.Context + quicConn quic.Connection + local net.Addr + remote net.Addr +} + +func (c *interConn) Read(b []byte) (int, error) { + received, e := c.quicConn.ReceiveDatagram(c.ctx) + if e != nil { + return 0, e + } + nBytes := copy(b, received[:]) + return nBytes, nil +} + +func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { + mb = buf.Compact(mb) + mb, err := buf.WriteMultiBuffer(c, mb) + buf.ReleaseMulti(mb) + return err +} + +func (c *interConn) Write(b []byte) (int, error) { + return len(b), c.quicConn.SendDatagram(b) +} + +func (c *interConn) Close() error { + return nil +} + +func (c *interConn) LocalAddr() net.Addr { + return c.local +} + +func (c *interConn) RemoteAddr() net.Addr { + return c.remote +} + +func (c *interConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *interConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *interConn) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/transport/internet/quic/dialer.go b/transport/internet/quic/dialer.go new file mode 100644 index 00000000..316fbbcd --- /dev/null +++ b/transport/internet/quic/dialer.go @@ -0,0 +1,154 @@ +package quic + +import ( + "context" + "sync" + "time" + + "github.com/xtls/quic-go" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/stat" + "github.com/xtls/xray-core/transport/internet/tls" +) + +type connectionContext struct { + rawConn *net.UDPConn + conn quic.Connection +} + +type clientConnections struct { + access sync.Mutex + conns map[net.Destination][]*connectionContext + // cleanup *task.Periodic +} + +func isActive(s quic.Connection) bool { + select { + case <-s.Context().Done(): + return false + default: + return true + } +} + +func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) { + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + if tlsConfig == nil { + tlsConfig = &tls.Config{ + ServerName: internalDomain, + AllowInsecure: true, + } + } + + var destAddr *net.UDPAddr + if dest.Address.Family().IsIP() { + destAddr = &net.UDPAddr{ + IP: dest.Address.IP(), + Port: int(dest.Port), + } + } else { + dialerIp := internet.DestIpAddress() + if dialerIp != nil { + destAddr = &net.UDPAddr{ + IP: dialerIp, + Port: int(dest.Port), + } + errors.LogInfo(ctx, "quic Dial use dialer dest addr: ", destAddr) + } else { + addr, err := net.ResolveUDPAddr("udp", dest.NetAddr()) + if err != nil { + return nil, err + } + destAddr = addr + } + } + + config := streamSettings.ProtocolSettings.(*Config) + + return client.openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings) +} + +func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) { + s.access.Lock() + defer s.access.Unlock() + + if s.conns == nil { + s.conns = make(map[net.Destination][]*connectionContext) + } + + dest := net.DestinationFromAddr(destAddr) + + var conns []*connectionContext + if s, found := s.conns[dest]; found { + conns = s + } + + if len(conns) > 0 { + s := conns[len(conns)-1] + if isActive(s.conn) { + return &interConn{ + ctx: ctx, + quicConn: s.conn, + local: s.conn.LocalAddr(), + remote: destAddr, + }, nil + } else { + errors.LogInfo(ctx, "current quic connection is not active!") + } + } + + errors.LogInfo(ctx, "dialing quic to ", dest) + rawConn, err := internet.DialSystem(ctx, dest, sockopt) + if err != nil { + return nil, errors.New("failed to dial to dest: ", err).AtWarning().Base(err) + } + + quicConfig := &quic.Config{ + KeepAlivePeriod: 0, + HandshakeIdleTimeout: time.Second * 8, + MaxIdleTimeout: time.Second * 300, + EnableDatagrams: true, + } + + var udpConn *net.UDPConn + switch conn := rawConn.(type) { + case *net.UDPConn: + udpConn = conn + case *internet.PacketConnWrapper: + udpConn = conn.Conn.(*net.UDPConn) + default: + rawConn.Close() + return nil, errors.New("QUIC with sockopt is unsupported").AtWarning() + } + + tr := quic.Transport{ + ConnectionIDLength: 12, + Conn: udpConn, + } + conn, err := tr.Dial(context.Background(), destAddr, tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig) + if err != nil { + udpConn.Close() + return nil, err + } + + context := &connectionContext{ + conn: conn, + rawConn: udpConn, + } + s.conns[dest] = append(conns, context) + return &interConn{ + ctx: ctx, + quicConn: context.conn, + local: context.conn.LocalAddr(), + remote: destAddr, + }, nil +} + +var client clientConnections + +func init() { + common.Must(internet.RegisterTransportDialer(protocolName, Dial)) +} diff --git a/transport/internet/quic/hub.go b/transport/internet/quic/hub.go new file mode 100644 index 00000000..6d5cec93 --- /dev/null +++ b/transport/internet/quic/hub.go @@ -0,0 +1,113 @@ +package quic + +import ( + "context" + "time" + + "github.com/xtls/quic-go" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/protocol/tls/cert" + "github.com/xtls/xray-core/common/signal/done" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/tls" +) + +// Listener is an internet.Listener that listens for TCP connections. +type Listener struct { + rawConn *net.UDPConn + listener *quic.Listener + done *done.Instance + addConn internet.ConnHandler +} + +func (l *Listener) keepAccepting(ctx context.Context) { + for { + conn, err := l.listener.Accept(context.Background()) + if err != nil { + errors.LogInfoInner(context.Background(), err, "failed to accept QUIC connection") + if l.done.Done() { + break + } + time.Sleep(time.Second) + continue + } + l.addConn(&interConn{ + ctx: ctx, + quicConn: conn, + local: conn.LocalAddr(), + remote: conn.RemoteAddr(), + }) + } +} + +// Addr implements internet.Listener.Addr. +func (l *Listener) Addr() net.Addr { + return l.listener.Addr() +} + +// Close implements internet.Listener.Close. +func (l *Listener) Close() error { + l.done.Close() + l.listener.Close() + l.rawConn.Close() + return nil +} + +// Listen creates a new Listener based on configurations. +func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) { + if address.Family().IsDomain() { + return nil, errors.New("domain address is not allows for listening quic") + } + + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + if tlsConfig == nil { + tlsConfig = &tls.Config{ + Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.DNSNames(internalDomain), cert.CommonName(internalDomain)))}, + } + } + + //config := streamSettings.ProtocolSettings.(*Config) + rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{ + IP: address.IP(), + Port: int(port), + }, streamSettings.SocketSettings) + if err != nil { + return nil, err + } + + quicConfig := &quic.Config{ + KeepAlivePeriod: 0, + HandshakeIdleTimeout: time.Second * 8, + MaxIdleTimeout: time.Second * 300, + MaxIncomingStreams: 32, + MaxIncomingUniStreams: -1, + EnableDatagrams: true, + } + + tr := quic.Transport{ + ConnectionIDLength: 12, + Conn: rawConn.(*net.UDPConn), + } + qListener, err := tr.Listen(tlsConfig.GetTLSConfig(), quicConfig) + if err != nil { + rawConn.Close() + return nil, err + } + + listener := &Listener{ + done: done.New(), + rawConn: rawConn.(*net.UDPConn), + listener: qListener, + addConn: handler, + } + + go listener.keepAccepting(ctx) + + return listener, nil +} + +func init() { + common.Must(internet.RegisterTransportListener(protocolName, Listen)) +} diff --git a/transport/internet/quic/quic.go b/transport/internet/quic/quic.go new file mode 100644 index 00000000..db523ab9 --- /dev/null +++ b/transport/internet/quic/quic.go @@ -0,0 +1,17 @@ +package quic + +import ( + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/transport/internet" +) + +const ( + protocolName = "quic" + internalDomain = "quic.internal.example.com" +) + +func init() { + common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} { + return new(Config) + })) +} diff --git a/transport/internet/quic/quic_test.go b/transport/internet/quic/quic_test.go new file mode 100644 index 00000000..d7ddd592 --- /dev/null +++ b/transport/internet/quic/quic_test.go @@ -0,0 +1,92 @@ +package quic_test + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/protocol/tls/cert" + "github.com/xtls/xray-core/testing/servers/udp" + "github.com/xtls/xray-core/transport/internet" + "github.com/xtls/xray-core/transport/internet/quic" + "github.com/xtls/xray-core/transport/internet/stat" + "github.com/xtls/xray-core/transport/internet/tls" +) + +func TestQuicConnection(t *testing.T) { + port := udp.PickPort() + + listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{ + ProtocolName: "quic", + ProtocolSettings: &quic.Config{}, + SecurityType: "tls", + SecuritySettings: &tls.Config{ + Certificate: []*tls.Certificate{ + tls.ParseCertificate( + cert.MustGenerate(nil, + cert.DNSNames("www.example.com"), + ), + ), + }, + }, + }, func(conn stat.Connection) { + go func() { + defer conn.Close() + + b := buf.New() + defer b.Release() + + for { + b.Clear() + if _, err := b.ReadFrom(conn); err != nil { + return + } + common.Must2(conn.Write(b.Bytes())) + } + }() + }) + common.Must(err) + + defer listener.Close() + + time.Sleep(time.Second) + + dctx := context.Background() + conn, err := quic.Dial(dctx, net.TCPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{ + ProtocolName: "quic", + ProtocolSettings: &quic.Config{}, + SecurityType: "tls", + SecuritySettings: &tls.Config{ + ServerName: "www.example.com", + AllowInsecure: true, + }, + }) + common.Must(err) + defer conn.Close() + + const N = 1024 + b1 := make([]byte, N) + common.Must2(rand.Read(b1)) + b2 := buf.New() + + common.Must2(conn.Write(b1)) + + b2.Clear() + common.Must2(b2.ReadFullFrom(conn, N)) + if r := cmp.Diff(b2.Bytes(), b1); r != "" { + t.Error(r) + } + + common.Must2(conn.Write(b1)) + + b2.Clear() + common.Must2(b2.ReadFullFrom(conn, N)) + if r := cmp.Diff(b2.Bytes(), b1); r != "" { + t.Error(r) + } +} From 358bdc258ea8863ebd293310881bc49b0adec15d Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 2 Feb 2025 11:11:09 -0500 Subject: [PATCH 3/6] Update to latest main --- transport/internet/quic/conn.go | 2 +- transport/internet/quic/dialer.go | 2 +- transport/internet/quic/hub.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index b82dcff6..e04a4137 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/xtls/quic-go" + "github.com/quic-go/quic-go" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/net" ) diff --git a/transport/internet/quic/dialer.go b/transport/internet/quic/dialer.go index 316fbbcd..0d8bdf9f 100644 --- a/transport/internet/quic/dialer.go +++ b/transport/internet/quic/dialer.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/xtls/quic-go" + "github.com/quic-go/quic-go" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/net" diff --git a/transport/internet/quic/hub.go b/transport/internet/quic/hub.go index 6d5cec93..ed3f9aa8 100644 --- a/transport/internet/quic/hub.go +++ b/transport/internet/quic/hub.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/xtls/quic-go" + "github.com/quic-go/quic-go" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/net" From 129b2be9c1d3cad9155ce559b68949770f391216 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 2 Mar 2025 23:53:33 -0500 Subject: [PATCH 4/6] Use capsulate protocol for large UDP packet - make datagram transport without mux functionality - it is now recommended to always pair with mux-cool (XUDP new tunnel non-zero session id) --- transport/internet/quic/conn.go | 136 ++++++++++++++++++++++++--- transport/internet/quic/dialer.go | 67 +------------ transport/internet/quic/hub.go | 9 +- transport/internet/quic/quic_test.go | 17 +++- 4 files changed, 140 insertions(+), 89 deletions(-) diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index e04a4137..f79571d5 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -6,22 +6,83 @@ import ( "github.com/quic-go/quic-go" "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/signal/done" ) +var MaxIncomingStreams = 2 +var currentStream = 0 + type interConn struct { - ctx context.Context - quicConn quic.Connection - local net.Addr - remote net.Addr + ctx context.Context + quicConn quic.Connection // small udp packet can be sent with Datagram directly + streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering + readChannel chan readResult + done *done.Instance + local net.Addr + remote net.Addr } -func (c *interConn) Read(b []byte) (int, error) { - received, e := c.quicConn.ReceiveDatagram(c.ctx) - if e != nil { - return 0, e +type readResult struct { + buffer []byte + err error +} + +func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done.Instance, remote net.Addr) *interConn { + c := &interConn{ + ctx: ctx, + quicConn: quicConn, + readChannel: make(chan readResult), + done: done, + local: quicConn.LocalAddr(), + remote: remote, } - nBytes := copy(b, received[:]) + go func() { + for { + received, e := c.quicConn.ReceiveDatagram(c.ctx) + c.readChannel <- readResult{buffer: received, err: e} + } + }() + go c.acceptStreams() + return c +} + +func (c *interConn) acceptStreams() { + for { + stream, err := c.quicConn.AcceptStream(context.Background()) + if err != nil { + errors.LogInfoInner(context.Background(), err, "failed to accept stream") + select { + case <-c.quicConn.Context().Done(): + return + case <-c.done.Wait(): + if err := c.quicConn.CloseWithError(0, ""); err != nil { + errors.LogInfoInner(context.Background(), err, "failed to close connection") + } + return + default: + time.Sleep(time.Second) + continue + } + } + go func() { + for { + received := make([]byte, buf.Size) + i, e := stream.Read(received) + c.readChannel <- readResult{buffer: received[:i], err: e} + } + }() + c.streams = append(c.streams, stream) + } +} + +func (c *interConn) Read(b []byte) (int, error) { + received := <- c.readChannel + if received.err != nil { + return 0, received.err + } + nBytes := copy(b, received.buffer[:]) return nBytes, nil } @@ -33,11 +94,37 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { } func (c *interConn) Write(b []byte) (int, error) { - return len(b), c.quicConn.SendDatagram(b) + var err = c.quicConn.SendDatagram(b) + if _, ok := err.(*quic.DatagramTooLargeError); ok { + if len(c.streams) < MaxIncomingStreams { + stream, err := c.quicConn.OpenStream() + if err == nil { + c.streams = append(c.streams, stream) + } else { + errors.LogInfoInner(c.ctx, err, "failed to openStream: ") + } + } + currentStream++; + if currentStream > len(c.streams) - 1 { + currentStream = 0; + } + return c.streams[currentStream].Write(b) + } + if err != nil { + return 0, err + } + return len(b), nil } func (c *interConn) Close() error { - return nil + var err error + for _, s := range c.streams { + e := s.Close() + if e != nil { + err = e + } + } + return err } func (c *interConn) LocalAddr() net.Addr { @@ -49,13 +136,34 @@ func (c *interConn) RemoteAddr() net.Addr { } func (c *interConn) SetDeadline(t time.Time) error { - return nil + var err error + for _, s := range c.streams { + e := s.SetDeadline(t) + if e != nil { + err = e + } + } + return err } func (c *interConn) SetReadDeadline(t time.Time) error { - return nil + var err error + for _, s := range c.streams { + e := s.SetReadDeadline(t) + if e != nil { + err = e + } + } + return err } func (c *interConn) SetWriteDeadline(t time.Time) error { - return nil + var err error + for _, s := range c.streams { + e := s.SetWriteDeadline(t) + if e != nil { + err = e + } + } + return err } diff --git a/transport/internet/quic/dialer.go b/transport/internet/quic/dialer.go index 0d8bdf9f..65eb59a7 100644 --- a/transport/internet/quic/dialer.go +++ b/transport/internet/quic/dialer.go @@ -2,38 +2,18 @@ package quic import ( "context" - "sync" "time" "github.com/quic-go/quic-go" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/signal/done" "github.com/xtls/xray-core/transport/internet" "github.com/xtls/xray-core/transport/internet/stat" "github.com/xtls/xray-core/transport/internet/tls" ) -type connectionContext struct { - rawConn *net.UDPConn - conn quic.Connection -} - -type clientConnections struct { - access sync.Mutex - conns map[net.Destination][]*connectionContext - // cleanup *task.Periodic -} - -func isActive(s quic.Connection) bool { - select { - case <-s.Context().Done(): - return false - default: - return true - } -} - func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) { tlsConfig := tls.ConfigFromStreamSettings(streamSettings) if tlsConfig == nil { @@ -68,38 +48,11 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me config := streamSettings.ProtocolSettings.(*Config) - return client.openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings) + return openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings) } -func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) { - s.access.Lock() - defer s.access.Unlock() - - if s.conns == nil { - s.conns = make(map[net.Destination][]*connectionContext) - } - +func openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) { dest := net.DestinationFromAddr(destAddr) - - var conns []*connectionContext - if s, found := s.conns[dest]; found { - conns = s - } - - if len(conns) > 0 { - s := conns[len(conns)-1] - if isActive(s.conn) { - return &interConn{ - ctx: ctx, - quicConn: s.conn, - local: s.conn.LocalAddr(), - remote: destAddr, - }, nil - } else { - errors.LogInfo(ctx, "current quic connection is not active!") - } - } - errors.LogInfo(ctx, "dialing quic to ", dest) rawConn, err := internet.DialSystem(ctx, dest, sockopt) if err != nil { @@ -134,21 +87,9 @@ func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Add return nil, err } - context := &connectionContext{ - conn: conn, - rawConn: udpConn, - } - s.conns[dest] = append(conns, context) - return &interConn{ - ctx: ctx, - quicConn: context.conn, - local: context.conn.LocalAddr(), - remote: destAddr, - }, nil + return NewConnInitReader(ctx, conn, done.New(), destAddr), nil } -var client clientConnections - func init() { common.Must(internet.RegisterTransportDialer(protocolName, Dial)) } diff --git a/transport/internet/quic/hub.go b/transport/internet/quic/hub.go index ed3f9aa8..bfeef877 100644 --- a/transport/internet/quic/hub.go +++ b/transport/internet/quic/hub.go @@ -33,12 +33,7 @@ func (l *Listener) keepAccepting(ctx context.Context) { time.Sleep(time.Second) continue } - l.addConn(&interConn{ - ctx: ctx, - quicConn: conn, - local: conn.LocalAddr(), - remote: conn.RemoteAddr(), - }) + l.addConn(NewConnInitReader(ctx, conn, l.done, conn.RemoteAddr())) } } @@ -81,7 +76,7 @@ func Listen(ctx context.Context, address net.Address, port net.Port, streamSetti KeepAlivePeriod: 0, HandshakeIdleTimeout: time.Second * 8, MaxIdleTimeout: time.Second * 300, - MaxIncomingStreams: 32, + MaxIncomingStreams: 2, MaxIncomingUniStreams: -1, EnableDatagrams: true, } diff --git a/transport/internet/quic/quic_test.go b/transport/internet/quic/quic_test.go index d7ddd592..eb1d707b 100644 --- a/transport/internet/quic/quic_test.go +++ b/transport/internet/quic/quic_test.go @@ -18,7 +18,15 @@ import ( "github.com/xtls/xray-core/transport/internet/tls" ) -func TestQuicConnection(t *testing.T) { +func TestShortQuicConnection(t *testing.T) { + testQuicConnection(t, 1024) +} + +func TestLongQuicConnection(t *testing.T) { + testQuicConnection(t, 1500) +} + +func testQuicConnection(t *testing.T, dataLen int32) { port := udp.PickPort() listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{ @@ -69,15 +77,14 @@ func TestQuicConnection(t *testing.T) { common.Must(err) defer conn.Close() - const N = 1024 - b1 := make([]byte, N) + b1 := make([]byte, dataLen) common.Must2(rand.Read(b1)) b2 := buf.New() common.Must2(conn.Write(b1)) b2.Clear() - common.Must2(b2.ReadFullFrom(conn, N)) + common.Must2(b2.ReadFullFrom(conn, dataLen)) if r := cmp.Diff(b2.Bytes(), b1); r != "" { t.Error(r) } @@ -85,7 +92,7 @@ func TestQuicConnection(t *testing.T) { common.Must2(conn.Write(b1)) b2.Clear() - common.Must2(b2.ReadFullFrom(conn, N)) + common.Must2(b2.ReadFullFrom(conn, dataLen)) if r := cmp.Diff(b2.Bytes(), b1); r != "" { t.Error(r) } From 7c78408df94929b42da1c26695b44c6f9046b4d1 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 9 Mar 2025 22:59:30 -0400 Subject: [PATCH 5/6] Add some logs for troubleshooting --- transport/internet/quic/conn.go | 7 +++++++ transport/internet/quic/quic_test.go | 2 ++ 2 files changed, 9 insertions(+) diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index f79571d5..a0bc207d 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -41,6 +41,7 @@ func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done go func() { for { received, e := c.quicConn.ReceiveDatagram(c.ctx) + errors.LogInfo(c.ctx, "Read ReceiveDatagram ", len(received)) c.readChannel <- readResult{buffer: received, err: e} } }() @@ -51,6 +52,7 @@ func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done func (c *interConn) acceptStreams() { for { stream, err := c.quicConn.AcceptStream(context.Background()) + errors.LogInfo(c.ctx, "Read AcceptStream ", err) if err != nil { errors.LogInfoInner(context.Background(), err, "failed to accept stream") select { @@ -70,6 +72,7 @@ func (c *interConn) acceptStreams() { for { received := make([]byte, buf.Size) i, e := stream.Read(received) + errors.LogInfo(c.ctx, "Read stream ", i) c.readChannel <- readResult{buffer: received[:i], err: e} } }() @@ -83,6 +86,7 @@ func (c *interConn) Read(b []byte) (int, error) { return 0, received.err } nBytes := copy(b, received.buffer[:]) + errors.LogInfo(c.ctx, "Read copy ", nBytes) return nBytes, nil } @@ -95,9 +99,11 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { func (c *interConn) Write(b []byte) (int, error) { var err = c.quicConn.SendDatagram(b) + errors.LogInfo(c.ctx, "Write SendDatagram ", len(b), err) if _, ok := err.(*quic.DatagramTooLargeError); ok { if len(c.streams) < MaxIncomingStreams { stream, err := c.quicConn.OpenStream() + errors.LogInfo(c.ctx, "Write OpenStream ", err) if err == nil { c.streams = append(c.streams, stream) } else { @@ -108,6 +114,7 @@ func (c *interConn) Write(b []byte) (int, error) { if currentStream > len(c.streams) - 1 { currentStream = 0; } + errors.LogInfo(c.ctx, "Write stream ", len(b), currentStream, len(c.streams)) return c.streams[currentStream].Write(b) } if err != nil { diff --git a/transport/internet/quic/quic_test.go b/transport/internet/quic/quic_test.go index eb1d707b..fbd9cf7e 100644 --- a/transport/internet/quic/quic_test.go +++ b/transport/internet/quic/quic_test.go @@ -89,6 +89,8 @@ func testQuicConnection(t *testing.T, dataLen int32) { t.Error(r) } + time.Sleep(1000 * time.Millisecond) + common.Must2(conn.Write(b1)) b2.Clear() From 33acf3c2b60f99bb1477d5d2aaadd0af66e0a715 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 16 Mar 2025 23:07:56 -0400 Subject: [PATCH 6/6] Add multi buffer reader --- transport/internet/quic/conn.go | 30 +++++++++++++++++++++++++--- transport/internet/quic/quic_test.go | 4 ++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index a0bc207d..ddfc1235 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -19,6 +19,7 @@ type interConn struct { quicConn quic.Connection // small udp packet can be sent with Datagram directly streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering readChannel chan readResult + reader buf.MultiBufferContainer done *done.Instance local net.Addr remote net.Addr @@ -34,6 +35,7 @@ func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done ctx: ctx, quicConn: quicConn, readChannel: make(chan readResult), + reader: buf.MultiBufferContainer{}, done: done, local: quicConn.LocalAddr(), remote: remote, @@ -81,13 +83,18 @@ func (c *interConn) acceptStreams() { } func (c *interConn) Read(b []byte) (int, error) { + if c.reader.MultiBuffer.Len() > 0 { + return c.reader.Read(b) + } received := <- c.readChannel if received.err != nil { return 0, received.err } - nBytes := copy(b, received.buffer[:]) - errors.LogInfo(c.ctx, "Read copy ", nBytes) - return nBytes, nil + buffer := buf.New() + buffer.Write(received.buffer) + c.reader.MultiBuffer = append(c.reader.MultiBuffer, buffer) + errors.LogInfo(c.ctx, "Read copy ", len(received.buffer)) + return c.reader.Read(b) } func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -98,6 +105,23 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { } func (c *interConn) Write(b []byte) (int, error) { + if len(b) > 1240 { // TODO: why quic-go increase internal MTU causing packet loss? + if len(c.streams) < MaxIncomingStreams { + stream, err := c.quicConn.OpenStream() + errors.LogInfo(c.ctx, "Write OpenStream ", err) + if err == nil { + c.streams = append(c.streams, stream) + } else { + errors.LogInfoInner(c.ctx, err, "failed to openStream: ") + } + } + currentStream++; + if currentStream > len(c.streams) - 1 { + currentStream = 0; + } + errors.LogInfo(c.ctx, "Write stream ", len(b), currentStream, len(c.streams)) + return c.streams[currentStream].Write(b) + } var err = c.quicConn.SendDatagram(b) errors.LogInfo(c.ctx, "Write SendDatagram ", len(b), err) if _, ok := err.(*quic.DatagramTooLargeError); ok { diff --git a/transport/internet/quic/quic_test.go b/transport/internet/quic/quic_test.go index fbd9cf7e..d174424a 100644 --- a/transport/internet/quic/quic_test.go +++ b/transport/internet/quic/quic_test.go @@ -22,6 +22,10 @@ func TestShortQuicConnection(t *testing.T) { testQuicConnection(t, 1024) } +func TestAroundMTUQuicConnection(t *testing.T) { + testQuicConnection(t, 1247) +} + func TestLongQuicConnection(t *testing.T) { testQuicConnection(t, 1500) }