diff --git a/observable/iterable.go b/observable/iterable.go index 78929945..ad0943b5 100644 --- a/observable/iterable.go +++ b/observable/iterable.go @@ -1,18 +1,3 @@ package observable -import ( - "errors" -) - type Iterable <-chan interface{} - -func NewIterable(any interface{}) (Iterable, error) { - switch any := any.(type) { - case chan interface{}: - return Iterable(any), nil - case <-chan interface{}: - return Iterable(any), nil - default: - return nil, errors.New("type error") - } -} diff --git a/observable/observable.go b/observable/observable.go index 5ba292ec..4b216d4f 100644 --- a/observable/observable.go +++ b/observable/observable.go @@ -50,7 +50,6 @@ func (o *Observable) Subscribe() (Subscription, error) { func (o *Observable) UnSubscribe(sub Subscription) { elm, exist := o.listener.Load(sub) if !exist { - println("not exist") return } subscriber := elm.(*Subscriber) diff --git a/observable/observable_test.go b/observable/observable_test.go index 10bef10d..c3d178db 100644 --- a/observable/observable_test.go +++ b/observable/observable_test.go @@ -27,11 +27,7 @@ func TestObservable(t *testing.T) { t.Error(err) } count := 0 - for { - _, open := <-data - if !open { - break - } + for range data { count = count + 1 } if count != 5 { @@ -49,11 +45,7 @@ func TestObservable_MutilSubscribe(t *testing.T) { var wg sync.WaitGroup wg.Add(2) waitCh := func(ch <-chan interface{}) { - for { - _, open := <-ch - if !open { - break - } + for range ch { count = count + 1 } wg.Done() @@ -80,6 +72,25 @@ func TestObservable_UnSubscribe(t *testing.T) { } } +func TestObservable_SubscribeClosedSource(t *testing.T) { + iter := iterator([]interface{}{1}) + src := NewObservable(iter) + data, _ := src.Subscribe() + <-data + + _, closed := src.Subscribe() + if closed == nil { + t.Error("Observable should be closed") + } +} + +func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) { + sub := Subscription(make(chan interface{})) + iter := iterator([]interface{}{1}) + src := NewObservable(iter) + src.UnSubscribe(sub) +} + func TestObservable_SubscribeGoroutineLeak(t *testing.T) { // waiting for other goroutine recycle time.Sleep(120 * time.Millisecond) @@ -97,11 +108,7 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) { var wg sync.WaitGroup wg.Add(max) waitCh := func(ch <-chan interface{}) { - for { - _, open := <-ch - if !open { - break - } + for range ch { } wg.Done() } diff --git a/observable/util.go b/observable/util.go deleted file mode 100644 index d7d02b0b..00000000 --- a/observable/util.go +++ /dev/null @@ -1,15 +0,0 @@ -package observable - -func mergeWithBytes(ch <-chan interface{}, buf []byte) chan interface{} { - out := make(chan interface{}) - go func() { - defer close(out) - if len(buf) != 0 { - out <- buf - } - for elm := range ch { - out <- elm - } - }() - return out -}