天天看點

qt用戶端技術雜談-線程池(05)

一、qt線程池

線程池的概念隻要是程式員大概都知道一些,這裡就不再贅述。但大部分線程池都是針對背景業務。那麼用戶端使用線程池會有什麼不同呢?特别是qt用戶端,使用線程池擷取傳回值時,會不會造成界面阻塞等問題。
           

二、實作原理

實作機制:線程池使用生産者消費者模型,維護一個任務隊列,提供一個外部接口給上層應用插入任務到隊列中,消費者線程從隊列中取走任務,并執行。為了不影響qt本身的消息循環,提供一個同步接口,實作裡使用eventLoop,任務執行完後發送信号給eventLoop,使它退出。

三、具體實作

直接上幹貨

  1. 任務類:
//RztThreadTask.h
#ifndef RZTTHREADTASK_H
#define RZTTHREADTASK_H
#include <QObject>
#include <functional>
// Callable type handled by the thread pool: takes no arguments, returns nothing.
using RztThreadFunc = std::function<void()>;

// One unit of work for the thread pool.
//
// Wraps a RztThreadFunc and announces successful completion through the
// sglTaskFinish() signal, so a waiter can connect to it and be notified.
//
// NOTE(review): QObject-derived classes are normally not copyable; the copy
// operations declared here copy only the wrapped callable (no QObject state).
// NOTE(review): operator= returns void, so assignments cannot be chained.
class RztThreadTask : public QObject
{
    Q_OBJECT
public:
    // Creates a task that will run `func` when processed.
    explicit RztThreadTask(RztThreadFunc func);
    // Copies the wrapped callable of `threadTask`.
    RztThreadTask(const RztThreadTask &threadTask);
    // Replaces the wrapped callable with the one held by `threadTask`.
    void operator=(const RztThreadTask &threadTask);
    // Creates a task with no callable attached.
    RztThreadTask();

    // Runs the wrapped callable and emits sglTaskFinish() afterwards.
    void            process();
    // Function-call syntax for process().
    void            operator()();
signals:
    void            sglTaskFinish();// emitted after the task ran to completion
public slots:
private:
    RztThreadFunc   m_func;         // the work to execute
};
#endif // RZTTHREADTASK_H

//RztThreadTask.cpp
#include "RztThreadTask.h"
#include <QDebug>
// Builds a parentless task that stores `func` as the work to run later.
RztThreadTask::RztThreadTask(RztThreadFunc func)
    : QObject(nullptr)
    , m_func(func)
{
}
// Copy constructor: the new task is a fresh, parentless QObject that shares
// only the callable of `threadTask`.
RztThreadTask::RztThreadTask(const RztThreadTask &threadTask)
    : QObject(nullptr)
    , m_func(threadTask.m_func)
{
}
// Copy assignment: takes over the callable stored in `threadTask`.
// Nothing else of the source object is copied.
void RztThreadTask::operator=(const RztThreadTask &threadTask)
{
    const RztThreadFunc &source = threadTask.m_func;
    this->m_func = source;
}
// Default constructor: parentless task with an empty callable.
RztThreadTask::RztThreadTask()
    : QObject(nullptr)
    , m_func()
{
}
// Executes the wrapped callable and, if it returns normally, emits
// sglTaskFinish().
//
// Exceptions never leave this function: they are logged and swallowed so a
// failing task cannot take down the thread running it. In that case the
// completion signal is NOT emitted.
void RztThreadTask::process()
{
    try
    {
        m_func();               // the callable supplied by the caller
        emit sglTaskFinish();   // announce completion only after a normal return
    }
    catch(const std::exception &e)  // catch by const reference: no copy, no slicing
    {
        qDebug() << "exception error :" << e.what();
    }
    catch(...)
    {
        qDebug() << "exception error : unkonw exception";
    }
}
// Makes the task callable like a function; simply forwards to process().
void RztThreadTask::operator()()
{
    this->process();
}
           
  2. 線程類

    線程類,做的事不多,隻是簡單的封裝下,不喜歡qthread的,可以使用std::thread替換

// RztThread.h
#ifndef RZTTHREAD_H
#define RZTTHREAD_H

#include <QObject>
#include <QThread>
#include "RztThreadTask.h"

// Thin QThread wrapper that executes a single RztThreadFunc as its thread body.
// Instances cannot be copied (copy constructor and copy assignment are deleted).
class RztThread : public QThread
{
    Q_OBJECT
public:
    // Stores `func`; it is invoked by run().
    explicit RztThread(RztThreadFunc func);
    // NOTE(review): this overload is viable for the same rvalue arguments as
    // the by-value overload above, so constructing from a temporary looks
    // ambiguous; the `const` also prevents moving from `func`. Confirm whether
    // this overload is needed.
    explicit RztThread(const RztThreadFunc &&func);
    RztThread(const RztThread&) = delete;
    RztThread& operator=(const RztThread&) = delete;
protected:
    // Thread body: calls the stored callable.
    // NOTE(review): presumably overrides QThread::run(); consider marking it
    // `override`.
    void        run();
public slots:
private:
    RztThreadFunc           m_func;     // callable executed by run()
};
#endif // RZTTHREAD_H

//RztThread.cpp
#include "RztThread.h"
// Keeps a copy of `func` to be executed as the thread body.
RztThread::RztThread(RztThreadFunc func)
    : QThread()
    , m_func(func)
{
}
// Rvalue overload. Because `func` is const it cannot be moved from, so the
// callable is copied exactly as in the by-value overload.
RztThread::RztThread(const RztThreadFunc &&func)
    : QThread()
    , m_func(func)
{
}
// Thread body: invokes the stored callable, if there is one.
//
// An empty std::function would throw std::bad_function_call, and an exception
// must not escape a thread entry point, so an empty callable is skipped.
void RztThread::run()
{
    if (m_func)
    {
        m_func();
    }
}
           
  3. 線程池類:

    維護任務隊列,并進行線程資源管理

//RztThreadPool.h
#ifdef WIN32
  #pragma execution_character_set("utf-8")
#endif
#pragma once
#include <string>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <QObject>
#include <queue>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <functional>
#include "RztThreadTask.h"
#include "RztThread.h"
// Thread pool built on a producer/consumer task queue.
//
// commit() pushes tasks onto the queue; worker threads created by addThread()
// pop tasks from it and execute them.
class  RztThreadPool : public QObject
{
    Q_OBJECT
public:
    // Creates the pool and asks addThread() for `size` workers.
    RztThreadPool(unsigned short size = 6);
    // Clears the running flag, wakes all workers and waits for them.
    ~RztThreadPool();

    // Current value of the idle-worker counter.
    int idlThreadCount();
    // Number of worker threads owned by the pool.
    int threadCount();
    // Queues `funcPtr` for execution; returns false if the pool is stopped.
    bool commit(std::shared_ptr<RztThreadTask> funcPtr);
    // Adds up to `size` workers, never exceeding the pool's maximum.
    void addThread(unsigned short size);
public:
signals:
private:
    std::vector<std::shared_ptr<RztThread>> m_threadPool;     // worker threads
    std::queue<std::shared_ptr<RztThreadTask>> m_queueTasks;            // pending tasks
    std::mutex m_lock;                   // guards m_queueTasks
    std::condition_variable m_taskCon;   // workers block here until work arrives or the pool stops
    std::atomic<bool> m_running{true};     // false once the pool is shutting down
    std::atomic<int>  m_idlThreadNum{0};  // number of idle workers
};

//RztThreadPool.cpp
#include "RztThreadPool.h"
#include <stdexcept>
#include <QDebug>
#define  THREADPOOL_MAX_NUM 16
// Creates a parentless pool and spawns the initial `size` worker threads
// (addThread() applies the upper limit).
RztThreadPool::RztThreadPool(unsigned short size)
    : QObject(nullptr)
{
    this->addThread(size);
}
// Stops the pool and waits until every worker thread has exited.
//
// Workers capture `this` and touch the pool's members, so the destructor must
// not return while any of them is still running. A worker that is busy with a
// long task is reported after 100 ms and then waited for without a timeout.
RztThreadPool::~RztThreadPool()
{
    {
        // Clear the flag while holding the queue mutex. A worker is then either
        // already blocked in wait() (and receives the notification below) or
        // has not evaluated the predicate yet (and will see the new value),
        // so the wake-up cannot be lost.
        std::lock_guard<std::mutex> lock(m_lock);
        m_running = false;
    }
    m_taskCon.notify_all();
    for (const auto &threadPtr : m_threadPool)
    {
        if (!threadPtr->wait(100))
        {
            // Log the thread object itself: currentThreadId() is static and
            // would report the id of the thread running this destructor.
            qDebug() << "thread not exit:" << threadPtr.get();
            // Destroying a QThread that is still running is not allowed, so
            // block until the worker finishes its current task and exits.
            threadPtr->wait();
        }
    }
}
// Returns a snapshot of the idle-worker counter.
int RztThreadPool::idlThreadCount()
{
    const int idleNow = m_idlThreadNum.load();
    return idleNow;
}
// Returns how many worker threads the pool currently owns.
int RztThreadPool::threadCount()
{
    return static_cast<int>(m_threadPool.size());
}
bool RztThreadPool::commit(std::shared_ptr<RztThreadTask> funcPtr)
{
    if (!m_running)
    {
        qDebug() << "threadPool is stopped!";
        return false;
    }
    {
        std::lock_guard<std::mutex> lock(m_lock);
        m_queueTasks.push(funcPtr);
    }

    m_taskCon.notify_one();
    return true;
}
void RztThreadPool::addThread(unsigned short size)
{
    for (; m_threadPool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
    {
        RztThreadFunc funPtr = [=]()
        {
            while (m_running)
            {
                std::shared_ptr<RztThreadTask> funcPtr;
                {
                    std::unique_lock<std::mutex> lock(m_lock);
                    m_taskCon.wait(lock, [this]
                    {
                            return !m_running || !m_queueTasks.empty();
                    });
                    if (!m_running)
                    {
                        return;
                    }
                    funcPtr = m_queueTasks.front();
                    m_queueTasks.pop();
                }
                if(funcPtr)
                {
                    m_idlThreadNum--;
                    (*funcPtr)();
                    m_idlThreadNum++;
                }
            }
        };
        std::shared_ptr<RztThread> threadPtr = std::make_shared<RztThread>(funPtr);
        m_threadPool.push_back(threadPtr);
        threadPtr->start();
        m_idlThreadNum++;
    }
}
           
  4. 接口類

    提供線程任務接口,主要有異步和同步兩種接口,任務參數傳一個lambda函數就行

//RztThreadMgr.h
#ifdef WIN32
  #pragma execution_character_set("utf-8")
#endif
#ifndef RZTTHREADMGR_H
#define RZTTHREADMGR_H

#include <QObject>

#include <QEventLoop>
#include <QTimer>
#include <QMutex>
#include <QHash>
#include <thread>
#include "RztThreadPool.h"
// Facade over RztThreadPool offering a synchronous and an asynchronous way to
// run a callable on the pool. Access to the pool is serialized with m_mutex.
class RztThreadMgr : public QObject
{
    Q_OBJECT
public:
    explicit            RztThreadMgr(QObject *parent = nullptr);
    ~RztThreadMgr();
    // Returns this manager as a QObject pointer.
    QObject*            getObject();
    // Synchronous interface: queues `func` and waits in a local event loop.
    // Returns true if the task's completion signal ended the wait, false
    // otherwise (e.g. the nTime timer fired first; nTime is handed to
    // QTimer::setInterval, presumably milliseconds).
    bool                syncStartOneTask(RztThreadFunc func, int nTime = 1000);
    // Asynchronous interface: queues `func` and returns immediately.
    void                asyncStartOneTask(RztThreadFunc func);
    // Asks the pool to add nNum worker threads.
    void                addThreads(int nNum);
    // Number of idle worker threads.
    int                 idlThreadCount();
    // Total number of worker threads.
    int                 threadCount();
signals:
public slots:
private:
private:
    std::shared_ptr<RztThreadPool>              m_threadPoolPtr;    // the pool doing the work
    QMutex                                      m_mutex;            // serializes calls into the pool
};
#endif // RZTTHREADMGR_H

//RztThreadMgr.cpp
#ifdef WIN32
  #pragma execution_character_set("utf-8")
#endif
#include "RztThreadMgr.h"
#include <QMutexLocker>
#include <functional>
#include <QDebug>
#define MAX_TASK_ID 1000000
// Creates the manager with a default-sized thread pool and a recursive mutex
// guarding access to it.
RztThreadMgr::RztThreadMgr(QObject *parent)
    : QObject(parent)
    , m_threadPoolPtr(std::make_shared<RztThreadPool>())
    , m_mutex(QMutex::Recursive)
{
}
// Nothing to release by hand: the members clean up after themselves.
RztThreadMgr::~RztThreadMgr() = default;
// Exposes the manager through its QObject base.
QObject *RztThreadMgr::getObject()
{
    QObject *self = this;
    return self;
}
// Runs `func` on the thread pool and waits for it without blocking the
// caller's event processing: the wait is a local QEventLoop.
//
// func  : work to execute on a pool thread.
// nTime : timeout handed to QTimer::setInterval (milliseconds).
// Returns true when the task reported completion before the timeout; false on
// timeout or when the pool refused the task. On timeout the task is not
// cancelled: it may still run or finish later.
bool RztThreadMgr::syncStartOneTask(RztThreadFunc func, int nTime)
{
    // The loop is shared with the lambdas below so that it stays alive for a
    // task that finishes after this function has already returned.
    std::shared_ptr<QEventLoop> loopPtr = std::make_shared<QEventLoop>();
    std::shared_ptr<RztThreadTask> taskPtr = std::make_shared<RztThreadTask>(func);

    QTimer timer;
    timer.setSingleShot(true);      // one timeout per call is all that is needed
    timer.setInterval(nTime);

    // The loop itself is the context object of both connections. It lives in
    // the calling thread, so the completion signal emitted by a pool thread is
    // queued to the caller and handled inside exec(). With the manager as the
    // context, the lambda could run in the manager's thread and call exit()
    // before exec() had started, and the finished task would be reported as a
    // timeout.
    connect(&timer, &QTimer::timeout, loopPtr.get(), [=](){
        qDebug() << "timeout exit!";
        loopPtr->exit(-1);
    });
    connect(taskPtr.get(), &RztThreadTask::sglTaskFinish, loopPtr.get(), [=](){
        loopPtr->exit(0);
        qDebug() << "normal exit!";
    });

    bool committed = false;
    {
        QMutexLocker lock(&m_mutex);
        committed = m_threadPoolPtr->commit(taskPtr);
    }
    if (!committed)
    {
        // The pool is stopped; waiting for the timeout would be pointless.
        return false;
    }

    timer.start();
    const int ret = loopPtr->exec();
    return ret != -1;
}
// Fire-and-forget: wraps `func` in a task and hands it to the pool while
// holding the manager's mutex. Completion is not reported to the caller.
void RztThreadMgr::asyncStartOneTask(RztThreadFunc func)
{
    QMutexLocker guard(&m_mutex);
    m_threadPoolPtr->commit(std::make_shared<RztThreadTask>(func));
}
// Asks the pool to start nNum additional worker threads.
//
// The pool takes an unsigned short, so the request is validated here: zero or
// negative values are ignored (a negative int would otherwise wrap to a large
// count), and oversized values are clamped to the parameter's range.
void RztThreadMgr::addThreads(int nNum)
{
    if (nNum <= 0)
    {
        return;
    }
    const int maxRequest = 0xFFFF;  // largest value passed on to the pool
    const unsigned short request =
        static_cast<unsigned short>(nNum > maxRequest ? maxRequest : nNum);

    QMutexLocker lock(&m_mutex);
    m_threadPoolPtr->addThread(request);
}
// Reports the pool's idle-worker count, read under the manager's mutex.
int RztThreadMgr::idlThreadCount()
{
    QMutexLocker guard(&m_mutex);
    const int idle = m_threadPoolPtr->idlThreadCount();
    return idle;
}
// Reports the pool's total worker count, read under the manager's mutex.
int RztThreadMgr::threadCount()
{
    QMutexLocker guard(&m_mutex);
    const int total = m_threadPoolPtr->threadCount();
    return total;
}
           
  5. demo測試

    僅截取部分測試函數,示例使用方式,具體可以自己根據應用場景使用

// Endless demo loop exercising both manager interfaces: each round queues one
// asynchronous task and three synchronous ones (the last sleeps 100 ms), then
// pauses briefly. This function never returns.
void    Widget::test01()
{
    // Prints the separator line whenever a synchronous task did not succeed.
    const auto reportIfFailed = [](bool succeeded){
        if(!succeeded)
        {
            qDebug() << "---------------";
        }
    };

    int rounds = 0;
    for(;;)
    {
        ++rounds;
        threadMgr.asyncStartOneTask(std::bind(&Widget::testLog, this));

        reportIfFailed(threadMgr.syncStartOneTask([](){
            qDebug() << "task test02";
        }));
        qDebug() << "task end";

        reportIfFailed(threadMgr.syncStartOneTask([](){
            qDebug() << "task test03";
        }));

        reportIfFailed(threadMgr.syncStartOneTask([](){
            QThread::msleep(100);
            qDebug() << "task test04";
        }));

        QThread::msleep(10);
        // After more than 1000 rounds, restart the count and take a longer break.
        if(rounds > 1000)
        {
            rounds = 0;
            QThread::msleep(110);
        }
    }

}

void Widget::on_pushButton_clicked()
{
//壓力測試
    threadMgr.addThreads(6);
    std::thread thread(std::bind(&Widget::test01, this));
    std::thread thread2(std::bind(&Widget::test01, this));
    std::thread thread3(std::bind(&Widget::test01, this));
    std::thread thread4(std::bind(&Widget::test01, this));
    std::thread thread5(std::bind(&Widget::test01, this));

//普通測試
    test01();
}
           

繼續閱讀