C/C++手撕線程池(線程池的封裝和實現)
本文使用的源碼地址:
https://github.com/SCljh/thread_pool
線程池描述
?
池式結構
??在計算機體系結構中有許多池式結構:內存池、數據庫連接池、請求池、消息隊列、
對象池等等。
??池式結構解決的主要問題為緩沖問題,起到的是緩沖區的作用。
?
線程池
?通過使用線程池,我們可以有效降低多線程操作中任務申請和釋放產生的性能消耗。
特別是當我們每個線程的任務處理比較快時,系統大部分性能消耗都花在了
pthread_create以及釋放線程的過程中。
那既然是這樣的話,何不在程序開始運行階段提前創建好一堆線程,
等我們需要用的時候只要去這一堆線程中領一個線程,用完了再放回去,
等程序運行結束時統一釋放這一堆線程呢?按照這個想法,線程池出現了。
?
線程池解決的問題
解決任務處理
阻塞IO
解決線程創建于銷毀的成本問題
管理線程
??線程池應用之一:日志存儲
??在服務器保存日志至磁盤上時,性能往往壓在磁盤讀寫上,
而引入線程池利用異步解耦可以解決磁盤讀寫性能問題。
??線程池的主要作用:異步解耦
?
說了那么多線程池的優點,那接下來要做的就是手撕這誘人的玩意了。
?
樸實無華但不枯燥的代碼(以c++為例)
本文主要講解的是c++線程池的實現,C語言實現其實思想和c++是一致的,
具體的代碼可見文章開頭的鏈接。
線程池中比較關鍵的東西
??若想自己編寫一個線程池框架,那么可以先關注線程池中比較關鍵的東西:
工作隊列
任務隊列
線程池的池
pthread_create中的回調函數
為什么說這些東西比較關鍵?因為這“大四元”基本上支撐起了整個線程池的框架。
而線程池的框架如下所示:
————————————————

工作隊列
??worker隊列,首先要有worker才有隊列,我們首先定義worker結構體:
??可想而知,worker中要有create_pthread函數的id參數,
還需要有控制每一個worker live or die的標志terminate,
我們最好再設置一個標志表示這個worker是否在工作。
最后,我們要知道這個worker隸屬于那個線程池(至于為什么下文將介紹)
struct NWORKER{
pthread_t threadid; //線程id
bool terminate; //是否需要結束該worker的標志
int isWorking; //該worker是否在工作
ThreadPool *pool; //隸屬于的線程池
}任務隊列struct NJOB{
void (*func)(void *arg); //任務函數
void *user_data; //函數參數
};線程池本池
??對于一個線程池,任務隊列和工作隊列已經是必不可少的東西了,
那線程池還有需要哪些東西輔助它以達到它該有的功能呢?
??一說到線程,那處理好線程同步就是一個繞不開的話題,
那在線程池中我們需要處理的臨界資源有哪些呢?
想想我們工作隊列中的每個worker都在等待一個任務隊列看其是否有任務到來,
所以很容易得出結論我們必須要在線程池中實現兩把鎖:
一把是用來控制對任務隊列操作的互斥鎖,
另一把是當任務隊列有新任務時喚醒worker的條件鎖。
??有了這兩把鎖,線程池中再加點必要的一些數字以及對線程池操作的函數,
那么這個類就寫完了。實現代碼如下:
class ThreadPool{
private:
struct NWORKER{
pthread_t threadid;
bool terminate;
int isWorking;
ThreadPool *pool;
} *m_workers;
struct NJOB{
void (*func)(void *arg); //任務函數
void *user_data;
};
public:
//線程池初始化
//numWorkers:線程數量
ThreadPool(int numWorkers, int max_jobs);
//銷毀線程池
~ThreadPool();
//面向用戶的添加任務
int pushJob(void (*func)(void *data), void *arg, int len);
private:
//向線程池中添加任務
bool _addJob(NJOB* job);
//回調函數
static void* _run(void *arg);
void _threadLoop(void *arg);
private:
std::list<NJOB*> m_jobs_list;
int m_max_jobs; //任務隊列中的最大任務數
int m_sum_thread; //worker總數
int m_free_thread; //空閑worker數
pthread_cond_t m_jobs_cond; //線程條件等待
pthread_mutex_t m_jobs_mutex; //為任務加鎖防止一個任務
被兩個線程執行等其他情況
};可以看到我們做了一些必要的封裝,只給用戶提供了構造函數、
析構函數以及添加任務的函數。這也是一個基本的線程池框架必要的接口。
回調函數
static?
??根據上方代碼可以看見,回調函數為static函數。
我可不想在我使用使用回調函數的時候自動給我加上*this參數。
??首先回調函數是每個線程創建之后就開始執行的函數,
該函數作為pthread_create的第三個參數傳入。我們來看看pthread_create的函數原型:
int pthread_create(pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void*),void *arg);
注意到,此處的我們傳入的回調函數必須是接受一個void*參數,
且返回類型為void*的函數。如果我們將回調函數寫成線程池的普通成員函數,
那么c++會在這個函數參數前默認加上一個*this參數,
**這也是為什么我們能在成員函數中使用當前對象中的一些屬性。
**然而就是這個原因,若我們傳入的回調函數指針為類的成員函數,
那c++編譯器會破壞我們的函數結構(因為給我們加了一個形參),
導致pthread_create的第三個參數不符合要求而報錯:

看吧,編譯器不讓我們用non-static的成員函數作為回調函數傳入pthread_create中。
其實在c++中,大多數回調函數都有這個要求。
??那為什么static就可以呢?
這是因為static函數為類的靜態函數,當類的成員函數被static修飾后,
調用該函數將不會默認傳遞*this指針,
這也是為什么static成員函數中不能使用對象的非static屬性:
你*this指針都沒傳我上哪去找你的對象?
函數本身
??在運行回調函數的時候,我們又想用對象里的東西(比如鎖),
編譯器又不讓我們用,那怎么辦?別忘了雖然static函數沒有*this指針,
但它卻可以有一個*void的參數啊。有了這個*void,我們還怕少一個*this指針?
我們可以先寫一個static函數,將需要的對象指針通過形參傳到這個函數里,
再在這個函數中通過這個對象調用成員函數的方法,就能使用這個對象的成員屬性了。
??就像這樣:
//run為static函數
void* ThreadPool::_run(void *arg) {
NWORKER *worker = (NWORKER *)arg;
worker->pool->_threadLoop(arg);
}
//threadLoop為普通成員函數
void ThreadPool::_threadLoop(void *arg) {
//在這里就能直接用當前ThreadPool對象的東西了
}
至于threadLoop的實現,由于線程是要一直存在的,
一個while(true)的循環肯定少不了了。這個循環中具體做什么呢:
不斷檢查任務隊列中是否有job:
如果有,則取出這個job,并將該job從任務隊列中刪除,且執行job中的func函數。
如果沒有,調用pthread_cond_wait函數等待job到來時被喚醒。
若當前worker的terminate為真,則退出循環結束線程。
注意在對job操作前別忘了加鎖,函數實現如下:
void ThreadPool::_threadLoop(void *arg) {
NWORKER *worker = (NWORKER*)arg;
while (1){
//線程只有兩個狀態:執行\等待
//查看任務隊列前先獲取鎖
pthread_mutex_lock(&m_jobs_mutex);
//當前沒有任務
while (m_jobs_list.size() == 0) {
//檢查worker是否需要結束生命
if (worker->terminate) break;
//條件等待直到被喚醒
pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex);
}
//檢查worker是否需要結束生命
if (worker->terminate){
pthread_mutex_unlock(&m_jobs_mutex);
break;
}
//獲取到job后將該job從任務隊列移出,免得其他worker過來重復做這個任務
struct NJOB *job = m_jobs_list.front();
m_jobs_list.pop_front();
//對任務隊列的操作結束,釋放鎖
pthread_mutex_unlock(&m_jobs_mutex);
m_free_thread--;
worker->isWorking = true;
//執行job中的func
job->func(job->user_data);
worker->isWorking = false;
free(job->user_data);
free(job);
}
free(worker);
pthread_exit(NULL);
}
原文鏈接:https://blog.csdn.net/ACMer_L/article/details/107578636
————————————————
原文鏈接:https://blog.csdn.net/ACMer_L/article/details/107578636
*博客內容為網友個人發布,僅代表博主個人觀點,如有侵權請聯系工作人員刪除。

