From b8d3cb2925097e9d2d9ed5bb743486f52a0046d1 Mon Sep 17 00:00:00 2001 From: paulaan Date: Tue, 23 Apr 2019 13:05:07 +0700 Subject: [PATCH 1/9] [fanout] We implement this fan-out pattern for our system --- messaging/fanout.go | 144 +++++++++++++++++++++++++++++++ messaging/fanout.md | 205 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 349 insertions(+) create mode 100644 messaging/fanout.go create mode 100644 messaging/fanout.md diff --git a/messaging/fanout.go b/messaging/fanout.go new file mode 100644 index 0000000..e73240b --- /dev/null +++ b/messaging/fanout.go @@ -0,0 +1,144 @@ +package messaging + +import ( + "context" + "io" + "sync" + "sync/atomic" + + "bitbucket.org/sakariai/sakari/log" + "bitbucket.org/sakariai/sakari/log/field" +) + +const ( + MaxWorkers = 32 + MaxQueueSize = 128 +) + +var ( + running uint32 = 0 +) + +type Pipeline struct { + workers []*worker + chain chan interface{} +} + +func (p *Pipeline) Start() { + distributeToChannels := func(ch chan interface{}, cs []*worker) { + writer := cs[0] //first worker must stream as default + for { + for _, c := range cs { + expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + select { + case val := <-ch: + runningWorker := atomic.LoadUint32(&running) + if c.index <= runningWorker || c.index <= expectationWorkers { + writer = c + } + if c.debug { + log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + } + go writer.stream(val) + } + } + } + } + + go distributeToChannels(p.chain, p.workers) +} + +func (p *Pipeline) Dispatch(msg interface{}) { + p.chain <- msg +} + +type DispatcherBuilder func() Dispatcher + +func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { + wk := make([]*worker, 0, MaxWorkers) + for i := 0; i < MaxWorkers; i++ { + wk = append(wk, + &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + }) + } + return &Pipeline{workers: wk, chain: ch} +} + +type Dispatcher interface { + Before(context.Context) error + After() error + Process(interface{}) error +} + +type worker struct { + index uint32 + mutex *sync.Mutex + running bool + chain chan interface{} + debug bool + idle uint32 + Dispatcher +} + +func (c *worker) stream(val interface{}) { + c.chain <- val + if !c.running { + c.mutex.Lock() + c.running = true + atomic.AddUint32(&running, 1) + defer atomic.AddUint32(&running, ^uint32(1-1)) + ctx, cancel := context.WithCancel(context.Background()) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", field.Error(err)) + } + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + } + err := c.After() + if err != nil { + log.Error("can not finish track issue", field.Error(err)) + } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil { + err := c.Process(msg) + if err != nil { + log.Error("can not process message", + field.Any("msg", &msg), + field.Error(err), + ) + } + if err == io.EOF { + return + } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return + } + if c.debug { + log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + } + } + } + } + } +} diff --git a/messaging/fanout.md b/messaging/fanout.md new file mode 100644 index 0000000..06541d2 --- /dev/null +++ b/messaging/fanout.md @@ -0,0 +1,205 @@ +## Implementation +We can activate worker base on traffic of parent channel + +```go +package concurrency + +import ( + "context" + "io" + "sync" + "sync/atomic" + + "bitbucket.org/sakariai/sakari/log" + "bitbucket.org/sakariai/sakari/log/field" +) + +const ( + MaxWorkers = 32 + MaxQueueSize = 128 +) + +var ( + running uint32 = 0 +) + +type Pipeline struct { + workers []*worker + chain chan interface{} +} + +func (p *Pipeline) Start() { + distributeToChannels := func(ch chan interface{}, cs []*worker) { + writer := cs[0] //first worker must stream as default + for { + for _, c := range cs { + expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + select { + case val := <-ch: + runningWorker := atomic.LoadUint32(&running) + if c.index <= runningWorker || c.index <= expectationWorkers { + writer = c + } + if c.debug { + log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + } + go writer.stream(val) + } + } + } + } + + go distributeToChannels(p.chain, p.workers) +} + +func (p *Pipeline) Dispatch(msg interface{}) { + p.chain <- msg +} + +type DispatcherBuilder func() Dispatcher + +func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { + wk := make([]*worker, 0, MaxWorkers) + for i := 0; i < MaxWorkers; i++ { + wk = append(wk, + &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + }) + } + return &Pipeline{workers: wk, chain: ch} +} + +type Dispatcher interface { + Before(context.Context) error + After() error + Process(interface{}) error +} + +type worker struct { + index uint32 + mutex *sync.Mutex + running bool + chain chan interface{} + debug bool + idle uint32 + Dispatcher +} + +func (c *worker) stream(val interface{}) { + c.chain <- val + if !c.running { + c.mutex.Lock() + c.running = true + atomic.AddUint32(&running, 1) + defer atomic.AddUint32(&running, ^uint32(1-1)) + ctx, cancel := context.WithCancel(context.Background()) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", field.Error(err)) + } + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + } + err := c.After() + if err != nil { + log.Error("can not finish track issue", field.Error(err)) + } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil { + err := c.Process(msg) + if err != nil { + log.Error("can not process message", + field.Any("msg", &msg), + field.Error(err), + ) + } + if err == io.EOF { + return + } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return + } + if c.debug { + log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + } + } + } + } + } +} + +``` + +## Usage + +```go +import concurrency + +type taggingDispatcher struct { + Address string + stream proto.Havilah_StreamMetricClient + conn *grpc.ClientConn +} + +func (d *taggingDispatcher) Before(ctx context.Context) error { + conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) + if err != nil { + return err + } + d.conn = conn + client := proto.NewHavilahClient(conn) + + stream, err := client.StreamMetric(ctx) + if err != nil { + return err + } + d.stream = stream + return nil +} + +func (d *taggingDispatcher) After() error { + _, err := d.stream.CloseAndRecv() + + e := d.conn.Close() + if e != nil { + log.Error("close havilah connection error", field.Error(e)) + } + return err +} + +func (d *taggingDispatcher) Process(msg interface{}) error { + return d.stream.Send(msg.(*proto.Tagging)) +} + + +tagging := &Tagging{ + topic: topic, + pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher { + return &taggingDispatcher{Address: address} + }, ch, idle, debug), +} +tagging.pipeline.Start() + +func main(){ + tagging.pipeline.Dispatch(youStruct{}) +} +``` From ab12460b5d890a2ab6ec73cb2becbcd5a757ec65 Mon Sep 17 00:00:00 2001 From: paulaan Date: Tue, 23 Apr 2019 13:40:50 +0700 Subject: [PATCH 2/9] [fanout] Refine documents --- messaging/fanout.go | 22 +++---- messaging/fanout.md | 154 ++------------------------------------------ 2 files changed, 16 insertions(+), 160 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index e73240b..e52785b 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -2,12 +2,10 @@ package messaging import ( "context" + "go.uber.org/zap" "io" "sync" "sync/atomic" - - "bitbucket.org/sakariai/sakari/log" - "bitbucket.org/sakariai/sakari/log/field" ) const ( @@ -17,6 +15,7 @@ const ( var ( running uint32 = 0 + log, _ = zap.NewDevelopment() ) type Pipeline struct { @@ -37,7 +36,7 @@ func (p *Pipeline) Start() { writer = c } if c.debug { - log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) } go writer.stream(val) } @@ -54,8 +53,9 @@ func (p *Pipeline) Dispatch(msg interface{}) { type DispatcherBuilder func() Dispatcher -func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { +func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { wk := make([]*worker, 0, MaxWorkers) + ch := make(chan interface{}, 4096) for i := 0; i < MaxWorkers; i++ { wk = append(wk, &worker{ @@ -97,15 +97,15 @@ func (c *worker) stream(val interface{}) { err := c.Before(ctx) if err != nil { - log.Error("can not start worker", field.Error(err)) + log.Error("can not start worker", zap.Error(err)) } defer func(w *worker, cancel context.CancelFunc) { if w.debug { - log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) } err := c.After() if err != nil { - log.Error("can not finish track issue", field.Error(err)) + log.Error("can not finish track issue", zap.Error(err)) } cancel() w.mutex.Unlock() @@ -120,8 +120,8 @@ func (c *worker) stream(val interface{}) { err := c.Process(msg) if err != nil { log.Error("can not process message", - field.Any("msg", &msg), - field.Error(err), + zap.Any("msg", &msg), + zap.Error(err), ) } if err == io.EOF { @@ -135,7 +135,7 @@ func (c *worker) stream(val interface{}) { return } if c.debug { - log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + log.Info("Idle", zap.Any("worker index", c.index), zap.Any("idle", idle)) } } } diff --git a/messaging/fanout.md b/messaging/fanout.md index 06541d2..c561e81 100644 --- a/messaging/fanout.md +++ b/messaging/fanout.md @@ -1,153 +1,9 @@ +Fan-Out Design Patterns +=================================== ## Implementation -We can activate worker base on traffic of parent channel - -```go -package concurrency - -import ( - "context" - "io" - "sync" - "sync/atomic" - - "bitbucket.org/sakariai/sakari/log" - "bitbucket.org/sakariai/sakari/log/field" -) - -const ( - MaxWorkers = 32 - MaxQueueSize = 128 -) - -var ( - running uint32 = 0 -) - -type Pipeline struct { - workers []*worker - chain chan interface{} -} - -func (p *Pipeline) Start() { - distributeToChannels := func(ch chan interface{}, cs []*worker) { - writer := cs[0] //first worker must stream as default - for { - for _, c := range cs { - expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 - select { - case val := <-ch: - runningWorker := atomic.LoadUint32(&running) - if c.index <= runningWorker || c.index <= expectationWorkers { - writer = c - } - if c.debug { - log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) - } - go writer.stream(val) - } - } - } - } - - go distributeToChannels(p.chain, p.workers) -} - -func (p *Pipeline) Dispatch(msg interface{}) { - p.chain <- msg -} - -type DispatcherBuilder func() Dispatcher - -func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { - wk := make([]*worker, 0, MaxWorkers) - for i := 0; i < MaxWorkers; i++ { - wk = append(wk, - &worker{ - index: uint32(i + 1), - chain: make(chan interface{}, MaxQueueSize), - mutex: new(sync.Mutex), - debug: debug, - idle: idle, - Dispatcher: d(), - }) - } - return &Pipeline{workers: wk, chain: ch} -} - -type Dispatcher interface { - Before(context.Context) error - After() error - Process(interface{}) error -} - -type worker struct { - index uint32 - mutex *sync.Mutex - running bool - chain chan interface{} - debug bool - idle uint32 - Dispatcher -} - -func (c *worker) stream(val interface{}) { - c.chain <- val - if !c.running { - c.mutex.Lock() - c.running = true - atomic.AddUint32(&running, 1) - defer atomic.AddUint32(&running, ^uint32(1-1)) - ctx, cancel := context.WithCancel(context.Background()) - err := c.Before(ctx) - - if err != nil { - log.Error("can not start worker", field.Error(err)) - } - defer func(w *worker, cancel context.CancelFunc) { - if w.debug { - log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) - } - err := c.After() - if err != nil { - log.Error("can not finish track issue", field.Error(err)) - } - cancel() - w.mutex.Unlock() - w.running = false - }(c, cancel) - var idle uint32 = 0 - for { - select { - case msg := <-c.chain: - atomic.StoreUint32(&idle, 0) - if msg != nil { - err := c.Process(msg) - if err != nil { - log.Error("can not process message", - field.Any("msg", &msg), - field.Error(err), - ) - } - if err == io.EOF { - return - } - } - default: - atomic.AddUint32(&idle, 1) - if i := atomic.LoadUint32(&idle); i > 0 { - if i > c.idle { - return - } - if c.debug { - log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) - } - } - } - } - } -} - -``` +We can activate worker based on traffic of parent channel +`NewPipeline(d DispatcherBuilder, idle uint32, debug bool)` +* Set `idle` around 1000-2000 for deactivate worker in select block ## Usage From 26549a1e8807e9a226c987d4e83e32ce6d692880 Mon Sep 17 00:00:00 2001 From: paulaan Date: Tue, 23 Apr 2019 15:24:12 +0700 Subject: [PATCH 3/9] [fanout] Break worker from higher index --- messaging/fanout.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index e52785b..89e53d5 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -29,18 +29,20 @@ func (p *Pipeline) Start() { for { for _, c := range cs { expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + runningWorker := atomic.LoadUint32(&running) + if c.index > runningWorker+1 || c.index > expectationWorkers { + break + } select { case val := <-ch: - runningWorker := atomic.LoadUint32(&running) - if c.index <= runningWorker || c.index <= expectationWorkers { - writer = c - } + writer = c if c.debug { log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) } go writer.stream(val) } } + } } } From 42e2301e29163d043ecfe37a5483a308405a3a4b Mon Sep 17 00:00:00 2001 From: paulaan Date: Wed, 1 May 2019 18:14:30 +0700 Subject: [PATCH 4/9] [fanout] Pipeline will maintain its broker --- messaging/fanout.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index 89e53d5..3690a1b 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -14,13 +14,13 @@ const ( ) var ( - running uint32 = 0 log, _ = zap.NewDevelopment() ) type Pipeline struct { workers []*worker chain chan interface{} + running *uint32 } func (p *Pipeline) Start() { @@ -29,7 +29,7 @@ func (p *Pipeline) Start() { for { for _, c := range cs { expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 - runningWorker := atomic.LoadUint32(&running) + runningWorker := atomic.LoadUint32(p.running) if c.index > runningWorker+1 || c.index > expectationWorkers { break } @@ -42,11 +42,11 @@ func (p *Pipeline) Start() { go writer.stream(val) } } - } } } +} - go distributeToChannels(p.chain, p.workers) +go distributeToChannels(p.chain, p.workers) } func (p *Pipeline) Dispatch(msg interface{}) { @@ -58,6 +58,7 @@ type DispatcherBuilder func() Dispatcher func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { wk := make([]*worker, 0, MaxWorkers) ch := make(chan interface{}, 4096) + pipe := &Pipeline{chain: ch, running: new(uint32)} for i := 0; i < MaxWorkers; i++ { wk = append(wk, &worker{ @@ -67,9 +68,11 @@ func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { debug: debug, idle: idle, Dispatcher: d(), + broker: pipe.running, }) } - return &Pipeline{workers: wk, chain: ch} + pipe.workers = wk + return pipe } type Dispatcher interface { @@ -85,6 +88,7 @@ type worker struct { chain chan interface{} debug bool idle uint32 + broker *uint32 Dispatcher } @@ -93,8 +97,8 @@ func (c *worker) stream(val interface{}) { if !c.running { c.mutex.Lock() c.running = true - atomic.AddUint32(&running, 1) - defer atomic.AddUint32(&running, ^uint32(1-1)) + atomic.AddUint32(c.broker, 1) + defer atomic.AddUint32(c.broker, ^uint32(1 - 1)) ctx, cancel := context.WithCancel(context.Background()) err := c.Before(ctx) From c033afdbea146154d774605688521a2d92861746 Mon Sep 17 00:00:00 2001 From: paulaan Date: Thu, 2 May 2019 17:27:28 +0700 Subject: [PATCH 5/9] [fanout] Organize fan-out pipeline --- messaging/fanout.go | 82 ++++++++++++++++++--------------------------- 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index 3690a1b..83cf36e 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -8,45 +8,39 @@ import ( "sync/atomic" ) -const ( - MaxWorkers = 32 - MaxQueueSize = 128 +var ( + log, _ = zap.NewDevelopment() ) -var ( - log, _ = zap.NewDevelopment() +const ( + MaxWorkers = 16 + MaxQueueSize = 512 + MasterQueueSize = MaxQueueSize * MaxWorkers ) type Pipeline struct { - workers []*worker + workers map[int]*worker chain chan interface{} - running *uint32 } func (p *Pipeline) Start() { - distributeToChannels := func(ch chan interface{}, cs []*worker) { - writer := cs[0] //first worker must stream as default + go func(pipe *Pipeline) { for { - for _, c := range cs { - expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 - runningWorker := atomic.LoadUint32(p.running) - if c.index > runningWorker+1 || c.index > expectationWorkers { + expectationWorkers := len(pipe.chain) + if expectationWorkers > MaxWorkers { + expectationWorkers = expectationWorkers % MaxWorkers + } + for _, c := range pipe.workers { + if expectationWorkers < int(c.index) { break } select { - case val := <-ch: - writer = c - if c.debug { - log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) - } - go writer.stream(val) + case val := <-pipe.chain: + go c.stream(val) } } } - } -} - -go distributeToChannels(p.chain, p.workers) + }(p) } func (p *Pipeline) Dispatch(msg interface{}) { @@ -56,23 +50,19 @@ func (p *Pipeline) Dispatch(msg interface{}) { type DispatcherBuilder func() Dispatcher func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { - wk := make([]*worker, 0, MaxWorkers) - ch := make(chan interface{}, 4096) - pipe := &Pipeline{chain: ch, running: new(uint32)} + ch := make(chan interface{}, MasterQueueSize) + wk := make(map[int]*worker) for i := 0; i < MaxWorkers; i++ { - wk = append(wk, - &worker{ - index: uint32(i + 1), - chain: make(chan interface{}, MaxQueueSize), - mutex: new(sync.Mutex), - debug: debug, - idle: idle, - Dispatcher: d(), - broker: pipe.running, - }) + wk[i] = &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + } } - pipe.workers = wk - return pipe + return &Pipeline{workers: wk, chain: ch} } type Dispatcher interface { @@ -88,7 +78,6 @@ type worker struct { chain chan interface{} debug bool idle uint32 - broker *uint32 Dispatcher } @@ -97,14 +86,7 @@ func (c *worker) stream(val interface{}) { if !c.running { c.mutex.Lock() c.running = true - atomic.AddUint32(c.broker, 1) - defer atomic.AddUint32(c.broker, ^uint32(1 - 1)) ctx, cancel := context.WithCancel(context.Background()) - err := c.Before(ctx) - - if err != nil { - log.Error("can not start worker", zap.Error(err)) - } defer func(w *worker, cancel context.CancelFunc) { if w.debug { log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) @@ -117,6 +99,11 @@ func (c *worker) stream(val interface{}) { w.mutex.Unlock() w.running = false }(c, cancel) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", zap.Error(err)) + } var idle uint32 = 0 for { select { @@ -140,9 +127,6 @@ func (c *worker) stream(val interface{}) { if i > c.idle { return } - if c.debug { - log.Info("Idle", zap.Any("worker index", c.index), zap.Any("idle", idle)) - } } } } From 1d5fba6f6f208b5c30c71f46494d974ce1fcefe2 Mon Sep 17 00:00:00 2001 From: paulaan Date: Thu, 2 May 2019 17:37:23 +0700 Subject: [PATCH 6/9] [fanout] Codereview --- messaging/fanout.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index 83cf36e..d2c3304 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -30,8 +30,8 @@ func (p *Pipeline) Start() { if expectationWorkers > MaxWorkers { expectationWorkers = expectationWorkers % MaxWorkers } - for _, c := range pipe.workers { - if expectationWorkers < int(c.index) { + for index, c := range pipe.workers { + if expectationWorkers < index { break } select { From 4655d25d0882c49b6c6f6fbf593ce1d908e8ef6a Mon Sep 17 00:00:00 2001 From: paulaan Date: Thu, 2 May 2019 17:40:57 +0700 Subject: [PATCH 7/9] [fanout] Remove defect --- messaging/fanout.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index d2c3304..83cf36e 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -30,8 +30,8 @@ func (p *Pipeline) Start() { if expectationWorkers > MaxWorkers { expectationWorkers = expectationWorkers % MaxWorkers } - for index, c := range pipe.workers { - if expectationWorkers < index { + for _, c := range pipe.workers { + if expectationWorkers < int(c.index) { break } select { From 9c0c9709ff557163ed67bb5dec765463ee737d3c Mon Sep 17 00:00:00 2001 From: paulaan Date: Thu, 2 May 2019 20:16:56 +0700 Subject: [PATCH 8/9] [fanout] Remove defect --- messaging/fanout.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index 83cf36e..6bbd2be 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -3,7 +3,6 @@ package messaging import ( "context" "go.uber.org/zap" - "io" "sync" "sync/atomic" ) @@ -117,9 +116,6 @@ func (c *worker) stream(val interface{}) { zap.Error(err), ) } - if err == io.EOF { - return - } } default: atomic.AddUint32(&idle, 1) From 44115024e2dcc995afb534be4a688b8019e30084 Mon Sep 17 00:00:00 2001 From: paulaan Date: Mon, 6 May 2019 21:43:42 +0700 Subject: [PATCH 9/9] [fanout] Optimize fanout --- messaging/fanout.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index 6bbd2be..3d56805 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -22,21 +22,21 @@ type Pipeline struct { chain chan interface{} } -func (p *Pipeline) Start() { +func (p *Pipeline) Start(ctx context.Context) { go func(pipe *Pipeline) { for { - expectationWorkers := len(pipe.chain) - if expectationWorkers > MaxWorkers { - expectationWorkers = expectationWorkers % MaxWorkers + expectationWorkers := len(pipe.chain) % MaxWorkers + if expectationWorkers >= MaxWorkers { + expectationWorkers = 0 } - for _, c := range pipe.workers { - if expectationWorkers < int(c.index) { - break - } - select { - case val := <-pipe.chain: - go c.stream(val) + select { + case <-ctx.Done(): + return + case val, ok := <-pipe.chain: + if !ok { + return } + go pipe.workers[expectationWorkers].stream(val) } } }(p)