closeWaitsMessageDeliver test
This commit is contained in:
parent
ceaaff5cdb
commit
0d619cfdf1
1 changed files with 20 additions and 0 deletions
|
@ -313,6 +313,26 @@ func TestPubSub_close(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPubSub_closeWaitsMessageDelivery(t *testing.T) {
|
||||||
|
p := NewPubSub()
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
_, err := p.Subscribe("q", func(msg interface{}) {
|
||||||
|
time.Sleep(100*time.Millisecond)
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 0; i < 11; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
err = p.Publish("q", "pew-pew")
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
checkedClose(t, p)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func checkedClose(t *testing.T, c interface {
|
func checkedClose(t *testing.T, c interface {
|
||||||
Close(ctx context.Context) error
|
Close(ctx context.Context) error
|
||||||
}) {
|
}) {
|
||||||
|
|
Loading…
Reference in a new issue