Горутины и каналы Лекция 4 Фёдор Короткий * Горутины f() // call f(); wait for it to return go f() // create a new goroutine that calls f(); don't wait * Spinner Example .play spinner/main.go /func main()/,/END OMIT/ * Clock Server .play clock1/main.go /func main()/,/^}/ Для проверки работоспособности можно выполнить в unix-консоли команду: nc localhost 8000 * Clock Handler .play clock1/main.go /func handleConn/,/^}/ * Concurrent Server .play clock2/main.go /func main()/,/^}/ * Channels ch := make(chan int) - Канал - это *reference*type*. - Нулевое значение канала - `nil`. - Каналы поддерживают две операции: _send_ и _receive_. ch <- x // a send statement x = <-ch // a receive expression in an assignment statement <-ch // a receive statement; result is discarded - `close` закрывает канал. - _send_ будут паниковать. - _receive_ будут возвращать значения, которые успели записать в канал, и нулевые значения после этого. close(ch) ch <- x // panic * Channel buffer ch = make(chan int) // unbuffered channel ch = make(chan int, 0) // unbuffered channel ch = make(chan int, 3) // buffered channel - Посылка значения по каналу _happens_before_ получения этого значения в другой горутине. * Netcat example .play netcat3/main.go /func main()/,/^}/ * Netcat example .play netcat4/main.go /func main()/,/^}/ * Pipeline .play pipeline1/main.go /func main()/,/^}/ * Pipeline termination Завершение `Counter` go func() { // Counter for x := 0; x < 100; x++ { naturals <- x } close(naturals) }() * Pipeline termination Завершение `Squarer` go func() { // Squarer for { x, ok := <-naturals if !ok { break } squares <- x * x } close(squares) }() Используя `range`. go func() { // Squarer for x := range naturals { squares <- x * x } close(squares) }() * Unidirectional channel types func counter(out chan int) func squarer(out, in chan int) func printer(in chan int) Явные типы для _send_only_ и _receive_only_ каналов. 
var ch chan int = make(chan int) var send chan<- int = ch var recv <-chan int = ch func counter(out chan<- int) func squarer(out chan<- int, in <-chan int) func printer(in <-chan int) func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) go squarer(squares, naturals) printer(squares) } * Buffered channels ch := make(chan string, 3) ch <- "A" ch <- "B" ch <- "C" fmt.Println(<-ch) // "A" // not very useful in concurrent program fmt.Println(cap(ch)) // 3 fmt.Println(len(ch)) // 2 * Mirrored query func mirroredQuery() string { responses := make(chan string, 3) go func() { responses <- request("asia.gopl.io") }() go func() { responses <- request("europe.gopl.io") }() go func() { responses <- request("americas.gopl.io") }() return <-responses // return the quickest response } func request(hostname string) (response string) { /* ... */ } - Почему нельзя использовать `make(chan string)`? * Parallel loop // ImageFile reads an image from infile and writes // a thumbnail-size version of it in the same directory. // It returns the generated file name, e.g., "foo.thumb.jpg". func ImageFile(infile string) (string, error) // makeThumbnails makes thumbnails of the specified files. func makeThumbnails(filenames []string) { for _, f := range filenames { if _, err := thumbnail.ImageFile(f); err != nil { log.Println(err) } } } * Parallel loop // NOTE: incorrect! func makeThumbnails2(filenames []string) { for _, f := range filenames { go thumbnail.ImageFile(f) // NOTE: ignoring errors } } * Parallel loop // makeThumbnails3 makes thumbnails of the specified files in parallel. func makeThumbnails3(filenames []string) { ch := make(chan struct{}) for _, f := range filenames { go func(f string) { thumbnail.ImageFile(f) // NOTE: ignoring errors ch <- struct{}{} }(f) } // Wait for goroutines to complete. for range filenames { <-ch } } * Parallel loop // makeThumbnails4 makes thumbnails for the specified files in parallel. 
// It returns an error if any step failed. func makeThumbnails4(filenames []string) error { errors := make(chan error) for _, f := range filenames { go func(f string) { _, err := thumbnail.ImageFile(f) errors <- err }(f) } for range filenames { if err := <-errors; err != nil { return err } } return nil } - Найдите баг?) * Parallel loop func makeThumbnails5(filenames []string) (thumbfiles []string, err error) { type item struct { thumbfile string err error } ch := make(chan item, len(filenames)) for _, f := range filenames { go func(f string) { var it item it.thumbfile, it.err = thumbnail.ImageFile(f) ch <- it }(f) } for range filenames { it := <-ch if it.err != nil { return nil, it.err } thumbfiles = append(thumbfiles, it.thumbfile) } return thumbfiles, nil } * Parallel loop func makeThumbnails6(filenames []string) { var wg sync.WaitGroup for _, f := range filenames { wg.Add(1) go func(f string) { defer wg.Done() _, _ = thumbnail.ImageFile(f) }(f) } wg.Wait() } * Web Crawler func crawl(url string) []string { fmt.Println(url) list, err := links.Extract(url) if err != nil { log.Print(err) } return list } * Web Crawler func main() { worklist := make(chan []string) // Start with the command-line arguments. go func() { worklist <- os.Args[1:] }() // Crawl the web concurrently. seen := make(map[string]bool) for list := range worklist { for _, link := range list { if !seen[link] { seen[link] = true go func(link string) { worklist <- crawl(link) }(link) } } } } - Почему этот код упадёт? * Web Crawler // tokens is a counting semaphore used to // enforce a limit of 20 concurrent requests. var tokens = make(chan struct{}, 20) func crawl(url string) []string { fmt.Println(url) tokens <- struct{}{} // acquire a token list, err := links.Extract(url) <-tokens // release the token if err != nil { log.Print(err) } return list } * Select func main() { fmt.Println("Commencing countdown.") tick := time.Tick(1 * time.Second) // just an example; use time.NewTicker in real code.
for countdown := 10; countdown > 0; countdown-- { fmt.Println(countdown) <-tick } launch() } В другой горутине: abort := make(chan struct{}) go func() { os.Stdin.Read(make([]byte, 1)) // read a single byte abort <- struct{}{} }() * Select select { case <-ch1: // ... case x := <-ch2: // ...use x... case ch3 <- y: // ... default: // ... } - select блокируется до тех пор, пока хотя бы один из `case`-ов не сможет выполниться; при наличии `default` он не блокируется. * Launch Abort func main() { // ...create abort channel... fmt.Println("Commencing countdown. Press return to abort.") select { case <-time.After(10 * time.Second): // Do nothing. case <-abort: fmt.Println("Launch aborted!") return } launch() } * Ticker func Tick(d time.Duration) <-chan time.Time { ch := make(chan time.Time) go func() { for { time.Sleep(d) ch <- time.Now() } }() return ch } - `time.Tick` нельзя остановить. ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() // cause the ticker's goroutine to terminate <-ticker.C // receive from the ticker's channel * Non-blocking send & receive select { case <-ch: // ... default: // ... } select { case ch <- "A": // ... default: // ... } * Directory Traversal .play du1/main.go /func walkDir()/,/-walkDir/ * Directory Traversal .play du1/main.go /func main/,/^}/ * Directory Traversal Progress .play du2/main.go /Print the results/,/^\tprint/ * Concurrent Directory Traversal .play du3/main.go /fileSizes/,/^\t}\(\)/ .play du3/main.go /func walkDir/,/^}/ * Concurrent Directory Traversal .play du3/main.go /var sema/,/^}/ * Cancelation var done = make(chan struct{}) func cancelled() bool { select { case <-done: return true default: return false } } // Cancel traversal when input is detected. go func() { os.Stdin.Read(make([]byte, 1)) // read a single byte close(done) }() * Cancelation func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) { defer n.Done() if cancelled() { return } for _, entry := range dirents(dir) { // ...
} } * Cancelation for { select { case <-done: // Drain fileSizes to allow existing goroutines to finish. for range fileSizes { // Do nothing. } return case size, ok := <-fileSizes: // ... } } * Cancelation func dirents(dir string) []os.FileInfo { select { case sema <- struct{}{}: // acquire token case <-done: return nil // cancelled } defer func() { <-sema }() // release token // ...read directory... } * Chat server .play chat/chat.go /func main/,/^}/ * Chat broadcaster .play chat/chat.go /type client/,/^}/ * Chat client .play chat/chat.go /func handle/,/END_WRITER/