From 8d36fd77154cb54ea16ad36ec9113ddc7997cda2 Mon Sep 17 00:00:00 2001 From: Meo597 <197331664+Meo597@users.noreply.github.com> Date: Tue, 25 Mar 2025 18:17:15 +0800 Subject: [PATCH] VLESS: Add rate limiting to fallback handling via token bucket --- common/buf/reader.go | 26 ++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ proxy/vless/inbound/inbound.go | 14 ++++++++++++-- 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/common/buf/reader.go b/common/buf/reader.go index 33d362d4..bbb95595 100644 --- a/common/buf/reader.go +++ b/common/buf/reader.go @@ -3,6 +3,8 @@ package buf import ( "io" + "github.com/juju/ratelimit" + "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" ) @@ -172,3 +174,27 @@ func (r *PacketReader) ReadMultiBuffer() (MultiBuffer, error) { } return MultiBuffer{b}, nil } + +// Reader returns a reader that is rate limited by +// the given token bucket. Each token in the bucket +// represents one byte. +type RateLimitedReader struct { + Reader Reader + Bucket *ratelimit.Bucket +} + +// ReadMultiBuffer implements Reader. +func (r *RateLimitedReader) ReadMultiBuffer() (MultiBuffer, error) { + b, err := r.Reader.ReadMultiBuffer() + if err != nil { + return nil, err + } + + var total int64 + for _, buf := range b { + total += int64(buf.Len()) + } + r.Bucket.Wait(total) + + return b, nil +} diff --git a/go.mod b/go.mod index 456b3ab4..5b2ce0b2 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect + github.com/juju/ratelimit v1.0.2 github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/onsi/ginkgo/v2 v2.19.0 // indirect diff --git a/go.sum b/go.sum index a43edf73..ca93b0ed 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/h12w/go-socks5 v0.0.0-20200522160539-76189e178364 h1:5XxdakFhqd9dnXoAZy1Mb2R/DZ6D1e+0bGC/JhucGYI= github.com/h12w/go-socks5 v0.0.0-20200522160539-76189e178364/go.mod h1:eDJQioIyy4Yn3MVivT7rv/39gAJTrA7lgmYr8EW950c= +github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI= +github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index 1da2e091..0d4dad1b 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -11,6 +11,8 @@ import ( "time" "unsafe" + "github.com/juju/ratelimit" + "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" @@ -405,7 +407,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s return errors.New("failed to set PROXY protocol v", fb.Xver).Base(err).AtWarning() } } - if err := buf.Copy(reader, serverWriter, buf.UpdateActivity(timer)); err != nil { + rlReader := &buf.RateLimitedReader{ + Reader: reader, + Bucket: ratelimit.NewBucketWithRate(128*1024, 512*1024), + } + if err := buf.Copy(rlReader, serverWriter, buf.UpdateActivity(timer)); err != nil { return errors.New("failed to fallback request payload").Base(err).AtInfo() } return nil @@ -414,8 +420,12 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s writer := buf.NewWriter(connection) getResponse := func() error { + rlServerReader := &buf.RateLimitedReader{ + Reader: serverReader, + Bucket: ratelimit.NewBucketWithRate(128*1024, 512*1024), + } defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) - if err := buf.Copy(serverReader, writer, buf.UpdateActivity(timer)); err != nil { + if err := buf.Copy(rlServerReader, writer, buf.UpdateActivity(timer)); err != nil { return errors.New("failed to deliver response payload").Base(err).AtInfo() } return nil