本人就職于國際知名終端廠商,負責modem晶片研發。
在5G早期負責終端資料業務層、核心網相關的開發工作,目前牽頭6G算力網絡技術标準研究。
文章目錄
- 線程池Code詳解
-
- 線程池原理圖
- 使用方法
- 線程池code詳解
-
- 線程池結構體及參數解釋
- 工作線程結構體及參數解釋
- 線程池任務隊列/傳回值隊列結構體及參數詳解
- 任務結構體及參數詳解
- 線程池初始化函數code詳解
- 線程建立函數code詳解
- push任務到線程池各函數code詳解
- pull任務傳回值各函數code詳解
- Abort任務函數code詳解
- 工作線程函數code詳解
- 建立任務對象函數code詳解
- 擷取回調函數參數/傳回值參數 函數code詳解
- 任務隊列中删除比對key的任務函數code詳解
- 單元測試
-
- 線程池基本功能UT
- 線程池性能UT
線程池Code詳解
代碼傳送門
本文介紹一個簡單的線程池Code,首先介紹一些術語以友善後續code的了解
Thread pool
這是一個工作伺服器線程池,将一系列的工作線程映射到CPU核心。每個工作線程循環的從相同的輸入隊列中擷取工作,如果有傳回值則工作線程會将傳回值放到傳回值隊列。
在這個線程池中允許取消任務,例如,通過之前任務的傳回值,考慮某個已經被放到任務隊列但還沒有執行的任務再執行已經沒有意義。
Jobs
任務的結構體是notifiedFIFO_elt_t(結構體參數後面會詳解),任務可以通過函數newNotifiedFIFO_elt() 來建立,通過函數delNotifiedFIFO_elt() 删除,或者使用者自己建立。
Queues of jobs
隊列結構體notifiedFIFO_t(結構體參數後面會詳解),通過init_notifiedFIFO()函數建立,如果需要删除建立者隻需要free隊列對象即可。通過push_notifiedFIFO()函數可以向隊列添加任務,通過pull_notifiedFIFO()和poll_notifiedFIFO()可以擷取任務的傳回值,兩個函數的差別是pull_notifiedFIFO()有鎖而poll_notifiedFIFO()無鎖。使用abort_notifiedFIFO()函數使用者可以删除某個特定的任務。
Thread pools initialization
使用init_tpool()函數使用者可以建立一個或者多個線程池,通過參數設定工作線程的核心親和性。如果CPU存在則會将此工作線程的親和性與設定的CPU綁定,如果不希望與任何一個CPU綁定則将對應的參數值設定為負數。所有建立的工作線程都是實時線程,他們的名字都是“Tpool_‘coreID’”.
adding jobs
新任務通過pushTpool()函數加入到任務隊列
abort
通過任務ID與abortTpool()函數可以取消所有比對這個ID的任務,被取消的任務沒有傳回值,如果任務已經在執行并且有傳回值,則任務執行完畢後産生的傳回值也會被删除。
Performance measurements
測試工具會記錄creationTime、startProcessingTime、endProcessingTime、returnTime。我們需要設定環境變量thread-pool-measurements為一個有效的linux pipe檔案,并通過這個性能測試工具将測試值呈現出來,具體實踐後面會介紹。
線程池原理圖
線程池通過線程初始化函數
initTPool(char *params,tpool_t *pool, bool performanceMeas)實作線程池初始化,初始化内容包括:
1.開辟線程池空間
2.初始化線程池參數
3.根據參數params建立工作線程
一次性初始化完畢之後,工作線程阻塞等待任務隊列廣播(當有任務加入隊列時,會向所有阻塞工作線程廣播),收到廣播後工作線程從任務隊列中競争任務。競争成功則修改目前工作線程的狀态,開始執行任務,執行完畢如果有傳回值則将結果入隊到response隊列(這個參數在建立任務的時候初始化),運作完畢或者競争失敗則重新進入阻塞狀态。
缺點:沒有管理者線程,所有工作線程一次性建立,無法根據任務量動态增删工作線程,如果任務量小,工作線程建立較多則浪費資源且易出現“饑餓”的情況,如果任務量突增,工作線程建立較少,則易導緻任務等待逾時的問題。
使用方法
1.首先在整個項目啟動時建立一個線程池對象(已配置設定記憶體)調用初始化線程池函數完成線程池初始化:
2.在執行過程中通過pushTpool函數動态添加任務,
通過pullTpoll函數擷取任務傳回值,
也可以通過abortTpool函數删除某個任務
更詳細的解釋可以參考後面的code詳解
線程池code詳解
線程池結構體及參數解釋
typedef struct thread_pool {
//activated線程池目前狀态,true表示激活線程池,false表示關閉線程池
//因為沒有管理者線程這個參數隻在初始化線程池時有效,建立成功後無法通過此變量動态
//關閉線程池
int activated;
//measurePerf性能測量标志位,開啟時會自動計算任務入隊時間,開始執行時間,執行結
//束時間,傳回時間,并将結果write到特定的隊列中,通過性能測量display函數可以顯示測
//量結果。後面的UT會介紹
bool measurePerf;
//性能測試時有名管道的write描述符
int traceFd;
//性能測試時有名管道的read描述符
int dummyTraceFd;
//性能測試時cpu Cycles by MicroSec
uint64_t cpuCyclesMicroSec;
//程式中隻有定義沒有使用,具體用途未知
uint64_t startProcessingUE;
//目前線程池的工作線程計數器
int nbThreads;
//隻有定義和初始化,沒有具體使用,用途未知
bool restrictRNTI;
//線程池任務隊列
notifiedFIFO_t incomingFifo;
//線程池中工作線程線性表指針
struct one_thread *allthreads;
} tpool_t;
工作線程結構體及參數解釋
struct one_thread {
//線程TID
pthread_t threadID;
//此線程線上程池中的ID,此ID在initPool時指派,指派與建立順序有關,例如第一個創
//建的線程ID=0,第二個ID=1...
int id;
//目前線程的CPU affinity
int coreID;
//此線程name
char name[256];
//目前線程正在處理的任務Key ,主要用于删除任務時關鍵字比對
uint64_t runningOnKey;
//目前處理的任務是否已經被abort flag,如果True則丢棄傳回值
bool abortFlag;
//線程所線上程池資訊,用于擷取目前線程池資訊,例如讀取線程池任務隊列等
struct thread_pool *pool;
//工作線程結構體中的next域
struct one_thread *next;
};
線程池任務隊列/傳回值隊列結構體及參數詳解
//此隊列采用線性存儲結構,無大小限制(預設不超過系統尋址範圍)
typedef struct notifiedFIFO_s {
//隊列頭指針
notifiedFIFO_elt_t *outF;
//隊列尾指針
notifiedFIFO_elt_t *inF;
//隊列互斥鎖
pthread_mutex_t lockF;
//隊列信号量
pthread_cond_t notifF;
} notifiedFIFO_t;
任務結構體及參數詳解
typedef struct notifiedFIFO_elt_s {
//任務結構體next域
struct notifiedFIFO_elt_s *next;
//任務key,在目前code中通過此參數abort對應的任務
uint64_t key; //To filter out elements
//此任務結束後結果傳回隊列位址
struct notifiedFIFO_s *reponseFifo;
//回調函數
void (*processingFunc)(void *);
//任務是否已經配置設定記憶體flag ,避免記憶體洩露和多次釋放
bool malloced;
//性能測量開啟後,此任務加入任務隊列的時間(時間由rdtsc擷取,ARM下需要修改)
uint64_t creationTime;
//性能測量開啟後,此任務開始執行的時間(時間由rdtsc擷取,ARM下需要修改)
uint64_t startProcessingTime;
//性能測量開啟後,此任務執行完畢的時間(時間由rdtsc擷取,ARM下需要修改)
uint64_t endProcessingTime;
//性能測量開啟後,此任務執行完畢其傳回值輸入response隊列的時間(時間由rdtsc擷取,
//ARM下需要修改)
uint64_t returnTime;
//回調函數的函數參數
void *msgData;
} notifiedFIFO_elt_t;
線程池初始化函數code詳解
void initTpool(
char *params, //線程池初始化參數以‘,’分隔參數,具體用法在UT篇介紹
tpool_t *pool, //已配置設定記憶體的線程池參數位址
bool performanceMeas) //性能測量開關
{
memset(pool,0,sizeof(*pool));
//擷取環境變量,此變量為性能測試的有名管道name
char *measr=getenv("threadPoolMeasurements");
pool->measurePerf=performanceMeas;
// force measurement if the output is defined
pool->measurePerf=measr!=NULL;
if (measr) {
//建立有名管道,owner/group/other權限為讀寫
mkfifo(measr,0666);
//建立管道read描述符
AssertFatal(-1 != (pool->dummyTraceFd=open(measr, O_RDONLY| O_NONBLOCK)),"");
//建立管道write描述符
AssertFatal(-1 != (pool->traceFd=open(measr,O_WRONLY|O_APPEND|O_NOATIME|O_NONBLOCK)),"");
} else
pool->traceFd=-1;
//線程池激活
pool->activated=true;
//初始化任務隊列
initNotifiedFIFO(&pool->incomingFifo);
//兩個臨時變量,用與擷取params參數
char *saveptr, * curptr;
//臨時變量指向新配置設定的工作線程記憶體位址
struct one_thread * thread;
//初始化目前線程池存活線程數量
pool->nbThreads=0;
pool->restrictRNTI=false;
//以‘,’分割參數
curptr=strtok_r(params,",",&saveptr);
while ( curptr!=NULL ) {
int c=toupper(curptr[0]);
switch (c) {
case 'U':
pool->restrictRNTI=true;
break;
case 'N':
pool->activated=false;
break;
default:
//開始建立工作線程
thread=pool->allthreads;
pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread));
//将新建立的工作線程挂到線程池工作線程線性表上
pool->allthreads->next=thread;
//下述code個人認為存在問題,故注釋掉修改為上述code。因為下述code
//每建立一個線程next域都指向自己,會導緻在abortPool函數中産生死循環(已
//UT驗證)
//pool->allthreads->next=pool->allthreads;
printf("create a thread for core %d\n", atoi(curptr));
pool->allthreads->coreID=atoi(curptr);
pool->allthreads->id=pool->nbThreads;
pool->allthreads->pool=pool;
//Configure the thread scheduler policy for Linux
// set the thread name for debugging
sprintf(pool->allthreads->name,"Tpool_%d",pool->allthreads->coreID);
//建立工作線程(之後會對此函數詳解)
threadCreate(&pool->allthreads->threadID,
one_thread,
(void *)pool->allthreads,
pool->allthreads->name,
pool->allthreads->coreID,
OAI_PRIORITY_RT);
//線程計數器加一
pool->nbThreads++;
}
//讀取下一個參數
curptr=strtok_r(NULL,",",&saveptr);
}
if (pool->activated && pool->nbThreads==0) {
printf("No servers created in the thread pool, exit\n");
exit(1);
}
}
線程建立函數code詳解
void threadCreate(pthread_t* t, //TID
void * (*func)(void*), //回調函數
void * param, //回調函數參數
char* name, //線程name
int affinity, //cpu affinity
int priority) //線程排程優先級
{
pthread_attr_t attr;
//初始化線程屬性
pthread_attr_init(&attr);
//設定線程資源釋放類型
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//設定線程資源繼承類型
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
//設定線程排程方式
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
struct sched_param sparam={0};
sparam.sched_priority = priority;
//設定線程屬性
pthread_attr_setschedparam(&attr, &sparam);
//建立線程
pthread_create(t, &attr, func, param);
//設定線程name
pthread_setname_np(*t, name);
if (affinity != -1 ) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
//設定cpu affinity mask
CPU_SET(affinity, &cpuset);
//設定線程cpu affinity
AssertFatal( pthread_setaffinity_np(*t, sizeof(cpu_set_t), &cpuset) == 0, "Error setting processor affinity");
}
pthread_attr_destroy(&attr);
}
push任務到線程池各函數code詳解
static inline void pushTpool(tpool_t *t, //線程池
notifiedFIFO_elt_t *msg) //需要入隊的任務
{
if (t->measurePerf) msg->creationTime=rdtsc();
//将任務入隊線程池任務隊列
pushNotifiedFIFO(&t->incomingFifo, msg);
}
static inline void
pushNotifiedFIFO(notifiedFIFO_t *nf,
notifiedFIFO_elt_t *msg)
{
mutexlock(nf->lockF);
pushNotifiedFIFO_nothreadSafe(nf,msg);
condbroadcast(nf->notifF);
mutexunlock(nf->lockF);
}
static inline void
pushNotifiedFIFO_nothreadSafe(
notifiedFIFO_t *nf,
notifiedFIFO_elt_t *msg)
{
msg->next=NULL;
//隊列為空則直接加入到隊頭
if (nf->outF == NULL)
nf->outF = msg;
//隊列非空加入到隊尾
if (nf->inF != NULL)
nf->inF->next = msg;
//隊尾指針指向新加入的任務
nf->inF = msg;
}
pull任務傳回值各函數code詳解
static inline notifiedFIFO_elt_t *pullTpool(
notifiedFIFO_t *responseFifo,
tpool_t *t)
{
//從傳回值隊列擷取任務傳回值
notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo);
if (t->measurePerf)
msg->returnTime=rdtsc();
//将性能測試資料寫入Linux pipe
if (t->traceFd >= 0)
if(write(t->traceFd, msg, sizeof(*msg)));
return msg;
}
static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf)
{
mutexlock(nf->lockF);
notifiedFIFO_elt_t *ret;
//阻塞等到傳回值
while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL)
condwait(nf->notifF, nf->lockF);
mutexunlock(nf->lockF);
return ret;
}
static inline
notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf)
{
if (nf->outF == NULL)
return NULL;
notifiedFIFO_elt_t *ret=nf->outF;
//循環隊列檢測
if (nf->outF==nf->outF->next)
printf("Circular list in thread pool: push several times the same buffer is forbidden\n");
nf->outF=nf->outF->next;
//重置隊列空狀态參數
if (nf->outF==NULL)
nf->inF=NULL;
return ret;
}
Abort任務函數code詳解
static inline
void abortTpool(tpool_t *t, uint64_t key)
{
notifiedFIFO_t *nf=&t->incomingFifo;
mutexlock(nf->lockF);
//采用二級指針删除清單元素
notifiedFIFO_elt_t **start=&nf->outF;
//從任務隊列中删除比對key的所有任務
while(*start!=NULL) {
if ( (*start)->key == key ) {
notifiedFIFO_elt_t *request=*start;
//從清單中移除對象 *start為目前待删除元素的next域位址
*start=(*start)->next;
//free任務對象
delNotifiedFIFO_elt(request);
}
if (*start != NULL)
//start為目前元素的前一個元素的next域位址
start=&(*start)->next;
}
struct one_thread *ptr=t->allthreads;
//将目前比對key的已經執行的任務删除(并不能終止任務,隻是删除傳回值)
while(ptr!=NULL) {
if (ptr->runningOnKey==key)
//abortFlag置為true,如果有傳回值則丢棄
ptr->abortFlag=true;
ptr=ptr->next;
}
mutexunlock(nf->lockF);
}
工作線程函數code詳解
void *one_thread(void *arg) {
struct one_thread *myThread=(struct one_thread *) arg;
struct thread_pool *tp=myThread->pool;
// Infinite loop to process requests
do {
//從任務隊列競争任務
notifiedFIFO_elt_t *elt=pullNotifiedFifoRemember(&tp->incomingFifo, myThread);
if (tp->measurePerf) elt->startProcessingTime=rdtsc();
//執行回調函數
elt->processingFunc(NotifiedFifoData(elt));
if (tp->measurePerf) elt->endProcessingTime=rdtsc();
//判斷此任務是否有傳回值
if (elt->reponseFifo) {
// Check if the job is still alive, else it has been aborted
mutexlock(tp->incomingFifo.lockF);
//如果abortFlag為true則删除任務,傳回值也被删除
if (myThread->abortFlag)
delNotifiedFIFO_elt(elt);
else
//否則傳回值進入傳回值隊列
pushNotifiedFIFO(elt->reponseFifo, elt);
mutexunlock(tp->incomingFifo.lockF);
}
} while (true);
}
static inline
notifiedFIFO_elt_t *pullNotifiedFifoRemember(
notifiedFIFO_t *nf,
struct one_thread *thr)
{
mutexlock(nf->lockF);
//競争任務,如果沒有任務則阻塞
while(!nf->outF)
condwait(nf->notifF, nf->lockF);
//競争成功擷取任務對象
notifiedFIFO_elt_t *ret=nf->outF;
nf->outF=nf->outF->next;
if (nf->outF==NULL)
nf->inF=NULL;
// For abort feature
thr->runningOnKey=ret->key;
thr->abortFlag=false;
mutexunlock(nf->lockF);
return ret;
}
建立任務對象函數code詳解
static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(
int size, //回調函數參數size
uint64_t key, //任務id
notifiedFIFO_t *reponseFifo, //傳回值隊列
void (*processingFunc)(void *)) //回調函數
{
notifiedFIFO_elt_t *ret;
//為任務對象開辟記憶體 sizeof(notifiedFIFO_elt_t)+size+32 具體模型見下圖
assert( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)));
ret->next=NULL;
ret->key=key;
ret->reponseFifo=reponseFifo;
ret->processingFunc=processingFunc;
// We set user data piece aligend 32 bytes to be able to process it with
//SIMD
//函數參數與傳回值結構體位址(aligend 32 bytes)
ret->msgData=(void *)ret+(sizeof(notifiedFIFO_elt_t)/32+1)*32;
ret->malloced=true;
return ret;
}
擷取回調函數參數/傳回值參數 函數code詳解
static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) {
return elt->msgData;
}
任務隊列中删除比對key的任務函數code詳解
static inline void
abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key)
{
mutexlock(nf->lockF);
notifiedFIFO_elt_t **start=&nf->outF;
//二級指針删除清單元素
while(*start!=NULL) {
if ( (*start)->key == key ) {
notifiedFIFO_elt_t *request=*start;
*start=(*start)->next;
delNotifiedFIFO_elt(request);
}
if (*start != NULL)
start=&(*start)->next;
}
mutexunlock(nf->lockF);
}
單元測試
撰寫CmakeLists.txt,内容如下:
Target:
thread-pool-test用于線程池UT
measurement_display 用于線程池性能測試
project(measurement_display C)
add_definitions(-DTEST_THREAD_POOL)
set(CMAKE_C_FLAGS -g)
set(OPENAIR_DIR /home/share/openairinterface5g-develop-nr)
#************************measurement_display**************************
include_directories(${OPENAIR_DIR}/)
include_directories(${OPENAIR_DIR}/common/utils/threadPool/)
include_directories(${OPENAIR_DIR}/common/utils/)
include_directories(${OPENAIR_DIR}/common/utils/LOG)
include_directories(${OPENAIR_DIR}/common/utils/T/)
include_directories(${OPENAIR_DIR}/targets/ARCH/COMMON/)
add_executable( measurement_display
${OPENAIR_DIR}/common/utils/backtrace.c
measurement_display.c
)
target_link_libraries(measurement_display
-lpthread
)
#*************************thread-pool-test****************************
include_directories(${OPENAIR_DIR}/openair2/COMMON/)
add_executable( thread-pool-test
${OPENAIR_DIR}/common/utils/backtrace.c
${OPENAIR_DIR}/common/utils/LOG/log.c
${OPENAIR_DIR}/common/config/config_userapi.c
${OPENAIR_DIR}/common/config/config_load_configmodule.c
${OPENAIR_DIR}/common/config/config_cmdline.c
${OPENAIR_DIR}/common/utils/system.c
thread-pool.c
)
target_link_libraries(thread-pool-test
-lpthread
-ldl
)
分别執行 cmake CmakeLists.txt、cmake生成上述兩個目标檔案
線程池基本功能UT
原始code中已經包括了線程池基本功能測試,通過宏控TEST_THREAD_POOL開關,上述CmakeLists.txt已經打開此宏控。
基本功能測試涉及上述所有基本函數,下面介紹線程池初始化參數使用方法。
參數包含字元數字(可以是負數),包含兩個字元參數‘U’set restrictRNTI=true,‘N’set activated=false。例如"1,1,2,-1,u"表示在此線程池中總共建立4個線程,第一個參數cpu affinity 1,第二個線程cpu affinity 1,第三個線程cpu affinity 2,第四個線程不與任何CPU綁定。
測試回調函數及任務結構體如下圖:
測試結果如下圖:
線程池性能UT
性能測試首先需要設定環境變量threadPoolMeasurements
運作性能測試工具和線程池UT code,性能測試結果如下
代碼傳送門