diff --git a/pubsub/README.md b/pubsub/README.md new file mode 100644 index 0000000..d5633d8 --- /dev/null +++ b/pubsub/README.md @@ -0,0 +1,17 @@ +## pubsub + +В этой задаче нужно написать простую шину событий работающую по принципу pub/sub. + +В файле [pubsub.go](./pubsub.go) заданы интерфейсы `PubSub`/`Subscription` с подробным описанием всех методов. + +Требования к системе: + - На один топик может подписываться (и отписываться) множество подписчиков + - FIFO для сообщений, вы не должны терять их порядок + - Один медленный подписчик не должен тормозить остальных + - Метод Close должен уважать переданный контекст, так если он отменен - выходим, inflight сообщения выбрасываем + - Если будут использоваться горутины - они не должны течь :) + +Нужно написать реализацию и конструктор (см. [my_pubsub.go](./my_pubsub.go): +``` +func NewPubSub() PubSub +``` diff --git a/pubsub/my_pubsub.go b/pubsub/my_pubsub.go new file mode 100644 index 0000000..4755374 --- /dev/null +++ b/pubsub/my_pubsub.go @@ -0,0 +1,33 @@ +// +build !solution + +package pubsub + +import "context" + +var _ Subscription = (*MySubscription)(nil) + +type MySubscription struct{} + +func (s *MySubscription) Unsubscribe() { + panic("implement me") +} + +var _ PubSub = (*MyPubSub)(nil) + +type MyPubSub struct{} + +func NewPubSub() PubSub { + panic("implement me") +} + +func (p *MyPubSub) Subscribe(subj string, cb MsgHandler) (Subscription, error) { + panic("implement me") +} + +func (p *MyPubSub) Publish(subj string, msg interface{}) error { + panic("implement me") +} + +func (p *MyPubSub) Close(ctx context.Context) error { + panic("implement me") +} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go new file mode 100644 index 0000000..ac3b8f3 --- /dev/null +++ b/pubsub/pubsub.go @@ -0,0 +1,25 @@ +// +build !change + +package pubsub + +import "context" + +// MsgHandler is a callback function that processes messages delivered to subscribers. +type MsgHandler func(msg interface{}) + +type Subscription interface { + // Unsubscribe will remove interest in the given subject. + Unsubscribe() +} + +type PubSub interface { + // Subscribe creates an asynchronous queue subscriber on the given subject. + Subscribe(subj string, cb MsgHandler) (Subscription, error) + + // Publish publishes the msg argument to the given subject. + Publish(subj string, msg interface{}) error + + // Close will shutdown pub-sub system. + // May be blocked by data delivery until the context is canceled. + Close(ctx context.Context) error +} diff --git a/pubsub/pubsub_example_test.go b/pubsub/pubsub_example_test.go new file mode 100644 index 0000000..9139ada --- /dev/null +++ b/pubsub/pubsub_example_test.go @@ -0,0 +1,31 @@ +package pubsub + +import ( + "context" + "fmt" + "sync" +) + +func ExamplePubSub() { + p := NewPubSub() + defer func() { _ = p.Close(context.Background()) }() + + wg := sync.WaitGroup{} + wg.Add(1) + + _, err := p.Subscribe("single", func(msg interface{}) { + fmt.Println("new message:", msg) + // Output: new message: blah-blah + wg.Done() + }) + if err != nil { + panic(err) + } + + err = p.Publish("single", "blah-blah") + if err != nil { + panic(err) + } + + wg.Wait() +} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 0000000..e98a243 --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,320 @@ +package pubsub + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +func TestPubSub_single(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + wg := sync.WaitGroup{} + wg.Add(1) + + _, err := p.Subscribe("single", func(msg interface{}) { + require.Equal(t, "blah-blah", msg) + wg.Done() + }) + require.NoError(t, err) + + err = p.Publish("single", "blah-blah") + require.NoError(t, err) + + wg.Wait() +} + +func TestPubSub_nonBlockPublish(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + wg := sync.WaitGroup{} + wg.Add(11) + + _, err := p.Subscribe("non-bock-topic", func(msg interface{}) { + time.Sleep(1 * time.Second) + wg.Done() + }) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + for i := 0; i < 11; i++ { + err = p.Publish("non-bock-topic", "pew-pew") + assert.NoError(t, err) + } + close(done) + }() + + select { + case <-time.After(1 * time.Second): + t.Fatal("publish method must not be blocked") + case <-done: + // ok + } +} + +func TestPubSub_multipleSubscribers(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + wgFirst := sync.WaitGroup{} + wgFirst.Add(1) + + _, err := p.Subscribe("multiple", func(msg interface{}) { + require.Equal(t, "pew-pew", msg) + wgFirst.Done() + }) + require.NoError(t, err) + + wgSecond := sync.WaitGroup{} + wgSecond.Add(1) + + _, err = p.Subscribe("multiple", func(msg interface{}) { + require.Equal(t, "pew-pew", msg) + wgSecond.Done() + }) + require.NoError(t, err) + + err = p.Publish("multiple", "pew-pew") + require.NoError(t, err) + + wgFirst.Wait() + wgSecond.Wait() +} + +func TestPubSub_slowpoke(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + samples := 100 + + wgSlow := sync.WaitGroup{} + wgSlow.Add(samples) + slowCtx, slowCancel := context.WithCancel(context.Background()) + defer func() { + slowCancel() + wgSlow.Wait() + }() + + _, err := p.Subscribe("slowpoke", func(msg interface{}) { + defer wgSlow.Done() + + select { + case <-slowCtx.Done(): + return + default: + time.Sleep(1 * time.Second) + } + }) + require.NoError(t, err) + + fastWg := sync.WaitGroup{} + fastWg.Add(samples) + + _, err = p.Subscribe("slowpoke", func(msg interface{}) { + require.Equal(t, "pew-pew", msg) + fastWg.Done() + }) + require.NoError(t, err) + + for i := 0; i < samples; i++ { + err = p.Publish("slowpoke", "pew-pew") + require.NoError(t, err) + } + + done := make(chan struct{}) + go func() { + fastWg.Wait() + close(done) + }() + + select { + case <-time.After(1 * time.Second): + t.Fatal("publish blocks on slowpoke?") + case <-done: + // ok + } +} + +func TestPubSub_unsubscribe(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + s, err := p.Subscribe("unsubscribe", func(msg interface{}) { + t.Error("first subscriber must not be called") + }) + require.NoError(t, err) + + s.Unsubscribe() + + wg := sync.WaitGroup{} + wg.Add(1) + + _, err = p.Subscribe("unsubscribe", func(msg interface{}) { + require.Equal(t, "pew-pew", msg) + wg.Done() + }) + require.NoError(t, err) + + err = p.Publish("unsubscribe", "pew-pew") + require.NoError(t, err) + + wg.Wait() +} + +func TestPubSub_sequencePublishers(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + wg := sync.WaitGroup{} + wg.Add(10) + + _, err := p.Subscribe("topic", func(msg interface{}) { + require.Equal(t, "pew-pew", msg) + wg.Done() + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + err := p.Publish("topic", "pew-pew") + require.NoError(t, err) + } + + wg.Wait() +} + +func TestPubSub_concurrentPublishers(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + wg := sync.WaitGroup{} + wg.Add(10) + + _, err := p.Subscribe("topic", func(msg interface{}) { + require.Equal(t, "pew-pew", msg) + wg.Done() + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + go func() { + err := p.Publish("topic", "pew-pew") + require.NoError(t, err) + }() + } + + wg.Wait() +} + +func TestPubSub_msgOrder(t *testing.T) { + p := NewPubSub() + defer checkedClose(t, p) + + wg := sync.WaitGroup{} + wg.Add(15) + + c := uint64(0) + _, err := p.Subscribe("topic", func(msg interface{}) { + expected := atomic.AddUint64(&c, 1) + require.Equal(t, expected, msg) + wg.Done() + }) + require.NoError(t, err) + + for i := uint64(1); i < 11; i++ { + if i == 6 { + c := uint64(5) + _, subErr := p.Subscribe("topic", func(msg interface{}) { + expected := atomic.AddUint64(&c, 1) + require.Equal(t, expected, msg) + wg.Done() + }) + require.NoError(t, subErr) + } + + err = p.Publish("topic", i) + require.NoError(t, err) + } + + wg.Wait() +} + +func TestPubSub_failAfterClose(t *testing.T) { + p := NewPubSub() + err := p.Close(context.Background()) + require.NoError(t, err) + + _, err = p.Subscribe("topic", func(msg interface{}) {}) + require.Error(t, err) + + err = p.Publish("topic", "pew-pew") + require.Error(t, err) +} + +func TestPubSub_close(t *testing.T) { + p := NewPubSub() + + wg := sync.WaitGroup{} + wg.Add(1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err := p.Subscribe("unsubscribe", func(msg interface{}) { + select { + case <-ctx.Done(): + // fast exit + return + default: + time.Sleep(2 * time.Second) + wg.Done() + } + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + err = p.Publish("unsubscribe", "pew-pew") + require.NoError(t, err) + } + + // do a lot of work + time.Sleep(1 * time.Second) + + done := make(chan struct{}) + go func() { + closeCtx, closeCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer closeCancel() + + err := p.Close(closeCtx) + if err != nil && closeCtx.Err() == nil { + assert.NoError(t, err) + } + close(done) + }() + + select { + case <-time.After(1 * time.Second): + t.Fatal("close must respect context timed out") + case <-done: + cancel() + wg.Wait() + } +} + +func checkedClose(t *testing.T, c interface { + Close(ctx context.Context) error +}) { + require.NoError(t, c.Close(context.Background())) +}