一、線程池
大多數的網絡伺服器,包括Web伺服器都具有一個特點,就是機關時間内必須處理數目巨大的連接配接請求,但是處理時間卻是比較短的。在傳統的多線程伺服器模型中是這樣實作的:一旦有個請求到達,就建立一個新的線程,由該線程執行任務,任務執行完畢之後,線程就退出。這就是"即時建立,即時銷毀"的政策。盡管與建立程序相比,建立線程的時間已經大大的縮短,但是如果送出給線程的任務是執行時間較短,而且執行次數非常頻繁,那麼伺服器就将處于一個不停的建立線程和銷毀線程的狀态。這筆開銷是不可忽略的,尤其是線程執行的時間非常非常短的情況。
線程池就是為了解決上述問題的,它的實作原理是這樣的:在應用程式啟動之後,就馬上建立一定數量的線程,放入空閑的隊列中。這些線程都是處于阻塞狀态,這些線程隻占一點記憶體,不占用CPU。當任務到來後,線程池将選擇一個空閑的線程,将任務傳入此線程中運作。當所有的線程都處在處理任務的時候,線程池将自動建立一定的數量的新線程,用于處理更多的任務。執行任務完成之後線程并不退出,而是繼續線上程池中等待下一次任務。當大部分線程處于阻塞狀态時,線程池将自動銷毀一部分的線程,回收系統資源。
下面是一個簡單線程池的實作,這個線程池的代碼是我參考網上的一個例子實作的,由于找不到出處了,就沒辦法注明參考自哪裡了。它的方案是這樣的:程式啟動之前,初始化線程池,啟動線程池中的線程,由于還沒有任務到來,線程池中的所有線程都處在阻塞狀态,當一有任務到達就從線程池中取出一個空閑線程處理,如果所有的線程都處于工作狀态,就添加到隊列,進行排隊。如果隊列中的任務個數大于隊列的所能容納的最大數量,那就不能添加任務到隊列中,隻能等待隊列不滿才能添加任務到隊列中。
主要由兩個檔案組成一個threadpool.h頭檔案和一個threadpool.c源檔案組成。源碼中已有重要的注釋,就不加以分析了。
threadpool.h檔案:
[cpp] view plaincopy
1. struct job
2. {
3. void* (*callback_function)(void *arg); //線程回調函數
4. void *arg; //回調函數參數
5. struct job *next;
6. };
7.
8. struct threadpool
9. {
10. int thread_num; //線程池中開啟線程的個數
11. int queue_max_num; //隊列中最大job的個數
12. struct job *head; //指向job的頭指針
13. struct job *tail; //指向job的尾指針
14. //線程池中所有線程的pthread_t
15. //互斥信号量
16. //隊列為空的條件變量
17. //隊列不為空的條件變量
18. //隊列不為滿的條件變量
19. int queue_cur_num; //隊列目前的job個數
20. int queue_close; //隊列是否已經關閉
21. int pool_close; //線程池是否已經關閉
22. };
23.
24. //================================================================================================
25. //函數名: threadpool_init
26. //函數描述: 初始化線程池
27. //輸入: [in] thread_num 線程池開啟的線程個數
28. // [in] queue_max_num 隊列的最大job個數
29. //輸出: 無
30. //傳回: 成功:線程池位址 失敗:NULL
31. //================================================================================================
32. struct threadpool* threadpool_init(int thread_num, int queue_max_num);
33.
34. //================================================================================================
35. //函數名: threadpool_add_job
36. //函數描述: 向線程池中添加任務
37. //輸入: [in] pool 線程池位址
38. // [in] callback_function 回調函數
39. // [in] arg 回調函數參數
40. //輸出: 無
41. //傳回: 成功:0 失敗:-1
42. //================================================================================================
43. int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);
44.
45. //================================================================================================
46. //函數名: threadpool_destroy
47. //函數描述: 銷毀線程池
48. //輸入: [in] pool 線程池位址
49. //輸出: 無
50. //傳回: 成功:0 失敗:-1
51. //================================================================================================
52. int threadpool_destroy(struct threadpool *pool);
53.
54. //================================================================================================
55. //函數名: threadpool_function
56. //函數描述: 線程池中線程函數
57. //輸入: [in] arg 線程池位址
58. //輸出: 無
59. //傳回: 無
60. //================================================================================================
61. void* threadpool_function(void* arg);
threadpool.c檔案:
[cpp] view plaincopy
1. #include "threadpool.h"
2.
3. struct threadpool* threadpool_init(int thread_num, int queue_max_num)
4. {
5. struct threadpool *pool = NULL;
6. do
7. {
8. sizeof(struct threadpool));
9. if (NULL == pool)
10. {
11. "failed to malloc threadpool!\n");
12. break;
13. }
14. pool->thread_num = thread_num;
15. pool->queue_max_num = queue_max_num;
16. pool->queue_cur_num = 0;
17. pool->head = NULL;
18. pool->tail = NULL;
19. if (pthread_mutex_init(&(pool->mutex), NULL))
20. {
21. "failed to init mutex!\n");
22. break;
23. }
24. if (pthread_cond_init(&(pool->queue_empty), NULL))
25. {
26. "failed to init queue_empty!\n");
27. break;
28. }
29. if (pthread_cond_init(&(pool->queue_not_empty), NULL))
30. {
31. "failed to init queue_not_empty!\n");
32. break;
33. }
34. if (pthread_cond_init(&(pool->queue_not_full), NULL))
35. {
36. "failed to init queue_not_full!\n");
37. break;
38. }
39. sizeof(pthread_t) * thread_num);
40. if (NULL == pool->pthreads)
41. {
42. "failed to malloc pthreads!\n");
43. break;
44. }
45. pool->queue_close = 0;
46. pool->pool_close = 0;
47. int i;
48. for (i = 0; i < pool->thread_num; ++i)
49. {
50. void *)pool);
51. }
52.
53. return pool;
54. while (0);
55.
56. return NULL;
57. }
58.
59. int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg)
60. {
61. assert(pool != NULL);
62. assert(callback_function != NULL);
63. assert(arg != NULL);
64.
65. pthread_mutex_lock(&(pool->mutex));
66. while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
67. {
68. //隊列滿的時候就等待
69. }
70. if (pool->queue_close || pool->pool_close) //隊列關閉或者線程池關閉就退出
71. {
72. pthread_mutex_unlock(&(pool->mutex));
73. return -1;
74. }
75. struct job *pjob =(struct job*) malloc(sizeof(struct job));
76. if (NULL == pjob)
77. {
78. pthread_mutex_unlock(&(pool->mutex));
79. return -1;
80. }
81. pjob->callback_function = callback_function;
82. pjob->arg = arg;
83. pjob->next = NULL;
84. if (pool->head == NULL)
85. {
86. pool->head = pool->tail = pjob;
87. //隊列空的時候,有任務來時就通知線程池中的線程:隊列非空
88. }
89. else
90. {
91. pool->tail->next = pjob;
92. pool->tail = pjob;
93. }
94. pool->queue_cur_num++;
95. pthread_mutex_unlock(&(pool->mutex));
96. return 0;
97. }
98.
99. void* threadpool_function(void* arg)
100. {
101. struct threadpool *pool = (struct threadpool*)arg;
102. struct job *pjob = NULL;
103. while (1) //死循環
104. {
105. pthread_mutex_lock(&(pool->mutex));
106. while ((pool->queue_cur_num == 0) && !pool->pool_close) //隊列為空時,就等待隊列非空
107. {
108. pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
109. }
110. if (pool->pool_close) //線程池關閉,線程就退出
111. {
112. pthread_mutex_unlock(&(pool->mutex));
113. pthread_exit(NULL);
114. }
115. pool->queue_cur_num--;
116. pjob = pool->head;
117. if (pool->queue_cur_num == 0)
118. {
119. pool->head = pool->tail = NULL;
120. }
121. else
122. {
123. pool->head = pjob->next;
124. }
125. if (pool->queue_cur_num == 0)
126. {
127. //隊列為空,就可以通知threadpool_destroy函數,銷毀線程函數
128. }
129. if (pool->queue_cur_num == pool->queue_max_num - 1)
130. {
131. //隊列非滿,就可以通知threadpool_add_job函數,添加新任務
132. }
133. pthread_mutex_unlock(&(pool->mutex));
134.
135. //線程真正要做的工作,回調函數的調用
136. free(pjob);
137. pjob = NULL;
138. }
139. }
140. int threadpool_destroy(struct threadpool *pool)
141. {
142. assert(pool != NULL);
143. pthread_mutex_lock(&(pool->mutex));
144. if (pool->queue_close || pool->pool_close) //線程池已經退出了,就直接傳回
145. {
146. pthread_mutex_unlock(&(pool->mutex));
147. return -1;
148. }
149.
150. //置隊列關閉标志
151. while (pool->queue_cur_num != 0)
152. {
153. //等待隊列為空
154. }
155.
156. //置線程池關閉标志
157. pthread_mutex_unlock(&(pool->mutex));
158. //喚醒線程池中正在阻塞的線程
159. //喚醒添加任務的threadpool_add_job函數
160. int i;
161. for (i = 0; i < pool->thread_num; ++i)
162. {
163. //等待線程池的所有線程執行完畢
164. }
165.
166. //清理資源
167. pthread_cond_destroy(&(pool->queue_empty));
168. pthread_cond_destroy(&(pool->queue_not_empty));
169. pthread_cond_destroy(&(pool->queue_not_full));
170. free(pool->pthreads);
171. struct job *p;
172. while (pool->head != NULL)
173. {
174. p = pool->head;
175. pool->head = p->next;
176. free(p);
177. }
178. free(pool);
179. return 0;
180. }
測試檔案main.c檔案:
[cpp] view plaincopy
1. #include "threadpool.h"
2.
3. void* work(void* arg)
4. {
5. char *p = (char*) arg;
6. "threadpool callback fuction : %s.\n", p);
7. sleep(1);
8. }
9.
10. int main(void)
11. {
12. struct threadpool *pool = threadpool_init(10, 20);
13. "1");
14. "2");
15. "3");
16. "4");
17. "5");
18. "6");
19. "7");
20. "8");
21. "9");
22. "10");
23. "11");
24. "12");
25. "13");
26. "14");
27. "15");
28. "16");
29. "17");
30. "18");
31. "19");
32. "20");
33. "21");
34. "22");
35. "23");
36. "24");
37. "25");
38. "26");
39. "27");
40. "28");
41. "29");
42. "30");
43. "31");
44. "32");
45. "33");
46. "34");
47. "35");
48. "36");
49. "37");
50. "38");
51. "39");
52. "40");
53.
54. sleep(5);
55. threadpool_destroy(pool);
56. return 0;
57. }
二、線程池補充
上面的文章介紹了線程池的原理及意義。
下面,介紹的這個線程池與上面提到的那個線程池有一部分相似的地方。
主要差別為:
1、線程池中的每個線程都有自己的互斥量和條件變量,而不是線程池共享一個。
2、線程池中的線程在程式結束時,等待線程池中線程停止的機制不同。
該程式主要由兩個檔案構成,分别為ThreadPool.h和ThreadPool.cpp檔案。
ThreadPool.h檔案:
[cpp] view plaincopy
1. #define MAXT_IN_POOL 200
2. #define BUSY_THRESHOlD 0.5
3. #define MANAGE_INTREVAL 2
4.
5. class ThreadPool;
6.
7. typedef void (*dispatch_fn)(void*);
8.
9. //線程函數參數
10. typedef struct tagThread
11. {
12. //線程ID
13. //信号量
14. //條件變量
15. //調用的函數,任務
16. void* args; //函數參數
17. //線程池指針
18. }_thread;
19.
20. //線程池
21. class ThreadPool
22. {
23. public:
24. //================================================================================================
25. //函數名: ThreadPool
26. //函數描述: 構造函數
27. //輸入: [in] max_threads_in_pool 線程池最大線程數
28. //輸入: [in] min_threads_in_pool 線程池最小問題數
29. //輸出: 無
30. //傳回: 無
31. //================================================================================================
32. int max_threads_in_pool, unsigned int min_threads_in_pool = 2);
33. ~ThreadPool();
34.
35. //================================================================================================
36. //函數名: dispatch_threadpool
37. //函數描述: 将任務加入線程池,由線程池進行分發
38. //輸入: [in] dispatch_me 調用的函數位址
39. //輸入: [in] dispatch_me 函數參數
40. //輸出: 無
41. //傳回: 無
42. //================================================================================================
43. void dispatch_threadpool(dispatch_fn dispatch_me, void* dispatch_me);
44. private:
45. //信号量
46. //線程池中線程有空閑線程的條件變量
47. //線程池中線程為滿的條件變量
48. //線程池中線程為空的條件變量
49. int tp_min; //線程池的最小線程數
50. int tp_max; //線程池的最大線程數
51. int tp_avail; //線程池中空閑的線程數
52. int tp_total; //線程池中已建立的線程數
53. //指向線程池中所有空閑線程的參數的指針
54. bool tp_stop; //線程池是否已停止
55.
56. //================================================================================================
57. //函數名: add_avail
58. //函數描述: 加入空閑線程
59. //輸入: [in] avail 線程的參數
60. //輸出: 無
61. //傳回: 成功:true,失敗:false
62. //================================================================================================
63. bool add_avail(_thread* avail);
64.
65. //================================================================================================
66. //函數名: work_thread
67. //函數描述: 線程函數
68. //輸入: [in] args 參數
69. //輸出: 無
70. //傳回: 無
71. //================================================================================================
72. static void* work_thread(void* args);
73.
74. //================================================================================================
75. //函數名: add_thread
76. //函數描述: 添加一個線程
77. //輸入: [in] dispatch_me 函數指針
78. //輸入: [in] args 函數參數
79. //輸出: 無
80. //傳回: 無
81. //================================================================================================
82. bool add_thread(dispatch_fn dispatch_me, void* args);
83.
84. //================================================================================================
85. //函數名: syn_all
86. //函數描述: 等待線程池中所有線程空閑
87. //輸入: 無
88. //輸出: 無
89. //傳回: 無
90. //================================================================================================
91. void syn_all();
92. };
[cpp] view plaincopy
1. ThreadPool::ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool)
2. {
3. pthread_t manage_id;
4.
5. if (min_threads_in_pool <= 0 || max_threads_in_pool < 0 || min_threads_in_pool > max_threads_in_pool || max_threads_in_pool > MAXT_IN_POOL)
6. {
7. return ;
8. }
9.
10. //初始化線程池
11. tp_total = 0;
12. tp_min = min_threads_in_pool;
13. tp_max = max_threads_in_pool;
14. false;
15. sizeof(void *) * max_threads_in_pool);
16. if (NULL == tp_list)
17. {
18. return;
19. }
20. sizeof(void *) * max_threads_in_pool);
21.
22. pthread_mutex_init(&tp_mutex, NULL);
23. pthread_cond_init(&tp_idle, NULL);
24. pthread_cond_init(&tp_full, NULL);
25. pthread_cond_init(&tp_empty, NULL);
26. }
27.
28. bool ThreadPool::add_avail(_thread* avail)
29. {
30. bool ret = false;
31.
32. pthread_mutex_lock(&tp_mutex);
33. if (tp_avail < tp_max)
34. {
35. tp_list[tp_avail] = avail;
36. tp_avail++;
37. //線程池中有線程為空閑
38. if (tp_avail >= tp_total)
39. {
40. //線程池中所有線程都為為空閑
41. }
42. true;
43. }
44. pthread_mutex_unlock(&tp_mutex);
45.
46. return ret;
47. }
48.
49. void* ThreadPool::work_thread(void* args)
50. {
51. thread = (_thread*) args;
52. thread->parent;
53. while (pool->tp_stop == false)
54. {
55. thread->do_job(thread->args);
56. thread->thread_mutex); //執行完任務之後,添加到空閑線程隊列中
57. if (pool->add_avail(thread))
58. {
59. thread->thread_cond, &thread->thread_mutex);
60. thread->thread_mutex);
61. }
62. else
63. {
64. thread->thread_mutex);
65. thread->thread_mutex);
66. thread->thread_cond);
67. thread);
68. break;
69. }
70. }
71.
72. pthread_mutex_lock(&pool->tp_mutex);
73. pool->tp_total--;
74. if (pool->tp_total <= 0)
75. {
76. pthread_cond_signal(&pool->tp_empty);
77. }
78. pthread_mutex_unlock(&pool->tp_mutex);
79.
80. return NULL;
81. }
82.
83. bool ThreadPool::add_thread(dispatch_fn dispatch_me, void* args) //添加一個線程
84. {
85. thread = NULL;
86. pthread_attr_t attr;
87.
88. thread = (_thread *) malloc(sizeof(_thread));
89. if (NULL == thread)
90. {
91. return false;
92. }
93.
94. thread->thread_mutex, NULL);
95. thread->thread_cond, NULL);
96. thread->do_job = dispatch_me;
97. thread->args = args;
98. thread->parent = this;
99. pthread_attr_init(&attr);
100. pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
101.
102. if (pthread_create(&thread->thread_id, &attr, work_thread, (void *) thread) != 0)
103. {
104. thread->thread_mutex);
105. thread->thread_cond);
106. pthread_attr_destroy(&attr);
107. thread);
108. return false;
109. }
110. tp_total++;
111.
112. return true;
113. }
114.
115. void ThreadPool::dispatch_threadpool(dispatch_fn dispatch_me, void* args)
116. {
117. thread = NULL;
118.
119. pthread_mutex_lock(&tp_mutex);
120.
121. if (tp_avail <= 0 && tp_total >= tp_max) //無可用線程,而且線程數已達最大值,等待空閑線程
122. {
123. pthread_cond_wait(&tp_idle, &tp_mutex);
124. }
125.
126. if (tp_avail <= 0) //無可用線程,而且線程數未達最大值,添加線程
127. {
128. if (!add_thread(dispatch_me, args))
129. {
130. return;
131. }
132. }
133. else //有可用線程
134. {
135. tp_avail--;
136. thread = tp_list[tp_avail];
137. tp_list[tp_avail] = NULL;
138. thread->do_job = dispatch_me;
139. thread->args = args;
140.
141. thread->thread_mutex);
142. thread->thread_cond);
143. thread->thread_mutex);
144. }
145.
146. pthread_mutex_unlock(&tp_mutex);
147. }
148.
149.
150. void ThreadPool::syn_all()
151. {
152. if (tp_avail < tp_total) //等待線程池中所有線程都為空閑狀态
153. {
154. pthread_cond_wait(&tp_full, &tp_mutex);
155. }
156.
157. true;
158. int i = 0;
159. for (i = 0; i < tp_avail; i++) //喚醒線程池中所有線程
160. {
161. thread = tp_list[i];
162. thread->thread_mutex);
163. thread->thread_cond);
164. thread->thread_mutex);
165. }
166. if (tp_total > 0)
167. {
168. //等待線程池中所有線程都結束
169. }
170. }
171.
172. ThreadPool::~ThreadPool()
173. {
174. sleep(MANAGE_INTREVAL);
175. pthread_mutex_lock(&tp_mutex);
176. //等待線程池為空
177. int i = 0;
178. for (i = 0; i < tp_total; i++) //資源釋放
179. {
180. free(tp_list[i]);
181. tp_list[i] = NULL;
182. }
183. pthread_mutex_unlock(&tp_mutex);
184. pthread_mutex_destroy(&tp_mutex);
185. pthread_cond_destroy(&tp_idle);
186. pthread_cond_destroy(&tp_full);
187. pthread_cond_destroy(&tp_empty);
188. free(tp_list);
189. }