Golang学习 - 通道(channel)
hanpy

Channel 类型是 Go 语言自带的唯一一个可以满足并发安全性的类型,一个通道相当于一个先进先出的队列,各个元素值都是严格按照发送的顺序排列的,先被发送到通道的元素值一定会先被接收。

通道的基本使用

通道的声明与初始化

在声明时,channel必须与一个实际的类型T绑定在一起,代表通道中能够读取和传递的元素类型。通道的表示形式有如下有三种:chan T、chan←T、←chan T

1
2
3
4
5
6
// 如果只是声明,那通道的值是nil
var ch chan int
fmt.Println(ch) // nil

var ch1 = make(chan int) // 无缓冲通道
var ch2 = make(chan int, 5) // 有缓冲通道

通道写入数据

通道的写入语法很简单

1
2
ch1 := make(chan int)
ch1 <- 1

无缓冲通道的写入还有一个地方需要注意,能够向通道写入数据的前提是必须有另一个协程在读取通道,否则当前的协程就会陷入休眠,如果读和写在一个协程中,还会发生死锁。

1
2
3
ch1 := make(chan int)
ch1 <- 1
// fatal error: all goroutines are asleep - deadlock!
1
2
3
4
5
// 无缓冲通道的写入
ch1 := make(chan int)
ch1 <- 1
<-ch1
// fatal error: all goroutines are asleep - deadlock!

通道读取数据

通道中读取数据可以直接使用←c

1
2
3
4
// 方式1
data := <- ch
//方式2 data 表示接收到的数据。未接收到数据时,data为channel类型的零值,ok(布尔类型)表示是否接收到数据
data,ok := <- ch

和写入数据一样,如果不能直接读取通道的数据,那么当前的读 取协程将陷入堵塞,直到有协程写入通道为止

循环接收

for循环需要判断通道关闭,需要自己根据情况来跳出循环,否则就会一直取到通道类型0值的数据
for...range会自动判断出channel已关闭,而无须通过判断来终止循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import "fmt"

func main() {
ch := make(chan int)

// 写入
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}()

// 1. for接收
for {
out,ok := <-ch
if !ok {
fmt.Println("通道已关闭")
break
}
fmt.Printf("接收数据 ==> %v \n", out)
}

// 2. for...range接收
//for i := range ch {
// fmt.Printf("接收数据 ==> %v \n", i)
//}

fmt.Println("程序运行结束!")
}

通道关闭

通道的关闭,需要用到内置的close函数

1
2
ch := make(chan int)
close(ch)

重复关闭一个channel将导致panic异常,试图关闭一个nil值 的channel也将导致panic异常。

通道底层原理

通道数据结构

go/src/runtime/chan.go | hchan

1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
qcount uint // 通道队列中的数据个数
dataqsiz uint // 通道队列中的数据大小
buf unsafe.Pointer // 存放实际数据的指针
elemsize uint16 // 通道类型大小
closed uint32 // 通道是否关闭
elemtype *_type // 通道类型
sendx uint // 记录发送者在buf中的序号
recvx uint // 记录接受者在buf中的序号
recvq waitq // 读取的阻塞协程队列
sendq waitq // 写入的阻塞协程队列
lock mutex // 锁,并发保护
}

对于有缓存的通道,存储在buf中的数据虽然是线性的数组,但是 用数组和序号recvx、recvq模拟了一个环形队列,recvx可以找到从buf哪个位置获取通道中的元素,而sendx能够找到写 入时放入buf的位置

《Go语言底层原理剖析》中的图

image

通道初始化

使用make初始化通道,经过类型检查之会转换为OMAKE类型的节点,最后会调用了makechan函数,第1个参数代表通道的类型,第2个参数代表通道中元素的大小
go/src/runtime/chan.go | makechan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// t:通道类型
// size:通道元素的大小
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

// 计算为元素分配的大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0: // 当分配的大小为0时,只用在内存中分配hchan结构体的大小即 可。
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 当通道的元素中不包含指针时,连续分配hchan结构体大小+size 元素大小。
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 当通道的元素中包含指针时,需要单独分配内存空间,因为当元 素中包含指针时,需要单独分配空间才能正常进行垃圾回收。
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}

通道写入原理

go/src/runtime/chan.go | chansend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 通过为nil,休眠
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// ---省略代码---

// 加锁
lock(&c.lock)

// 通道被关闭,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

// 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据

if !block {
unlock(&c.lock)
return false
}

// 在 channel 上阻塞,receiver 会帮我们完成后续的工作
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}

在写入的开始就加锁了,可以看出通道是并发安全的。
写入元素时,分成了3种不同的情况

1. 有正在等待的读取协程
hchan结构中的recvq字段存储了正在等待的协程链表,每个 协程对应一个sudog结构,它是对协程的封装,包含了准备获取的协程 中的元素指针等。
c.recvq.dequeue()会返回第一个等待的协程,send函数将元素直接复制到对应的 协程中,再唤醒被堵塞的协程

1
2
3
4
5
6
7
8
9
10
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ...
}

go/src/runtime/chan.go | send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// Gwaiting -> Grunnable
goready(gp, skip+1)
}

sendDirect将发送的数据直接拷贝到对应的地址上
goready将等待接收数据的Goroutine标记成可运行状态Grunnable并把该Goroutine放到发送方所在的处理器的runnext上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;

2. 缓冲区有空余
Channel包含缓冲区并且Channel中的数据没有装满的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    
// ---省略代码---
if c.qcount < c.dataqsiz {
// 计算出下一个可以存储数据的位置
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ---省略代码---
}

3. 缓冲区无空余(会阻塞发送协程)
如果当前通道无缓冲区或者当前缓冲区已经满了,则代表当前协程的sudog结构需要放入sendq链表末尾中,并且当前协程陷入休眠状态,等待被唤醒重新执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
if !block {
unlock(&c.lock)
return false
}

// 获取当前发送数据使用的协程(Goroutine)
gp := getg()
// 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息(其实就是包装sudog)
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil

// 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,
// 设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪;
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 将这个发送 g 从 Grunning -> Gwaiting
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}

通道读取原理

读取通道和写入通道的原理非常相似,在运行时调用了chanrecv函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ---省略代码---

// Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

// 当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// 获取当前发送数据使用的协程(Goroutine)
gp := getg()
// 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息(其实就是包装sudog)
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)


if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}