shad-go/pubsub/pubsub_test.go

369 lines
6.7 KiB
Go
Raw Normal View History

2020-03-26 17:42:29 +00:00
package pubsub
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestPubSub_single(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(1)
_, err := p.Subscribe("single", func(msg interface{}) {
require.Equal(t, "blah-blah", msg)
wg.Done()
})
require.NoError(t, err)
err = p.Publish("single", "blah-blah")
require.NoError(t, err)
wg.Wait()
}
func TestPubSub_nonBlockPublish(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(11)
_, err := p.Subscribe("non-bock-topic", func(msg interface{}) {
2020-03-29 18:32:26 +00:00
time.Sleep(10 * time.Millisecond)
2020-03-26 17:42:29 +00:00
wg.Done()
})
require.NoError(t, err)
done := make(chan struct{})
go func() {
for i := 0; i < 11; i++ {
err = p.Publish("non-bock-topic", "pew-pew")
assert.NoError(t, err)
}
close(done)
}()
select {
2020-03-29 18:32:26 +00:00
case <-time.After(10 * time.Second):
2020-03-26 17:42:29 +00:00
t.Fatal("publish method must not be blocked")
case <-done:
// ok
}
}
func TestPubSub_multipleSubjects(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(2)
_, err := p.Subscribe("sub1", func(msg interface{}) {
require.Equal(t, "blah-blah-1", msg)
wg.Done()
})
require.NoError(t, err)
_, err = p.Subscribe("sub2", func(msg interface{}) {
require.Equal(t, "blah-blah-2", msg)
wg.Done()
})
require.NoError(t, err)
err = p.Publish("sub1", "blah-blah-1")
2020-04-02 19:25:04 +00:00
require.NoError(t, err)
err = p.Publish("sub2", "blah-blah-2")
require.NoError(t, err)
wg.Wait()
}
2020-03-26 17:42:29 +00:00
func TestPubSub_multipleSubscribers(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wgFirst := sync.WaitGroup{}
wgFirst.Add(1)
_, err := p.Subscribe("multiple", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wgFirst.Done()
})
require.NoError(t, err)
wgSecond := sync.WaitGroup{}
wgSecond.Add(1)
_, err = p.Subscribe("multiple", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wgSecond.Done()
})
require.NoError(t, err)
err = p.Publish("multiple", "pew-pew")
require.NoError(t, err)
wgFirst.Wait()
wgSecond.Wait()
}
func TestPubSub_slowpoke(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
samples := 100
wgSlow := sync.WaitGroup{}
wgSlow.Add(samples)
slowCtx, slowCancel := context.WithCancel(context.Background())
defer func() {
slowCancel()
wgSlow.Wait()
}()
_, err := p.Subscribe("slowpoke", func(msg interface{}) {
defer wgSlow.Done()
select {
case <-slowCtx.Done():
return
default:
time.Sleep(1 * time.Second)
}
})
require.NoError(t, err)
fastWg := sync.WaitGroup{}
fastWg.Add(samples)
_, err = p.Subscribe("slowpoke", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
fastWg.Done()
})
require.NoError(t, err)
for i := 0; i < samples; i++ {
err = p.Publish("slowpoke", "pew-pew")
require.NoError(t, err)
}
done := make(chan struct{})
go func() {
fastWg.Wait()
close(done)
}()
select {
case <-time.After(1 * time.Second):
t.Fatal("publish blocks on slowpoke?")
case <-done:
// ok
}
}
func TestPubSub_unsubscribe(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
s, err := p.Subscribe("unsubscribe", func(msg interface{}) {
t.Error("first subscriber must not be called")
})
require.NoError(t, err)
s.Unsubscribe()
wg := sync.WaitGroup{}
wg.Add(1)
_, err = p.Subscribe("unsubscribe", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wg.Done()
})
require.NoError(t, err)
err = p.Publish("unsubscribe", "pew-pew")
require.NoError(t, err)
wg.Wait()
}
func TestPubSub_sequencePublishers(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(10)
_, err := p.Subscribe("topic", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wg.Done()
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
err := p.Publish("topic", "pew-pew")
require.NoError(t, err)
}
wg.Wait()
}
func TestPubSub_concurrentPublishers(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(10)
_, err := p.Subscribe("topic", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wg.Done()
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
go func() {
err := p.Publish("topic", "pew-pew")
require.NoError(t, err)
}()
}
wg.Wait()
}
func TestPubSub_msgOrder(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(15)
c := uint64(0)
_, err := p.Subscribe("topic", func(msg interface{}) {
expected := atomic.AddUint64(&c, 1)
require.Equal(t, expected, msg)
wg.Done()
})
require.NoError(t, err)
for i := uint64(1); i < 11; i++ {
if i == 6 {
c := uint64(5)
_, subErr := p.Subscribe("topic", func(msg interface{}) {
expected := atomic.AddUint64(&c, 1)
require.Equal(t, expected, msg)
wg.Done()
})
require.NoError(t, subErr)
}
err = p.Publish("topic", i)
require.NoError(t, err)
}
wg.Wait()
}
func TestPubSub_failAfterClose(t *testing.T) {
p := NewPubSub()
err := p.Close(context.Background())
require.NoError(t, err)
_, err = p.Subscribe("topic", func(msg interface{}) {})
require.Error(t, err)
err = p.Publish("topic", "pew-pew")
require.Error(t, err)
}
func TestPubSub_close(t *testing.T) {
p := NewPubSub()
wg := sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := p.Subscribe("unsubscribe", func(msg interface{}) {
select {
case <-ctx.Done():
// fast exit
return
default:
time.Sleep(2 * time.Second)
wg.Done()
}
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
err = p.Publish("unsubscribe", "pew-pew")
require.NoError(t, err)
}
// do a lot of work
time.Sleep(1 * time.Second)
done := make(chan struct{})
go func() {
closeCtx, closeCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer closeCancel()
err := p.Close(closeCtx)
if err != nil && closeCtx.Err() == nil {
assert.NoError(t, err)
}
close(done)
}()
select {
case <-time.After(1 * time.Second):
t.Fatal("close must respect context timed out")
case <-done:
cancel()
wg.Wait()
}
}
2020-03-29 09:48:02 +00:00
func TestPubSub_closeWaitsMessageDelivery(t *testing.T) {
p := NewPubSub()
var msgs []any
2020-03-29 09:48:02 +00:00
_, err := p.Subscribe("q", func(msg interface{}) {
msgs = append(msgs, msg)
2020-03-29 18:32:26 +00:00
time.Sleep(100 * time.Millisecond)
2020-03-29 09:48:02 +00:00
})
require.NoError(t, err)
const N = 11
for i := 0; i < N; i++ {
2020-03-29 09:48:02 +00:00
err = p.Publish("q", "pew-pew")
require.NoError(t, err)
}
checkedClose(t, p)
require.Len(t, msgs, N)
2020-03-29 09:48:02 +00:00
}
2020-03-26 17:42:29 +00:00
func checkedClose(t *testing.T, c interface {
Close(ctx context.Context) error
}) {
require.NoError(t, c.Close(context.Background()))
}