引言
channel 是 golang 的最重要的一个结构,是区别于其他高级语言的最重要的特色之一,也是 goroutine 通信必须要的要素之一。下文将基于golang1.14从channel的数据结构&收、发操作的代码实现,进一步了解channel。
hchan struct
hchan 中的所有属性大致可以分为三类:
- buffer 相关的属性。例如 buf、dataqsiz、qcount 等。 当 channel 的缓冲区大小不为 0 时,buffer 中存放了待接收的数据。使用 ==环形队列==(ring buffer) 实现,FIFO。
- waitq 相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用==双向链表==实现。
- 其他属性,例如 lock、elemtype、closed 等。
type hchan struct {
qcount uint // 队列中数据个数
dataqsiz uint // channel大小
buf unsafe.Pointer// 存放数据的环形数组
elemsize uint16 // channel中数据类型的大小
closed uint32 // 表示channel是否关闭
elemtype *_type // 元素数据类型
sendx uint // buffer 中已发送的索引位置 send index
recvx uint // buffer 中已接收的索引位置 receive index
recvq waitq // 等待接收的 goroutine list of recv waiters
sendq waitq // 等待发送的 goroutine list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
我们可以看到
- channel 其实就是一个队列加一个锁,只不过这个锁是一个轻量级锁。
- recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。
- 链表的实现是 sudog,其实就是一个对 g 的结构的封装。
makechan
- 参数校验 2-15行
- 初始化hchan 17-37行
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 元素类型大小限制,不能啥也放
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 对齐限制
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: // 没有buffer,只分配hchan结构体
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不含指针
// Allocate hchan and buf in one call. hchan和buffer一起分配,内存块连续
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers. hchan和buffer单独分配
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) // channel里元素的大小
c.elemtype = elem // 表示channel里放的是啥
c.dataqsiz = uint(size) // 数组大小
return c
}
chansend
chansend 函数是在编译器解析到 c <- x
这样的代码的时候插入的,本质上就是把一个用户元素投递到 hchan 的 ringbuffer 中。chansend 调用的时候,一般用户会遇到三种情况:
- 投递成功,非常顺利,正常返回true
- 投递受阻,该函数阻塞,goroutine 切走
- 投递失败返回false
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 各种前置检测
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 开始了,开始了
// 安全第一,先锁起来
lock(&c.lock)
// 向已关闭的channel发东西,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 场景1:发的时候刚好有人等着收,不需要走buffer,所以性能最好
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 场景2:还有空间,放入buffer,索引增加
if c.qcount < c.dataqsiz {
// 复制,相当于 c.buf[c.sendx]
qp := chanbuf(c, c.sendx)
// 数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// 环形
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 存储元素个数增加
c.qcount++
unlock(&c.lock)
return true
}
// 场景3
// 如果是非阻塞直接返回
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// goroutine相关结构入队列,等待唤醒
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 将 goroutine 转入 waiting 状态,并解锁,用户侧看就是阻塞住了。
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 资源释放
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
golang内执行 ==<- x== 会调用chansend函数,会有三种场景:
场景一:如果有人( goroutine )等着取 channel 的元素,这种场景最快乐,直接把元素给他就完了,然后把它唤醒,hchan 本身递增下 ringbuffer 索引;举一反三:kafka也有这种高效操作。
场景二:如果 ringbuffer 还有空间,那么就把元素存着,这种也是场景的流程,存和取走的是异步流程,可以把 channel 理解成消息队列,生产者和消费者解耦;
场景三:ringbuffer 没空间,这个时候就要是否需要 block 了,一般来讲,
c <- x
编译出的代码都是block = true
,那么什么时候 chansend 的 block 参数会是 false 呢?答案是:select 的时候;
select {
case c <- v:
// ... foo
default:
// ... bar
}
// 编译后
if selectnbsend(c, v) {
// ... foo
} else {
// ... bar
}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
// 调用 chansend 函数,block 参数为 false;
return chansend(c, elem, false, getcallerpc())
}
chanrecv
chanrecv 函数是在编译器解析到 <- c
这样的代码的时候插入的,本质上就是从sender或 hchan 的 ringbuffer 中取一个元素。chanrecv 调用的时候,一般用户会遇到三种情况:
- 接收成功,非常顺利,正常返回元素,true
- 接收受阻,该函数阻塞,goroutine 切走
- 接收失败返回nil,false
// <- c 对应
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// v, ok := <- c 对应
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// 除了select的时候,block都是true
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 先各种判断
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 上锁,干活
lock(&c.lock)
// 关了且队列里没数据了
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 场景1:你想要的时候刚好有人可以给,直接交到手里就好了
if sg := c.sendq.dequeue(); sg != nil {
// 如果 buffer 内没有剩余的元素,直接从sender拿数据,否则,从buffer的头部拿,并将sender的值放到buffer的尾部,拿一个立马在原位置放一个,能一定程度上保证有序性。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// ringbuffer内还有没拿的元素
if c.qcount > 0 {
// 从队列拿
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 加索引值
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 没有人等着给,又没有存货,block住了
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 入队等待唤醒
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// goroutine切走,让出cpu
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
可以看到send和recv代码和处理情况基本一样:
- 如果是非阻塞模式( block=false ),并且没有任何可用元素,返回 (selected=false,received=false),这样就不会进到 select 的 case 分支;
- 如果是阻塞模式( block=true ),如果 chan 已经 closed 了,那么返回的是 (selected=true,received=false),说明需要进到 select 的分支,但是是没有取到元素的;
- 如果是阻塞模式,chan 还是正常状态,那么返回(selected=true,recived=true),说明正常取到了元素;
select部分和send基本一致,在编译时block参数设为false,不再重复。不过recv还可以通过range的方式进行。
for循环
for-range
和 chan 的结束条件只有这个 chan 被 close 了,否则一直会处于这个死循环内部,因为block参数为true。for m := range c { // ... do something } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { // 注意了,这个 block=true,说明 chanrecv 内部是阻塞的; _, received = chanrecv(c, elem, true) return } // 伪代码 for ( ; ok = chanrecv2( c, ep ) ; ) { // do something }
参考
http://legendtkl.com/2017/08/06/golang-channel-implement/