shad-go/lectures/05-concurrency/lecture.slide

500 lines
14 KiB
Text
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Concurrency with Shared Memory
Лекция 5
Фёдор Короткий
* Concurrency with Shared Memory
- Принципы синхронизации, такие же как в C++ или java.
- Различается набор инструментов в стандартной библиотеке.
* Happens Before
- Одно событие в программе может _произойти_раньше_ (_happens_before_) другого события.
- Внутри одной горутины `a`:=`1` _happens_before_ `a`+=`1`.
a := 1
a += 1
- Посылка значения из канала _happens_before_ получения этого значения из канала.
- Два события A и B _происходят_одновременно_ (_are_concurrent_), если нельзя сказать что одно случилось раньше другого.
* Race Condition
- _Race_Condition_ - ситуация, когда код работает некорректно в зависимости от порядка выполнения нескольких горутин.
* Example Bank
// Package bank implements a bank with only one account.
package bank
var balance int
func Deposit(amount int) { balance = balance + amount }
func Balance() int { return balance }
Работает корректно в одной горутине.
// Alice:
go func() {
bank.Deposit(200)
fmt.Println("=", bank.Balance())
}()
// Bob
go bank.Deposit(100)
Ошибка в этом примере называется _data_race_.
* Data Race breaks memory safety
var x []int
go func() { x = make([]int, 10) }()
go func() { x = make([]int, 1000000) }()
x[999999] = 1 // NOTE: undefined behavior; memory corruption possible!
Программа с _data_race_ перестаёт вести себя так, как написано в коде.
Иногда люди говорят _"это_безопасный_рейс"_. *Не*бывыет*безопасных*data-race-ов*
* Data Race
*Data*race* случается, когда несколько горутин одновременно работают с переменной, и хотябы одна из них пишет в эту переменную.
Как защититься от _data_race_?
1. Не писать в переменную.
2. Не работать с переменной из нескольких горутин.
3. Не работать с переменной одновременно. (mutex)
* Не писать в переменую
var icons = make(map[string]image.Image)
func loadIcon(name string) image.Image
// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
icon, ok := icons[name]
if !ok {
icon = loadIcon(name)
icons[name] = icon
}
return icon
}
Правильно:
var icons = map[string]image.Image{
"spades.png": loadIcon("spades.png"),
"hearts.png": loadIcon("hearts.png"),
"diamonds.png": loadIcon("diamonds.png"),
"clubs.png": loadIcon("clubs.png"),
}
func Icon(name string) image.Image { return icons[name] }
* Не работать с переменной из нескольких горутин
// Package bank provides a concurrency-safe bank with one account.
package bank
var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance
func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }
func teller() {
var balance int // balance is confined to teller goroutine
for {
select {
case amount := <-deposits:
balance += amount
case balances <- balance:
}
}
}
func init() {
go teller() // start the monitor goroutine
}
* sync.Mutex
import "sync"
var (
mu sync.Mutex // guards balance
balance int
)
func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}
func Balance() int {
mu.Lock()
defer mu.Unlock()
b := balance
return b
}
* sync.RWMutex
var mu sync.RWMutex
var balance int
func Balance() int {
mu.RLock() // readers lock
defer mu.RUnlock()
return balance
}
* Memory Order
var x, y int
go func() {
x = 1 // A1
fmt.Print("y:", y, " ") // A2
}()
go func() {
y = 1 // B1
fmt.Print("x:", x, " ") // B2
}()
Можем ожидать
y:0 x:1
x:0 y:1
x:1 y:1
y:1 x:1
Но реально может произойти
x:0 y:0
y:0 x:0
* Atomics
var x, y atomic.Int32
go func() {
x.Store(1) // A1
fmt.Print("y:", y.Load(), " ") // A2
}()
go func() {
y.Store(1) // B1
fmt.Print("x:", x.Load(), " ") // B2
}()
In the terminology of the Go memory model, if the effect of an atomic operation A is observed by atomic operation B, then A “synchronizes before” B.
Additionally, all the atomic operations executed in a program behave as though executed in some sequentially consistent order.
This definition provides the same semantics as C++'s sequentially consistent atomics and Java's volatile variables.
* Atomic Int
type Uint32 struct {}
// Load atomically loads and returns the value stored in x.
func (x *Uint32) Load() uint32
// Store atomically stores val into x.
func (x *Uint32) Store(val uint32)
// Swap atomically stores new into x and returns the previous value.
func (x *Uint32) Swap(new uint32) (old uint32)
// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Uint32) CompareAndSwap(old, new uint32) (swapped bool)
// Add atomically adds delta to x and returns the new value.
func (x *Uint32) Add(delta uint32) (new uint32)
* Atomic Value
type Value struct {}
// Load atomically loads and returns the value stored in x.
func (x *Value) Load() any
// Store atomically stores val into x.
func (x *Value) Store(val any)
// Swap atomically stores new into x and returns the previous value.
func (x *Value) Swap(new any) (old any)
// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Value) CompareAndSwap(old, new any) (swapped bool)
* Config update
func main() {
var config atomic.Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
// Reload config every 10 seconds
// and update config value with the new version.
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
}
* Ленивая инициализация sync.Once.
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"spades.png": loadIcon("spades.png")
}
}
// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
if icons == nil {
loadIcons() // one-time initialization
}
return icons[name]
}
* Ленивая инициализация sync.Once.
var mu sync.Mutex // guards icons
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
mu.Lock()
defer mu.Unlock()
if icons == nil {
loadIcons()
}
return icons[name]
}
* Ленивая инициализация sync.Once.
var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
* Concurrent cache
.play memo1/memo.go /type Memo/,/OMIT/
* Concurrent cache
.play memo2/memo.go /type Memo/,/OMIT/
* Concurrent cache
.play memo3/memo.go /func \(memo/,/OMIT/
* Concurrent cache
.play memo4/memo.go /type entry/,/OMIT/
* sync.Map
Что происходит, когда go совершает type cast из interface{}?
- Приведение к конкретному типу требует одной проверки на `==`
var v interface{}
x, ok := v.(int)
- Приведение к интерфейсу требует (в общем случае) сравнивать method set.
var v interface{}
x, ok := v.(io.Reader)
// Конкретный тип v должен иметь метод Read([]byte) (int, error)
- Для каждой пары `(concrete`type,`interface`type)` нужно знать, корректно ли такое приведение.
var cache map[typeConversion]conversionResult
* sync.Map
var cache map[typeConversion]conversionResult
Как защитить cache? `sync.Mutex`? Кеш из раннего примера?
Какой паттерн нагрузки?
- Запись в начале программы.
- Потом только чтения.
- Никогда не удаляем ключи.
- Не страшно сделать вычисление несколько раз.
Чего хотим?
- Скорость как у `map` без лока.
* sync.Map
var cache sync.Map
func convertType(from, to typ) *conversionResult {
key := typeConversion{from: from, to: to}
res, ok := cache.Load(key)
if ok {
return res.(*conversionResult)
}
res = doConversion(from, to)
cache.Store(key, res)
return res.(*conversionResult)
}
- `sync.Map` хранит две `map` внутри. `clean` и `dirty`.
- Обращение к `clean` всегда происходит без лока.
- Обращение к `dirty` требует лока.
- Периодически `dirty` повышается до `clean`.
* Как сделать sync.Map без двойного вычисления?
.play synconce/map.go /type result/,/OMIT/
* sync.Cond
type Once struct {
done, running bool
mu sync.Mutex
cond *sync.Cond
}
func (once *Once) Do(f func()) {
once.mu.Lock()
defer once.mu.Unlock()
if once.done {
return
}
if once.running {
once.cond.Wait() // releases and acquires mutex
return
}
once.running = true
once.mu.Unlock()
f()
once.mu.Lock()
once.done = true
once.cond.Broadcast()
}
* golang.org/x/sync
Пакеты в golang.org/x содержат код, который не вошёл в стандартную библиотеку.
- errgroup - `sync.WaitGroup` со встроенной обработкой ошибок
- semaphore - семафор
- singleflight - дедубликация вызовов
* context
type Context interface {
// Возвращает время, когда операция будет оповещена о необходимости завершения
Deadline() (deadline time.Time, ok bool)
// Возвращает канал, который будет закрыт при необходимости завершить операцию
// Служит в качестве инструмента оповещения об отмене
Done() <-chan struct{}
// Если Done не закрыт - возвращает nil.
// Если Done закрыт, Err ошибку с объяснением причины:
// - Canceled - контекст был отменен
// - DeadlineExceeded - наступил дедлайн.
// После возврашения не nil ошибки Err всегда возвращает данную ошибку.
Err() error
// Позволяет получить произвольный объект из контекста
Value(key interface{}) interface{}
}
- Отмена таймауты
- Передача request scoped значений
* context
Типы контекстов:
// root context
todo := context.TODO()
ctx := context.Background()
// manual cancel
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// manual cancel with explicit cause
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(fmt.Errorf("job not needed"))
// cancel by timeout
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
defer cancel()
* Отменяем операции
.code context/cancelation/cancelation.go /func SimpleCancelation()/,/OMIT/
.code context/cancelation/cancelation.go /func doSlowJob/,/OMIT/
* Отменяем операции
.code context/cancelation/cancelation.go /func SimpleTimeout()/,/OMIT/
.code context/cancelation/cancelation.go /func doSlowJob/,/OMIT/
* context в библиотеках Go
По соглашению `Context` всегда передается первым параметром в функции, обычно именуясь `ctx`.
database/sql.(*DB).QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
database/sql.(*DB).ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
net/http.NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error)
golang.org/x/sync/errgroup.WithContext(ctx context.Context) (*Group, context.Context)
...
Быстрый пример:
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, "GET", "http://loremipsum.com", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
// возможно тут будет DeadlineExceeded
}
* context и передача значений
.code context/value/value.go /type/,/OMIT/
.code context/value/value.go /func main/,/}/