shad-go/lectures/03-goroutines/lecture.slide

487 lines
10 KiB
Text
Raw Normal View History

2020-03-12 13:43:50 +00:00
Горутины и каналы
Лекция 4
Фёдор Короткий
* Горутины
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
* Spinner Example
.play spinner/main.go /func main()/,/END OMIT/
* Clock Server
.play clock1/main.go /func main()/,/^}/
* Clock Handler
.play clock1/main.go /func handleConn/,/^}/
* Concurrent Server
.play clock2/main.go /func main()/,/^}/
* Channels
ch := make(chan int)
- Канал - это *reference*type*.
- Нулевое значение канала - `nil`.
- Каналы поддерживают две операции: _send_ и _receive_.
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
- `close` закрывает канал.
- _send_ будут паниковать.
- _receive_ будут возвращать значения, которые успели записать в канал, и нулевые значения после этого.
close(ch)
ch <- x // panic
* Channel buffer
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel
- Посылка значения по каналу _happens_before_ получения этого значения в другой горутине.
* Netcat example
.play netcat3/main.go /func main()/,/^}/
* Netcat example
.play netcat4/main.go /func main()/,/^}/
* Pipeline
.play pipeline1/main.go /func main()/,/^}/
* Pipeline termination
Завершение `Counter`
go func() { // Counter
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
* Pipeline termination
Завершение `Squarer`
go func() { // Squarer
for {
x, ok := <-naturals
if !ok {
break
}
squares <- x * x
}
close(squares)
}()
Используя `range`.
go func() { // Squarer
for x := range naturals {
squares <- x * x
}
close(squares)
}()
* Unidirectional channel types
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
Явные типы для _send_only_ и _receive_only_ каналов.
var ch chan int = make(chan int)
var send chan<- int = ch
var recv <-chan int = ch
func counter(out chan<- int)
func squarer(out chan<- int, in <-chan int)
2020-03-12 13:43:50 +00:00
func printer(in <-chan int)
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
* Buffered channels
ch := make(chan string, 3)
ch <- "A"
ch <- "B"
ch <- "C"
fmt.Println(<-ch) // "A"
// not very useful in concurrent program
fmt.Println(cap(ch)) // 3
fmt.Println(len(ch)) // 2
* Mirrored query
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
- Почему нельзя использовать `make(chan string)`?
* Parallel loop
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
for _, f := range filenames {
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}
* Parallel loop
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}
* Parallel loop
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
ch := make(chan struct{})
for _, f := range filenames {
go func(f string) {
thumbnail.ImageFile(f) // NOTE: ignoring errors
ch <- struct{}{}
}(f)
}
// Wait for goroutines to complete.
for range filenames {
<-ch
}
}
* Parallel loop
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err
}
}
return nil
}
- Найдите баг?)
* Parallel loop
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
* Parallel loop
func makeThumbnails5(filenames []string) {
var wg sync.WaitGroup
for _, f := range filenames {
wg.Add(1)
go func(f string) {
defer wg.Done()
_, _ = thumbnail.ImageFile(f)
}(f)
}
wg.Wait()
}
* Web Crawler
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
* Web Crawler
func main() {
worklist := make(chan []string)
// Start with the command-line arguments.
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
- Почему этот код упадёт?
* Web Crawler
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)
func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // acquire a token
list, err := links.Extract(url)
<-tokens // release the token
if err != nil {
log.Print(err)
}
return list
}
* Select
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second) // just example, use time.NewTicker.
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}
В другой горутине:
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
* Select
select {
2020-03-12 16:38:15 +00:00
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
2020-03-12 13:43:50 +00:00
}
- select блокируется, пока ни один из `case`-ов не может выполниться.
* Launch Abort
func main() {
// ...create abort channel...
fmt.Println("Commencing countdown. Press return to abort.")
select {
2020-03-12 16:38:15 +00:00
case <-time.After(10 * time.Second):
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
2020-03-12 13:43:50 +00:00
}
launch()
}
* Ticker
func Tick(d time.Duration) <-chan time.Time {
ch := make(chan time.Time)
go func() {
2021-03-04 18:10:30 +00:00
for {
time.Sleep(d)
ch <- time.Now()
}
2020-03-12 16:36:38 +00:00
}()
2020-03-12 13:43:50 +00:00
return ch
}
- `time.Tick` нельзя остановить
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop() // cause the ticker's goroutine to terminate
<-ticker.C // receive from the ticker's channel
* Non blocking send & receive
select {
case <-ch:
// ...
default:
// ...
}
select {
case ch <- "A":
// ...
default:
// ...
}
* Directory Traversal
.play du1/main.go /func walkDir()/,/-walkDir/
* Directory Traversal
.play du1/main.go /func main/,/^}/
* Directory Traversal Progress
.play du2/main.go /Print the results/,/^\tprint/
* Concurrent Directory Traversal
.play du3/main.go /fileSizes/,/^\t}\(\)/
.play du3/main.go /func walkDir/,/^}/
* Concurrent Directory Traversal
.play du3/main.go /var sema/,/^}/
* Cancelation
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
// Cancel traversal when input is detected.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done)
}()
* Cancelation
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
// ...
}
}
* Cancelation
for {
select {
case <-done:
// Drain fileSizes to allow existing goroutines to finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
// ...
}
}
* Cancelation
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}
* Chat server
.play chat/chat.go /func main/,/^}/
* Chat broadcaster
.play chat/chat.go /type client/,/^}/
* Chat client
.play chat/chat.go /func handle/,/END_WRITER/