協程
什麼是協程
wikipedia 的定義:協程是一個無優先級的子程式排程元件,允許子程式在特點的地方挂起恢複。
線程包含于程序,協程包含于線程。隻要記憶體足夠,一個線程中可以有任意多個協程,但某一時刻隻能有一個協程在運作,多個協程分享該線程配置設定到的計算機資源。
為什麼需要協程
簡單引入
就實際使用了解來講,協程允許我們寫同步代碼的邏輯,卻做着異步的事,避免了回調嵌套,使得代碼邏輯清晰。code like this:
co(function*(next){
let [err,data]=yield fs.readFile("./test.txt",next);//異步讀檔案
[err]=yield fs.appendFile("./test2.txt",data,next);//異步寫檔案
//....
})()
異步 指令執行之後,結果并不立即顯現的操作稱為異步操作。及其指令執行完成并不代表操作完成。
協程是追求極限性能和優美的代碼結構的産物。
一點曆史
起初人們喜歡同步程式設計,然後發現有一堆線程因為I/O卡在那裡,并發上不去,資源嚴重浪費。
然後出了異步(select,epoll,kqueue,etc),将I/O操作交給核心線程,自己注冊一個回調函數處理最終結果。
然而項目大了之後代碼結構變得不清晰,下面是個小例子。
async_func1("hello world",func(){
async_func2("what's up?",func(){
async_func2("oh ,friend!",func(){
//todo something
})
})
})
于是發明了協程,寫同步的代碼,享受着異步帶來的性能優勢。
程式運作是需要的資源:
- cpu
- 記憶體
- I/O (檔案、網絡,磁盤(記憶體通路不在一個層級,忽略不計))
協程的實作原理(c++和node.js裡面的實作)
libco 一個C++協程庫實作
libco 是騰訊開源的一個C++協程庫,作為微信背景的基礎庫,經受住了實際的檢驗。項目位址:https://github.com/Tencent/libco
個人源碼閱讀項目:https://github.com/yyrdl/libco-code-study (未完結)
libco源代碼檔案一共11個,其中一個是彙編代碼,其餘是C++,閱讀起來相對較容易。
在C++裡面實作協程要解決的問題有如下幾個:
- 何時挂起協程?何時喚醒協程?
- 如何挂起、喚醒協程,如何保護協程運作時的上下文?
- 如何封裝異步操作?
前期知識準備
- 現代作業系統是分時作業系統,資源配置設定的基本機關是程序,CPU排程的基本機關是線程。
- C++程式運作時會有一個運作時棧,一次函數調用就會在棧上生成一個record
- 運作時記憶體空間分為全局變量區(存放函數,全局變量),棧區,堆區。棧區記憶體配置設定從高位址往低位址配置設定,堆區從低位址往高位址配置設定。
- 下一條指令位址存在于指令寄存器IP,ESP寄存值指向目前棧頂位址,EBP指向目前活動棧幀的基位址。
- 發生函數調用時操作為:将參數從右往左依次壓棧,将傳回位址壓棧,将目前EBP寄存器的值壓棧,在棧區配置設定目前函數局部變量所需的空間,表現為修改ESP寄存器的值。
- 協程的上下文包含屬于他的棧區和寄存器裡面存放的值。
何時挂起,喚醒協程?
如開始介紹時所說,協程是為了使用異步的優勢,異步操作是為了避免IO操作阻塞線程。那麼協程挂起的時刻應該是目前協程發起異步操作的時候,而喚醒應該在其他協程退出,并且他的異步操作完成時。
如何挂起、喚醒協程,如何保護協程運作時的上下文?
協程發起異步操作的時刻是該挂起協程的時刻,為了保證喚醒時能正常運作,需要正确儲存并恢複其運作時的上下文。
是以這裡的操作步驟為:
- 儲存目前協程的上下文(運作棧,傳回位址,寄存器狀态)
- 設定将要喚醒的協程的入口指令位址到IP寄存器
- 恢複将要喚醒的協程的上下文
這部分操作相應的源代碼:
.globl coctx_swap//定義該部分彙編代碼對外暴露的函數名
#if !defined( __APPLE__ )
.type coctx_swap, @function
#endif
coctx_swap:
#if defined(__i386__)
leal 4(%esp), %eax //sp R[eax]=R[esp]+4 R[eax]的值應該為coctx_swap的第一個參數在棧中的位址
movl 4(%esp), %esp // R[esp]=Mem[R[esp]+4] 将esp指向 &(curr->ctx) 目前routine 上下文的記憶體位址,ctx在堆區,現在esp應指向reg[0]
leal 32(%esp), %esp //parm a : ®s[7] + sizeof(void*) push 操作是以esp的值為基準,push一個值,則esp的值減一個機關(因為是按棧區的操作邏輯,從高位往低位配置設定位址),但ctx是在堆區,是以應将esp指向reg[7],然後從eax到-4(%eax)push
//儲存寄存器值到棧中,實際對應coctx_t->regs 數組在棧中的位置(參見coctx.h 中coctx_t的定義)
pushl %eax //esp ->parm a
pushl %ebp
pushl %esi
pushl %edi
pushl %edx
pushl %ecx
pushl %ebx
pushl -4(%eax) //将函數傳回位址壓棧,即coctx_swap 之後的指令位址,儲存傳回位址,儲存到coctx_t->regs[0]
//恢複運作目标routine時的環境(各個寄存器的值和棧狀态)
movl 4(%eax), %esp //parm b -> ®s[0] //切換esp到目标 routine ctx在棧中的起始位址,這個位址正好對應regs[0],pop一次 esp會加一個機關的值
popl %eax //ret func addr regs[0] 暫存傳回位址到 EAX
//恢複當時的寄存器狀态
popl %ebx // regs[1]
popl %ecx // regs[2]
popl %edx // regs[3]
popl %edi // regs[4]
popl %esi // regs[5]
popl %ebp // regs[6]
popl %esp // regs[7]
//将傳回位址壓棧
pushl %eax //set ret func addr
//将 eax清零
xorl %eax, %eax
//傳回,這裡傳回之後就切換到目标routine了,C++代碼中調用coctx_swap的地方之後的代碼将得不到立即執行
ret
#elif
這部分代碼隻是做了寄存器部分的操作。依賴的結構體定義,見檔案coctx.h中:
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ];//32位機,依次為:ret,ebx,ecx,edx,edi,esi,ebp,eax
#else
void *regs[ 14 ];//64位機的情況
#endif
size_t ss_size;//空間大小
char *ss_sp;//ESP
};
調用coctx_swap 函數隻在檔案co_routine.cpp中的co_swap函數。
儲存運作棧的操作見co_swap函數中調用coctx_swap之前的部分。具體步驟為取目前棧頂位址 (代碼:
char c; esp=&c
),若不是共享棧模型則清理下env,若是則判斷共享棧區有沒有被占用,被占用則從堆區申請記憶體儲存,然後再配置設定共享棧。
需要注意的是,libco運作時的棧區不在是傳統意義上的棧區,其空間實際來自于堆區。
如何封裝異步操作?
這部分代碼見:
- co_hook_sys_call.cpp
- co_routine.cpp
- co_epoll.cpp
- co_epoll.h
核心思想是hook系統本來的I/O接口,比如
socket()
函數,和epoll(kqueue)結合,采用一個
co_eventloop
來統一管理,當發現一個協程發起異步操作時,就将其挂起放入等待隊列,喚醒其他異步操作已經完成的協程。可以聯系libevent裡面的event_loop,差別在在于一個是操作棧區和寄存器恢複協程,一個是調用綁定的回調函數。
node.js裡面協程
node.js 的優勢:
- node.js天生異步(下面是libuv)
- javascript的閉包特性完成了上下文的儲存工作
需要我們做的:
- 實作同步程式設計
附上 文章開始時的代碼:
const fs=require("fs");
const co=require("zco");
co(function*(next){
let [err,data]=yield fs.readFile("./test.txt",next);//異步讀檔案
[err]=yield fs.appendFile("./test2.txt",data,next);//異步寫檔案
//....
})()
JS 中的Generator
Generator是一個疊代器生成器,也是node.js中實作協程的關鍵。
let gen=function *() {
console.log("ok1");
var a=yield 1;
console.log("a:"+a);
var b=yield 2;
console.log("b:"+b);
}
var iterator=gen();
console.log("ok2");
console.log(iterator.next(100));
console.log(iterator.next(101));
console.log(iterator.next(102));
ok2
ok1
{ value: 1, done: false }
a:101
{ value: 2, done: false }
b:102
{ value: undefined, done: true }
/**
* Created by yyrdl on 2017/3/14.
*/
var slice = Array.prototype.slice;
var co = function (gen) {
var iterator,
callback = null,
hasReturn = false;
var _end = function (e, v) {
callback && callback(e, v); //I shoudn't catch the error throwed by user's callback
if(callback==null&&e){//the error should be throwed if no handler instead of catching silently
throw e;
}
}
var run=function(arg){
try {
var v = iterator.next(arg);
hasReturn = true;
v.done && _end(undefined, v.value);
} catch (e) {
_end(e);
}
}
var nextSlave = function (arg) {
hasReturn = false;
run(arg);
}
var next = function () {
var arg = slice.call(arguments);
if (!hasReturn) {//support fake async operation,avoid error: "Generator is already running"
setTimeout(nextSlave, 0, arg);
} else {
nextSlave(arg);
}
}
if ("[object GeneratorFunction]" === Object.prototype.toString.call(gen)) {//todo: support other Generator implements
iterator = gen(next);
} else {
throw new TypeError("the arg of co must be generator function")
}
var future = function (cb) {
if ("function" == typeof cb) {
callback = cb;
}
run();
}
return future;
}
module.exports = co;