Go select语句用法
Go语言的一大便利之处就是高效地编写并发程序,其中的两个主要机制就是goroutine和channel。在使用channel时,一个常见的任务就是从多个channel中取值进行处理。Go提供了select关键字完成这一操作,select就相当于通道版的switch。然而,因为channel的阻塞特性,使用select时有许多值得注意的地方。这篇文章将简要介绍使用select的通用模式。
用法1:
假设有两个通道,需要从这两个通道中取值,像下面这样的select语句就可以完成这样的操作:
package main
import "fmt"
// 这个文件演示了如何使用Go的Select用法
func producerA(c chan string) {
c <- "A"
}
func producerB(c chan string) {
c <- "B"
}
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
for i := 0; i < 2; i++ {
select {
case msg := <-c1:
fmt.Printf("c1: %s\n", msg)
case msg := <-c2:
fmt.Printf("c2: %s\n", msg)
}
}
fmt.Println("finished.")
}
用法2: 使用default
上面这种写法的局限就在于:getValue() 必须明确知道要获取的消息数目。如果稍微修改一下:
func producerA(c chan string) {
for i := 0; i < 3; i++ {
c <- "A" + strconv.Itoa(i)
}
}
虽然producer1A可以产生3条消息,但getValue()仍只能获得2条消息,因为for循环只执行了2遍。输出的结果,由于调度时机的不同,可能是:
c1: A0 c1: A1
也可能是:
c2: B c1: A0
所以,若要获得所有消息,则需要将getValue()中的for循环判断条件改为:i < 4。如果将条件改为i < 8,则会出现下面的结果:
c2: B c1: A0 c1: A1 c1: A2 fatal error: all goroutines are asleep - deadlock!
此时会得到一个异常:所有的goroutines都休眠了,死锁。这是因为channel是会阻塞的,当for循环取完c1、c2中的值以后,再次进入循环后,就阻塞在了select的位置。因为c1、c2再也不会有新值,所以程序会永远等待下去。
对于这种情况,select提供了default子语句,和switch的default也类似,当所有channel都无法取到值时,则会执行default中的语句。像下面这样修改getValue():
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
for i := 0; i < 8; i++ {
select {
case msg := <-c1:
fmt.Printf("c1: %s\n", msg)
case msg := <-c2:
fmt.Printf("c2: %s\n", msg)
default:
fmt.Printf("empty.%v\n", i)
}
}
fmt.Println("finished.")
}
此时根据对goroutine调度顺序的不同,结果可能是:
empty.0 c2: B c1: A0 empty.3 empty.4 empty.5 empty.6 empty.7 finished.
也可能是(还有其他可能就不列举了):
empty.0 c1: A0 c2: B empty.3 empty.4 empty.5 empty.6 empty.7 finished.
此时又出现了新问题:还没有等到取完所有的值,循环就结束了。
对于这个例子,可以通过在default中加入time.Sleep()方法来解决:
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
for i := 0; i < 8; i++ {
select {
case msg := <-c1:
fmt.Printf("c1: %s\n", msg)
case msg := <-c2:
fmt.Printf("c2: %s\n", msg)
default:
time.Sleep(time.Millisecond)
fmt.Printf("empty.%v\n", i)
}
}
fmt.Println("finished.")
}
其输出结果为:
empty.0 c1: A0 empty.2 c1: A1 c2: B c1: A2 empty.6 empty.7 finished.
也可以换一种写法,使用计数器,当default语句的执行次数达到一定数目时,退出循环:
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
i := 0
Outer:
for {
select {
case msg := <-c1:
fmt.Printf("c1: %s\n", msg)
case msg := <-c2:
fmt.Printf("c2: %s\n", msg)
default:
i++
fmt.Printf("empty.%v\n", i)
time.Sleep(time.Millisecond)
if i >= 8 {
break Outer
}
}
}
fmt.Println("finished.")
}
这里需要定义一个Outer标签,如果直接在select语句中使用break,那么它仅仅是退出select选择(break总是退出离自己最近的for、switch或者select),而非退出外部的for循环,此时就会造成程序死循环,迅速耗尽CPU资源。
用法3:关闭channel
上面的用法仍存在问题:很多时候,channel可能来自第三方类库(通过参数传入),并非由我们自己定义,它何时结束传值我们并不知道。例如,再次修改下producerA方法:
func producerA(c chan string) {
for i := 0; i < 3; i++ {
time.Sleep(time.Duration(3) * time.Millisecond)
c <- "A" + strconv.Itoa(i)
}
}
再次运行getValue(),则会出现for循环已经退出了,但是channel还没有结束传值的情况。这样就会漏掉一些值的处理。
此时,可以通过检测channel是否关闭来判断传值是否结束。采用这种方式时,channel的写入方需要显示地执行close(chan)方法,修改producerA()和producerB(),在写入完成后调用close()方法:
func producerA(c chan string) {
for i := 0; i < 3; i++ {
c <- "A" + strconv.Itoa(i)
}
close(c)
}
func producerB(c chan string) {
c <- "B"
close(c)
}
接下来修改getValue()方法如下:
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
ok1, ok2 := true, true
msg := ""
for {
select {
case msg, ok1 = <-c1:
if ok1 {
fmt.Printf("c1: %v, ok1: %v\n", msg, ok1)
}
case msg, ok2 = <-c2:
if ok2 {
fmt.Printf("c2: %v, ok2: %v\n", msg, ok2)
}
}
if !ok1 && !ok2 {
fmt.Println("all channel closed")
break
}
}
fmt.Println("finished.")
}
这里核心的修改就是:msg, ok1 = <-c1:,当channel未关闭时,ok1为true,当channel关闭时,ok1为false。输出的结果如下:
c2: B, ok2: true c1: A0, ok1: true c1: A1, ok1: true c1: A2, ok1: true all channel closed finished.
此时看似完美地解决了问题,但是这里有一点特别重要,就是:channel关闭后总是可以从已关闭的channle中获取值,不再阻塞,且取得的是channel类型的零值。对于int来说是0,对于此处的字符串来说,就是空字符串。
如果取消掉上面的判断语句:
if !ok1 && !ok2 {
fmt.Println("all channel closed")
break
}
则for将会陷入死循环!
测试起见,取消掉for循环中 if ok1 和 if ok2 判断条件:
for {
select {
case msg, ok1 = <-c1:
fmt.Printf("c1: %v, ok1: %v\n", msg, ok1)
case msg, ok2 = <-c2:
fmt.Printf("c2: %v, ok2: %v\n", msg, ok2)
}
if !ok1 && !ok2 {
fmt.Println("all channel closed")
break
}
}
则会看到类似下面的输出:
c2: B, ok2: true c2: , ok2: false c2: , ok2: false c1: A0, ok1: true c2: , ok2: false ... (省略若干行) c2: , ok2: false c1: A1, ok1: true c2: , ok2: false ... (省略若干行) c2: , ok2: false c1: A2, ok1: true c2: , ok2: false ... (省略若干行) c1: , ok1: false all channel closed finished.
可以看到:在c2关闭之后,每次执行for循环,遇到 case <-c2: 时,都会进入case子句。
用法4:使用超时
上面的做法是getValue()明确的知道producerA()和producerB()方法会通过close()关闭channel。如果getValue()无法确定producer()方法是否会关闭channel,那么通常是采用超时判断:如果超过指定一段时间没有收到channel的消息,则退出循环。
此时,除了上面演示过的使用在default中添加time.Sleep()来完成以外,还可以通过time.After()方法来完成,该方法返回一个channel,并当时间满足参数设定时,channel中将可获取值,进而执行case子句。
简单起见,暂时删除producerB()方法,修改代码如下:
func producerA(c chan string) {
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
c <- "A" + strconv.Itoa(i)
}
close(c)
}
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
ok1 := true
msg := ""
defer fmt.Println("finished.")
i := 0
for {
select {
case msg, ok1 = <-c1:
fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
case <-time.After(500 * time.Millisecond):
fmt.Printf("timeout, i:%v \n", i)
}
if !ok1 {
fmt.Println("all channel closed")
break
}
i++
}
}
上面这段代码最主要的是加入了 case <-time.After(500 * time.Millisecond): ,这段代码将会在通道等待超时500毫秒时执行,执行完成后将进入下一次for循环,此时计时将会重新开始。这段代码输出如下:
timeout, i:0 timeout, i:1 c1: A0, ok1: true, i:2 timeout, i:3 c1: A1, ok1: true, i:4 timeout, i:5 c1: A2, ok1: true, i:6 c1: , ok1: false, i:7 all channel closed finished.
如本小节开头所说,有时候并不知道通道是否关闭了。如果注释掉producerA上面的close(c),继续执行getValue(),则会陷入死循环:
timeout, i:0 c1: A0, ok1: true, i:1 timeout, i:2 timeout, i:3 c1: A1, ok1: true, i:4 timeout, i:5 c1: A2, ok1: true, i:6 timeout, i:7 timeout, i:8 timeout, i:9 timeout, i:10 timeout, i:11 timeout, i:12 timeout, i:13 ...
通常情况下,这里会设置一个比较大的值,在达到这个值时便直接退出for循环。这里可以使用前面介绍的通过设置标签,然后break的方式退出外部for循环,也可以使用return,然后把本来在for循环执行完成后需要执行的代码放到defer关键词后:
从程序清晰度方面考虑,不用标签会更好一些。
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
ok1 := true
msg := ""
defer fmt.Println("finished.")
i := 0
for {
select {
case msg, ok1 = <-c1:
fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
case <-time.After(1100 * time.Millisecond):
fmt.Printf("timeout, i:%v \n", i)
return
}
if !ok1 {
fmt.Println("all channel closed")
break
}
i++
}
}
注意上面 case <-time.After(1100 * time.Millisecond): 这行代码,由500改为了1100,因为它必须要大于producerA()中的等待时间,否则就会出现还没来得及接收就直接超时退出了。在不知道producerA()写入channel的时间的情况下,可以设置一个更大的值,比如5秒钟。
输出如下:
c1: A0, ok1: true, i:0 c1: A1, ok1: true, i:1 c1: A2, ok1: true, i:2 timeout, i:3 finished.
注意并没有“all channel closed”的输出,因为channel并没有进行关闭。
目前看来一切都挺正常,但这里藏着一个巨大的陷阱。现在我们将producerB()添加回来,注意它执行了close(c):
func producerA(c chan string) {
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
c <- "A" + strconv.Itoa(i)
}
}
func producerB(c chan string) {
c <- "B"
close(c)
}
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
ok1, ok2 := true, true
msg := ""
defer fmt.Println("finished.")
i := 0
for {
select {
case msg, ok1 = <-c1:
fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
case msg, ok2 = <-c2:
if ok2 {
fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i)
}
case <-time.After(5 * time.Second):
fmt.Printf("timeout, i:%v \n", i)
return
}
if !ok1 && !ok2 {
fmt.Println("all channel closed")
break
}
i++
}
}
如果执行上面的代码,产生的输出大致是这样的:
c2: B, ok2: true, i:0 c1: A0, ok1: true, i:1491475 c1: A1, ok1: true, i:2859182 c1: A2, ok1: true, i:4703505
如果你观察一下CPU,会发现现在是100%。因为上面的代码又陷入了死循环,因为c2已经关闭了,所以 case msg, ok2 = <-c2: 立即返回了 空字符串 和 false。导致 case <-time.After(5 * time.Second): 失效了,永远也达不到超时的时间。造成程序反复执行这段代码:
case msg, ok2 = <-c2:
if ok2 {
fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i)
}
为了解决这个问题,依然要依赖于计时器和超时退出策略。注意这里的超时退出无法定义在default中,因为c2已经关闭了,会不断地返回零值,根本不会执行到default。此时可以将策略写在 case msg, ok2 = &-c2: 子句中,代码如下所示:
func getValue() {
c1, c2 := make(chan string), make(chan string)
go producerA(c1)
go producerB(c2)
ok1, ok2 := true, true
msg := ""
defer fmt.Println("finished.")
i := 0
j := 0 // 只为c2做的定时器
for {
select {
case msg, ok1 = <-c1:
if ok1 {
j = 0 // 重置j定时器
fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
}
case msg, ok2 = <-c2:
if ok2 {
fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i)
} else {
time.Sleep(time.Millisecond)
// 如果10秒钟c1没有消息,则退出
if j > 10000 {
fmt.Print("c2 return.")
return
}
}
case <-time.After(5 * time.Second):
fmt.Printf("timeout, i:%v \n", i)
return
}
if !ok1 && !ok2 {
fmt.Println("all channel closed")
break
}
i++
j++
}
}
上面的代码会在c1 10秒钟收不到消息后退出。然而,上面的代码不具通用性,如果有更多的channel,代码将会变得愈发复杂。这里我一时半会儿也没有好的解决方案。可以看到,当两个channel都不调用close,或者都调用close,都是比较容易解决的。前者通过default解决,后者通过case语句获得管道状态解决。但是当一个管道关闭,一个管道不关闭时情况就变得难以处理。
因此,当使用自己创建的管道时,在发送完数据后,总是关闭管道是比较好的编程事件。
总结
这篇文章简要介绍了Go语言在使用select关键字处理Channel时可能遇到的问题,这里面有很多隐藏的陷阱,稍不留意就可能写出无响应的程序或者无限循环的程序。
感谢阅读,希望这篇文章能给你带来帮助!