张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 关于

Go select语句用法

2019-06-05 张子阳 分类: Go 语言

Go语言的一大便利之处就是高效地编写并发程序,其中的两个主要机制就是goroutine和channel。在使用channel时,一个常见的任务就是从多个channel中取值进行处理。Go提供了select关键字完成这一操作,select就相当于通道版的switch。然而,因为channel的阻塞特性,使用select时有许多值得注意的地方。这篇文章将简要介绍使用select的通用模式。

用法1:

假设有两个通道,需要从这两个通道中取值,像下面这样的select语句就可以完成这样的操作:

package main

import "fmt"

// 这个文件演示了如何使用Go的Select用法
func producerA(c chan string) {
    c <- "A"
}

func producerB(c chan string) {    
    c <- "B"
}

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    for i := 0; i < 2; i++ {
        select {
        case msg := <-c1:
            fmt.Printf("c1: %s\n", msg)
        case msg := <-c2:
            fmt.Printf("c2: %s\n", msg)
        }
    }
    fmt.Println("finished.")
}

用法2: 使用default

上面这种写法的局限就在于:getValue() 必须明确知道要获取的消息数目。如果稍微修改一下:

func producerA(c chan string) {
    for i := 0; i < 3; i++ {
        c <- "A" + strconv.Itoa(i)
    }
}

虽然producer1A可以产生3条消息,但getValue()仍只能获得2条消息,因为for循环只执行了2遍。输出的结果,由于调度时机的不同,可能是:

c1: A0
c1: A1

也可能是:

c2: B
c1: A0

所以,若要获得所有消息,则需要将getValue()中的for循环判断条件改为:i < 4。如果将条件改为i < 8,则会出现下面的结果:

c2: B
c1: A0
c1: A1
c1: A2
fatal error: all goroutines are asleep - deadlock!

此时会得到一个异常:所有的goroutines都休眠了,死锁。这是因为channel是会阻塞的,当for循环取完c1、c2中的值以后,再次进入循环后,就阻塞在了select的位置。因为c1、c2再也不会有新值,所以程序会永远等待下去。

对于这种情况,select提供了default子语句,和switch的default也类似,当所有channel都无法取到值时,则会执行default中的语句。像下面这样修改getValue():

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    for i := 0; i < 8; i++ {
        select {
        case msg := <-c1:
            fmt.Printf("c1: %s\n", msg)
        case msg := <-c2:
            fmt.Printf("c2: %s\n", msg)
        default:
            fmt.Printf("empty.%v\n", i)
        }
    }
    fmt.Println("finished.")
}

此时根据对goroutine调度顺序的不同,结果可能是:

empty.0
c2: B
c1: A0
empty.3
empty.4
empty.5
empty.6
empty.7
finished.

也可能是(还有其他可能就不列举了):

empty.0
c1: A0
c2: B
empty.3
empty.4
empty.5
empty.6
empty.7
finished.

此时又出现了新问题:还没有等到取完所有的值,循环就结束了。

对于这个例子,可以通过在default中加入time.Sleep()方法来解决:

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    for i := 0; i < 8; i++ {
        select {
        case msg := <-c1:
            fmt.Printf("c1: %s\n", msg)
        case msg := <-c2:
            fmt.Printf("c2: %s\n", msg)
        default:
            time.Sleep(time.Millisecond)
            fmt.Printf("empty.%v\n", i)
        }
    }
    fmt.Println("finished.")
}

其输出结果为:

empty.0
c1: A0
empty.2
c1: A1
c2: B
c1: A2
empty.6
empty.7
finished.

也可以换一种写法,使用计数器,当default语句的执行次数达到一定数目时,退出循环:

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    i := 0

Outer:
    for {
        select {
        case msg := <-c1:
            fmt.Printf("c1: %s\n", msg)
        case msg := <-c2:
            fmt.Printf("c2: %s\n", msg)
        default:
            i++
            fmt.Printf("empty.%v\n", i)
            time.Sleep(time.Millisecond)
            if i >= 8 {
                break Outer
            }
        }
    }

    fmt.Println("finished.")
}
这里需要定义一个Outer标签,如果直接在select语句中使用break,那么它仅仅是退出select选择(break总是退出离自己最近的for、switch或者select),而非退出外部的for循环,此时就会造成程序死循环,迅速耗尽CPU资源。

用法3:关闭channel

上面的用法仍存在问题:很多时候,channel可能来自第三方类库(通过参数传入),并非由我们自己定义,它何时结束传值我们并不知道。例如,再次修改下producerA方法:

func producerA(c chan string) {
    for i := 0; i < 3; i++ {
        time.Sleep(time.Duration(3) * time.Millisecond)
        c <- "A" + strconv.Itoa(i)
    }
}

再次运行getValue(),则会出现for循环已经退出了,但是channel还没有结束传值的情况。这样就会漏掉一些值的处理。

此时,可以通过检测channel是否关闭来判断传值是否结束。采用这种方式时,channel的写入方需要显示地执行close(chan)方法,修改producerA()和producerB(),在写入完成后调用close()方法:

func producerA(c chan string) {
    for i := 0; i < 3; i++ {
        c <- "A" + strconv.Itoa(i)
    }
    close(c)
}

func producerB(c chan string) {
    c <- "B"
    close(c)
}

接下来修改getValue()方法如下:

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    ok1, ok2 := true, true
    msg := ""

    for {
        select {
        case msg, ok1 = <-c1:
            if ok1 {
                fmt.Printf("c1: %v, ok1: %v\n", msg, ok1)
            }
        case msg, ok2 = <-c2:
            if ok2 {
                fmt.Printf("c2: %v, ok2: %v\n", msg, ok2)
            }
        }
        if !ok1 && !ok2 {
            fmt.Println("all channel closed")
            break
        }
    }
    fmt.Println("finished.")
}

这里核心的修改就是:msg, ok1 = <-c1:,当channel未关闭时,ok1为true,当channel关闭时,ok1为false。输出的结果如下:

c2: B, ok2: true
c1: A0, ok1: true
c1: A1, ok1: true
c1: A2, ok1: true
all channel closed
finished.

此时看似完美地解决了问题,但是这里有一点特别重要,就是:channel关闭后总是可以从已关闭的channle中获取值,不再阻塞,且取得的是channel类型的零值。对于int来说是0,对于此处的字符串来说,就是空字符串。

如果取消掉上面的判断语句:

if !ok1 && !ok2 {
    fmt.Println("all channel closed")
    break
}

则for将会陷入死循环!

测试起见,取消掉for循环中 if ok1if ok2 判断条件:

for {
    select {
    case msg, ok1 = <-c1:
        fmt.Printf("c1: %v, ok1: %v\n", msg, ok1)        
    case msg, ok2 = <-c2:        
        fmt.Printf("c2: %v, ok2: %v\n", msg, ok2)        
    }
    if !ok1 && !ok2 {
        fmt.Println("all channel closed")
        break
    }
}

则会看到类似下面的输出:

c2: B, ok2: true
c2: , ok2: false
c2: , ok2: false
c1: A0, ok1: true
c2: , ok2: false
... (省略若干行)
c2: , ok2: false
c1: A1, ok1: true
c2: , ok2: false
... (省略若干行)
c2: , ok2: false
c1: A2, ok1: true
c2: , ok2: false
... (省略若干行)
c1: , ok1: false
all channel closed
finished.

可以看到:在c2关闭之后,每次执行for循环,遇到 case <-c2: 时,都会进入case子句

用法4:使用超时

上面的做法是getValue()明确的知道producerA()和producerB()方法会通过close()关闭channel。如果getValue()无法确定producer()方法是否会关闭channel,那么通常是采用超时判断:如果超过指定一段时间没有收到channel的消息,则退出循环。

此时,除了上面演示过的使用在default中添加time.Sleep()来完成以外,还可以通过time.After()方法来完成,该方法返回一个channel,并当时间满足参数设定时,channel中将可获取值,进而执行case子句。

简单起见,暂时删除producerB()方法,修改代码如下:

func producerA(c chan string) {
    for i := 0; i < 3; i++ {
        time.Sleep(time.Second)
        c <- "A" + strconv.Itoa(i)
    }
    close(c)
}    
    
func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    ok1 := true
    msg := ""
    defer fmt.Println("finished.")

    i := 0

    for {
        select {
        case msg, ok1 = <-c1:
            fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
        case <-time.After(500 * time.Millisecond):
            fmt.Printf("timeout, i:%v \n", i)
        }
        if !ok1 {
            fmt.Println("all channel closed")
            break
        }
        i++
    }
}

上面这段代码最主要的是加入了 case <-time.After(500 * time.Millisecond): ,这段代码将会在通道等待超时500毫秒时执行,执行完成后将进入下一次for循环,此时计时将会重新开始。这段代码输出如下:

timeout, i:0 
timeout, i:1 
c1: A0, ok1: true, i:2 
timeout, i:3 
c1: A1, ok1: true, i:4 
timeout, i:5 
c1: A2, ok1: true, i:6 
c1: , ok1: false, i:7 
all channel closed
finished.

如本小节开头所说,有时候并不知道通道是否关闭了。如果注释掉producerA上面的close(c),继续执行getValue(),则会陷入死循环:

timeout, i:0 
c1: A0, ok1: true, i:1 
timeout, i:2 
timeout, i:3 
c1: A1, ok1: true, i:4 
timeout, i:5 
c1: A2, ok1: true, i:6 
timeout, i:7 
timeout, i:8 
timeout, i:9 
timeout, i:10 
timeout, i:11 
timeout, i:12 
timeout, i:13
...

通常情况下,这里会设置一个比较大的值,在达到这个值时便直接退出for循环。这里可以使用前面介绍的通过设置标签,然后break的方式退出外部for循环,也可以使用return,然后把本来在for循环执行完成后需要执行的代码放到defer关键词后:

从程序清晰度方面考虑,不用标签会更好一些。
func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    ok1 := true
    msg := ""
    defer fmt.Println("finished.")

    i := 0

    for {
        select {
        case msg, ok1 = <-c1:
            fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
        case <-time.After(1100 * time.Millisecond):
            fmt.Printf("timeout, i:%v \n", i)
            return
        }

        if !ok1 {
            fmt.Println("all channel closed")
            break
        }
        i++
    }
}

注意上面 case <-time.After(1100 * time.Millisecond): 这行代码,由500改为了1100,因为它必须要大于producerA()中的等待时间,否则就会出现还没来得及接收就直接超时退出了。在不知道producerA()写入channel的时间的情况下,可以设置一个更大的值,比如5秒钟。

输出如下:

c1: A0, ok1: true, i:0 
c1: A1, ok1: true, i:1 
c1: A2, ok1: true, i:2 
timeout, i:3 
finished.

注意并没有“all channel closed”的输出,因为channel并没有进行关闭。

目前看来一切都挺正常,但这里藏着一个巨大的陷阱。现在我们将producerB()添加回来,注意它执行了close(c):

func producerA(c chan string) {
    for i := 0; i < 3; i++ {
        time.Sleep(time.Second)
        c <- "A" + strconv.Itoa(i)
    }
}

func producerB(c chan string) {
    c <- "B"
    close(c)
}

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    ok1, ok2 := true, true
    msg := ""
    defer fmt.Println("finished.")

    i := 0

    for {
        select {
        case msg, ok1 = <-c1:
            fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
        case msg, ok2 = <-c2:
            if ok2 {
                fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i)
            }
        case <-time.After(5 * time.Second):
            fmt.Printf("timeout, i:%v \n", i)
            return            
        }

        if !ok1 && !ok2 {
            fmt.Println("all channel closed")
            break
        }
        i++
    }
}

如果执行上面的代码,产生的输出大致是这样的:

c2: B, ok2: true, i:0 
c1: A0, ok1: true, i:1491475 
c1: A1, ok1: true, i:2859182 
c1: A2, ok1: true, i:4703505

如果你观察一下CPU,会发现现在是100%。因为上面的代码又陷入了死循环,因为c2已经关闭了,所以 case msg, ok2 = <-c2: 立即返回了 空字符串 和 false。导致 case <-time.After(5 * time.Second): 失效了,永远也达不到超时的时间。造成程序反复执行这段代码:

case msg, ok2 = <-c2:
    if ok2 {
        fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i)
    }

为了解决这个问题,依然要依赖于计时器和超时退出策略。注意这里的超时退出无法定义在default中,因为c2已经关闭了,会不断地返回零值,根本不会执行到default。此时可以将策略写在 case msg, ok2 = &-c2: 子句中,代码如下所示:

func getValue() {
    c1, c2 := make(chan string), make(chan string)
    go producerA(c1)
    go producerB(c2)

    ok1, ok2 := true, true
    msg := ""
    defer fmt.Println("finished.")

    i := 0
    j := 0 // 只为c2做的定时器

    for {
        select {
        case msg, ok1 = <-c1:
            if ok1 {
                j = 0 // 重置j定时器
                fmt.Printf("c1: %v, ok1: %v, i:%v \n", msg, ok1, i)
            }
        case msg, ok2 = <-c2:
            if ok2 {
                fmt.Printf("c2: %v, ok2: %v, i:%v \n", msg, ok2, i)
            } else {
                time.Sleep(time.Millisecond)
                // 如果10秒钟c1没有消息,则退出
                if j > 10000 {
                    fmt.Print("c2 return.")
                    return
                }
            }
        case <-time.After(5 * time.Second):
            fmt.Printf("timeout, i:%v \n", i)
            return
        }

        if !ok1 && !ok2 {
            fmt.Println("all channel closed")
            break
        }
        i++
        j++
    }
}

上面的代码会在c1 10秒钟收不到消息后退出。然而,上面的代码不具通用性,如果有更多的channel,代码将会变得愈发复杂。这里我一时半会儿也没有好的解决方案。可以看到,当两个channel都不调用close,或者都调用close,都是比较容易解决的。前者通过default解决,后者通过case语句获得管道状态解决。但是当一个管道关闭,一个管道不关闭时情况就变得难以处理。

因此,当使用自己创建的管道时,在发送完数据后,总是关闭管道是比较好的编程事件。

总结

这篇文章简要介绍了Go语言在使用select关键字处理Channel时可能遇到的问题,这里面有很多隐藏的陷阱,稍不留意就可能写出无响应的程序或者无限循环的程序。

感谢阅读,希望这篇文章能给你带来帮助!