有关Go语言基于信号抢占式调度的示例分析
这篇文章将为大家详细讲解有关Go语言基于信号抢占式调度的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
介绍
在 Go 的 1.14 版本之前抢占试调度都是基于协作的,需要自己主动的让出执行,但是这样是无法处理一些无法被抢占的边缘情况。例如:for 循环或者垃圾回收长时间占用线程,这些问题中的一部分直到 1.14 才被基于信号的抢占式调度解决。
下面我们通过一个例子来验证一下1.14 版本和 1.13 版本之间的抢占差异:
packagemain import( "fmt" "os" "runtime" "runtime/trace" "sync" ) funcmain(){ runtime.GOMAXPROCS(1) f,_:=os.Create("trace.output") deferf.Close() _=trace.Start(f) defertrace.Stop() varwgsync.WaitGroup fori:=0;i<30;i++{ wg.Add(1) gofunc(){ deferwg.Done() t:=0 fori:=0;i<1e8;i++{ t+=2 } fmt.Println("total:",t) }() } wg.Wait() }
这个例子中会通过 go trace 来进行执行过程的调用跟踪。在代码中指定 runtime.GOMAXPROCS(1)
设置最大的可同时使用的 CPU 核数为1,只用一个 P(处理器),这样就确保是单处理器的场景。然后调用一个 for 循环开启 10 个 goroutines 来执行 func 函数,这是一个纯计算且耗时的函数,防止 goroutines 空闲让出执行。
下面我们编译程序分析 trace 输出:
$gobuild-gcflags"-N-l"main.go -N表示禁用优化 -l禁用内联 $./main
然后我们获取到 trace.output 文件后进行可视化展示:
$gotooltrace-http=":6060"./trace.output
Go1.13 trace 分析
从上面的这个图可以看出:
因为我们限定了只有一个 P,所以在 PROCS 这一栏里面只有一个 Proc0;
我们在 for 循环里面启动了 30 个 goroutines ,所以我们可以数一下 Proc0 里面的颜色框框,刚好30 个;
30 个 goroutines 在 Proc0 里面是串行执行的,一个执行完再执行另一个,没有进行抢占;
随便点击一个 goroutines 的详情栏可以看到 Wall Duration 为 0.23s 左右,表示这个 goroutines 持续执行了 0.23s,总共 10 个 goroutines 执行时间是 7s 左右;
切入调用栈 Start Stack Trace 是 main.main.func1:20,在代码上面是 func 函数执行头: go func()
;
切走调用栈 End Stack Trace 是 main.main.func1:26,在代码上是 func 函数最后执行打印:fmt.Println("total:", t)
;
从上面的 trace 分析可以知道,Go 的协作式调度对 calcSum 函数是毫无作用的,一旦执行开始,只能等执行结束。每个 goroutine 耗费了 0.23s 这么长的时间,也无法抢占它的执行权。
Go 1.14 以上 trace 分析
在 Go 1.14 之后引入了基于信号的抢占式调度,从上面的图可以看到 Proc0 这一栏中密密麻麻都是 goroutines 在切换时的调用情况,不会再出现 goroutines 一旦执行开始,只能等执行结束这种情况。
上面跑动的时间是 4s 左右这个情况可以忽略,因为我是在两台配置不同的机器上跑的(主要是我闲麻烦要找两台一样的机器)。
下面我们拉近了看一下明细情况:
通过这个明细可以看出:
这个 goroutine 运行了 0.025s 就让出执行了;
切入调用栈 Start Stack Trace 是 main.main.func1:21,和上面一样;
切走调用栈 End Stack Trace 是 runtime.asyncPreempt:50 ,这个函数是收到抢占信号时执行的函数,从这个地方也能明确的知道,被异步抢占了;
分析
抢占信号的安装
runtime/signal_unix.go
程序启动时,在runtime.sighandler
中注册 SIGURG
信号的处理函数runtime.doSigPreempt
。
initsig
funcinitsig(preinitbool){ //预初始化 if!preinit{ signalsOK=true } //遍历信号数组 fori:=uint32(0);i<_NSIG;i++{ t:=&sigtable[i] //略过信号:SIGKILL、SIGSTOP、SIGTSTP、SIGCONT、SIGTTIN、SIGTTOU ift.flags==0||t.flags&_SigDefault!=0{ continue } ... setsig(i,funcPC(sighandler)) } }
在 initsig 函数里面会遍历所有的信号量,然后调用 setsig 函数进行注册。我们可以查看 sigtable 这个全局变量看看有什么信息:
varsigtable=[...]sigTabT{ /*0*/{0,"SIGNONE:notrap"}, /*1*/{_SigNotify+_SigKill,"SIGHUP:terminallinehangup"}, /*2*/{_SigNotify+_SigKill,"SIGINT:interrupt"}, /*3*/{_SigNotify+_SigThrow,"SIGQUIT:quit"}, /*4*/{_SigThrow+_SigUnblock,"SIGILL:illegalinstruction"}, /*5*/{_SigThrow+_SigUnblock,"SIGTRAP:tracetrap"}, /*6*/{_SigNotify+_SigThrow,"SIGABRT:abort"}, /*7*/{_SigPanic+_SigUnblock,"SIGBUS:buserror"}, /*8*/{_SigPanic+_SigUnblock,"SIGFPE:floating-pointexception"}, /*9*/{0,"SIGKILL:kill"}, /*10*/{_SigNotify,"SIGUSR1:user-definedsignal1"}, /*11*/{_SigPanic+_SigUnblock,"SIGSEGV:segmentationviolation"}, /*12*/{_SigNotify,"SIGUSR2:user-definedsignal2"}, /*13*/{_SigNotify,"SIGPIPE:writetobrokenpipe"}, /*14*/{_SigNotify,"SIGALRM:alarmclock"}, /*15*/{_SigNotify+_SigKill,"SIGTERM:termination"}, /*16*/{_SigThrow+_SigUnblock,"SIGSTKFLT:stackfault"}, /*17*/{_SigNotify+_SigUnblock+_SigIgn,"SIGCHLD:childstatushaschanged"}, /*18*/{_SigNotify+_SigDefault+_SigIgn,"SIGCONT:continue"}, /*19*/{0,"SIGSTOP:stop,unblockable"}, /*20*/{_SigNotify+_SigDefault+_SigIgn,"SIGTSTP:keyboardstop"}, /*21*/{_SigNotify+_SigDefault+_SigIgn,"SIGTTIN:backgroundreadfromtty"}, /*22*/{_SigNotify+_SigDefault+_SigIgn,"SIGTTOU:backgroundwritetotty"}, /*23*/{_SigNotify+_SigIgn,"SIGURG:urgentconditiononsocket"}, /*24*/{_SigNotify,"SIGXCPU:cpulimitexceeded"}, /*25*/{_SigNotify,"SIGXFSZ:filesizelimitexceeded"}, /*26*/{_SigNotify,"SIGVTALRM:virtualalarmclock"}, /*27*/{_SigNotify+_SigUnblock,"SIGPROF:profilingalarmclock"}, /*28*/{_SigNotify+_SigIgn,"SIGWINCH:windowsizechange"}, /*29*/{_SigNotify,"SIGIO:i/onowpossible"}, /*30*/{_SigNotify,"SIGPWR:powerfailurerestart"}, /*31*/{_SigThrow,"SIGSYS:badsystemcall"}, /*32*/{_SigSetStack+_SigUnblock,"signal32"},/*SIGCANCEL;seeissue6997*/ /*33*/{_SigSetStack+_SigUnblock,"signal33"},/*SIGSETXID;seeissues3871,9400,12498*/ ... }
具体的信号含义可以看这个介绍:Unix信号 https://zh.wikipedia.org/wiki/Unix信号。需要注意的是,抢占信号在这里是 _SigNotify + _SigIgn
如下:
{_SigNotify+_SigIgn,"SIGURG:urgentconditiononsocket"}
下面我们看一下 setsig 函数,这个函数是在 runtime/os_linux.go
文件里面:
setsig
funcsetsig(iuint32,fnuintptr){ varsasigactiont sa.sa_flags=_SA_SIGINFO|_SA_ONSTACK|_SA_RESTORER|_SA_RESTART sigfillset(&sa.sa_mask) ... iffn==funcPC(sighandler){ //CGO相关 ifiscgo{ fn=funcPC(cgoSigtramp) }else{ //替换为调用sigtramp fn=funcPC(sigtramp) } } sa.sa_handler=fn sigaction(i,&sa,nil) }
这里需要注意的是,当 fn 等于 sighandler 的时候,调用的函数会被替换成 sigtramp。sigaction 函数在 Linux 下会调用系统调用函数 sys_signal 以及 sys_rt_sigaction 实现安装信号。
执行抢占信号
到了这里是信号发生的时候进行信号的处理,原本应该是在发送抢占信号之后,但是这里我先顺着安装信号往下先讲了。大家可以跳到发送抢占信号后再回来。
上面分析可以看到当 fn 等于 sighandler 的时候,调用的函数会被替换成 sigtramp,sigtramp是汇编实现,下面我们看看。
src/runtime/sys_linux_amd64.s
:
TEXTruntime·sigtramp<ABIInternal>(SB),NOSPLIT,$72 ... //Wedon'tsavemxcsrorthex87controlwordbecausesigtrampgodoesn't //modifythem. MOVQ DX,ctx-56(SP) MOVQ SI,info-64(SP) MOVQ DI,signum-72(SP) MOVQ $runtime·sigtrampgo(SB),AX CALLAX ... RET
这里会被调用说明信号已经发送响应了,runtime·sigtramp
会进行信号的处理。runtime·sigtramp
会继续调用 runtime·sigtrampgo
。
这个函数在 runtime/signal_unix.go
文件中:
sigtrampgo&sighandler
funcsigtrampgo(siguint32,info*siginfo,ctxunsafe.Pointer){ ifsigfwdgo(sig,info,ctx){ return } c:=&sigctxt{info,ctx} g:=sigFetchG(c) ... sighandler(sig,info,ctx,g) setg(g) ifsetStack{ restoreGsignalStack(&gsignalStack) } } funcsighandler(siguint32,info*siginfo,ctxtunsafe.Pointer,gp*g){ _g_:=getg() c:=&sigctxt{info,ctxt} ... //如果是一个抢占信号 ifsig==sigPreempt&&debug.asyncpreemptoff==0{ //处理抢占信号 doSigPreempt(gp,c) } ... }
sighandler 方法里面做了很多其他信号的处理工作,我们只关心抢占部分的代码,这里最终会通过 doSigPreempt 方法执行抢占。
这个函数在 runtime/signal_unix.go
文件中:
doSigPreempt
funcdoSigPreempt(gp*g,ctxt*sigctxt){ //检查此G是否要被抢占并且可以安全地抢占 ifwantAsyncPreempt(gp){ //检查是否能安全的进行抢占 ifok,newpc:=isAsyncSafePoint(gp,ctxt.sigpc(),ctxt.sigsp(),ctxt.siglr());ok{ //修改寄存器,并执行抢占调用 ctxt.pushCall(funcPC(asyncPreempt),newpc) } } //更新一下抢占相关字段 atomic.Xadd(&gp.m.preemptGen,1) atomic.Store(&gp.m.signalPending,0) }
函数会处理抢占信号,获取当前的 SP 和 PC 寄存器并调用 ctxt.pushCall
修改寄存器,并调用 runtime/preempt.go
的 asyncPreempt 函数。
//保存用户态寄存器后调用asyncPreempt2 funcasyncPreempt()
asyncPreempt 的汇编代码在 src/runtime/preempt_amd64.s
中,该函数会保存用户态寄存器后调用 runtime/preempt.go
的 asyncPreempt2 函数中:
asyncPreempt2
funcasyncPreempt2(){ gp:=getg() gp.asyncSafePoint=true //该G是否可以被抢占 ifgp.preemptStop{ mcall(preemptPark) }else{ //让G放弃当前在M上的执行权利,将G放入全局队列等待后续调度 mcall(gopreempt_m) } gp.asyncSafePoint=false }
该函数会获取当前 G ,然后判断 G 的 preemptStop 值,preemptStop 会在调用 runtime/preempt.go
的 suspendG 函数的时候将 _Grunning
状态的 Goroutine 标记成可以被抢占 gp.preemptStop = true
,表示该 G 可以被抢占。
下面我们看一下执行抢占任务会调用的 runtime/proc.go
的 preemptPark函数:
preemptPark
funcpreemptPark(gp*g){ status:=readgstatus(gp) ifstatus&^_Gscan!=_Grunning{ dumpgstatus(gp) throw("badgstatus") } gp.waitreason=waitReasonPreempted casGToPreemptScan(gp,_Grunning,_Gscan|_Gpreempted) //使当前m放弃g,让出线程 dropg() //修改当前Goroutine的状态到_Gpreempted casfrom_Gscanstatus(gp,_Gscan|_Gpreempted,_Gpreempted) //并继续执行调度 schedule() }
preemptPark 会修改当前 Goroutine 的状态到 _Gpreempted
,调用 dropg 让出线程,最后调用 schedule 函数继续执行其他 Goroutine 的任务循环调度。
gopreempt_m
gopreempt_m 方法比起抢占更像是主动让权,然后重新加入到执行队列中等待调度。
funcgopreempt_m(gp*g){ goschedImpl(gp) } funcgoschedImpl(gp*g){ status:=readgstatus(gp) ... //更新状态为_Grunnable casgstatus(gp,_Grunning,_Grunnable) //使当前m放弃g,让出线程 dropg() lock(&sched.lock) //重新加入到全局执行队列中 globrunqput(gp) unlock(&sched.lock) //并继续执行调度 schedule() }
抢占信号发送
抢占信号的发送是由 preemptM 进行的。
这个函数在runtime/signal_unix.go
文件中:
preemptM
constsigPreempt=_SIGURG funcpreemptM(mp*m){ ... ifatomic.Cas(&mp.signalPending,0,1){ //preemptM向M发送抢占请求。 //接收到该请求后,如果正在运行的G或P被标记为抢占,并且Goroutine处于异步安全点, //它将抢占Goroutine。 signalM(mp,sigPreempt) } }
preemptM 这个函数会调用 signalM 将在初始化的安装的 _SIGURG
信号发送到指定的 M 上。
使用 preemptM 发送抢占信号的地方主要有下面几个:
Go 后台监控 runtime.sysmon 检测超时发送抢占信号;
Go GC 栈扫描发送抢占信号;
Go GC STW 的时候调用 preemptall 抢占所有 P,让其暂停;
Go 后台监控执行抢占
系统监控 runtime.sysmon
会在循环中调用 runtime.retake
抢占处于运行或者系统调用中的处理器,该函数会遍历运行时的全局处理器。
系统监控通过在循环中抢占主要是为了避免 G 占用 M 的时间过长造成饥饿。
runtime.retake
主要分为两部分:
调用 preemptone 抢占当前处理器;
调用 handoffp 让出处理器的使用权;
抢占当前处理器
funcretake(nowint64)uint32{ n:=0 lock(&allpLock) //遍历allp数组 fori:=0;i<len(allp);i++{ _p_:=allp[i] if_p_==nil{ continue } pd:=&_p_.sysmontick s:=_p_.status sysretake:=false ifs==_Prunning||s==_Psyscall{ //调度次数 t:=int64(_p_.schedtick) ifint64(pd.schedtick)!=t{ pd.schedtick=uint32(t) //处理器上次调度时间 pd.schedwhen=now //抢占G的执行,如果上一次触发调度的时间已经过去了10ms }elseifpd.schedwhen+forcePreemptNS<=now{ preemptone(_p_) sysretake=true } } ... } unlock(&allpLock) returnuint32(n) }
这一过程会获取当前 P 的状态,如果处于 _Prunning
或者 _Psyscall
状态时,并且上一次触发调度的时间已经过去了 10ms,那么会调用 preemptone 进行抢占信号的发送,preemptone 在上面我们已经讲过了,这里就不再复述。
调用 handoffp 让出处理器的使用权
funcretake(nowint64)uint32{ n:=0 lock(&allpLock) //遍历allp数组 fori:=0;i<len(allp);i++{ _p_:=allp[i] if_p_==nil{ continue } pd:=&_p_.sysmontick s:=_p_.status sysretake:=false ... ifs==_Psyscall{ //系统调用的次数 t:=int64(_p_.syscalltick) if!sysretake&&int64(pd.syscalltick)!=t{ pd.syscalltick=uint32(t) //系统调用的时间 pd.syscallwhen=now continue } ifrunqempty(_p_)&&atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle)>0&&pd.syscallwhen+10*1000*1000>now{ continue } unlock(&allpLock) incidlelocked(-1) ifatomic.Cas(&_p_.status,s,_Pidle){ n++ _p_.syscalltick++ //让出处理器的使用权 handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) returnuint32(n) }
这一过程会判断 P 的状态如果处于 _Psyscall
状态时,会进行一个判断,有一个不满足则调用 handoffp 让出 P 的使用权:
runqempty(_p_)
:判断 P 的任务队列是否为空;
atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle)
:nmspinning 表示正在窃取 G 的数量,npidle 表示空闲 P 的数量,判断是否存在空闲 P 和正在进行调度窃取 G 的 P;
pd.syscallwhen+10*1000*1000 > now
:判断是否系统调用时间超过了 10ms ;
Go GC 栈扫描发送抢占信号
GC 相关的内容可以看这篇:《Go语言GC实现原理及源码分析 https://www.luozhiyun.com/archives/475》。Go 在 GC 时对 GC Root 进行标记的时候会扫描 G 的栈,扫描之前会调用 suspendG 挂起 G 的执行才进行扫描,扫描完毕之后再次调用 resumeG 恢复执行。
该函数在:runtime/mgcmark.go
:
markroot
funcmarkroot(gcw*gcWork,iuint32){ ... switch{ ... //扫描各个G的栈 default: //获取需要扫描的G vargp*g ifbaseStacks<=i&&i<end{ gp=allgs[i-baseStacks] }else{ throw("markroot:badindex") } ... //转交给g0进行扫描 systemstack(func(){ ... //挂起G,让对应的G停止运行 stopped:=suspendG(gp) ifstopped.dead{ gp.gcscandone=true return } ifgp.gcscandone{ throw("galreadyscanned") } //扫描g的栈 scanstack(gp,gcw) gp.gcscandone=true //恢复该G的执行 resumeG(stopped) }) } }
markroot 在扫描栈之前会切换到 G0 转交给g0进行扫描,然后调用 suspendG 会判断 G 的运行状态,如果该 G 处于 运行状态 _Grunning
,那么会设置 preemptStop 为 true 并发送抢占信号。
该函数在:runtime/preempt.go
:
suspendG
funcsuspendG(gp*g)suspendGState{ ... constyieldDelay=10*1000 varnextPreemptMint64 fori:=0;;i++{ switchs:=readgstatus(gp);s{ ... case_Grunning: ifgp.preemptStop&&gp.preempt&&gp.stackguard0==stackPreempt&&asyncM==gp.m&&atomic.Load(&asyncM.preemptGen)==asyncGen{ break } if!castogscanstatus(gp,_Grunning,_Gscanrunning){ break } //设置抢占字段 gp.preemptStop=true gp.preempt=true gp.stackguard0=stackPreempt asyncM2:=gp.m asyncGen2:=atomic.Load(&asyncM2.preemptGen) //asyncM与asyncGen标记的是循环里上次抢占的信息,用来校验不能重复抢占 needAsync:=asyncM!=asyncM2||asyncGen!=asyncGen2 asyncM=asyncM2 asyncGen=asyncGen2 casfrom_Gscanstatus(gp,_Gscanrunning,_Grunning) ifpreemptMSupported&&debug.asyncpreemptoff==0&&needAsync{ now:=nanotime() //限制抢占的频率 ifnow>=nextPreemptM{ nextPreemptM=now+yieldDelay/2 //执行抢占信号发送 preemptM(asyncM) } } } ... } }
对于 suspendG 函数我只截取出了 G 在 _Grunning
状态下的处理情况。该状态下会将 preemptStop 设置为 true,也是唯一一个地方设置为 true 的地方。preemptStop 和抢占信号的执行有关,忘记的同学可以翻到上面的 asyncPreempt2 函数中。
Go GC StopTheWorld 抢占所有 P
Go GC STW 是通过 stopTheWorldWithSema 函数来执行的,该函数在 runtime/proc.go
:
stopTheWorldWithSema
funcstopTheWorldWithSema(){ _g_:=getg() lock(&sched.lock) sched.stopwait=gomaxprocs //标记gcwaiting,调度时看见此标记会进入等待 atomic.Store(&sched.gcwaiting,1) //发送抢占信号 preemptall() //暂停当前P _g_.m.p.ptr().status=_Pgcstop//Pgcstopisonlydiagnostic. ... wait:=sched.stopwait>0 unlock(&sched.lock) ifwait{ for{ //等待100us ifnotetsleep(&sched.stopnote,100*1000){ noteclear(&sched.stopnote) break } //再次进行发送抢占信号 preemptall() } } ... }
stopTheWorldWithSema 函数会调用 preemptall 对所有的 P 发送抢占信号。
preemptall 函数的文件位置在 runtime/proc.go
:
preemptall
funcpreemptall()bool{ res:=false //遍历所有的P for_,_p_:=rangeallp{ if_p_.status!=_Prunning{ continue } //对正在运行的P发送抢占信号 ifpreemptone(_p_){ res=true } } returnres }
preemptall 调用的 preemptone 会将 P 对应的 M 中正在执行的 G 并标记为正在执行抢占;最后会调用 preemptM 向 M 发送抢占信号。
该函数的文件位置在 runtime/proc.go
:
preemptone
funcpreemptone(_p_*p)bool{ //获取P对应的M mp:=_p_.m.ptr() ifmp==nil||mp==getg().m{ returnfalse } //获取M正在执行的G gp:=mp.curg ifgp==nil||gp==mp.g0{ returnfalse } //将G标记为抢占 gp.preempt=true //在栈扩张的时候会检测是否被抢占 gp.stackguard0=stackPreempt //请求该P的异步抢占 ifpreemptMSupported&&debug.asyncpreemptoff==0{ _p_.preempt=true preemptM(mp) } returntrue }
具体的逻辑:
程序启动时,在注册 _SIGURG
信号的处理函数 runtime.doSigPreempt
;
此时有一个 M1 通过 signalM 函数向 M2 发送中断信号 _SIGURG
;
M2 收到信号,操作系统中断其执行代码,并切换到信号处理函数runtime.doSigPreempt
;
M2 调用 runtime.asyncPreempt
修改执行的上下文,重新进入调度循环进而调度其他 G;
关于Go语言基于信号抢占式调度的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
版权声明
本文仅代表作者观点,不代表博信信息网立场。