Add batcher task

This commit is contained in:
Fedor Korotkiy 2020-02-15 02:01:42 +03:00
parent b19a92ab27
commit 0975bc3736
4 changed files with 159 additions and 0 deletions

14
batcher/README.md Normal file
View file

@ -0,0 +1,14 @@
# batcher
`*slow.Value` - это аналог `atomic.Value`, с двумя ограничениями:
- Вызов `Load()` всегда занимает не меньше 1ms
- Нельзя делать больше одного вызова `Load()` в один момент времени.
Реализуйте `*Batcher`, который оборачивает `*slow.Value` и ускоряет чтения за счёт батчинга.
Например, если 1000 горутин одновременно сделают вызов `(*Batcher).Load()`, то можно прочитать
значение один раз и раздать всем ожидающим горутинам. Это будет в 1000 раз быстрее,
чем вызывать `(*slow.Value).Load()` напрямую в каждой горутине.
Ваша реализация не должна создавать _stale read_. Тоесть, `Load()` всегда должен возвращать значение
последнего `Store()` на момент начала вызова `Load()`.

12
batcher/batcher.go Normal file
View file

@ -0,0 +1,12 @@
// +build !solution
package batcher
import "gitlab.com/slon/shad-go/batcher/slow"
type Batcher struct {
}
func NewBatcher(v *slow.Value) *Batcher {
return nil
}

97
batcher/batcher_test.go Normal file
View file

@ -0,0 +1,97 @@
package batcher
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/slon/shad-go/batcher/slow"
)
func TestSimple(t *testing.T) {
var value slow.Value
b := NewBatcher(&value)
value.Store(1)
require.Equal(t, 1, b.Load())
require.Equal(t, 1, value.Load())
value.Store(2)
require.Equal(t, 2, b.Load())
require.Equal(t, 2, value.Load())
}
func TestStaleRead(t *testing.T) {
const (
N = 100
K = 100
M = 10
)
var value slow.Value
b := NewBatcher(&value)
var counter int32
value.Store(counter)
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
time.Sleep(time.Millisecond * time.Duration(i/N))
for j := 0; j < K; j++ {
counterValue := atomic.LoadInt32(&counter)
batcherValue := b.Load().(int32)
if batcherValue < counterValue {
t.Errorf("load returned old value: counter=%d, batcher=%d", counterValue, batcherValue)
return
}
}
}(i)
}
for i := 0; i < M*K; i++ {
// value is always greater than counter
value.Store(int32(i))
atomic.StoreInt32(&counter, int32(i))
time.Sleep(time.Millisecond / M)
}
wg.Wait()
}
func TestSpeed(t *testing.T) {
const (
N = 100
K = 200
)
var value slow.Value
b := NewBatcher(&value)
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < K; i++ {
b.Load()
}
}()
}
wg.Wait()
require.Truef(t, time.Since(start) < time.Second, "batching it too slow")
}

36
batcher/slow/value.go Normal file
View file

@ -0,0 +1,36 @@
// +build !change
package slow
import (
"sync"
"sync/atomic"
"time"
)
type Value struct {
mu sync.Mutex
value interface{}
readRunning int32
}
func (s *Value) Load() interface{} {
if atomic.SwapInt32(&s.readRunning, 1) == 1 {
panic("another load is running")
}
defer atomic.StoreInt32(&s.readRunning, 0)
s.mu.Lock()
value := s.value
s.mu.Unlock()
time.Sleep(time.Millisecond)
return value
}
func (s *Value) Store(v interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.value = v
}