Go语言中Channel的底层结构
Little_YangYang

前言

Channel主要用于实现goroutinue之间的通信,以通信的方式来共享内存,其作为Go的底层实现赋能了Go语言并发的数据共享特性,同时由于其设计时就解决了同步问题,我们的程序逻辑只需要保证数据所有权随着channel的通信传递即可,channel主要分为有缓冲和无缓冲的两种。

本文聚合了对于channel的所有学习内容,内容较长,没图不烧猫。

注:本文大量参考了封幼林《深度探索Go语言》、Go 1.25rc1源代码,对于疑惑的地方使用了Gemini 2.5 pro问答,请注意甄别版本。

内存布局

ch := make(chan int)会在堆上分配一个runtime.hchan类型的数据结构,ch本身是存放在函数栈帧上的一个指针,指向堆上的hchan数据结构,由于channel是一个较"重"的结构,所以无法存放于栈帧上,其结构体为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type hchan struct {  
qcount uint // 队列中当前元素数量
dataqsiz uint // 队列长度(环形)
buf unsafe.Pointer // 数组地址
elemsize uint16 //元素大小
closed uint32 // 记录是否已关闭

timer *timer // 专为timer进行优化

elemtype *_type // 元素类型
sendx uint // 发送索引,下一个写下标的位置
recvx uint // 接收索引,下一个应该读取的下标位置
recvq waitq // 接收等待队列
sendq waitq // 发送等待队列
bubble *synctestBubble //仅在开启了synctest构建标签(build tag)时才有效的调试字段

// 并发保护锁
lock mutex
}

有缓冲和无缓冲

Channel有缓冲区时可以为非同步操作,需要对应的内存来存储缓冲区数据,发送操作可以在缓冲区未满时非阻塞完成,无需等待接收者取出,对于接收者来说亦然,可直接在缓冲区内取出数据,但若缓冲区为空则会阻塞

Channel无缓冲区时必定为同步操作,发送者和接收者均互相阻塞等待,完成同步操作。

timer字段

hchan 中的 timer 字段是专门为time.Timertime.Ticker创建的channel所做的一项性能优化。它允许select语句在处理定时器channel时,可以绕过常规的channel发送/接收流程,直接检查定时器的状态,从而大幅提升性能。

这个字段不为 nil 的情况,仅限于这个channel是由time.NewTimertime.Aftertime.NewTicker 等标准库函数在内部创建的。对于我们自己通过 make(chan …) 创建的普通 channel,这个字段永远是 nil。

1
2
3
4
5
6
select {
case <- myChan:
// ...
case <- time.After(1 * time.Second):
// 超时
}

对于上述代码,若无该优化,则time.After(1 * time.Second) 会返回一个 <-chan Time。Runtime 会将这个 timer 对象添加到一个全局的、按时间排序的定时器堆(timer heap)中。runtime的timerproc会计算出下一个最近要到期的定时器是哪个,然后 sleep 到那个时间点,当时间到了timerproc被唤醒,从定时器堆中取出到期的 timer 对象,向这个 timer 内部的 channel 发送一个当前时间的值

问题在于涉及一次goroutine的调度和上下文切换(唤醒 timerproc,然后 timerproc 发送数据后再唤醒等待 select 的 goroutine),对于高频使用 select 和定时器的场景,这个开销会很明显。

于是对该场景进行了优化,当 time.NewTimer time.NewTicker 创建 channel 时,runtime 不仅创建了 hchan 结构,还会创建一个内部的 timer 结构。然后,它会将 hchan.timer 指针指向这个内部的 timer 结构,建立起关联,当 select 语句(runtime.selectgo)扫描它的 case 时,如果发现其中一个 case 的 channel 的 hchan.timer 字段不为 nil,它就知道这是一个定时器 channel。selectgo 不会像对待普通 channel 那样去检查 channel 是否有数据或是否有等待的发送者。相反,它会:

  • 通过 hchan.timer 指针直接访问到那个内部的 timer 对象。
  • 直接读取 timer 对象的状态,比如它的到期时间 when。
  • 将 when 和当前时间进行比较

而后有如下逻辑:

  • 如果 当前时间 >= timer.when,说明定时器已经到期。selectgo 就会立即选择这个 case 执行,就好像真的从 channel 接收到了数据一样。但实际上,timerproc可能根本还没来得及向这个 channel 发送数据。
  • 如果 当前时间 < timer.when,说明定时器还没到期。selectgo 会将这个 goroutine 放入等待队列,并记录下需要被唤醒的时间点是 timer.when。

发送(send)操作

阻塞式

对于Channel的常规Send操作,对于上述ch := make(chan int)来说,常规代码如下:

1
ch <- 10

其中ch可能有缓冲也可能无缓冲,甚至可能会为nil,一般有两种情况不会让send操作阻塞:

  1. 通道ch的recvq中已有goroutinue在等待。
  2. 通道ch有缓冲,并且缓冲区没有用尽

在第一种情况下,当前协程会直接将数据交给recvq队首的协程,然后双方继续执行,无关有无缓冲,若有缓冲区且没有用尽,则会将数据追加到缓冲数组中,继续执行。

同样的,有3种情况可能会让send操作阻塞:

  1. 通道ch为nil
  2. 通道ch无缓冲且recvq为空
  3. 通道ch有缓冲区但用尽缓冲区

对于nil情况下的ch,runtime会允许进行操作,但会使当前协程永久阻塞于这个nil通道上,进而导致死锁抛出异常。

第二种情况下ch为无缓冲通道,recvq没有协程在等待,所以需要去sendq中阻塞等待。

第三种情况由于缓冲区已满,必定recvq为空,否则会直接从缓冲区中提取数据,故等价第二种情况,去sendq中阻塞排队等待。

非阻塞式

对于并发编程中的tryLock式操作,核心思想是尝试获取锁,获取不到就去做其他事,go team负责人rsc认为 TryLock 会鼓励设计者对锁进行不精确的思考,这可能最终会成为 race(竞态) 的根源,虽然为其提供了实现,但非常不建议使用。

对于channel的非阻塞send即为想通过channel发送数据,但是当前没有接收者且没剩余缓冲区的情况下做其他事,可通过下列代码实现:

1
2
3
4
5
6
7
ch := make(chan int, 1)  
select {
case ch <- 10:
//send
default:
//do anything
}

通过select语句亦可实现在多个channel上随机选择可用通道进行操作。

环形缓冲区

Channel的缓冲区为一个形式上的环形缓冲区,recvx指向下一个应该读取(接收者应接收)的下标,sendx指向下一个应该写入(发送者向channel发送)缓冲区的位置下标,其作为一个环形,当recvx与sendx均在该队列上循环移动,依次完成数据的出入。

接收操作

阻塞式

常规recv操作如下所示:

1
2
3
<- ch //丢弃结果
v := <- ch // 从ch中取出结果赋值给变量v
v,ok := <-ch //ok若为false则表示ch已关闭并且v是零值

有两种情况可以不让recv操作阻塞:

  1. 通道sendq已有goroutinue在等待
  2. 通道sendq虽然没goroutinue在等待,但是有缓冲区且有数据

对于第一种情况仍然需要区分有无缓冲区,因为若有缓冲区只有在缓冲区已满的情况下sendq才会开始排队,由于其为FIFO,所以我们应该先从缓冲区内获取数据,若无缓冲则直接从sendq中获取对应goroutinue的数据,并将其置为ready状态,双方均取消同步。

对于第二种情况,则直接从缓冲区中提取数据即可。

对于下列的情况仍然可能会阻塞recv操作:

  1. 通道ch为nil
  2. 通道无缓冲且sendq为空
  3. 通道ch有缓冲且缓冲区无数据

则与send操作类似,要么死锁,要么recv需要去recvq排队。

非阻塞式recv

与send操作类似

1
2
3
4
5
6
7
ch := make(chan int, 1)  
select {
case <- ch:
//recv
default:
//do anything
}

源代码分析

chansend

常规的send操作会调用函数chansend1,函数如下

1
2
3
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, sys.GetCallerPC())
}

非阻塞式的send操作会被编译器转换为对runtime.selectnbsend()函数的调用,后者也只是调用了chansend函数

chansend函数签名

1
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

c是一个hchans结构体指针,指向要send的channel;ep指向要被发送的数据,其类型签名应与channel一致,block表示send操作不能立即完成时是否想要阻塞等待,callerpc用于race监测。

1
2
3
4
5
6
7
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}

上述代码表面若c为nil,进一步判断block,如果为false则直接返回false,否则永久阻塞该goroutinue

1
2
3
if !block && c.closed == 0 && full(c) {
return false
}

如果不想阻塞且未关闭channel的情况下,若已经满了则直接false,防止非阻塞send被阻塞

1
2
3
4
5
6
lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

给channel加锁,如果通道已经关闭则解锁后panic

1
2
3
4
5
6
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

如果recvq不为空,则代表了缓冲区为空,从中去除第一个排队的协程,将数据传递给这个协程后置为ready状态,然后解锁,返回值为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if c.qcount < c.dataqsiz {  
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

通过比较qcount和dataqsiz来判断缓冲区是否还有剩余空间,无缓冲的通道会被视为无剩余空间,有剩余空间则追加数据到缓冲区中,并移动sendx下标到下一位,增加qcount,解锁返回True

1
2
3
4
if !block {
unlock(&c.lock)
return false
}

表示channel已满,如果block为false即非阻塞不想等待,则直接解锁后返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
reason := waitReasonChanSend
if c.bubble != nil {
reason = waitReasonSynctestChanSend
}
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)

对于下阻塞的协程,则将当前协程即自己追加到sendq中阻塞排队,gopark函数挂起协程后会调用chanparkcommit函数对通道解锁,直到有接收者接受数据后本协程才会被唤醒

对于send函数来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.bubble != nil && getg().bubble != c.bubble {
unlockf()
panic(plainError("send on synctest channel from outside bubble"))
}
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

其数据传输工作是由sendDirect函数完成的,然后调用unlockf函数解锁hchan,最后通过goready来唤醒接收者协程

1
2
3
4
5
6
7
8
9
10
11
12
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {  
// src is on our stack, dst is a slot on another stack.

// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
// No need for cgo write barrier checks because dst is always
// Go memory.
memmove(dst, src, t.Size_)
}

由于发送数据会访问接收者的协程栈,所以sendDirect函数用到了写屏障。

至此完成send操作。

chanrecv

常规recv操作会被转换为runtime.chanrecv1函数

1
2
3
func chanrecv1(c *hchan, elem unsafe.Pointer) {  
chanrecv(c, elem, true)
}

对于有comma ok的写法则会被转换为runtime.chanrecv2函数

1
2
3
4
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {  
_, received = chanrecv(c, elem, true)
return
}

与send类似,非阻塞recv操作会被编译器转换为runtime.selectnbrecv函数或runtime.selectnbrecv2函数(取决于要不要ok)

chanrecv函数签名如下

1
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

c是指向channel结构的指针,ep指向用来接收数据的内存,block表示要不要阻塞

selected为true表示操作完成(可能因为已关闭),false表示目前暂时无法完成,但因不想阻塞而返回

received为true时则表示数据为非通道关闭接收到的,false则需要看selected情况,如果通道关闭而获得0值则selected为true,否则是因为不想阻塞而返回则selected为false

1
2
3
4
5
6
7
8
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}

如果c为nil则进一步判断是否block,否就返回两个false,是则直接调用gopark永久阻塞在该nil通道上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {

if atomic.Load(&c.closed) == 0 {

return
}

if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}

快速路径,是为了让非阻塞的recv在无法立即获取数据时不加锁来实现真正的非阻塞,如果不想阻塞且通道是空的,就判断是通道是否已关闭,因通道不会重复打开,未关闭则返回两个false表示不想阻塞而返回,已关闭就把ep清空后返回true和false,表示因通道关闭而获取到0值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
lock(&c.lock)  

if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}

此处开始加锁,如果closed不为零,则通道已经关闭,先解锁后给ep零值,返回true和false。

反之,通道开放,如果sendq不为空,则从中取出第一个排队的协程sg,如果有缓冲区则需要滚动缓冲区来完成读取,并且让协程sg置为ready状态,解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if c.qcount > 0 {  
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

通过qcount来判断缓冲区是否有数据,无缓冲的情况下视为没有数据,到达这一步的sendq一定为空,如果缓冲区有数据,则将第一个数据取出并给ep,向下移动recvx的下标,减少qcount,解锁,返回两个true

1
2
3
4
5
if !block {
unlock(&c.lock)
return false, false
}

同send,如果block不想阻塞则解锁后返回两个false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg

mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}

// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
reason := waitReasonChanReceive
if c.bubble != nil {
reason = waitReasonSynctestChanReceive
}
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)

运行到这一步必定要阻塞当前goroutinue了,先把自己加入到通道的recvq中排队,和send的逻辑类似,gopark会挂起当前协程后调用chanparkcommit来解锁channel,等到后续recv操作完成后会被唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 有人唤醒了本协程
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success

被唤醒可能是因为通道被关闭,所以最后received需要根据唤醒原因来判断,如果等到真实数据则为true,如果因为通道关闭则为false。

对于recv函数来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.bubble != nil && getg().bubble != c.bubble {
unlockf()
panic(plainError("receive on synctest channel from outside bubble"))
}
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

若为无缓冲通道,则直接通过recvDirect函数进行数据拷贝,若有缓冲,即缓冲区已满,sendq不为空,则还需要对缓冲区进行滚动,把缓冲区头部的数据取出来交给接收者,然后把sendq头部协程的数据追加到尾部,并通过goready唤醒发送者协程。

1
2
3
4
5
6
7
8
9
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}

recvDirect和sendDirect类似,要访问其他协程的栈,所以使用写屏障后进行数据复制。

对于多channel的多路select实现

非阻塞式的send和recv均通过select来实现,但对于多路channel的select示例代码:

1
2
3
4
5
6
7
8
9
10
11
ch := make(chan int, 1)  
ch2 := make(chan int, 1)
select {
case v := <-ch:
//case1
fmt.Println(v)
case ch2 <- 10:
//case2
default:
//do anything
}

对于default分支可选,编译器会转换为对runtime.selectgo函数的不同调用,其函数签名如下

1
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

cas0指向一个数组,数组里面装的是select中所有的case分支,按照send在前recv在后的顺序,order0用于指向一个大小等于case分支数量两倍的uint16数组,实际上是作为两个等长数组来使用的,前一个对case中channel的轮询操作进行乱序,后一个用于给case中的channel的加锁操作进行排序,轮询操作是乱序的,避免每次select都按照case的顺序响应,对后面的case是不公平的,加锁则需要按照固定算法来排序,防止出现死锁情况。

pc0和race监测相关,nsends和nrecvs分别表示在case中发送和接收操作的数量。

block表示当所有case都不能立即完成时,是否要阻塞。如果select语句有 default 分支,block就为 false;否则为 true

返回值第一个参数为被选中执行的case在原始select语句中的索引。第二个参数如果被选中的是接收操作,该值表示接收是否成功(即 channel 未关闭)。

在正式检查 channel 状态之前,selectgo`会进行一系列准备工作:

  1. 生成随机轮询顺序 (pollorder):为了保证公平性,避免每次select都从第一个case开始检查,selectgo会将所有非空的channel的case进行一次随机排序。
  2. 生成锁定顺序 (lockorder):为了在对多个 channel 加锁时避免死锁,selectgo会根据 channel 的内存地址对所有case进行排序,并始终按照这个固定的顺序进行加锁。
  3. 锁定所有 channel:在对 channel 进行任何读写检查之前,selectgo会调用sellock函数,按照lockorder的顺序锁住所有相关的 channel。

准备工作完成后,select 的核心逻辑分为三个主要步骤:

查找已就绪的 Channel(非阻塞检查)

这个阶段会以随机顺序遍历所有case,检查是否有可以立即完成的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
var casi int
var cas *scase

for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c

if casi >= nsends { // 这是一个接收 case
sg = c.sendq.dequeue()
if sg != nil { // sendq 中有等待的 goroutine
goto recv
}
if c.qcount > 0 { // 缓冲区有数据
goto bufrecv
}
if c.closed != 0 { // channel 已关闭
goto rclose
}
} else { // 这是一个发送 case
if c.closed != 0 { // channel 已关闭
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil { // recvq 中有等待的 goroutine
goto send
}
if c.qcount < c.dataqsiz { // 缓冲区有空间
goto bufsend
}
}
}

这段代码的逻辑是:

  • 对于接收 case

    1. 检查sendq等待队列,如果发现有正在等待的发送者,则说明可以立即进行一次无缓冲的接收。跳转到recv。
    2. 如果sendq为空,检查缓冲区qcount,如果大于 0,说明可以从缓冲区接收数据。跳转到bufrecv。
    3. 如果缓冲区也为空,检查 channel 是否已关闭,如果已关闭,则可以立即“接收”到零值。跳转到rclose。
  • 对于发送 case

    1. 首先检查 channel 是否已关闭,如果关闭,直接跳转到sclose触发 panic。
    2. 检查recvq等待队列,如果发现有正在等待的接收者,则可以直接把数据发送给它。跳转到send。
    3. 如果recvq为空,检查缓冲区是否已满(qcount < c.dataqsiz),如果未满,则可以将数据放入缓冲区。跳转到bufsend。

如果遍历完所有case都没有找到一个可以立即执行的,则进入下一步。

1
2
3
4
5
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}

如果block为 false(即存在 default 分支),并且 Pass 1 没有找到任何就绪的 channel,那么就会执行 default 逻辑:解锁所有 channel,并返回 -1 表示没有case被选中。

阻塞等待(将当前 G 加入所有 Channel 的等待队列)

如果所有 case 都不满足,且没有 default 分支,当前 goroutine 就需要被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog() // 为每个 case 创建一个 sudog
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.c = c

*nextp = sg
nextp = &sg.waitlink // 将所有 sudog 链接起来

if casi < nsends {
c.sendq.enqueue(sg) // 发送 case,加入 sendq
} else {
c.recvq.enqueue(sg) // 接收 case,加入 recvq
}
}

// wait for someone to wake us up
gp.param = nil
gopark(selparkcommit, nil, waitReason, traceBlockSelect, 1)

此阶段的核心操作是:

  1. 为每一个case创建一个sudog结构体,代表当前 goroutine 的一次等待操作。
  2. 将所有sudog链接成一个链表,挂在当前 goroutine 的waiting字段上。
  3. 根据case是发送还是接收,将对应的sudog分别加入到其 channel 的sendq或recvq等待队列中。
  4. 调用gopark将当前 goroutine 挂起,陷入沉睡。gopark在挂起前会调用selparkcommit函数,该函数负责解锁所有 channel,以便其他 goroutine 可以操作它们。

当其他 goroutine 向其中一个 channel 发送或接收数据时,会唤醒这个沉睡的 goroutine。唤醒时,会把被满足的那个 case 对应的 sudog 存入当前 goroutine 的 param 字段中。

唤醒与清理

goroutine 被唤醒后,执行会从 gopark 之后继续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sellock(scases, lockorder) // 再次锁定所有 channel 以进行清理

gp.selectDone.Store(0)
sg = (*sudog)(gp.param) // 从 param 中获取是哪个 case 唤醒了我们
gp.param = nil


casi = -1
cas = nil
sglist = gp.waiting

for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// 这是成功唤醒我们的 case
casi = int(casei)
cas = k
caseSuccess = sglist.success
} else {
// 这是未成功的 case,需要从其 channel 的等待队列中移除
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
releaseSudog(sglist) // 释放 sudog
sglist = sgnext
}

唤醒后的清理工作:

  1. 首先,重新锁定所有相关的 channel,以保证清理过程的原子性。
  2. 通过gp.param找到是哪个case(即哪个sudog)成功了,并记录其索引。
  3. 遍历在阻塞等待中创建的所有sudog。对于那些未成功的case,必须将它们的sudog从对应 channel 的等待队列中移除。这一步是select实现的关键,确保了 goroutine 不会“遗留”在其他 channel 的等待队列中,从而避免内存泄漏或逻辑错误。
  4. 释放所有为此次select创建的sudog。

完成清理后,代码会根据成功的case类型(发送或接收),跳转到 send, recv, bufsend, bufrecv 等代码块完成最后的数据拷贝,解锁所有channel,并最终返回。这些收尾代码块与chansend中的逻辑类似,例如send会调用runtime.send函数,将数据直接拷贝给等待的接收者并唤醒它。