继续分享 Go 调度器系列文章第四篇:GMP 模型调度策略。沿着思路,我们已经聊过:什么是 GMP 、 GMP 如何启动调度、GMP 的调度时机,本篇文章将是 GMP 系列的最后一篇文章,我们来聊一聊 GMP 的调度策略,了解一下是什么样的调度策略,能够为 Go 程序提供如此快的并发性能!在本篇文章中,你可以了解到以下内容:
本文专业术语解释:
Go 调度器系列文章(阅读前面的文章,有助于理解本文细节内容):
源码解读环境:Go 版本 1.20.7、linux 系统
通过对 GMP 系列文章的学习,我们知道调度循环是从 schedule 函数开始的,今天我们就从这个函数入手,详细分析一下 GMP 的调度策略。我们先阅读一下源码,随后画个流程图详细分析!
源码:runtime/proc.go 3349
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
mp := getg().m
...
// 如果 M 锁定了执行的 G
if mp.lockedg != 0 {
// 停止执行锁定到 g 的当前 m,直到 g 再次可运行。m 被阻塞在 m.park
stoplockedm()
// m 被唤醒,运行锁定的 g
execute(mp.lockedg.ptr(), false) // Never returns.
}
...
top:
pp := mp.p.ptr()
pp.preempt = false
...
// 获取一个可运行的 G,可能会阻塞直到有可运行的任务
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
// 这个线程将运行一个 goroutine 并且不再旋转,
// 因此,如果它被标记为旋转,我们现在需要重置它,并可能启动一个新的旋转 M。
if mp.spinning {
resetspinning()
}
// 处理被禁止调度的 G
...
// 如果要调度一个非正常的 goroutine(GCworker 或 Tracereader),则唤醒 P(如果有)。
if tryWakeP {
wakep()
}
// 如果 G 锁定了执行的 M'
if gp.lockedm != 0 {
// 解除 G 所在的 P 和当前 M 的关系
// 由 M' 接管 P,唤醒 M‘
// 阻塞 M 进入睡眠
startlockedm(gp)
// M 被重新唤醒,回到 top 开启新的一次调度循环
goto top
}
// 执行 gp
execute(gp, inheritTime)
}
schedule 函数逻辑也比较简单,我们总结一下:
stoplockedm 和 startlockedm 函数比较简单,这里带着大家过一下:
stoplockedm 源码:runtime/proc.go 2564
// 停止执行锁定到g的当前m,直到g可以再次运行为止
// 返回获取的P
func stoplockedm() {
gp := getg()
...
if gp.m.p != 0 {
// Schedule another M to run this p.
pp := releasep() // 释放 P 到空闲状态,解除 P 和 M 之间的关系
handoffp(pp) // 寻找一个 M 接管 P
}
// 增加被锁定的空闲 M 数量
incidlelocked(1)
// Wait until another thread schedules lockedg again.
// M 通过 notesleep 阻塞在 m.park 字段
mPark()
...
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0
}
stoplockedm 函数主要逻辑总结:
startlockedm 源码:runtime/proc.go 2594
// 调度锁定的 M 执行锁定的 G
func startlockedm(gp *g) {
mp := gp.lockedm.ptr() // 获取G 锁定的 M'
...
// directly handoff current P to the locked m
incidlelocked(-1) // 锁定的空闲 M 数量 -1
pp := releasep() // 解除 G 所在的 P 和当前 M 的关系
// 由 M' 接管 P,需要提前绑定到 m.nextp,后续 M' 被唤醒
// 可以直接使用 m.nextp 绑定 P,然后执行 G
mp.nextp.set(pp)
notewakeup(&mp.park) // 唤醒 M‘
stopm() // 阻塞 M 进入睡眠
}
startlockedm 函数主要逻辑总结:
阻塞和唤醒 M 的代码逻辑稍微有点割裂,看起来有点费劲,这里举例一个具体的场景,并画了流程图进行分析:
这幅图看着有点复杂,但是逻辑是很简单的,为了照顾新来的朋友,我们这里简单解释一下:
当我们聊清楚被锁定的 M、G 的调度策略以后,后续就属于正常的调度了,从图中可以看出,红色步骤为调度执行的主流程图 schedule -> findRunnable -> execute -> gogo -> mcall -> schedule,其中最重要的就是 findRunnable 函数代表的调度策略了,接下来我们就移步进入 findRunnable 函数。
findRunnable 总共 278 行代码,是调度器中的一个核心函数,它的主要任务是从各种队列中找到一个可以执行的 Goroutine,主要逻辑分为三部分:
由于这块代码过于复杂,只能分块去讲,这里给出整体的流程图,帮助大家从整体上了解一下调度策略:
本文 GC 不是重点,但 GMP 调度器组件和 GC 垃圾回收组件经常会交叉执行,因此简单了解一下即可!
源码:runtime/proc.go 2686
// 寻找一个可运行的 G 去执行 execute
// inheritTime 是否需要继承上一个 G 的调度周期
// tryWakeP 表示如果返回的不是一个普通的 G,需要尝试去唤醒 P(比如 GC 的工作 G)
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m
top:
pp := mp.p.ptr()
// 要执行 GC STW
if sched.gcwaiting.Load() {
// 暂停现在的 M 为了 stopTheWorld = STW
// 当 world 重新启动,恢复 M 的运行
gcstopm()
goto top
}
...
}
在 Go 语言的运行时(runtime)中,sched.gcwaiting 是一个标志,用于表示当前是否有垃圾回收(GC)的“stop-the-world”(STW)事件正在等待发生或正在进行中。当这个标志被设置时,意味着运行时需要暂停所有的用户 Goroutines 以执行 GC 的某个阶段。具体来说,GC 的某些阶段(如标记或清理)需要确保没有用户 Goroutines 同时访问堆上的对象,因为这可能会导致不一致的状态。为了实现这一点,Go 运行时会在这些关键阶段暂停所有用户 Goroutines,这个过程被称为“stop-the-world”。在这个标志被设置期间,调度器会尝试确保所有的 M 都响应 GC 的暂停请求。一旦所有的 M 都已经暂停,GC 就可以安全地执行其需要的工作。当 GC 完成该阶段后,它会允许 M 重新开始执行用户 Goroutines,并清除 sched.gcwaiting 标志。
简要分析这段代码的逻辑:
gcstopm 源码:runtime/proc.go 2610
// 暂停现在的 M 为了 stopTheWorld = STW
// 当 world 重新启动,恢复 M 的运行
func gcstopm() {
gp := getg()
...
if gp.m.spinning {
// 如果 M 在自旋状态,设置为非自旋
gp.m.spinning = false
// OK to just drop nmspinning here,
// startTheWorld will unpark threads as necessary.
if sched.nmspinning.Add(-1) < 0 {
throw("gcstopm: negative nmspinning")
}
}
pp := releasep() // 解除 m 和 p 的关系,p 重置为 _Pidle 状态
lock(&sched.lock)
pp.status = _Pgcstop // 设置 P 状态为 _Pgcstop
sched.stopwait--
// sched.stopwait 初始值为 gomaxprocs
// 当 sched.stopwait == 0 表示 P 都被设置为 _Pgcstop
if sched.stopwait == 0 {
// 唤醒待执行的任务(比如垃圾回收)
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
stopm() // 停止当前 m 的执行,直到有新的工作可用
}
gcstopm 函数主要负责停止所有 M 的执行,从而可以唤醒 GC 释放内存,该段代码逻辑清晰,看注释应该可以看懂,这里偷个懒不解释了!
findRunnable 函数的主要任务是从各种队列中找到一个可以执行的 Goroutine,包括 GC worker、G 的可运行队列、网络轮询、通过 stealWork 窃取其他 P 的 G。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// Try to schedule a GC worker.
if gcBlackenEnabled != 0 {
gp, tnow := gcController.findRunnableGCWorker(pp, now)
if gp != nil {
return gp, false, true
}
now = tnow
}
...
}
调度器在寻找可运行的 Goroutine 时会优先考虑 GC 的工作:
这样做的目的是确保 GC 工作能够及时得到执行,从而保持内存的使用在一个可控的范围内。在 GC 期间,尤其是在标记阶段,运行时需要确保有足够的线程来处理 GC 任务,以避免 GC 延迟过长,从而影响程序的性能。
G 可运行队列分为两种:全局可运行队列、P 本地可运行队列;接下来的源码将介绍如何从可运行队列获取 G:
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// 每隔一段时间检查一次全局可运行队列以确保公平性。
// 否则,两个 Goroutine 可以通过不断地互相重生来完全占用本地运行队列。
// 每隔 61 个调度时钟周期,尝试从全局运行队列中获取一个 G
if pp.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp := globrunqget(pp, 1)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
...
// 尝试从本地运行队列中获取一个可运行的 G
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, false
}
// 如果全局运行队列非空,则尝试从全局运行队列中获取 G
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(pp, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
...
}
主要逻辑梳理:
这些步骤共同构成了 Go 调度器在查找可运行 Goroutine 时的基本策略,即优先考虑本地运行队列,同时确保全局运行队列中的 Goroutine 也能得到公平的执行机会。通过这种方式,Go 运行时能够在多核处理器上高效地调度和执行大量的并发 Goroutines。
globrunqget 源码
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(pp *p, max int32) *g {
assertLockHeld(&sched.lock) // 断言锁已持有
// 如果全局运行队列的大小为 0,则直接返回 nil
if sched.runqsize == 0 {
return nil
}
// 计算要获取的 Goroutine 数量
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
// 不会超过传入的 max 参数
n = max
}
if n > int32(len(pp.runq))/2 {
// 不会超过当前处理器(P)的本地运行队列长度的一半
n = int32(len(pp.runq)) / 2
}
sched.runqsize -= n // 更新全局运行队列的大小
gp := sched.runq.pop() //从全局运行队列中获取一个 Goroutine
n--
for ; n > 0; n-- {
// 从全局运行队列中获取剩余的 Goroutines
gp1 := sched.runq.pop()
// 放入当前处理器(P)的本地运行队列中
runqput(pp, gp1, false)
}
return gp
}
globrunqget 函数用于从全局运行队列(sched.runq)中获取一批 Goroutines(G)以供执行:
runqget 源码
func runqget(pp *p) (gp *g, inheritTime bool) {
// 检查 runnext
next := pp.runnext
if next != 0 && pp.runnext.cas(next, 0) {
return next.ptr(), true
}
for {
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
t := pp.runqtail
if t == h {
return nil, false
}
gp := pp.runq[h%uint32(len(pp.runq))].ptr()
if atomic.CasRel(&pp.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
runqget 函数用于从当前处理器(P)的本地运行队列(pp.runq)中获取一个 Goroutine(G)以供执行:
runqget 函数通过优先检查 runnext 字段,然后从本地运行队列中获取 Goroutine 的方式,实现了高效的 Goroutine 调度。这种方式可以减少不必要的竞争和锁开销,提高调度器的性能。随后使用了自旋获取操作,实现了无锁化,进而提升并发性能,具体无锁化的实现方式后续在窃取 G 小节进行分析。
Go 语言的网络轮询使用的是 epoll 多路复用网络 IO,可以参考文章 《4. IO 多路复用之 epoll 核心原理解析》。网络轮询是 Go 运行时用来检查是否有就绪的网络事件(如新的网络连接、可读/可写的网络套接字等)并执行相应的处理函数的机制。这对于实现高效的 I/O 并发尤为重要,因为它允许 Go 程序在等待网络事件时继续执行其他任务。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// 如果网络轮询已初始化,并且有等待的网络事件,并且上次轮询的时间不为零
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
// 尝试非阻塞获取准备就绪的网络事件列表
if list := netpoll(0); !list.empty() { // non-blocking
// 从列表中弹出一个 G,准备调度这个 G
gp := list.pop()
// 剩余的 G 加入可运行队列,等待调度
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable) // 修改 gp 状态为可运行
...
return gp, false, false
}
}
...
}
代码逻辑如下:
1.条件检查:首先检查是否满足以下三个条件:
2.非阻塞网络轮询:如果满足上述条件,则调用 netpoll(0) 进行非阻塞的网络轮询。这里的参数 0 表示不阻塞等待网络事件,立即返回。
3.处理就绪事件:如果 netpoll 返回的列表不为空,说明有就绪的网络事件。执行以下操作:
如果网络轮询没有找到就绪的 Goroutine,或者网络轮询的条件不满足,findRunnable 函数会继续执行其他逻辑来尝试找到可运行的 Goroutine,下一个就是从其他处理器 P 窃取等。
M 自旋是指在没有可运行的 Goroutine 时,M 会继续尝试从其他 P 窃取任务,而不是立即进入睡眠状态。这有助于减少线程唤醒和调度的开销,提高系统的响应性。接下来我们就一起来看看,如何从其他 P 窃取 G:
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// 如果 M 处于自旋状态 || 将旋转的 M 数量限制为繁忙的 P 数量的一半
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
// 如果 M 不在自旋状态,则将其切换为自旋状态
if !mp.spinning {
mp.becomeSpinning()
}
// 尝试从其他 P 中窃取任务
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp != nil {
return gp, inheritTime, false
}
if newWork {
// 可能有定时器到期触发的 G 可执行或有 GC 工作;重启 find 即可发现。
goto top
}
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
// 等待定时器触发,设置最早的定时器触发时间
pollUntil = w
}
}
...
}
代码逻辑如下:
这段代码通过自旋和窃取工作来减少 M 的空闲时间,提高处理器的利用率。当没有可运行的 Goroutine 时,M 会继续自旋一段时间,尝试从其他 P 窃取任务,而不是立即阻塞。这有助于减少线程调度的开销,提高系统的整体性能。
stealWork 用于尝试从其他处理器(P)窃取可运行的 Goroutine(G),接下来我们详细聊一下代码细节:
stealWork 源码:runtime/proc.go 3056
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr()
ranTimer := false // 标记是否有定时器被运行
const stealTries = 4 // 定义窃取尝试的次数
for i := 0; i < stealTries; i++ {
// 在最后一次循环时检查定时器或运行下一个 G。
stealTimersOrRunNextG := i == stealTries-1
// 遍历所有 P(使用 stealOrder 枚举)
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting.Load() {
// 如果 GC 等待中,则可能有 GC 工作可做,返回以重启 findRunnable。
return nil, false, now, pollUntil, true
}
p2 := allp[enum.position()]
if pp == p2 {
continue // 跳过当前 P
}
// 最后一次窃取循环 && P 拥有计时器
if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
// 检查定时器并运行到期的定时器
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
// 有定时器运行,
if ran {
// P 本地可能有新 G(p2 的定时器到期执行,触发放入当前 P 队列)
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, now, pollUntil, ranTimer
}
ranTimer = true // ranTimer 会被设置为 true
}
}
// 如果 P 不空闲,尝试从其 runq 窃取 G
if !idlepMask.read(enum.position()) {
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}
// No goroutines found to steal. Regardless, running a timer may have
// made some goroutine ready that we missed. Indicate the next timer to
// wait for.
return nil, false, now, pollUntil, ranTimer
}
stealWork 函数是 Go 调度器中的一个重要部分,用于在多个 Processor(P)之间“窃取”工作,即寻找并尝试执行其他 Processor 上的可运行 Goroutines。这是 Go 调度器实现工作窃取算法的核心,有助于提高多核 CPU 的利用率和程序的总体性能。 参数介绍:
代码主要逻辑:
在窃取过程中,函数会考虑 GC 工作和定时器到期的可能性。如果有 GC 工作需要处理或有定时器到期触发了新的 G,函数会提前返回以便调度器能够及时处理这些情况。窃取算法使用了一个枚举器 stealOrder 来决定遍历 P 的顺序,这有助于减少争用和提供更好的负载均衡。同时,通过检查 idlepMask 可以避免不必要的窃取尝试,提高效率。stealWork 函数通过窃取机制来分发工作,从而提高了系统的整体吞吐量和响应性。
runqsteal 源码:runtime/proc.go 6214
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
t := pp.runqtail // 尾部索引
// 窃取 Goroutines
n := runqgrab(p2, &pp.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
// 计算窃取到的最后一个 Goroutine 在 pp 的可运行队列中的位置
// 获取 gp
gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(pp.runq)) {
throw("runqsteal: runq overflow")
}
// 更新 pp 的可运行队列的尾部索引
atomic.StoreRel(&pp.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
runqsteal 函数用于从一个 Processor(p2)的本地可运行队列中窃取一半的 Goroutines,并将它们放到另一个 Processor(pp)的本地可运行队列中。这种窃取机制有助于在多个 Processor 之间平衡工作负载,从而提高多核 CPU 的利用率。
参数和返回值:
主要逻辑:
这里有个点需要强调一下:runqsteal 函数中的操作涉及到处理器之间的数据竞争和同步问题,因此使用了原子操作来确保数据的一致性和顺序性。例如,atomic.LoadAcq 和 atomic.StoreRel 分别用于执行带获取语义的加载操作和带释放语义的存储操作,以确保在窃取 Goroutines 的过程中,PP 数据的一致性。
runqgrab 函数的作用是从运行队列中“抓取”一些 Goroutine,并放入一个批量处理队列中。这个函数主要用于负载均衡和并发控制。
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
n := t - h // 计算运行队列中的 Goroutine 数量
n = n - n/2 // 取一半 G,这是偷取策略
if n == 0 {
// 本地队列没有可偷取的 G
if stealRunNextG {
// Try to steal from pp.runnext.尝试偷取 pp.runnext
if next := pp.runnext; next != 0 {
...
if !pp.runnext.cas(next, 0) {
continue
}
// 获取到 next G,插入队列头部
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(pp.runq)/2) {
continue
}
for i := uint32(0); i < n; i++ {
// 窃取的 G 循环插入 batch 队列
g := pp.runq[(h+i)%uint32(len(pp.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 更新 pp 本地队列的头部指针,表示被窃取了 n 个
if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
参数:
主要逻辑:
这里有个很重要的点:并发状况的处理。通过对调度策略的分析,我们可以发现 P 从本地队列获取 G 以及被窃取,是存在并发情况的,面对并发 Go 是怎么处理的呢?
在 Go 语言的运行时系统中,为了提高并发性能,调度器通常会避免使用显式的锁机制,而是利用原子操作和内存屏障来实现无锁化操作。
通过结合原子操作和内存屏障,以及自旋重复获取的机制,Go 调度器能够在不使用显式锁的情况下实现无锁化操作,提高并发性能并确保数据的一致性和正确性。
当没有 G 可执行时,Go 调度器并没有直接让 M 放弃 CPU 执行权,进入睡眠状态,而是尽自己所能找活干,接下来我们就一起看看 M 是如何找活的吧!
如果处理器处于 GC 的标记阶段,并且有可安全扫描和标记为黑色的对象(即那些已经确定为活跃状态的对象),那么处理器应该继续执行这些标记任务,而不是立即放弃控制权。这样做的好处是,它可以在等待新工作到来的同时,继续推进 GC 的进度,从而有助于减少 GC 停顿的时间,提高整体的程序性能。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// 到这里,表示没有任何事情可以做
//
// 当处理器(P)在 GC 的标记阶段,且当前没有其他紧急任务需要处理时
// 如果处理器处于 GC 的标记阶段,并且有可安全扫描和标记为黑色的对象,
// 那么处理器应该继续执行这些标记任务,而不是立即放弃控制权。
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) && gcController.addIdleMarkWorker() {
node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
if node != nil {
pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := node.gp.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
gcController.removeIdleMarkWorker()
}
...
}
我们可以看到代码中 gcController.addIdleMarkWorker,GC 会尝试增加一个 worker,因为 worker 池子里没有空闲的 worker,如果能增加成功,就可以安排 M 去执行 GC 标记工作。
当 GC 都不缺人的时候,就得考虑释放 P 了,但在释放之前,又进行了一系列的检查,为了最大限度的找活干,我们继续看看都干啥了:
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// 放弃 P 之前要做一些检查工作
allpSnapshot := allp
idlepMaskSnapshot := idlepMask
timerpMaskSnapshot := timerpMask
// 有 GC STW || runSafePointFn 可执行,则返回 top
lock(&sched.lock)
if sched.gcwaiting.Load() || pp.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
// 全局可执行队列不为空,直接获取一批 G,放入 P 本地
// 返回第一个可执行的 G
if sched.runqsize != 0 {
gp := globrunqget(pp, 0)
unlock(&sched.lock)
return gp, false, false
}
if !mp.spinning && sched.needspinning.Load() == 1 {
// 如果 M 不在自旋状态,并且需要自旋,则切换为自旋状态
mp.becomeSpinning()
unlock(&sched.lock)
goto top
}
// releasep 解除 M 和 P 的关系,并设置 P 状态 _Pidle
if releasep() != pp {
throw("findrunnable: wrong p")
}
now = pidleput(pp, now) // P 重新加入空闲队列
unlock(&sched.lock)
...
}
代码的主要逻辑如下:
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
wasSpinning := mp.spinning
// 如果 M 还处于自旋状态,目前已解除 P
if mp.spinning {
// 重置为非自旋
mp.spinning = false
if sched.nmspinning.Add(-1) < 0 {
throw("findrunnable: negative nmspinning")
}
// Check all runqueues once again.
// 再次尝试看现在有没有能偷的工作
// 有的话返回一个空闲 P,绑定 M,并重新寻找 G
// 以便窃取工作继续执行
pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
if pp != nil {
acquirep(pp)
mp.becomeSpinning()
goto top
}
// Check for idle-priority GC work again.
// 再次查看 GC 是否有工作可以执行
// 函数 checkIdleGCNoP 尝试在没有当前处理器(P)的情况下,
// 找到一个可用的处理器 P 和一个 G 处理垃圾回收工作
pp, gp := checkIdleGCNoP()
if pp != nil {
acquirep(pp)
mp.becomeSpinning()
// Run the idle worker.
pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
// 检查定时器的创建或过期时间,更新 pollUntil
pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
}
...
}
M 自旋状态指的是线程在没有工作时不断检查是否有新工作可做的状态,而非自旋状态则是线程在没有工作时进入休眠或等待状态。线程(M)从自旋状态到非自旋状态转换期间,会并发的产生新工作提交,而这段代码就是为了解决在并发环境中安全地进行这种转换,同时确保不会丢失任何新提交的工作。 工作源涉及到多个方面,包括:
当调度器发现没有可运行的 goroutine 时,它可能会选择让网络轮询器阻塞,而不是立即让出 CPU。这样做可以提高系统的响应性,因为一旦有新的网络连接、数据到达或者其他网络事件发生,网络轮询器可以迅速唤醒,并调度相关的 goroutine 进行处理。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
// 轮询网络直到下一个计时器
// 网络轮询是否已初始化 && (是否有等待的网络事件 || 是否有一个指定的轮询超时时间)&& 上次轮询的时间戳是否为非零
if netpollinited() && (netpollWaiters.Load() > 0 || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
sched.pollUntil.Store(pollUntil)
if mp.p != 0 {
throw("findrunnable: netpoll with p")
}
if mp.spinning {
throw("findrunnable: netpoll with spinning")
}
// Refresh now.
now = nanotime()
// 计算轮询延迟时间
delay := int64(-1)
if pollUntil != 0 {
delay = pollUntil - now
if delay < 0 {
delay = 0
}
}
if faketime != 0 {
// 如果使用了 faketime,轮询将不会阻塞,直接进行轮询。
// When using fake time, just poll.
delay = 0
}
// delay 表示阻塞等待的时长,delay = 0 表示非阻塞调用网络轮询
list := netpoll(delay) // block until new work is available
sched.pollUntil.Store(0)
sched.lastpoll.Store(now) // 设置上一次网络轮询时间
if faketime != 0 && list.empty() {
// 使用了 fake time && 没有网络事件准备好
// 阻塞 M,等待被唤醒
stopm()
goto top // M 唤醒后,回到 top
}
lock(&sched.lock)
pp, _ := pidleget(now) // 尝试获取一个空闲的处理器(P)
unlock(&sched.lock)
if pp == nil {
// 如果没有获取到处理器,把剩余的事件列表注入到全局队列中,以供其他线程处理
injectglist(&list)
} else {
// 成功获取到一个处理器,绑定 M、P
acquirep(pp)
// 检查网络轮询返回的事件列表是否为空
if !list.empty() {
// 不为空,则处理网络事件
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
if wasSpinning {
// 之前线程是在自旋状态,它将恢复自旋状态并跳回到调度循环的顶部
mp.becomeSpinning()
}
goto top
}
} else if pollUntil != 0 && netpollinited() {
// 当轮询超时时间不为0 && 网络轮询已经初始化
// 获取调度器中 sched.pollUntil 字段
// 调度器应该阻塞网络轮询直到这个时间点
pollerPollUntil := sched.pollUntil.Load()
// 如果 sched.pollUntil 的值为 0,这通常意味着网络轮询器不应该阻塞,或者应该立即被打断
// 如果 sched.pollUntil 表示的时间点晚于 pollUntil 表示的时间点,
// 那么网络轮询器应该被打断,因为有一个更早的时间点需要被考虑。
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
// 打断任何正在进行的网络轮询
netpollBreak()
}
}
...
}
这段代码用于处理网络轮询(netpoll)以及相关的调度操作:
这段代码用于在没有处理器可用时进行网络轮询,以处理异步网络事件。
当前面一系列检查都无法找到可执行的 G 的时候,就只能选择休眠 M,让出 CPU 了。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
...
stopm() // 停止当前 m 的执行,直到有新的工作可用
goto top
}
stopm 源码:runtime/proc.go 2317
// 停止当前 m 的执行,直到有新的工作可用。
// 返回获取到的 P。
func stopm() {
gp := getg()
...
lock(&sched.lock)
mput(gp.m) // m 放入空闲列表 sched.midle
unlock(&sched.lock)
mPark() // 阻塞,等待唤醒
// m 被唤醒后,绑定一个 P,唤醒 m 前会提前绑定 P 到 gp.m.nextp 字段
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0 // 使用完,重置为 0
}
回到 schedule 函数的主流程,最后一步代码:execute(gp, inheritTime) 用于执行调度策略选出的 G。
源码:runtime/proc.go 2646
// Schedules gp to run on the current M.
// 如果 inheritTime 为 true,继承当前时间片,
// 否则新开启一个时间片
func execute(gp *g, inheritTime bool) {
mp := getg().m
...
mp.curg = gp // 设置 M 当前执行的 G
gp.m = mp // 绑定 G、M 关系
casgstatus(gp, _Grunnable, _Grunning) // G 设置为执行中
gp.waitsince = 0
gp.preempt = false // 初始化抢占标志
gp.stackguard0 = gp.stack.lo + _StackGuard // 初始化栈检查保护字段
if !inheritTime {
// 如果 inheritTime = false,使用新的时间片,执行 G
// 否则继承上一次调度的时间片,和 sysmon 监控线程逻辑有关
mp.p.ptr().schedtick++
}
...
gogo(&gp.sched) // 切换到 G 栈执行用户代码
}
execute 代码逻辑比较简单,不总结了,这里贴上 《14. Go调度器系列解读(一):什么是 GMP?》 文章中提到的调度流程图,希望可以帮助各位从整体理解 GMP 的核心调度逻辑。
本文是 Go 调度器系列最后一篇文章,主要是讲述 Go 调度器的调度策略,下面我们总结一下 Go 调度器策略的要点和优势:
1.优先考虑 GC 的标记工作:确保 GC 工作能够及时得到执行,从而保持内存的使用在一个可控的范围内;在标记阶段,运行时需要确保有足够的线程来处理 GC 任务,以避免 GC 延迟过长,从而影响程序的性能。
2.每隔 61 个调度时钟周期检查全局运行队列:每隔 61 个周期,如果全局运行队列(sched.runq)非空,调度器会尝试从全局队列中获取一个 Goroutine 来执行。这样做是为了防止本地运行队列被少数几个 Goroutine 长期占用,从而导致其他 Goroutine 得不到执行机会。
3.从本地运行队列中获取 G:在尝试从全局运行队列获取 Goroutine 之前,调度器会先检查当前处理器(P)的本地运行队列(pp.runq)。如果本地队列中有可运行的 Goroutine,则优先执行它们。这样做的目的是减少并发,并发挥利用程序局部性的优势。
4.再次检查全局运行队列:如果在本地运行队列中没有找到可运行的 Goroutine,并且全局运行队列非空,调度器会再次尝试从全局队列中获取 Goroutine。注意这里和第一步的区别在于,这一步不是周期性执行的,而是在本地队列为空时才会执行。此时并不是单纯的获取一个 G,而是通过负载均衡获取多个 G 到 P 的本地队列。
5.从网络轮询中获取 G:网络轮询是 Go 运行时用来检查是否有就绪的网络事件(如新的网络连接、可读/可写的网络套接字等)并执行相应的处理函数的机制。这对于实现高效的 I/O 并发尤为重要,因为它允许 Go 程序在等待网络事件时继续执行其他任务。
6.从其他 P 中窃取 G:当没有可运行的 Goroutine 时,M 会继续自旋一段时间,尝试从其他 P 窃取任务,而不是立即阻塞,这有助于减少线程调度的开销,提高系统的整体性能。窃取算法也是经过巧妙设计的,为了更好的支持 G 的调度,实现负载均衡。
4. 没有 G 可执行时,会尝试以下工作,尽力为 M 找一些事情,而不是立即让出执行权:
用一句话总结 Go 的调度策略就是:尽最大努力从各种队列中找到一个可以执行的 G,支持锁定、窃取机制,支持 GC、网络轮询、定时器等组件的并发调度,可以做到负载均衡,能够减少线程调度的开销,并提升网路 IO 系统的响应性能。
还有一点值得提一下:在 Go 语言的运行时系统中,为了提高并发性能,调度器通常会避免使用显式的锁机制,而是利用原子操作和内存屏障来实现无锁化操作。
通过结合原子操作和内存屏障,以及自旋重复获取的机制,Go 调度器能够在不使用显式锁的情况下实现无锁化操作,提高并发性能并确保数据的一致性和正确性。
阅读量:512
点赞量:0
收藏量:0