线程池 | 改进版

1.改进点

1.如何能让用户提交任务的过程更加简单方便?

1
2
3
4

pool.submitTask(std::make_shared<MyTask>(1, 100));
改为
pool.submitTask(sum1,10,20);

省略掉用户自己创建myTask类和重写run方法的这个过程

直接传入函数作为线程函数,后面是该函数的参数作为任务给到线程池

使用可变参模板编程来实现

2.Result以及相关的类型都是自己写的,其实都是有现成的

使用future代替Result,future等价于咱们自己写的Result

2.改进后的

主要改动的就是submitTask函数,threadFunc少量改动,其他的函数没有改动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include<iostream>
#include<vector>
#include<queue>
#include<memory>
#include<atomic>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<thread>
#include<unordered_map>
#include<future>

const int TASK_MAX_THRESHHOLD = 2;
const int THREAD_MAX_THRESHHOLD = 10;
const int THREAD_MAX_TDLE_TIME = 10;//单位是秒

//线程池支持的模式
enum class PoolMode
{
MODE_FIXED,//固定数量的线程
MODE_CACHED,//线程数量可动态增长
};

//线程类型
class Thread
{
public:
using ThreadFunc = std::function<void(int)>;
//线程构造
Thread(ThreadFunc func)
:func_(func),
threadId_(generateId_++)
{}

//线程析构
~Thread() = default;

//启动线程
void start()
{
//创建一个线程来执行一个线程函数
std::thread t(func_, threadId_);//C++11l来说 线程对象t除了start作用域就要析构消失了所以要分离线程

//设置分离线程
t.detach();
}

//获取线程id
int getId() const
{
return threadId_;
}

private:
ThreadFunc func_;
static int generateId_;
int threadId_;//保存线程id
};

int Thread::generateId_ = 0;

//线程池类型
class ThreadPool
{
public:
ThreadPool() :
initThreadSize_(0),
taskSize_(0),
idleThreadSize_(0),
threadSizeThresHold_(THREAD_MAX_THRESHHOLD),
taskQueMaxThresHold_(TASK_MAX_THRESHHOLD),
curThreadSize_(0),
poolMode_(PoolMode::MODE_FIXED),
isPoolRunning_(false)
{}
~ThreadPool()
{
isPoolRunning_ = false;

//等待线程池里面所有的线程返回
std::unique_lock<std::mutex> lock(taskQueMtx_);

notEmpty_.notify_all();

exitCoond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}

//设置线程池工作模式
void setMode(PoolMode mode)
{
if (checkRunningState())
return;
poolMode_ = mode;
}

//设置task任务队列上限阈值
void setTaskQueMaxThresHold(int threshold)
{
taskQueMaxThresHold_ = threshold;
}

//设置线程池cached模式下线程数量上限
void setthreadSizeThresHold(int threshold)
{
if (checkRunningState())
return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeThresHold_ = threshold;
}
}

//给线程池提交任务
//使用可变参模板编程让submitTask可以接收任意任务函数和任意数量的参数
template<typename Func,typename ...Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
{
using RType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<RType> result = task->get_future();

//获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

//线程的通信 等待任务队列有空余

//用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败
//满足等待1秒要求的
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThresHold_; }))
{
//表示等了1秒,条件还是没满足,即提交任务失败了
std::cerr << "task queue is full,submit task fail" << std::endl;
auto task = std::make_shared<std::packaged_task<RType()>>(
[]()->RType {return RType(); }
);
(*task)();
return task->get_future();
}

//如果有空余,把任务放入任务队列中
//taskQue_.emplace(sp);

//using Task = std::function<void()>;
//任务队列里面都是没有返回值的,
//但是这个task可能有返回值,
//那就用没有返回值的lambda表达式给套在外面,里面去调用这个task
taskQue_.emplace([task]() {(*task)();});

taskSize_++;//提交了任务,任务数量++

//因为新放了任务,任务队列肯定不空 notEmpty通知
notEmpty_.notify_all();

//cached模式:需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
//适合场景:小而且快的任务
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThresHold_)
{
std::cout << "create new thread........" << std::endl;
//创建新线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start();//启动线程
//修改线程个数相关的变量
curThreadSize_++;
idleThreadSize_++;
}


//返回任务的Result对象
return result;
}

//启动线程池 返回值是当前电脑的CPU核心数量
void start(int initThreadSize = std::thread::hardware_concurrency())
{
//设置线程池的启动状态
isPoolRunning_ = true;
//记录初始线程个数
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;

//创建线程对象的时候,把线程函数给到线程对象(使用bind绑定器实现)
for (int i = 0; i < initThreadSize_; i++)
{
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
//threads_.emplace_back(std::move(ptr));
}

//启动所有线程
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start();//需要执行一个线程函数
idleThreadSize_++;//记录初始空闲线程数量
}
}

//禁用拷贝构造和拷贝赋值
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

private:
//定义线程的回调函数
void threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();

for (;;)
{
//注意,处理完任务队列相关的操作之后就应该释放锁了,线程执行任务的时候不应该拿着锁阻止其他线程继续拿任务执行
Task task;
{
//先获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

std::cout << "thread tid:" << std::this_thread::get_id() << "尝试获取任务" << std::endl;

//cached模式下,有可能已经创建了很多的线程,但是空闲时间超过了60s,应该把超时的空闲线程给回收掉
//超过initThreadSize_数量的线程要回收
//当前时间-上一次线程执行的时间>60s


//每一秒钟返回一次 怎么区分超时返回还是有任务待执行的返回
while (taskQue_.size() == 0)
{
//线程池结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid" << threadid << "已被回收" << std::endl;
exitCoond_.notify_all();
return;
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
//条件变量超时返回了
if (std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds> (now - lastTime);
if (dur.count() >= THREAD_MAX_TDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
//开始回收当前线程
//记录线程数量的相关变量的值的修改
//把线程对象从线程队列容器中删除 没有办法匹配当前的threadFunc是对应的哪个线程对象
//通过线程id确定线程对象进而进行一个删除
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;

std::cout << "threadid" << threadid << "已被回收" << std::endl;
return;
}
}
}
else
{
//等待notEmpty条件
notEmpty_.wait(lock);
}
}
idleThreadSize_--;

std::cout << "thread tid:" << std::this_thread::get_id() << "获取任务成功" << std::endl;

//从任务队列中取出一个任务
task = taskQue_.front();
taskQue_.pop();
taskSize_--;

//如果依然有剩余任务,继续通知其它的线程执行任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}

//取出一个任务需要进行通知,通知可以继续提交生产任务
notFull_.notify_all();
}
//当前线程负责执行这个任务
if (task != nullptr)
{
//task->exec();
task();
}
idleThreadSize_++;
lastTime = std::chrono::high_resolution_clock().now();//更新线程调度的执行完任务的时间
}
}

//检查pool的运行状态
bool checkRunningState() const
{
return isPoolRunning_;
}
private:
//std::vector<std::unique_ptr<Thread>> threads_;//线程列表
std::unordered_map<int, std::unique_ptr<Thread>> threads_;
size_t initThreadSize_;//初识的线程数量

//记录当前线程池里面线程的总数量
std::atomic_int curThreadSize_;

//线程池内线程数量上限
int threadSizeThresHold_;

//记录空闲线程的数量
std::atomic_int idleThreadSize_;

//Task任务对应一个函数对象
using Task = std::function<void()>;
std::queue<Task> taskQue_;//任务队列
/*任务数量 用原子类型是因为每次给任务分配线程之后,任务就会出队,
数量会进行++或者--,用原子类型来保证这个操作是原子的就行,没必要用互斥锁*/
std::atomic_int taskSize_;
int taskQueMaxThresHold_;//任务数量上限阈值

std::mutex taskQueMtx_;//保证任务队列的线程安全
std::condition_variable notFull_;//表示任务队列不满,给用户线程用,表示可以给任务队列提交自己的任务
std::condition_variable notEmpty_;//表示任务队列不空,给线程池用,表示线程池可以给任务分配线程了
std::condition_variable exitCoond_;//等待线程资源全部回收

PoolMode poolMode_;//当前线程池的工作模式

//表示当前线程池的启动状态
std::atomic_bool isPoolRunning_;

};


#endif

重难点:可变参模板编程,future和packaged_task的使用,bind和function的理解

packaged_task 、future知识点-CSDN博客

3.测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include"threadPool.h"
#include<chrono>
using namespace std;

int sum(int a, int b)
{
this_thread::sleep_for(chrono::seconds(2));
return a + b;
}

int main()
{
ThreadPool pool;
//pool.setMode(PoolMode::MODE_CACHED);
pool.start(2);
future<int> res= pool.submitTask(sum,10,20);
future<int> res1 = pool.submitTask(sum, 10, 20);
future<int> res2 = pool.submitTask(sum, 10, 20);
future<int> res3 = pool.submitTask(sum, 10, 20);
future<int> res4 = pool.submitTask(sum, 10, 20);
cout << res.get() << endl;
cout << res1.get() << endl;
cout << res2.get() << endl;
cout << res3.get() << endl;
cout << res4.get() << endl;
return 0;
}