天天看点

多线程+deque实现生产者消费者模型以及最终实现

创造两个线程分别充当生产者和消费者,生产者和消费者对同一个deque进行操作,生产者负责利用生产数据(

deque : push_back

),消费者消费数据(

deque : pop_back

)。但是由于线程竞争资源的优先级是无法确定的,我们最终无法保证程序运行结束,deque始终为空.(如代码1)

#include<iostream>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<time.h>
#include<deque>
#include<algorithm>
#include<iterator>
using namespace std;


deque<int> dt;
struct pcst
{
    pthread_mutex_t mutex;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
}shared = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
    PTHREAD_COND_INITIALIZER};

void* producer(void *arg)
{
    pthread_mutex_lock(&shared.mutex);

    int count = ;
    while(count-- > )
    {
        srand(time(NULL));
        dt.push_back(rand() % );
        cout<<dt.back()<<" in"<<endl;
        sleep();
    }
    pthread_cond_signal(&shared.not_empty);
    pthread_mutex_unlock(&shared.mutex);

}
void* consumer(void *arg)
{
    pthread_mutex_lock(&shared.mutex);

    while(!dt.empty())
    {
        cout<<dt.back()<<" out"<<endl;
        dt.pop_back();
    }
    cout<<"size: "<<dt.size()<<endl;
    pthread_cond_signal(&shared.not_full);
    pthread_mutex_unlock(&shared.mutex);
}
int main(int argc, char const* argv[])
{
    pthread_t p_id;
    pthread_t c_id;

    //生产者生产数据
    pthread_create(&p_id, NULL, producer, NULL);
    //消费者消费数据
    pthread_create(&c_id, NULL, consumer, NULL);
    //等待线程结束
    pthread_join(p_id, NULL);
    pthread_join(c_id, NULL);

    return ;
}
           
多线程+deque实现生产者消费者模型以及最终实现

当消费者程序先竞争到资源后,consumer先执行,导致生产者还未生产数据,消费者先消费数据.

为了解决这个问题,一些人可能设置线程属性中线程的优先级,保证生产者先生产数据,消费者再消费数据,如代码2

#include<iostream>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<time.h>
#include<deque>
#include<algorithm>
#include<iterator>
using namespace std;


struct shed_param
{
    int sched_priority;
};
deque<int> dt;
struct pcst
{
    pthread_mutex_t mutex;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
}shared = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
    PTHREAD_COND_INITIALIZER};

void* producer(void *arg)
{
    pthread_mutex_lock(&shared.mutex);

    int count = ;
    while(count-- > )
    {
        srand(time(NULL));
        dt.push_back(rand() % );
        cout<<dt.back()<<" in"<<endl;
        sleep();
    }
    pthread_cond_signal(&shared.not_empty);
    pthread_mutex_unlock(&shared.mutex);

}
void* consumer(void *arg)
{
    pthread_mutex_lock(&shared.mutex);

    while(!dt.empty())
    {
        cout<<dt.back()<<" out"<<endl;
        dt.pop_back();
    }
    cout<<"size: "<<dt.size()<<endl;
    pthread_cond_signal(&shared.not_full);
    pthread_mutex_unlock(&shared.mutex);
}
int main(int argc, char const* argv[])
{
    pthread_t p_id;
    pthread_t c_id;

    //设置线程属性,生产者优先级高于消费者.
    pthread_attr_t attr1, attr2;
    pthread_attr_init(&attr1);
    pthread_attr_init(&attr2);
    struct sched_param sp1;
    sp1.sched_priority = ;
    struct sched_param sp2;
    sp2.sched_priority = ;
    pthread_attr_setschedpolicy(&attr1, SCHED_FIFO);
    pthread_attr_setschedpolicy(&attr2, SCHED_FIFO);
    pthread_attr_setschedparam(&attr1, &sp1);
    pthread_attr_setschedparam(&attr2, &sp2);
    //生产者生产数据
    pthread_create(&p_id, &attr1, producer, NULL);
    //消费者消费数据
    pthread_create(&c_id, &attr2, consumer, NULL);
    //等待线程结束
    pthread_join(p_id, NULL);
    pthread_join(c_id, NULL);

    pthread_attr_destroy(&attr1);
    pthread_attr_destroy(&attr2);
    return ;
}
           

但是不可行

多线程+deque实现生产者消费者模型以及最终实现

解释不可行的原因,总的来说,是调度问题,操作系统不能完全保证高优先级的线程先运行,但是我们可以通过线程间同步来解决这个问题:通过同步条件让某个线程等待(这里是让消费者线程先等待),另外一个线程运行

//gdb会打乱线程的执行,真实环境下时间片等由操作系统决定
//打日志,printf/cout不是线程安全的
#include<iostream>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<time.h>
#include<deque>
#include<algorithm>
#include<iterator>
using namespace std;

deque<int> dt;
struct pcst
{
    pthread_mutex_t mutex;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
}shared = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
    PTHREAD_COND_INITIALIZER};

void* producer(void *arg)
{
    pthread_mutex_lock(&shared.mutex);

    int count = ;
    while(count-- > )
    {
        srand(time(NULL));
        dt.push_back(rand() % );
        cout<<dt.back()<<" in"<<endl;
        sleep();
    }
    pthread_cond_signal(&shared.not_empty);
    pthread_mutex_unlock(&shared.mutex);

}
void* consumer(void *arg)
{
    pthread_mutex_lock(&shared.mutex);

    if(dt.empty())
    {
        pthread_cond_wait(&shared.not_empty, &shared.mutex);
        pthread_mutex_unlock(&shared.mutex);
    }
    while(!dt.empty())
    {
        cout<<dt.front()<<" out"<<endl;
        dt.pop_front();
    }
    cout<<"size: "<<dt.size()<<endl;
    pthread_cond_signal(&shared.not_full);
    pthread_mutex_unlock(&shared.mutex);
}
int main(int argc, char const* argv[])
{
    pthread_t p_id;
    pthread_t c_id;

    //设置线程属性,生产者优先级高于消费者.
#if 0
    pthread_attr_t attr1, attr2;
    pthread_attr_init(&attr1);
    pthread_attr_init(&attr2);
    struct sched_param sp1;
    sp1.sched_priority = ;
    struct sched_param sp2;
    sp2.sched_priority = ;
    pthread_attr_setschedpolicy(&attr1, SCHED_FIFO);
    pthread_attr_setschedpolicy(&attr2, SCHED_FIFO);
    pthread_attr_setschedparam(&attr1, &sp1);
    pthread_attr_setschedparam(&attr2, &sp2);
#endif
    //生产者生产数据
    pthread_create(&p_id, NULL, producer, NULL);
    //消费者消费数据
    pthread_create(&c_id, NULL, consumer, NULL);
    //等待线程结束
    pthread_join(p_id, NULL);
    pthread_join(c_id, NULL);
#if 0
    pthread_attr_destroy(&attr1);
    pthread_attr_destroy(&attr2);
#endif
    return ;
}
           

这就是最终的代码

测试结果

多线程+deque实现生产者消费者模型以及最终实现

继续阅读