按需启动任意多个goroutine的方法,通过通道在不同goroutine之间进行通信

按需启动任意多个goroutine的方法,通过通道在不同goroutine之间进行通信

本文学习目标

  1. 学会启动goroutine
  2. 学会使用通道进行通信
  3. 理解通道流水线

假设现在有一个地鼠工厂,里面绝大多数地鼠都在忙着干活,当然也有少数地鼠在角落偷偷睡懒觉。工厂里面有一只位高权重的地鼠,她负责向其他地鼠发号施令。地鼠们会为了完成她分派的任务而四处奔波并且相互协作,最后将自己的工作成果汇报给她。有些地鼠会将东西传递到工厂外面,而另一些地鼠则会接收来自工厂外面的东西。

到目前为止,我们编写过的所有Go程序就像这间工厂里面的单只地鼠一样,只会埋头苦干而从不打扰其他地鼠。但实际上真正的Go程序更像一个完整的工厂,里面包含许多独立运行的任务,例如从Web服务器中获取数据、计算精确到百万分位的圆周率数字以及控制机械臂等,而这些并发任务之间则通过相互通信来达成共同的目的。

在Go中,独立运行的任务被称为goroutine。在本章中,我们将会学习按需启动任意多个goroutine的方法,并通过通道在不同goroutine之间进行通信。虽然goroutine跟其他语言中的协程、纤程、进程和线程都有相似的地方,但goroutine跟它们并不完全相同。goroutine的创建效率非常高,并且Go也能够直截了当地协同多个并发操作。

请考虑这一点

假设你正在编写一个需要执行一系列动作的程序,其中每个动作都需要耗费很多时间,并且在动作执行期间可能还需要等待某些事情发生。虽然我们可以使用直观的顺序式代码来编写这个程序,但是当我们想要同时执行程序中的两个或多个动作的时候,我们又该怎么办呢?

例如,你可能会让程序的一部分遍历一个电子邮件地址列表,并向其中的每个地址都发送一封电子邮件,至于程序的另一部分则负责等待传入的电子邮件并将其存储至数据库。如果是这样,你会如何编写这个程序?

在某些语言中,将顺序式代码转换成并发式代码通常需要做大量修改。但是在使用Go语言的时候,你可以在每个独立的任务中继续使用相同的顺序式代码,然后通过goroutine以并发方式运行任意数量的任务。

30.1 启动goroutine
启动goroutine就像调用函数一样简单,你唯一要做的就是在调用前面写下一个关键字go。

代码清单30-1模拟了之前提到的在工厂角落里打瞌睡的地鼠。这个程序的行为非常简单,但是你也可以把里面的Sleep语句看作是某种需要大量计算的操作。因为当main函数返回的时候,该程度运行的所有goroutine都会立即停止,所以main函数必须等待足够长的时间以便打瞌睡的地鼠可以打印出它的“…snore…”消息。为了保证goroutine能够顺利执行,main函数的等待时间将比实际所需的更长一些。

 

代码清单30-1 打瞌睡的地鼠:sleepygopher.go

package main
import (
    "fmt"
    "time"
)
func main() {
    go sleepyGopher()  ←--- 启动goroutine
    time.Sleep(4 * time.Second)  ←--- 等待地鼠从瞌睡中苏醒
}  ←--- 所有goroutine将在程序运行至此时停止
func sleepyGopher() {
    time.Sleep(3 * time.Second)  ←--- 地鼠睡着了
    fmt.Println("... snore ...")
}

每次使用关键字go都会产生一个新的goroutine。从表面上来看,所有goroutine似乎都在同时运行,但由于计算机通常只具有有限数量的处理单元,因此从技术上说,这些goroutine并不是真的在同时运行。

实际上,计算机的处理器通常会使用一种名为分时的技术,在多个goroutine上面轮流花费一些时间。因为分时的具体实施细节通常只有Go运行时、操作系统和使用的处理器会知道,所以我们在使用goroutine的时候,应该假设不同goroutine中的各项操作将以任意顺序执行。

作为例子,代码清单30-2中的main函数将启动5个sleepyGopher goroutine,并让它们都在休眠3秒之后打印出相同的输出。

代码清单30-2 5只打瞌睡的地鼠:sleepygophers.go

package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 5; i++ {
        go sleepyGopher()
    }
    time.Sleep(4 * time.Second)
}

func sleepyGopher() {
    time.Sleep(3 * time.Second)
    fmt.Println("... snore ...")
}

为了找出最先从瞌睡中苏醒的地鼠,我们将向每个goroutine传递一个实参。向goroutine传递实参就跟向函数传递实参一样,都会导致传入的值被复制并以形参的方式传递。

如果你运行下面的代码清单30-3,那么就会发现尽管我们都是按顺序一个接一个地启动程序中的goroutine,但它们结束的顺序是各不相同的。如果你在Go Playground之外的地方执行这个程序,那么它每次都将以不同的顺序输出各个goroutine的打印结果。

代码清单30-3 给地鼠做标记:identifiedgophers.go

func main() {
    for i := 0; i < 5; i++ {
        go sleepyGopher(i)
    }
    time.Sleep(4 * time.Second)
}

func sleepyGopher(id int) {
    time.Sleep(3 * time.Second)
    fmt.Println("... ", id, " snore ...")
}

这段代码有一个问题,它明明只需要等待超过3秒即可,但是它现在却等待了4秒之久。更重要的是,如果goroutine除休眠之外还需要做其他事情,那么我们将无法得知它们需要运行多长时间才能结束。为此,我们需要通过一些手段来让代码知悉所有goroutine将在何时结束。幸运的是,Go的通道正好能够实现这一目的。

30.3 通道

通道(channel)可以在多个goroutine之间安全地传递值,它就像老式办公室中传递邮件用的气动管道系统:你只需把对象放到管道里面,它就会飞快地出现在管道的另一端,然后其他人就可以取走这个对象了。

跟Go中的其他类型一样,你可以将通道用作变量、传递至函数、存储在结构中,或者做你想让它做的几乎任何事情。

跟创建映射或切片时的情况一样,创建通道需要用到内置的make函数,并且你还需要在创建时为其指定相应的类型。例如,以下这个通道就只能发送和接收整数值

c := make(chan int)

在有了通道之后,我们就可以通过左箭头操作符(<-)向它发送值或者从它那里接收值了。

在向通道发送值的时候,我们需要将通道表达式放在左箭头操作符的左边,而待发送的值则放在左箭头操作符的右边,就好像通过箭头将值流入通道里面一样。发送操作会等待直到有另一个goroutine尝试对相同的通道执行接收操作为止。执行发送操作的goroutine在等待期间将无法执行其他操作,但是其他未在等待通道操作的goroutine仍然可以继续自由地运行。作为例子,以下代码演示了怎样将值99发送至通道c:

c <- 99

在通过通道接收值的时候,我们需要将左箭头操作符放在通道的左边,让箭头指向通道之外的地方。下面的代码从通道c中接收了一个值,并将它赋值给变量r

r := <-c

跟执行发送操作时一样,执行接收操作的goroutine将等待直到有另一个goroutine尝试向相同的通道执行发送操作为止。

注意 虽然在单个代码行上执行通道接收操作的做法非常常见,但这并不是必需的。通道接收操作就跟其他表达式一样,可以应用在任何能够使用表达式的地方。

 

代码清单30-4中的main函数创建了一个通道,并将其传递给了5个打瞌睡的地鼠goroutine。每个goroutine都会休眠一段时间,然后向通道发送一个值来表明自己的身份,而main函数则会等待这5个goroutine发回的消息。这一机制可以确保当main函数执行至末尾的时候,所有goroutine都已经结束了休眠,而main函数则能够在不打扰任何地鼠美梦的情况下返回。举一个现实点的例子,如果现在有一个程序,它需要将某些复杂的数学运算结果存储到在线存储器里面,那么当它在同时保存多个结果的时候,我们肯定不希望程序在所有结果都被成功存储之前就草草退出。

代码清单30-4 使用通道引导打瞌睡的地鼠:simplechan.go

func main() {
    c := make(chan int)  ←--- 创建出用于通信的通道
    for i := 0; i < 5; i++ {
        go sleepyGopher(i, c)
    }
    for i := 0; i < 5; i++ {
        gopherID := <-c  ←--- 从通道中接收值
        fmt.Println("gopher ", gopherID, " has finished sleeping")
    }
}

func sleepyGopher(id int, c chan int) {  ←--- 将通道声明为实参
    time.Sleep(3 * time.Second)
    fmt.Println("... ", id, " snore ...")
    c <- id  ←--- 将值回传至main函数
}

30.4 使用select处理多个通道

在前面的例子中,我们使用了单个通道来等待多个goroutine。这种做法在所有goroutine都产生相同类型的值时相当好用,但情况并不总是如此。在实际中,程序通常需要等待两种或者多种不同类型的值。

这种情况的一个例子是,当我们在等待通道中的某些值时,可能并不愿意等得太久。例如,我们可能会对打瞌睡的地鼠感到不耐烦,并在等候一段时间之后选择放弃。或者我们想要在网络请求发生数秒之后将其判断为超时,而不是白白地等候好几分钟。

值得一提的是,Go标准库提供了一个非常棒的函数time.After来帮助我们实现这一目的。这个函数会返回一个通道,该通道会在经过特定时间之后接收到一个值(发送该值的goroutine是Go运行时的其中一部分)。

如果程序打算继续从打瞌睡的地鼠goroutine那里接收值,那么它必须等待直到所有goroutine都结束休眠或者我们的耐心耗尽为止。这意味着程序必须同时等待计时器通道和其他通道,而select语句正好能够做到这一点。

select语句跟我们前面在第3章看到过的switch语句有点儿相似,该语句包含的每个case分支都持有一个针对通道的接收或发送操作。select会等待直到某个分支的操作就绪,然后执行该操作及其关联的分支语句,它就像是在同时监控两个通道,并在发现其中一个通道出现情况时采取行动。

代码清单30-5使用了time.After函数来创建超时通道,并使用了select语句来同时等待打瞌睡的地鼠通道和超时通道。

代码清单30-5 不耐烦地等待打瞌睡的地鼠:select1.go

timeout := time.After(2 * time.Second)
for i := 0; i < 5; i++ {
    select {  ←--- select语句
    case gopherID := <-c:  ←--- 等待地鼠醒来
        fmt.Println("gopher ", gopherID, " has finished sleeping")
    case <-timeout:  ←--- 等待直到时间耗尽
        fmt.Println("my patience ran out")
        return  ←--- 放弃等待然后返回
    }
}

提示 select语句在不包含任何分支的情况下将永远地等待下去。当你启动多个goroutine并且打算让它们无限期地运行下去的时候,就可以用这个方法来阻止main函数返回。

 

 

因为所有地鼠goroutine都会正好休眠3秒,而我们的耐心总会在所有地鼠都醒来之后才耗尽,所以这个程序初看上去并不是特别有趣。但如果我们像下面的代码清单30-6那样,让各个地鼠goroutine随机地休眠一段时间,那么当你运行这个程序的时候,就会发现有些地鼠能够及时醒来,而有些则不能。

代码清单30-6 随机打瞌睡的地鼠:select2.go

func sleepyGopher(id int, c chan int) {
    time.Sleep(time.Duration(rand.Intn(4000)) * time.Millisecond)
    c <- id
}

提示 这个模式适用于任何想要控制事件完成时间的场景。通过将动作放入goroutine并在动作完成时向通道执行发送操作,我们可以为Go中的任何动作都设置超时。

注意 即使程序已经停止等待goroutine,但只要main函数还没返回,仍在运行的goroutine就会继续占用内存。所以在情况允许的情况下,我们还是应该尽量结束无用的goroutine。

什么都不做的nil通道

因为创建通道需要显式地使用make函数,所以你可能会好奇,如果我们不使用make函数初始化通道变量的值,那么会发生什么?答案是,跟映射、切片和指针一样,通道的值也可以是nil,而这个值实际上也是它们默认的零值。

对值为nil的通道执行发送或接收操作并不会引发惊恐,但是会导致操作永久阻塞,就好像遇到了一个从来没有接收或者发送过任何值的通道一样。但如果你尝试对值为nil的通道执行稍后将要介绍的close函数,那么该函数将引发惊恐。

初看上去,值为nil的通道似乎没什么用处,但事实恰恰相反。例如,对于一个包含select语句的循环,如果我们不希望程序在每次循环的时候都等待select语句涉及的所有通道,那么可以先将某些通道设置为nil,等到待发送的值准备就绪之后,再为通道变量赋予一个非 nil 值并执行实际的发送操作。
到目前为止,一切都如我们意料中的那样。当 main函数对通道执行接收操作的时候,它将会找到地鼠goroutine向该通道发送的值。但如果程序在没有任何goroutine向通道发送值的情况下,意外地对通道执行了接收操作,那么会出现什么情况?如果它执行的不是接收操作而是发送操作呢?

 

30.5 阻塞和死锁

当goroutine在等待通道的发送或者接收操作的时候,我们就说它被阻塞了。听上去,这似乎跟我们写一个不做任何事情只会空转的无限循环一样,并且它们从表面上看也非常相似。但实际上,如果你在笔记本电脑的程序中运行类似的无限循环,那么过不了多久,你就会发现笔记本电脑由于忙着执行这个循环而变得越来越热,并且风扇也开始转得越来越快了。与此相反,除goroutine本身占用的少量内存之外,被阻塞的goroutine并不消耗任何资源。goroutine会静静地停在那里,等待导致它阻塞的事情发生,然后解除阻塞。

当一个或多个goroutine因为某些永远无法发生的事情而被阻塞时,我们称这种情况为死锁,而出现死锁的程序通常都会崩溃或者被挂起。引发死锁的代码甚至可以非常简单,就像这样:

func main() {
    c := make(chan int)
    <-c 
}

在大型程序中,死锁可能会涉及多个goroutine之间一系列错综复杂的依赖关系。

虽然死锁在理论上很难杜绝,但通过遵守稍后介绍的一些简单规则,在实际中创建出不会死锁的程序并不困难。即使你真的发现了死锁,Go也可以向你展示所有goroutine的状态,因此找出症结解决问题通常并不是一件难事。

30.6 地鼠装配线

到目前为止,我们只看到了一些昏昏欲睡的地鼠,它们的所作所为就是打个瞌睡,然后醒来向通道发送一个值。但事实上并非整个工厂的地鼠都是如此,例如,装配线上的地鼠就在兢兢业业地工作。它们会从装配线中较为前端的地鼠那里接收到物品,并对该物品做一些处理,然后把物品传递给装配线上的下一只地鼠。尽管装配线上的每只地鼠完成的工作都很简单,但整条装配线最终产生的结果可能是相当复杂的。

这种名为流水线的技术能够有效地处理庞大的数据流,而无须占用大量内存。尽管每个goroutine每次只能持有单个值,但随着时间推移,它们将能够处理数以百万计的值。除此之外,你也可以把流水线看作是一种“思维工具”,它可以帮助你更容易地解决某类问题。

万事俱备,我们现在已经具有了将多个goroutine组装为流水线所需的全部工具。在这个流水线中,Go值将沿着流水线向下流动,从一个goroutine传递至下一个goroutine。流水线上的工人将不断地从它们的上游邻居那里接收值,并在对值执行某些操作之后,将其结果发送至下游。

接下来我们将构建一条处理字符串值的工人装配线。代码清单30-7展示了位于装配线起始端的地鼠,它们是流的源头,这些地鼠只会发送值而不会读取任何值。其他程序的流水线起始端通常会从文件、数据库或者网络中读取数据,但我们的地鼠程序只会发送几个任意的值。为了在所有值均已发送完成时通知下游地鼠,程序使用了空字符串作为哨兵值,并将其用于标识发送已经完成。
代码清单30-7 源头地鼠:pipeline1.go

func sourceGopher(downstream chan string) {
    for _, v := range []string{"hello world", "a bad apple", "goodbye all"}
 {
        downstream <- v
    }
    downstream <- ""
}

代码清单30-8中的地鼠会筛选出装配线上所有不好的东西。具体来说,这个函数会从上游通道中读取值,并在字符串值不为”bad”的情况下将其发送至下游通道。当函数见到结尾的空字符串时,它就会停止筛选工作,并确保将空字符串也发送给下游的地鼠。

代码清单30-8 过滤地鼠:pipeline1.go

func filterGopher(upstream, downstream chan string) {
    for {
        item := <-upstream
        if item == "" {
            downstream <- ""
            return
        }
        if !strings.Contains(item, "bad") {
            downstream <- item
        }
    }
}

位于装配线最末端的是打印地鼠,这只地鼠没有任何下游,代码清单30-9展示了它的定义。在其他程序中,位于流水线末端的函数通常会将结果存储到文件或者数据库里面,或者将这些结果的摘要打印出来,代码清单30-9的打印地鼠将打印出它看到的所有值。

代码清单30-9 打印地鼠:pipeline1.go

func printGopher(upstream chan string) {
    for {
        v := <-upstream
        if v == "" {
            return
        }
        fmt.Println(v)
    }
}

一切准备就绪,现在我们可以将所有地鼠程序组装起来了。整条流水线共分为源头、过滤和打印这3个阶段,但是只用到了两个通道。因为我们希望可以在整条流水线都被处理完成之后再退出程序,所以我们没有为最后一只地鼠创建新的goroutine。当printGopher函数返回的时候,我们可以确认其他两个goroutine已经完成了它们的工作,而printGopher也可以顺利地返回至main函数,然后完成整个程序。代码清单30-10和图30-2展示了这一过程。

代码清单30-10 组装:pipeline1.go

func main() {
    c0 := make(chan string)
    c1 := make(chan string)
    go sourceGopher(c0)
    go filterGopher(c0, c1)
    printGopher(c1)
}

图30-2 地鼠流水线

目前实现的这个流水线程序虽然可以正常运作,但它有一个问题:程序使用了空字符串来表示所有值均已发送完毕,但是当它需要像处理其他值一样处理空字符串的时候,该怎么办?为此,我们可以使用结构值来代替单纯的字符串值,在结构里面分别包含一个字符串和一个布尔值,并使用布尔值来表示当前字符串是否是最后一个值。

但事实上还有更好的办法。Go允许我们在没有值可供发送的情况下通过close函数关闭通道,就像这样:

close(c)

通道被关闭之后将无法写入任何值,如果尝试写入值将会引发惊恐。尝试读取已被关闭的通道将会获得一个与通道类型对应的零值,而这个零值就可以代替上述程序中的空字符串。

注意 当心!如果你在循环里面读取一个已关闭的通道,并且没有检查该通道是否已经关闭,那么这个循环将一直运转下去,并耗费大量的处理器时间。为了避免这种情况发生,请务必对那些可能会被关闭的通道做相应的检查

执行以下代码可以获悉通道是否已经被关闭:

v, ok := <-c

通过将接收操作的执行结果赋值给两个变量,我们可以根据第二个变量的值来判断此次通道读取操作是否成功。如果该变量的值为false,那么说明通道已被关闭。

在了解了通道的这一特性之后,关闭整条流水线将变得更加容易。代码清单30-11展示了应用这一特性之后的源头地鼠goroutine。

代码清单30-11 修改后的源头地鼠:pipeline2.go

func sourceGopher(downstream chan string) {
    for _, v := range []string{"hello world", "a bad apple", "goodbye all"}
{
        downstream <- v
    }
    close(downstream)
}

代码清单30-12展示了修改之后的过滤地鼠goroutine。

代码清单30-12 修改后的过滤地鼠:pipeline2.go

func filterGopher(upstream, downstream chan string) {
    for {
        item, ok := <-upstream
        if !ok {
            close(downstream)
            return
        }
        if !strings.Contains(item, "bad") {
            downstream <- item
        }
    }
}

因为“从通道里面读取值,直到它被关闭为止”这种模式实在是太常用了,所以Go为此提供了一种快捷方式。通过在range语句里面使用通道,程序可以在通道被关闭之前,一直从通道里面读取值。

这也意味着我们可以通过range循环以更简单的方式重写过滤地鼠的代码。代码清单30-13展示了重写之后的代码,它的行为跟之前展示的过滤地鼠代码一模一样。

代码清单30-13 使用range实现的过滤地鼠代码:pipeline2.go

func filterGopher(upstream, downstream chan string) {
    for item := range upstream {
        if !strings.Contains(item, "bad") {
            downstream <- item
        }
    }
    close(downstream)
}

正如下面的代码清单30-14所示,跟过滤地鼠一样,我们也可以使用range语句重写打印地鼠的代码,使得这只位于装配线末端的地鼠可以读取通道中的所有消息并且一个接一个地打印它们。

代码清单30-14 使用range实现的打印地鼠代码:pipeline2.go

func printGopher(upstream chan string) {
    for v := range upstream {
        fmt.Println(v)
    }
}

 

未经允许不得转载:微信 美文-微信文章库-我的知识库 » 按需启动任意多个goroutine的方法,通过通道在不同goroutine之间进行通信

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

我的知识库