1
0
Fork 0
mirror of https://github.com/tmrts/go-patterns.git synced 2025-04-03 13:13:34 +03:00
This commit is contained in:
Wei Fu 2017-07-19 10:41:53 +00:00 committed by GitHub
commit c940c02412
7 changed files with 292 additions and 1 deletions

View file

@ -68,7 +68,7 @@ A curated collection of idiomatic design & application patterns for Go language.
| [Generators](/concurrency/generator.md) | Yields a sequence of values one at a time | ✔ |
| [Reactor](/concurrency/reactor.md) | Demultiplexes service requests delivered concurrently to a service handler and dispatches them syncronously to the associated request handlers | ✘ |
| [Parallelism](/concurrency/parallelism.md) | Completes large number of independent tasks | ✔ |
| [Producer Consumer](/concurrency/producer_consumer.md) | Separates tasks from task executions | |
| [Producer Consumer](/concurrency/producer_consumer.md) | Separates tasks from task executions | |
## Messaging Patterns

View file

@ -0,0 +1,39 @@
package main
import (
pc "./producer_consumer"
"strconv"
"time"
)
func main() {
// unix socket address
pAddr := "/tmp/producer"
cAddr1 := "/tmp/consumer1"
cAddr2 := "/tmp/consumer2"
// producer buffer size
bufSize := 2
// start producer
producer := pc.NewProducer(pAddr, bufSize)
pc.StartProducer(producer)
// generate tasks
for i := 0; i < 10; i++ {
go func(num int) {
pc.EnqueueTask(pAddr, pc.Task{strconv.Itoa(num)})
}(i)
}
// start two consumers
consumer1 := pc.NewConsumer(cAddr1)
pc.StartConsumer(consumer1, pAddr)
consumer2 := pc.NewConsumer(cAddr2)
pc.StartConsumer(consumer2, pAddr)
time.Sleep(30 * time.Second)
pc.Shutdown(pAddr, true)
pc.Shutdown(cAddr1, false)
pc.Shutdown(cAddr2, false)
}

View file

@ -0,0 +1,12 @@
# Producer Consumer
What is the producer-consumer pattern?
> In computing, the producerconsumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer. --- from [wikipedia](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)
## Implementaion
More information can be found in [producer_consumer package](producer_consumer).
## Example
An example can be found in [producer_consumer.go](producer_consumer.go).

View file

@ -0,0 +1,67 @@
package producer_consumer
import (
"errors"
"log"
"net"
"net/rpc"
"os"
"time"
)
const (
TimeoutEnqueueTask = 5 * time.Second
TimeoutRegister = 5 * time.Second
TimeoutDial = 5 * time.Second
TimeoutShutdown = 5 * time.Second
ProcessDuration = 3 * time.Second
)
var (
ErrorTEnqueueTask = errors.New("Timeout for enqueuing task")
ErrorTRegister = errors.New("Timeout for registering")
ErrorTDial = errors.New("Timeout for dailing")
LogInfo = log.New(os.Stdout, "[Info] ", log.Ltime)
LogError = log.New(os.Stderr, "[Error] ", log.Ltime)
)
func Shutdown(address string, isProducer bool) {
serviceMethod := "Producer.Shutdown"
if !isProducer {
serviceMethod = "Consumer.Shutdown"
}
err := rpcCall(address, serviceMethod, struct{}{}, &struct{}{})
if err != nil {
LogError.Println(err)
}
}
func rpcCall(address string, serviceMethod string, args interface{}, reply *struct{}) error {
conn, err := net.DialTimeout("unix", address, TimeoutDial)
if err != nil {
return err
}
defer conn.Close()
client := rpc.NewClient(conn)
err = client.Call(serviceMethod, args, reply)
if err != nil {
return err
}
return nil
}
func serverAccept(server *rpc.Server, l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
LogError.Println(err)
return
}
go server.ServeConn(conn)
}
}

View file

@ -0,0 +1,55 @@
package producer_consumer
import (
"net"
"net/rpc"
"os"
"time"
)
type Consumer struct {
address string
l net.Listener
}
func NewConsumer(address string) *Consumer {
return &Consumer{address: address}
}
func StartConsumer(c *Consumer, pAddr string) {
server := rpc.NewServer()
server.Register(c)
os.Remove(c.address)
l, err := net.Listen("unix", c.address)
if err != nil {
LogError.Println(err)
}
c.l = l
go serverAccept(server, c.l)
err = rpcCall(pAddr, "Producer.Register", c.address, &struct{}{})
if err != nil {
LogError.Println(err)
c.l.Close()
}
}
func (c *Consumer) DoTask(task Task, _ *struct{}) error {
// takes time to do the task
time.Sleep(ProcessDuration)
LogInfo.Printf("[%s] finish task[%s] successfully!", c.address, task.String())
return nil
}
func (c *Consumer) Shutdown(_ struct{}, _ *struct{}) error {
if err := c.l.Close(); err != nil {
return err
}
LogInfo.Printf("[%s] shutdown successfully!", c.address)
return nil
}

View file

@ -0,0 +1,108 @@
package producer_consumer
import (
"net"
"net/rpc"
"os"
"time"
)
type Producer struct {
address string
tasks chan Task
done chan struct{}
consumers chan string
l net.Listener
}
func NewProducer(address string, capacity int) *Producer {
return &Producer{
address: address,
tasks: make(chan Task, capacity),
consumers: make(chan string, capacity),
done: make(chan struct{}, 0),
}
}
func StartProducer(p *Producer) {
server := rpc.NewServer()
server.Register(p)
os.Remove(p.address)
l, err := net.Listen("unix", p.address)
if err != nil {
LogError.Println(err)
}
p.l = l
go serverAccept(server, p.l)
go p.schedule()
}
func EnqueueTask(pAddr string, task Task) {
err := rpcCall(pAddr, "Producer.Enqueue", task, &struct{}{})
if err != nil {
LogError.Println(err)
}
}
func (p *Producer) schedule() {
for {
select {
case task := <-p.tasks:
// one consumer consumes one job at one time
consumer := <-p.consumers
go func(consumer string) {
// if rpcCall fails, this consumer will be regarded as unavailable.
err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{})
if err != nil {
LogError.Printf("[%s] %s", p.address, err.Error())
} else { // re-register consumer
go func(consumer string) {
select {
case p.consumers <- consumer:
case <-time.After(5 * time.Second):
LogError.Printf("[%s] %s", p.address, ErrorTRegister)
}
}(consumer)
}
}(consumer)
case <-p.done:
break
}
}
}
func (p *Producer) Enqueue(task Task, _ *struct{}) error {
select {
case p.tasks <- task:
LogInfo.Printf("[%s] enqueue task[%s] successfully!", p.address, task.String())
case <-time.After(TimeoutEnqueueTask):
return ErrorTEnqueueTask
}
return nil
}
func (p *Producer) Register(address string, _ *struct{}) error {
select {
case p.consumers <- address:
LogInfo.Printf("[%s] register consumer[%s] successfully!", p.address, address)
case <-time.After(TimeoutRegister):
return ErrorTRegister
}
return nil
}
func (p *Producer) Shutdown(_ struct{}, _ *struct{}) error {
p.done <- struct{}{}
if err := p.l.Close(); err != nil {
return err
}
LogInfo.Printf("[%s] shutdown successfully!\n", p.address)
return nil
}

View file

@ -0,0 +1,10 @@
package producer_consumer
// Add whatever you want
type Task struct {
Name string
}
func (t *Task) String() string {
return t.Name
}