shad-go/lectures/03-goroutines/lecture.slide

490 lines
10 KiB
Text
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Горутины и каналы
Лекция 4
Фёдор Короткий
* Горутины
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
* Spinner Example
.play spinner/main.go /func main()/,/END OMIT/
* Clock Server
.play clock1/main.go /func main()/,/^}/
Для проверки работоспособности можно выполнить в unix-консоли команду:
nc localhost 8000
* Clock Handler
.play clock1/main.go /func handleConn/,/^}/
* Concurrent Server
.play clock2/main.go /func main()/,/^}/
* Channels
ch := make(chan int)
- Канал - это *reference*type*.
- Нулевое значение канала - `nil`.
- Каналы поддерживают две операции: _send_ и _receive_.
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
- `close` закрывает канал.
- _send_ будут паниковать.
- _receive_ будут возвращать значения, которые успели записать в канал, и нулевые значения после этого.
close(ch)
ch <- x // panic
* Channel buffer
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel
- Посылка значения по каналу _happens_before_ получения этого значения в другой горутине.
* Netcat example
.play netcat3/main.go /func main()/,/^}/
* Netcat example
.play netcat4/main.go /func main()/,/^}/
* Pipeline
.play pipeline1/main.go /func main()/,/^}/
* Pipeline termination
Завершение `Counter`
go func() { // Counter
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
* Pipeline termination
Завершение `Squarer`
go func() { // Squarer
for {
x, ok := <-naturals
if !ok {
break
}
squares <- x * x
}
close(squares)
}()
Используя `range`.
go func() { // Squarer
for x := range naturals {
squares <- x * x
}
close(squares)
}()
* Unidirectional channel types
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
Явные типы для _send_only_ и _receive_only_ каналов.
var ch chan int = make(chan int)
var send chan<- int = ch
var recv <-chan int = ch
func counter(out chan<- int)
func squarer(out chan<- int, in <-chan int)
func printer(in <-chan int)
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
* Buffered channels
ch := make(chan string, 3)
ch <- "A"
ch <- "B"
ch <- "C"
fmt.Println(<-ch) // "A"
// not very useful in concurrent program
fmt.Println(cap(ch)) // 3
fmt.Println(len(ch)) // 2
* Mirrored query
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
- Почему нельзя использовать `make(chan string)`?
* Parallel loop
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
for _, f := range filenames {
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}
* Parallel loop
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}
* Parallel loop
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
ch := make(chan struct{})
for _, f := range filenames {
go func(f string) {
thumbnail.ImageFile(f) // NOTE: ignoring errors
ch <- struct{}{}
}(f)
}
// Wait for goroutines to complete.
for range filenames {
<-ch
}
}
* Parallel loop
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err
}
}
return nil
}
- Найдите баг?)
* Parallel loop
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
* Parallel loop
func makeThumbnails5(filenames []string) {
var wg sync.WaitGroup
for _, f := range filenames {
wg.Add(1)
go func(f string) {
defer wg.Done()
_, _ = thumbnail.ImageFile(f)
}(f)
}
wg.Wait()
}
* Web Crawler
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
* Web Crawler
func main() {
worklist := make(chan []string)
// Start with the command-line arguments.
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
- Почему этот код упадёт?
* Web Crawler
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)
func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // acquire a token
list, err := links.Extract(url)
<-tokens // release the token
if err != nil {
log.Print(err)
}
return list
}
* Select
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second) // just example, use time.NewTicker.
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}
В другой горутине:
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
* Select
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
- select блокируется, пока ни один из `case`-ов не может выполниться.
* Launch Abort
func main() {
// ...create abort channel...
fmt.Println("Commencing countdown. Press return to abort.")
select {
case <-time.After(10 * time.Second):
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
launch()
}
* Ticker
func Tick(d time.Duration) <-chan time.Time {
ch := make(chan time.Time)
go func() {
for {
time.Sleep(d)
ch <- time.Now()
}
}()
return ch
}
- `time.Tick` нельзя остановить
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop() // cause the ticker's goroutine to terminate
<-ticker.C // receive from the ticker's channel
* Non blocking send & receive
select {
case <-ch:
// ...
default:
// ...
}
select {
case ch <- "A":
// ...
default:
// ...
}
* Directory Traversal
.play du1/main.go /func walkDir()/,/-walkDir/
* Directory Traversal
.play du1/main.go /func main/,/^}/
* Directory Traversal Progress
.play du2/main.go /Print the results/,/^\tprint/
* Concurrent Directory Traversal
.play du3/main.go /fileSizes/,/^\t}\(\)/
.play du3/main.go /func walkDir/,/^}/
* Concurrent Directory Traversal
.play du3/main.go /var sema/,/^}/
* Cancelation
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
// Cancel traversal when input is detected.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done)
}()
* Cancelation
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
// ...
}
}
* Cancelation
for {
select {
case <-done:
// Drain fileSizes to allow existing goroutines to finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
// ...
}
}
* Cancelation
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}
* Chat server
.play chat/chat.go /func main/,/^}/
* Chat broadcaster
.play chat/chat.go /type client/,/^}/
* Chat client
.play chat/chat.go /func handle/,/END_WRITER/