实现:
1.线程池内初始化n个线程,每个线程都进入一个while循环;线程池还包含一个任务链表
2.当添加新的任务到链表中时,发送一个信号去唤醒线程池内其中一个空闲线程,由这个线程去执行任务链表中的一个任务,并把这个任务从链表中删除
3.删除线程池时,广播一个信号给所有的线程,让各个线程退出,并清除线程池中分配的资源。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
#include <sys/syscall.h>
/* Return the calling thread's ID as reported by the raw gettid system call. */
pid_t gettid(void)
{
    const long tid = syscall(SYS_gettid);

    return (pid_t)tid;
}
/* Forward declarations so the functions below can be defined in any order. */
/* Worker thread entry point passed to pthread_create. */
void* routine(void *arg) ;
/* Queue process(arg) for execution by a worker thread. */
int poolAddJob(void (*process)(void* arg), void* arg) ;
/* Create the global pool and start threadNum worker threads. */
int poolInit(unsigned int threadNum) ;
/* Stop the worker threads and release the global pool. */
int poolDestroy(void) ;
/* Sample job used by main. */
void test(void *) ;
/* One queued unit of work: a callback and its argument, linked into the pool's pending list. */
typedef struct job
{
void (*process)(void *arg); /* function a worker calls to run this job */
void* arg; /* passed unchanged to process */
struct job *next; /* next pending job, or NULL at the tail of the list */
}job;
/* State shared between the worker threads and the pool API functions. */
typedef struct threadPool
{
pthread_mutex_t pool_lock; /* locked by poolAddJob and the workers around accesses to jobs and size */
pthread_cond_t job_ready; /* signalled when a job is added, broadcast when the pool is destroyed */
job *jobs; /* head of the singly linked list of pending jobs; NULL when empty */
int destroy; /* set to 1 by poolDestroy; a worker exits once it observes it */
pthread_t *threads; /* heap array holding threadNum worker thread handles */
unsigned int threadNum; /* number of worker threads requested at init */
int size; /* number of jobs currently in the pending list */
}threadPool;
/* The single pool instance used by every function in this file; NULL while no pool exists. */
static threadPool *pool = NULL;
/*
*initial the thread pool
*/
int poolInit(unsigned int threadNum)
{
int i;
pool = (threadPool*)malloc(sizeof(threadPool));
if ( pool == NULL) return -1;
pthread_mutex_init(&(pool->pool_lock),NULL);
pthread_cond_init(&(pool->job_ready),NULL);
pool->jobs = NULL;
pool->threadNum = threadNum;
pool->size = 0;
pool->destroy = 0;
pool->threads = (pthread_t *)malloc(threadNum * sizeof(pthread_t));
for (i = 0;i<threadNum;i++)
{
pthread_create(&(pool->threads[i]),NULL,routine,NULL);
}
return 0;
}
/*
 * Append a job to the tail of the pool's pending list and wake one worker.
 *
 * process: function a worker thread will call to run the job; must not be NULL.
 * arg:     passed unchanged to process. It is not copied, so the caller must
 *          keep whatever it points to valid until the job has run.
 *
 * Returns 0 when the job was queued. Returns -1, queuing nothing, if the pool
 * has not been initialised, process is NULL, the pool is being destroyed, or
 * the list node cannot be allocated.
 */
int poolAddJob(void (*process)(void *), void *arg)
{
    job *newjob;
    job **link;

    if (pool == NULL || process == NULL)
        return -1;

    newjob = malloc(sizeof *newjob);
    if (newjob == NULL)
        return -1;
    newjob->process = process;
    newjob->arg = arg;
    newjob->next = NULL;

    pthread_mutex_lock(&pool->pool_lock);

    /* Workers exit without taking jobs once destroy is set, so a job queued
     * now would never run; reject it instead. */
    if (pool->destroy) {
        pthread_mutex_unlock(&pool->pool_lock);
        free(newjob);
        return -1;
    }

    /* Walk to the terminating NULL link; this handles the empty list and the
     * non-empty list with the same code. */
    for (link = &pool->jobs; *link != NULL; link = &(*link)->next)
        ;
    *link = newjob;
    pool->size++;

    /* Wake one waiting worker. Signalling while the lock is still held means
     * this function does not touch the pool after releasing it. */
    pthread_cond_signal(&pool->job_ready);
    pthread_mutex_unlock(&pool->pool_lock);
    return 0;
}
int poolDestroy(void)
{
if(pool->destroy) return -1;
int i;
pool->destroy = 1;
//notify all thread
pthread_cond_broadcast(&(pool->job_ready));
for (i = 0;i<pool->threadNum;i++)
{
pthread_join(pool->threads[i],NULL);
}
free(pool->threads);
job *head = NULL;
while(pool->jobs)
{
head = pool->jobs;
pool->jobs = pool->jobs->next;
free(head);
}
pthread_mutex_destroy(&(pool->pool_lock));
pthread_cond_destroy(&(pool->job_ready));
free(pool);
pool = NULL;
return 0;
}
/*
 * Worker thread entry point.
 *
 * Loops forever: waits on the pool's condition variable until a job is
 * pending or the destroy flag is set, then either exits (flag set) or unlinks
 * the job at the head of the list, runs it with the lock released, and frees
 * its node.
 *
 * arg: unused; the worker operates on the global pool.
 * Returns NULL when the destroy flag is observed.
 */
void *routine(void *arg)
{
    (void)arg;

    /* gettid() yields a signed pid_t; cast so the argument matches %u. */
    printf("start thread %u\n", (unsigned int)gettid());
    for (;;) {
        job *current;

        pthread_mutex_lock(&pool->pool_lock);
        /* Re-test the predicate after every wakeup: the wait can return
         * spuriously, or another worker may already have taken the job. */
        while (pool->size == 0 && !pool->destroy) {
            printf("thread %u is waitting\n", (unsigned int)gettid());
            pthread_cond_wait(&pool->job_ready, &pool->pool_lock);
        }

        /* Shutdown takes priority over pending jobs. */
        if (pool->destroy) {
            pthread_mutex_unlock(&pool->pool_lock);
            printf("thread %u will exit\n", (unsigned int)gettid());
            return NULL;
        }

        pool->size--;
        printf("thread %u is stating to work,pool->size =%d\n",
               (unsigned int)gettid(), pool->size);
        current = pool->jobs;
        pool->jobs = current->next;
        pthread_mutex_unlock(&pool->pool_lock);

        /* Run the job outside the lock so other workers can proceed. */
        current->process(current->arg);
        free(current);
    }
}
/*
 * Sample job: prints the calling thread's ID and the job number, then sleeps
 * for five seconds to simulate work.
 *
 * arg: must point to an int holding the job number.
 */
void test(void *arg)
{
    const int *job_id = arg;

    /* Both values are signed; cast so the arguments match the %u conversions. */
    printf ("thread %u is working on job %u\n",
            (unsigned int)gettid(), (unsigned int)*job_id);
    sleep(5);
}
/*
 * Demo driver: starts a pool of three workers, queues ten sample jobs, waits
 * thirty seconds, then destroys the pool.
 *
 * Returns 0 on success, or EXIT_FAILURE if the job-number array cannot be
 * allocated or the pool cannot be created.
 */
int main(void)
{
    enum { WORKER_COUNT = 3, JOB_COUNT = 10, DRAIN_SECONDS = 30 };
    int i;
    int *jobs = malloc(JOB_COUNT * sizeof *jobs);

    if (jobs == NULL) {
        fprintf(stderr, "failed to allocate job numbers\n");
        return EXIT_FAILURE;
    }
    if (poolInit(WORKER_COUNT) != 0) {
        fprintf(stderr, "failed to initialise thread pool\n");
        free(jobs);
        return EXIT_FAILURE;
    }

    for (i = 0; i < JOB_COUNT; i++) {
        /* Each job receives a pointer into jobs, so the array must stay
         * allocated until the pool has been destroyed. */
        jobs[i] = i;
        if (poolAddJob(test, &jobs[i]) != 0)
            fprintf(stderr, "failed to queue job %d\n", i);
    }

    /* Ten 5-second jobs on three workers need about 20 seconds; sleeping
     * longer lets them all finish before poolDestroy, which does not run
     * jobs that are still queued. */
    sleep(DRAIN_SECONDS);
    poolDestroy();
    free(jobs);
    return 0;
}