《The Way to Go》Chapter14 协程
Chapter 14
协程(goroutine)与通道(channel)
Go为构建并发程序的基本代码块是协程与通道,需要语言、编译器和runtime支持。Go语言提供的垃圾回收器对并发编程至关重要(?)。
原则:不要通过共享内存来通信,而通过通信来共享内存。
并发、并行和协程
- 复习:进程的复数线程是共享同一地址空间的一起工作的执行体,只有在多核、多处理器上并发程序才能做到并行。
- Go中,并发处理的部分被称为
goroutines(go协程)
,在协程与操作系统线程之间无一对一的关系;协程是根据一个/多个线程的可用性,映射在它们之上的。“协程调度器”在Go运行时完成这个工作。 - 协程是轻量的,比线程更轻。在堆中创建协程,且协程对栈进行了分割,从而动态增加缩减内存的使用。
- 协程通过关键字go调用(执行一个函数或方法)来实现,调用后即会分配出独立的栈,开始同时进行的计算过程(并发 / 并行);协程的栈会根据需要进行伸缩,不会出现栈溢出(?)
- main函数也被看作是一个go协程
- 当main函数返回的时候,程序退出,不会等待任何非main协程的结束,所以在服务器程序中,一般会给每个请求启动一个协程,而server函数使用一个无限循环来保持运行状态
使用 GOMAXPROCS
- 设置环境变量,或者使用
runtime.GOMAXPROCS(n)
来设置允许运行时使用多少个操作系统线程,如果未设置GOMAXPROCS
,则所有协程会共享同一个线程。 - 经验:对n个核心的情况设置GOMAXPROCS为n-1来获得最佳性能
- 可以遵循的规则:协程的数量 > 1 + GOMAXPROCS > 1
Go协程与协程
Go协程与其他语言中的协程有两点不同:
- Go协程意味着并行,但协程一般来说不是这样的
- Go协程通过通道channel来通信;协程通过让出(await)和恢复(async)操作来通信
协程间的信道
Go将通道(channel)作为一种特殊的类型,用于发送类型化数据的管道,由其负责协程之间的通信。在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争
通道也是引用类型,通道的声明与实例化方式
var ch1 chan string ch1 = make(chan string) // 精简形式 ch1 := make(chan string)
使用操作符
<-
来表示数据的传输,ch <- int1
表示用通道ch
发送变量int1
,int2 = ← ch
表示,从通道ch
中取出数据,赋值给int2
通道的发送和接收都是原子操作,它们互不干扰地完成
通道阻塞
默认情况下通道是同步且无缓冲的 → 发送/接收操作在对方处理完之前是阻塞的:
- 如果通道中的数据无人接收,就无法再给通道传入其他数据,发送者阻塞;发送操作会等待到通道重新变为可用状态,即通道中没有数据 / 通道为空的情况 → 即发完之后如果通道没有被接收者处理清空,发送操作阻塞
- 如果通道中没有数据,接收者阻塞;接收操作会等待到通道重新变为可用状态,即通道中有数据 / 通道已满的情况 → 即接收完之后如果通道没有被发送者塞满,读取操作阻塞
不带缓冲的通道传递有点像两个人传递东西,如果传出去没有对方接走,人就离开了(不阻塞),东西就会掉下去。传出去必须要等到对方接走,自己才能继续做其他事情,所以是同步的。
func f1(in chan int) {
fmt.Println(<-in)
}
func main() {
out := make(chan int)
out <- 2
go f1(out) // out 中的 2 无人接收,main协程中的'go f1(out)'被阻塞
}
同步通道-使用带缓冲的通道
不带缓冲的通道容量是0,通信要双方都准备好了才能进行,容量1和无缓冲绝对是不一样的。
拓展make命令来设置通道容量,用于提供缓存
buf := 100
ch1 := make(chan string, buf) // buf 是通道可以容纳元素的个数
容量大于0的通道就是异步的了,对于发送操作来说,在发送到通道满载之前不会阻塞;对于接收操作来说,在接收到通道变空之前也不会阻塞;
信号量模式
可以在协程结束时往通道中放置一个值作为处理结束的信号,而在main()
协程中等待从这个通道中取得值,来保证main
不会提前结束
type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)
...
// 开始N个协程
for i, xi := range data {
go func (i int, xi float64) {
res[i] = doSomething(i, xi)
sem <- empty
} (i, xi)
}
// 等待所有的协程结束
for i := 0; i < N; i++ { <-sem }
使用带缓冲通道实现信号量
声明空接口类型,并声明信号量
type Empty interface {}
type semaphore chan Empty
将可用资源的数量 N
来初始化信号量 semaphore
:sem
= make(semaphore, N)
便可定义P操作与V操作
// acquire n resources
func (s semaphore) P(n int) {
e := new(Empty)
for i := 0; i < n; i++ {
s <- e
}
}
// release n resources
func (s semaphore) V(n int) {
for i:= 0; i < n; i++{
<- s
}
}
通道的方向
可以通过注解来表示通道只发送或者只接收数据,如下
func source(ch chan<- int){
for { ch <- 1 }
}
func sink(ch <-chan int) {
for { <-ch }
}
通道在创建时都是双向的,它们创建后被用于分配给有方向的通道变量
关闭通道
通道可以被显式地关闭,只有接收者确定不会提供新的值才需要关闭通道
只有发送者需要关闭通道,对于接收者来说关闭通道没有意义
通过逗号ok模式来检测通道是否被关闭
v, ok := <-ch if !ok { break } process(v)
使用for-range来读取通道,会自动检测通道是否关闭
for input := range ch { process(input) }
使用select切换协程
select {
case u:= <- ch1:
...
case v:= <- ch2:
...
...
default: // no value ready to be received
...
}
- 从不同并发执行的协程中获取值可以通过
select
完成,它选择处理列出的多个通道其中一个:- 如果通道都阻塞了,会等待直到其中一个可以处理
- 如果有多个通道可以处理,会随机选择出一个
- 如果没有通道可以处理并且有
default
语句,那么default
语句将执行
- 在无限循环中使用一个
select
语句,来实现一种监听模式。在此模式下,使用break
来使循环退出
通道、超时和计时器(Ticker)
在工厂函数
time.NewTicker
中以Duration
类型的参数传入:func NewTicker(dur) *Ticker
,时间间隔单位纳秒使用
time.Tick()
来返回一个指定时间间隔发送时间的通道定时器
Timer
和计时器Ticker
类似,但定时器只发送一次时间,使用time.After(d Duration)
来返回一个通道场景:从多个数据库同时读取,只需要接收首先到达的答案:这个场景也演示了一个发送数据而不是接收数据的
select
func Query(conns []Conn, query string) Result { ch := make(chan Result, 1) for _, conn := range conns { // 向多个数据库发起请求 go func(c Conn) { select { case ch <- c.DoQuery(query): // 如果还没有到达的答案,结果会被放入通道 default: // 否则进入到default语句退出 } }(conn) } return <- ch // 接收者,仅返回第一个到达通道的结果 }
注意:使用容量为1的通道ch,这样不会导致第一个结果到达时由于接收者
← ch
没有准备好而无法被接收
新旧模型对比:任务和worker
对于任何可以建模为 Master-Worker 范例的问题,一个类似于 worker 使用通道进行通信和交互、Master 进行整体协调的方案都能完美解决。
怎么选择是该使用锁还是通道?
下面列出一个普遍的经验法则:
- 使用锁的情景:
- 访问共享数据结构中的缓存信息
- 保存应用程序上下文和状态信息数据
- 使用通道的情景:
- 与异步操作的结果进行交互
- 分发任务
- 传递数据所有权
当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。
惰性生成器的实现
生成器是指被调用时返回一个序列中下一个值的函数,生成器只在需要的时候求值,同时保留相关变量资源,是一项在需要时对表达式进行求值的技术。
可以通过传入匿名函数,将函数的计算结果输出到通道,再返回另一个函数作为生成器,用于将通道中的内容输出。需要生成器结果时调用函数即可。
限制同时处理的请求数
使用带缓冲区的通道可以很容易地实现,其缓冲区容量就是同时处理请求的最大数量。
下述示例当信号通道表示缓冲区已满时,handle() 函数会阻塞且不再处理其他请求。
package main
const MAXREQS = 50
var sem = make(chan int, MAXREQS)
func process(r *Request) {
// do something
}
func handle(r *Request) {
sem <- 1 // doesn't matter what we put in it
process(r)
<-sem // one empty place in the buffer: the next request can start
}
链式协程
这样的通道表达式也是可行的:
func f(left, right chan int) { left <- 1 + <-right }
在多核心上并行计算
如果需要把计算量分配到每一个处理器上,让每一部分与其他部分并行,可以先用带缓冲区的通道,在每一个计算部分完成后发送信号;同时还要通过runtime.GOMAXPROCS(N)限定使用的处理器个数。
使用通道并发访问对象
可以使用协程在后台顺序执行匿名函数来替代使用同步互斥锁。比如在构造函数中启动一个后台协程,然后把对象的读写都实现为匿名函数,让其按顺序处理,有效地将它们序列化从而提供了安全的并发访问。
// 构造时开启协程
func NewPerson(name string, salary float64) *Person {
p := &Person{name, salary, make(chan func())}
go p.backend()
return p
}
func (p *Person) backend() {
for f := range p.chF {
f()
}
}
// 读写func()
// Set salary.
func (p *Person) SetSalary(sal float64) {
p.chF <- func() { p.salary = sal }
}
// Retrieve salary.
func (p *Person) Salary() float64 {
fChan := make(chan float64)
p.chF <- func() { fChan <- p.salary }
return <-fChan
}