天天看點

協程的實作與原理剖析

C/C++Linux伺服器開發/背景架構師知識體系

前言

協程這個概念很久了,好多程式員是實作過這個元件的,網上關于協程的文章,部落格,論壇都是汗牛充棟,在知乎,github上面也有很多大牛寫了關于協程的心得體會。突發奇想,我也來實作一個這樣的元件,并測試了一下性能。借鑒了很多大牛的思想,閱讀了很多大牛的代碼。于是把整個思考過程寫下來。實作代碼

https://github.com/wangbojing/NtyCo

代碼簡單易讀,如果在你的項目中,NtyCo能夠為你解決些許工程問題,那就榮幸之至。

本系列文章的設計思路,是在每一個章的最前面以問題提出,每章節的學習目的。大家能夠帶着每章的問題來讀每章節的内容,友善讀者能夠友善的進入每章節的思考。讀者讀完以後加上案例代碼閱讀,編譯,運作,能夠對神秘的協程有一個全新的了解。能夠運用到工程代碼,幫助你更加友善高效的完成工程工作。

本系列文章僅代表本人觀點,若不有嚴謹的地方,歡迎抛轉。

第一章 協程的起源

問題:協程存在的原因?協程能夠解決哪些問題?

在我們現在CS,BS開發模式下,伺服器的吞吐量是一個很重要的參數。其實吞吐量是IO處理時間加上業務處理。為了簡單起見,比如,用戶端與伺服器之間是長連接配接的,用戶端定期給伺服器發送心跳包資料。用戶端發送一次心跳包到伺服器,伺服器更新該新用戶端狀态的。心跳包發送的過程,業務處理時長等于IO讀取(RECV系統調用)加上業務處理(更新客戶狀态)。吞吐量等于1s業務處理次數。

協程的實作與原理剖析

業務處理(更新用戶端狀态)時間,業務不一樣的,處理時間不一樣,我們就不做讨論。

那如何提升recv的性能。若隻有一個用戶端,recv的性能也沒有必要提升,也不能提升。若在有百萬計的用戶端長連接配接的情況,我們該如何提升。以Linux為例,在這裡需要介紹一個“網紅”就是epoll。伺服器使用epoll管理百萬計的用戶端長連接配接,代碼架構如下:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);
            
            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            handle(sockfd);
        }
    }
}
           

對于響應式伺服器,所有的用戶端的操作驅動都是來源于這個大循環。來源于epoll_wait的回報結果。

對于伺服器處理百萬計的IO。Handle(sockfd)實作方式有兩種。

第一種,handle(sockfd)函數内部對sockfd進行讀寫動作。代碼如下

int handle(int sockfd) {

    recv(sockfd, rbuffer, length, 0);
    
    parser_proto(rbuffer, length);

    send(sockfd, sbuffer, length, 0);
    
}

           

handle的io操作(send,recv)與epoll_wait是在同一個處理流程裡面的。這就是IO同步操作。

優點:

  1. sockfd管理友善。
  2. 操作邏輯清晰。

    缺點:

  3. 伺服器程式依賴epoll_wait的循環響應速度慢。
  4. 程式性能差

第二種,handle(sockfd)函數内部将sockfd的操作,push到線程池中,代碼如下:

nt thread_cb(int sockfd) {
    // 此函數是線上程池建立的線程中運作。
    // 與handle不在一個線程上下文中運作
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

int handle(int sockfd) {
    //此函數在主線程 main_thread 中運作
    //在此處之前,確定線程池已經啟動。
    push_thread(sockfd, thread_cb); //将sockfd放到其他線程中運作。
}

           

Handle函數是将sockfd處理方式放到另一個已經其他的線程中運作,如此做法,将io操作(recv,send)與epoll_wait 不在一個處理流程裡面,使得io操作(recv,send)與epoll_wait實作解耦。這就叫做IO異步操作。

優點:

  1. 子子產品好規劃。
  2. 程式性能高。

    缺點:

    正因為子子產品好規劃,使得子產品之間的sockfd的管理異常麻煩。每一個子線程都需要管理好sockfd,避免在IO操作的時候,sockfd出現關閉或其他異常。

上文有提到IO同步操作,程式響應慢,IO異步操作,程式響應快。

下面來對比一下IO同步操作與IO異步操作。

代碼如下:

https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c

在這份代碼的486行,#if 1, 打開的時候,為IO異步操作。關閉的時候,為IO同步操作。

協程的實作與原理剖析

接下來把我測試接入量的結果粘貼出來。

IO異步操作,每1000個連接配接接入的伺服器響應時間(900ms左右)。

協程的實作與原理剖析

IO同步操作,每1000個連接配接接入的伺服器響應時間(6500ms左右)。

協程的實作與原理剖析

IO異步操作與IO同步操作

協程的實作與原理剖析

有沒有一種方式,有異步性能,同步的代碼邏輯。來友善程式設計人員對IO操作的元件呢? 有,采用一種輕量級的協程來實作。在每次send或者recv之前進行切換,再由排程器來處理epoll_wait的流程。

就是采用了基于這樣的思考,寫了NtyCo,實作了一個IO異步操作與協程結合的元件。https://github.com/wangbojing/NtyCo,

第二章 協程的案例

問題:協程如何使用?與線程使用有何差別?

在做網絡IO程式設計的時候,有一個非常理想的情況,就是每次accept傳回的時候,就為新來的用戶端配置設定一個線程,這樣一個用戶端對應一個線程。就不會有多個線程共用一個sockfd。每請求每線程的方式,并且代碼邏輯非常易讀。但是這隻是理想,線程建立代價,排程代價就呵呵了。

先來看一下每請求每線程的代碼如下:

while(1) {
	socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);

    pthread_t thread_id;
    pthread_create(&thread_id, NULL, client_cb, &clientfd);

}
           

這樣的做法,寫完放到生産環境下面,如果你的老闆不打死你,你來找我。我來幫你老闆,為民除害。

如果我們有協程,我們就可以這樣實作。參考代碼如下:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

while (1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);
        
    nty_coroutine *read_co;
    nty_coroutine_create(&read_co, server_reader, &cli_fd);

}
           

這樣的代碼是完全可以放在生成環境下面的。如果你的老闆要打死你,你來找我,我幫你把你老闆打死,為民除害。

線程的API思維來使用協程,函數調用的性能來測試協程。

NtyCo封裝出來了若幹接口,一類是協程本身的,二類是posix的異步封裝

協程API:while

  1. 協程建立
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
           
  1. 協程排程器的運作
void nty_schedule_run(void)
           

POSIX異步封裝API:

int nty_socket(int domain, int type, int protocol)
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)
int nty_recv(int fd, void *buf, int length)
int nty_send(int fd, const void *buf, int length)
int nty_close(int fd)
           

接口格式與POSIX标準的函數定義一緻。

第三章 協程的實作之工作流程

問題:協程内部是如何工作呢?

先來看一下協程伺服器案例的代碼, 代碼參考:https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

分别讨論三個協程的比較晦澀的工作流程。第一個協程的建立;第二個IO異步操作;第三個協程子過程回調

3.1 建立協程

當我們需要異步調用的時候,我們會建立一個協程。比如accept傳回一個新的sockfd,建立一個用戶端處理的子過程。再比如需要監聽多個端口的時候,建立一個server的子過程,這樣多個端口同時工作的,是符合微服務的架構的。

建立協程的時候,進行了如何的工作?建立API如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
           

參數1:nty_coroutine **new_co,需要傳入空的協程的對象,這個對象是由内部建立的,并且在函數傳回的時候,會傳回一個内部建立的協程對象。

參數2:proc_coroutine func,協程的子過程。當協程被排程的時候,就會執行該函數。

參數3:void *arg,需要傳入到新協程中的參數。

協程不存在親屬關系,都是一緻的排程關系,接受排程器的排程。調用create API就會建立一個新協程,新協程就會加入到排程器的就緒隊列中。

建立的協程具體步驟會在《協程的實作之原語操作》來描述。

3.2 實作IO異步操作

大部分的朋友會關心IO異步操作如何實作,在send與recv調用的時候,如何實作異步操作的。

先來看一下一段代碼:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);
            
            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            
            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
            recv(sockfd, buffer, length, 0);

            //parser_proto(buffer, length);

            send(sockfd, buffer, length, 0);
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
        }
    }
}
           

在進行IO操作(recv,send)之前,先執行了 epoll_ctl的del操作,将相應的sockfd從epfd中删除掉,在執行完IO操作(recv,send)再進行epoll_ctl的add的動作。這段代碼看起來似乎好像沒有什麼作用。

如果是在多個上下文中,這樣的做法就很有意義了。能夠保證sockfd隻在一個上下文中能夠操作IO的。不會出現在多個上下文同時對一個IO進行操作的。協程的IO異步操作正式是采用此模式進行的。

把單一協程的工作與排程器的工作的劃厘清楚,先引入兩個原語操作 resume,yield會在《協程的實作之原語操作》來講解協程所有原語操作的實作,yield就是讓出運作,resume就是恢複運作。排程器與協程的上下文切換如下圖所示

協程的實作與原理剖析

在協程的上下文IO異步操作(nty_recv,nty_send)函數,步驟如下:

  1. 将sockfd 添加到epoll管理中。
  2. 進行上下文環境切換,由協程上下文yield到排程器的上下文。
  3. 排程器擷取下一個協程上下文。Resume新的協程

IO異步操作的上下文切換的時序圖如下:

協程的實作與原理剖析

3.3 回調協程的子過程

在create協程後,何時回調子過程?何種方式回調子過程?

首先來回顧一下x86_64寄存器的相關知識。彙編與寄存器相關知識還會在《協程的實作之切換》繼續深入探讨的。x86_64 的寄存器有16個64位寄存器,分别是:%rax, %rbx,

%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。

%rax 作為函數傳回值使用的。

%rsp 棧指針寄存器,指向棧頂

%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數參數,依次對應第1參數,第2參數。。。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作資料存儲,遵循調用者使用規則,換句話說,就是随便用。調用子函數之前要備份它,以防它被修改

%r10, %r11 用作資料存儲,就是使用前要先儲存原值

以NtyCo的實作為例,來分析這個過程。CPU有一個非常重要的寄存器叫做EIP,用來存儲CPU運作下一條指令的位址。我們可以把回調函數的位址存儲到EIP中,将相應的參數存儲到相應的參數寄存器中。實作子過程調用的邏輯代碼如下:

void _exec(nty_coroutine *co) {
    co->func(co->arg); //子過程的回調函數
}

void nty_coroutine_init(nty_coroutine *co) {
    //ctx 就是協程的上下文
    co->ctx.edi = (void*)co; //設定參數
    co->ctx.eip = (void*)_exec; //設定回調函數入口
    //當實作上下文切換的時候,就會執行入口函數_exec , _exec 調用子過程func
}

           

第四章 協程的實作之原語操作

問題:協程的内部原語操作有哪些?分别如何實作的?

協程的核心原語操作:create, resume, yield。協程的原語操作有create怎麼沒有exit?以NtyCo為例,協程一旦建立就不能有使用者自己銷毀,必須得以子過程執行結束,就會自動銷毀協程的上下文資料。以_exec執行入口函數傳回而銷毀協程的上下文與相關資訊。co->func(co->arg) 是子過程,若使用者需要長久運作協程,就必須要在func函數裡面寫入循環等操作。是以NtyCo裡面沒有實作exit的原語操作。

create:建立一個協程。

  1. 排程器是否存在,不存在也建立。排程器作為全局的單例。将排程器的執行個體存儲線上程的私有空間pthread_setspecific。
  2. 配置設定一個coroutine的記憶體空間,分别設定coroutine的資料項,棧空間,棧大小,初始狀态,建立時間,子過程回調函數,子過程的調用參數。
  3. 将新配置設定協程添加到就緒隊列 ready_queue中

實作代碼如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {

    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();

    if (sched == NULL) {
        nty_schedule_create(0);
        
        sched = nty_coroutine_get_sched();
        if (sched == NULL) {
            printf("Failed to create scheduler\n");
            return -1;
        }
    }

    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
        printf("Failed to allocate memory for new coroutine\n");
        return -2;
    }

    //
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
        printf("Failed to allocate stack for new coroutine\n");
        free(co);
        return -3;
    }

    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
    co->id = sched->spawned_coroutines ++;
co->func = func;

    co->fd = -1;
co->events = 0;

    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;

    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);

    return 0;
}
           

yield: 讓出CPU。

void nty_coroutine_yield(nty_coroutine *co)
           

參數:目前運作的協程執行個體

調用後該函數不會立即傳回,而是切換到最近執行resume的上下文。該函數傳回是在執行resume的時候,會有排程器統一選擇resume的,然後再次調用yield的。resume與yield是兩個可逆過程的原子操作。

resume:恢複協程的運作權

int nty_coroutine_resume(nty_coroutine *co)
           

參數:需要恢複運作的協程執行個體

調用後該函數也不會立即傳回,而是切換到運作協程執行個體的yield的位置。傳回是在等協程相應事務處理完成後,主動yield會傳回到resume的地方。

第五章 協程的實作之切換

問題:協程的上下文如何切換?切換代碼如何實作?

首先來回顧一下x86_64寄存器的相關知識。x86_64 的寄存器有16個64位寄存器,分别是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,

%r13, %r14, %r15。

%rax 作為函數傳回值使用的。

%rsp 棧指針寄存器,指向棧頂

%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數參數,依次對應第1參數,第2參數。。。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作資料存儲,遵循調用者使用規則,換句話說,就是随便用。調用子函數之前要備份它,以防它被修改

%r10, %r11 用作資料存儲,就是使用前要先儲存原值。

上下文切換,就是将CPU的寄存器暫時儲存,再将即将運作的協程的上下文寄存器,分别mov到相對應的寄存器上。此時上下文完成切換。如下圖所示:

協程的實作與原理剖析

切換_switch函數定義:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
           

參數1:即将運作協程的上下文,寄存器清單

參數2:正在運作協程的上下文,寄存器清單

我們nty_cpu_ctx結構體的定義,為了相容x86,結構體項指令采用的是x86的寄存器名字命名。

typedef struct _nty_cpu_ctx {
    void *esp; //
    void *ebp;
    void *eip;
    void *edi;
    void *esi;
    void *ebx;
    void *r1;
    void *r2;
    void *r3;
    void *r4;
    void *r5;
} nty_cpu_ctx;
           

_switch傳回後,執行即将運作協程的上下文。是實作上下文的切換

_switch的實作代碼:

0: __asm__ (
1: "    .text                                  \n"
2: "       .p2align 4,,15                                   \n"
3: ".globl _switch                                          \n"
4: ".globl __switch                                         \n"
5: "_switch:                                                \n"
6: "__switch:                                               \n"
7: "       movq %rsp, 0(%rsi)      # save stack_pointer     \n"
8: "       movq %rbp, 8(%rsi)      # save frame_pointer     \n"
9: "       movq (%rsp), %rax       # save insn_pointer      \n"
10: "       movq %rax, 16(%rsi)                              \n"
11: "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       \n"
12: "       movq %r12, 32(%rsi)                              \n"
13: "       movq %r13, 40(%rsi)                              \n"
14: "       movq %r14, 48(%rsi)                              \n"
15: "       movq %r15, 56(%rsi)                              \n"
16: "       movq 56(%rdi), %r15                              \n"
17: "       movq 48(%rdi), %r14                              \n"
18: "       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
19: "       movq 32(%rdi), %r12                              \n"
20: "       movq 24(%rdi), %rbx                              \n"
21: "       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
22: "       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
23: "       movq 16(%rdi), %rax     # restore insn_pointer   \n"
24: "       movq %rax, (%rsp)                                \n"
25: "       ret                                              \n"
26: );
           

按照x86_64的寄存器定義,%rdi儲存第一個參數的值,即new_ctx的值,%rsi儲存第二個參數的值,即儲存cur_ctx的值。X86_64每個寄存器是64bit,8byte。

Movq %rsp, 0(%rsi) 儲存在棧指針到cur_ctx執行個體的rsp項

Movq %rbp, 8(%rsi)

Movq (%rsp), %rax #将棧頂位址裡面的值存儲到rax寄存器中。Ret後出棧,執行棧頂

Movq %rbp, 8(%rsi) #後續的指令都是用來儲存CPU的寄存器到new_ctx的每一項中

Movq 8(%rdi), %rbp #将new_ctx的值

Movq 16(%rdi), %rax #将指令指針rip的值存儲到rax中

Movq %rax, (%rsp) # 将存儲的rip值的rax寄存器指派給棧指針的位址的值。

Ret # 出棧,回到棧指針,執行rip指向的指令。

上下文環境的切換完成。

第六章 協程的實作之定義

問題:協程如何定義? 排程器如何定義?

先來一道設計題:

設計一個協程的運作體R與運作體排程器S的結構體

  1. 運作體R:包含運作狀态{就緒,睡眠,等待},運作體回調函數,回調參數,棧指針,棧大小,目前運作體
  2. 排程器S:包含執行集合{就緒,睡眠,等待}

這道設計題拆分兩個個問題,一個運作體如何高效地在多種狀态集合更換。排程器與運作體的功能界限。

6.1 運作體如何高效地在多種狀态集合更換

新建立的協程,建立完成後,加入到就緒集合,等待排程器的排程;協程在運作完成後,進行IO操作,此時IO并未準備好,進入等待狀态集合;IO準備就緒,協程開始運作,後續進行sleep操作,此時進入到睡眠狀态集合。

就緒(ready),睡眠(sleep),等待(wait)集合該采用如何資料結構來存儲?

就緒(ready)集合并不沒有設定優先級的選型,所有在協程優先級一緻,是以可以使用隊列來存儲就緒的協程,簡稱為就緒隊列(ready_queue)。

睡眠(sleep)集合需要按照睡眠時長進行排序,采用紅黑樹來存儲,簡稱睡眠樹(sleep_tree)紅黑樹在工程實用為<key, value>, key為睡眠時長,value為對應的協程結點。

等待(wait)集合,其功能是在等待IO準備就緒,等待IO也是有時長的,是以等待(wait)集合采用紅黑樹的來存儲,簡稱等待樹(wait_tree),此處借鑒nginx的設計。

資料結構如下圖所示:

協程的實作與原理剖析

Coroutine就是協程的相應屬性,status表示協程的運作狀态。sleep與wait兩顆紅黑樹,ready使用的隊列,比如某協程調用sleep函數,加入睡眠樹(sleep_tree),status |= S即可。比如某協程在等待樹(wait_tree)中,而IO準備就緒放入ready隊列中,隻需要移出等待樹(wait_tree),狀态更改status &= ~W即可。有一個前提條件就是不管何種運作狀态的協程,都在就緒隊列中,隻是同時包含有其他的運作狀态。

6.2 排程器與協程的功能界限

每一協程都需要使用的而且可能會不同屬性的,就是協程屬性。每一協程都需要的而且資料一緻的,就是排程器的屬性。比如棧大小的數值,每個協程都一樣的後不做更改可以作為排程器的屬性,如果每個協程大小不一緻,則可以作為協程的屬性。

用來管理所有協程的屬性,作為排程器的屬性。比如epoll用來管理每一個協程對應的IO,是需要作為排程器屬性。

按照前面幾章的描述,定義一個協程結構體需要多少域,我們描述了每一個協程有自己的上下文環境,需要儲存CPU的寄存器ctx;需要有子過程的回調函數func;需要有子過程回調函數的參數 arg;需要定義自己的棧空間 stack;需要有自己棧空間的大小 stack_size;需要定義協程的建立時間 birth;需要定義協程目前的運作狀态 status;需要定目前運作狀态的結點(ready_next, wait_node, sleep_node);需要定義協程id;需要定義排程器的全局對象 sched。

協程的核心結構體如下:

typedef struct _nty_coroutine {

    nty_cpu_ctx ctx;
    proc_coroutine func;
    void *arg;
    size_t stack_size;

    nty_coroutine_status status;
    nty_schedule *sched;

    uint64_t birth;
    uint64_t id;

    void *stack;

    RB_ENTRY(_nty_coroutine) sleep_node;
    RB_ENTRY(_nty_coroutine) wait_node;

    TAILQ_ENTRY(_nty_coroutine) ready_next;
    TAILQ_ENTRY(_nty_coroutine) defer_next;

} nty_coroutine;
           

排程器是管理所有協程運作的元件,協程與排程器的運作關系。

協程的實作與原理剖析

排程器的屬性,需要有儲存CPU的寄存器上下文 ctx,可以從協程運作狀态yield到排程器運作的。從協程到排程器用yield,從排程器到協程用resume

以下為協程的定義。

typedef struct _nty_coroutine_queue nty_coroutine_queue;

typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;

typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;

typedef struct _nty_schedule {
    uint64_t birth;
nty_cpu_ctx ctx;

    struct _nty_coroutine *curr_thread;
    int page_size;

    int poller_fd;
    int eventfd;
    struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
    int nevents;

    int num_new_events;

    nty_coroutine_queue ready;
    nty_coroutine_rbtree_sleep sleeping;
    nty_coroutine_rbtree_wait waiting;

} nty_schedule;
           

第七章 協程的實作之排程器

問題:協程如何被排程?

排程器的實作,有兩種方案,一種是生産者消費者模式,另一種多狀态運作。

7.1 生産者消費者模式

協程的實作與原理剖析

邏輯代碼如下:

while (1) {

        //周遊睡眠集合,将滿足條件的加入到ready
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            TAILQ_ADD(&sched->ready, expired);
        }

        //周遊等待集合,将滿足添加的加入到ready
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            TAILQ_ADD(&sched->ready, wait);
        }

        // 使用resume回複ready的協程運作權
        while (!TAILQ_EMPTY(&sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }
           

7.2 多狀态運作

協程的實作與原理剖析

實作邏輯代碼如下:

while (1) {

        //周遊睡眠集合,使用resume恢複expired的協程運作權
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            resume(expired);
        }

        //周遊等待集合,使用resume恢複wait的協程運作權
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            resume(wait);
        }

        // 使用resume恢複ready的協程運作權
        while (!TAILQ_EMPTY(sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }
           

第八章 協程性能測試

測試環境:4台VMWare 虛拟機

1台伺服器 6G記憶體,4核CPU

3台用戶端 2G記憶體,2核CPU

作業系統:ubuntu 14.04

伺服器端測試代碼:https://github.com/wangbojing/NtyCo

用戶端測試代碼:https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c

按照每一個連接配接啟動一個協程來測試。每一個協程棧空間 4096byte

6G記憶體 –> 測試協程數量100W無異常。并且能夠正常收發資料。

協程的實作與原理剖析
協程的實作與原理剖析

Linux伺服器開發/架構師面試題、學習資料、教學視訊和學習路線圖(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享有需要的可以自行添加學習交流群960994558

C/C++Linux伺服器開發/背景架構師系統視訊學習:https://ke.qq.com/course/417774?flowToken=1031343

繼續閱讀