天天看點

pool函數C語言,C語言實作線程池

https://www.jianshu.com/p/6afdffe94d96

什麼時候需要建立線程池呢?簡單的說,如果一個應用需要頻繁的建立和銷毀線程,而任務執行的時間又非常短,這樣線程建立和銷毀的帶來的開銷就不容忽視,這時也是線程池該出場的機會了。如果線程建立T1和銷毀時間T3相比任務執行時間T2可以忽略不計,則沒有必要使用線程池了。反之如果T1+T3>T2,那就很有必要使用線程池。

下面是Linux系統下用C語言建立的一個線程池。線程池會維護一個任務連結清單(每個CThread_worker結構就是一個任務)。

pool_init()函數預先建立好max_thread_num個線程,每個線程執thread_routine ()函數。該函數中

while (pool->cur_queue_size == 0)

{

pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));

}

表示如果任務連結清單中沒有任務,則該線程處于阻塞等待狀态。否則從隊列中取出任務并執行。

pool_add_worker()函數向線程池的任務連結清單中加入一個任務,加入後通過調用pthread_cond_signal (&(pool->queue_ready))喚醒一個出于阻塞狀态的線程(如果有的話)。

pool_destroy ()函數用于銷毀線程池,線程池任務連結清單中的任務不會再被執行,但是正在運作的線程會一直把任務運作完後再退出。

下面貼出完整代碼

#include

#include

#include

#include

#include

#include

typedef struct worker

{

void *(*process) (void *arg);

void *arg;

struct worker *next;

} CThread_worker;

typedef struct

{

pthread_mutex_t queue_lock;

pthread_cond_t queue_ready;

CThread_worker *queue_head;

int shutdown;

pthread_t *threadid;

int max_thread_num;

int cur_queue_size;

} CThread_pool;

int pool_add_worker (void *(*process) (void *arg), void *arg);

void *thread_routine (void *arg);

static CThread_pool *pool = NULL;

void pool_init (int max_thread_num)

{

pool = (CThread_pool *) malloc (sizeof (CThread_pool));

pthread_mutex_init (&(pool->queue_lock), NULL);

pthread_cond_init (&(pool->queue_ready), NULL);

pool->queue_head = NULL;

pool->max_thread_num = max_thread_num;

pool->cur_queue_size = 0;

pool->shutdown = 0;

pool->threadid =

(pthread_t *) malloc (max_thread_num * sizeof (pthread_t));

int i = 0;

for (i = 0; i < max_thread_num; i++)

{

pthread_create (&(pool->threadid[i]), NULL, thread_routine,

NULL);

}

}

int pool_add_worker (void *(*process) (void *arg), void *arg)

{

CThread_worker *newworker =

(CThread_worker *) malloc (sizeof (CThread_worker));

newworker->process = process;

newworker->arg = arg;

newworker->next = NULL;

pthread_mutex_lock (&(pool->queue_lock));

CThread_worker *member = pool->queue_head;

if (member != NULL)

{

while (member->next != NULL)

member = member->next;

member->next = newworker;

}

else

{

pool->queue_head = newworker;

}

assert (pool->queue_head != NULL);

pool->cur_queue_size++;

pthread_mutex_unlock (&(pool->queue_lock));

pthread_cond_signal (&(pool->queue_ready));

return 0;

}

int pool_destroy ()

{

if (pool->shutdown)

return -1;

pool->shutdown = 1;

pthread_cond_broadcast (&(pool->queue_ready));

int i;

for (i = 0; i < pool->max_thread_num; i++)

pthread_join (pool->threadid[i], NULL);

free (pool->threadid);

CThread_worker *head = NULL;

while (pool->queue_head != NULL)

{

head = pool->queue_head;

pool->queue_head = pool->queue_head->next;

free (head);

}

pthread_mutex_destroy(&(pool->queue_lock));

pthread_cond_destroy(&(pool->queue_ready));

free (pool);

pool=NULL;

return 0;

}

//非常重要的任務接口函數,各子線程統一調用這個函數,而這個函數内部檢查調用任務隊列中的實際任務函數指針。

void * thread_routine (void *arg)

{

printf ("starting thread 0x%x/n", pthread_self ());

while (1)

{

pthread_mutex_lock (&(pool->queue_lock));

while (pool->cur_queue_size == 0 && !pool->shutdown)

{

printf ("thread 0x%x is waiting/n", pthread_self ());

pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));

}

if (pool->shutdown)

{

pthread_mutex_unlock (&(pool->queue_lock));

printf ("thread 0x%x will exit/n", pthread_self ());

pthread_exit (NULL);

}

printf ("thread 0x%x is starting to work/n", pthread_self ());

assert (pool->cur_queue_size != 0);

assert (pool->queue_head != NULL);

pool->cur_queue_size--;

CThread_worker *worker = pool->queue_head;

pool->queue_head = worker->next;

pthread_mutex_unlock (&(pool->queue_lock));

(*(worker->process)) (worker->arg);

free (worker);

worker = NULL;

}

pthread_exit (NULL);

}

下面是測試代碼

void * myprocess (void *arg)

{

printf ("threadid is 0x%x, working on task %d/n", pthread_self (),*(int *) arg);

sleep (1);

return NULL;

}

int main (int argc, char **argv)

{

pool_init (3);

int *workingnum = (int *) malloc (sizeof (int) * 10);

int i;

for (i = 0; i < 10; i++)

{

workingnum[i] = i;

pool_add_worker (myprocess, &workingnum[i]);

}

sleep (5); //這句可能出問題,偷懶寫法。

pool_destroy ();

free (workingnum);

return 0;

}

将上述所有代碼放入threadpool.c檔案中,

在Linux輸入編譯指令

$ gcc -o threadpool threadpool.c -lpthread

以下是運作結果

starting thread 0xb7df6b90

thread 0xb7df6b90 is waiting

starting thread 0xb75f5b90

thread 0xb75f5b90 is waiting

starting thread 0xb6df4b90

thread 0xb6df4b90 is waiting

thread 0xb7df6b90 is starting to work

threadid is 0xb7df6b90, working on task 0

thread 0xb75f5b90 is starting to work

threadid is 0xb75f5b90, working on task 1

thread 0xb6df4b90 is starting to work

threadid is 0xb6df4b90, working on task 2

thread 0xb7df6b90 is starting to work

threadid is 0xb7df6b90, working on task 3

thread 0xb75f5b90 is starting to work

threadid is 0xb75f5b90, working on task 4

thread 0xb6df4b90 is starting to work

threadid is 0xb6df4b90, working on task 5

thread 0xb7df6b90 is starting to work

threadid is 0xb7df6b90, working on task 6

thread 0xb75f5b90 is starting to work

threadid is 0xb75f5b90, working on task 7

thread 0xb6df4b90 is starting to work

threadid is 0xb6df4b90, working on task 8

thread 0xb7df6b90 is starting to work

threadid is 0xb7df6b90, working on task 9

thread 0xb75f5b90 is waiting

thread 0xb6df4b90 is waiting

thread 0xb7df6b90 is waiting

thread 0xb75f5b90 will exit

thread 0xb6df4b90 will exit

thread 0xb7df6b90 will exit