2020-03-12 13:43:50 +00:00
|
|
|
|
Горутины и каналы
|
|
|
|
|
Лекция 4
|
|
|
|
|
|
|
|
|
|
Фёдор Короткий
|
|
|
|
|
|
|
|
|
|
* Горутины
|
|
|
|
|
|
|
|
|
|
f() // call f(); wait for it to return
|
|
|
|
|
go f() // create a new goroutine that calls f(); don't wait
|
|
|
|
|
|
|
|
|
|
* Spinner Example
|
|
|
|
|
|
|
|
|
|
.play spinner/main.go /func main()/,/END OMIT/
|
|
|
|
|
|
|
|
|
|
* Clock Server
|
|
|
|
|
|
|
|
|
|
.play clock1/main.go /func main()/,/^}/
|
|
|
|
|
|
|
|
|
|
* Clock Handler
|
|
|
|
|
|
|
|
|
|
.play clock1/main.go /func handleConn/,/^}/
|
|
|
|
|
|
|
|
|
|
* Concurrent Server
|
|
|
|
|
|
|
|
|
|
.play clock2/main.go /func main()/,/^}/
|
|
|
|
|
|
|
|
|
|
* Channels
|
|
|
|
|
|
|
|
|
|
ch := make(chan int)
|
|
|
|
|
|
|
|
|
|
- Канал - это *reference*type*.
|
|
|
|
|
- Нулевое значение канала - `nil`.
|
|
|
|
|
- Каналы поддерживают две операции: _send_ и _receive_.
|
|
|
|
|
|
|
|
|
|
ch <- x // a send statement
|
|
|
|
|
x = <-ch // a receive expression in an assignment statement
|
|
|
|
|
<-ch // a receive statement; result is discarded
|
|
|
|
|
|
|
|
|
|
- `close` закрывает канал.
|
|
|
|
|
- _send_ будут паниковать.
|
|
|
|
|
- _receive_ будут возвращать значения, которые успели записать в канал, и нулевые значения после этого.
|
|
|
|
|
|
|
|
|
|
close(ch)
|
|
|
|
|
ch <- x // panic
|
|
|
|
|
|
|
|
|
|
* Channel buffer
|
|
|
|
|
|
|
|
|
|
ch = make(chan int) // unbuffered channel
|
|
|
|
|
ch = make(chan int, 0) // unbuffered channel
|
|
|
|
|
ch = make(chan int, 3) // buffered channel
|
|
|
|
|
|
|
|
|
|
- Посылка значения по каналу _happens_before_ получения этого значения в другой горутине.
|
|
|
|
|
|
|
|
|
|
* Netcat example
|
|
|
|
|
|
|
|
|
|
.play netcat3/main.go /func main()/,/^}/
|
|
|
|
|
|
|
|
|
|
* Netcat example
|
|
|
|
|
|
|
|
|
|
.play netcat4/main.go /func main()/,/^}/
|
|
|
|
|
|
|
|
|
|
* Pipeline
|
|
|
|
|
|
|
|
|
|
.play pipeline1/main.go /func main()/,/^}/
|
|
|
|
|
|
|
|
|
|
* Pipeline termination
|
|
|
|
|
|
|
|
|
|
Завершение `Counter`
|
|
|
|
|
|
|
|
|
|
go func() { // Counter
|
|
|
|
|
for x := 0; x < 100; x++ {
|
|
|
|
|
naturals <- x
|
|
|
|
|
}
|
|
|
|
|
close(naturals)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
* Pipeline termination
|
|
|
|
|
|
|
|
|
|
Завершение `Squarer`
|
|
|
|
|
|
|
|
|
|
go func() { // Squarer
|
|
|
|
|
for {
|
|
|
|
|
x, ok := <-naturals
|
|
|
|
|
if !ok {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
squares <- x * x
|
|
|
|
|
}
|
|
|
|
|
close(squares)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
Используя `range`.
|
|
|
|
|
|
|
|
|
|
go func() { // Squarer
|
|
|
|
|
for x := range naturals {
|
|
|
|
|
squares <- x * x
|
|
|
|
|
}
|
|
|
|
|
close(squares)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
* Unidirectional channel types
|
|
|
|
|
|
|
|
|
|
func counter(out chan int)
|
|
|
|
|
func squarer(out, in chan int)
|
|
|
|
|
func printer(in chan int)
|
|
|
|
|
|
|
|
|
|
Явные типы для _send_only_ и _receive_only_ каналов.
|
|
|
|
|
|
|
|
|
|
var ch chan int = make(chan int)
|
|
|
|
|
var send chan<- int = ch
|
|
|
|
|
var recv <-chan int = ch
|
|
|
|
|
|
|
|
|
|
func counter(out chan<- int)
|
|
|
|
|
func squarer(out chan<- int, int <-chan int)
|
|
|
|
|
func printer(in <-chan int)
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
naturals := make(chan int)
|
|
|
|
|
squares := make(chan int)
|
|
|
|
|
go counter(naturals)
|
|
|
|
|
go squarer(squares, naturals)
|
|
|
|
|
printer(squares)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Buffered channels
|
|
|
|
|
|
|
|
|
|
ch := make(chan string, 3)
|
|
|
|
|
|
|
|
|
|
ch <- "A"
|
|
|
|
|
ch <- "B"
|
|
|
|
|
ch <- "C"
|
|
|
|
|
|
|
|
|
|
fmt.Println(<-ch) // "A"
|
|
|
|
|
|
|
|
|
|
// not very useful in concurrent program
|
|
|
|
|
fmt.Println(cap(ch)) // 3
|
|
|
|
|
fmt.Println(len(ch)) // 2
|
|
|
|
|
|
|
|
|
|
* Mirrored query
|
|
|
|
|
|
|
|
|
|
func mirroredQuery() string {
|
|
|
|
|
responses := make(chan string, 3)
|
|
|
|
|
go func() { responses <- request("asia.gopl.io") }()
|
|
|
|
|
go func() { responses <- request("europe.gopl.io") }()
|
|
|
|
|
go func() { responses <- request("americas.gopl.io") }()
|
|
|
|
|
return <-responses // return the quickest response
|
|
|
|
|
}
|
|
|
|
|
func request(hostname string) (response string) { /* ... */ }
|
|
|
|
|
|
|
|
|
|
- Почему нельзя использовать `make(chan string)`?
|
|
|
|
|
|
|
|
|
|
* Parallel loop
|
|
|
|
|
|
|
|
|
|
// ImageFile reads an image from infile and writes
|
|
|
|
|
// a thumbnail-size version of it in the same directory.
|
|
|
|
|
// It returns the generated file name, e.g., "foo.thumb.jpg".
|
|
|
|
|
func ImageFile(infile string) (string, error)
|
|
|
|
|
|
|
|
|
|
// makeThumbnails makes thumbnails of the specified files.
|
|
|
|
|
func makeThumbnails(filenames []string) {
|
|
|
|
|
for _, f := range filenames {
|
|
|
|
|
if _, err := thumbnail.ImageFile(f); err != nil {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Parallel loop
|
|
|
|
|
|
|
|
|
|
// NOTE: incorrect!
|
|
|
|
|
func makeThumbnails2(filenames []string) {
|
|
|
|
|
for _, f := range filenames {
|
|
|
|
|
go thumbnail.ImageFile(f) // NOTE: ignoring errors
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Parallel loop
|
|
|
|
|
|
|
|
|
|
// makeThumbnails3 makes thumbnails of the specified files in parallel.
|
|
|
|
|
func makeThumbnails3(filenames []string) {
|
|
|
|
|
ch := make(chan struct{})
|
|
|
|
|
for _, f := range filenames {
|
|
|
|
|
go func(f string) {
|
|
|
|
|
thumbnail.ImageFile(f) // NOTE: ignoring errors
|
|
|
|
|
ch <- struct{}{}
|
|
|
|
|
}(f)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for goroutines to complete.
|
|
|
|
|
for range filenames {
|
|
|
|
|
<-ch
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Parallel loop
|
|
|
|
|
|
|
|
|
|
// makeThumbnails4 makes thumbnails for the specified files in parallel.
|
|
|
|
|
// It returns an error if any step failed.
|
|
|
|
|
func makeThumbnails4(filenames []string) error {
|
|
|
|
|
errors := make(chan error)
|
|
|
|
|
for _, f := range filenames {
|
|
|
|
|
go func(f string) {
|
|
|
|
|
_, err := thumbnail.ImageFile(f)
|
|
|
|
|
errors <- err
|
|
|
|
|
}(f)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for range filenames {
|
|
|
|
|
if err := <-errors; err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- Найдите баг?)
|
|
|
|
|
|
|
|
|
|
* Parallel loop
|
|
|
|
|
|
|
|
|
|
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
|
|
|
|
|
type item struct {
|
|
|
|
|
thumbfile string
|
|
|
|
|
err error
|
|
|
|
|
}
|
|
|
|
|
ch := make(chan item, len(filenames))
|
|
|
|
|
for _, f := range filenames {
|
|
|
|
|
go func(f string) {
|
|
|
|
|
var it item
|
|
|
|
|
it.thumbfile, it.err = thumbnail.ImageFile(f)
|
|
|
|
|
ch <- it
|
|
|
|
|
}(f)
|
|
|
|
|
}
|
|
|
|
|
for range filenames {
|
|
|
|
|
it := <-ch
|
|
|
|
|
if it.err != nil {
|
|
|
|
|
return nil, it.err
|
|
|
|
|
}
|
|
|
|
|
thumbfiles = append(thumbfiles, it.thumbfile)
|
|
|
|
|
}
|
|
|
|
|
return thumbfiles, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Parallel loop
|
|
|
|
|
|
|
|
|
|
func makeThumbnails5(filenames []string) {
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
for _, f := range filenames {
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func(f string) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
_, _ = thumbnail.ImageFile(f)
|
|
|
|
|
}(f)
|
|
|
|
|
}
|
|
|
|
|
wg.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Web Crawler
|
|
|
|
|
|
|
|
|
|
func crawl(url string) []string {
|
|
|
|
|
fmt.Println(url)
|
|
|
|
|
list, err := links.Extract(url)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Print(err)
|
|
|
|
|
}
|
|
|
|
|
return list
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Web Crawler
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
worklist := make(chan []string)
|
|
|
|
|
|
|
|
|
|
// Start with the command-line arguments.
|
|
|
|
|
go func() { worklist <- os.Args[1:] }()
|
|
|
|
|
|
|
|
|
|
// Crawl the web concurrently.
|
|
|
|
|
seen := make(map[string]bool)
|
|
|
|
|
for list := range worklist {
|
|
|
|
|
for _, link := range list {
|
|
|
|
|
if !seen[link] {
|
|
|
|
|
seen[link] = true
|
|
|
|
|
go func(link string) {
|
|
|
|
|
worklist <- crawl(link)
|
|
|
|
|
}(link)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- Почему этот код упадёт?
|
|
|
|
|
|
|
|
|
|
* Web Crawler
|
|
|
|
|
|
|
|
|
|
// tokens is a counting semaphore used to
|
|
|
|
|
// enforce a limit of 20 concurrent requests.
|
|
|
|
|
var tokens = make(chan struct{}, 20)
|
|
|
|
|
func crawl(url string) []string {
|
|
|
|
|
fmt.Println(url)
|
|
|
|
|
tokens <- struct{}{} // acquire a token
|
|
|
|
|
list, err := links.Extract(url)
|
|
|
|
|
<-tokens // release the token
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Print(err)
|
|
|
|
|
}
|
|
|
|
|
return list
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Select
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
fmt.Println("Commencing countdown.")
|
|
|
|
|
tick := time.Tick(1 * time.Second) // just example, use time.NewTicker.
|
|
|
|
|
for countdown := 10; countdown > 0; countdown-- {
|
|
|
|
|
fmt.Println(countdown)
|
|
|
|
|
<-tick
|
|
|
|
|
}
|
|
|
|
|
launch()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
В другой горутине:
|
|
|
|
|
|
|
|
|
|
abort := make(chan struct{})
|
|
|
|
|
go func() {
|
|
|
|
|
os.Stdin.Read(make([]byte, 1)) // read a single byte
|
|
|
|
|
abort <- struct{}{}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
* Select
|
|
|
|
|
|
|
|
|
|
select {
|
2020-03-12 16:38:15 +00:00
|
|
|
|
case <-ch1:
|
|
|
|
|
// ...
|
|
|
|
|
case x := <-ch2:
|
|
|
|
|
// ...use x...
|
|
|
|
|
case ch3 <- y:
|
|
|
|
|
// ...
|
|
|
|
|
default:
|
|
|
|
|
// ...
|
2020-03-12 13:43:50 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- select блокируется, пока ни один из `case`-ов не может выполниться.
|
|
|
|
|
|
|
|
|
|
* Launch Abort
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
// ...create abort channel...
|
|
|
|
|
fmt.Println("Commencing countdown. Press return to abort.")
|
|
|
|
|
select {
|
2020-03-12 16:38:15 +00:00
|
|
|
|
case <-time.After(10 * time.Second):
|
|
|
|
|
// Do nothing.
|
|
|
|
|
case <-abort:
|
|
|
|
|
fmt.Println("Launch aborted!")
|
|
|
|
|
return
|
2020-03-12 13:43:50 +00:00
|
|
|
|
}
|
|
|
|
|
launch()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Ticker
|
|
|
|
|
|
|
|
|
|
func Tick(d time.Duration) <-chan time.Time {
|
|
|
|
|
ch := make(chan time.Time)
|
|
|
|
|
go func() {
|
2021-03-04 18:10:30 +00:00
|
|
|
|
for {
|
|
|
|
|
time.Sleep(d)
|
|
|
|
|
ch <- time.Now()
|
|
|
|
|
}
|
2020-03-12 16:36:38 +00:00
|
|
|
|
}()
|
2020-03-12 13:43:50 +00:00
|
|
|
|
return ch
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- `time.Tick` нельзя остановить
|
|
|
|
|
|
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
|
|
|
defer ticker.Stop() // cause the ticker's goroutine to terminate
|
|
|
|
|
|
|
|
|
|
<-ticker.C // receive from the ticker's channel
|
|
|
|
|
|
|
|
|
|
* Non blocking send & receive
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ch:
|
|
|
|
|
// ...
|
|
|
|
|
default:
|
|
|
|
|
// ...
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case ch <- "A":
|
|
|
|
|
// ...
|
|
|
|
|
default:
|
|
|
|
|
// ...
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Directory Traversal
|
|
|
|
|
|
|
|
|
|
.play du1/main.go /func walkDir()/,/-walkDir/
|
|
|
|
|
|
|
|
|
|
* Directory Traversal
|
|
|
|
|
|
|
|
|
|
.play du1/main.go /func main/,/^}/
|
|
|
|
|
|
|
|
|
|
* Directory Traversal Progress
|
|
|
|
|
|
|
|
|
|
.play du2/main.go /Print the results/,/^\tprint/
|
|
|
|
|
|
|
|
|
|
* Concurrent Directory Traversal
|
|
|
|
|
|
|
|
|
|
.play du3/main.go /fileSizes/,/^\t}\(\)/
|
|
|
|
|
|
|
|
|
|
.play du3/main.go /func walkDir/,/^}/
|
|
|
|
|
|
|
|
|
|
* Concurrent Directory Traversal
|
|
|
|
|
|
|
|
|
|
.play du3/main.go /var sema/,/^}/
|
|
|
|
|
|
|
|
|
|
* Cancelation
|
|
|
|
|
|
|
|
|
|
var done = make(chan struct{})
|
|
|
|
|
|
|
|
|
|
func cancelled() bool {
|
|
|
|
|
select {
|
|
|
|
|
case <-done:
|
|
|
|
|
return true
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cancel traversal when input is detected.
|
|
|
|
|
go func() {
|
|
|
|
|
os.Stdin.Read(make([]byte, 1)) // read a single byte
|
|
|
|
|
close(done)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
* Cancelation
|
|
|
|
|
|
|
|
|
|
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
|
|
|
|
|
defer n.Done()
|
|
|
|
|
if cancelled() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for _, entry := range dirents(dir) {
|
|
|
|
|
// ...
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Cancelation
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-done:
|
|
|
|
|
// Drain fileSizes to allow existing goroutines to finish.
|
|
|
|
|
for range fileSizes {
|
|
|
|
|
// Do nothing.
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
case size, ok := <-fileSizes:
|
|
|
|
|
// ...
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Cancelation
|
|
|
|
|
|
|
|
|
|
func dirents(dir string) []os.FileInfo {
|
|
|
|
|
select {
|
|
|
|
|
case sema <- struct{}{}: // acquire token
|
|
|
|
|
case <-done:
|
|
|
|
|
return nil // cancelled
|
|
|
|
|
}
|
|
|
|
|
defer func() { <-sema }() // release token
|
|
|
|
|
// ...read directory...
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
* Chat server
|
|
|
|
|
|
|
|
|
|
.play chat/chat.go /func main/,/^}/
|
|
|
|
|
|
|
|
|
|
* Chat broadcaster
|
|
|
|
|
|
|
|
|
|
.play chat/chat.go /type client/,/^}/
|
|
|
|
|
|
|
|
|
|
* Chat client
|
|
|
|
|
|
|
|
|
|
.play chat/chat.go /func handle/,/END_WRITER/
|
|
|
|
|
|