From 0975bc3736703011ede90f0334e675375b454483 Mon Sep 17 00:00:00 2001 From: Fedor Korotkiy Date: Sat, 15 Feb 2020 02:01:42 +0300 Subject: [PATCH] Add batcher task --- batcher/README.md | 14 ++++++ batcher/batcher.go | 12 +++++ batcher/batcher_test.go | 97 +++++++++++++++++++++++++++++++++++++++++ batcher/slow/value.go | 36 +++++++++++++++ 4 files changed, 159 insertions(+) create mode 100644 batcher/README.md create mode 100644 batcher/batcher.go create mode 100644 batcher/batcher_test.go create mode 100644 batcher/slow/value.go diff --git a/batcher/README.md b/batcher/README.md new file mode 100644 index 0000000..ed83999 --- /dev/null +++ b/batcher/README.md @@ -0,0 +1,14 @@ +# batcher + +`*slow.Value` - это аналог `atomic.Value`, с двумя ограничениями: + - Вызов `Load()` всегда занимает не меньше 1ms + - Нельзя делать больше одного вызова `Load()` в один момент времени. + +Реализуйте `*Batcher`, который оборачивает `*slow.Value` и ускоряет чтения за счёт батчинга. + +Например, если 1000 горутин одновременно сделают вызов `(*Batcher).Load()`, то можно прочитать +значение один раз и раздать всем ожидающим горутинам. Это будет в 1000 раз быстрее, +чем вызывать `(*slow.Value).Load()` напрямую в каждой горутине. + +Ваша реализация не должна создавать _stale read_. Тоесть, `Load()` всегда должен возвращать значение +последнего `Store()` на момент начала вызова `Load()`. diff --git a/batcher/batcher.go b/batcher/batcher.go new file mode 100644 index 0000000..b82cfa4 --- /dev/null +++ b/batcher/batcher.go @@ -0,0 +1,12 @@ +// +build !solution + +package batcher + +import "gitlab.com/slon/shad-go/batcher/slow" + +type Batcher struct { +} + +func NewBatcher(v *slow.Value) *Batcher { + return nil +} diff --git a/batcher/batcher_test.go b/batcher/batcher_test.go new file mode 100644 index 0000000..80a355d --- /dev/null +++ b/batcher/batcher_test.go @@ -0,0 +1,97 @@ +package batcher + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/slon/shad-go/batcher/slow" +) + +func TestSimple(t *testing.T) { + var value slow.Value + b := NewBatcher(&value) + + value.Store(1) + require.Equal(t, 1, b.Load()) + require.Equal(t, 1, value.Load()) + + value.Store(2) + require.Equal(t, 2, b.Load()) + require.Equal(t, 2, value.Load()) +} + +func TestStaleRead(t *testing.T) { + const ( + N = 100 + K = 100 + M = 10 + ) + + var value slow.Value + b := NewBatcher(&value) + + var counter int32 + value.Store(counter) + + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(1) + + go func(i int) { + defer wg.Done() + + time.Sleep(time.Millisecond * time.Duration(i/N)) + for j := 0; j < K; j++ { + counterValue := atomic.LoadInt32(&counter) + batcherValue := b.Load().(int32) + + if batcherValue < counterValue { + t.Errorf("load returned old value: counter=%d, batcher=%d", counterValue, batcherValue) + return + } + } + }(i) + } + + for i := 0; i < M*K; i++ { + // value is always greater than counter + value.Store(int32(i)) + atomic.StoreInt32(&counter, int32(i)) + + time.Sleep(time.Millisecond / M) + } + + wg.Wait() +} + +func TestSpeed(t *testing.T) { + const ( + N = 100 + K = 200 + ) + + var value slow.Value + b := NewBatcher(&value) + + start := time.Now() + + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + for i := 0; i < K; i++ { + b.Load() + } + }() + } + wg.Wait() + + require.Truef(t, time.Since(start) < time.Second, "batching it too slow") +} diff --git a/batcher/slow/value.go b/batcher/slow/value.go new file mode 100644 index 0000000..5c0e585 --- /dev/null +++ b/batcher/slow/value.go @@ -0,0 +1,36 @@ +// +build !change + +package slow + +import ( + "sync" + "sync/atomic" + "time" +) + +type Value struct { + mu sync.Mutex + value interface{} + readRunning int32 +} + +func (s *Value) Load() interface{} { + if atomic.SwapInt32(&s.readRunning, 1) == 1 { + panic("another load is running") + } + defer atomic.StoreInt32(&s.readRunning, 0) + + s.mu.Lock() + value := s.value + s.mu.Unlock() + + time.Sleep(time.Millisecond) + return value +} + +func (s *Value) Store(v interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + + s.value = v +}