From d5aad34fcebc7737e46d274efc9e6c3d058353bd Mon Sep 17 00:00:00 2001 From: "jian.han" Date: Wed, 24 Jan 2018 16:16:51 +1000 Subject: [PATCH] new pipeline example from book --- concurrency/pipeline_from_book/main.go | 62 ++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 concurrency/pipeline_from_book/main.go diff --git a/concurrency/pipeline_from_book/main.go b/concurrency/pipeline_from_book/main.go new file mode 100644 index 0000000..2581892 --- /dev/null +++ b/concurrency/pipeline_from_book/main.go @@ -0,0 +1,62 @@ +package main + +import "github.com/davecgh/go-spew/spew" + +func main() { + run() +} + +func run() { + done := make(chan interface{}) + defer close(done) + intStream := generator(done, 1, 2, 3, 4) + pipline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) + for v := range pipline { + spew.Dump(v) + } +} + +func generator(done <-chan interface{}, integers ...int) <-chan int { + intStream := make(chan int) + go func() { + defer close(intStream) + for _, i := range integers { + select { + case <-done: + return + case intStream <- i: + } + } + }() + return intStream +} + +func multiply(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int { + multipliedStream := make(chan int) + go func() { + defer close(multipliedStream) + for i := range intStream { + select { + case <-done: + return + case multipliedStream <- i * multiplier: + } + } + }() + return multipliedStream +} + +func add(done <-chan interface{}, intStream <-chan int, additive int) <-chan int { + addStream := make(chan int) + go func() { + defer close(addStream) + for i := range intStream { + select { + case <-done: + return + case addStream <- i: + } + } + }() + return addStream +}