Improve: delete useless code and code coverage is now 100%

This commit is contained in:
Dreamacro 2018-07-12 18:03:02 +08:00
parent 283e4e1f4f
commit 39b45513af
4 changed files with 22 additions and 46 deletions

View file

@ -1,18 +1,3 @@
package observable package observable
import (
"errors"
)
type Iterable <-chan interface{} type Iterable <-chan interface{}
func NewIterable(any interface{}) (Iterable, error) {
switch any := any.(type) {
case chan interface{}:
return Iterable(any), nil
case <-chan interface{}:
return Iterable(any), nil
default:
return nil, errors.New("type error")
}
}

View file

@ -50,7 +50,6 @@ func (o *Observable) Subscribe() (Subscription, error) {
func (o *Observable) UnSubscribe(sub Subscription) { func (o *Observable) UnSubscribe(sub Subscription) {
elm, exist := o.listener.Load(sub) elm, exist := o.listener.Load(sub)
if !exist { if !exist {
println("not exist")
return return
} }
subscriber := elm.(*Subscriber) subscriber := elm.(*Subscriber)

View file

@ -27,11 +27,7 @@ func TestObservable(t *testing.T) {
t.Error(err) t.Error(err)
} }
count := 0 count := 0
for { for range data {
_, open := <-data
if !open {
break
}
count = count + 1 count = count + 1
} }
if count != 5 { if count != 5 {
@ -49,11 +45,7 @@ func TestObservable_MutilSubscribe(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
waitCh := func(ch <-chan interface{}) { waitCh := func(ch <-chan interface{}) {
for { for range ch {
_, open := <-ch
if !open {
break
}
count = count + 1 count = count + 1
} }
wg.Done() wg.Done()
@ -80,6 +72,25 @@ func TestObservable_UnSubscribe(t *testing.T) {
} }
} }
func TestObservable_SubscribeClosedSource(t *testing.T) {
iter := iterator([]interface{}{1})
src := NewObservable(iter)
data, _ := src.Subscribe()
<-data
_, closed := src.Subscribe()
if closed == nil {
t.Error("Observable should be closed")
}
}
func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) {
sub := Subscription(make(chan interface{}))
iter := iterator([]interface{}{1})
src := NewObservable(iter)
src.UnSubscribe(sub)
}
func TestObservable_SubscribeGoroutineLeak(t *testing.T) { func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
// waiting for other goroutine recycle // waiting for other goroutine recycle
time.Sleep(120 * time.Millisecond) time.Sleep(120 * time.Millisecond)
@ -97,11 +108,7 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(max) wg.Add(max)
waitCh := func(ch <-chan interface{}) { waitCh := func(ch <-chan interface{}) {
for { for range ch {
_, open := <-ch
if !open {
break
}
} }
wg.Done() wg.Done()
} }

View file

@ -1,15 +0,0 @@
package observable
func mergeWithBytes(ch <-chan interface{}, buf []byte) chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
if len(buf) != 0 {
out <- buf
}
for elm := range ch {
out <- elm
}
}()
return out
}