mirror of
https://github.com/MetaCubeX/Clash.Meta.git
synced 2025-04-19 08:40:57 +00:00
fix: observable test
This commit is contained in:
parent
55cbbf7f41
commit
9e0889c02c
2 changed files with 8 additions and 3 deletions
|
@ -10,6 +10,7 @@ type Observable[T any] struct {
|
|||
listener map[Subscription[T]]*Subscriber[T]
|
||||
mux sync.Mutex
|
||||
done bool
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func (o *Observable[T]) process() {
|
||||
|
@ -31,6 +32,7 @@ func (o *Observable[T]) close() {
|
|||
for _, sub := range o.listener {
|
||||
sub.Close()
|
||||
}
|
||||
close(o.stopCh)
|
||||
}
|
||||
|
||||
func (o *Observable[T]) Subscribe() (Subscription[T], error) {
|
||||
|
@ -59,6 +61,7 @@ func NewObservable[T any](iter Iterable[T]) *Observable[T] {
|
|||
observable := &Observable[T]{
|
||||
iterable: iter,
|
||||
listener: map[Subscription[T]]*Subscriber[T]{},
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
go observable.process()
|
||||
return observable
|
||||
|
|
|
@ -70,9 +70,11 @@ func TestObservable_SubscribeClosedSource(t *testing.T) {
|
|||
src := NewObservable[int](iter)
|
||||
data, _ := src.Subscribe()
|
||||
<-data
|
||||
|
||||
_, closed := src.Subscribe()
|
||||
assert.NotNil(t, closed)
|
||||
select {
|
||||
case <-src.stopCh:
|
||||
case <-time.After(time.Second):
|
||||
assert.Fail(t, "timeout not stop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) {
|
||||
|
|
Loading…
Add table
Reference in a new issue