網(wǎng)站開(kāi)源系統(tǒng)免費(fèi)網(wǎng)站建設(shè)seo
[Linux]線(xiàn)程池
文章目錄
- [Linux]線(xiàn)程池
- 線(xiàn)程池的概念
- 線(xiàn)程池的優(yōu)點(diǎn)
- 線(xiàn)程池的應(yīng)用場(chǎng)景
- 線(xiàn)程池的實(shí)現(xiàn)
線(xiàn)程池的概念
線(xiàn)程池是一種線(xiàn)程使用模式。線(xiàn)程池是一種特殊的生產(chǎn)消費(fèi)模型,用戶(hù)作為生產(chǎn)者,線(xiàn)程池作為消費(fèi)者和緩沖區(qū)。
線(xiàn)程過(guò)多會(huì)帶來(lái)調(diào)度開(kāi)銷(xiāo),進(jìn)而影響緩存局部和整體性能,而線(xiàn)程池維護(hù)著多個(gè)線(xiàn)程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。
線(xiàn)程池的優(yōu)點(diǎn)
- 線(xiàn)程池避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷(xiāo)毀線(xiàn)程的代價(jià)。
- 線(xiàn)程池不僅能夠保證內(nèi)核充分利用,還能防止過(guò)分調(diào)度。
注意: 線(xiàn)程池中可用線(xiàn)程的數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量。
線(xiàn)程池的應(yīng)用場(chǎng)景
- 需要大量的線(xiàn)程來(lái)完成任務(wù),且完成任務(wù)的時(shí)間比較短。
- 對(duì)性能要求苛刻的應(yīng)用,比如要求服務(wù)器迅速響應(yīng)客戶(hù)請(qǐng)求。
- 接受突發(fā)性的大量請(qǐng)求,但不至于使服務(wù)器因此產(chǎn)生大量線(xiàn)程的應(yīng)用。
線(xiàn)程池的實(shí)現(xiàn)
下面我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線(xiàn)程池,線(xiàn)程池中提供了一個(gè)任務(wù)隊(duì)列,以及若干個(gè)線(xiàn)程(多線(xiàn)程)。
- 線(xiàn)程池中的多個(gè)線(xiàn)程負(fù)責(zé)從任務(wù)隊(duì)列當(dāng)中拿任務(wù),并將拿到的任務(wù)進(jìn)行處理。
- 線(xiàn)程池對(duì)外提供一個(gè)Push接口,用于讓外部線(xiàn)程能夠?qū)⑷蝿?wù)Push到任務(wù)隊(duì)列當(dāng)中。
線(xiàn)程池的代碼如下:
#pragma once#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <queue>const int N = 5; // 線(xiàn)程池內(nèi)線(xiàn)程數(shù)量template <class T>
class ThreadPool
{
public:ThreadPool(int num = N) : _num(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void threadWait(){pthread_cond_wait(&_cond, &_mutex);}void threadWakeUP(){pthread_cond_signal(&_cond);}T getTask(){T t = _tasks.front();_tasks.pop();return t;}bool isEmpty(){return _tasks.empty();}static void *threadRoutine(void *args){pthread_detach(pthread_self());ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);while (true){tp->LockQueue();while (tp->isEmpty()){tp->threadWait();}T t = tp->getTask();tp->UnLockQueue();t.Run();//任務(wù)處理}}void Start(){pthread_t tid;for (int i = 0; i < _num; i++){pthread_create(&tid, nullptr, threadRoutine, this);}}void PushTask(T &task) // 添加任務(wù){LockQueue();_tasks.push(task);threadWakeUP();UnLockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _num; // 線(xiàn)程數(shù)std::queue<T> _tasks; // 任務(wù)隊(duì)列pthread_mutex_t _mutex; // 保證互斥訪問(wèn)任務(wù)隊(duì)列這一共享資源pthread_cond_t _cond; // 根據(jù)任務(wù)隊(duì)列中的任務(wù)數(shù)量控制線(xiàn)程的等待和運(yùn)行
};
為什么線(xiàn)程池中需要有互斥鎖和條件變量?
互斥鎖: 任務(wù)隊(duì)列是一個(gè)共享資源,外部線(xiàn)程可以調(diào)用添加任務(wù)的接口訪問(wèn)任務(wù)隊(duì)列,線(xiàn)程池內(nèi)部的線(xiàn)程可以直接訪問(wèn)任務(wù)隊(duì)列處理任務(wù),可能會(huì)造成任務(wù)隊(duì)列的并發(fā)訪問(wèn)問(wèn)題,因此需要利用互斥鎖保護(hù)任務(wù)隊(duì)列中的數(shù)據(jù)。
條件變量: 線(xiàn)程池當(dāng)中的線(xiàn)程要從任務(wù)隊(duì)列里拿任務(wù),前提條件是任務(wù)隊(duì)列中必須要有任務(wù),因此線(xiàn)程池當(dāng)中的線(xiàn)程在拿任務(wù)之前,需要先判斷任務(wù)隊(duì)列當(dāng)中是否有任務(wù),若此時(shí)任務(wù)隊(duì)列為空,那么該線(xiàn)程應(yīng)該進(jìn)行等待,直到任務(wù)隊(duì)列中有任務(wù)時(shí)再將其喚醒,因此我們需要引入條件變量。
當(dāng)外部線(xiàn)程向任務(wù)隊(duì)列中Push一個(gè)任務(wù)后,此時(shí)可能有線(xiàn)程正處于等待狀態(tài),因此在新增任務(wù)后需要喚醒在條件變量下等待的線(xiàn)程。
為什么線(xiàn)程池中的線(xiàn)程執(zhí)行例程需要設(shè)置為靜態(tài)方法?
使用pthread_create
函數(shù)創(chuàng)建線(xiàn)程時(shí),需要為創(chuàng)建的線(xiàn)程傳入一個(gè)執(zhí)行方法threadRoutine,該執(zhí)行方法只有一個(gè)參數(shù)類(lèi)型為void的參數(shù),以及返回類(lèi)型為void的返回值。
如果threadRoutine作為類(lèi)的成員函數(shù),該函數(shù)的第一個(gè)參數(shù)是隱藏的this指針,無(wú)法通過(guò)編譯。而靜態(tài)成員函數(shù)屬于類(lèi),而不屬于某個(gè)對(duì)象,也就是說(shuō)靜態(tài)成員函數(shù)是沒(méi)有隱藏的this指針的,因此我們需要將threadRoutine設(shè)置為靜態(tài)方法,此時(shí)threadRoutine函數(shù)才真正只有一個(gè)參數(shù)類(lèi)型為void的參數(shù)。
但是在靜態(tài)成員函數(shù)內(nèi)部無(wú)法調(diào)用非靜態(tài)成員函數(shù),而我們需要在threadRoutine函數(shù)當(dāng)中調(diào)用該類(lèi)的某些非靜態(tài)成員函數(shù)。因此我們需要在創(chuàng)建線(xiàn)程時(shí),向threadRoutine函數(shù)傳入的當(dāng)前對(duì)象的this指針,此時(shí)我們就能夠通過(guò)該this指針在threadRoutine函數(shù)內(nèi)部調(diào)用非靜態(tài)成員函數(shù)了。
任務(wù)類(lèi)型的設(shè)計(jì)
由于線(xiàn)程池編寫(xiě)的是模板化的,因此任務(wù)類(lèi)型可以是任意的,但是由于處理任務(wù)的邏輯是通過(guò)調(diào)用任務(wù)的Run函數(shù),因此任務(wù)類(lèi)中必須實(shí)現(xiàn)Run函數(shù)才能使用該線(xiàn)程池。
例如,實(shí)現(xiàn)一個(gè)計(jì)算任務(wù)類(lèi)如下:
#include <cstdlib>
#include <iostream>class Task
{
public:Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitcode(0){}void Run()//對(duì)傳入數(shù)據(jù)進(jìn)行操作{switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':if (_y == 0) _exitcode = -1;else_result = _x / _y;break;case '%':if (_y == 0) _exitcode = -2;else_result = _x % _y;break;default:break;}std::string result = std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_result) + "(exicode:" + std::to_string(_exitcode);std::cout << result << std::endl;}private:int _x;//左操作數(shù)int _y;//右操作數(shù)char _op;//操作符int _result;//算數(shù)結(jié)果int _exitcode;//退出碼
};
線(xiàn)程池內(nèi)的線(xiàn)程在從任務(wù)隊(duì)列拿出任務(wù)進(jìn)行處理的過(guò)程,并不需要關(guān)心這些任務(wù)的類(lèi)型和來(lái)源,只需要拿到任務(wù)后執(zhí)行對(duì)應(yīng)的Run方法即可。
主線(xiàn)程實(shí)現(xiàn)
主線(xiàn)程只需要不斷向任務(wù)隊(duì)列當(dāng)中Push任務(wù)就行了,此后線(xiàn)程池當(dāng)中的線(xiàn)程會(huì)從任務(wù)隊(duì)列當(dāng)中獲取到這些任務(wù)并進(jìn)行處理。
#include "ThreadPoolv1.hpp"
#include "Task.hpp"
#include <memory>
#include <ctime>using namespace std;int main()
{std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());tp->Start();time(nullptr);const char* ops = "+-*/%";while(true){int x, y;x = rand() % 50;y = rand() % 50;char op = ops[rand()%5];Task t(x, y, op);tp->PushTask(t);sleep(1);}return 0;
}
運(yùn)行代碼后會(huì)產(chǎn)生六個(gè)線(xiàn)程,其中一個(gè)是主線(xiàn)程,另外五個(gè)是線(xiàn)程池內(nèi)處理任務(wù)的線(xiàn)程: