一、qt線程池
線程池的概念，只要是程式設計師大概都知道一些，這裡就不再贅述。但大部分線程池都是針對後台業務；那麼在用戶端使用線程池會有什麼不同呢？特別是 Qt 用戶端，使用線程池取得傳回值時，會不會造成界面阻塞等問題。
二、實作原理
實作機制：線程池使用生產者消費者模型，維護一個任務隊列，並提供一個外部接口讓上層應用把任務插入隊列；消費者線程從隊列中取走任務並執行。為了不影響 Qt 本身的消息循環，另外提供一個同步接口，實作裡使用 QEventLoop：任務執行完後發送信號給 QEventLoop，使它退出。
三、具體實作
直接上乾貨。
- 任務類:
//RztThreadTask.h
#ifndef RZTTHREADTASK_H
#define RZTTHREADTASK_H
#include <QObject>
#include <functional>
using RztThreadFunc = std::function<void()>;
class RztThreadTask : public QObject
{
Q_OBJECT
public:
explicit RztThreadTask(RztThreadFunc func);
RztThreadTask(const RztThreadTask &threadTask);
void operator=(const RztThreadTask &threadTask);
RztThreadTask();
void process();
void operator()();
signals:
void sglTaskFinish();//任務完成信号
public slots:
private:
RztThreadFunc m_func;
};
//RztThreadTask.cpp
#include "RztThreadTask.h"
#include <QDebug>
// Builds a task around the callable handed in by the caller.
RztThreadTask::RztThreadTask(RztThreadFunc func)
    : m_func(func)
{
}
// Copy construction: the QObject base is default-constructed and only the
// wrapped callable is taken from the source task.
RztThreadTask::RztThreadTask(const RztThreadTask &threadTask)
    : QObject()
    , m_func(threadTask.m_func)
{
}
// Assignment replaces this task's callable with a copy of the source's.
void RztThreadTask::operator=(const RztThreadTask &threadTask)
{
    const RztThreadFunc &source = threadTask.m_func;
    m_func = source;
}
// Default construction leaves the task without a callable.
RztThreadTask::RztThreadTask() = default;
/// Runs the wrapped callable and, when it returns normally, emits
/// sglTaskFinish().
///
/// Any exception thrown by the callable is logged and swallowed here; in that
/// case sglTaskFinish() is NOT emitted.
void RztThreadTask::process()
{
    try
    {
        // The callable supplied by the upper layer (typically a lambda).
        // An empty std::function throws std::bad_function_call, which is
        // handled by the std::exception handler below.
        m_func();
        // Only reached when the callable returned without throwing.
        emit sglTaskFinish();
    }
    catch (const std::exception &e) // catch by const reference: no copy, no slicing
    {
        qDebug() << "exception error :" << e.what();
    }
    catch (...)
    {
        qDebug() << "exception error : unkonw exception";
    }
}
// Makes the task directly callable; simply delegates to process().
void RztThreadTask::operator()()
{
    this->process();
}
- 線程類:
線程類做的事不多，只是對 QThread 做簡單的封裝；不喜歡 QThread 的，可以改用 std::thread 替換。
// RztThread.h
#ifndef RZTTHREAD_H
#define RZTTHREAD_H
#include <QObject>
#include <QThread>
#include "RztThreadTask.h"
class RztThread : public QThread
{
Q_OBJECT
public:
explicit RztThread(RztThreadFunc func);
explicit RztThread(const RztThreadFunc &&func);
RztThread(const RztThread&) = delete;
RztThread& operator=(const RztThread&) = delete;
protected:
void run();
public slots:
private:
RztThreadFunc m_func;
};
#endif // RZTTHREAD_H
//RztThread.cpp
#include "RztThread.h"
// Stores a copy of the callable that run() will execute.
RztThread::RztThread(RztThreadFunc func)
    : QThread()
    , m_func(func)
{
}
// Const-rvalue overload: the argument cannot be moved from, so the callable
// is copied into the member.
RztThread::RztThread(const RztThreadFunc &&func)
    : QThread()
    , m_func(func)
{
}
/// Thread body: invokes the stored callable, if there is one.
///
/// Calling an empty std::function throws std::bad_function_call; letting that
/// escape a thread entry point would terminate the process, so an empty
/// callable is treated as "nothing to do".
void RztThread::run()
{
    if (m_func)
    {
        m_func();
    }
}
- 線程池類:
維護任務隊列，並進行線程資源管理。
//RztThreadPool.h
#ifdef WIN32
#pragma execution_character_set("utf-8")
#endif
#pragma once
#include <string>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <QObject>
#include <queue>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <functional>
#include "RztThreadTask.h"
#include "RztThread.h"
class RztThreadPool : public QObject
{
Q_OBJECT
public:
RztThreadPool(unsigned short size = 6);
~RztThreadPool();
int idlThreadCount();
int threadCount();
bool commit(std::shared_ptr<RztThreadTask> funcPtr);
void addThread(unsigned short size);
public:
signals:
private:
std::vector<std::shared_ptr<RztThread>> m_threadPool; //線程池
std::queue<std::shared_ptr<RztThreadTask>> m_queueTasks; //任務隊列
std::mutex m_lock; //同步
std::condition_variable m_taskCon; //條件阻塞
std::atomic<bool> m_running{true}; //線程池是否執行
std::atomic<int> m_idlThreadNum{0}; //空閑線程數量
};
//RztThreadPool.cpp
#include "RztThreadPool.h"
#include <stdexcept>
#include <QDebug>
#define THREADPOOL_MAX_NUM 16
// Construction just spins up the requested number of workers.
RztThreadPool::RztThreadPool(unsigned short size)
    : QObject()
{
    this->addThread(size);
}
/// Stops the pool: tells every worker to exit and waits for each of them.
///
/// Tasks still sitting in the queue are not executed; a worker that is in the
/// middle of a task finishes that task first.
RztThreadPool::~RztThreadPool()
{
    {
        // Clear the flag while holding the queue mutex. A worker evaluates
        // its wait predicate under this same mutex, so it either sees the
        // flag already cleared or is already blocked when notify_all() runs;
        // without the lock the notification could slip in between the
        // predicate check and the wait and be missed.
        std::lock_guard<std::mutex> lock(m_lock);
        m_running = false;
    }
    m_taskCon.notify_all();
    for (const auto &threadPtr : m_threadPool)
    {
        if (!threadPtr->wait(100))
        {
            // Log the thread object itself: QThread::currentThreadId() is
            // static and would report the calling thread, not the worker.
            qDebug() << "thread not exit:" << static_cast<const void *>(threadPtr.get());
            // The worker still uses this object's mutex, condition variable
            // and queue, and a QThread must not be destroyed while running,
            // so block until it has really finished.
            threadPtr->wait();
        }
    }
}
// Reads the atomic idle-worker counter.
int RztThreadPool::idlThreadCount()
{
    const int idle = m_idlThreadNum.load();
    return idle;
}
// Number of worker threads stored in the pool, converted to int.
int RztThreadPool::threadCount()
{
    return static_cast<int>(m_threadPool.size());
}
bool RztThreadPool::commit(std::shared_ptr<RztThreadTask> funcPtr)
{
if (!m_running)
{
qDebug() << "threadPool is stopped!";
return false;
}
{
std::lock_guard<std::mutex> lock(m_lock);
m_queueTasks.push(funcPtr);
}
m_taskCon.notify_one();
return true;
}
void RztThreadPool::addThread(unsigned short size)
{
for (; m_threadPool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
{
RztThreadFunc funPtr = [=]()
{
while (m_running)
{
std::shared_ptr<RztThreadTask> funcPtr;
{
std::unique_lock<std::mutex> lock(m_lock);
m_taskCon.wait(lock, [this]
{
return !m_running || !m_queueTasks.empty();
});
if (!m_running)
{
return;
}
funcPtr = m_queueTasks.front();
m_queueTasks.pop();
}
if(funcPtr)
{
m_idlThreadNum--;
(*funcPtr)();
m_idlThreadNum++;
}
}
};
std::shared_ptr<RztThread> threadPtr = std::make_shared<RztThread>(funPtr);
m_threadPool.push_back(threadPtr);
threadPtr->start();
m_idlThreadNum++;
}
}
- 接口類:
提供線程任務接口，主要有異步和同步兩種接口，任務參數傳入一個 lambda 函數即可。
//RztThreadMgr.h
#ifdef WIN32
#pragma execution_character_set("utf-8")
#endif
#ifndef RZTTHREADMGR_H
#define RZTTHREADMGR_H
#include <QObject>
#include <QEventLoop>
#include <QTimer>
#include <QMutex>
#include <QHash>
#include <thread>
#include "RztThreadPool.h"
class RztThreadMgr : public QObject
{
Q_OBJECT
public:
explicit RztThreadMgr(QObject *parent = nullptr);
~RztThreadMgr();
QObject* getObject();
//同步接口
bool syncStartOneTask(RztThreadFunc func, int nTime = 1000);
//異步接口
void asyncStartOneTask(RztThreadFunc func);
void addThreads(int nNum);
//空閑線程數量
int idlThreadCount();
//線程數量
int threadCount();
signals:
public slots:
private:
private:
std::shared_ptr<RztThreadPool> m_threadPoolPtr;
QMutex m_mutex;
};
#endif // RZTTHREADMGR_H
//RztThreadMgr.cpp
#ifdef WIN32
#pragma execution_character_set("utf-8")
#endif
#include "RztThreadMgr.h"
#include <QMutexLocker>
#include <functional>
#include <QDebug>
#define MAX_TASK_ID 1000000
// Creates the manager together with a default-sized pool; the mutex is
// created in recursive mode.
RztThreadMgr::RztThreadMgr(QObject *parent)
    : QObject(parent)
    , m_threadPoolPtr(std::make_shared<RztThreadPool>())
    , m_mutex(QMutex::Recursive)
{
}
// Nothing to release by hand; members clean up after themselves.
RztThreadMgr::~RztThreadMgr() = default;
// Exposes the manager through its QObject base.
QObject *RztThreadMgr::getObject()
{
    QObject *self = this;
    return self;
}
/// Submits @p func to the pool and waits, inside a local event loop, until
/// the task reports completion or the timeout expires.
///
/// @param func  Callable to run on a pool thread.
/// @param nTime Timeout handed to QTimer::setInterval().
/// @return true when the task signalled completion in time; false on timeout
///         or when the pool refused the task.
bool RztThreadMgr::syncStartOneTask(RztThreadFunc func, int nTime)
{
    // The loop lives on this stack frame, i.e. in the calling thread.
    QEventLoop loop;
    QTimer timer;
    timer.setSingleShot(true);
    timer.setInterval(nTime);

    std::shared_ptr<RztThreadTask> taskPtr = std::make_shared<RztThreadTask>(func);

    // Both handlers use `&loop` as the context object. They therefore run in
    // the thread that executes loop.exec() below and are dropped automatically
    // once `loop` is destroyed. With the manager as context, a caller on a
    // different thread could have exit() invoked before exec() had started,
    // and the completion would be lost until the timeout fired.
    connect(&timer, &QTimer::timeout, &loop, [&loop]() {
        qDebug() << "timeout exit!";
        loop.exit(-1);
    });
    connect(taskPtr.get(), &RztThreadTask::sglTaskFinish, &loop, [&loop]() {
        loop.exit(0);
        qDebug() << "normal exit!";
    });

    bool committed = false;
    {
        QMutexLocker lock(&m_mutex);
        committed = m_threadPoolPtr->commit(taskPtr);
    }
    if (!committed)
    {
        // The pool rejected the task; waiting for the timeout would be pointless.
        return false;
    }

    timer.start();
    // -1 is the exit code used by the timeout handler above.
    return loop.exec() != -1;
}
// Fire-and-forget submission: wraps the callable in a task and queues it
// while holding the manager mutex.
void RztThreadMgr::asyncStartOneTask(RztThreadFunc func)
{
    QMutexLocker guard(&m_mutex);
    m_threadPoolPtr->commit(std::make_shared<RztThreadTask>(func));
}
/// Asks the pool to add @p nNum worker threads.
///
/// The pool API takes an `unsigned short`, so the request is validated first:
/// zero or negative values are ignored (a negative int would otherwise wrap
/// to a large positive count), and values above 65535 are clamped.
void RztThreadMgr::addThreads(int nNum)
{
    if (nNum <= 0)
    {
        return;
    }
    const int maxRequest = 65535; // largest value guaranteed to fit an unsigned short
    const unsigned short count =
        static_cast<unsigned short>(nNum > maxRequest ? maxRequest : nNum);

    QMutexLocker lock(&m_mutex);
    m_threadPoolPtr->addThread(count);
}
// Idle-worker count, read from the pool under the manager mutex.
int RztThreadMgr::idlThreadCount()
{
    QMutexLocker guard(&m_mutex);
    const int idle = m_threadPoolPtr->idlThreadCount();
    return idle;
}
// Total worker count, read from the pool under the manager mutex.
int RztThreadMgr::threadCount()
{
    QMutexLocker guard(&m_mutex);
    const int total = m_threadPoolPtr->threadCount();
    return total;
}
- demo 測試:
僅截取部分測試函數，示範使用方式，具體可以根據自己的應用場景使用。
// Endless demo loop: each round submits one asynchronous task followed by
// three synchronous ones, printing a separator line whenever a synchronous
// call reports failure.
void Widget::test01()
{
    // Prints the separator used to flag a failed synchronous task.
    const auto flagFailure = [](bool succeeded)
    {
        if (!succeeded)
        {
            qDebug() << "---------------";
        }
    };

    int count = 0;
    for (;;)
    {
        ++count;
        threadMgr.asyncStartOneTask(std::bind(&Widget::testLog, this));

        bool ret = threadMgr.syncStartOneTask([]()
        {
            qDebug() << "task test02";
        });
        flagFailure(ret);
        qDebug() << "task end";

        ret = threadMgr.syncStartOneTask([]()
        {
            qDebug() << "task test03";
        });
        flagFailure(ret);

        ret = threadMgr.syncStartOneTask([]()
        {
            QThread::msleep(100);
            qDebug() << "task test04";
        });
        flagFailure(ret);

        // Short pause between rounds, plus a longer one every 1000+ rounds.
        QThread::msleep(10);
        if (count > 1000)
        {
            count = 0;
            QThread::msleep(110);
        }
    }
}
void Widget::on_pushButton_clicked()
{
//壓力測試
threadMgr.addThreads(6);
std::thread thread(std::bind(&Widget::test01, this));
std::thread thread2(std::bind(&Widget::test01, this));
std::thread thread3(std::bind(&Widget::test01, this));
std::thread thread4(std::bind(&Widget::test01, this));
std::thread thread5(std::bind(&Widget::test01, this));
//普通測試
test01();
}