本文继续分享 Go 调度器系列文章第三篇:GMP 模型调度时机。前面已经分享了什么是 GMP,以及 GMP 如何启动的知识,接下来我们聊一聊 GMP 在哪些时机会触发 goroutine 调度。在本篇文章中,你可以了解到以下内容:
本文专业术语解释:
Go 调度器系列文章(阅读前面的文章,有助于理解本文细节内容):
源码解读环境:Go 版本 1.20.7、linux 系统
本小节将从整体角度介绍 GMP 的调度时机,主要分为正常调度、主动调度、被动调度和抢占调度四种情况,如下图所示:
1.协助式抢占:针对 g 运行时间太长(一般是 10ms)的情况,retake 会设置抢占标志,随后由 g 进行扩栈检查时,根据抢占标志触发抢占调度,最终也是通过调用类似于 gosche_m 函数的方式主动放弃执行权,形成的调度;这种抢占方式有一个很明显的缺点:一个没有主动放弃执行权、且不参与任何函数调用的函数,直到执行完毕之前, 是不会被抢占的。
2.信号异步抢占:针对 g 运行时间太长(一般是 10ms)的情况,retake 会在支持异步抢占的系统内,直接发送信号给 M,M 收到信号后实施异步抢占,最终也是通过调用类似于 gosche_m 函数的方式主动放弃执行权,形成的调度;这种抢占时为了解决由密集循环导致的无法抢占的问题。
3.针对 g 长时间处于系统调用之中的情况,g 在进入系统调用时,会通过 runtime·entersyscall 函数解除 m 与 p 的绑定关系;retake 会定期检查所有 p,当满足一定条件时,会调用 handoffp 寻找新的工作线程来接管这个 p,通过抢占 p,实现抢占调度。
接下来,我们就具体聊一聊每一个调度时机的细节!
正常调度;g 顺利执行完成,并进入下一次调度循环,调度流程图如下:
文章《14. Go调度器系列解读(一):什么是 GMP?》详细讲述了 GMP 对象的创建和一个线程的正常调度流程,这里我们简单复习一下:
CALL runtime·goexit1(SB)
;其中 g.sched.pc 会指向 G 的任务函数地址,当调度到 G 时,就能执行用户代码。CALL runtime·goexit1(SB)
,CPU 从此处开始继续执行。以上便是 G 正常调度循环过程,源码请参考文章《14. Go调度器系列解读(一):什么是 GMP?》,该文章包括以下内容:
主动调度:业务程序主动调用 runtime.Gosched 函数让出 CPU 而产生的调度。
源码:src/runtime/proc.go 317
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
checkTimeouts()
mcall(gosched_m) // 切换到当前 m 的 g0 栈执行 gosched_m 函数
}
// Gosched continuation on g0.
// gp 为被调度的 g,而不是 g0
func gosched_m(gp *g) {
if trace.enabled {
traceGoSched()
}
goschedImpl(gp)
}
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
// 放弃当前 g 的运行状态
casgstatus(gp, _Grunning, _Grunnable)
// 使当前 m 放弃 g
dropg() // 设置当前 m.curg = nil, gp.m = nil
lock(&sched.lock)
globrunqput(gp) // 把 gp 放入 sched 的全局运行队列 runq
unlock(&sched.lock)
schedule() // 进入新一轮调度
}
Gosched 函数源码比较简单,当业务代码主动调用 runtime.Gosched() 函数时,会沿着函数调用链( runtime.Gosched -> mcall(gosched_m) -> gosched_m(gp *g) -> goschedImpl(gp *g) -> schedule )执行,直到开启下一次的调度循环,主要逻辑如下:
被动调度:g 执行业务代码时,因条件不满足需要等待,而发生的调度。这里我们举个简单的例子演示一下:
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
go func() {
time.Sleep(2 * time.Second)
c <- 1000
}()
x := <-c
fmt.Println(x)
}
该程序启动时,main goroutine 首先会创建一个无缓存的 channel,然后启动一个新的 goroutine,2秒后向 channel 发送数据;main goroutine 等待去读取这个 channel,此时 main goroutine 会因为 channel 没有数据,而等待 2 秒,2 秒后唤醒 main goroutine,读取数据继续执行,打印读取到的数据,最后结束程序。
关于 channel 阻塞 goroutine 的源码(channel 源码解读),之前的文章详细分析过,无关代码就直接省略了,这里我们重点关注一下 channel 阻塞的逻辑:
源码:src/runtime/chan.go 457
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock) // 锁住 chan,控制并发
...
gp := getg() // 获取 g
mysg := acquireSudog() // 初始化一个 sudog 对象
...
mysg.elem = ep // mysg.elem 用于接收数据
mysg.g = gp // mysg 绑定 g
...
c.recvq.enqueue(mysg) // mysg 入队 chan 接收等待队列
...
// 切换调度协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
// gopark 函数的等待解锁参数
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
...
unlock((*mutex)(chanLock)) // 解锁
return true
}
// runtime/proc.go 364
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
...
mp := acquirem() // 获取当前 m
...
mp.waitlock = lock // lock = unsafe.Pointer(&c.lock)
mp.waitunlockf = unlockf // 设置等待解锁的函数
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
channel 接收源码关于阻塞接收的主要逻辑:
// park continuation on g0.
func park_m(gp *g) {
mp := getg().m
...
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := mp.waitunlockf; fn != nil {
ok := fn(gp, mp.waitlock) // 该场景下,完成 chan 的解锁 ok = true
mp.waitunlockf = nil
mp.waitlock = nil
if !ok {
...
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule() // 开启下一次调度
}
park_m 函数的主要作用是将指定的 g 结构体从运行状态(_Grunning)切换到等待状态(_Gwaiting),并释放其关联的 M 以供其他协程使用。以下是代码的详细解释:
根据以上源码,总结被动调度流程图如下(依旧和前面两种调度时机的函数链流程类似,可以看出 Go 设计者对普通的调度时机做了统一的封装和流程规划,我们也应该学习这样写代码):
虽然被动调度讲到 G 阻塞等待,内容就已经讲完了,不过我写东西,喜欢有始有终,既然有了 G 阻塞,那必然就会有 G 唤醒的过程,因此想把知识点补全,接下来我们聊一聊 G 如何被唤醒。
本章提供的 go 程序中,启动了另外一个协程,其中 c <- 1000 代码开启了 G 唤醒的过程。
源码:src/runtime/chan.go 160
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock) // 加锁
...
if sg := c.recvq.dequeue(); sg != nil {
// 出队一个等待接收的 goroutine
// 将数据发送给等待接收的 sudog
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
上一小节中讲到 sudog 被 chanrecv 函数塞入了 c.recvq 队列;而 chansend 函数负责往 channel 发送数据,该段代码会尝试从 c.recvq 队列出队一个等待接收的 sudog,然后利用 send 函数发送数据,并将 sudog 中的 g 唤醒,接下来我们看一看 send 函数。
// sg 表示接收方 goroutine
// ep 表示要发送的数据
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
// 接收数据的地址不为空,则拷贝数据 (sg.elem 用来接收数据)
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g // 从 sudog 获取 goroutine
unlockf() // 解锁 hchan (结合 chansend 函数加锁)
...
// 调用 goready 函数将接收方 goroutine 唤醒并标记为可运行状态
goready(gp, skip+1)
}
这里解释一下 send 函数的主要逻辑:
源码:src/runtime/proc.go 390
func goready(gp *g, traceskip int) {
// 切换系统栈,一般是 g0
systemstack(func() {
ready(gp, traceskip, true)
})
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
...
status := readgstatus(gp)
// 获取 m,并加锁,禁止被抢占
mp := acquirem() // disable preemption because it can be holding p in a local var
...
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable) // g 状态改变
runqput(mp.p.ptr(), gp, next) // 优先放入 p.runnext
wakep() // 尝试唤醒一个 p
releasem(mp) // 释放 m 锁
}
以上用到的函数我们都一行行分析过源码,想要深入了解的同学,可以参考这两篇文章:《channel 源码解读》 和 《14. Go调度器系列解读(一):什么是 GMP?》。
抢占调度:由于 g 运行时间太长或长时间处于系统调用之中,被调度器剥夺运行权,从而发生的调度。
为了看明白抢占的调度时机,让我们先深入了解 sysmon 函数以及 Golang 监控线程的工作内容。
sysmon 是一个核心函数,它决定了 Go 的抢占时机。这个函数不依赖于 P(处理器),可以直接绑定在 M(机器)上执行。这意味着它可以独立于特定的处理器或核来运行,这有助于实现更细粒度的任务调度。在 runtime.main 函数中会启动一个 sysmon 监控线程,该线程启动会执行 sysmon 函数(线程启动可以参考文章 《14. Go调度器系列解读(一):什么是 GMP?》 ),且永远不会返回,接下来我们看一下源码。
源码:src/runtime/proc.go 5297
func sysmon() {
lock(&sched.lock)
sched.nmsys++
checkdead() // 检查死锁
unlock(&sched.lock)
lasttrace := int64(0) // 记录上一次调度器跟踪的时间
idle := 0 // 连续多少个周期没有抢占 g
delay := uint32(0) // 要暂停的微妙数
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms 最多暂停 10ms
delay = 10 * 1000
}
usleep(delay) // 暂停当前执行的线程一段时间,单位微秒
now := nanotime()
// 调试变量未开启 && (至少有一个 g 在等待被 GC || P 都是空闲的,也就是没有 g 需要执行)
if debug.schedtrace <= 0 && (sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs) {
lock(&sched.lock)
// 二次检查,确保数据一致性
if sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs {
syscallWake := false
next := timeSleepUntil() // 所有 P 中最早的定时器到期时间
if next > now {
...
// 休眠一段时间(sleep 表示时间),时间到了自己苏醒
syscallWake = notetsleep(&sched.sysmonnote, sleep)
...
// 清理休眠的数据
noteclear(&sched.sysmonnote)
}
if syscallWake {
idle = 0
delay = 20
}
}
unlock(&sched.lock)
}
lock(&sched.sysmonlock)
...
// 如果网络没有被轮询超过10毫秒,那么就会进行网络轮询。
lastpoll := sched.lastpoll.Load() // 获取最后一次轮询的时间
// 网络已经初始化 && 上次的轮询时间不是零 && 自上次轮询以来已经过去了足够的时间 10ms
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
sched.lastpoll.CompareAndSwap(lastpoll, now) // 更新最后一次轮询的时间
// 执行网络轮询,并返回一个包含可执行的 goroutine 的列表。
// 这是一个非阻塞操作,意味着它不会等待网络I/O完成。
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// 如果返回的列表不是空的(即有goroutine在等待网络I/O完成)
// 减少空闲锁定的M的数量(为了模拟一个正在运行的M,防止死锁)
incidlelocked(-1)
injectglist(&list) // 注入等待的 goroutine 列表到调度器中
incidlelocked(1) // 增加空闲锁定的M的数量,恢复系统正常状态
}
}
...
// 唤醒 scavenge 垃圾回收器
if scavenger.sysmonWake.Load() != 0 {
// Kick the scavenger awake if someone requested it.
scavenger.wake()
}
// retake P's blocked in syscalls 重新获取因系统调用而被阻塞的 P
// and preempt long running G's 抢占长时间运行的 G
// 这里就是监控线程抢占调度的时机
if retake(now) != 0 {
idle = 0 // 触发 retake,则 idle 从 0 开始继续计数
} else {
idle++ // 否则 ++
}
// check if we need to force a GC 检查是否需要强制进行垃圾回收(基于时间触发的垃圾回收)
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() {
lock(&forcegc.lock)
forcegc.idle.Store(false) // 将forcegc.idle设置为非空闲状态
var list gList
list.push(forcegc.g) // 将 forcegc.g 添加到 list 中, forcegc.g 在 init 中启动
injectglist(&list) // 将 Goroutine 列表注入到调度器中,以便它们能够被执行
unlock(&forcegc.lock)
}
// 跟踪和调试 Go 语言的运行时调度器
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
unlock(&sched.sysmonlock)
}
}
sysmon 是一个无限循环,始终在后台运行,执行各种监控任务。下面总结一下该线程的主要工作:
sysmon 函数和 Golang 监控线程的工作涉及到系统监控、资源管理、垃圾回收协调和线程调度等多个方面。它们共同维护着系统的健康和性能,确保 Go 程序能够高效、稳定地运行。
通过对 sysmon 函数的分析,我们可以知道,系统线程会定期通过 retake 函数对 goroutine 发起抢占,那么接下来,我们就一起来看看 retake 如何抢占 goroutine。
源码:src/runtime/proc.go 5454
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
pp := allp[i] // 遍历所有的 p
if pp == nil {
continue
}
// 用于 sysmon 线程记录被监控 p 的系统调用时间和运行时间
pd := &pp.sysmontick
s := pp.status
sysretake := false // 是否需要进行系统抢占
if s == _Prunning || s == _Psyscall {
// 如果 P 处于运行或系统调用状态
// Preempt G if it's running for too long.
t := int64(pp.schedtick) // 获取 P 的调度时钟计数,调度一次则 +1
// 如果系统监控信息中的调度时钟与当前 P 的不一致,则更新系统监控信息
if int64(pd.schedtick) != t {
// 已经不是同一次调度时钟计数,更新监控线程信息
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// 还处于同一次调度 && 如果距离上次调度的时间已经超过一定阈值,则设置抢占标志
preemptone(pp)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
// 系统调用前会解除 m 和 p 的关系,因此无法顺利执行 preemptone
sysretake = true
}
}
// 如果 P 处于系统调用状态
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(pp.syscalltick) // 获取 P 的系统调用时钟计数
// 未进行系统抢占 && 系统监控信息中的系统调用时钟与当前 P 的不一致
// 则更新系统监控信息
if !sysretake && int64(pd.syscalltick) != t {
// 不是同一次系统调用了,需要更新信息,等待下一轮抢占
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// 运行队列为空 && 有自旋状态的 m 或 有空闲的 p && 距离监控线程记录的系统调用的时间大于一定阈值 10ms
if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
incidlelocked(-1)
// 尝试将 P 状态从 _Psyscall 改为 _Pidle 空闲
if atomic.Cas(&pp.status, s, _Pidle) {
...
n++ // 增加系统监控 retake 的空闲 P 数量
pp.syscalltick++ // 增加 P 的系统调用时钟计数
handoffp(pp) // 寻找一新的 m 接管 p
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n) // 返回触发抢占的 P 数量
}
我们分析一下 retake 函数的主要逻辑:
根据 retake 函数的逻辑,抢占调度分为两种情况:
通过对 retake 函数的分析,我们可以知道抢占触发的时机,但依然无法了解抢占设计的整体过程,那我们就从这两种场景出发,聊一聊 Go 中的抢占逻辑的全貌!
通过上文分析,我们知道当 g 连续运行时间超过 10 ms 时,retake 会调用 preemptone 函数向该 g 发出抢占请求,其实这里并不是真正触发抢占调度的地方,而只是打上可抢占的标志,我们具体来看一下 preemptone 源码。
源码:src/runtime/proc.go 5551
func preemptone(pp *p) bool {
// 获取 p 绑定的 m,获取不到就返回 false
mp := pp.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg // 当前 g 不能是 g0
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true // 设置可抢占标志
// Goroutine 中的每次调用都会通过将当前堆栈指针与 gp->stackguard0 进行比较
// 来检查堆栈溢出。 将 gp->stackguard0 设置为 StackPreempt
// 将抢占合并到正常的堆栈溢出检查中。
gp.stackguard0 = stackPreempt
// 如果支持异步抢占并且没有禁用异步抢占(只有 windouw 支持异步)
if preemptMSupported && debug.asyncpreemptoff == 0 {
pp.preempt = true
// 信号的发送,直接向需要进行抢占的 m 发送 SIGURG 信号
// m 会根据系统信号回调异步处理抢占
preemptM(mp)
}
return true
}
preemptone 函数逻辑比较简单,涉及到两种不同的抢占方式:
这种抢占调度是通过抢占标记的方式实现的,基本逻辑是在每个函数调用的序言 (汇编:函数调用的最前方)插入抢占检测指令,当检测到当前 Goroutine 被标记为被应该被抢占时, 则主动中断执行,让出执行权利。
Go 采用的是动态扩缩栈的机制,扩缩机制也是经过演进的:
因此,为了实现动态扩缩栈,运行时需要为栈溢出做检查,而栈分段检查的代码是由编译器在预处理阶段插入的,在预处理阶段编译器会为没有被 go:nosplit 标记的函数的序言部分会插入分段检查的代码,从而在发生栈溢出的情况下, 触发 runtime.morestack_noctxt 调用。举个 main 函数的例子:
package main
func main() {
sum(1, 2)
}
func sum(a, b int) int {
return a + b
}
使用 go build -gcflags="-S -l -N" main.go 2> main.s
编译为汇编代码:
main.main STEXT size=54 args=0x0 locals=0x18 funcid=0x0 align=0x0
0x0000 00000 (main.go:3) TEXT main.main(SB), ABIInternal, $24-0
0x0000 00000 (main.go:3) CMPQ SP, 16(R14)
0x0004 00004 (main.go:3) PCDATA $0, $-2
0x0004 00004 (main.go:3) JLS 47
0x0006 00006 (main.go:3) PCDATA $0, $-1
0x0006 00006 (main.go:3) SUBQ $24, SP
0x000a 00010 (main.go:3) MOVQ BP, 16(SP)
0x000f 00015 (main.go:3) LEAQ 16(SP), BP
0x0014 00020 (main.go:3) FUNCDATA $0, gclocals·g2BeySu+wFnoycgXfElmcg==(SB)
0x0014 00020 (main.go:3) FUNCDATA $1, gclocals·g2BeySu+wFnoycgXfElmcg==(SB)
0x0014 00020 (main.go:4) MOVL $1, AX
0x0019 00025 (main.go:4) MOVL $2, BX
0x001e 00030 (main.go:4) PCDATA $1, $0
0x001e 00030 (main.go:4) NOP
0x0020 00032 (main.go:4) CALL main.sum(SB)
0x0025 00037 (main.go:5) MOVQ 16(SP), BP
0x002a 00042 (main.go:5) ADDQ $24, SP
0x002e 00046 (main.go:5) RET
0x002f 00047 (main.go:5) NOP
0x002f 00047 (main.go:3) PCDATA $1, $-1
0x002f 00047 (main.go:3) PCDATA $0, $-2
0x002f 00047 (main.go:3) CALL runtime.morestack_noctxt(SB)
0x0034 00052 (main.go:3) PCDATA $0, $-1
0x0034 00052 (main.go:3) JMP 0
从上边 main 函数的汇编源码可以看到,JLS 47
指令可以跳转到 CALL runtime.morestack_noctxt(SB)
处触发栈扩张检查,那我们来分析一下跳转条件:
CMPQ SP, 16(R14)
用于比较 SP 和 16(R14) 大小,当 SP 小于 16(R14) 时,会发生栈扩张检查,那 16(R14) 是什么呢?从抢占调度的角度来看,这种发生在函数序言部分的抢占有一个重要目的,就是能够简单且安全的记录执行现场,我们一起来看一下 morestack_noctxt 函数:
源码:src/runtime/asm_amd64.s 578
// morestack but not preserving ctxt.
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
MOVL $0, DX
JMP runtime·morestack(SB)
TEXT runtime·morestack(SB),NOSPLIT,$0-0
...
get_tls(CX) // 获取 tls
MOVQ g(CX), SI // SI = g
...
// Set g->sched to context in f.
// SP 栈顶寄存器现在指向的是 morestack_noctxt 函数的返回地址
// 保存 g.sched.gobuf 的执行现场,寄存器的值
MOVQ 0(SP), AX // f's PC
MOVQ AX, (g_sched+gobuf_pc)(SI)
LEAQ 8(SP), AX // f's SP
MOVQ AX, (g_sched+gobuf_sp)(SI)
MOVQ BP, (g_sched+gobuf_bp)(SI)
MOVQ DX, (g_sched+gobuf_ctxt)(SI)
// Call newstack on m->g0's stack.
// 切换到 g0 栈,并设置 tls 的 g 为 g0
MOVQ m_g0(BX), BX
MOVQ BX, g(CX)
MOVQ (g_sched+gobuf_sp)(BX), SP
// 执行之后 CPU 就开始使用 g0 的栈了,然后 call newstack
CALL runtime·newstack(SB)
CALL runtime·abort(SB) // crash if newstack returns
RET
morestack_noctxt 直接调用 morestack 函数,在 morestack 函数中,保存了 g 一系列执行现场,并将 SP 切换为 g0 栈,然后 call newstack 开始执行 newstack 函数。在这里可以看到,记录 g 的执行现场还是很简单的,我们继续看一下 newstack 函数。
源码:src/runtime/stack.go 964
func newstack() {
thisg := getg()
...
gp := thisg.m.curg
...
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
preempt := stackguard0 == stackPreempt
// 如果是发起的抢占请求,而非真正的栈扩张检查
if preempt {
// 如果正持有锁、分配内存或抢占被禁用,则不发生抢占
if !canPreemptM(thisg.m) {
// 不发生抢占,继续调度
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // 重新进入调度循环
}
}
...
if preempt {
...
// 如果需要对栈进行调整
if gp.preemptShrink {
// 我们正在一个同步安全点,因此等待栈收缩
gp.preemptShrink = false
shrinkstack(gp)
}
// 抢占时,过渡到 _Gpreempted 状态
if gp.preemptStop {
preemptPark(gp) // never returns
}
// 表现得像是调用了 runtime.Gosched,主动让权
gopreempt_m(gp) // never return
}
...
}
func canPreemptM(mp *m) bool {
return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
func gopreempt_m(gp *g) {
...
goschedImpl(gp)
}
分析一下上边代码的主要逻辑:
通过对这种协作式抢占的分析也可以看出,这种抢占是保守式的抢占,优先级低于运行时,还需要函数调用协作执行,所以这种抢占方式有一个很明显的缺点:一个没有主动放弃执行权、且不参与任何函数调用的函数,直到执行完毕之前, 是不会被抢占的。因此,为了解决这个问题,Go 后续推出了基于信号的抢占方式。
现代操作系统的调度器多为抢占式调度,其实现方式是通过硬件中断来支持线程的切换,进而能安全的保存运行上下文。Go 运行时实现的抢占调度也是类似于这样原理:
Go 调度器系列文章中讨论过两种 m 的创建方式,
在 mcommoninit 函数中会调用 mpreinit 函数,最终为 M 创建一个 gsignal 协程,用于在 M 上处理信号。
源码:runtime/proc.go 811
func mcommoninit(mp *m) {
...
// 初始化 gsignal,用于处理 m 上的信号。
mpreinit(mp)
// gsignal 的运行栈边界处理
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
}
...
}
// 从一个父线程上进行调用(引导时为主线程),可以分配内存
func mpreinit(mp *m) {
mp.gsignal = malg(32 * 1024) // OS X 需要 >= 8K,此处创建处理 singnal 的 g
mp.gsignal.m = mp // 指定 gsignal 拥有的 m
}
在调度器的初始化的阶段 sigsave 会通过系统调用将主线程的屏蔽字保存到 m.sigmask,sigsave 执行完毕后,将 sigmask 保存到 initSigmask 这一全局变量中,用于初始化新创建的 M 的信号屏蔽字,在新创建 M 时,会调用 newm 将 M 的 sigmask 进行设置。
源码:runtime/proc.go 677
func schedinit() {
...
mcommoninit(gp.m, -1)
...
sigsave(&gp.m.sigmask)
initSigmask = gp.m.sigmask
...
}
// 源码 2184
func newm(fn func(), pp *p, id int64) {
...
mp.sigmask = initSigmask
...
}
至此,m0 和 m1 函数调用都进入了 mstart,后续的函数调用顺序则相同:mstart -> mstart0 -> mstart1 -> minit -> mstartm0(只有 m0 才调用)-> schedule。这里我们只讨论信号的注册过程,minit 会调用 minitSignalMask 函数为 M 设置信号的屏蔽字,通过 sigmask 来获得当前 M 的屏蔽字,而后通过遍历所有运行时信号表来对屏蔽字进行初始化:
源码:runtime/signal_unix.go 1240
func minitSignalMask() {
nmask := getg().m.sigmask
for i := range sigtable {
// 判断某个信号是否为不可阻止的信号,
if !blockableSig(uint32(i)) {
// 如果是不可阻止的信号,则删除对应的屏蔽字所在位
// 不可阻止,意味着无法由信号处理函数处理,需要去除
sigdelset(&nmask, i)
}
}
// 重新设置屏蔽字
sigprocmask(_SIG_SETMASK, &nmask, nil)
}
随后在 M0 上会调用 mstartm0,进而调用 initsig 初始化信号,对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的处理函数 sigtramp。
源码:runtime/signal_unix.go 114
func initsig(preinit bool) {
...
for i := uint32(0); i < _NSIG; i++ {
fwdSig[i] = getsig(i) // 初始化信号处理函数 为 nil
...
// 设置信号处理函数
setsig(i, abi.FuncPCABIInternal(sighandler))
}
}
func getsig(i uint32) uintptr {
var sa usigactiont
sigaction(i, nil, &sa) // 通过系统调用设置信号处理函数
return *(*uintptr)(unsafe.Pointer(&sa.__sigaction_u))
}
func setsig(i uint32, fn uintptr) {
var sa usigactiont
sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTART
sa.sa_mask = ^uint32(0)
if fn == abi.FuncPCABIInternal(sighandler) {
if iscgo {
fn = abi.FuncPCABI0(cgoSigtramp)
} else {
fn = abi.FuncPCABI0(sigtramp)
}
}
*(*uintptr)(unsafe.Pointer(&sa.__sigaction_u)) = fn
sigaction(i, &sa, nil) // 通过系统调用设置信号处理函数
}
至此信号初始化完毕!
在 preemptone 中我们聊到使用 preemptM 主动触发信号抢占,其实原理很简单,直接向需要进行抢占的 M 发送 SIGURG 信号即可:
源码:runtime/signal_unix.go 368
const sigPreempt = _SIGURG
func preemptM(mp *m) {
...
if mp.signalPending.CompareAndSwap(0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
pendingPreemptSignals.Add(1)
}
signalM(mp, sigPreempt)
}
...
}
func signalM(mp *m, sig int) {
pthread_kill(pthread(mp.procid), uint32(sig))
}
当监控线程 sysmon 向 M 线程发送 _SIGURG 信号后,M 捕获到信号,开始调用信号处理函数 sigtramp -> sigtrampgo -> sighandler:
源码:runtime/sys_linux_386.s 431
// Called using C ABI.
TEXT runtime·sigtramp(SB),NOSPLIT|TOPFRAME,$28
...
CALL runtime·sigtrampgo(SB)
...
RET
源码:runtime/signal_unix.go 608
func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
...
sighandler(sig, info, ctx, gp)
...
}
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
gsignal := getg()
mp := gsignal.m
c := &sigctxt{info, ctxt}
...
// 处理抢占信号
if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
// Might be a preemption signal.
doSigPreempt(gp, c)
}
...
}
func doSigPreempt(gp *g, ctxt *sigctxt) {
// 检查 G 是否需要被抢占、抢占是否安全
if wantAsyncPreempt(gp) {
// isAsyncSafePoint 报告指令 PC 处的 gp 是否为异步安全点
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
// 插入抢占调用,调整 PC 寄存器,让 go 运行时恢复时,
// 从 asyncPreempt 函数开始执行
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
...
}
func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
// Make it look like we called target at resumePC.
sp := uintptr(c.rsp())
sp -= goarch.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = resumePC
c.set_rsp(uint64(sp)) // 设置 SP 寄存器
c.set_rip(uint64(targetPC)) // 设置 PC 寄存器
}
在 sighandler 信号处理函数中,判断信号是否是 sigPreempt 抢占信号,然后调用 doSigPreempt 处理异步抢占,抢占之前需要判断抢占是否安全(过程是比较复杂的),然后通过 ctxt.pushCall 更改 SP、PC 等寄存器,改变 Go 代码的执行顺序,当中断处理结束时,从 asyncPreempt 函数开始正常执行!
源码:runtime/preempt_amd64.s
TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
...
CALL ·asyncPreempt2(SB)
...
RET
asyncPreempt 调用了 asyncPreempt2 函数,asyncPreempt2 函数中的逻辑我们就相当熟悉了,mcall 用于切换 g 到 g0 栈,然后调用 preemptPark 或 gopreempt_m 函数继续开启下一次循环调度(源码自己看哈)。
源码:runtime/preempt.go 301
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
再次总结一下异步抢占的整体过程:
当 P 状态为 _Psyscall 时,g 已经阻塞在系统调用上,此时 sysmon 会通过 retake 函数对 P 实施抢占,这种抢占方式被称之为 handoff,本质是抢占 P,为 P 重新寻找一个 M 继续执行,原来的 M 会阻塞在系统调用中。这里就涉及三个重要逻辑步骤:
因为用户代码特权级较低,无权访问需要最高特权级才能访问的内核地址空间的代码和数据,因此用户代码想要访问内核数据,必须使用系统调用。Linux 系统调用为用户态进程提供了硬件的抽象接口,每个系统调用被赋予一个独一无二的系统调用号,当用户空间的进程执行一个系统调用时,会使用调用号指明系统调用;在 Go 中使用Syscall 函数进行系统调用。
源码:syscall/syscall_linux.go 68
func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno) {
runtime_entersyscall()
r1, r2, err = RawSyscall6(trap, a1, a2, a3, 0, 0, 0)
runtime_exitsyscall()
return
}
通过源码可以发现,系统调用执行时,在系统调用执行的前后分别调用了 runtime_entersyscall 和 runtime_exitsyscall 两个函数,这两个函数刚好负责进入系统调用前的准备工作和系统调用结束后的收尾工作,我们一起来看一下。
entersyscall 源码:runtime/proc.go 3843
func entersyscall() {
reentersyscall(getcallerpc(), getcallersp())
}
func reentersyscall(pc, sp uintptr) {
gp := getg()
gp.m.locks++
/// 设置栈警戒值为 stackPreempt,表示在 syscall 期间可以发生抢占
gp.stackguard0 = stackPreempt
gp.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp) // 保存 pc 和 sp 到当前 G 的栈中
gp.syscallsp = sp
gp.syscallpc = pc
casgstatus(gp, _Grunning, _Gsyscall) // 将当前 G 的状态切换为 _Gsyscall
...
pp := gp.m.p.ptr() // 获取当前 G 所在的 P
pp.m = 0 // 解除当前 P 和 M 之间的关联
gp.m.oldp.set(pp) // 把 P 记录在 oldp 中,等从系统调用返回时,优先绑定这个 P
gp.m.p = 0 // 解除当前 M 和 P 之间的关联
// 修改当前 P 的状态,sysmon 线程依赖状态实施抢占
atomic.Store(&pp.status, _Psyscall)
...
gp.m.locks--
}
entersyscall 函数直接调用了 reentersyscall 函数,reentersyscall 首先把现场信息保存在当前 G 的 sched 成员中;然后解除 M 和 P 的绑定关系,这样 sysmon 线程就不需要加锁解除 M 和 P 的关系了,可以直接执行 handoffp 操作;并设置 P 的状态为_Psyscall,前面我们已经看到 sysmon监控线程需要依赖该状态实施抢占。
exitsyscall 源码:runtime/proc.go 3938
func exitsyscall() {
gp := getg()
...
oldp := gp.m.oldp.ptr() // 进入系统调用之前所绑定的 p
gp.m.oldp = 0
// 尝试获取 P
if exitsyscallfast(oldp) {
...
// 系统调用完成,增加 syscalltick 计数
gp.m.p.ptr().syscalltick++
// 重新把 g 设置成 _Grunning 状态
casgstatus(gp, _Gsyscall, _Grunning)
...
return
}
...
// 没有拿到 P,执行不了了
// 调用 exitsyscall0 处理 syscall 的退出过程
mcall(exitsyscall0)
...
}
由于在进入系统调用前,解除了 M 和 P 的关系,因此从系统调用返回,需要调用 exitsyscallfast 重新获取 P,才能继续调度执行;如果获取不到 P,则调用 mcall(exitsyscall0) 解除 M 和 G 的关系,将 G 重新放入可执行队列中,等待调度器的下一次调度。
exitsyscallfast 源码:runtime/proc.go 4022
func exitsyscallfast(oldp *p) bool {
gp := getg()
...
// 如果存在旧的 P 且旧 P 的状态为 _Psyscall,将其状态切换为 _Pidle
// 优先使用原来的 P
if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
wirep(oldp) // 绑定 m 和 p
exitsyscallfast_reacquired() // 更新系统调度计数等状态
return true
}
// Try to get any other idle P.
if sched.pidle != 0 {
var ok bool
systemstack(func() {
// exitsyscallfast_pidle 获取一个空闲的 P,并绑定到 M 上
ok = exitsyscallfast_pidle()
...
})
if ok {
return true
}
}
return false
}
快速路径 runtime.exitsyscallfast 处理流程如下:
exitsyscall0 源码:runtime/proc.go 4105
有关 mcall 源码请参考 《14. Go调度器系列解读(一):什么是 GMP?》
func exitsyscall0(gp *g) {
// 将 G 的状态从 _Gsyscall 切换为 _Grunnable
casgstatus(gp, _Gsyscall, _Grunnable)
dropg() // 释放当前 G,解除和 M 的关系
lock(&sched.lock)
var pp *p
// 如果调度器启用,尝试从空闲 P 队列中获取 P
if schedEnabled(gp) {
// 之前 M 获取不到 P,这里再尝试获取一下,万一能获取到呢
pp, _ = pidleget(0)
}
var locked bool
if pp == nil {
// 如果未获取到 P,将 G 放入全局运行队列
globrunqput(gp)
locked = gp.lockedm != 0 // 查看 g 是不是绑定了 m
}
...
unlock(&sched.lock)
if pp != nil {
// 如果获取到 P
acquirep(pp) // 绑定 M 和 P
execute(gp, false) // Never returns. 执行调度,已经有 G,直接执行
}
if locked {
// 如果 g 绑定了 m,必须在该 m 上执行
// 停止执行锁定到 g 的当前 m,直到 g 再次可运行。
// 使用 mPark() 睡眠 m,直到被唤醒
stoplockedm()
// 唤醒以后说明,有 p 绑定 m,直接运行 g 即可
execute(gp, false) // Never returns.
}
stopm() // 当前工作线程进入睡眠,等待被其它线程唤醒 mPark()
// 从睡眠中被其它线程唤醒,执行 schedule 调度循环重新开始工作
schedule() // Never returns.
}
exitsyscall0 执行逻辑如下:
sysmon 会通过 retake 函数对正处于系统调用状态的 P 实施抢占,最终调用 handoffp 函数,为 P 再寻找一个 M,重新开始执行!
handoffp 源码:runtime/proc.go 2458
func handoffp(pp *p) {
// 检查 P 的本地队列是否非空,或者全局运行队列的大小是否不为零
if !runqempty(pp) || sched.runqsize != 0 {
startm(pp, false) // 启动一个 M 来执行任务
return
}
// 如果追踪已启用或正在关闭,并且追踪读取器可用
if (trace.enabled || trace.shutdown) && traceReaderAvailable() != nil {
startm(pp, false)
return
}
// 如果垃圾回收的 blacken 模式已启用,并且存在需要标记的工作
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) {
startm(pp, false)
return
}
// 检查是否有其他 M 正在自旋状态,如果没有且没有空闲的 M,则尝试将一个 M 设置为自旋状态并启动它
if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
sched.needspinning.Store(0)
startm(pp, true)
return
}
lock(&sched.lock)
// 检查 GC 是否正在等待
if sched.gcwaiting.Load() {
pp.status = _Pgcstop // 将 P 的状态设置为 _Pgcstop
sched.stopwait-- // 减少等待计数
// 如果等待计数为 0,说明 P 都被置为 _Pgcstop
// 可以唤醒 GC 执行了
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
// 检查 P 上是否存在需要运行的 SafePoint 函数
if pp.runSafePointFn != 0 && atomic.Cas(&pp.runSafePointFn, 1, 0) {
sched.safePointFn(pp) // 执行 SafePoint 函数
sched.safePointWait-- // 减少 SafePoint 等待计数
// 如果等待计数为 0
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
// 如果全局可执行队列不为空
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(pp, false) // 启动一个 M 来执行任务
return
}
// 如果当前空闲的 P 数量为 gomaxprocs-1,并且上次轮询的时间不为零
if sched.npidle.Load() == gomaxprocs-1 && sched.lastpoll.Load() != 0 {
unlock(&sched.lock)
startm(pp, false)
return
}
when := nobarrierWakeTime(pp) // 计算无障碍唤醒时间
pidleput(pp, 0) // 将 P 放入空闲队列
unlock(&sched.lock)
if when != 0 {
// wakeNetPoller 唤醒在网络轮询器中休眠的线程,如果它在 when 参数之前不被唤醒;
// 或者它会唤醒一个空闲的 P 来为定时器和网络轮询器提供服务(如果还没有的话)。
wakeNetPoller(when)
}
}
handoff 会对当前的条件进行检查,如果满足下面的条件,则会调用 startm 函数,启动新的工作线程 M 来与当前的 P 进行关联,实现对 P 的接管,从而继续执行可运行的 G。
还记得文章 《14. Go调度器系列解读(一):什么是 GMP?》 中总结了 GMP 的状态变更,今天随着新知识的拓展,我们再来把状态变更图补充一下!
G 的状态变更
G 基本上状态变更就差不多了,这里稍微总结一下:
接下来就是本节内容调度时机相关的状态变化了:
P 的状态变更
本篇文章我们讲述了有关 GMP 模型调度时机的知识内容,首先我们从整体角度讲述了正常调度、主动调度、被动调度和抢占调度等四种调度时机。
正常调度、主动调度和被动调度可以简化为如下流程:
抢占调度逻辑复杂一些,由监控线程 sysmon 定时触发,分为三种情况:
阅读量:511
点赞量:0
收藏量:0