Go select语句用法
Go语言的一大便利之处就是高效地编写并发程序,其中的两个主要机制就是goroutine和channel。在使用channel时,一个常见的任务就是从多个channel中取值进行处理。Go提供了select关键字完成这一操作,select就相当于通道版的switch。然而,因为channel的阻塞特性,使用select时有许多值得注意的地方。这篇文章将简要介绍使用select的通用模式。
用法1:
假设有两个通道,需要从这两个通道中取值,像下面这样的select语句就可以完成这样的操作:
package main import "fmt" // 这个文件演示了如何使用Go的Select用法 func producerA(c chan string) { c <- "A" } func producerB(c chan string) { c <- "B" } func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) for i := 0; i < 2; i++ { select { case msg := <-c1: fmt.Printf("c1: %s\n", msg) case msg := <-c2: fmt.Printf("c2: %s\n", msg) } } fmt.Println("finished.") }
用法2: 使用default
上面这种写法的局限就在于:getValue() 必须明确知道要获取的消息数目。如果稍微修改一下:
func producerA(c chan string) { for i := 0; i < 3; i++ { c <- "A" + strconv.Itoa(i) } }
虽然producer1A可以产生3条消息,但getValue()仍只能获得2条消息,因为for循环只执行了2遍。输出的结果,由于调度时机的不同,可能是:
c1: A0 c1: A1
也可能是:
c2: B c1: A0
所以,若要获得所有消息,则需要将getValue()中的for循环判断条件改为:i < 4
。如果将条件改为i < 8
,则会出现下面的结果:
c2: B c1: A0 c1: A1 c1: A2 fatal error: all goroutines are asleep - deadlock!
此时会得到一个异常:所有的goroutines都休眠了,死锁。这是因为channel是会阻塞的,当for循环取完c1、c2中的值以后,再次进入循环后,就阻塞在了select的位置。因为c1、c2再也不会有新值,所以程序会永远等待下去。
对于这种情况,select提供了default子语句,和switch的default也类似,当所有channel都无法取到值时,则会执行default中的语句。像下面这样修改getValue():
func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) for i := 0; i < 8; i++ { select { case msg := <-c1: fmt.Printf("c1: %s\n", msg) case msg := <-c2: fmt.Printf("c2: %s\n", msg) default: fmt.Printf("empty.%v\n", i) } } fmt.Println("finished.") }
此时根据对goroutine调度顺序的不同,结果可能是:
empty.0 c2: B c1: A0 empty.3 empty.4 empty.5 empty.6 empty.7 finished.
也可能是(还有其他可能就不列举了):
empty.0 c1: A0 c2: B empty.3 empty.4 empty.5 empty.6 empty.7 finished.
此时又出现了新问题:还没有等到取完所有的值,循环就结束了。
对于这个例子,可以通过在default中加入time.Sleep()方法来解决:
func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) for i := 0; i < 8; i++ { select { case msg := <-c1: fmt.Printf("c1: %s\n", msg) case msg := <-c2: fmt.Printf("c2: %s\n", msg) default: time.Sleep(time.Millisecond) fmt.Printf("empty.%v\n", i) } } fmt.Println("finished.") }
其输出结果为:
empty.0 c1: A0 empty.2 c1: A1 c2: B c1: A2 empty.6 empty.7 finished.
也可以换一种写法,使用计数器,当default语句的执行次数达到一定数目时,退出循环:
func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) i := 0 Outer: for { select { case msg := <-c1: fmt.Printf("c1: %s\n", msg) case msg := <-c2: fmt.Printf("c2: %s\n", msg) default: i++ fmt.Printf("empty.%v\n", i) time.Sleep(time.Millisecond) if i >= 8 { break Outer } } } fmt.Println("finished.") }
这里需要定义一个Outer标签,如果直接在select语句中使用break,那么它仅仅是退出select选择(break总是退出离自己最近的for、switch或者select),而非退出外部的for循环,此时就会造成程序死循环,迅速耗尽CPU资源。
用法3:关闭channel
上面的用法仍存在问题:很多时候,channel可能来自第三方类库(通过参数传入),并非由我们自己定义,它何时结束传值我们并不知道。例如,再次修改下producerA方法:
func producerA(c chan string) { for i := 0; i < 3; i++ { time.Sleep(time.Duration(3) * time.Millisecond) c <- "A" + strconv.Itoa(i) } }
再次运行getValue(),则会出现for循环已经退出了,但是channel还没有结束传值的情况。这样就会漏掉一些值的处理。
此时,可以通过检测channel是否关闭来判断传值是否结束。采用这种方式时,channel的写入方需要显示地执行close(chan)方法,修改producerA()和producerB(),在写入完成后调用close()方法:
func producerA(c chan string) { for i := 0; i < 3; i++ { c <- "A" + strconv.Itoa(i) } close(c) } func producerB(c chan string) { c <- "B" close(c) }
接下来修改getValue()方法如下:
func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) ok1, ok2 := true, true msg := "" for { select { case msg, ok1 = <-c1: if ok1 { fmt.Printf("c1: %v, ok1: %v\n", msg, ok1) } case msg, ok2 = <-c2: if ok2 { fmt.Printf("c2: %v, ok2: %v\n", msg, ok2) } } if !ok1 && !ok2 { fmt.Println("all channel closed") break } } fmt.Println("finished.") }
这里核心的修改就是:msg, ok1 = <-c1:
,当channel未关闭时,ok1为true,当channel关闭时,ok1为false。输出的结果如下:
c2: B, ok2: true c1: A0, ok1: true c1: A1, ok1: true c1: A2, ok1: true all channel closed finished.
此时看似完美地解决了问题,但是这里有一点特别重要,就是:channel关闭后总是可以从已关闭的channle中获取值,不再阻塞,且取得的是channel类型的零值。对于int来说是0,对于此处的字符串来说,就是空字符串。
如果取消掉上面的判断语句:
if !ok1 && !ok2 { fmt.Println("all channel closed") break }
则for将会陷入死循环!
测试起见,取消掉for循环中 if ok1
和 if ok2
判断条件:
for { select { case msg, ok1 = <-c1: fmt.Printf("c1: %v, ok1: %v\n", msg, ok1) case msg, ok2 = <-c2: fmt.Printf("c2: %v, ok2: %v\n", msg, ok2) } if !ok1 && !ok2 { fmt.Println("all channel closed") break } }
则会看到类似下面的输出:
c2: B, ok2: true c2: , ok2: false c2: , ok2: false c1: A0, ok1: true c2: , ok2: false ... (省略若干行) c2: , ok2: false c1: A1, ok1: true c2: , ok2: false ... (省略若干行) c2: , ok2: false c1: A2, ok1: true c2: , ok2: false ... (省略若干行) c1: , ok1: false all channel closed finished.
可以看到:在c2关闭之后,每次执行for循环,遇到 case <-c2: 时,都会进入case子句。
用法4:使用超时
上面的做法是getValue()明确的知道producerA()和producerB()方法会通过close()关闭channel。如果getValue()无法确定producer()方法是否会关闭channel,那么通常是采用超时判断:如果超过指定一段时间没有收到channel的消息,则退出循环。
此时,除了上面演示过的使用在default中添加time.Sleep()来完成以外,还可以通过time.After()方法来完成,该方法返回一个channel,并当时间满足参数设定时,channel中将可获取值,进而执行case子句。
简单起见,暂时删除producerB()方法,修改代码如下:
func producerA(c chan string) { for i := 0; i < 3; i++ { time.Sleep(time.Second) c <- "A" + strconv.Itoa(i) } close(c) } func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) ok1 := true msg := "" defer fmt.Println("finished.") i := 0 for { select { case msg, ok1 = <-c1: fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i) case <-time.After(500 * time.Millisecond): fmt.Printf("timeout, i:%v \n", i) } if !ok1 { fmt.Println("all channel closed") break } i++ } }
上面这段代码最主要的是加入了 case <-time.After(500 * time.Millisecond):
,这段代码将会在通道等待超时500毫秒时执行,执行完成后将进入下一次for循环,此时计时将会重新开始。这段代码输出如下:
timeout, i:0 timeout, i:1 c1: A0, ok1: true, i:2 timeout, i:3 c1: A1, ok1: true, i:4 timeout, i:5 c1: A2, ok1: true, i:6 c1: , ok1: false, i:7 all channel closed finished.
如本小节开头所说,有时候并不知道通道是否关闭了。如果注释掉producerA上面的close(c),继续执行getValue(),则会陷入死循环:
timeout, i:0 c1: A0, ok1: true, i:1 timeout, i:2 timeout, i:3 c1: A1, ok1: true, i:4 timeout, i:5 c1: A2, ok1: true, i:6 timeout, i:7 timeout, i:8 timeout, i:9 timeout, i:10 timeout, i:11 timeout, i:12 timeout, i:13 ...
通常情况下,这里会设置一个比较大的值,在达到这个值时便直接退出for循环。这里可以使用前面介绍的通过设置标签,然后break的方式退出外部for循环,也可以使用return,然后把本来在for循环执行完成后需要执行的代码放到defer关键词后:
从程序清晰度方面考虑,不用标签会更好一些。
func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) ok1 := true msg := "" defer fmt.Println("finished.") i := 0 for { select { case msg, ok1 = <-c1: fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i) case <-time.After(1100 * time.Millisecond): fmt.Printf("timeout, i:%v \n", i) return } if !ok1 { fmt.Println("all channel closed") break } i++ } }
注意上面 case <-time.After(1100 * time.Millisecond):
这行代码,由500改为了1100,因为它必须要大于producerA()中的等待时间,否则就会出现还没来得及接收就直接超时退出了。在不知道producerA()写入channel的时间的情况下,可以设置一个更大的值,比如5秒钟。
输出如下:
c1: A0, ok1: true, i:0 c1: A1, ok1: true, i:1 c1: A2, ok1: true, i:2 timeout, i:3 finished.
注意并没有“all channel closed”的输出,因为channel并没有进行关闭。
目前看来一切都挺正常,但这里藏着一个巨大的陷阱。现在我们将producerB()添加回来,注意它执行了close(c):
func producerA(c chan string) { for i := 0; i < 3; i++ { time.Sleep(time.Second) c <- "A" + strconv.Itoa(i) } } func producerB(c chan string) { c <- "B" close(c) } func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) ok1, ok2 := true, true msg := "" defer fmt.Println("finished.") i := 0 for { select { case msg, ok1 = <-c1: fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i) case msg, ok2 = <-c2: if ok2 { fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i) } case <-time.After(5 * time.Second): fmt.Printf("timeout, i:%v \n", i) return } if !ok1 && !ok2 { fmt.Println("all channel closed") break } i++ } }
如果执行上面的代码,产生的输出大致是这样的:
c2: B, ok2: true, i:0 c1: A0, ok1: true, i:1491475 c1: A1, ok1: true, i:2859182 c1: A2, ok1: true, i:4703505
如果你观察一下CPU,会发现现在是100%。因为上面的代码又陷入了死循环,因为c2已经关闭了,所以 case msg, ok2 = <-c2:
立即返回了 空字符串 和 false。导致 case <-time.After(5 * time.Second):
失效了,永远也达不到超时的时间。造成程序反复执行这段代码:
case msg, ok2 = <-c2: if ok2 { fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i) }
为了解决这个问题,依然要依赖于计时器和超时退出策略。注意这里的超时退出无法定义在default中,因为c2已经关闭了,会不断地返回零值,根本不会执行到default。此时可以将策略写在 case msg, ok2 = &-c2:
子句中,代码如下所示:
func getValue() { c1, c2 := make(chan string), make(chan string) go producerA(c1) go producerB(c2) ok1, ok2 := true, true msg := "" defer fmt.Println("finished.") i := 0 j := 0 // 只为c2做的定时器 for { select { case msg, ok1 = <-c1: if ok1 { j = 0 // 重置j定时器 fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i) } case msg, ok2 = <-c2: if ok2 { fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i) } else { time.Sleep(time.Millisecond) // 如果10秒钟c1没有消息,则退出 if j > 10000 { fmt.Print("c2 return.") return } } case <-time.After(5 * time.Second): fmt.Printf("timeout, i:%v \n", i) return } if !ok1 && !ok2 { fmt.Println("all channel closed") break } i++ j++ } }
上面的代码会在c1 10秒钟收不到消息后退出。然而,上面的代码不具通用性,如果有更多的channel,代码将会变得愈发复杂。这里我一时半会儿也没有好的解决方案。可以看到,当两个channel都不调用close,或者都调用close,都是比较容易解决的。前者通过default解决,后者通过case语句获得管道状态解决。但是当一个管道关闭,一个管道不关闭时情况就变得难以处理。
因此,当使用自己创建的管道时,在发送完数据后,总是关闭管道是比较好的编程事件。
总结
这篇文章简要介绍了Go语言在使用select关键字处理Channel时可能遇到的问题,这里面有很多隐藏的陷阱,稍不留意就可能写出无响应的程序或者无限循环的程序。
感谢阅读,希望这篇文章能给你带来帮助!