天天看點

[linux]-------線程池線程池概念線程池的應用場景線程池的實作

上一篇部落格我們實作了讀者寫者模型,在這篇部落格中更進一步的完成線程池。

線程池概念

開辟一塊記憶體空間,裡面存在大量的(未死亡的)線程,池中的線程排程由池管理器來處理,當有線程任務時,從池中選一個線程運作,當運作完畢後,該線程又傳回池中,這樣就避免了反複建立線程所帶來的性能開銷,節省了系統資源。

如果對線程概念不清晰的話,不妨先看看我是一個線程這篇文章

線程池的應用場景

  1. 需要大量的線程來完成任務,且完成任務的時間比較短,例如WEB伺服器完成網頁請求。
  2. 對性能要求苛刻的應用,比如要求伺服器立即響應客戶請求
  3. 接受突發性的大量請求,但是不至于時伺服器是以産生大量的線程的應用。

    線程池就是圍繞上面幾種情況而産生的,下面我們自己實作一個線程池,暫時無法展現出線程池的應用場景。

線程池的實作

我們需要設計線程池這個類的成員有哪些。

  1. 計數器 用于記錄空餘線程的個數
  2. 任務隊列:用于儲存任務的緩沖區
  3. 信号量
class thread_pool
{
private:
	size_t thread_num;//記錄目前有多少程序空閑
	queue<Task> thread_queue;
	pthread_cond_t cond;
	pthread_mutex_t mutex;
}
           

構造函數和析構函數主要是對信号量和鎖進行初始化

thread_pool(int num = 5): thread_num(num)
	{
		pthread_cond_init(&cond,nullptr);
		pthread_mutex_init(&mutex, nullptr);
	}
	~thread_pool()
	{
		pthread_cond_destroy(&cond);
		pthread_mutex_destroy(&mutex);
	}
           

建立線程,對線程池初始化

void Init_thread_pool()
	{
		pthread_t id;
		for(int i = 0; i < thread_num; i++)
		{
			pthread_create(&id, nullptr, Run_func, this);//注意把this作為變量傳給Run_func,這裡隻做示範隻寫了一個運作的函數。
		}
	}

           

下面是整個線程池的重點,執行任務。

static void* Run_func(void* arg)
	{
		pthread_detach(pthread_self());
		thread_pool* pool = (thread_pool*) arg;
		while (1)
		{
			pool->thread_queue;
			while (pool->Is_empty())
			{
				pool->thread_Idle();//idle
			}
			Task it;
			pool->pop_task(it);
			pool->unlock_queue();
			it.Run();
		}
           

首先,因為pthread_create函數的第三個參數為 void *(*start_routine) (void *),但是對于類中的非靜态函數,已經隐含的傳入了this變量,是以我們必須把該函數設為靜态變量,并且把this指針作為參數傳入函數中。

其次,可以發現使用while循環而不是用if判斷隊列是否為空,這是因為有多個線程可能會同時操作,while循環能夠防止誤判,下面這一段雖然說的是生産者消費者模式的wait,但是我覺得也能說明問題

永遠不要在循環之外調用wait方法

《Effective Java》第二版中文版第69條244頁位置對這一點說了一頁,生産者和消費者問題來說:錯誤情況一:如果有兩個生産者A和B,一個消費者C。當存儲空間滿了之後,生産者A和B都被wait,進入等待喚醒隊列。當消費者C取走了一個資料後,如果調用了notifyAll(),注意,此處是調用notifyAll(),則生産者線程A和B都将被喚醒,如果此時A和B中的wait不在while循環中而是在if中,則A和B就不會再次判斷是否符合執行條件,都将直接執行wait()之後的程式,那麼如果A放入了一個資料至存儲空間,則此時存儲空間已經滿了;但是B還是會繼續往存儲空間裡放資料,錯誤便産生了。錯誤情況二:如果有兩個生産者A和B,一個消費者C。當存儲空間滿了之後,生産者A和B都被wait,進入等待喚醒隊列。當消費者C取走了一個資料後,如果調用了notify(),則A和B中的一個将被喚醒,假設A被喚醒,則A向存儲空間放入了一個資料,至此空間就滿了。A執行了notify()之後,如果喚醒了B,那麼B不會再次判斷是否符合執行條件,将直接執行wait()之後的程式,這樣就導緻向已經滿了資料存儲區中再次放入資料。錯誤産生。

最後我們發現,任務的執行是在鎖外面而不是在鎖裡面,難道不管程序安全了嗎?,仔細想想,發現當得到了配置設定的線程就沒人和你競争了,既然沒人和你競争了,自然也不需要鎖的保護了,另外假如我們讓任務的執行放到鎖裡面,事實上,是讓程式變成了串行,效率還不如之前。

完成隻差一步,下面的函數,用來傳入任務,喚醒函數是為了防止隊列裡原來沒有任務,插入新的任務需要喚醒。

void push_task(const Task& t)
	{
		lock_queue();
		thread_queue.push(t);
		wake_thread();
		unlock_queue();
	}
           

完成了上面的函數,基本上就把線程池完成了大半,下面是完整的代碼。

#include <queue>
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;

int cal(int x, int y, int op);
typedef int (*HandlerTask_t)(int x,int y,int op);//定義一個函數指針

class Task
{
private:
	int _left;
	int _right;
	int _op;
	HandlerTask_t _handler;
public:
	Task(int left = 0, int right = 0, int op = 0) :_left(left), _right(right), _op(op)
	{
	}

	void Register(HandlerTask_t handler)
	{
		_handler = handler;
	}

	void Run()//任務執行
	{
		int ret = _handler(_left, _right, _op);
		const char* str = "+-*/";
		cout << "thread is [" << pthread_self() << _left << str[_op] << _right << "=" << ret << endl;

	}
	~Task()
	{

	}
};
class thread_pool
{
private:
	size_t thread_num;//記錄目前有多少程序空閑
	queue<Task> thread_queue;
	pthread_cond_t cond;
	pthread_mutex_t mutex;
public:
	thread_pool(int num = 5): thread_num(num)
	{
		pthread_cond_init(&cond,nullptr);
		pthread_mutex_init(&mutex, nullptr);
	}
	~thread_pool()
	{
		pthread_cond_destroy(&cond);
		pthread_mutex_destroy(&mutex);
	}

	void Init_thread_pool()
	{
		pthread_t id;
		for(int i = 0; i < thread_num; i++)
		{
			pthread_create(&id, nullptr, Run_func, this);//注意把this作為變量傳給Run_func,這裡隻做示範隻寫了一個運作的函數。
		}
	}

	void lock_queue()
	{
		pthread_mutex_lock(&mutex);
	}

	void unlock_queue()
	{
		pthread_mutex_unlock(&mutex);
	}

	bool Is_empty()
	{
		return thread_queue.size() == 0;
	}

	void thread_Idle()
	{
		thread_num++;
		pthread_cond_wait(&cond, &mutex);
		thread_num--;
	}
	void wake_thread()
	{
		pthread_cond_signal(&cond);
	}

	void pop_task(Task& t)
	{
		t = thread_queue.front();
		thread_queue.pop();
	}
	
	void push_task(const Task& t)
	{
		lock_queue();
		thread_queue.push(t);
		wake_thread();
		unlock_queue();
	}
		static void* Run_func(void* arg)
	{
		pthread_detach(pthread_self());
		thread_pool* pool = (thread_pool*) arg;
		while (1)
		{
			pool->thread_queue;
			while (pool->Is_empty())
			{
				pool->thread_Idle();//idle
			}
			Task it;
			pool->pop_task(it);
			pool->unlock_queue();
			it.Run();
		}


	}

};



int cal(int x, int y, int op)//模拟一個電腦
{
	int ret = -1;
	switch (op)
	{
	case 0:
		ret = x + y;
		break;
	case 1:
		ret = x - y;
		break;
	case 2:
		ret = x * y;
		break;
	case 3:
		ret = x / y;
		break;
	default:
		cout << "cal error!" << endl;
		break;
	}
}

int main()
{
	thread_pool tp;
	tp.Init_thread_pool();
	srand((unsigned long)time(NULL));
	for (;;) {
		int x = rand() % 100 + 1;
		int y = rand() % 100 + 1;
		int op = rand() % 4;
		Task t(x, y, op);
		t.Register(cal);
		tp.push_task(t);
		sleep(1);
	}
	return 0;
}
           

運作結果如下(仔細想想是真的閑)

[linux]-------線程池線程池概念線程池的應用場景線程池的實作

總結

線程池是為了需要大量的線程來完成任務,且完成任務的時間比較短的情況而産生的,節省了開辟銷毀線程的資源開銷,可以使用多種方式實作.

完成了上面的代碼,我們對于linux的線程的學習告一段落,在下篇部落格中我們終于可以學習網絡相關的知識。