Add pubsub task

This commit is contained in:
Andrew 2020-03-26 17:42:29 +00:00 committed by verytable
parent 73abece215
commit 2a4bb7f0b8
5 changed files with 426 additions and 0 deletions

17
pubsub/README.md Normal file
View file

@ -0,0 +1,17 @@
## pubsub
В этой задаче нужно написать простую шину событий работающую по принципу pub/sub.
В файле [pubsub.go](./pubsub.go) заданы интерфейсы `PubSub`/`Subscription` с подробным описанием всех методов.
Требования к системе:
- На один топик может подписываться (и отписываться) множество подписчиков
- FIFO для сообщений, вы не должны терять их порядок
- Один медленный подписчик не должен тормозить остальных
- Метод Close должен уважать переданный контекст, так если он отменен - выходим, inflight сообщения выбрасываем
- Если будут использоваться горутины - они не должны течь :)
Нужно написать реализацию и конструктор (см. [my_pubsub.go](./my_pubsub.go):
```
func NewPubSub() PubSub
```

33
pubsub/my_pubsub.go Normal file
View file

@ -0,0 +1,33 @@
// +build !solution
package pubsub
import "context"
var _ Subscription = (*MySubscription)(nil)
type MySubscription struct{}
func (s *MySubscription) Unsubscribe() {
panic("implement me")
}
var _ PubSub = (*MyPubSub)(nil)
type MyPubSub struct{}
func NewPubSub() PubSub {
panic("implement me")
}
func (p *MyPubSub) Subscribe(subj string, cb MsgHandler) (Subscription, error) {
panic("implement me")
}
func (p *MyPubSub) Publish(subj string, msg interface{}) error {
panic("implement me")
}
func (p *MyPubSub) Close(ctx context.Context) error {
panic("implement me")
}

25
pubsub/pubsub.go Normal file
View file

@ -0,0 +1,25 @@
// +build !change
package pubsub
import "context"
// MsgHandler is a callback function that processes messages delivered to subscribers.
type MsgHandler func(msg interface{})
type Subscription interface {
// Unsubscribe will remove interest in the given subject.
Unsubscribe()
}
type PubSub interface {
// Subscribe creates an asynchronous queue subscriber on the given subject.
Subscribe(subj string, cb MsgHandler) (Subscription, error)
// Publish publishes the msg argument to the given subject.
Publish(subj string, msg interface{}) error
// Close will shutdown pub-sub system.
// May be blocked by data delivery until the context is canceled.
Close(ctx context.Context) error
}

View file

@ -0,0 +1,31 @@
package pubsub
import (
"context"
"fmt"
"sync"
)
func ExamplePubSub() {
p := NewPubSub()
defer func() { _ = p.Close(context.Background()) }()
wg := sync.WaitGroup{}
wg.Add(1)
_, err := p.Subscribe("single", func(msg interface{}) {
fmt.Println("new message:", msg)
// Output: new message: blah-blah
wg.Done()
})
if err != nil {
panic(err)
}
err = p.Publish("single", "blah-blah")
if err != nil {
panic(err)
}
wg.Wait()
}

320
pubsub/pubsub_test.go Normal file
View file

@ -0,0 +1,320 @@
package pubsub
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestPubSub_single(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(1)
_, err := p.Subscribe("single", func(msg interface{}) {
require.Equal(t, "blah-blah", msg)
wg.Done()
})
require.NoError(t, err)
err = p.Publish("single", "blah-blah")
require.NoError(t, err)
wg.Wait()
}
func TestPubSub_nonBlockPublish(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(11)
_, err := p.Subscribe("non-bock-topic", func(msg interface{}) {
time.Sleep(1 * time.Second)
wg.Done()
})
require.NoError(t, err)
done := make(chan struct{})
go func() {
for i := 0; i < 11; i++ {
err = p.Publish("non-bock-topic", "pew-pew")
assert.NoError(t, err)
}
close(done)
}()
select {
case <-time.After(1 * time.Second):
t.Fatal("publish method must not be blocked")
case <-done:
// ok
}
}
func TestPubSub_multipleSubscribers(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wgFirst := sync.WaitGroup{}
wgFirst.Add(1)
_, err := p.Subscribe("multiple", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wgFirst.Done()
})
require.NoError(t, err)
wgSecond := sync.WaitGroup{}
wgSecond.Add(1)
_, err = p.Subscribe("multiple", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wgSecond.Done()
})
require.NoError(t, err)
err = p.Publish("multiple", "pew-pew")
require.NoError(t, err)
wgFirst.Wait()
wgSecond.Wait()
}
func TestPubSub_slowpoke(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
samples := 100
wgSlow := sync.WaitGroup{}
wgSlow.Add(samples)
slowCtx, slowCancel := context.WithCancel(context.Background())
defer func() {
slowCancel()
wgSlow.Wait()
}()
_, err := p.Subscribe("slowpoke", func(msg interface{}) {
defer wgSlow.Done()
select {
case <-slowCtx.Done():
return
default:
time.Sleep(1 * time.Second)
}
})
require.NoError(t, err)
fastWg := sync.WaitGroup{}
fastWg.Add(samples)
_, err = p.Subscribe("slowpoke", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
fastWg.Done()
})
require.NoError(t, err)
for i := 0; i < samples; i++ {
err = p.Publish("slowpoke", "pew-pew")
require.NoError(t, err)
}
done := make(chan struct{})
go func() {
fastWg.Wait()
close(done)
}()
select {
case <-time.After(1 * time.Second):
t.Fatal("publish blocks on slowpoke?")
case <-done:
// ok
}
}
func TestPubSub_unsubscribe(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
s, err := p.Subscribe("unsubscribe", func(msg interface{}) {
t.Error("first subscriber must not be called")
})
require.NoError(t, err)
s.Unsubscribe()
wg := sync.WaitGroup{}
wg.Add(1)
_, err = p.Subscribe("unsubscribe", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wg.Done()
})
require.NoError(t, err)
err = p.Publish("unsubscribe", "pew-pew")
require.NoError(t, err)
wg.Wait()
}
func TestPubSub_sequencePublishers(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(10)
_, err := p.Subscribe("topic", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wg.Done()
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
err := p.Publish("topic", "pew-pew")
require.NoError(t, err)
}
wg.Wait()
}
func TestPubSub_concurrentPublishers(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(10)
_, err := p.Subscribe("topic", func(msg interface{}) {
require.Equal(t, "pew-pew", msg)
wg.Done()
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
go func() {
err := p.Publish("topic", "pew-pew")
require.NoError(t, err)
}()
}
wg.Wait()
}
func TestPubSub_msgOrder(t *testing.T) {
p := NewPubSub()
defer checkedClose(t, p)
wg := sync.WaitGroup{}
wg.Add(15)
c := uint64(0)
_, err := p.Subscribe("topic", func(msg interface{}) {
expected := atomic.AddUint64(&c, 1)
require.Equal(t, expected, msg)
wg.Done()
})
require.NoError(t, err)
for i := uint64(1); i < 11; i++ {
if i == 6 {
c := uint64(5)
_, subErr := p.Subscribe("topic", func(msg interface{}) {
expected := atomic.AddUint64(&c, 1)
require.Equal(t, expected, msg)
wg.Done()
})
require.NoError(t, subErr)
}
err = p.Publish("topic", i)
require.NoError(t, err)
}
wg.Wait()
}
func TestPubSub_failAfterClose(t *testing.T) {
p := NewPubSub()
err := p.Close(context.Background())
require.NoError(t, err)
_, err = p.Subscribe("topic", func(msg interface{}) {})
require.Error(t, err)
err = p.Publish("topic", "pew-pew")
require.Error(t, err)
}
func TestPubSub_close(t *testing.T) {
p := NewPubSub()
wg := sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := p.Subscribe("unsubscribe", func(msg interface{}) {
select {
case <-ctx.Done():
// fast exit
return
default:
time.Sleep(2 * time.Second)
wg.Done()
}
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
err = p.Publish("unsubscribe", "pew-pew")
require.NoError(t, err)
}
// do a lot of work
time.Sleep(1 * time.Second)
done := make(chan struct{})
go func() {
closeCtx, closeCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer closeCancel()
err := p.Close(closeCtx)
if err != nil && closeCtx.Err() == nil {
assert.NoError(t, err)
}
close(done)
}()
select {
case <-time.After(1 * time.Second):
t.Fatal("close must respect context timed out")
case <-done:
cancel()
wg.Wait()
}
}
func checkedClose(t *testing.T, c interface {
Close(ctx context.Context) error
}) {
require.NoError(t, c.Close(context.Background()))
}