
Go's signal-based preemptive scheduling, analyzed from the source code

This article looks at how Go implements signal-based preemptive scheduling.

Introduction#

Before Go 1.14, preemption was cooperative: a goroutine had to reach a point where it voluntarily gave up execution. That left edge cases in which a goroutine could not be preempted at all, such as a tight for loop or garbage collection work occupying a thread for a long time. Some of these cases were only resolved in Go 1.14, with the introduction of signal-based preemptive scheduling.

Let's verify the preemption difference between versions 1.14 and 1.13 with an example:

package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/trace"
	"sync"
)

func main() {
	runtime.GOMAXPROCS(1)
	f, _ := os.Create("trace.output")
	defer f.Close()
	_ = trace.Start(f)
	defer trace.Stop()
	var wg sync.WaitGroup
	for i := 0; i < 30; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := 0
			for i := 0; i < 1e8; i++ {
				t += 2
			}
			fmt.Println("total:", t)
		}()
	}
	wg.Wait()
}
           

This example uses runtime/trace to record the execution so that it can be inspected with go tool trace. runtime.GOMAXPROCS(1) limits the program to a single P (processor), which guarantees a single-processor scenario. The for loop then starts 30 goroutines, each running a purely computational, time-consuming function, so a goroutine never blocks or idles and has no reason to yield on its own.

Below we compile the program to analyze the trace output:

$ go build -gcflags "-N -l" main.go
-N disables optimizations
-l disables inlining

$ ./main
           

Then we get the trace.output file and visualize it:

$ go tool trace -http=":6060" ./trace.output
           

Go 1.13 trace analysis#

[Figure: go tool trace view of the program built with Go 1.13]

As can be seen from the figure above:

  1. Because we limited the program to one P, there is only Proc0 in the PROCS column;
  2. We started 30 goroutines in the for loop, so Proc0 contains exactly 30 colored boxes;
  3. The 30 goroutines run serially on Proc0, one after another, with no preemption;
  4. Clicking on any goroutine shows a Wall Duration of about 0.23s in the details pane, meaning the goroutine ran continuously for 0.23s; the 30 goroutines together take about 7s;
  5. The Start Stack Trace is main.main.func1:20, which is the line in the code above where the function starts: go func();
  6. The End Stack Trace is main.main.func1:26, which is the last statement of the function: fmt.Println("total:", t);

From this trace we can see that Go's cooperative scheduling has no effect on this compute function: once it starts executing, all the scheduler can do is wait for it to finish. Each goroutine ran for the full 0.23s and could not be preempted.

Go 1.14 and later trace analysis#

[Figure: go tool trace view of the same program built with Go 1.14]

Go 1.14 introduced signal-based preemptive scheduling. In the figure above, the Proc0 row is densely packed with goroutine switches; a goroutine no longer runs to completion once it starts.

The total running time here is about 4s, but that difference can be ignored: the two traces were recorded on machines with different configurations (mainly because I did not bother to find two identical machines).

Let's take a closer look at the details:

[Figure: details pane for a single goroutine slice in the Go 1.14 trace]

From these details you can see:

  1. This goroutine ran for 0.025s and then gave up execution;
  2. The Start Stack Trace is main.main.func1:21, the same function as above;
  3. The End Stack Trace is runtime.asyncPreempt:50. This function runs when a preemption signal is received, which makes it clear that the goroutine was preempted asynchronously;

Analysis#

Installing the preemption signal#

runtime/signal_unix.go

When the program starts, the runtime installs runtime.sighandler as the handler for its signals, including SIGURG; sighandler in turn calls runtime.doSigPreempt when a preemption signal arrives.

initsig

func initsig(preinit bool) {
	// Not the pre-initialization pass: signal handlers may run from now on
	if !preinit {
		signalsOK = true
	}
	// Iterate over the signal table
	for i := uint32(0); i < _NSIG; i++ {
		t := &sigtable[i]
		// Skip signals such as SIGKILL, SIGSTOP, SIGTSTP, SIGCONT, SIGTTIN, SIGTTOU
		if t.flags == 0 || t.flags&_SigDefault != 0 {
			continue
		}
		...
		setsig(i, funcPC(sighandler))
	}
}
 

           

The initsig function iterates over all signals and calls setsig to register a handler for each. Let's look at the global variable sigtable to see what it contains:

var sigtable = [...]sigTabT{
	/* 0 */ {0, "SIGNONE: no trap"},
	/* 1 */ {_SigNotify + _SigKill, "SIGHUP: terminal line hangup"},
	/* 2 */ {_SigNotify + _SigKill, "SIGINT: interrupt"},
	/* 3 */ {_SigNotify + _SigThrow, "SIGQUIT: quit"},
	/* 4 */ {_SigThrow + _SigUnblock, "SIGILL: illegal instruction"},
	/* 5 */ {_SigThrow + _SigUnblock, "SIGTRAP: trace trap"},
	/* 6 */ {_SigNotify + _SigThrow, "SIGABRT: abort"},
	/* 7 */ {_SigPanic + _SigUnblock, "SIGBUS: bus error"},
	/* 8 */ {_SigPanic + _SigUnblock, "SIGFPE: floating-point exception"},
	/* 9 */ {0, "SIGKILL: kill"},
	/* 10 */ {_SigNotify, "SIGUSR1: user-defined signal 1"},
	/* 11 */ {_SigPanic + _SigUnblock, "SIGSEGV: segmentation violation"},
	/* 12 */ {_SigNotify, "SIGUSR2: user-defined signal 2"},
	/* 13 */ {_SigNotify, "SIGPIPE: write to broken pipe"},
	/* 14 */ {_SigNotify, "SIGALRM: alarm clock"},
	/* 15 */ {_SigNotify + _SigKill, "SIGTERM: termination"},
	/* 16 */ {_SigThrow + _SigUnblock, "SIGSTKFLT: stack fault"},
	/* 17 */ {_SigNotify + _SigUnblock + _SigIgn, "SIGCHLD: child status has changed"},
	/* 18 */ {_SigNotify + _SigDefault + _SigIgn, "SIGCONT: continue"},
	/* 19 */ {0, "SIGSTOP: stop, unblockable"},
	/* 20 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTSTP: keyboard stop"},
	/* 21 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTIN: background read from tty"},
	/* 22 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTOU: background write to tty"},
  				 
	/* 23 */ {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
	/* 24 */ {_SigNotify, "SIGXCPU: cpu limit exceeded"},
	/* 25 */ {_SigNotify, "SIGXFSZ: file size limit exceeded"},
	/* 26 */ {_SigNotify, "SIGVTALRM: virtual alarm clock"},
	/* 27 */ {_SigNotify + _SigUnblock, "SIGPROF: profiling alarm clock"},
	/* 28 */ {_SigNotify + _SigIgn, "SIGWINCH: window size change"},
	/* 29 */ {_SigNotify, "SIGIO: i/o now possible"},
	/* 30 */ {_SigNotify, "SIGPWR: power failure restart"},
	/* 31 */ {_SigThrow, "SIGSYS: bad system call"},
	/* 32 */ {_SigSetStack + _SigUnblock, "signal 32"}, /* SIGCANCEL; see issue 6997 */
	/* 33 */ {_SigSetStack + _SigUnblock, "signal 33"}, /* SIGSETXID; see issues 3871, 9400, 12498 */
	...
}
           

The meaning of each signal is described in this introduction: Unix signal https://zh.wikipedia.org/wiki/Unix signal. Note that the flags of the preemption signal, SIGURG, are _SigNotify + _SigIgn:

{_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"}
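
To make the flag check in the initsig loop shown above concrete, here is a simplified sketch. The constant names echo the runtime's flags, but the numeric values and the helper passesFlagFilter are assumptions for illustration, not the runtime's definitions; the real initsig performs further checks in the elided part.

```go
package main

import "fmt"

// Illustrative bit flags; the values are placeholders chosen for this sketch.
const (
	sigNotify = 1 << iota
	sigKill
	sigThrow
	sigPanic
	sigDefault
	sigUnblock
	sigIgn
)

// passesFlagFilter mirrors the first condition in the initsig loop:
// entries with no flags, or with the "default" flag set, are skipped.
func passesFlagFilter(flags int) bool {
	return !(flags == 0 || flags&sigDefault != 0)
}

func main() {
	fmt.Println("SIGKILL:", passesFlagFilter(0))
	fmt.Println("SIGCONT:", passesFlagFilter(sigNotify|sigDefault|sigIgn))
	fmt.Println("SIGURG: ", passesFlagFilter(sigNotify|sigIgn))
}
```

Under these assumptions SIGKILL and SIGCONT are skipped, while SIGURG goes on to have a handler installed.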
           

Let's take a look at the setsig function, which is in the runtime/os_linux.go file:

setsig

func setsig(i uint32, fn uintptr) {
	var sa sigactiont
	sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART
	sigfillset(&sa.sa_mask)
	...
	if fn == funcPC(sighandler) {
		// cgo-related
		if iscgo {
			fn = funcPC(cgoSigtramp)
		} else {
			// Replace the handler with sigtramp
			fn = funcPC(sigtramp)
		}
	}
	sa.sa_handler = fn
	sigaction(i, &sa, nil)
}
           

Note that when fn equals sighandler, the function actually installed is sigtramp (or cgoSigtramp when cgo is in use). On Linux, the sigaction function installs the handler through the rt_sigaction system call.

Handling the preemption signal#

This section describes what happens when the signal arrives. Strictly speaking it belongs after the section on sending the preemption signal, but since signal installation was just covered, it is discussed here first. You can jump ahead to the section on sending the preemption signal and then come back.

As shown above, when fn equals sighandler the installed function is replaced with sigtramp, which is implemented in assembly. Let's take a look.

src/runtime/sys_linux_amd64.s:

TEXT runtime·sigtramp<ABIInternal>(SB),NOSPLIT,$72
	...
	// We don't save mxcsr or the x87 control word because sigtrampgo doesn't
	// modify them.

	MOVQ	DX, ctx-56(SP)
	MOVQ	SI, info-64(SP)
	MOVQ	DI, signum-72(SP)
	MOVQ	$runtime·sigtrampgo(SB), AX
	CALL AX

	...
	RET
           

runtime·sigtramp is invoked when the signal is delivered. It stores the signal arguments on the stack and then calls runtime·sigtrampgo.

This function is in the runtime/signal_unix.go file:

sigtrampgo&sighandler

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
	if sigfwdgo(sig, info, ctx) {
		return
	}
	c := &sigctxt{info, ctx}
	g := sigFetchG(c)
	...
	sighandler(sig, info, ctx, g)
	setg(g)
	if setStack {
		restoreGsignalStack(&gsignalStack)
	}
}

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	_g_ := getg()
	c := &sigctxt{info, ctxt}
	...
	// If this is a preemption signal
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		// Handle the preemption signal
		doSigPreempt(gp, c)
	}

	...
}
           

sighandler handles many other signals as well; we only care about the preemption part, which is ultimately carried out by doSigPreempt.

This function is in the runtime/signal_unix.go file:

doSigPreempt

func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check whether this G wants to be preempted and can be preempted safely
	if wantAsyncPreempt(gp) {
		// Check whether the G is at a safe point for asynchronous preemption
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Modify the registers to inject a call to asyncPreempt
			ctxt.pushCall(funcPC(asyncPreempt), newpc)
		}
	}

	// Update the preemption bookkeeping fields
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0)
}


           

doSigPreempt handles the preemption signal: it reads the PC and SP at which the goroutine was interrupted and calls ctxt.pushCall to modify the saved registers, so that when the signal handler returns, the goroutine executes asyncPreempt, which is declared in runtime/preempt.go.
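
The effect of pushCall can be illustrated with a toy model. This is a sketch only, assuming a 64-bit architecture that keeps return addresses on the stack; fakeContext, its fields, and the addresses are hypothetical and do not correspond to the runtime's sigctxt type.

```go
package main

import "fmt"

// fakeContext is a toy stand-in for the register state saved in a signal
// frame. Memory is modeled as a map from address to word.
type fakeContext struct {
	pc, sp uintptr
	mem    map[uintptr]uintptr
}

const wordSize = 8 // assumption: 64-bit words

// pushCall makes the interrupted code look as if it had just called target:
// the resume address is pushed as a return address and pc is redirected.
func (c *fakeContext) pushCall(target, resume uintptr) {
	c.sp -= wordSize
	c.mem[c.sp] = resume
	c.pc = target
}

func main() {
	c := &fakeContext{pc: 0x1000, sp: 0x8000, mem: map[uintptr]uintptr{}}
	const asyncPreemptAddr = 0x2000 // hypothetical address of asyncPreempt
	c.pushCall(asyncPreemptAddr, c.pc)
	fmt.Printf("pc=%#x sp=%#x ret=%#x\n", c.pc, c.sp, c.mem[c.sp])
}
```

When the injected function returns, it pops the pushed address and execution continues where the goroutine was interrupted, which is why the goroutine itself never notices the detour.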

// asyncPreempt saves the user registers and then calls asyncPreempt2
func asyncPreempt()
           

The assembly code for asyncPreempt is in src/runtime/preempt_amd64.s. It saves the user registers and then calls asyncPreempt2 in runtime/preempt.go:

asyncPreempt2

func asyncPreempt2() {
	gp := getg()
	gp.asyncSafePoint = true
	// Whether this G has been asked to stop when preempted (preemptStop)
	if gp.preemptStop {
		mcall(preemptPark)
	} else {
		// Give up the G's execution on the M and put it on the global queue to be scheduled later
		mcall(gopreempt_m)
	}
	gp.asyncSafePoint = false
}
           

asyncPreempt2 gets the current G and checks its preemptStop field. preemptStop is set to true by suspendG in runtime/preempt.go when it finds the goroutine in the _Grunning state, marking the G as one that should be stopped when it is preempted.

Let's take a look at the preemptPark function of runtime/proc.go that executes the preemption task:

preemptPark

func preemptPark(gp *g) {

	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	gp.waitreason = waitReasonPreempted
	casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
	// Detach the G from the current M, releasing the thread
	dropg()
	// Move the goroutine to the _Gpreempted state
	casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)
	// Continue scheduling
	schedule()
}
           

preemptPark changes the state of the current goroutine to _Gpreempted, calls dropg to detach it from its M, and finally calls schedule to go on scheduling other goroutines.

gopreempt_m

gopreempt_m is closer to a voluntary yield than to a forced stop: the goroutine gives up the M and rejoins the run queue to wait for scheduling.

func gopreempt_m(gp *g) {
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	...
	// Change the status to _Grunnable
	casgstatus(gp, _Grunning, _Grunnable)
	// Detach the G from the current M, releasing the thread
	dropg()
	lock(&sched.lock)
	// Put the G back on the global run queue
	globrunqput(gp)
	unlock(&sched.lock)
	// Continue scheduling
	schedule()
}
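
The difference between the two branches of asyncPreempt2 can be summarized with a toy model. This is a sketch under simplifying assumptions: toyG, toySched and the status values are hypothetical, and locking, M/P bookkeeping and the scan states are left out.

```go
package main

import "fmt"

type status int

const (
	running status = iota
	runnable
	preempted
)

type toyG struct {
	id          int
	status      status
	preemptStop bool
}

type toySched struct{ globalQueue []*toyG }

// asyncPreempt2 models the branch in the runtime function of the same name.
func (s *toySched) asyncPreempt2(g *toyG) {
	if g.preemptStop {
		// preemptPark: the G is parked and is NOT queued; whoever asked
		// for the stop is responsible for resuming it.
		g.status = preempted
		return
	}
	// gopreempt_m: the G stays runnable and goes to the back of the
	// global queue.
	g.status = runnable
	s.globalQueue = append(s.globalQueue, g)
}

func main() {
	s := &toySched{}
	stopped := &toyG{id: 1, preemptStop: true}
	yielded := &toyG{id: 2}
	s.asyncPreempt2(stopped)
	s.asyncPreempt2(yielded)
	fmt.Println(stopped.status == preempted, yielded.status == runnable, len(s.globalQueue))
}
```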
           

Sending the preemption signal#

The preemption signal is sent by preemptM.

This function is in the runtime/signal_unix.go file:

preemptM

const sigPreempt = _SIGURG

func preemptM(mp *m) {
	...
	if atomic.Cas(&mp.signalPending, 0, 1) {

		// preemptM sends a preemption request to the M.
		// On receiving it, if the running G or P is marked for preemption and the goroutine is at an asynchronous safe point,
		// the goroutine is preempted.
		signalM(mp, sigPreempt)
	}
}
           

preemptM calls signalM to send the _SIGURG signal, whose handler was installed during initialization, to the specified M.
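
The compare-and-swap on signalPending ensures that at most one preemption signal is outstanding per M; doSigPreempt resets the flag once the signal has been handled. A minimal sketch of that pattern, using sync/atomic instead of the runtime's internal atomics (toyM, requestPreempt and handled are hypothetical names, and the counter stands in for the real signalM call):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type toyM struct {
	signalPending uint32
	sent          int // number of signals "sent"; single-threaded toy counter
}

// requestPreempt sends a signal only if none is pending yet.
func (m *toyM) requestPreempt() bool {
	if atomic.CompareAndSwapUint32(&m.signalPending, 0, 1) {
		m.sent++ // stands in for signalM(mp, sigPreempt)
		return true
	}
	return false
}

// handled models the end of doSigPreempt, which clears the pending flag.
func (m *toyM) handled() { atomic.StoreUint32(&m.signalPending, 0) }

func main() {
	m := &toyM{}
	m.requestPreempt()
	m.requestPreempt() // coalesced: a signal is already pending
	m.handled()
	m.requestPreempt()
	fmt.Println("signals sent:", m.sent)
}
```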

preemptM is used to send preemption signals mainly in the following places:

  1. The background monitor runtime.sysmon detects a goroutine that has run for too long and sends a preemption signal;
  2. GC stack scanning sends a preemption signal;
  3. GC stop-the-world (STW) calls preemptall to preempt all Ps and pause them;

Preemption by the background monitor#

The system monitor runtime.sysmon calls runtime.retake in a loop to preempt processors that are running or blocked in a system call. retake iterates over all of the runtime's global processors (allp).

The main purpose of preempting from the monitor loop is to avoid starvation caused by a G occupying an M for too long.

runtime.retake has two main parts:

  1. Call preemptone to preempt the current processor;
  2. Call handoffp to hand off the processor;

Preempt the current processor

func retake(now int64) uint32 {
	n := 0

	lock(&allpLock)
	// Iterate over the allp array
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Number of scheduling rounds on this P
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				// Time of the processor's last scheduling round
				pd.schedwhen = now
			// Preempt the G if 10ms have passed since the last scheduling round
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(_p_)
				sysretake = true
			}
		}
		...
	}
	unlock(&allpLock)
	return uint32(n)
}
           

retake reads the state of each P. If the P is in the _Prunning or _Psyscall state and at least 10ms have passed since its last scheduling round, retake calls preemptone to send a preemption signal. preemptone is covered later in this article, so it is not repeated here.
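
The tick comparison in retake can be isolated into a small sketch. The struct and constant names follow the excerpt above, but the observe method is a hypothetical helper, and timestamps are plain nanosecond counters passed in by the caller.

```go
package main

import "fmt"

const forcePreemptNS = 10 * 1000 * 1000 // 10ms in nanoseconds

// sysmontick remembers what the monitor saw the last time it looked at a P.
type sysmontick struct {
	schedtick uint32
	schedwhen int64
}

// observe returns true when the P has been running the same G for at least
// forcePreemptNS, i.e. its schedtick has not changed for that long.
func (pd *sysmontick) observe(pSchedtick uint32, now int64) bool {
	if pd.schedtick != pSchedtick {
		// The P scheduled something new since the last look: restart the clock.
		pd.schedtick = pSchedtick
		pd.schedwhen = now
		return false
	}
	return pd.schedwhen+forcePreemptNS <= now
}

func main() {
	const ms = 1000 * 1000
	pd := &sysmontick{}
	fmt.Println(pd.observe(1, 0))     // new tick observed, clock restarted
	fmt.Println(pd.observe(1, 5*ms))  // same G, only 5ms
	fmt.Println(pd.observe(1, 10*ms)) // same G for 10ms: preempt
	fmt.Println(pd.observe(2, 11*ms)) // a new G was scheduled
}
```

Note that the monitor never looks at the goroutine itself; it only compares a counter that the scheduler increments on every scheduling round.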


Call handoffp to hand off the processor

func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	// Iterate over the allp array
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		...
		if s == _Psyscall {
			// Number of system calls made on this P
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				// Time at which the system call was observed
				pd.syscallwhen = now
				continue
			}
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			unlock(&allpLock)
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				n++
				_p_.syscalltick++
				// Hand off the processor
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
           

This part looks at Ps in the _Psyscall state. The P is left alone only if all three of the following conditions hold; if any one of them fails, handoffp is called to hand off the P:

  1. runqempty(_p_): P's run queue is empty;
  2. atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0: nmspinning is the number of spinning Ms (Ms looking for work to steal) and npidle is the number of idle Ps, so this checks whether there is an idle P or an M already looking for Gs to steal;
  3. pd.syscallwhen+10*1000*1000 > now: the system call has lasted less than 10ms;
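
The three conditions above combine into a single predicate. The following sketch restates it as a standalone function; shouldHandoff and its parameters are hypothetical names, with the scheduler state passed in as plain values.

```go
package main

import "fmt"

const syscallLimitNS = 10 * 1000 * 1000 // 10ms in nanoseconds

// shouldHandoff reports whether a P stuck in a system call should be handed
// off. The P is kept only when its run queue is empty, some other M or P is
// available to pick up new work, and the syscall is younger than 10ms.
func shouldHandoff(runqEmpty bool, spinningPlusIdle uint32, syscallWhen, now int64) bool {
	keep := runqEmpty && spinningPlusIdle > 0 && syscallWhen+syscallLimitNS > now
	return !keep
}

func main() {
	const ms = 1000 * 1000
	fmt.Println(shouldHandoff(true, 1, 0, 5*ms))  // all conditions hold: keep the P
	fmt.Println(shouldHandoff(false, 1, 0, 5*ms)) // P has queued Gs: hand off
	fmt.Println(shouldHandoff(true, 0, 0, 5*ms))  // nobody else is free: hand off
	fmt.Println(shouldHandoff(true, 1, 0, 10*ms)) // syscall reached 10ms: hand off
}
```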

GC stack scanning sends a preemption signal#

GC-related content can be found in "Go GC Implementation Principles and Source Code Analysis" https://www.luozhiyun.com/archives/475. When marking GC roots, Go scans the stack of each G. Before scanning, it calls suspendG to suspend the G, and after scanning it calls resumeG to resume execution.

The function is in runtime/mgcmark.go:

markroot

func markroot(gcw *gcWork, i uint32) {
	...
	switch {
	...
	// Scan the stack of each G
	default:
		// Get the G to scan
		var gp *g
		if baseStacks <= i && i < end {
			gp = allgs[i-baseStacks]
		} else {
			throw("markroot: bad index")
		}
		...
		// Switch to g0 to perform the scan
		systemstack(func() {
			...
			// Suspend the G so that it stops running
			stopped := suspendG(gp)
			if stopped.dead {
				gp.gcscandone = true
				return
			}
			if gp.gcscandone {
				throw("g already scanned")
			}
			// Scan the G's stack
			scanstack(gp, gcw)
			gp.gcscandone = true
			// Resume the G
			resumeG(stopped)
		})
	}
}
           

Before scanning a stack, markroot switches to g0 and performs the scan there. It calls suspendG, which checks the state of the G; if the G is running (_Grunning), suspendG sets preemptStop to true and sends a preemption signal.

The function is in runtime/preempt.go:

suspendG

func suspendG(gp *g) suspendGState {
	...
	const yieldDelay = 10 * 1000

	var nextPreemptM int64
	for i := 0; ; i++ {
		switch s := readgstatus(gp); s {
		...
		case _Grunning:
			if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
				break
			}
			if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
				break
			}
			// Set the preemption fields
			gp.preemptStop = true
			gp.preempt = true
			gp.stackguard0 = stackPreempt

			asyncM2 := gp.m
			asyncGen2 := atomic.Load(&asyncM2.preemptGen)
			// asyncM and asyncGen record the previous preemption request made in this loop; they are used to avoid requesting the same preemption twice
			needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
			asyncM = asyncM2
			asyncGen = asyncGen2

			casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)

			if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
				now := nanotime()
				// Rate-limit the preemption requests
				if now >= nextPreemptM {
					nextPreemptM = now + yieldDelay/2
					// Send the preemption signal
					preemptM(asyncM)
				}
			}
		}
		...
	}
}
           

For suspendG, only the handling of a G in the _Grunning state is excerpted here. This branch sets preemptStop to true, and it is the only place where that happens. preemptStop affects how the preemption signal is handled; if you have forgotten how, see the asyncPreempt2 function above.
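
The nextPreemptM bookkeeping in suspendG limits how often the loop may send a signal: after each send, the next one is allowed no earlier than yieldDelay/2 later. A minimal sketch of that rate limit, with timestamps supplied as plain nanosecond values (countSignals is a hypothetical helper, not part of the runtime):

```go
package main

import "fmt"

const yieldDelay = 10 * 1000 // 10us in nanoseconds, as in the excerpt above

// countSignals replays a series of loop iterations, each at the given
// timestamp, and counts how many of them would actually send a signal.
func countSignals(attempts []int64) int {
	var nextPreemptM int64
	sent := 0
	for _, now := range attempts {
		if now >= nextPreemptM {
			nextPreemptM = now + yieldDelay/2
			sent++ // stands in for preemptM(asyncM)
		}
	}
	return sent
}

func main() {
	// Iterations at 0, 1, 4.999, 5, 9 and 10 microseconds.
	attempts := []int64{0, 1000, 4999, 5000, 9000, 10000}
	fmt.Println("signals sent:", countSignals(attempts))
}
```

With these timestamps, only the iterations at 0, 5 and 10 microseconds send a signal; the others fall inside the 5 microsecond window opened by the previous send.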

GC stop-the-world preempts all Ps#

GC STW is carried out by the stopTheWorldWithSema function, defined in runtime/proc.go:

stopTheWorldWithSema

func stopTheWorldWithSema() {
	_g_ := getg()

	lock(&sched.lock)
	sched.stopwait = gomaxprocs
	// Set gcwaiting; the scheduler enters a wait when it sees this flag
	atomic.Store(&sched.gcwaiting, 1)
	// Send preemption signals
	preemptall()
	// Stop the current P
	_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
	...
	wait := sched.stopwait > 0
	unlock(&sched.lock)
	if wait {
		for {
			// Wait for 100us
			if notetsleep(&sched.stopnote, 100*1000) {
				noteclear(&sched.stopnote)
				break
			}
			// Send the preemption signals again
			preemptall()
		}
	}
	...
}
           

stopTheWorldWithSema calls preemptall to send preemption signals to all running Ps, and repeats the call every 100us until they have all stopped.

The preemptall function is in runtime/proc.go:

preemptall

func preemptall() bool {
	res := false
	// Iterate over all Ps
	for _, _p_ := range allp {
		if _p_.status != _Prunning {
			continue
		}
		// Send a preemption signal to each running P
		if preemptone(_p_) {
			res = true
		}
	}
	return res
}
           

preemptone, called by preemptall, marks the G running on the M bound to the P for preemption, and finally calls preemptM to send a preemption signal to that M.

This function is in runtime/proc.go:

preemptone

func preemptone(_p_ *p) bool {
	// Get the M bound to the P
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	// Get the G that the M is running
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}
	// Mark the G for preemption
	gp.preempt = true

	// The stack growth check also detects the preemption request
	gp.stackguard0 = stackPreempt

	// Request asynchronous preemption of this P
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}
	return true
}
           

Summary#

At this point we have walked through the whole signal-based preemption process. To summarize:

  1. When the program starts, the runtime installs a handler for the _SIGURG signal; for preemption signals the handler calls runtime.doSigPreempt;
  2. At some point, a thread M1 sends the _SIGURG signal to M2 via signalM;
  3. M2 receives the signal; the operating system interrupts the code it is executing and switches to the signal handler, which reaches runtime.doSigPreempt;
  4. doSigPreempt modifies the execution context so that M2 runs runtime.asyncPreempt, which re-enters the scheduling loop and schedules other Gs;

References#

Linux User Preemption and Kernel Preemption in Detail https://blog.csdn.net/gatieme/article/details/51872618

What the sysmon background monitoring thread does https://www.bookstack.cn/read/qcrao-Go-Questions/goroutine scheduler-sysmon background monitoring thread does what .md

Go: Asynchronous Preemption https://medium.com/a-journey-with-go/go-asynchronous-preemption-b5194227371c

Unix Signal https://zh.wikipedia.org/wiki/Unix Signal

Linux signaling mechanism http://gityuan.com/2015/12/20/signal/

Anatomy of the Golang Killer trace https://juejin.cn/post/6844903887757901831

Detailed implementation of the Go language scheduling loop source code https://www.luozhiyun.com/archives/448

Signal Processing Mechanism https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/signal/#662-