线程池 | Thread、ThreadPool类

1.线程池支持的模式

1
2
3
4
5
6
//线程池支持的模式
enum class PoolMode
{
MODE_FIXED,//固定数量的线程
MODE_CACHED,//线程数量可动态增长
};

2.Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//线程类型
class Thread
{
public:
using ThreadFunc = std::function<void(int)>;
//线程构造
Thread(ThreadFunc func);

//线程析构
~Thread();

//启动线程
void start();

//获取线程id
int getId() const;

private:
ThreadFunc func_;//回调函数
static int generateId_;//单调自增的一个id,用来标识一个线程
int threadId_;//保存线程id
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
///////////////////线程类的方法实现

//线程构造
Thread::Thread(ThreadFunc func)
:func_(func),
threadId_(generateId_++)
{}

//线程析构
Thread::~Thread(){}

//启动线程
void Thread::start()
{
//创建一个线程来执行一个线程函数
std::thread t(func_,threadId_);//C++11l来说 线程对象t除了start作用域就要析构消失了所以要分离线程
//设置分离线程
t.detach();
}

//获取线程id
int Thread::getId() const
{
return threadId_;
}

3.ThraedPool类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//线程池类型
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();

//设置线程池工作模式
void setMode(PoolMode mode);

//设置task任务队列上限阈值
void setTaskQueMaxThresHold(int threshold);

//设置线程池cached模式下线程数量上限
void setthreadSizeThresHold(int threshold);

//给线程池提交任务
Result submitTask(std::shared_ptr<Task> sp);

//启动线程池 返回值是当前电脑的CPU核心数量
void start(int initThreadSize = std::thread::hardware_concurrency());

//禁用拷贝构造和拷贝赋值
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

private:
//定义线程的回调函数
void threadFunc(int threadid);

//检查pool的运行状态
bool checkRunningState() const;
private:
//std::vector<std::unique_ptr<Thread>> threads_;//线程列表
std::unordered_map<int, std::unique_ptr<Thread>> threads_;
size_t initThreadSize_;//初识的线程数量

//记录当前线程池里面线程的总数量
std::atomic_int curThreadSize_;

//线程池内线程数量上限
int threadSizeThresHold_;

//记录空闲线程的数量
std::atomic_int idleThreadSize_;

std::queue<std::shared_ptr<Task>> taskQue_;//任务队列
/*任务数量 用原子类型是因为每次给任务分配线程之后,任务就会出队,
数量会进行++或者--,用原子类型来保证这个操作是原子的就行,没必要用互斥锁*/
std::atomic_int taskSize_;
int taskQueMaxThresHold_;//任务数量上限阈值

std::mutex taskQueMtx_;//保证任务队列的线程安全
std::condition_variable notFull_;//表示任务队列不满,给用户线程用,表示可以给任务队列提交自己的任务
std::condition_variable notEmpty_;//表示任务队列不空,给线程池用,表示线程池可以给任务分配线程了
std::condition_variable exitCoond_;//等待线程资源全部回收

PoolMode poolMode_;//当前线程池的工作模式

//表示当前线程池的启动状态
std::atomic_bool isPoolRunning_;

};

0.常量、全局变量定义

1
2
3
4
5
6
/////////////////线程池类的方法实现
const int TASK_MAX_THRESHHOLD = 1024;//最多支持多少任务
const int THREAD_MAX_THRESHHOLD = 10;//最多支持多少线程
const int THREAD_MAX_TDLE_TIME = 10;//最长空闲时间,超过就要回收单位是秒

int Thread::generateId_ = 0;

数量少是因为测试方便,大家可以自行修改

###1.checkRunningState()

1
2
3
4
bool ThreadPool::checkRunningState() const
{
return isPoolRunning_;
}

2.ThreadPool()

1
2
3
4
5
6
7
8
9
10
11
//线程池构造
ThreadPool::ThreadPool() :
initThreadSize_(0),
taskSize_(0),
idleThreadSize_(0),
threadSizeThresHold_(THREAD_MAX_THRESHHOLD),
taskQueMaxThresHold_(TASK_MAX_THRESHHOLD),
curThreadSize_(0),
poolMode_(PoolMode::MODE_FIXED),
isPoolRunning_(false)
{}

3.setMode()

1
2
3
4
5
6
7
//设置线程池工作模式
void ThreadPool::setMode(PoolMode mode)
{
if (checkRunningState())
return;
poolMode_ = mode;
}

4.setTaskQueMaxThresHold()

1
2
3
4
5
//设置task任务队列上限阈值
void ThreadPool::setTaskQueMaxThresHold(int threshold)
{
taskQueMaxThresHold_ = threshold;
}

5.setthreadSizeThresHold()

1
2
3
4
5
6
7
8
9
10
//设置线程池cached模式下线程数量上限
void ThreadPool::setthreadSizeThresHold(int threshold)
{
if (checkRunningState())
return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeThresHold_ = threshold;
}
}

6.start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//启动线程池
void ThreadPool::start(int initThreadSize)
{
//设置线程池的启动状态
isPoolRunning_ = true;
//记录初始线程个数
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;

//创建线程对象的时候,把线程函数给到线程对象(使用bind绑定器实现)
for (int i = 0; i < initThreadSize_; i++)
{
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
//threads_.emplace_back(std::move(ptr));
}

//启动所有线程
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start();//需要执行一个线程函数
idleThreadSize_++;//记录初始空闲线程数量
}
}

7.submitTask()、threadFunc()

线程池 | ThreadPool的submitTask和threadFunc方法-CSDN博客

8.~ThreadPool()

1
2
3
4
5
6
7
8
9
10
11
12
//线程池析构
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;

//等待线程池里面所有的线程返回
std::unique_lock<std::mutex> lock(taskQueMtx_);

notEmpty_.notify_all();

exitCoond_.wait(lock, [&]()->bool {return threads_.size() == 0 ; });
}

4.使用线程池的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
example:
ThreadPool pool;
pool.start(4);

class MyTask:public Task
{
public:
void run(){//线程代码....}
};

pool.submitTask(std::make_shared<MyTask>());

*/