天天看點

linux中線程池【轉】

一、線程池

大多數的網絡伺服器,包括Web伺服器都具有一個特點,就是機關時間内必須處理數目巨大的連接配接請求,但是處理時間卻是比較短的。在傳統的多線程伺服器模型中是這樣實作的:一旦有個請求到達,就建立一個新的線程,由該線程執行任務,任務執行完畢之後,線程就退出。這就是"即時建立,即時銷毀"的政策。盡管與建立程序相比,建立線程的時間已經大大的縮短,但是如果送出給線程的任務是執行時間較短,而且執行次數非常頻繁,那麼伺服器就将處于一個不停的建立線程和銷毀線程的狀态。這筆開銷是不可忽略的,尤其是線程執行的時間非常非常短的情況。

  線程池就是為了解決上述問題的,它的實作原理是這樣的:在應用程式啟動之後,就馬上建立一定數量的線程,放入空閑的隊列中。這些線程都是處于阻塞狀态,這些線程隻占一點記憶體,不占用CPU。當任務到來後,線程池将選擇一個空閑的線程,将任務傳入此線程中運作。當所有的線程都處在處理任務的時候,線程池将自動建立一定的數量的新線程,用于處理更多的任務。執行任務完成之後線程并不退出,而是繼續線上程池中等待下一次任務。當大部分線程處于阻塞狀态時,線程池将自動銷毀一部分的線程,回收系統資源。

  下面是一個簡單線程池的實作,這個線程池的代碼是我參考網上的一個例子實作的,由于找不到出處了,就沒辦法注明參考自哪裡了。它的方案是這樣的:程式啟動之前,初始化線程池,啟動線程池中的線程,由于還沒有任務到來,線程池中的所有線程都處在阻塞狀态,當一有任務到達就從線程池中取出一個空閑線程處理,如果所有的線程都處于工作狀态,就添加到隊列,進行排隊。如果隊列中的任務個數大于隊列的所能容納的最大數量,那就不能添加任務到隊列中,隻能等待隊列不滿才能添加任務到隊列中。

  主要由兩個檔案組成一個threadpool.h頭檔案和一個threadpool.c源檔案組成。源碼中已有重要的注釋,就不加以分析了。

  threadpool.h檔案:

[cpp] ​​view plain​​​​copy​​

1. struct job
2. {
3. void* (*callback_function)(void *arg);    //線程回調函數
4. void *arg;                                //回調函數參數
5. struct job *next;
6. };
7. 
8. struct threadpool
9. {
10. int thread_num;                   //線程池中開啟線程的個數
11. int queue_max_num;                //隊列中最大job的個數
12. struct job *head;                 //指向job的頭指針
13. struct job *tail;                 //指向job的尾指針
14. //線程池中所有線程的pthread_t
15. //互斥信号量
16. //隊列為空的條件變量
17. //隊列不為空的條件變量
18. //隊列不為滿的條件變量
19. int queue_cur_num;                //隊列目前的job個數
20. int queue_close;                  //隊列是否已經關閉
21. int pool_close;                   //線程池是否已經關閉
22. };
23. 
24. //================================================================================================
25. //函數名:                   threadpool_init
26. //函數描述:                 初始化線程池
27. //輸入:                    [in] thread_num     線程池開啟的線程個數
28. //                         [in] queue_max_num  隊列的最大job個數
29. //輸出:                    無
30. //傳回:                    成功:線程池位址 失敗:NULL
31. //================================================================================================
32. struct threadpool* threadpool_init(int thread_num, int queue_max_num);
33. 
34. //================================================================================================
35. //函數名:                    threadpool_add_job
36. //函數描述:                  向線程池中添加任務
37. //輸入:                     [in] pool                  線程池位址
38. //                          [in] callback_function     回調函數
39. //                          [in] arg                     回調函數參數
40. //輸出:                     無
41. //傳回:                     成功:0 失敗:-1
42. //================================================================================================
43. int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);
44. 
45. //================================================================================================
46. //函數名:                    threadpool_destroy
47. //函數描述:                   銷毀線程池
48. //輸入:                      [in] pool                  線程池位址
49. //輸出:                      無
50. //傳回:                      成功:0 失敗:-1
51. //================================================================================================
52. int threadpool_destroy(struct threadpool *pool);
53. 
54. //================================================================================================
55. //函數名:                    threadpool_function
56. //函數描述:                  線程池中線程函數
57. //輸入:                     [in] arg                  線程池位址
58. //輸出:                     無
59. //傳回:                     無
60. //================================================================================================
61. void* threadpool_function(void* arg);      

 threadpool.c檔案:

[cpp] ​​view plain​​​​copy​​

1. #include "threadpool.h"
2. 
3. struct threadpool* threadpool_init(int thread_num, int queue_max_num)
4. {
5. struct threadpool *pool = NULL;
6. do
7. {
8. sizeof(struct threadpool));
9. if (NULL == pool)
10. {
11. "failed to malloc threadpool!\n");
12. break;
13. }
14. pool->thread_num = thread_num;
15. pool->queue_max_num = queue_max_num;
16. pool->queue_cur_num = 0;
17. pool->head = NULL;
18. pool->tail = NULL;
19. if (pthread_mutex_init(&(pool->mutex), NULL))
20. {
21. "failed to init mutex!\n");
22. break;
23. }
24. if (pthread_cond_init(&(pool->queue_empty), NULL))
25. {
26. "failed to init queue_empty!\n");
27. break;
28. }
29. if (pthread_cond_init(&(pool->queue_not_empty), NULL))
30. {
31. "failed to init queue_not_empty!\n");
32. break;
33. }
34. if (pthread_cond_init(&(pool->queue_not_full), NULL))
35. {
36. "failed to init queue_not_full!\n");
37. break;
38. }
39. sizeof(pthread_t) * thread_num);
40. if (NULL == pool->pthreads)
41. {
42. "failed to malloc pthreads!\n");
43. break;
44. }
45. pool->queue_close = 0;
46. pool->pool_close = 0;
47. int i;
48. for (i = 0; i < pool->thread_num; ++i)
49. {
50. void *)pool);
51. }
52. 
53. return pool;
54. while (0);
55. 
56. return NULL;
57. }
58. 
59. int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg)
60. {
61. assert(pool != NULL);
62. assert(callback_function != NULL);
63. assert(arg != NULL);
64. 
65. pthread_mutex_lock(&(pool->mutex));
66. while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
67. {
68. //隊列滿的時候就等待
69. }
70. if (pool->queue_close || pool->pool_close)    //隊列關閉或者線程池關閉就退出
71. {
72. pthread_mutex_unlock(&(pool->mutex));
73. return -1;
74. }
75. struct job *pjob =(struct job*) malloc(sizeof(struct job));
76. if (NULL == pjob)
77. {
78. pthread_mutex_unlock(&(pool->mutex));
79. return -1;
80. }
81. pjob->callback_function = callback_function;
82. pjob->arg = arg;
83. pjob->next = NULL;
84. if (pool->head == NULL)
85. {
86. pool->head = pool->tail = pjob;
87. //隊列空的時候,有任務來時就通知線程池中的線程:隊列非空
88. }
89. else
90. {
91. pool->tail->next = pjob;
92. pool->tail = pjob;
93. }
94. pool->queue_cur_num++;
95. pthread_mutex_unlock(&(pool->mutex));
96. return 0;
97. }
98. 
99. void* threadpool_function(void* arg)
100. {
101. struct threadpool *pool = (struct threadpool*)arg;
102. struct job *pjob = NULL;
103. while (1)  //死循環
104. {
105. pthread_mutex_lock(&(pool->mutex));
106. while ((pool->queue_cur_num == 0) && !pool->pool_close)   //隊列為空時,就等待隊列非空
107. {
108. pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
109. }
110. if (pool->pool_close)   //線程池關閉,線程就退出
111. {
112. pthread_mutex_unlock(&(pool->mutex));
113. pthread_exit(NULL);
114. }
115. pool->queue_cur_num--;
116. pjob = pool->head;
117. if (pool->queue_cur_num == 0)
118. {
119. pool->head = pool->tail = NULL;
120. }
121. else
122. {
123. pool->head = pjob->next;
124. }
125. if (pool->queue_cur_num == 0)
126. {
127. //隊列為空,就可以通知threadpool_destroy函數,銷毀線程函數
128. }
129. if (pool->queue_cur_num == pool->queue_max_num - 1)
130. {
131. //隊列非滿,就可以通知threadpool_add_job函數,添加新任務
132. }
133. pthread_mutex_unlock(&(pool->mutex));
134. 
135. //線程真正要做的工作,回調函數的調用
136. free(pjob);
137. pjob = NULL;
138. }
139. }
140. int threadpool_destroy(struct threadpool *pool)
141. {
142. assert(pool != NULL);
143. pthread_mutex_lock(&(pool->mutex));
144. if (pool->queue_close || pool->pool_close)   //線程池已經退出了,就直接傳回
145. {
146. pthread_mutex_unlock(&(pool->mutex));
147. return -1;
148. }
149. 
150. //置隊列關閉标志
151. while (pool->queue_cur_num != 0)
152. {
153. //等待隊列為空
154. }
155. 
156. //置線程池關閉标志
157. pthread_mutex_unlock(&(pool->mutex));
158. //喚醒線程池中正在阻塞的線程
159. //喚醒添加任務的threadpool_add_job函數
160. int i;
161. for (i = 0; i < pool->thread_num; ++i)
162. {
163. //等待線程池的所有線程執行完畢
164. }
165. 
166. //清理資源
167. pthread_cond_destroy(&(pool->queue_empty));
168. pthread_cond_destroy(&(pool->queue_not_empty));
169. pthread_cond_destroy(&(pool->queue_not_full));
170. free(pool->pthreads);
171. struct job *p;
172. while (pool->head != NULL)
173. {
174. p = pool->head;
175. pool->head = p->next;
176. free(p);
177. }
178. free(pool);
179. return 0;
180. }      

測試檔案main.c檔案:

[cpp] ​​view plain​​​​copy​​
1. #include "threadpool.h"
2. 
3. void* work(void* arg)
4. {
5. char *p = (char*) arg;
6. "threadpool callback fuction : %s.\n", p);
7. sleep(1);
8. }
9. 
10. int main(void)
11. {
12. struct threadpool *pool = threadpool_init(10, 20);
13. "1");
14. "2");
15. "3");
16. "4");
17. "5");
18. "6");
19. "7");
20. "8");
21. "9");
22. "10");
23. "11");
24. "12");
25. "13");
26. "14");
27. "15");
28. "16");
29. "17");
30. "18");
31. "19");
32. "20");
33. "21");
34. "22");
35. "23");
36. "24");
37. "25");
38. "26");
39. "27");
40. "28");
41. "29");
42. "30");
43. "31");
44. "32");
45. "33");
46. "34");
47. "35");
48. "36");
49. "37");
50. "38");
51. "39");
52. "40");
53. 
54. sleep(5);
55. threadpool_destroy(pool);
56. return 0;
57. }      

二、線程池補充

上面的文章介紹了線程池的原理及意義。

下面,介紹的這個線程池與上面提到的那個線程池有一部分相似的地方。

  主要差別為:

    1、線程池中的每個線程都有自己的互斥量和條件變量,而不是線程池共享一個。

    2、線程池中的線程在程式結束時,等待線程池中線程停止的機制不同。

  該程式主要由兩個檔案構成,分别為ThreadPool.h和ThreadPool.cpp檔案。

  ThreadPool.h檔案:

[cpp] ​​view plain​​​​copy​​
1. #define MAXT_IN_POOL 200
2. #define BUSY_THRESHOlD 0.5
3. #define MANAGE_INTREVAL 2
4. 
5. class ThreadPool;
6. 
7. typedef void (*dispatch_fn)(void*);
8. 
9. //線程函數參數
10. typedef struct tagThread
11. {
12. //線程ID
13. //信号量
14. //條件變量
15. //調用的函數,任務
16. void* args;                    //函數參數
17. //線程池指針
18. }_thread;
19. 
20. //線程池
21. class ThreadPool
22. {
23. public:
24. //================================================================================================
25. //函數名:                  ThreadPool
26. //函數描述:                構造函數
27. //輸入:                    [in] max_threads_in_pool 線程池最大線程數
28. //輸入:                    [in] min_threads_in_pool 線程池最小問題數
29. //輸出:                    無
30. //傳回:                    無
31. //================================================================================================
32. int max_threads_in_pool, unsigned int min_threads_in_pool = 2);
33. ~ThreadPool();
34. 
35. //================================================================================================
36. //函數名:                  dispatch_threadpool
37. //函數描述:                将任務加入線程池,由線程池進行分發
38. //輸入:                    [in] dispatch_me 調用的函數位址
39. //輸入:                    [in] dispatch_me 函數參數
40. //輸出:                    無
41. //傳回:                    無
42. //================================================================================================
43. void dispatch_threadpool(dispatch_fn dispatch_me, void* dispatch_me);
44. private:
45. //信号量
46. //線程池中線程有空閑線程的條件變量
47. //線程池中線程為滿的條件變量
48. //線程池中線程為空的條件變量
49. int tp_min;                //線程池的最小線程數
50. int tp_max;                //線程池的最大線程數
51. int tp_avail;              //線程池中空閑的線程數
52. int tp_total;              //線程池中已建立的線程數
53. //指向線程池中所有空閑線程的參數的指針
54. bool tp_stop;              //線程池是否已停止
55. 
56. //================================================================================================
57. //函數名:                  add_avail
58. //函數描述:                加入空閑線程
59. //輸入:                    [in] avail 線程的參數
60. //輸出:                    無
61. //傳回:                    成功:true,失敗:false
62. //================================================================================================
63. bool add_avail(_thread* avail);
64. 
65. //================================================================================================
66. //函數名:                  work_thread
67. //函數描述:                線程函數
68. //輸入:                    [in] args 參數
69. //輸出:                    無
70. //傳回:                    無
71. //================================================================================================
72. static void* work_thread(void* args);
73. 
74. //================================================================================================
75. //函數名:                  add_thread
76. //函數描述:                添加一個線程
77. //輸入:                    [in] dispatch_me 函數指針
78. //輸入:                    [in] args        函數參數
79. //輸出:                    無
80. //傳回:                    無
81. //================================================================================================
82. bool add_thread(dispatch_fn dispatch_me, void* args);
83. 
84. //================================================================================================
85. //函數名:                  syn_all
86. //函數描述:                等待線程池中所有線程空閑
87. //輸入:                    無
88. //輸出:                    無
89. //傳回:                    無
90. //================================================================================================
91. void syn_all();
92. };      
[cpp] ​​view plain​​​​copy​​
1. ThreadPool::ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool)
2. {
3. pthread_t manage_id;
4. 
5. if (min_threads_in_pool <= 0 || max_threads_in_pool < 0 || min_threads_in_pool > max_threads_in_pool || max_threads_in_pool > MAXT_IN_POOL)
6. {
7. return ;
8. }
9. 
10. //初始化線程池
11. tp_total = 0;
12. tp_min = min_threads_in_pool;
13. tp_max = max_threads_in_pool;
14. false;
15. sizeof(void *) * max_threads_in_pool);
16. if (NULL == tp_list)
17. {
18. return;
19. }
20. sizeof(void *) * max_threads_in_pool);
21. 
22. pthread_mutex_init(&tp_mutex, NULL);
23. pthread_cond_init(&tp_idle, NULL);
24. pthread_cond_init(&tp_full, NULL);
25. pthread_cond_init(&tp_empty, NULL);
26. }
27. 
28. bool ThreadPool::add_avail(_thread* avail)
29. {
30. bool ret = false;
31. 
32. pthread_mutex_lock(&tp_mutex);
33. if (tp_avail < tp_max)
34. {
35. tp_list[tp_avail] = avail;
36. tp_avail++;
37. //線程池中有線程為空閑
38. if (tp_avail >= tp_total)
39. {
40. //線程池中所有線程都為為空閑
41. }
42. true;
43. }
44. pthread_mutex_unlock(&tp_mutex);
45. 
46. return ret;
47. }
48. 
49. void* ThreadPool::work_thread(void* args)
50. {
51. thread = (_thread*) args;
52. thread->parent;
53. while (pool->tp_stop == false)
54. {
55. thread->do_job(thread->args);
56. thread->thread_mutex); //執行完任務之後,添加到空閑線程隊列中
57. if (pool->add_avail(thread))
58. {
59. thread->thread_cond, &thread->thread_mutex);
60. thread->thread_mutex);
61. }
62. else
63. {
64. thread->thread_mutex);
65. thread->thread_mutex);
66. thread->thread_cond);
67. thread);
68. break;
69. }
70. }
71. 
72. pthread_mutex_lock(&pool->tp_mutex);
73. pool->tp_total--;
74. if (pool->tp_total <= 0)
75. {
76. pthread_cond_signal(&pool->tp_empty);
77. }
78. pthread_mutex_unlock(&pool->tp_mutex);
79. 
80. return NULL;
81. }
82. 
83. bool ThreadPool::add_thread(dispatch_fn dispatch_me, void* args)  //添加一個線程
84. {
85. thread = NULL;
86. pthread_attr_t attr;
87. 
88. thread = (_thread *) malloc(sizeof(_thread));
89. if (NULL == thread)
90. {
91. return false;
92. }
93. 
94. thread->thread_mutex, NULL);
95. thread->thread_cond, NULL);
96. thread->do_job = dispatch_me;
97. thread->args = args;
98. thread->parent = this;
99. pthread_attr_init(&attr);
100. pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
101. 
102. if (pthread_create(&thread->thread_id, &attr, work_thread, (void *) thread) != 0)
103. {
104. thread->thread_mutex);
105. thread->thread_cond);
106. pthread_attr_destroy(&attr);
107. thread);
108. return false;
109. }
110. tp_total++;
111. 
112. return true;
113. }
114. 
115. void ThreadPool::dispatch_threadpool(dispatch_fn dispatch_me, void* args)
116. {
117. thread = NULL;
118. 
119. pthread_mutex_lock(&tp_mutex);
120. 
121. if (tp_avail <= 0 && tp_total >= tp_max) //無可用線程,而且線程數已達最大值,等待空閑線程
122. {
123. pthread_cond_wait(&tp_idle, &tp_mutex);
124. }
125. 
126. if (tp_avail <= 0)  //無可用線程,而且線程數未達最大值,添加線程
127. {
128. if (!add_thread(dispatch_me, args))
129. {
130. return;
131. }
132. }
133. else   //有可用線程
134. {
135. tp_avail--;
136. thread = tp_list[tp_avail];
137. tp_list[tp_avail] = NULL;
138. thread->do_job = dispatch_me;
139. thread->args = args;
140. 
141. thread->thread_mutex);
142. thread->thread_cond);
143. thread->thread_mutex);
144. }
145. 
146. pthread_mutex_unlock(&tp_mutex);
147. }
148. 
149. 
150. void ThreadPool::syn_all()
151. {
152. if (tp_avail < tp_total)   //等待線程池中所有線程都為空閑狀态
153. {
154. pthread_cond_wait(&tp_full, &tp_mutex);
155. }
156. 
157. true;
158. int i = 0;
159. for (i = 0; i < tp_avail; i++)  //喚醒線程池中所有線程
160. {
161. thread = tp_list[i];
162. thread->thread_mutex);
163. thread->thread_cond);
164. thread->thread_mutex);
165. }
166. if (tp_total > 0)
167. {
168. //等待線程池中所有線程都結束
169. }
170. }
171. 
172. ThreadPool::~ThreadPool()
173. {
174. sleep(MANAGE_INTREVAL);
175. pthread_mutex_lock(&tp_mutex);
176. //等待線程池為空
177. int i = 0;
178. for (i = 0; i < tp_total; i++)  //資源釋放
179. {
180. free(tp_list[i]);
181. tp_list[i] = NULL;
182. }
183. pthread_mutex_unlock(&tp_mutex);
184. pthread_mutex_destroy(&tp_mutex);
185. pthread_cond_destroy(&tp_idle);
186. pthread_cond_destroy(&tp_full);
187. pthread_cond_destroy(&tp_empty);
188. free(tp_list);
189. }      

繼續閱讀