线程池 | ThreadPool的submitTask和threadFunc方法
线程池 | ThreadPool的submitTask和threadFunc方法
1 | //线程池类型 |
1.submitTask()
1 | //给线程池提交任务 用户调用该接口传入任务对象 |
1.获取锁
提交任务涉及入队操作,对任务队列进行操作必须是线程安全的,所以要先获取锁
unique_lock
会在其生命周期结束时自动释放锁,防止忘记解锁而导致死锁
2.等待任务队列有空余空间:
notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThresHold_; });
- 如果任务队列有空间,将任务 `sp` 放入任务队列中。1
2
3
4
5
6
7
8
9
10
- 使用条件变量 `notFull_` 来等待任务队列 `taskQue_` 有空余空间。
- `wait_for` 函数会等待最多 1 秒钟,在等待期间会释放 `taskQueMtx_` 锁,以便其他线程可以添加或移除任务。
- 等待的条件是任务队列的大小小于 `taskQueMaxThresHold_`,如果条件满足或者等待超时(1 秒),函数将继续执行。
- 如果等待超时且条件不满足,函数将输出错误信息并返回一个表示失败的 `Result` 对象。
3.**将任务添加到任务队列中**:
- ```
taskQue_.emplace(sp);taskSize_++;
- 使用条件变量 `notEmpty_` 通知所有等待在 `notEmpty_` 上的线程,表明任务队列不为空,可能是正在等待任务的工作线程,它们可以开始处理任务。1
2
3
4
5
6
7
- 任务数量加一,以跟踪任务队列中的任务数量。
4.**通知任务队列不为空**:
- ```
notEmpty_.notify_all();
5.创建新线程(仅在 CACHED 模式下):
if (poolMode_ == PoolMode::MODE_CACHED && taskSize_ > idleThreadSize_ && curThreadSize_ < threadSizeThresHold_)
- 创建一个新的 `Thread` 对象,并使用 `std::bind` 将 `ThreadPool` 的 `threadFunc` 函数绑定到新线程。1
2
3
4
5
6
7
- 检查线程池的模式是否为 `MODE_CACHED`。
- 检查任务数量是否大于空闲线程数量且当前线程数量小于线程数量阈值。
- 如果满足上述条件,将创建一个新的线程。
- ```cpp
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));int threadId = ptr->getId();
- 将新线程存储到 `threads_` 映射中。1
2
3
4
5
- 获取新线程的 `ID`。
- ```cp
threads_.emplace(threadId, std::move(ptr));threads_[threadId]->start();
- 增加当前线程数量和空闲线程数量。1
2
3
4
5
6
- 启动新线程。
- ```cpp
curThreadSize_++;
idleThreadSize_++;
返回结果:
return Result(sp);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
- 无论任务是否立即执行,都会返回一个 `Result` 对象,该对象与提交的任务相关联。
### 关键点说明:
- **条件变量和锁的使用**:
- 条件变量 `notFull_` 和 `notEmpty_` 与互斥锁 `taskQueMtx_` 配合使用,以协调线程池中的任务提交和任务处理。
- `notFull_` 用于等待任务队列有空余空间,`notEmpty_` 用于通知任务队列不为空。
- **超时等待机制**:
- 使用 `wait_for` 避免长时间阻塞提交任务的线程,如果任务队列已满,会等待 1 秒钟,超时后会返回提交失败的结果。
- **线程创建策略**:
- 在 `MODE_CACHED` 模式下,根据任务数量、空闲线程数量和线程数量阈值来决定是否创建新线程,适用于处理小而快的任务。
这个函数是线程池的核心部分,它允许用户提交任务,管理任务队列的大小,并在需要时创建新的线程。使用条件变量和互斥锁确保了任务提交的线程安全性和线程池的高效运行。
## 2.threadFunc()
```cpp
//定义线程的回调函数 线程池的所有线程从任务队列中消费任务
void ThreadPool::threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();
for (;;)
{
//注意,处理完任务队列相关的操作之后就应该释放锁了,线程执行任务的时候不应该拿着锁阻止其他线程继续拿任务执行
std::shared_ptr<Task> task;
{
//先获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "thread tid:" << std::this_thread::get_id() << "尝试获取任务" << std::endl;
//cached模式下,有可能已经创建了很多的线程,但是空闲时间超过了60s,应该把超时的空闲线程给回收掉
//超过initThreadSize_数量的线程要回收
//当前时间-上一次线程执行的时间>60s
//每一秒钟返回一次 怎么区分超时返回还是有任务待执行的返回
while ( taskQue_.size() == 0)
{
//线程池结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid" << threadid << "已被回收" << std::endl;
exitCoond_.notify_all();
return;
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
//条件变量超时返回了
if (std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds> (now - lastTime);
if (dur.count() >= THREAD_MAX_TDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
//开始回收当前线程
//记录线程数量的相关变量的值的修改
//把线程对象从线程队列容器中删除 没有办法匹配当前的threadFunc是对应的哪个线程对象
//通过线程id确定线程对象进而进行一个删除
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid" << threadid << "已被回收" << std::endl;
return;
}
}
}
else
{
//等待notEmpty条件
notEmpty_.wait(lock);
}
/*//线程池要结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid" << threadid << "已被回收" << std::endl;
exitCoond_.notify_all();
return;
}*/
}
idleThreadSize_--;
std::cout << "thread tid:" << std::this_thread::get_id() << "获取任务成功" << std::endl;
//从任务队列中取出一个任务
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
//如果依然有剩余任务,继续通知其它的线程执行任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
//取出一个任务需要进行通知,通知可以继续提交生产任务
notFull_.notify_all();
}
//当前线程负责执行这个任务
if (task != nullptr)
{
task->exec();
}
idleThreadSize_++;
lastTime = std::chrono::high_resolution_clock().now();//更新线程调度的执行完任务的时间
}
}
函数功能解释:
1.初始化和变量定义:
auto lastTime = std::chrono::high_resolution_clock().now();
- 表示线程会不断尝试从任务队列中获取任务,直到线程池停止或被回收。1
2
3
4
5
6
7
- 记录线程开始执行时的时间,用于后续检查线程是否空闲超时。
2.**无限循环**:
- ```cpp
for (;;)
3.获取任务的锁:
std::unique_lock<std::mutex> lock(taskQueMtx_);
- 当任务队列为空时,线程需要等待任务。1
2
3
4
5
6
7
- 使用 `std::unique_lock` 锁定 `taskQueMtx_` 互斥锁,确保在操作任务队列时的线程安全。
4.**检查任务队列是否为空**:
- ```
while (taskQue_.size() == 0)
5.线程池关闭检查:
if (!isPoolRunning_)
- 在 `MODE_CACHED` 模式下,线程可能会因为长时间空闲而被回收。1
2
3
4
5
6
7
- 如果线程池停止运行,将当前线程从 `threads_` 中移除,通知其他等待的线程(通过 `exitCoond_`),并返回,结束线程。
6.**缓存模式下的线程回收**:
- ```
if (poolMode_ == PoolMode::MODE_CACHED)if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1)))
- 如果满足回收条件,将线程从 `threads_` 中移除,更新线程相关的计数,如 `curThreadSize_` 和 `idleThreadSize_`,并结束线程。1
2
3
4
5
- 线程等待任务队列不为空,如果等待超时(1 秒),检查是否空闲时间超过 `THREAD_MAX_TDLE_TIME` 秒,且当前线程数大于初始线程数。
- ```cpp
threads_.erase(threadid);
7.等待任务队列不为空:
notEmpty_.wait(lock);
- 当有任务时,从任务队列的前端取出任务,更新任务队列大小和任务计数。1
2
3
4
5
6
7
8
9
- 当任务队列不为 `MODE_CACHED` 模式或未超时等待时,线程会等待 `notEmpty_` 条件变量,直到有任务可用。
8.**获取任务并更新状态**:
- ```cpp
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
9.通知其他线程:
if (taskQue_.size() > 0) { notEmpty_.notify_all(); }
- 通知可以继续提交任务。1
2
3
4
5
- 若任务队列还有任务,通知其他线程可以继续执行任务。
- ```
notFull_.notify_all();
10.执行任务:
task->exec();
- 执行完任务后更新线程的最后执行时间,以便检查空闲时间。1
2
3
4
5
6
7
- 执行取出的任务。
11.**更新空闲时间**:
- ```
lastTime = std::chrono::high_resolution_clock().now();
简单来说,该函数的主要流程如下:
- 线程不断尝试从任务队列中获取任务。
- 在
MODE_CACHED
模式下,如果长时间等待任务超时且线程空闲,会检查是否需要回收该线程。 - 当任务队列不为空时,取出任务,执行任务,更新状态并通知其他线程。
注意事项:
- 确保在多线程环境中,对任务队列的操作是线程安全的,通过
taskQueMtx_
互斥锁和notEmpty_
、notFull_
等条件变量的协调。 - 对于
MODE_CACHED
模式,合理控制线程的回收时机,避免频繁创建和回收线程导致性能开销。 - 当线程池停止时,要确保线程正确回收和资源释放。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 Darlingの妙妙屋!
评论