From 33acf3c2b60f99bb1477d5d2aaadd0af66e0a715 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sun, 16 Mar 2025 23:07:56 -0400 Subject: [PATCH] Add multi buffer reader --- transport/internet/quic/conn.go | 30 +++++++++++++++++++++++++--- transport/internet/quic/quic_test.go | 4 ++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index a0bc207d..ddfc1235 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -19,6 +19,7 @@ type interConn struct { quicConn quic.Connection // small udp packet can be sent with Datagram directly streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering readChannel chan readResult + reader buf.MultiBufferContainer done *done.Instance local net.Addr remote net.Addr @@ -34,6 +35,7 @@ func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done ctx: ctx, quicConn: quicConn, readChannel: make(chan readResult), + reader: buf.MultiBufferContainer{}, done: done, local: quicConn.LocalAddr(), remote: remote, @@ -81,13 +83,18 @@ func (c *interConn) acceptStreams() { } func (c *interConn) Read(b []byte) (int, error) { + if c.reader.MultiBuffer.Len() > 0 { + return c.reader.Read(b) + } received := <- c.readChannel if received.err != nil { return 0, received.err } - nBytes := copy(b, received.buffer[:]) - errors.LogInfo(c.ctx, "Read copy ", nBytes) - return nBytes, nil + buffer := buf.New() + buffer.Write(received.buffer) + c.reader.MultiBuffer = append(c.reader.MultiBuffer, buffer) + errors.LogInfo(c.ctx, "Read copy ", len(received.buffer)) + return c.reader.Read(b) } func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -98,6 +105,23 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error { } func (c *interConn) Write(b []byte) (int, error) { + if len(b) > 1240 { // TODO: why quic-go increase internal MTU causing packet loss? + if len(c.streams) < MaxIncomingStreams { + stream, err := c.quicConn.OpenStream() + errors.LogInfo(c.ctx, "Write OpenStream ", err) + if err == nil { + c.streams = append(c.streams, stream) + } else { + errors.LogInfoInner(c.ctx, err, "failed to openStream: ") + } + } + currentStream++; + if currentStream > len(c.streams) - 1 { + currentStream = 0; + } + errors.LogInfo(c.ctx, "Write stream ", len(b), currentStream, len(c.streams)) + return c.streams[currentStream].Write(b) + } var err = c.quicConn.SendDatagram(b) errors.LogInfo(c.ctx, "Write SendDatagram ", len(b), err) if _, ok := err.(*quic.DatagramTooLargeError); ok { diff --git a/transport/internet/quic/quic_test.go b/transport/internet/quic/quic_test.go index fbd9cf7e..d174424a 100644 --- a/transport/internet/quic/quic_test.go +++ b/transport/internet/quic/quic_test.go @@ -22,6 +22,10 @@ func TestShortQuicConnection(t *testing.T) { testQuicConnection(t, 1024) } +func TestAroundMTUQuicConnection(t *testing.T) { + testQuicConnection(t, 1247) +} + func TestLongQuicConnection(t *testing.T) { testQuicConnection(t, 1500) }