线程池 | ThreadPool的submitTask和threadFunc方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//线程池类型
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();

//设置线程池工作模式
void setMode(PoolMode mode);

//设置task任务队列上限阈值
void setTaskQueMaxThresHold(int threshold);

//设置线程池cached模式下线程数量上限
void setthreadSizeThresHold(int threshold);

//给线程池提交任务
Result submitTask(std::shared_ptr<Task> sp);

//启动线程池 返回值是当前电脑的CPU核心数量
void start(int initThreadSize = std::thread::hardware_concurrency());

//禁用拷贝构造和拷贝赋值
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

private:
//定义线程的回调函数
void threadFunc(int threadid);

//检查pool的运行状态
bool checkRunningState() const;
private:
//std::vector<std::unique_ptr<Thread>> threads_;//线程列表
std::unordered_map<int, std::unique_ptr<Thread>> threads_;
size_t initThreadSize_;//初识的线程数量

//记录当前线程池里面线程的总数量
std::atomic_int curThreadSize_;

//线程池内线程数量上限
int threadSizeThresHold_;

//记录空闲线程的数量
std::atomic_int idleThreadSize_;

std::queue<std::shared_ptr<Task>> taskQue_;//任务队列
/*任务数量 用原子类型是因为每次给任务分配线程之后,任务就会出队,
数量会进行++或者--,用原子类型来保证这个操作是原子的就行,没必要用互斥锁*/
std::atomic_int taskSize_;
int taskQueMaxThresHold_;//任务数量上限阈值

std::mutex taskQueMtx_;//保证任务队列的线程安全
std::condition_variable notFull_;//表示任务队列不满,给用户线程用,表示可以给任务队列提交自己的任务
std::condition_variable notEmpty_;//表示任务队列不空,给线程池用,表示线程池可以给任务分配线程了
std::condition_variable exitCoond_;//等待线程资源全部回收

PoolMode poolMode_;//当前线程池的工作模式

//表示当前线程池的启动状态
std::atomic_bool isPoolRunning_;

};

1.submitTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//给线程池提交任务 用户调用该接口传入任务对象
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
//获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);

//线程的通信 等待任务队列有空余

//用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败

/*while (taskQue_.size() == taskQueMaxThresHold_)
{
notFull_.wait(lock);
}*/
//这一行相当于上面三行,功能一样
//notFull_.wait(lock, [&]()->bool {return taskQue_.size() < taskQueMaxThresHold_; });

//满足等待1秒要求的
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThresHold_; }))
{
//表示等了1秒,条件还是没满足,即提交任务失败了
std::cerr << "task queue is full,submit task fail" << std::endl;
return Result(sp,false);
}

//如果有空余,把任务放入任务队列中
taskQue_.emplace(sp);

taskSize_++;//提交了任务,任务数量++

//因为新放了任务,任务队列肯定不空 notEmpty通知
notEmpty_.notify_all();

//cached模式:需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
//适合场景:小而且快的任务
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThresHold_)
{
std::cout << "create new thread........" << std::endl;
//创建新线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start();//启动线程
//修改线程个数相关的变量
curThreadSize_++;
idleThreadSize_++;
}


//返回任务的Result对象
return Result(sp);
//return task->getResult();不能是这种形式,因为线程执行结束才会有返回值,线程结束后task就没了,所以不能通过任务来调方法得到Result
}

1.获取锁

提交任务涉及入队操作,对任务队列进行操作必须是线程安全的,所以要先获取锁

unique_lock 会在其生命周期结束时自动释放锁,防止忘记解锁而导致死锁

2.等待任务队列有空余空间

  • notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThresHold_; });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    - 使用条件变量 `notFull_` 来等待任务队列 `taskQue_` 有空余空间。
    - `wait_for` 函数会等待最多 1 秒钟,在等待期间会释放 `taskQueMtx_` 锁,以便其他线程可以添加或移除任务。
    - 等待的条件是任务队列的大小小于 `taskQueMaxThresHold_`,如果条件满足或者等待超时(1 秒),函数将继续执行。
    - 如果等待超时且条件不满足,函数将输出错误信息并返回一个表示失败的 `Result` 对象。

    3.**将任务添加到任务队列中**:

    - ```
    taskQue_.emplace(sp);
    - 如果任务队列有空间,将任务 `sp` 放入任务队列中。
  • taskSize_++;
    
    1
    2
    3
    4
    5
    6
    7

    - 任务数量加一,以跟踪任务队列中的任务数量。

    4.**通知任务队列不为空**:

    - ```
    notEmpty_.notify_all();
    - 使用条件变量 `notEmpty_` 通知所有等待在 `notEmpty_` 上的线程,表明任务队列不为空,可能是正在等待任务的工作线程,它们可以开始处理任务。

5.创建新线程(仅在 CACHED 模式下)

  • if (poolMode_ == PoolMode::MODE_CACHED && taskSize_ > idleThreadSize_ && curThreadSize_ < threadSizeThresHold_)
    
    1
    2
    3
    4
    5
    6
    7

    - 检查线程池的模式是否为 `MODE_CACHED`。
    - 检查任务数量是否大于空闲线程数量且当前线程数量小于线程数量阈值。
    - 如果满足上述条件,将创建一个新的线程。

    - ```cpp
    auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
    - 创建一个新的 `Thread` 对象,并使用 `std::bind` 将 `ThreadPool` 的 `threadFunc` 函数绑定到新线程。
  • int threadId = ptr->getId();
    
    1
    2
    3
    4
    5

    - 获取新线程的 `ID`。

    - ```cp
    threads_.emplace(threadId, std::move(ptr));
    - 将新线程存储到 `threads_` 映射中。
  • threads_[threadId]->start();
    
    1
    2
    3
    4
    5
    6

    - 启动新线程。

    - ```cpp
    curThreadSize_++;
    idleThreadSize_++;
    - 增加当前线程数量和空闲线程数量。
  1. 返回结果

    • return Result(sp);
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114

      - 无论任务是否立即执行,都会返回一个 `Result` 对象,该对象与提交的任务相关联。

      ### 关键点说明:

      - **条件变量和锁的使用**:
      - 条件变量 `notFull_` 和 `notEmpty_` 与互斥锁 `taskQueMtx_` 配合使用,以协调线程池中的任务提交和任务处理。
      - `notFull_` 用于等待任务队列有空余空间,`notEmpty_` 用于通知任务队列不为空。
      - **超时等待机制**:
      - 使用 `wait_for` 避免长时间阻塞提交任务的线程,如果任务队列已满,会等待 1 秒钟,超时后会返回提交失败的结果。
      - **线程创建策略**:
      - 在 `MODE_CACHED` 模式下,根据任务数量、空闲线程数量和线程数量阈值来决定是否创建新线程,适用于处理小而快的任务。

      这个函数是线程池的核心部分,它允许用户提交任务,管理任务队列的大小,并在需要时创建新的线程。使用条件变量和互斥锁确保了任务提交的线程安全性和线程池的高效运行。

      ## 2.threadFunc()

      ```cpp
      //定义线程的回调函数 线程池的所有线程从任务队列中消费任务
      void ThreadPool::threadFunc(int threadid)
      {
      auto lastTime = std::chrono::high_resolution_clock().now();

      for (;;)
      {
      //注意,处理完任务队列相关的操作之后就应该释放锁了,线程执行任务的时候不应该拿着锁阻止其他线程继续拿任务执行
      std::shared_ptr<Task> task;
      {
      //先获取锁
      std::unique_lock<std::mutex> lock(taskQueMtx_);

      std::cout << "thread tid:" << std::this_thread::get_id() << "尝试获取任务" << std::endl;

      //cached模式下,有可能已经创建了很多的线程,但是空闲时间超过了60s,应该把超时的空闲线程给回收掉
      //超过initThreadSize_数量的线程要回收
      //当前时间-上一次线程执行的时间>60s


      //每一秒钟返回一次 怎么区分超时返回还是有任务待执行的返回
      while ( taskQue_.size() == 0)
      {
      //线程池结束,回收线程资源
      if (!isPoolRunning_)
      {
      threads_.erase(threadid);
      std::cout << "threadid" << threadid << "已被回收" << std::endl;
      exitCoond_.notify_all();
      return;
      }
      if (poolMode_ == PoolMode::MODE_CACHED)
      {
      //条件变量超时返回了
      if (std::cv_status::timeout ==
      notEmpty_.wait_for(lock, std::chrono::seconds(1)))
      {
      auto now = std::chrono::high_resolution_clock().now();
      auto dur = std::chrono::duration_cast<std::chrono::seconds> (now - lastTime);
      if (dur.count() >= THREAD_MAX_TDLE_TIME
      && curThreadSize_ > initThreadSize_)
      {
      //开始回收当前线程
      //记录线程数量的相关变量的值的修改
      //把线程对象从线程队列容器中删除 没有办法匹配当前的threadFunc是对应的哪个线程对象
      //通过线程id确定线程对象进而进行一个删除
      threads_.erase(threadid);
      curThreadSize_--;
      idleThreadSize_--;

      std::cout << "threadid" << threadid << "已被回收" << std::endl;
      return;
      }
      }
      }
      else
      {
      //等待notEmpty条件
      notEmpty_.wait(lock);
      }
      /*//线程池要结束,回收线程资源
      if (!isPoolRunning_)
      {
      threads_.erase(threadid);
      std::cout << "threadid" << threadid << "已被回收" << std::endl;
      exitCoond_.notify_all();
      return;
      }*/
      }
      idleThreadSize_--;

      std::cout << "thread tid:" << std::this_thread::get_id() << "获取任务成功" << std::endl;

      //从任务队列中取出一个任务
      task = taskQue_.front();
      taskQue_.pop();
      taskSize_--;

      //如果依然有剩余任务,继续通知其它的线程执行任务
      if (taskQue_.size() > 0)
      {
      notEmpty_.notify_all();
      }

      //取出一个任务需要进行通知,通知可以继续提交生产任务
      notFull_.notify_all();
      }
      //当前线程负责执行这个任务
      if (task != nullptr)
      {
      task->exec();
      }
      idleThreadSize_++;
      lastTime = std::chrono::high_resolution_clock().now();//更新线程调度的执行完任务的时间
      }
      }

函数功能解释:

1.初始化和变量定义

  • auto lastTime = std::chrono::high_resolution_clock().now();
    
    1
    2
    3
    4
    5
    6
    7

    - 记录线程开始执行时的时间,用于后续检查线程是否空闲超时。

    2.**无限循环**:

    - ```cpp
    for (;;)
    - 表示线程会不断尝试从任务队列中获取任务,直到线程池停止或被回收。

3.获取任务的锁

  • std::unique_lock<std::mutex> lock(taskQueMtx_);
    
    1
    2
    3
    4
    5
    6
    7

    - 使用 `std::unique_lock` 锁定 `taskQueMtx_` 互斥锁,确保在操作任务队列时的线程安全。

    4.**检查任务队列是否为空**:

    - ```
    while (taskQue_.size() == 0)
    - 当任务队列为空时,线程需要等待任务。

5.线程池关闭检查

  • if (!isPoolRunning_)
    
    1
    2
    3
    4
    5
    6
    7

    - 如果线程池停止运行,将当前线程从 `threads_` 中移除,通知其他等待的线程(通过 `exitCoond_`),并返回,结束线程。

    6.**缓存模式下的线程回收**:

    - ```
    if (poolMode_ == PoolMode::MODE_CACHED)
    - 在 `MODE_CACHED` 模式下,线程可能会因为长时间空闲而被回收。
  • if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1)))
    
    1
    2
    3
    4
    5

    - 线程等待任务队列不为空,如果等待超时(1 秒),检查是否空闲时间超过 `THREAD_MAX_TDLE_TIME` 秒,且当前线程数大于初始线程数。

    - ```cpp
    threads_.erase(threadid);
    - 如果满足回收条件,将线程从 `threads_` 中移除,更新线程相关的计数,如 `curThreadSize_` 和 `idleThreadSize_`,并结束线程。

7.等待任务队列不为空

  • notEmpty_.wait(lock);
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    - 当任务队列不为 `MODE_CACHED` 模式或未超时等待时,线程会等待 `notEmpty_` 条件变量,直到有任务可用。

    8.**获取任务并更新状态**:

    - ```cpp
    task = taskQue_.front();
    taskQue_.pop();
    taskSize_--;
    - 当有任务时,从任务队列的前端取出任务,更新任务队列大小和任务计数。

9.通知其他线程

  • if (taskQue_.size() > 0)
    {
        notEmpty_.notify_all();
    }
    
    1
    2
    3
    4
    5

    - 若任务队列还有任务,通知其他线程可以继续执行任务。

    - ```
    notFull_.notify_all();
    - 通知可以继续提交任务。

10.执行任务

  • task->exec();
    
    1
    2
    3
    4
    5
    6
    7

    - 执行取出的任务。

    11.**更新空闲时间**:

    - ```
    lastTime = std::chrono::high_resolution_clock().now();
    - 执行完任务后更新线程的最后执行时间,以便检查空闲时间。

简单来说,该函数的主要流程如下:

  • 线程不断尝试从任务队列中获取任务。
  • MODE_CACHED 模式下,如果长时间等待任务超时且线程空闲,会检查是否需要回收该线程。
  • 当任务队列不为空时,取出任务,执行任务,更新状态并通知其他线程。

注意事项:

  • 确保在多线程环境中,对任务队列的操作是线程安全的,通过 taskQueMtx_ 互斥锁和 notEmpty_notFull_ 等条件变量的协调。
  • 对于 MODE_CACHED 模式,合理控制线程的回收时机,避免频繁创建和回收线程导致性能开销。
  • 当线程池停止时,要确保线程正确回收和资源释放。