天天看點

使用pthread模拟一個簡單線程池

實作:

1.線程池内初始化n個線程,每個線程都進入一個while循環,并且包含一個任務連結清單

2.當添加新的任務到連結清單中時,發送一個信号去激活線程内其中一個空閑線程,由這個線程去執行任務連結清單中的一個任務 ,并把這個任務從連結清單中删除

3.删除緩存池時,·廣播一個信号給所有的線程,讓各種線程退出,并清除線程池中配置設定的資源。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
#include <sys/syscall.h>
pid_t gettid(void)
{
    return syscall(SYS_gettid);
}

void* routine(void *arg) ;
int poolAddJob(void (*process)(void* arg), void* arg) ;
int poolInit(unsigned int threadNum) ;
int poolDestroy(void) ;
void test(void *) ;

typedef struct job
{
    void (*process)(void *arg);
    void* arg;
    struct job *next;
}job;


typedef struct threadPool
{
    pthread_mutex_t pool_lock;
    pthread_cond_t job_ready;
    job *jobs;
    int destroy;
    pthread_t *threads;
    unsigned int threadNum;
    int size;
}threadPool;

static threadPool *pool = NULL;

/*
 *initial the thread pool
 */
int poolInit(unsigned int threadNum)
{
    int i;
    pool = (threadPool*)malloc(sizeof(threadPool));
    if ( pool == NULL) return -1;
    pthread_mutex_init(&(pool->pool_lock),NULL);
    pthread_cond_init(&(pool->job_ready),NULL);
    pool->jobs = NULL;
    pool->threadNum = threadNum;
    pool->size = 0;
    pool->destroy = 0;
    pool->threads = (pthread_t *)malloc(threadNum * sizeof(pthread_t));
    for (i = 0;i<threadNum;i++)
    {
        pthread_create(&(pool->threads[i]),NULL,routine,NULL);
    }
    return 0;
}


/*
 *add job into the pool
 *the pool has a job list
 */

int poolAddJob(void (*process)(void *),void *arg)
{
    job *newjob = (job*)malloc(sizeof(job));
    newjob->process = process;
    newjob->arg = arg;
    newjob->next = NULL;

    pthread_mutex_lock(&(pool->pool_lock));
    job *temp = pool->jobs;

    //if the job list is not empty
    if(temp != NULL)
    {
        while(temp->next)
        {
            temp = temp->next;
        }
        temp->next = newjob;
    }
    else //the job list is empty
    {
        pool->jobs = newjob;
    }

    pool->size++;
    pthread_mutex_unlock(&(pool->pool_lock));
    //arise a thread to deal the new job
    pthread_cond_signal(&(pool->job_ready));
    return 0;
}

int poolDestroy(void)
{
    if(pool->destroy)   return -1;
    int i;
    pool->destroy = 1;
    //notify all thread
    pthread_cond_broadcast(&(pool->job_ready));
    for (i = 0;i<pool->threadNum;i++)
    {
        pthread_join(pool->threads[i],NULL);
    }

    free(pool->threads);
    job *head = NULL;
    while(pool->jobs)
    {
        head = pool->jobs;
        pool->jobs = pool->jobs->next;
        free(head);
    }

    pthread_mutex_destroy(&(pool->pool_lock));
    pthread_cond_destroy(&(pool->job_ready));
    free(pool);
    pool = NULL;
    return 0;
}


void *routine(void *arg)
{
    printf("start  thread %u\n",gettid());
    while(1)
    {
        pthread_mutex_lock(&(pool->pool_lock));
        while(pool->size == 0 && !pool->destroy)
        {
            printf("thread %u is waitting\n",gettid());
            pthread_cond_wait(&(pool->job_ready),&(pool->pool_lock));
        }

        if(pool->destroy)
        {
            pthread_mutex_unlock(&(pool->pool_lock));
            printf("thread %u will exit\n",gettid());
            pthread_exit(NULL);
        }

        pool->size--;
        printf("thread %u is stating to work,pool->size =%d\n",gettid(),pool->size);
        job *newjob = pool->jobs;
        pool->jobs = newjob->next;
        pthread_mutex_unlock(&(pool->pool_lock));
        (*(newjob->process))(newjob->arg);
        free(newjob);
        newjob = NULL;
    }
    pthread_exit(NULL);
}

void test(void *arg)
{
    printf ("thread %u is working on job %u\n",gettid(),*(int*)arg);
    sleep(5);
    return;
}

int main()
{
    int i;
    int *jobs = (int *)malloc(sizeof(int)*10);
    poolInit(3);
    for (i =0;i<10;i++)
    {
        jobs[i] = i;
        poolAddJob(test,&jobs[i]);
    }
    sleep(30);
    poolDestroy();
    free(jobs);
    return 0;
}