500 lines
14 KiB
Text
500 lines
14 KiB
Text
Concurrency with Shared Memory
|
||
Лекция 5
|
||
|
||
Фёдор Короткий
|
||
|
||
* Concurrency with Shared Memory
|
||
|
||
- Принципы синхронизации, такие же как в C++ или java.
|
||
- Различается набор инструментов в стандартной библиотеке.
|
||
|
||
* Happens Before
|
||
|
||
- Одно событие в программе может _произойти_раньше_ (_happens_before_) другого события.
|
||
|
||
- Внутри одной горутины `a`:=`1` _happens_before_ `a`+=`1`.
|
||
|
||
a := 1
|
||
a += 1
|
||
|
||
- Посылка значения из канала _happens_before_ получения этого значения из канала.
|
||
|
||
- Два события A и B _происходят_одновременно_ (_are_concurrent_), если нельзя сказать что одно случилось раньше другого.
|
||
|
||
* Race Condition
|
||
|
||
- _Race_Condition_ - ситуация, когда код работает некорректно в зависимости от порядка выполнения нескольких горутин.
|
||
|
||
* Example Bank
|
||
|
||
// Package bank implements a bank with only one account.
|
||
package bank
|
||
|
||
var balance int
|
||
|
||
func Deposit(amount int) { balance = balance + amount }
|
||
|
||
func Balance() int { return balance }
|
||
|
||
Работает корректно в одной горутине.
|
||
|
||
// Alice:
|
||
go func() {
|
||
bank.Deposit(200)
|
||
fmt.Println("=", bank.Balance())
|
||
}()
|
||
|
||
// Bob
|
||
go bank.Deposit(100)
|
||
|
||
Ошибка в этом примере называется _data_race_.
|
||
|
||
* Data Race breaks memory safety
|
||
|
||
var x []int
|
||
go func() { x = make([]int, 10) }()
|
||
go func() { x = make([]int, 1000000) }()
|
||
x[999999] = 1 // NOTE: undefined behavior; memory corruption possible!
|
||
|
||
Программа с _data_race_ перестаёт вести себя так, как написано в коде.
|
||
|
||
Иногда люди говорят _"это_безопасный_рейс"_. *Не*бывыет*безопасных*data-race-ов*
|
||
|
||
* Data Race
|
||
|
||
*Data*race* случается, когда несколько горутин одновременно работают с переменной, и хотябы одна из них пишет в эту переменную.
|
||
|
||
Как защититься от _data_race_?
|
||
|
||
1. Не писать в переменную.
|
||
2. Не работать с переменной из нескольких горутин.
|
||
3. Не работать с переменной одновременно. (mutex)
|
||
|
||
* Не писать в переменую
|
||
|
||
var icons = make(map[string]image.Image)
|
||
func loadIcon(name string) image.Image
|
||
|
||
// NOTE: not concurrency-safe!
|
||
func Icon(name string) image.Image {
|
||
icon, ok := icons[name]
|
||
if !ok {
|
||
icon = loadIcon(name)
|
||
icons[name] = icon
|
||
}
|
||
return icon
|
||
}
|
||
|
||
Правильно:
|
||
|
||
var icons = map[string]image.Image{
|
||
"spades.png": loadIcon("spades.png"),
|
||
"hearts.png": loadIcon("hearts.png"),
|
||
"diamonds.png": loadIcon("diamonds.png"),
|
||
"clubs.png": loadIcon("clubs.png"),
|
||
}
|
||
|
||
func Icon(name string) image.Image { return icons[name] }
|
||
|
||
* Не работать с переменной из нескольких горутин
|
||
|
||
// Package bank provides a concurrency-safe bank with one account.
|
||
package bank
|
||
|
||
var deposits = make(chan int) // send amount to deposit
|
||
var balances = make(chan int) // receive balance
|
||
|
||
func Deposit(amount int) { deposits <- amount }
|
||
func Balance() int { return <-balances }
|
||
|
||
func teller() {
|
||
var balance int // balance is confined to teller goroutine
|
||
for {
|
||
select {
|
||
case amount := <-deposits:
|
||
balance += amount
|
||
case balances <- balance:
|
||
}
|
||
}
|
||
}
|
||
|
||
func init() {
|
||
go teller() // start the monitor goroutine
|
||
}
|
||
|
||
* sync.Mutex
|
||
|
||
import "sync"
|
||
|
||
var (
|
||
mu sync.Mutex // guards balance
|
||
balance int
|
||
)
|
||
|
||
func Deposit(amount int) {
|
||
mu.Lock()
|
||
balance = balance + amount
|
||
mu.Unlock()
|
||
}
|
||
|
||
func Balance() int {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
b := balance
|
||
return b
|
||
}
|
||
|
||
* sync.RWMutex
|
||
|
||
var mu sync.RWMutex
|
||
var balance int
|
||
|
||
func Balance() int {
|
||
mu.RLock() // readers lock
|
||
defer mu.RUnlock()
|
||
return balance
|
||
}
|
||
|
||
* Memory Order
|
||
|
||
var x, y int
|
||
go func() {
|
||
x = 1 // A1
|
||
fmt.Print("y:", y, " ") // A2
|
||
}()
|
||
go func() {
|
||
y = 1 // B1
|
||
fmt.Print("x:", x, " ") // B2
|
||
}()
|
||
|
||
Можем ожидать
|
||
|
||
y:0 x:1
|
||
x:0 y:1
|
||
x:1 y:1
|
||
y:1 x:1
|
||
|
||
Но реально может произойти
|
||
|
||
x:0 y:0
|
||
y:0 x:0
|
||
|
||
* Atomics
|
||
|
||
var x, y atomic.Int32
|
||
go func() {
|
||
x.Store(1) // A1
|
||
fmt.Print("y:", y.Load(), " ") // A2
|
||
}()
|
||
go func() {
|
||
y.Store(1) // B1
|
||
fmt.Print("x:", x.Load(), " ") // B2
|
||
}()
|
||
|
||
In the terminology of the Go memory model, if the effect of an atomic operation A is observed by atomic operation B, then A “synchronizes before” B.
|
||
|
||
Additionally, all the atomic operations executed in a program behave as though executed in some sequentially consistent order.
|
||
|
||
This definition provides the same semantics as C++'s sequentially consistent atomics and Java's volatile variables.
|
||
|
||
* Atomic Int
|
||
|
||
type Uint32 struct {}
|
||
|
||
// Load atomically loads and returns the value stored in x.
|
||
func (x *Uint32) Load() uint32
|
||
|
||
// Store atomically stores val into x.
|
||
func (x *Uint32) Store(val uint32)
|
||
|
||
// Swap atomically stores new into x and returns the previous value.
|
||
func (x *Uint32) Swap(new uint32) (old uint32)
|
||
|
||
// CompareAndSwap executes the compare-and-swap operation for x.
|
||
func (x *Uint32) CompareAndSwap(old, new uint32) (swapped bool)
|
||
|
||
// Add atomically adds delta to x and returns the new value.
|
||
func (x *Uint32) Add(delta uint32) (new uint32)
|
||
|
||
* Atomic Value
|
||
|
||
type Value struct {}
|
||
|
||
// Load atomically loads and returns the value stored in x.
|
||
func (x *Value) Load() any
|
||
|
||
// Store atomically stores val into x.
|
||
func (x *Value) Store(val any)
|
||
|
||
// Swap atomically stores new into x and returns the previous value.
|
||
func (x *Value) Swap(new any) (old any)
|
||
|
||
// CompareAndSwap executes the compare-and-swap operation for x.
|
||
func (x *Value) CompareAndSwap(old, new any) (swapped bool)
|
||
|
||
* Config update
|
||
|
||
func main() {
|
||
var config atomic.Value // holds current server configuration
|
||
// Create initial config value and store into config.
|
||
config.Store(loadConfig())
|
||
go func() {
|
||
// Reload config every 10 seconds
|
||
// and update config value with the new version.
|
||
for {
|
||
time.Sleep(10 * time.Second)
|
||
config.Store(loadConfig())
|
||
}
|
||
}()
|
||
// Create worker goroutines that handle incoming requests
|
||
// using the latest config value.
|
||
for i := 0; i < 10; i++ {
|
||
go func() {
|
||
for r := range requests() {
|
||
c := config.Load()
|
||
// Handle request r using config c.
|
||
_, _ = r, c
|
||
}
|
||
}()
|
||
}
|
||
}
|
||
|
||
* Ленивая инициализация sync.Once.
|
||
|
||
var icons map[string]image.Image
|
||
|
||
func loadIcons() {
|
||
icons = map[string]image.Image{
|
||
"spades.png": loadIcon("spades.png")
|
||
}
|
||
}
|
||
|
||
// NOTE: not concurrency-safe!
|
||
func Icon(name string) image.Image {
|
||
if icons == nil {
|
||
loadIcons() // one-time initialization
|
||
}
|
||
return icons[name]
|
||
}
|
||
|
||
* Ленивая инициализация sync.Once.
|
||
|
||
var mu sync.Mutex // guards icons
|
||
var icons map[string]image.Image
|
||
|
||
// Concurrency-safe.
|
||
func Icon(name string) image.Image {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
if icons == nil {
|
||
loadIcons()
|
||
}
|
||
return icons[name]
|
||
}
|
||
|
||
* Ленивая инициализация sync.Once.
|
||
|
||
var loadIconsOnce sync.Once
|
||
var icons map[string]image.Image
|
||
|
||
// Concurrency-safe.
|
||
func Icon(name string) image.Image {
|
||
loadIconsOnce.Do(loadIcons)
|
||
return icons[name]
|
||
}
|
||
|
||
* Concurrent cache
|
||
|
||
.play memo1/memo.go /type Memo/,/OMIT/
|
||
|
||
* Concurrent cache
|
||
|
||
.play memo2/memo.go /type Memo/,/OMIT/
|
||
|
||
* Concurrent cache
|
||
|
||
.play memo3/memo.go /func \(memo/,/OMIT/
|
||
|
||
* Concurrent cache
|
||
|
||
.play memo4/memo.go /type entry/,/OMIT/
|
||
|
||
* sync.Map
|
||
|
||
Что происходит, когда go совершает type cast из interface{}?
|
||
|
||
- Приведение к конкретному типу требует одной проверки на `==`
|
||
|
||
var v interface{}
|
||
x, ok := v.(int)
|
||
|
||
- Приведение к интерфейсу требует (в общем случае) сравнивать method set.
|
||
|
||
var v interface{}
|
||
x, ok := v.(io.Reader)
|
||
|
||
// Конкретный тип v должен иметь метод Read([]byte) (int, error)
|
||
|
||
- Для каждой пары `(concrete`type,`interface`type)` нужно знать, корректно ли такое приведение.
|
||
|
||
var cache map[typeConversion]conversionResult
|
||
|
||
* sync.Map
|
||
|
||
var cache map[typeConversion]conversionResult
|
||
|
||
Как защитить cache? `sync.Mutex`? Кеш из раннего примера?
|
||
|
||
Какой паттерн нагрузки?
|
||
|
||
- Запись в начале программы.
|
||
- Потом только чтения.
|
||
- Никогда не удаляем ключи.
|
||
- Не страшно сделать вычисление несколько раз.
|
||
|
||
Чего хотим?
|
||
|
||
- Скорость как у `map` без лока.
|
||
|
||
* sync.Map
|
||
|
||
var cache sync.Map
|
||
|
||
func convertType(from, to typ) *conversionResult {
|
||
key := typeConversion{from: from, to: to}
|
||
res, ok := cache.Load(key)
|
||
if ok {
|
||
return res.(*conversionResult)
|
||
}
|
||
|
||
res = doConversion(from, to)
|
||
cache.Store(key, res)
|
||
return res.(*conversionResult)
|
||
}
|
||
|
||
- `sync.Map` хранит две `map` внутри. `clean` и `dirty`.
|
||
- Обращение к `clean` всегда происходит без лока.
|
||
- Обращение к `dirty` требует лока.
|
||
- Периодически `dirty` повышается до `clean`.
|
||
|
||
* Как сделать sync.Map без двойного вычисления?
|
||
|
||
.play synconce/map.go /type result/,/OMIT/
|
||
|
||
* sync.Cond
|
||
|
||
type Once struct {
|
||
done, running bool
|
||
mu sync.Mutex
|
||
cond *sync.Cond
|
||
}
|
||
|
||
func (once *Once) Do(f func()) {
|
||
once.mu.Lock()
|
||
defer once.mu.Unlock()
|
||
if once.done {
|
||
return
|
||
}
|
||
if once.running {
|
||
once.cond.Wait() // releases and acquires mutex
|
||
return
|
||
}
|
||
|
||
once.running = true
|
||
once.mu.Unlock()
|
||
f()
|
||
once.mu.Lock()
|
||
once.done = true
|
||
once.cond.Broadcast()
|
||
}
|
||
|
||
* golang.org/x/sync
|
||
|
||
Пакеты в golang.org/x содержат код, который не вошёл в стандартную библиотеку.
|
||
|
||
- errgroup - `sync.WaitGroup` со встроенной обработкой ошибок
|
||
- semaphore - семафор
|
||
- singleflight - дедубликация вызовов
|
||
|
||
* context
|
||
|
||
type Context interface {
|
||
// Возвращает время, когда операция будет оповещена о необходимости завершения
|
||
Deadline() (deadline time.Time, ok bool)
|
||
|
||
// Возвращает канал, который будет закрыт при необходимости завершить операцию
|
||
// Служит в качестве инструмента оповещения об отмене
|
||
Done() <-chan struct{}
|
||
|
||
// Если Done не закрыт - возвращает nil.
|
||
// Если Done закрыт, Err ошибку с объяснением причины:
|
||
// - Canceled - контекст был отменен
|
||
// - DeadlineExceeded - наступил дедлайн.
|
||
// После возврашения не nil ошибки Err всегда возвращает данную ошибку.
|
||
Err() error
|
||
|
||
// Позволяет получить произвольный объект из контекста
|
||
Value(key interface{}) interface{}
|
||
}
|
||
|
||
- Отмена таймауты
|
||
- Передача request scoped значений
|
||
|
||
* context
|
||
|
||
Типы контекстов:
|
||
|
||
// root context
|
||
todo := context.TODO()
|
||
ctx := context.Background()
|
||
|
||
// manual cancel
|
||
ctx, cancel := context.WithCancel(ctx)
|
||
defer cancel()
|
||
|
||
// manual cancel with explicit cause
|
||
ctx, cancel := context.WithCancelCause(ctx)
|
||
defer cancel(fmt.Errorf("job not needed"))
|
||
|
||
// cancel by timeout
|
||
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||
defer cancel()
|
||
|
||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
|
||
defer cancel()
|
||
|
||
* Отменяем операции
|
||
|
||
.code context/cancelation/cancelation.go /func SimpleCancelation()/,/OMIT/
|
||
.code context/cancelation/cancelation.go /func doSlowJob/,/OMIT/
|
||
|
||
* Отменяем операции
|
||
|
||
.code context/cancelation/cancelation.go /func SimpleTimeout()/,/OMIT/
|
||
.code context/cancelation/cancelation.go /func doSlowJob/,/OMIT/
|
||
|
||
* context в библиотеках Go
|
||
|
||
По соглашению `Context` всегда передается первым параметром в функции, обычно именуясь `ctx`.
|
||
|
||
database/sql.(*DB).QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
|
||
database/sql.(*DB).ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
|
||
net/http.NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error)
|
||
golang.org/x/sync/errgroup.WithContext(ctx context.Context) (*Group, context.Context)
|
||
...
|
||
|
||
Быстрый пример:
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
|
||
defer cancel()
|
||
|
||
req, _ := http.NewRequestWithContext(ctx, "GET", "http://loremipsum.com", nil)
|
||
resp, err := http.DefaultClient.Do(req)
|
||
if err != nil {
|
||
// возможно тут будет DeadlineExceeded
|
||
}
|
||
|
||
* context и передача значений
|
||
|
||
.code context/value/value.go /type/,/OMIT/
|
||
.code context/value/value.go /func main/,/}/
|