Golang-Channel底层是怎么实现的?


引言

channel 是 golang 的最重要的一个结构,是区别于其他高级语言的最重要的特色之一,也是 goroutine 通信必须要的要素之一。下文将基于golang1.14从channel的数据结构&收、发操作的代码实现,进一步了解channel。

hchan struct

hchan 中的所有属性大致可以分为三类:

  1. buffer 相关的属性。例如 buf、dataqsiz、qcount 等。 当 channel 的缓冲区大小不为 0 时,buffer 中存放了待接收的数据。使用 ==环形队列==(ring buffer) 实现,FIFO。
  2. waitq 相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用==双向链表==实现。
  3. 其他属性,例如 lock、elemtype、closed 等。
type hchan struct {
  qcount		uint					// 队列中数据个数
  dataqsiz	uint					// channel大小
  buf				unsafe.Pointer// 存放数据的环形数组
  elemsize	uint16				// channel中数据类型的大小
  closed		uint32				// 表示channel是否关闭
  elemtype	*_type				// 元素数据类型
  sendx			uint					// buffer 中已发送的索引位置 send index
  recvx			uint					// buffer 中已接收的索引位置 receive index
  recvq			waitq					// 等待接收的 goroutine list of recv waiters
  sendq			waitq					// 等待发送的 goroutine list of send waiters
  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  //
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock			mutex
}

type waitq struct {
  first *sudog
  last	*sudog
}

type sudog struct {
  g						*g
  selectdone	*uint32 // CAS to 1 to win select race (may point to stack)
  next				*sudog
  prev				*sudog
  elem				unsafe.Pointer // data element (may point to stack)
  acquiretime int64
  releasetime int64
  ticket      uint32
  parent      *sudog // semaRoot binary tree
  waitlink    *sudog // g.waiting list or semaRoot
  waittail    *sudog // semaRoot
  c           *hchan // channel
}

我们可以看到

  1. channel 其实就是一个队列加一个锁,只不过这个锁是一个轻量级锁。
  2. recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。
  3. 链表的实现是 sudog,其实就是一个对 g 的结构的封装。

makechan

  1. 参数校验 2-15行
  2. 初始化hchan 17-37行
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
  // 元素类型大小限制,不能啥也放
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
  // 对齐限制
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0: // 没有buffer,只分配hchan结构体
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 元素不含指针
		// Allocate hchan and buf in one call. hchan和buffer一起分配,内存块连续
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers. hchan和buffer单独分配
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size) // channel里元素的大小
	c.elemtype = elem // 表示channel里放的是啥
	c.dataqsiz = uint(size) // 数组大小

	return c
}

chansend

chansend 函数是在编译器解析到 c <- x 这样的代码的时候插入的,本质上就是把一个用户元素投递到 hchan 的 ringbuffer 中。chansend 调用的时候,一般用户会遇到三种情况:

  1. 投递成功,非常顺利,正常返回true
  2. 投递受阻,该函数阻塞,goroutine 切走
  3. 投递失败返回false
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 各种前置检测
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 开始了,开始了
  // 安全第一,先锁起来
	lock(&c.lock)

  // 向已关闭的channel发东西,panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	
  // 场景1:发的时候刚好有人等着收,不需要走buffer,所以性能最好
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// 场景2:还有空间,放入buffer,索引增加
	if c.qcount < c.dataqsiz {
    // 复制,相当于 c.buf[c.sendx]
		qp := chanbuf(c, c.sendx)
    // 数据拷贝到buffer中
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
    // 环形
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
    // 存储元素个数增加
		c.qcount++
		unlock(&c.lock)
		return true
	}
  // 场景3
	// 如果是非阻塞直接返回
	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
  // goroutine相关结构入队列,等待唤醒
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
  // 将 goroutine 转入 waiting 状态,并解锁,用户侧看就是阻塞住了。
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
  // 资源释放
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

golang内执行 ==<- x== 会调用chansend函数,会有三种场景:

  1. 场景一:如果有人( goroutine )等着取 channel 的元素,这种场景最快乐,直接把元素给他就完了,然后把它唤醒,hchan 本身递增下 ringbuffer 索引;举一反三:kafka也有这种高效操作。

  2. 场景二:如果 ringbuffer 还有空间,那么就把元素存着,这种也是场景的流程,存和取走的是异步流程,可以把 channel 理解成消息队列,生产者和消费者解耦;

  3. 场景三:ringbuffer 没空间,这个时候就要是否需要 block 了,一般来讲,c <- x 编译出的代码都是 block = true ,那么什么时候 chansend 的 block 参数会是 false 呢?答案是:select 的时候;

select {
  case c <- v:
  // ... foo
  default:
  // ... bar
}

// 编译后
if selectnbsend(c, v) {
  //  ... foo
} else {
  //  ... bar
}

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  // 调用 chansend 函数,block 参数为 false;
  return chansend(c, elem, false, getcallerpc())
}

chanrecv

chanrecv 函数是在编译器解析到 <- c 这样的代码的时候插入的,本质上就是从sender或 hchan 的 ringbuffer 中取一个元素。chanrecv 调用的时候,一般用户会遇到三种情况:

  1. 接收成功,非常顺利,正常返回元素,true
  2. 接收受阻,该函数阻塞,goroutine 切走
  3. 接收失败返回nil,false
// <- c 对应
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

// v, ok := <- c 对应
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// 除了select的时候,block都是true
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 先各种判断
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// incorrect behavior when racing with a close.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}
	
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

  // 上锁,干活
	lock(&c.lock)

  // 关了且队列里没数据了
	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	
  // 场景1:你想要的时候刚好有人可以给,直接交到手里就好了
	if sg := c.sendq.dequeue(); sg != nil {
    // 如果 buffer 内没有剩余的元素,直接从sender拿数据,否则,从buffer的头部拿,并将sender的值放到buffer的尾部,拿一个立马在原位置放一个,能一定程度上保证有序性。
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// ringbuffer内还有没拿的元素
	if c.qcount > 0 {
		// 从队列拿
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
    // 加索引值
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}
	
	// 没有人等着给,又没有存货,block住了
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
  // 入队等待唤醒
	c.recvq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
  // goroutine切走,让出cpu
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

可以看到send和recv代码和处理情况基本一样:

  1. 如果是非阻塞模式( block=false ),并且没有任何可用元素,返回 (selected=false,received=false),这样就不会进到 select 的 case 分支;
  2. 如果是阻塞模式( block=true ),如果 chan 已经 closed 了,那么返回的是 (selected=true,received=false),说明需要进到 select 的分支,但是是没有取到元素的;
  3. 如果是阻塞模式,chan 还是正常状态,那么返回(selected=true,recived=true),说明正常取到了元素;

select部分和send基本一致,在编译时block参数设为false,不再重复。不过recv还可以通过range的方式进行。

  • for循环

    for-range 和 chan 的结束条件只有这个 chan 被 close 了,否则一直会处于这个死循环内部,因为block参数为true。

    for m := range c {
        // ...   do something
    }
    
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        // 注意了,这个 block=true,说明 chanrecv 内部是阻塞的;
        _, received = chanrecv(c, elem, true)        
        return
    }
    
    // 伪代码
    for (   ; ok = chanrecv2( c, ep )  ;   ) {
        // do something
    }

参考

http://legendtkl.com/2017/08/06/golang-channel-implement/

https://zhuanlan.zhihu.com/p/297053654

https://www.cyhone.com/articles/analysis-of-golang-channel/


  目录