實作:
1.線程池内初始化n個線程,每個線程都進入一個while循環,并且包含一個任務連結清單
2.當添加新的任務到連結清單中時,發送一個信号去激活線程内其中一個空閑線程,由這個線程去執行任務連結清單中的一個任務 ,并把這個任務從連結清單中删除
3.删除緩存池時,·廣播一個信号給所有的線程,讓各種線程退出,并清除線程池中配置設定的資源。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
#include <sys/syscall.h>
pid_t gettid(void)
{
return syscall(SYS_gettid);
}
void* routine(void *arg) ;
int poolAddJob(void (*process)(void* arg), void* arg) ;
int poolInit(unsigned int threadNum) ;
int poolDestroy(void) ;
void test(void *) ;
typedef struct job
{
void (*process)(void *arg);
void* arg;
struct job *next;
}job;
typedef struct threadPool
{
pthread_mutex_t pool_lock;
pthread_cond_t job_ready;
job *jobs;
int destroy;
pthread_t *threads;
unsigned int threadNum;
int size;
}threadPool;
static threadPool *pool = NULL;
/*
*initial the thread pool
*/
int poolInit(unsigned int threadNum)
{
int i;
pool = (threadPool*)malloc(sizeof(threadPool));
if ( pool == NULL) return -1;
pthread_mutex_init(&(pool->pool_lock),NULL);
pthread_cond_init(&(pool->job_ready),NULL);
pool->jobs = NULL;
pool->threadNum = threadNum;
pool->size = 0;
pool->destroy = 0;
pool->threads = (pthread_t *)malloc(threadNum * sizeof(pthread_t));
for (i = 0;i<threadNum;i++)
{
pthread_create(&(pool->threads[i]),NULL,routine,NULL);
}
return 0;
}
/*
*add job into the pool
*the pool has a job list
*/
int poolAddJob(void (*process)(void *),void *arg)
{
job *newjob = (job*)malloc(sizeof(job));
newjob->process = process;
newjob->arg = arg;
newjob->next = NULL;
pthread_mutex_lock(&(pool->pool_lock));
job *temp = pool->jobs;
//if the job list is not empty
if(temp != NULL)
{
while(temp->next)
{
temp = temp->next;
}
temp->next = newjob;
}
else //the job list is empty
{
pool->jobs = newjob;
}
pool->size++;
pthread_mutex_unlock(&(pool->pool_lock));
//arise a thread to deal the new job
pthread_cond_signal(&(pool->job_ready));
return 0;
}
int poolDestroy(void)
{
if(pool->destroy) return -1;
int i;
pool->destroy = 1;
//notify all thread
pthread_cond_broadcast(&(pool->job_ready));
for (i = 0;i<pool->threadNum;i++)
{
pthread_join(pool->threads[i],NULL);
}
free(pool->threads);
job *head = NULL;
while(pool->jobs)
{
head = pool->jobs;
pool->jobs = pool->jobs->next;
free(head);
}
pthread_mutex_destroy(&(pool->pool_lock));
pthread_cond_destroy(&(pool->job_ready));
free(pool);
pool = NULL;
return 0;
}
void *routine(void *arg)
{
printf("start thread %u\n",gettid());
while(1)
{
pthread_mutex_lock(&(pool->pool_lock));
while(pool->size == 0 && !pool->destroy)
{
printf("thread %u is waitting\n",gettid());
pthread_cond_wait(&(pool->job_ready),&(pool->pool_lock));
}
if(pool->destroy)
{
pthread_mutex_unlock(&(pool->pool_lock));
printf("thread %u will exit\n",gettid());
pthread_exit(NULL);
}
pool->size--;
printf("thread %u is stating to work,pool->size =%d\n",gettid(),pool->size);
job *newjob = pool->jobs;
pool->jobs = newjob->next;
pthread_mutex_unlock(&(pool->pool_lock));
(*(newjob->process))(newjob->arg);
free(newjob);
newjob = NULL;
}
pthread_exit(NULL);
}
void test(void *arg)
{
printf ("thread %u is working on job %u\n",gettid(),*(int*)arg);
sleep(5);
return;
}
int main()
{
int i;
int *jobs = (int *)malloc(sizeof(int)*10);
poolInit(3);
for (i =0;i<10;i++)
{
jobs[i] = i;
poolAddJob(test,&jobs[i]);
}
sleep(30);
poolDestroy();
free(jobs);
return 0;
}