go select
package main
import (
"fmt"
"time"
)
// generator starts a background goroutine that emits the sequence 0, 1, 2, ...
// on the returned unbuffered channel, sleeping one second before each send.
//
// The goroutine has no stop signal and the channel is never closed, so once
// the receiver stops reading the goroutine stays blocked on its send.
func generator() chan int {
	out := make(chan int)
	go func() {
		// The counter lives in the for clause; an empty condition is the
		// idiomatic spelling of an endless loop (instead of `for true`).
		for i := 0; ; i++ {
			time.Sleep(time.Second)
			out <- i
		}
	}()
	return out
}
// worker consumes values from w until the channel is closed. Each value is
// "processed" by sleeping five seconds and then printing it together with the
// worker's id.
func worker(id int, w chan int) {
	for {
		n, ok := <-w
		if !ok {
			// Channel closed and drained: nothing left to do.
			return
		}
		time.Sleep(5 * time.Second)
		fmt.Printf("Worker %d received %d\n", id, n)
	}
}
// createWorker launches a worker goroutine with the given id and returns the
// send-only end of the unbuffered channel that feeds it.
func createWorker(id int) chan<- int {
	jobs := make(chan int)
	go worker(id, jobs)
	return jobs
}
// main merges two generators into a single slow worker with one select loop.
//
// Only the most recently received value is remembered, so values that arrive
// while the worker is still busy replace the pending one and are never
// delivered.
func main() {
	left, right := generator(), generator()
	sink := createWorker(0)

	// Channel operations block; select lets the loop proceed with whichever
	// of them is ready instead of committing to a single one.
	var (
		latest  int
		pending bool
	)
	for {
		// A nil channel is never ready, so the send case below stays disabled
		// until there is a value to hand over.
		var out chan<- int
		if pending {
			out = sink
		}

		select {
		case latest = <-left:
			pending = true
		case latest = <-right:
			pending = true
		case out <- latest:
			// Caveat: if the worker is slow, latest may have been overwritten
			// several times before this send succeeds.
			pending = false
		}
	}
}
为了避免 worker 处理太慢时 n 被后来的值覆盖,我们把收到的所有值先存入 values 切片,再按顺序逐个发送给 worker:
package main
import (
"fmt"
"time"
)
// generator starts a background goroutine that emits the sequence 0, 1, 2, ...
// on the returned unbuffered channel, sleeping one second before each send.
//
// The goroutine has no stop signal and the channel is never closed, so once
// the receiver stops reading the goroutine stays blocked on its send.
func generator() chan int {
	out := make(chan int)
	go func() {
		// The counter lives in the for clause; an empty condition is the
		// idiomatic spelling of an endless loop (instead of `for true`).
		for i := 0; ; i++ {
			time.Sleep(time.Second)
			out <- i
		}
	}()
	return out
}
// worker consumes values from w until the channel is closed. Each value is
// "processed" by sleeping five seconds and then printing it together with the
// worker's id.
func worker(id int, w chan int) {
	for {
		n, ok := <-w
		if !ok {
			// Channel closed and drained: nothing left to do.
			return
		}
		time.Sleep(5 * time.Second)
		fmt.Printf("Worker %d received %d\n", id, n)
	}
}
// createWorker launches a worker goroutine with the given id and returns the
// send-only end of the unbuffered channel that feeds it.
func createWorker(id int) chan<- int {
	jobs := make(chan int)
	go worker(id, jobs)
	return jobs
}
// main merges two generators into a single slow worker without dropping
// values: everything received is appended to the values queue, and the head
// of the queue is offered to the worker whenever the queue is non-empty.
//
// The queue has no upper bound, so it keeps growing for as long as the
// generators produce faster than the worker consumes.
func main() {
	first, second := generator(), generator()
	sink := createWorker(0)

	var values []int
	for {
		// While the queue is empty, out stays nil and the send case below can
		// never be selected.
		var (
			out  chan<- int
			head int
		)
		if len(values) != 0 {
			out, head = sink, values[0]
		}

		select {
		case v := <-first:
			// Buffer every received value.
			values = append(values, v)
		case v := <-second:
			// Buffer every received value.
			values = append(values, v)
		case out <- head:
			// The head was delivered; drop it from the queue.
			values = values[1:]
		}
	}
}