diff --git a/common/observable/observable.go b/common/observable/observable.go index 62b2e153..1e45cc66 100644 --- a/common/observable/observable.go +++ b/common/observable/observable.go @@ -10,6 +10,7 @@ type Observable[T any] struct { listener map[Subscription[T]]*Subscriber[T] mux sync.Mutex done bool + stopCh chan struct{} } func (o *Observable[T]) process() { @@ -31,6 +32,7 @@ func (o *Observable[T]) close() { for _, sub := range o.listener { sub.Close() } + close(o.stopCh) } func (o *Observable[T]) Subscribe() (Subscription[T], error) { @@ -59,6 +61,7 @@ func NewObservable[T any](iter Iterable[T]) *Observable[T] { observable := &Observable[T]{ iterable: iter, listener: map[Subscription[T]]*Subscriber[T]{}, + stopCh: make(chan struct{}), } go observable.process() return observable diff --git a/common/observable/observable_test.go b/common/observable/observable_test.go index d263cb94..6be7f3aa 100644 --- a/common/observable/observable_test.go +++ b/common/observable/observable_test.go @@ -70,9 +70,11 @@ func TestObservable_SubscribeClosedSource(t *testing.T) { src := NewObservable[int](iter) data, _ := src.Subscribe() <-data - - _, closed := src.Subscribe() - assert.NotNil(t, closed) + select { + case <-src.stopCh: + case <-time.After(time.Second): + assert.Fail(t, "timeout not stop") + } } func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) {