天天看點

線程池

1.線程池基本原理

  在傳統伺服器結構中, 常是 有一個總的 監聽線程監聽有沒有新的使用者連接配接伺服器, 每當有一個新的 使用者進入, 伺服器就開啟一個新的線程使用者處理這 個使用者的資料包。這個線程隻服務于這個使用者 , 當 使用者與伺服器端關閉連接配接以後, 伺服器端銷毀這個線程。然而頻繁地開辟與銷毀線程極大地占用了系統的資源。而且在大量使用者的情況下, 系統為了開辟和銷毀線程将浪費大量的時間和資源。線程池提供了一個解決外部大量使用者與伺服器有限資源的沖突, 線程池和傳統的一個使用者對應一 個線程的處理方法不同, 它的基本思想就是在程式 開始時就在記憶體中開辟一些線程, 線程的數目是 固定的,他們獨自形成一個類, 屏蔽了對外的操作, 而伺服器隻需要将資料包交給線程池就可以了。當有新的客戶請求到達時 , 不是新建立一個線程為其服務 , 而是從“池子”中選擇一個空閑的線程為新的客戶請求服務 ,服務完畢後 , 線程進入空閑線程池中。如果沒有線程空閑 的 話, 就 将 數 據 包 暫 時 積 累 , 等 待 線 程 池 内 有 線 程空閑以後再進行處理。通過對多個任務重用已經存在的線程對象 , 降低了對線程對象建立和銷毀的開銷。當客戶請求 時 , 線程對象 已 經 存 在 , 可 以 提 高 請 求 的響應時間 , 進而整體地提高了系統服務的表現。

  一般來說實作一個線程池主要包括以下幾個組成部分:

1)線程管理器:用于建立并管理線程池。

2)工作線程:線程池中實際執行任務的線程。在初始化線程時會預先建立好固定數目的線程在池中,這些初始化的線程一般處于空閑狀态,一般不占用CPU,占用較小的記憶體空間。

3)任務接口:每個任務必須實作的接口,當線程池的任務隊列中有可執行任務時,被空閑的工作線程調去執行(線程的閑與忙是通過互斥量實作的,跟前面文章中的設定标志位差不多),把任務抽象出來形成接口,可以做到線程池與具體的任務無關。

下面的不在贅述百度《線程池技術在并發伺服器中的應用》寫的非常詳細!

  什麼時候需要建立線程池呢?簡單的說,如果一個應用需要頻繁的建立和銷毀線程,而任務執行的時間又非常短,這樣線程建立和銷毀的帶來的開銷就不容忽視,這時也是線程池該出場的機會了。如果線程建立和銷毀時間相比任務執行時間可以忽略不計,則沒有必要使用線程池了。

    pool_init()函數預先建立好max_thread_num個線程,每個線程執thread_routine ()函數。該函數中

while (pool->cur_queue_size == 0)

{

      pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));

}

表示如果任務連結清單中沒有任務,則該線程出于阻塞等待狀态。否則從隊列中取出任務并執行。

    pool_add_worker()函數向線程池的任務連結清單中加入一個任務,加入後通過調用pthread_cond_signal (&(pool->queue_ready))喚醒一個出于阻塞狀态的線程(如果有的話)。

    pool_destroy ()函數用于銷毀線程池,線程池任務連結清單中的任務不會再被執行,但是正在運作的線程會一直把任務運作完後再退出。

#include <stdio.h>  

#include <stdlib.h>  

#include <unistd.h>  

#include <sys/types.h>  

#include <pthread.h>  

#include <assert.h>  

/* 

*線程池裡所有運作和等待的任務都是一個CThread_worker 

*由于所有任務都在連結清單裡,是以是一個連結清單結構 

*/  

typedef struct worker  

{  

    /*回調函數,任務運作時會調用此函數,注意也可聲明成其它形式*/  

    void *(*process) (void *arg);  

    void *arg;/*回調函數的參數*/  

    struct worker *next;  

} CThread_worker;  

/*線程池結構*/  

typedef struct  

    pthread_mutex_t queue_lock;  

    pthread_cond_t queue_ready;  

    /*連結清單結構,線程池中所有等待任務*/  

    CThread_worker *queue_head;  

    /*是否銷毀線程池*/  

    int shutdown;  

    pthread_t *threadid;  

    /*線程池中允許的活動線程數目*/  

    int max_thread_num;  

    /*目前等待隊列的任務數目*/  

    int cur_queue_size;  

} CThread_pool;  

int pool_add_worker (void *(*process) (void *arg), void *arg);  

void *thread_routine (void *arg);  

//share resource  

static CThread_pool *pool = NULL;  

void  

pool_init (int max_thread_num)  

    pool = (CThread_pool *) malloc (sizeof (CThread_pool));  

    pthread_mutex_init (&(pool->queue_lock), NULL);  

    pthread_cond_init (&(pool->queue_ready), NULL);  

    pool->queue_head = NULL;  

    pool->max_thread_num = max_thread_num;  

    pool->cur_queue_size = 0;  

    pool->shutdown = 0;  

    pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));  

    int i = 0;  

    for (i = 0; i < max_thread_num; i++)  

    {   

        pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);  

    }  

}  

/*向線程池中加入任務*/  

int  

pool_add_worker (void *(*process) (void *arg), void *arg)  

    /*構造一個新任務*/  

    CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));  

    newworker->process = process;  

    newworker->arg = arg;  

    newworker->next = NULL;/*别忘置空*/  

    pthread_mutex_lock (&(pool->queue_lock));  

    /*将任務加入到等待隊列中*/  

    CThread_worker *member = pool->queue_head;  

    if (member != NULL)  

    {  

        while (member->next != NULL)  

            member = member->next;  

        member->next = newworker;  

    else  

        pool->queue_head = newworker;  

    assert (pool->queue_head != NULL);  

    pool->cur_queue_size++;  

    pthread_mutex_unlock (&(pool->queue_lock));  

    /*好了,等待隊列中有任務了,喚醒一個等待線程; 

    注意如果所有線程都在忙碌,這句沒有任何作用*/  

    pthread_cond_signal (&(pool->queue_ready));  

    return 0;  

/*銷毀線程池,等待隊列中的任務不會再被執行,但是正在運作的線程會一直 

把任務運作完後再退出*/  

pool_destroy ()  

    if (pool->shutdown)  

        return -1;/*防止兩次調用*/  

    pool->shutdown = 1;  

    /*喚醒所有等待線程,線程池要銷毀了*/  

    pthread_cond_broadcast (&(pool->queue_ready));  

    /*阻塞等待線程退出,否則就成僵屍了*/  

    int i;  

    for (i = 0; i < pool->max_thread_num; i++)  

        pthread_join (pool->threadid[i], NULL);  

    free (pool->threadid);  

    /*銷毀等待隊列*/  

    CThread_worker *head = NULL;  

    while (pool->queue_head != NULL)  

        head = pool->queue_head;  

        pool->queue_head = pool->queue_head->next;  

        free (head);  

    /*條件變量和互斥量也别忘了銷毀*/  

    pthread_mutex_destroy(&(pool->queue_lock));  

    pthread_cond_destroy(&(pool->queue_ready));  

    free (pool);  

    /*銷毀後指針置空是個好習慣*/  

    pool=NULL;  

void *  

thread_routine (void *arg)  

    printf ("starting thread 0x%x\n", pthread_self ());  

    while (1)  

        pthread_mutex_lock (&(pool->queue_lock));  

        /*如果等待隊列為0并且不銷毀線程池,則處于阻塞狀态; 注意 

        pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒後會加鎖*/  

        while (pool->cur_queue_size == 0 && !pool->shutdown)  

        {  

            printf ("thread 0x%x is waiting\n", pthread_self ());  

            pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));  

        }  

        /*線程池要銷毀了*/  

        if (pool->shutdown)  

            /*遇到break,continue,return等跳轉語句,千萬不要忘記先解鎖*/  

            pthread_mutex_unlock (&(pool->queue_lock));  

            printf ("thread 0x%x will exit\n", pthread_self ());  

            pthread_exit (NULL);  

        printf ("thread 0x%x is starting to work\n", pthread_self ());  

        /*assert是調試的好幫手*/  

        assert (pool->cur_queue_size != 0);  

        assert (pool->queue_head != NULL);  

        /*等待隊列長度減去1,并取對外連結表中的頭元素*/  

        pool->cur_queue_size--;  

        CThread_worker *worker = pool->queue_head;  

        pool->queue_head = worker->next;  

        pthread_mutex_unlock (&(pool->queue_lock));  

        /*調用回調函數,執行任務*/  

        (*(worker->process)) (worker->arg);  

        free (worker);  

        worker = NULL;  

    /*這一句應該是不可達的*/  

    pthread_exit (NULL);  

//    下面是測試代碼  

myprocess (void *arg)  

    printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  

    sleep (1);/*休息一秒,延長任務的執行時間*/  

    return NULL;  

main (int argc, char **argv)  

    pool_init (3);/*線程池中最多三個活動線程*/  

    /*連續向池中投入10個任務*/  

    int *workingnum = (int *) malloc (sizeof (int) * 10);  

    for (i = 0; i < 10; i++)  

        workingnum[i] = i;  

        pool_add_worker (myprocess, &workingnum[i]);  

    /*等待所有任務完成*/  

    sleep (5);  

    /*銷毀線程池*/  

    pool_destroy ();  

    free (workingnum);  

将上述所有代碼放入threadpool.c檔案中,

在Linux輸入編譯指令

$ gcc -o threadpool threadpool.c -lpthread

以下是運作結果

starting thread 0xb7df6b90

thread 0xb7df6b90 is waiting

starting thread 0xb75f5b90

thread 0xb75f5b90 is waiting

starting thread 0xb6df4b90

thread 0xb6df4b90 is waiting

thread 0xb7df6b90 is starting to work

threadid is 0xb7df6b90, working on task 0

thread 0xb75f5b90 is starting to work

threadid is 0xb75f5b90, working on task 1

thread 0xb6df4b90 is starting to work

threadid is 0xb6df4b90, working on task 2

threadid is 0xb7df6b90, working on task 3

threadid is 0xb75f5b90, working on task 4

threadid is 0xb6df4b90, working on task 5

threadid is 0xb7df6b90, working on task 6

threadid is 0xb75f5b90, working on task 7

threadid is 0xb6df4b90, working on task 8

threadid is 0xb7df6b90, working on task 9

thread 0xb75f5b90 will exit

thread 0xb6df4b90 will exit

thread 0xb7df6b90 will exit

本文轉自 傑思 51CTO部落格,原文連結:http://blog.51cto.com/12700807/1948811