Concurrency with Shared Memory Лекция 5 Фёдор Короткий * Concurrency with Shared Memory - Принципы синхронизации, такие же как в C++ или java. - Различается набор инструментов в стандартной библиотеке. * Happens Before - Одно событие в программе может _произойти_раньше_ (_happens_before_) другого события. - Внутри одной горутины `a`:=`1` _happens_before_ `a`+=`1`. a := 1 a += 1 - Посылка значения из канал _happens_before_ получения этого значения из канала. - Два события A и B _происходят_одновременно_ (_are_concurrent_), если нельзя сказать что одно случилось раньше другого. * Race Condition - _Race_Condition_ - ситуация, когда код работает некорректно в зависимости от порядка выполнения нескольких горутин. * Example Bank // Package bank implements a bank with only one account. package bank var balance int func Deposit(amount int) { balance = balance + amount } func Balance() int { return balance } Работает корректно в одной горутине. // Alice: go func() { bank.Deposit(200) fmt.Println("=", bank.Balance()) }() // Bob go bank.Deposit(100) Ошибка в этом примере называется _data_race_. * Data Race breaks memory safety var x []int go func() { x = make([]int, 10) }() go func() { x = make([]int, 1000000) }() x[999999] = 1 // NOTE: undefined behavior; memory corruption possible! Программа с _data_race_ перестаёт вести себя так, как написано в коде. Иногда люди говорят _"это_безопасный_рейс"_. *Не*бывыет*безопасных*data-race-ов* * Data Race *Data*race* случается, когда несколько горутин одновременно работают с переменной, и хотябы одна из них пишет в эту переменную. Как защититься от _data_race_? 1. Не писать в переменную. 2. Не работать с переменной из нескольких горутин. 3. Не работать с переменной одновременно. (mutex) * Не писать в переменую var icons = make(map[string]image.Image) func loadIcon(name string) image.Image // NOTE: not concurrency-safe! func Icon(name string) image.Image { icon, ok := icons[name] if !ok { icon = loadIcon(name) icons[name] = icon } return icon } Правильно: var icons = map[string]image.Image{ "spades.png": loadIcon("spades.png"), "hearts.png": loadIcon("hearts.png"), "diamonds.png": loadIcon("diamonds.png"), "clubs.png": loadIcon("clubs.png"), } func Icon(name string) image.Image { return icons[name] } * Не работать с переменной из нескольких горутин // Package bank provides a concurrency-safe bank with one account. package bank var deposits = make(chan int) // send amount to deposit var balances = make(chan int) // receive balance func Deposit(amount int) { deposits <- amount } func Balance() int { return <-balances } func teller() { var balance int // balance is confined to teller goroutine for { select { case amount := <-deposits: balance += amount case balances <- balance: } } } func init() { go teller() // start the monitor goroutine } * sync.Mutex import "sync" var ( mu sync.Mutex // guards balance balance int ) func Deposit(amount int) { mu.Lock() balance = balance + amount mu.Unlock() } func Balance() int { mu.Lock() defer mu.Unlock() b := balance return b } * sync.RWMutex var mu sync.RWMutex var balance int func Balance() int { mu.RLock() // readers lock defer mu.RUnlock() return balance } * Memory Order var x, y int go func() { x = 1 // A1 fmt.Print("y:", y, " ") // A2 }() go func() { y = 1 // B1 fmt.Print("x:", x, " ") // B2 }() Можем ожидать y:0 x:1 x:0 y:1 x:1 y:1 y:1 x:1 Но реально может произойти x:0 y:0 y:0 x:0 * Ленивая инициализация sync.Once. var icons map[string]image.Image func loadIcons() { icons = map[string]image.Image{ "spades.png": loadIcon("spades.png") } } // NOTE: not concurrency-safe! func Icon(name string) image.Image { if icons == nil { loadIcons() // one-time initialization } return icons[name] } * Ленивая инициализация sync.Once. var mu sync.Mutex // guards icons var icons map[string]image.Image // Concurrency-safe. func Icon(name string) image.Image { mu.Lock() defer mu.Unlock() if icons == nil { loadIcons() } return icons[name] } * Ленивая инициализация sync.Once. var loadIconsOnce sync.Once var icons map[string]image.Image // Concurrency-safe. func Icon(name string) image.Image { loadIconsOnce.Do(loadIcons) return icons[name] } * Concurrent cache .play memo1/memo.go /type Memo/,/OMIT/ * Concurrent cache .play memo2/memo.go /type Memo/,/OMIT/ * Concurrent cache .play memo3/memo.go /func \(memo/,/OMIT/ * Concurrent cache .play memo4/memo.go /type entry/,/OMIT/ * sync.Map Что происходит, когда go совершает type cast из interface{}? - Приведение к конкретному типу требует одной проверки на `==` var v interface{} x, ok := v.(int) - Приведение к интерфейсу требует (в общем случае) сравнивать method set. var v interface{} x, ok := v.(io.Reader) // Конкретный тип v должен иметь метод Read([]byte) (int, error) - Для каждой пары `(concrete`type,`interface`type)` нужно знать, корректно ли такое приведение. var cache map[typeConversion]conversionResult * sync.Map var cache map[typeConversion]conversionResult Как защитить cache? `sync.Mutex`? Кеш из раннего примера? Какой паттерн нагрузки? - Запись в начале программы. - Потом только чтения. - Никогда не удаляем ключи. - Не страшно сделать вычисление несколько раз. Чего хотим? - Скорость как у `map` без лока. * sync.Map var cache sync.Map func convertType(from, to typ) *conversionResult { key := typeConversion{from: from, to: to} res, ok := cache.Load(key) if ok { return res.(*conversionResult) } res = doConversion(from, to) cache.Store(key, res) return res.(*conversionResult) } - `sync.Map` хранит две `map` внутри. `clean` и `dirty`. - Обращение к `clean` всегда происходит без лока. - Обращение к `dirty` требует лока. - Периодически `dirty` повышается до `clean`. * Как сделать sync.Map без двойного вычисления? .play synconce/map.go /type result/,/OMIT/ * sync.Cond type Once struct { done, running bool mu sync.Mutex cond *sync.Cond } func (once *Once) Do(f func()) { once.mu.Lock() defer once.mu.Unlock() if done { return } if running { once.cond.Wait() // releases and acquires mutex return } running = true once.mu.Unlock() f() once.mu.Lock() done = true once.cond.Broadcast() } * golang.org/x/sync Пакеты в golang.org/x содержат код, который не вошёл в стандартную библиотеку. - errgroup - `sync.WaitGroup` со встроенной обработкой ошибок - semaphore - семафор - singleflight - дедубликация вызовов * context type Context interface { // Возвращает время, когда операция будет оповещена о необходимости завершения Deadline() (deadline time.Time, ok bool) // Возвращает канал, который будет закрыт при необходимости завершить операцию // Служит в качестве инструмента оповещения об отмене Done() <-chan struct{} // Если Done не закрыт - возвращает nil. // Если Done закрыт, Err ошибку с объяснением причины: // - Canceled - контекст был отменен // - DeadlineExceeded - наступил дедлайн. // После возврашения не nil ошибки Err всегда возвращает данную ошибку. Err() error // Позволяет получить произвольный объект из контекста Value(key interface{}) interface{} } - Отмена таймауты - Передача request scoped значений * context Типы контекстов: // root context todo := context.TODO() ctx := context.Background() // manual cancel ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel by timeout ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second)) defer cancel() * Отменяем операции .code context/cancelation/cancelation.go /func SimpleCancelation()/,/OMIT/ .code context/cancelation/cancelation.go /func doSlowJob/,/OMIT/ * Отменяем операции .code context/cancelation/cancelation.go /func SimpleTimeout()/,/OMIT/ .code context/cancelation/cancelation.go /func doSlowJob/,/OMIT/ * context в библиотеках Go По соглашению `Context` всегда передается первым параметром в функции, обычно именуясь `ctx`. database/sql.(*DB).QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) database/sql.(*DB).ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) net/http.NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) golang.org/x/sync/errgroup.WithContext(ctx context.Context) (*Group, context.Context) ... Быстрый пример: ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second) defer cancel() req, _ := http.NewRequestWithContext(ctx, "GET", "http://loremipsum.com", nil) resp, err := http.DefaultClient.Do(req) if err != nil { // возможно тут будет DeadlineExceeded }