天天看點

簡單實作帶有資料緩沖池的生産者消費者模型

//先把那個資料池實作了再說。

//more UserData.h

#ifndef USER_DATA_H

#define USER_DATA_H

#include <pthread.h>

#include <queue>

#include <string>

using namespace std;

class UserData //定義一個資料緩沖池,這樣就可以限定這個緩沖池的大小,輕松實作生産者消費者模型,使之更加安全可靠。

{

public:

        UserData();

        ~UserData();

void SetMaxNum(int m);

        void DataPush(string str);

        string DataPop();

private:

        queue<string> strQueue;

        int maxNum;

        pthread_mutex_t lock;

        pthread_cond_t full;

        pthread_cond_t empty;

};

#endif

//more UserData.cpp

#include "UserData.h"

#include <iostream>

UserData::UserData()

{

        pthread_mutex_init(&lock,0);

        pthread_cond_init(&full,0);

        pthread_cond_init(&empty,0);

}

UserData::~UserData()

{

pthread_mutex_destroy(&lock);

pthread_cond_destroy(&full);

pthread_cond_destroy(&empty);

}

void UserData::SetMaxNum(int m)

{

        maxNum = m;

}

void UserData::DataPush(string str)

{

        pthread_mutex_lock(&lock);

        while(strQueue.size() >= maxNum)

        {

                std::cout << "full" << endl;

                pthread_cond_wait(&full,&lock);

        }

        strQueue.push(str);

        pthread_cond_broadcast(&empty);

        pthread_mutex_unlock(&lock);

}

string UserData::DataPop()

{

        string resStr;

        pthread_mutex_lock(&lock);

        while(strQueue.empty())

        {

                std::cout << "empty" << endl;

                pthread_cond_wait(&empty,&lock);

        }

        resStr = strQueue.front();

        strQueue.pop();

        pthread_cond_broadcast(&full);

        pthread_mutex_unlock(&lock);

        return resStr;

}

//main.cpp

#include <iostream>

using namespace std;

#include <pthread.h>

#include "UserData.h"

#include <unistd.h>

class Thread //編寫一個線程基類,是如此的簡單,呵呵

{

private:

        pthread_t t;

public:

        virtual void run()=0;

        static void* run(void* p);

        void start();

};

void* Thread::run(void* p)

{

        Thread* pThread = (Thread*)p;

        pThread->run();

}

void Thread::start()

{

        pthread_create(&t,0,run,this);

}

//線程基類到此編寫完畢,下面的類隻要繼承這個基類就可以輕松實作線程的建立了。

class Productor:public Thread //這個生産者就這樣随便寫一個了,他的任務就是給緩沖池裡面扔資料。

{

private:

        UserData* pUserData;

public:

        void run();

        Productor(UserData* ud);

        void Produce();

};

Productor::Productor(UserData* ud)

{

        pUserData = ud;

}

void Productor::run()

{

        Produce();

        delete this; //非常抱歉,這裡又來這一套,但是以後我會再想一個辦法解決記憶體釋放的問題的。

}

void Productor::Produce()

{

        int num = 15; //在這裡又來搞這一套,連續往資料池裡扔15個資料。當然這裡不一定是真正的連續,因為資料池的最大資料量可能不到15,

//資料滿了之後//那個扔資料的過程就會等待,等到資料池不滿為止。

        string str = "love";

        while(num--)

        {

                pUserData->DataPush(str);

                cout << "product " << str << endl;

        }

}

class Consumer:public Thread  //簡單定義一個消費者

{

private:

        UserData* pUserData;

public:

        void run();

        Consumer(UserData* ud);

        void Consume();

};

Consumer::Consumer(UserData* ud)

{

        pUserData = ud;

}

void Consumer::run()

{

        int num = 15;

        while(num--)

        {

                Consume();

                sleep(1); //這是讓消費者消費的慢一點,好讓大家知道,生産者很快,但是資料池滿了之後,他會等待消費者消費,等不滿之後再生産,他會很耐心,不會等不及

//就往下跑。

        }

        delete this;

}

void Consumer::Consume()

{

        cout << "consume " <<  pUserData->DataPop() << endl;

}

int main()

{

        UserData* pUserData = new UserData();

        pUserData->SetMaxNum(8);

        Productor* pPro = new Productor(pUserData);

        pPro->start();

        Consumer* pCon = new Consumer(pUserData);

        pCon->start();

delete pUserData;// 注意new和delete是應該成對出現的。雖然上面中間兩個new在這裡你沒有看到delete,但是其實是有的,我在run函數裡寫了。

        pause();

        return 0;

}