diff --git a/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go b/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go new file mode 100644 index 0000000..0c60ae5 --- /dev/null +++ b/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go @@ -0,0 +1,104 @@ +package divide_and_conquer + +import ( + "fmt" + "reflect" + "runtime" + "time" + + "github.com/davecgh/go-spew/spew" +) + +func RunDivideAndConquer() { + type in struct { + a int + b int + } + + type out struct { + source string + result int + } + + evaluators := []Evaluator{ + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a + i.b + return out{"Plus", r}, nil + }), + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a * i.b + return out{"Multi", r}, nil + }), + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a - i.b + return out{"min", r}, nil + }), + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a / i.b + return out{"divider", r}, nil + }), + } + + r, errors := DivideAndConquer(in{2, 3}, evaluators, 10*time.Millisecond) + spew.Dump(r, errors) +} + +type Evaluator interface { + Evaluate(data interface{}) (interface{}, error) + Name() string +} +type EvaluatorFunc func(interface{}) (interface{}, error) + +func (ef EvaluatorFunc) Evaluate(in interface{}) (interface{}, error) { + return ef(in) +} + +func (ef EvaluatorFunc) Name() string { + return runtime.FuncForPC(reflect.ValueOf(ef).Pointer()).Name() +} + +func DivideAndConquer(data interface{}, evaluators []Evaluator, timeout time.Duration) ([]interface{}, []error) { + gather := make(chan interface{}, len(evaluators)) + errors := make(chan error, len(evaluators)) + for _, v := range evaluators { + go func(e Evaluator) { + // Why not just use an unbuffered channel? The answer is that we don’t want to leak any goroutines. + // While the Go runtime is capable of handling thousands or hundreds of thousands of goroutines at a time, + // each goroutine does use some resources, so you don’t want to leave them hanging around when + // you don’t have to. If you do, a long-running Go program will start performing poorly + ch := make(chan interface{}, 1) + ech := make(chan error, 1) + go func() { + result, err := e.Evaluate(data) + if err != nil { + errors <- err + } else { + ch <- result + } + }() + select { + case r := <-ch: + gather <- r + case err := <-ech: + errors <- err + case <-time.After(timeout): + errors <- fmt.Errorf("%s timeout after %v on %v", e.Name(), timeout, data) + } + }(v) + } + out := make([]interface{}, 0, len(evaluators)) + errs := make([]error, 0, len(evaluators)) + for range evaluators { + select { + case r := <-gather: + out = append(out, r) + case e := <-errors: + errs = append(errs, e) + } + } + return out, errs +} diff --git a/concurrency/subtasks/fetchers/fetchers.go b/concurrency/subtasks/fetchers/fetchers.go new file mode 100644 index 0000000..cf1c9f0 --- /dev/null +++ b/concurrency/subtasks/fetchers/fetchers.go @@ -0,0 +1 @@ +package fetchers diff --git a/concurrency/subtasks/main.go b/concurrency/subtasks/main.go index 947a1cd..13d1ef6 100644 --- a/concurrency/subtasks/main.go +++ b/concurrency/subtasks/main.go @@ -1,13 +1,6 @@ package main -import ( - "fmt" - "reflect" - "runtime" - "time" - - "github.com/davecgh/go-spew/spew" -) +import "github.com/jianhan/go-patterns/concurrency/subtasks/divide_and_conquer" // https://medium.com/capital-one-developers/buffered-channels-in-go-what-are-they-good-for-43703871828 @@ -18,95 +11,5 @@ import ( // ideal way to gather the data back from your subtasks. func main() { - type in struct { - a int - b int - } - - type out struct { - source string - result int - } - - evaluators := []Evaluator{ - EvaluatorFunc(func(inV interface{}) (interface{}, error) { - i := inV.(in) - r := i.a + i.b - return out{"Plus", r}, nil - }), - EvaluatorFunc(func(inV interface{}) (interface{}, error) { - i := inV.(in) - r := i.a * i.b - return out{"Multi", r}, nil - }), - EvaluatorFunc(func(inV interface{}) (interface{}, error) { - i := inV.(in) - r := i.a - i.b - return out{"min", r}, nil - }), - EvaluatorFunc(func(inV interface{}) (interface{}, error) { - i := inV.(in) - r := i.a / i.b - return out{"divider", r}, nil - }), - } - - r, errors := DivideAndConquer(in{2, 3}, evaluators, 10*time.Millisecond) - spew.Dump(r, errors) -} - -type Evaluator interface { - Evaluate(data interface{}) (interface{}, error) - Name() string -} -type EvaluatorFunc func(interface{}) (interface{}, error) - -func (ef EvaluatorFunc) Evaluate(in interface{}) (interface{}, error) { - return ef(in) -} - -func (ef EvaluatorFunc) Name() string { - return runtime.FuncForPC(reflect.ValueOf(ef).Pointer()).Name() -} - -func DivideAndConquer(data interface{}, evaluators []Evaluator, timeout time.Duration) ([]interface{}, []error) { - gather := make(chan interface{}, len(evaluators)) - errors := make(chan error, len(evaluators)) - for _, v := range evaluators { - go func(e Evaluator) { - // Why not just use an unbuffered channel? The answer is that we don’t want to leak any goroutines. - // While the Go runtime is capable of handling thousands or hundreds of thousands of goroutines at a time, - // each goroutine does use some resources, so you don’t want to leave them hanging around when - // you don’t have to. If you do, a long-running Go program will start performing poorly - ch := make(chan interface{}, 1) - ech := make(chan error, 1) - go func() { - result, err := e.Evaluate(data) - if err != nil { - errors <- err - } else { - ch <- result - } - }() - select { - case r := <-ch: - gather <- r - case err := <-ech: - errors <- err - case <-time.After(timeout): - errors <- fmt.Errorf("%s timeout after %v on %v", e.Name(), timeout, data) - } - }(v) - } - out := make([]interface{}, 0, len(evaluators)) - errs := make([]error, 0, len(evaluators)) - for range evaluators { - select { - case r := <-gather: - out = append(out, r) - case e := <-errors: - errs = append(errs, e) - } - } - return out, errs + divide_and_conquer.RunDivideAndConquer() }