From 0dd35222f58857fdc8bc666df2969e4d81c92f7c Mon Sep 17 00:00:00 2001
From: Robert McLeod <robert@penguinpower.co.nz>
Date: Mon, 28 Sep 2020 17:44:56 +1300
Subject: [PATCH] Update publish_subscribe.md

---
 messaging/publish_subscribe.md | 66 ++++++++++++++++------------------
 1 file changed, 31 insertions(+), 35 deletions(-)

diff --git a/messaging/publish_subscribe.md b/messaging/publish_subscribe.md
index 638363e..5aa5798 100644
--- a/messaging/publish_subscribe.md
+++ b/messaging/publish_subscribe.md
@@ -12,63 +12,59 @@ To accomplish this, an intermediary, called a "message broker" or "event bus",
 receives published messages, and then routes them on to subscribers.
 
 
-There are three components **messages**, **topics**, **users**.
+There are three components **messages**, **topics**, **subscriptions**.
 
 ```go
 type Message struct {
     // Contents
 }
 
-
 type Subscription struct {
-	ch chan<- Message
-
-	Inbox chan Message
+	closed bool
+	inbox chan Message
 }
 
-func (s *Subscription) Publish(msg Message) error {
-	if _, ok := <-s.ch; !ok {
-		return errors.New("Topic has been closed")
+func (s *Subscription) Next() (Message, error) {
+	if s.closed {
+		return Message{}, errors.New("subscription closed")
 	}
+	
+	m, ok := <-s.inbox
+	if !ok {
+		return Message{}, errors.New("subscription closed")
+	}
+	
+	return m, nil
+}
 
-	s.ch <- msg
-
-	return nil
+func (s *Subscription) Unsubscribe(Subscription) error {
+	s.closed = true
+	close(s.inbox)
 }
 ```
 
 ```go
 type Topic struct {
-	Subscribers    []Session
+	Subscribers    []Subscription
 	MessageHistory []Message
 }
 
-func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
-    // Get session and create one if it's the first
-
-    // Add session to the Topic & MessageHistory
-
-    // Create a subscription
+func (t *Topic) Subscribe() (Subscription) {
+	return Subscription{inbox: make(chan Message)}
 }
 
-func (t *Topic) Unsubscribe(Subscription) error {
-	// Implementation
-}
+func (t *Topic) Publish(msg Message) error {
+	for _, sub := range t.Subscribers {
+		if sub.closed {
+			continue
+		}
+		
+		go func() {
+			sub.inbox <- msg
+		}()
+	}
 
-func (t *Topic) Delete() error {
-	// Implementation
-}
-```
-
-```go
-type User struct {
-    ID uint64
-    Name string
-}
-
-type Session struct {
-    User User
-    Timestamp time.Time
+	return nil
 }
 ```