单向通道

单向通道声明格式

只能发送的单向通道类型为 chan<-,只能接收的单向通道为 <-chan 格式如下:

1
2
3
4
5
/* 只能发送 */
var 通道实例 chan<- 元素类型

/* 只能发送 */
var 通道实例 <-chan 元素类型

单向通道实例

1
2
3
4
5
6
7
8
/* 创建一个通道 */
channel := make(chan int)

/* 声明一个只能发送的通道类型,并赋值为channel */
var channelSendOnly chan<- int = channel

/* 声明一个只能接收的通道类型,并赋值为channel */
var channelReceiveOnly <-chan int = channel

带缓冲的通道

在无缓冲的通道上,为通道增加一个有限大小的存储空间形成一个带缓冲的通道,带缓冲的通道在发送数据的时候无需阻塞等待接收方接收即可完成数据的发送,从而不会发生阻塞,直到通道中没有数据可读的时候,通道会再次进行阻塞。

无缓冲通道保证收发过程同步。无缓冲收发过程类似于快递员给你电话让你下楼取快递,整个递交快递的过程是同步发生的,你和快递员不见不散。但这样做快递员就必须等待所有人下楼完成操作后才能完成所有投递工作。如果快递员将快递放入快递柜中,并通知用户来取,快递员和用户就成了异步收发过程,效率可以有明显的提升。带缓冲的通道就是这样的一个“快递柜”。

创建带缓冲的通道

1
通道实例 := make(chan 通道类型, 缓冲大小)

其中,缓冲大小决定了最多可以保存多少个元素数量

带缓冲通道实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main 

import "fmt"

func main() {
/* 创建一个缓冲大小为3的通道 */
channel := make(chan int, 3)

/* 打印通道的大小 */
fmt.Println(len(channel))

channel <- 1
channel <- 2
channel <- 3

fmt.Println(len(channel))
}

代码输出:

1
2
0
3

缓冲通道的阻塞条件

带缓冲的通道和无缓冲的通道在很多方面是类似的,无缓冲的通道可以看做是长度为0的带缓冲的通道,因此,根据这个特征,带缓冲的通道在下面列举的情况下依然会发生阻塞。

  • 带缓冲的通道被填满,尝试再次发送数据的时候会发生阻塞(这就好比你有三个空杯子,但是当三个空杯子都被装满水之后,你需要先将被子的水倒掉之后才能使用)
  • 带缓冲的通道为空的时候,尝试接收数据会发生阻塞

为什么Go语言对通道要限制长度而不提供无限长度的通道?
我们知道通道(channel)是在两个 goroutine 间通信的桥梁。使用 goroutine 的代码必然有一方提供数据,一方消费数据。当提供数据一方的数据供给速度大于消费方的数据处理速度时,如果通道不限制长度,那么内存将不断膨胀直到应用崩溃。因此,限制通道的长度有利于约束数据提供方的供给速度,供给数据量必须在消费方处理量+通道长度的范围内,才能正常地处理数据。

通道的多路复用

多路复用是通信和网络中在一个信道上传输多路信号或者数据流的过程和技术。多路复用分为:时分复用和频分复用。

电话可以在说话的同时听到对方说话,所以电话是一种多路复用的设备,一条通信线路上可以同时接收或者发送数据。同样的,网线、光纤也都是基于多路复用模式来设计的,网线、光纤不仅可支持同时收发数据,还支持多个人同时收发数据。

在使用通道时,想同时接收多个通道的数据是一件困难的事情。通道在接收数据时,如果没有数据可以接收将会发生阻塞。虽然可以使用如下模式进行遍历,但运行性能会非常差。

1
2
3
4
5
6
for {
data, ok := <-channel-1
data, ok := <-channel-2
...
data, ok := <-channel-n
}

Go语言中提供了 select 关键字,可以同时响应多个通道的操作,select的每个 case 都会对应一个通道的收发过程,当收发完毕的时候,就会触发一个 case 中响应的语句。多个操作在每次 select 中挑选一个进行响应,格式如下:

1
2
3
4
5
6
7
8
9
select {
case 操作1:
响应操作1
case 操作2:
响应操作2
...
default:
没有操作的情况
}

操作1,操作2:包含通道的收发语句,具体参考下表:

操作 语句示例
发送数据 case channel <- 100
接收数据 case data := <-channel
接收任意数据 case <-channel

select 模拟 RPC

服务器开发中会使用 RPC(Remote Procedure Call, 远程过程调用)
简化进程间的通信, RPC能有效的封装通信过程,让远程的数据收发通信过程看起来就像是本地的函数调用一样。

RPC调用流程

RPC相关文章:
1. 远程过程调用协议
2. RPC原理及RPC实例分析
3. RPC服务和HTTP服务对比
4. REST与RPC区别

下面将使用通道代替socket实现 RPC 的过程,客户端与服务器运行在同一个进程,服务器和客户端在两个 goroutine 中运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main 

import (
"errors"
"fmt"
"time"
)


/* 模拟RPC客户端的请求和接收消息封装 */
func RPCclient(channel chan string, reqmsg string) (string, error) {
channel <- reqmsg

// 等待服务器返回信息
select {
case ack := <-channel:
return ack, nil
case <-time.After(time.Second): // 超时1s
return "", errors.New("time out")
}
}

/* 模拟RPC服务器端接收客户端请求和回应 */
func RPCsever(channel chan string) {
for {
//接收客户端请求
data := <-channel

// 打印接收到的数据
fmt.Println("server received:", data)

// 反馈到客户端收到
channel <- "Roger that"
}
}

func mian() {
channel := make(chan string)

// 执行RPC服务器逻辑
go RPCserver(channel)

// 客户端请求数据和接收数据
recv, err := RPCclient(channel, "Hi bro")
if err != nil {
fmt.Println(err)
} else {
fmt.Println("client reveived", recv)
}

}

使用通道响应计时器事件

Go语言中的time包提供了计时器的封装,由于Go语言中的通道和goroutine的设计,定时任务可以在goroutine中通过同步的方式完成,也可以通过在goroutine中异步回调完成。

一段时间之后 time.After

延迟回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
exit := make(chan int)

fmt.Println("start")

// 过一秒之后,调用匿名函数
time.AfterFunc(time.Second, func () {
fmt.Println("One second after")

exit <- 0
})

<-exit

fmt.Println("all done")
}

调用 time.AfterFunc() 函数,传入等待的时间和一个回调。回调使用一个匿名函数,在时间到达后,匿名函数会在另外一个 goroutine 中被调用。

定点计时

计时器(Timer)的原理和倒计时闹钟一样,都是给定多长时间后触发。打点器(Ticker)的原理和钟表类似,钟表每到一个整点就会触发,这两种方法创建之后就会返回time.Ticker对象和time.Timer对象,里面有个C成员,型是只能接收的时间通道(<-chan Time),使用这个通道就可以获得时间触发的通知。

下面代码创建一个打点器,每 500 毫秒触发一起;创建一个计时器,2 秒后触发,只触发一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"time"
)

func main() {
// 创建打点器,每500毫秒触发一次
ticker := time.NewTicker(time.Millisecond * 500)

// 创建一个计时器, 2秒后触发
stopper := time.NewTimer(time.Second * 2)

// 声明计数变量
var i int

//不断的检查通道情况
for {
// select多路复用
select {
case <-stopper.C:
fmt.Println("stop")
goto StopHere
case <-ticker.C:
i++
fmt.Println("tick", i)
}
}

// goto标签
StopHere:
fmt.Println("done")
}

通道关闭之后继续使用通道

通道是一个引用对象,和 map 类似。map 在没有任何外部引用时,Go 程序在运行时(runtime)会自动对内存进行垃圾回收(Garbage Collection, GC)。类似的,通道也可以被垃圾回收,但是通道也可以被主动关闭。

通道关闭格式

1
close()

关闭的通道依然可以被访问,但是访问被关闭的通道会出发一些问题
给关闭的通道发送数据将会触发panic
被关闭的通道不会置为nil,如果尝试对已经关闭的通道进行发送,将会触发宕机,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import "fmt"

func main() {
channel := make(chan int)

// 关闭通道
close(channel)

/* 打印通道的指针,容量和长度 */
fmt.Println("ptr:%p cap:%d len:%d\n", channel, cap(channel), len(channel))

//给关闭的通道发送数据
channel <- 100
}

代码输出:

1
panic: send on closed channel

从已经关闭的通道中接收数据不会发生阻塞
从已经关闭的通道中接收数据或者正在接收数据时,将会接收到通道类型的领值,然后停止阻塞并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main 

import "fmt"

func main() {
channel := make(chan int, 2)

channel <- 1
channel <- 2

close(channel)

for i := 0; i < cap(channel) + 1; i++ {
value, ok := <-channel
fmt.Println(v, ok)
}
}

代码输出:

1
2
3
1 true
2 true
0 false

运行结果前两行正确输出带缓冲通道的数据,表明缓冲通道在关闭后依然可以访问内部的数据。
运行结果第三行的“0 false”表示通道在关闭状态下取出的值。0 表示这个通道的默认值,false 表示没有获取成功,因为此时通道已经空了。我们发现,在通道关闭后,即便通道没有数据,在获取时也不会发生阻塞,但此时取出数据会失败。

Telnet 回音服务器(TCP服务器的基本结构)

Telnet 协议是 TCP/IP 协议族中的一组,它允许用户(Telnet客户端)通过一个协商过程和一个远程设备进行通信。

服务器的网络库为了完整展示自己的代码实现了完整的收发过程,一般比较倾向于使用发送任意封包返回原数据的逻辑。这个过程类似于对着大山高喊,大山把你的声音原样返回的过程。也就是回音(Echo)。本节使用 Go 语言中的 Socket、goroutine 和通道编写一个简单的 Telnet 协议的回音服务器。

回音服务器的代码分为 4 个部分,分别是接受连接、会话处理、Telnet 命令处理和程序入口。

接收连接

回音服务器能够同时服务于多个连接,要接受连接就需要先创建侦听器,侦听器需要一个侦听地址和协议类型。就像你想卖东西,需要先确认卖什么东西,卖东西的类型就是协议类型,然后需要一个店面,店面位于街区的某一个位置,这就是侦听器的地址,一个服务器可以开启多个侦听器,就像一个街区可以有多个店面,街区上的编号就是地址上的端口号,如图所示:

IP和port端口号

  • 主机IP:一般为一个IP地址或者域名,127.0.0.1或者localhost表示本地主机
  • 端口号:16位无符号整型值,一共有 2^16 = 65536 个有效端口号。

通过地址和协议名创建侦听器后,可以使用侦听器响应客户端连接。响应连接是一个不断循环的过程,就像到银行办理业务时,一般是排队处理,前一个人办理完后,轮到下一个人办理。

我们把每个客户端连接处理业务的过程叫做会话(session)。在会话中处理的操作和接受连接的业务并不冲突可以同时进行。就像银行有 3 个窗口,喊号器会将用户分配到不同的柜台。这里的喊号器就是 Accept 操作,窗口的数量就是 CPU 的处理能力。因此,使用 goroutine 可以轻松实现会话处理和接受连接的并发执行。

Scoket 处理过程

Telnet服务器处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main 

import (
"fmt"
"net"
)

/* 服务逻辑,传入地址和退出通道 */
func server(address string, exitChannel chan int) {
/* 根据传入的地址进行监听 */
listen, err := net.Listen("tcp", address)

/* 如果监听发生错误,打印错误并退出 */
if err != nil {
fmt.Println(err.Error())
exitChannel <- 1
}

/* 打印监听地址,表示监听成功 */
fmt.Println("listen:" + address)

/* 延迟关闭侦听器 */
defer listen.Close()

/* 侦听循环 */
for {
/* 没有新的链接的时候,Accept是阻塞的 */
connect, err := listen.Accept()

/* 发生任何侦听错误,打印错误并退出循环 */
if err != nil {
fmt.Println(err.Error())
continue
}

/* 根据连接开启会话,这个过程需要并行执行 */
go handleSession(connect, exitChannel)
}

}

会话处理

每个链接会话处理就是一个连接数据循环,当没有数据的时候,调用reader.ReadString会发生阻塞,等待数据到来,一旦数据到来,就可以进行各种数据逻辑处理。

Echo回音服务器基本逻辑是“收到什么就返回什么”,render.ReadString可以一直读取Socket连接中的数据直到碰到期望的结尾符。这种期望的结尾符也可以称之为界定符,一般用于TCP封包中逻辑数据拆分开。下例中使用的定界符是回车换行符(“\r\n”),HTTP 协议也是使用同样的定界符。使用 reader.ReadString() 函数可以将封包简单地拆分开。

下图所示为 Telnet 数据处理过程

Telnet 数据处理过程

回音服务器需要将收到的有效数据通过 Socket 发送回去。
Telnet 会话处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main 

import (
"bufio"
"fmt"
"net"
"strings"
)

/* 连接的会话逻辑 */
func handleSession(connect net.Conn, exitChannel chan int) {
fmt.Println("Session started:")
/* 创建一个网络链接数据的读取器 */
reader := bufio.NewReader(connect)
/* */
for {
str, err := reader.ReadString('\n')
if err == nil {
str = string.TrimSpace(str)
/* 处理Telnet指令 */
if !processTelnetCommand(str, exitChannel) {
connect.Close()
break
}
// Echo逻辑, 发什么数据, 原样返回
conn.Write([]byte(str + "\r\n"))
} else {
// 发生错误
fmt.Println("Session closed")
conn.Close()
break
}
}
}

Telnet 命令处理

Telnet 是一种协议,在操作系统中可以在命令行使用 Telnet 命令发起TCP连接。我们一般用 Telnet 来连接 TCP 服务器,键盘输入一行字符回车后,即被发送到服务器上。
在下面的代码中,定义了以下两个特殊的指令,用以实现一些功能:

  • 输入 “@close” 退出当前连接会话
  • 输入 “@shutdown” 终止服务器运行

Telnet 命令处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"string"
)

func processTelnetCommand(str string, exitChannel chan int) bool {
//使用 @close 指令表示终止本次会话
if string.HasPrefix(str, "@close") {
fmt.Println("Session closed")
return false
} else if string.HasPrefix(str, "@shutdown") {
fmt.Println("Server shutdown")
exitChannel <- 0
return false
}
fmt.Println(str)
return true
}

程序入口

Telnet 回应处理主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main 

import "os"

func main() {
exitChannel := make(chan int)

/* 将服务器并发运行 */
go server("127.0.0.1:8080", exitChannel)

/* 通道阻塞,等待接收返回值 */
code := <-exitChannel

os.Exit(code)
}

竞态检测

Go程序可以使用多个通道进行多个 goroutine 间的数据交换,但这仅仅是数据同步中的一种方法。通道内部的实现依然使用了各种锁,因此,优雅代码的代价就是性能。在某些轻量级的场合,原子访问(atomic包)、互斥锁(sync.Mutex)以及等待组(sync.WaitGroup)能最大程度满足需求。
当多线程并发运行的程序竞争访问和修改同一块资源时,会发生竞态问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main 

import (
"fmt"
"sync/atomic"
)

// 序列号
var seq int64

//序列号生成器
func GenID() int64 {
// 尝试原子的增加序列号
atomic.AddInt64(&seq, 1)
return seq
}

func main() {
// 生成10个并发的序列号
for i := 0; i < 10; i++ {
go GenID()
}

fmt.Println(GenID())
}

使用原子操作函数 atomic.AddInt64() 对 seq() 函数加 1 操作。不过这里故意没有使用 atomic.AddInt64() 的返回值作为 GenID() 函数的返回值,因此会造成一个竞态问题。
在运行程序时,为运行参数加入-race参数,开启运行时(runtime)对竞态问题的分析,命令如下:

1
go run -race racedetect.go

代码运行发生宕机,根据报错信息,第 18 行有竞态问题,根据 atomic.AddInt64() 的参数声明,这个函数会将修改后的值以返回值方式传出。下面代码对加粗部分进行了修改:

1
2
3
4
func GenID() int64 {
// 尝试原子的增加序列号
return atomic.AddInt64(&seq, 1)
}

本例中只是对变量进行增减操作,虽然可以使用互斥锁(sync.Mutex)解决竞态问题,但是对性能消耗较大。在这种情况下,推荐使用原子操作(atomic)进行变量操作。

互斥锁(sync.Mutex)和 读写互斥锁(sync.RWMutex)

sync.Mutex 互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个 goroutine 可以访问共享资源。在 Go 程序中的使用非常简单,参见下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
)

var (
// 逻辑中使用的某个变量
count int
// 与变量对应的互斥锁
countGurd sync.Mutex
)

func GetCount() int {
// 锁定
countGuard.Lock()

// 在函数退出时解除锁定
defer countGuard.Unlock()

return count
}

func SetCount(c int) {
countGuard.Lock()
count = c
countGuard.Unlock()
}

func main() {
// 进行安全的并发设置
SetCount(1)
//可以进行并发安全的获取
fmt.Println(GetCount())
}

sync.RWMutex 读写互斥锁

在读多写少的环境中,可以优先使用读写互斥锁(sync.RWMutex),它比互斥锁更加高效。sync 包中的 RWMutex 提供了读写互斥锁的封装。

1
2
3
4
5
6
7
8
9
10
var (
count int
countGuard sync.RWMutex
)

func GetCount() int {
countGuard.RLock()
defer countGuard.RUnlock()
return count
}

在声明 countGuard 时,从 sync.Mutex 互斥锁改为 sync.RWMutex 读写互斥锁。
获取 count 的过程是一个读取 count 数据的过程,适用于读写互斥锁。在这一行,把 countGuard.Lock() 换做 countGuard.RLock(),将读写互斥锁标记为读状态。如果此时另外一个 goroutine 并发访问了 countGuard,同时也调用了 countGuard.RLock() 时,并不会发生阻塞。

sync.WaitGroup 等待组

除了可以使用通道(channel)和互斥锁进行两个并发程序间的同步外,还可以使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务。
等待组有下面几个方法可用:

方法名 功能描述
(wg *WaitGroup) Add(delta int) 等待组的计数器 +1
(wg *WaitGroup) Done() 等待组的计数器 -1
(wg *WaitGroup) Wait() 当等待组的计数器不等于0时阻塞到变为0

等待组内部拥有一个计数器,计数器的值可以通过方法调用实现计数器的增加和减少。当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"net/http"
"sync"
)

func main() {

//声明一个等待组
var wg sync.WaitGroup

//准备一组url
var url = []string{
"https://www.github.com/",
"https://www.python.org/",
"https://www.golangtc.com/",
}

// 进行遍历
for _, url := range urls {
//每个任务开始的时候,将等待组增加1
wg.Add(1)

// 开启一个匿名函数并发
go func(url string) {
//使用defer,表示函数完成时将等待组减1
defer wg.Done

//使用http访问提供的url
_, err := http.Get(url)
fmt.Println(url, err)
} (url)
}

wg.Wait()
fmt.Println("over")
}