Go语言通过编译器运行时(runtime),从语言上支持了并发的特性,Go语言的并发是通过 goroutine 的特性来完成的,goroutine 类似于线程,但是线程是有操作系统的调度来完成的,Go语言可以根据任务需要创建多个 goroutine 并发的工作,goroutine 是由Go语言运行时进行调度的。
Go语言提供了 channel 在多个 goroutine 之间进行通信,goroutine 和 channel 是Go语言秉承了CSP(Communicating Sequential Process,顺序通信进程)并发模式的重要实现基础。

goroutine

在编写 Socket 网络程序时,需要提前准备一个线程池为每一个 Socket 的收发包分配一个线程。开发人员需要在线程数量和 CPU 数量间建立一个对应关系,以保证每个任务能及时地被分配到 CPU 上进行处理,同时避免多个任务频繁地在线程间切换执行而损失效率。

虽然,线程池为逻辑编写者提供了线程分配的抽象机制。但是,如果面对随时随地可能发生的并发和线程处理需求,线程池就不是非常直观和方便了。能否有一种机制:使用者分配足够多的任务,系统能自动帮助使用者把任务分配到 CPU 上,让这些任务尽量并发运作。这种机制在 Go 语言中被称为 goroutine。

goroutine 的概念类似于线程,但 goroutine 由 Go 程序运行时的调度和管理。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。

使用普通函数创建 goroutine

Go语言中使用 go 这个关键字为一个函数创建一个 goroutine,一个函数可以被创建多个goroutine,但是一个goroutine必定对应一个函数。
为普通函数创建goroutine的格式如下:

1
go 函数名(参数列表)
  • 函数名:要调用的函数名
  • 参数列表:调用函数需要传入的参数

使用 go 关键字创建 goroutine 时,被调用函数的返回值会被忽略。 如果需要在 goroutine 中返回数据,请使用后面介绍的通道(channel)特性,通过通道把数据从 goroutine 中作为返回值传出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func running() {
var times int

for {
times++
fmt.Println("Tick:", time)

time.Sleep(time.Second)
}
}

func main() {
go running()

var input string
fmt.Scanln(&input)
}

代码执行后,命令行会不断地输出 tick,同时可以使用 fmt.Scanln() 接受用户输入。两个环节可以同时进行,这段代码的执行顺序如下图所示。

go并发运行图

这个例子中,Go 程序在启动时,运行时(runtime)会默认为 main() 函数创建一个 goroutine。在 main() 函数的 goroutine 中执行到 go running 语句时,归属于 running() 函数的 goroutine 被创建,running() 函数开始在自己的 goroutine 中执行。此时,main() 继续执行,两个 goroutine 通过 Go 程序的调度机制同时运作。

使用匿名函数创建 goroutine

go关键字后面可以直接使用匿名函数或者闭包来创建goroutine。创建格式如下:

1
2
3
go func(参数列表) {
函数体
} (调用参数列表)

其中:

  • 参数列表:函数体内的参数变量列表
  • 函数体:匿名函数的代码
  • 调用函数参数列表:启动goroutine时,需要向匿名函数传递的调用参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main 

import (
"fmt"
"time"
)

func main() {
go func() {
var times int

for {
times++
fmt.Println("Tick:", times)

time.Sleep(time.Second)
}
}
}()

var input string
fmt.Println(&input)

注意

所有的 goroutine 会在 main() 函数结束的时候一同结束 ,这一点非常重要,这就为什么有时候写的代码只执行了一次就停止的原因,要想并发任务一直进行下去,main()一定要保持运行状态(个人猜测:goroutine实际上是一个协程coroutine,这就意味着程序运行的时候只会产生一个主线程,而主线程应该对应的是main()函数,其它goroutine只是主线程的一个协程而已,所以主线程结束之后,协程都会相应的结束)。

GOMAXPROCS

在Go程序运行的时(runtime)实现了一个小型的任务调度器,这个小型的调度器的工作原理类似于操作系统的调度线程,Go程序可以高效的将CPU资源分配给每一个任务,传统逻辑中,开发者需要维护线程池中线程与 CPU 核心数量的对应关系。同样的,Go 地中也可以通过 runtime.GOMAXPROCS() 函数做到,格式为:

1
runtime.GOMAXPROCS(逻辑CPU数量)

这里的逻辑CPU数量可以有如下几种数值:

  • <1:不修改任何数值。
  • =1:单核心执行。
  • >1:多核并发执行。

一般情况下,可以使用runtime.NumCPU()查询CPU数量,并使用runtime.GOMAXPROCS()函数进行设置,比如:

1
runtime.GOMAXPROCS(runtime.NumCPU())

在1.5版本之前,默认使用单核心进行执行,1.5版本之后默认执行上面的语句进行并发执行,最大效率使用CPU。

并发和并行

并发(concurrency):把任务在不同的时间点交给处理器进行处理,在同一时间任务并不会同时运行。

并行(parallelism):把每个任务分配给每一个处理器独立完成,在同一时间点,任务一定是同时运行。

Go语言在GOMAXPROCS数量和任务数量相等的时候,可以做到并行执行,但一般情况下都是并发执行。对并行感兴趣的童鞋可以关注一下FPGA。

goroutine和coroutine

C#、Luau、Python语言都支持coroutine的特性,coroutine和goroutine在名字和运行上有点类似,都可以将函数或者语句在独立的环境中运行,但是两者还是有区别的。

  1. goroutine可能发生并行执行
  2. coroutine始终顺序执行

goroutine可能发生在多线程的环境下,goroutine无法控制自己获取高优先级;coroutine始终发生在单线程,coroutine程序需要主动交出控制权,宿主才能获取控制权交给其它的coroutine。
goroutine之间的通信使用channel,而coroutine则使用yield和resume操作。

goroutine和coroutine的概念和运行机制都是脱胎于早期的操作系统,coroutine的运行机制属于协作式任务处理,早期的操作系统要求每个应用必须遵循操作系统的任务处理规则,应用程序不需要使用CPU的时候,应该主动交出CPU的使用权。如果开发者无意或者有意的使程序长时间占用CPU,操作系统也无能为力,从而很容易使计算机失去响应或者死机。

goroutine属于抢占式任务处理,和现有的多线程和多进程任务处理相似,应用程序对CPU的最终控制权还是由操作系统来管理,操作系统如果发现一个应用程序长时间大量占用CPU,那么用户可以直接终止该任务(比如打开任务管理器,关闭任务)。

通道 channel

单纯的实现函数并发是没有意义的,函数和函数之间需要数据交换才有意义,虽然可以使用共享内存来实现数据的交换,但是共享内存在不同的goroutine之间发生竞争问题,为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,从而导致性能问题。

Go语言采用通道通信的方式来进行数据的交换,如图所示:

goroutine和channel的通信

在地铁站、食堂、洗手间等公共场所人很多的情况下,大家养成了排队的习惯,目的也是避免拥挤、插队导致的低效的资源使用和交换过程。代码与数据也是如此,多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种和队列一样的结构。

通道的特性

Go语言中的通道(chan)是一种特殊的类型,在任何时候只能有一个goroutine访问通道进行数据的发送和获取。

通道的声明格式

1
var 通道变量 chan 通道类型
  • 通道变量:保存通道的变量
  • 通道类型:通道内进行交换的数据类型

注意:chan类型的空值是nil,声明后需要配合 make 函数才能使用。

创建通道

通道是引用类型,需要使用make函数进行创建后才能使用,格式如下:

1
通道实例 := make(chan 数据类型)

实际例子:

1
2
3
4
5
6
7
8
9
channel1 := make(chan int)
channel2 := make(chan interface)

type Equip struct {
/* 一些字段 */
}

/* 创建Equip指针了类型的通道,可以存放*Equip */
channel3 := make(chan *Equip)

使用通道进行数据发送

通道创建之后,就可以使用创建的通道实例进行数据的发送和接收操作。

通道发送数据的格式

通道发送使用操作符 <- 具体的格式为:

1
Channel <- value
  • Channel:通过make创建好的通道实例
  • value:可以是变量、常量、表达式或者函数返回值等。但是值得类型必须和通道类型相同。

例子:

1
2
3
4
channel := make(chan interface{})
channel <- 10
channel <- "hello"
channel <- 3.14

把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示。

使用通道接收数据

通道接收数据的操作符同样也是 <- ,通道接收有如下特性:

  1. 通道的收发操作可以在不同的goroutine之间,毕竟通道的作用就是现实不同goroutine之间通信。
  2. 接收方将持续阻塞直到发送方发送数据
  3. 通道每次只能接收一个数据

阻塞接收数据

阻塞接收数据的时候,将接收量作为通道操作符 <- 的左值,格式如下:

1
2
接收数据的变量 := <-通道变量
data := <-channel

上面的语句执行时将会阻塞,直到接收到数据并赋值给 data 变量。

非阻塞接收数据

使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下:

1
data, ok := <-channel
  • data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。
  • ok:表示是否接收到数据。

非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计时器 channel 进行,可以参见后面的内容。

接受任意数据并忽略

阻塞接收数据之后,忽略从通道返回的数据,格式如下:

1
<-channel

执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步

一个采用该方法实现并发同步的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func main() {
channel := make(chan int)

go func() {
fmt.Println("start a anonymous-function goroutine.")
//通过channel通道通知main的goroutine
channel <- 0
fmt.Println("exit anonymous-function goroutine.")
}()

fmt.Println("start main goroutine.")
fmt.Println("wait anonymous-function goroutine.")

//等待匿名函数的goroutine
<- ch

fmt.Println("all done")
}

代码输出:

1
2
3
4
5
start main goroutine.
wait anonymous-function goroutine.
start a anonymous-function goroutine.
exit anonymous-function goroutine.
all done

循环接收

通道的数据接收可以使用 for...range 语句进行多个元素的接收操作,格式如下:

1
2
3
for data := range channel {
/* 相关数据操作 */
}

通道 channel 是可以进行遍历的,遍历的结果就是接收到的数据。数据类型就是通道的数据类型。通过for遍历获得的变量只有一个,即上面例子中的 data。

遍历通道数据的例子请参考下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"time"
)

func main() {
channel := make(chan int)

go func() {
for i := 3; i >= 0; i-- {
channel <- i
time.Sleep(time.Second)
}
}()


for data := range channel {
fmt.Println(data)
if data == 0 {
break
}
}
}

代码输出如下:

1
2
3
4
3
2
1
0