最近看到的一個很好的C語言實作的線程池。這個線程池具有比較好的動态調整能力。
https://docs.oracle.com/cd/E19120-01/open.solaris/816-5137/ggedn/index.html
外部接口:
-
構造一個線程池,注意這裡并不建立線程,隻是設定相關參數
min_threads:線程池中存在的最少可用的線程數;
max_threads:線程池中存在的最大可用的線程數;
linger:idle工作線程退出之前存在的秒數;
attr:建立線程相關的屬性;
調用成功傳回一個線程池句柄,錯誤則傳回NULL,且errno置錯誤碼。
typedef struct thr_pool thr_pool_t;
thr_pool_t *thr_pool_create(uint_t min_threads, uint_t max_threads,
uint_t linger, pthread_attr_t *attr);
-
将工作任務加入到線程池任務隊列
void (*func)(void ):要執行的回調函數;
arg:回調函數參數;
int thr_pool_queue(thr_pool_t *pool,
void *(*func)(void *), void *arg);
- 等待線程池任務隊列中任務完成
void thr_pool_wait(thr_pool_t *pool);
- 取消線程池中任務,析構線程池
void thr_pool_destroy(thr_pool_t *pool);
線程池結構體struct thr_pool如下:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczLcVmds92czlGZvwVP9EUTDZ0aRJkSwk0LcxGbpZ2LcBDM08CXlpXazRnbvZ2LcRlMMVDT2EWNvwFdu9mZvwVP9EkT1sGVN5WNXlFcGNDZxg2MMBjVtJWd0ckW65UbM5WOHJWa5kHTvwFd4VGdvwlMvw1ayFWbyVGdhd3PyQjNycDMxETMyMDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
- 能夠在運作時建立多個線程池,通過pool_forw和pool_back兩個指針域構成一個雙向循環連結清單,來将建立的線程池連結起來。
- pool_mutex、pool_busycv、pool_workcv、pool_waitcv。互斥鎖和條件變量保證線程同步。
- pool_active指針域将目前活躍的任務通過單連結清單連結在一起,連結清單尾指向NULL,注意該連結清單是通過活躍線程的棧空間連結起來的。
- pool_head、pool_tail指向線程中任務隊列的頭和尾。任務隊列節點結構struct job中包含指向下一個節點的指針、任務執行的使用者函數以及參數。
- pool_attr為線程屬性。
- pool_flags線程池狀态。
- pool_linger為idle線程退出前存活的秒數。
- minimum、maximum、pool_nthreads分别為最小線程數,最大線程數以及目前活躍的線程數。
源代碼:
由于之前的實作是在solaris上的,是以做了一點修改能在linux下使用。
thr_pool.h
/*
* Declarations for the clients of a thread pool.
*/
#include <pthread.h>
typedef unsigned int uint_t;
/*
* The thr_pool_t type is opaque to the client.
* It is created by thr_pool_create() and must be passed
* unmodified to the remainder of the interfaces.
*/
typedef struct thr_pool thr_pool_t;
/*
* Create a thread pool.
* min_threads: the minimum number of threads kept in the pool,
* always available to perform work requests.
* max_threads: the maximum number of threads that can be
* in the pool, performing work requests.
* linger: the number of seconds excess idle worker threads
* (greater than min_threads) linger before exiting.
* attr: attributes of all worker threads (can be NULL);
* can be destroyed after calling thr_pool_create().
* On error, thr_pool_create() returns NULL with errno set to the error code.
*/
extern thr_pool_t *thr_pool_create(uint_t min_threads, uint_t max_threads,
uint_t linger, pthread_attr_t *attr);
/*
* Enqueue a work request to the thread pool job queue.
* If there are idle worker threads, awaken one to perform the job.
* Else if the maximum number of workers has not been reached,
* create a new worker thread to perform the job.
* Else just return after adding the job to the queue;
* an existing worker thread will perform the job when
* it finishes the job it is currently performing.
*
* The job is performed as if a new detached thread were created for it:
* pthread_create(NULL, attr, void *(*func)(void *), void *arg);
*
* On error, thr_pool_queue() returns - with errno set to the error code.
*/
extern int thr_pool_queue(thr_pool_t *pool,
void *(*func)(void *), void *arg);
/*
* Wait for all queued jobs to complete.
*/
extern void thr_pool_wait(thr_pool_t *pool);
/*
* Cancel all queued jobs and destroy the pool.
*/
extern void thr_pool_destroy(thr_pool_t *pool);
thr_pool.c
/*
* Thread pool implementation.
* See <thr_pool.h> for interface declarations.
*/
#if !defined(_REENTRANT)
#define _REENTRANT
#endif
#include "thr_pool.h"
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
typedef void (*pFun)(void *);
/*
* FIFO queued job
*/
typedef struct job job_t;
struct job {
job_t *job_next; /* linked list of jobs */
void *(*job_func)(void *); /* function to call */
void *job_arg; /* its argument */
};
/*
* List of active worker threads, linked through their stacks.
*/
typedef struct active active_t;
struct active {
active_t *active_next; /* linked list of threads */
pthread_t active_tid; /* active thread id */
};
/*
* The thread pool, opaque to the clients.
*/
struct thr_pool {
thr_pool_t *pool_forw; /* circular linked list */
thr_pool_t *pool_back; /* of all thread pools */
pthread_mutex_t pool_mutex; /* protects the pool data */
pthread_cond_t pool_busycv; /* synchronization in pool_queue */
pthread_cond_t pool_workcv; /* synchronization with workers */
pthread_cond_t pool_waitcv; /* synchronization in pool_wait() */
active_t *pool_active; /* list of threads performing work */
job_t *pool_head; /* head of FIFO job queue */
job_t *pool_tail; /* tail of FIFO job queue */
pthread_attr_t pool_attr; /* attributes of the workers */
int pool_flags; /* see below */
uint_t pool_linger; /* seconds before idle workers exit */
int pool_minimum; /* minimum number of worker threads */
int pool_maximum; /* maximum number of worker threads */
int pool_nthreads; /* current number of worker threads */
int pool_idle; /* number of idle workers */
};
/* pool_flags */
#define POOL_WAIT /* waiting in thr_pool_wait() */
#define POOL_DESTROY /* pool is being destroyed */
/* the list of all created and not yet destroyed thread pools */
static thr_pool_t *thr_pools = NULL;
/* protects thr_pools */
static pthread_mutex_t thr_pool_lock = PTHREAD_MUTEX_INITIALIZER;
/* set of all signals */
static sigset_t fillset;
static void *worker_thread(void *);
static int
create_worker(thr_pool_t *pool)
{
sigset_t oset;
int error;
pthread_t thread;
(void) pthread_sigmask(SIG_SETMASK, &fillset, &oset);
error = pthread_create(&thread, &pool->pool_attr, worker_thread, pool); //建立線程
(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
return (error);
}
/*
* Worker thread is terminating. Possible reasons:
* - excess idle thread is terminating because there is no work.
* - thread was cancelled (pool is being destroyed).
* - the job function called pthread_exit().
* In the last case, create another worker thread
* if necessary to keep the pool populated.
*/
static void
worker_cleanup(void *vpool)
{
thr_pool_t *pool=(thr_pool_t *)vpool;
--pool->pool_nthreads;
if (pool->pool_flags & POOL_DESTROY) {
if (pool->pool_nthreads == )
(void) pthread_cond_broadcast(&pool->pool_busycv);
} else if (pool->pool_head != NULL &&
pool->pool_nthreads < pool->pool_maximum &&
create_worker(pool) == ) {
pool->pool_nthreads++;
}
(void) pthread_mutex_unlock(&pool->pool_mutex);
}
static void
notify_waiters(thr_pool_t *pool)
{
if (pool->pool_head == NULL && pool->pool_active == NULL) { //任務隊列和活躍線程為空
pool->pool_flags &= ~POOL_WAIT; //清零等待狀态
(void) pthread_cond_broadcast(&pool->pool_waitcv); //激活所有線程
}
}
/*
* Called by a worker thread on return from a job.
*/
static void
job_cleanup(void *vpool)
{
thr_pool_t *pool=(thr_pool_t *)vpool;
pthread_t my_tid = pthread_self();
active_t *activep;
active_t **activepp;
(void) pthread_mutex_lock(&pool->pool_mutex);
for (activepp = &pool->pool_active;
(activep = *activepp) != NULL;
activepp = &activep->active_next) {
if (activep->active_tid == my_tid) {
*activepp = activep->active_next;
break;
}
}
if (pool->pool_flags & POOL_WAIT)
notify_waiters(pool);
}
static void *
worker_thread(void *arg)
{
thr_pool_t *pool = (thr_pool_t *)arg;
int timedout;
job_t *job;
void *(*func)(void *);
active_t active;
struct timespec ts;
/*
* This is the worker's main loop. It will only be left
* if a timeout occurs or if the pool is being destroyed.
*/
(void) pthread_mutex_lock(&pool->pool_mutex); //加鎖
pthread_cleanup_push(worker_cleanup, pool); //将工作線程清理函數壓棧
active.active_tid = pthread_self(); //獲得活躍線程id
for (;;) {
/*
* We don't know what this thread was doing during
* its last job, so we reset its signal mask and
* cancellation state back to the initial values.
*/
(void) pthread_sigmask(SIG_SETMASK, &fillset, NULL);
(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); //取消請求被延期直到
(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
timedout = ; //逾時标志清零
pool->pool_idle++; //idle線程加1
if (pool->pool_flags & POOL_WAIT) //線程池狀态為等待狀态
notify_waiters(pool); //通知等待線程
while (pool->pool_head == NULL &&
!(pool->pool_flags & POOL_DESTROY)) { //任務隊列為空并且線程池沒有删除
if (pool->pool_nthreads <= pool->pool_minimum) { //如果目前工作線程小于等于線程池設定最小線程數
(void) pthread_cond_wait(&pool->pool_workcv,
&pool->pool_mutex); //阻塞目前線程
} else { //否則設定定時器
(void) clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += pool->pool_linger;
if (pool->pool_linger == ||
pthread_cond_timedwait(&pool->pool_workcv,
&pool->pool_mutex, &ts) == ETIMEDOUT) {
timedout = ; //逾時标志置1
break;
}
}
}
pool->pool_idle--; //空閑線程減1
if (pool->pool_flags & POOL_DESTROY) //線程池狀态為删除态,則跳出
break;
if ((job = pool->pool_head) != NULL) { //任務隊列不為空
timedout = ; //逾時标志清零
func = job->job_func;
arg = job->job_arg;
pool->pool_head = job->job_next; //任務出隊
if (job == pool->pool_tail) //若是任務隊列最後一個任務
pool->pool_tail = NULL;
active.active_next = pool->pool_active; //将當期活躍工作線程加入活躍線程連結清單,頭插,注意建立在棧中
pool->pool_active = &active;
(void) pthread_mutex_unlock(&pool->pool_mutex); //解鎖
pthread_cleanup_push(job_cleanup, pool); //壓棧任務清零函數
free(job); //釋放目前任務
/*
* Call the specified job function.
*/
(void) func(arg); //指向真正的使用者函數
/*
* If the job function calls pthread_exit(), the thread
* calls job_cleanup(pool) and worker_cleanup(pool);
* the integrity of the pool is thereby maintained.
*/
pthread_cleanup_pop(); /* job_cleanup(pool) */
}
if (timedout && pool->pool_nthreads > pool->pool_minimum) { //逾時并且目前工作線程大于線程池設定最小線程數
/*
* We timed out and there is no work to be done
* and the number of workers exceeds the minimum.
* Exit now to reduce the size of the pool.
*/
break;
}
}
pthread_cleanup_pop(); /* worker_cleanup(pool) */
return (NULL);
}
static void
clone_attributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr)
{
struct sched_param param;
void *addr;
size_t size;
int value;
(void) pthread_attr_init(new_attr);
//設定線程相關屬性
if (old_attr != NULL) {
(void) pthread_attr_getstack(old_attr, &addr, &size);
/* don't allow a non-NULL thread stack address */
(void) pthread_attr_setstack(new_attr, NULL, size);
(void) pthread_attr_getscope(old_attr, &value);
(void) pthread_attr_setscope(new_attr, value);
(void) pthread_attr_getinheritsched(old_attr, &value);
(void) pthread_attr_setinheritsched(new_attr, value);
(void) pthread_attr_getschedpolicy(old_attr, &value);
(void) pthread_attr_setschedpolicy(new_attr, value);
(void) pthread_attr_getschedparam(old_attr, ¶m);
(void) pthread_attr_setschedparam(new_attr, ¶m);
(void) pthread_attr_getguardsize(old_attr, &size);
(void) pthread_attr_setguardsize(new_attr, size);
}
/* make all pool threads be detached threads */
(void) pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
}
thr_pool_t *
thr_pool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
pthread_attr_t *attr)
{
thr_pool_t *pool;
(void) sigfillset(&fillset); //加入所有信号
if (min_threads > max_threads || max_threads < ) {
errno = EINVAL;
return (NULL);
}
if ((pool = malloc(sizeof (*pool))) == NULL) { //為線程池結構體配置設定記憶體
errno = ENOMEM;
return (NULL);
}
//初始化線程池結構體
(void) pthread_mutex_init(&pool->pool_mutex, NULL);
(void) pthread_cond_init(&pool->pool_busycv, NULL);
(void) pthread_cond_init(&pool->pool_workcv, NULL);
(void) pthread_cond_init(&pool->pool_waitcv, NULL);
pool->pool_active = NULL;
pool->pool_head = NULL;
pool->pool_tail = NULL;
pool->pool_flags = ;
pool->pool_linger = linger;
pool->pool_minimum = min_threads;
pool->pool_maximum = max_threads;
pool->pool_nthreads = ;
pool->pool_idle = ;
/*
* We cannot just copy the attribute pointer.
* We need to initialize a new pthread_attr_t structure using
* the values from the caller-supplied attribute structure.
* If the attribute pointer is NULL, we need to initialize
* the new pthread_attr_t structure with default values.
*/
clone_attributes(&pool->pool_attr, attr); //設定線程建立屬性
/* insert into the global list of all thread pools */
(void) pthread_mutex_lock(&thr_pool_lock); //加鎖
if (thr_pools == NULL) { //建立連結清單第一個節點
pool->pool_forw = pool;
pool->pool_back = pool;
thr_pools = pool;
} else { //後面插入
thr_pools->pool_back->pool_forw = pool;
pool->pool_forw = thr_pools;
pool->pool_back = thr_pools->pool_back;
thr_pools->pool_back = pool;
}
(void) pthread_mutex_unlock(&thr_pool_lock); //解鎖
return (pool);
}
int
thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg)
{
job_t *job;
if ((job = malloc(sizeof (*job))) == NULL) { //為job節點配置設定空間
errno = ENOMEM;
return (-);
}
//初始化job節點
job->job_next = NULL;
job->job_func = func;
job->job_arg = arg;
(void) pthread_mutex_lock(&pool->pool_mutex); //加鎖
//job入隊,尾部入隊
if (pool->pool_head == NULL) //插入第一個job
pool->pool_head = job;
else
pool->pool_tail->job_next = job;
pool->pool_tail = job;
if (pool->pool_idle > ) //有idle線程就從中喚醒一個
(void) pthread_cond_signal(&pool->pool_workcv);
else if (pool->pool_nthreads < pool->pool_maximum &&
create_worker(pool) == ) //目前線程數小于設定的總線程數,則建立一個線程
pool->pool_nthreads++; //目前線程數加1
(void) pthread_mutex_unlock(&pool->pool_mutex); //解鎖
return ();
}
void
thr_pool_wait(thr_pool_t *pool)
{
(void) pthread_mutex_lock(&pool->pool_mutex); //加鎖
pthread_cleanup_push((pFun)pthread_mutex_unlock, &pool->pool_mutex); //壓棧解鎖函數
while (pool->pool_head != NULL || pool->pool_active != NULL) { //任務隊列不為空或者目前活躍線程連結清單不為空
pool->pool_flags |= POOL_WAIT; //置線程池狀态為等待狀态
(void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex); //阻塞目前線程
}
pthread_cleanup_pop(); /* pthread_mutex_unlock(&pool->pool_mutex); */
}
void
thr_pool_destroy(thr_pool_t *pool)
{
active_t *activep;
job_t *job;
(void) pthread_mutex_lock(&pool->pool_mutex); //加鎖
pthread_cleanup_push((pFun)pthread_mutex_unlock, &pool->pool_mutex); //壓棧解鎖函數
/* mark the pool as being destroyed; wakeup idle workers */
pool->pool_flags |= POOL_DESTROY; //線程池狀态置為删除态
(void) pthread_cond_broadcast(&pool->pool_workcv); //喚醒所有idle線程
/* cancel all active workers */
for (activep = pool->pool_active;
activep != NULL;
activep = activep->active_next) //取消所有活躍線程
(void) pthread_cancel(activep->active_tid);
/* wait for all active workers to finish */
while (pool->pool_active != NULL) {
pool->pool_flags |= POOL_WAIT;
(void) pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex);
}
/* the last worker to terminate will wake us up */
while (pool->pool_nthreads != )
(void) pthread_cond_wait(&pool->pool_busycv, &pool->pool_mutex);
pthread_cleanup_pop(); /* pthread_mutex_unlock(&pool->pool_mutex); */
/*
* Unlink the pool from the global list of all pools.
*/
(void) pthread_mutex_lock(&thr_pool_lock);
if (thr_pools == pool)
thr_pools = pool->pool_forw;
if (thr_pools == pool)
thr_pools = NULL;
else {
pool->pool_back->pool_forw = pool->pool_forw;
pool->pool_forw->pool_back = pool->pool_back;
}
(void) pthread_mutex_unlock(&thr_pool_lock);
/*
* There should be no pending jobs, but just in case...
*/
for (job = pool->pool_head; job != NULL; job = pool->pool_head) {
pool->pool_head = job->job_next;
free(job);
}
(void) pthread_attr_destroy(&pool->pool_attr);
free(pool);
}