1. channel (go)实现
首先明确go语言的设计模块:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。这样在我看来让 go 语言代码更加整洁。因此 go 语言中 Goroutine 之间会通过 Channel 传递数据。基于go 1.15 版本,Channel 的实现:
1.1 Channel 底层数据结构
chan 的底层数据结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex }
|
chan 使用 make 关键字创建,可以带缓冲区的异步 Channel 和不带缓冲区的同步 Channel。这里对创建过程不做赘述。基本上分为三种情况:
- 如果 Channel 不存在缓冲区,那么就只会给 hchan 结构体分配内存空间
- 如果 Channel 存储的类型不是指针类型,会为当前的 Channel 和底层的缓冲区(hchan.buf)分配一块连续的内存空间
- 其他情况会单独给 hchan 和缓冲区(hchan.buf)分配内存
1.2 写数据
向 Channel 写数据,chan <- data
这样的操作最终会调用 runtime.chansend
函数,如下代码只保留了关键逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) gp.waiting = nil gp.activeStackChans = false gp.param = nil mysg.c = nil releaseSudog(mysg) return true }
|
如上面代码的注释,总结下写 Channel 的几种情况
- 如果 Channel 的接收队列上存在已经被阻塞的 Goroutine,那么会直接将数据发送给这个 Goroutine,并设置其为下一个可运行的 Goroutine
- 如果 Channel 存在缓冲区并且还有空闲的容量,则将数据存储在缓冲区
- 如果不满足上述条件。设置为非阻塞,直接返回;设置为阻塞,则会创建一个 runtime.sudog 结构并加入 Channel 的发送队列中,当前 Goroutine 陷入阻塞等待其他 Goroutine 从 Channel 接收数据。
1.3 读数据
从 Channel 读数据,i <- chan
这样的操作最终会调用 runtime.chanrecv
函数,如下代码只保留了关键逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) gp.waiting = nil gp.activeStackChans = false closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
|
读操作主要逻辑如上代码,总结下读 Channel 的几种情况:
- 如果 Channel 的发送队列中存在挂起的 Goroutine,则会将接收位置索引所在的数据拷贝到接收变量所在的内存空间,并将发送队列中的头部 Goroutine 的数据拷贝到缓冲区,且释放这个 Goroutine
- 如果 Channel 的缓冲区中包含数据,则直接拷贝接收位置索引对应的数据
- 如果 Channel 的缓冲区无数据且为阻塞模式,则挂起当前 Goroutine,将 runtime.sudog 结构加入接收队列并陷入睡眠等待调度器的唤醒
1.4 总结 Channel
从 Channel 的实现可以看出,Channel 是一个多读多写的线程安全队列。通过使用 Channel 来同步对内存的访问,实际上就是在使用锁。而对比测试 sync 包中的互斥锁和 Channel,使用 Go 内置的基准测试,得出如下数据:
1 2 3
| BenchmarkSimpleSet-8 3000000 391 ns/op # mutex BenchmarkSimpleChannelSet-8 1000000 1699 ns/op # buffer channel BenchmarkSimpleChannelSet-8 1000000 2252 ns/op # no buffer channel
|
数据来自:https://bravenewgeek.com/go-is-unapologetically-flawed-heres-why-we-use-it/
就 Channel 单纯从线程安全队列的实现角度来看的话,应该还有很大的提升空间。尤其是锁的范围太大,Golang 开发者也应该是这样想的,可以解释为什么在Golang 的标准库,比如“net/http”的实现中几乎找不到 Channel,几乎全是 Mutex。 不过 Channel 更适合做一种协调模式,一种用通信来解决共享内存的安全性的方法。大部分场景使用 Channel 已经完全可以应付或解决多线程安全性问题。如果对性能特别特别敏感,那就要考虑清楚使用 Channel 了。