异步线程池(基于C++11实现)

线程池是一种用于管理和重用线程的技术,广泛用于需要大量短生命周期线程的应用场景,如并发任务处理、网络服务和高性能计算等。使用线程池可以有效减少线程创建和销毁的开销,提升系统性能。本文将详细讲解线程池的基本概念、设计原则,并提供一个 C++ 实现的示例。

本文中涉及的C++11特性,不熟悉的话可以查阅C++11新特性文档

1. 线程池的设计

线程池的基本思想是预先创建一定数量的线程,并将它们放入一个池中。线程池负责管理线程的生命周期,并将任务分配给空闲线程执行。这样可以避免每次任务执行时都创建和销毁线程的开销。

如果要编写一个线程池,它的组成如下:

  1. 线程池管理器:负责创建、销毁线程,维护线程池状态(如空闲线程、忙碌线程)。
  2. 任务队列:用于存储待执行的任务。任务通常以函数对象(如 std::function)的形式存储。
  3. 工作线程:线程池中的实际线程,它们从任务队列中取出任务并执行。
  4. 同步机制:用于保护任务队列和线程池状态的线程安全操作,通常使用互斥锁和条件变量。

在设计线程池时,我们需要考虑以下几个重要原则:

  1. 线程池大小管理
    • 固定大小:线程池中的线程数量固定不变。适用于负载比较稳定的场景。
    • 动态调整:根据任务负载动态调整线程池大小。适用于负载变化较大的场景。
  2. 任务队列管理
    • FIFO 队列:最常用的任务队列实现方式,按照任务提交的顺序执行任务。
    • 优先级队列:根据任务的优先级执行任务,适用于需要按优先级处理任务的场景。
  3. 线程安全
    • 互斥锁:用于保护共享资源(如任务队列)的访问。
    • 条件变量:用于线程之间的通信,如通知空闲线程有新的任务到来。
    • 原子变量:对原子变量的操作都是原子操作,它是线程安全的。
  4. 任务执行与错误处理
    • 任务执行过程中的异常需要适当处理,以避免线程池中的线程因未捕获异常而终止。

2. 使用C++11实现线程池

2.1 头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#pragma once
#include <thread>
#include <mutex>
#include <vector>
#include <queue>
#include <atomic>
#include <functional>
#include <condition_variable>
#include <map>
#include <future>
using namespace std;

// 线程池类
class ThreadPool
{
public:
ThreadPool(int min = 4, int max = thread::hardware_concurrency());
~ThreadPool();
void addTask(function<void()> f);

private:
void manager();
void worker();
private:
thread* m_manager;
map<thread::id, thread> m_workers;
vector<thread::id> m_ids;
int m_minThreads;
int m_maxThreads;
atomic<bool> m_stop;
atomic<int> m_curThreads;
atomic<int> m_idleThreads;
atomic<int> m_exitNumber;
queue<function<void()>> m_tasks;
mutex m_idsMutex;
mutex m_queueMutex;
condition_variable m_condition;
};

相关函数介绍:

  • 构造函数:初始化线程池,并创建指定数量的工作线程和管理者线程。

  • 析构函数:设置 m_stop 标志为 true,通知所有线程退出,然后等待所有线程退出任务函数并释放线程资源。

  • std::thread::hardware_concurrency() 是 C++ 标准库中 std::thread 类的一个静态成员函数。

    • 用于查询计算机的硬件线程并发能力,即计算机上可以并发执行的线程数。
    • 通常,这个值等于 CPU 核心数或者 CPU 核心数乘以超线程(Hyper-Threading)技术带来的线程数。
  • manager():管理者线程的任务函数

  • worker():工作的线程的任务函数

  • addTask(function<void()> f):将任务添加到任务队列,并通知一个线程有新任务到来。

    • 线程的任务函数是一个可调用对象,类型为 function<void()>,几返回值类型为void,无参数
    • 对应的任务队列为queue<function<void()>> m_tasks

2.2 源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include "threadpool.h"
#include <iostream>

ThreadPool::ThreadPool(int min, int max) : m_maxThreads(max),
m_minThreads(min), m_stop(false), m_exitNumber(0)
{
//m_idleThreads = m_curThreads = max / 2;
m_idleThreads = m_curThreads = min;
cout << "线程数量: " << m_curThreads << endl;
m_manager = new thread(&ThreadPool::manager, this);
for (int i = 0; i < m_curThreads; ++i)
{
thread t(&ThreadPool::worker, this);
m_workers.insert(make_pair(t.get_id(), move(t)));
}
}

ThreadPool::~ThreadPool()
{
m_stop = true;
m_condition.notify_all();
for (auto& it : m_workers)
{
thread& t = it.second;
if (t.joinable())
{
cout << "******** 线程 " << t.get_id() << " 将要退出了..." << endl;
t.join();
}
}
if (m_manager->joinable())
{
m_manager->join();
}
delete m_manager;
}

void ThreadPool::addTask(function<void()> f)
{
{
lock_guard<mutex> locker(m_queueMutex);
m_tasks.emplace(f);
}
m_condition.notify_one();
}

void ThreadPool::manager()
{
while (!m_stop.load())
{
this_thread::sleep_for(chrono::seconds(2));
int idle = m_idleThreads.load();
int current = m_curThreads.load();
if (idle > current / 2 && current > m_minThreads)
{
m_exitNumber.store(2);
m_condition.notify_all();
unique_lock<mutex> lck(m_idsMutex);
for (const auto& id : m_ids)
{
auto it = m_workers.find(id);
if (it != m_workers.end())
{
cout << "############## 线程 " << (*it).first << "被销毁了...." << endl;
(*it).second.join();
m_workers.erase(it);
}
}
m_ids.clear();
}
else if (idle == 0 && current < m_maxThreads)
{
thread t(&ThreadPool::worker, this);
cout << "+++++++++++++++ 添加了一个线程, id: " << t.get_id() << endl;
m_workers.insert(make_pair(t.get_id(), move(t)));
m_curThreads++;
m_idleThreads++;
}
}
}

void ThreadPool::worker()
{
while (!m_stop.load())
{
function<void()> task = nullptr;
{
unique_lock<mutex> locker(m_queueMutex);
while (!m_stop && m_tasks.empty())
{
m_condition.wait(locker);
if (m_exitNumber.load() > 0)
{
cout << "----------------- 线程任务结束, ID: " << this_thread::get_id() << endl;
m_exitNumber--;
m_curThreads--;
unique_lock<mutex> lck(m_idsMutex);
m_ids.emplace_back(this_thread::get_id());
return;
}
}

if (!m_tasks.empty())
{
cout << "取出一个任务..." << endl;
task = move(m_tasks.front());
m_tasks.pop();
}
}

if (task)
{
m_idleThreads--;
task();
m_idleThreads++;
}
}
}

void calc(int x, int y)
{
int res = x + y;
cout << "res = " << res << endl;
this_thread::sleep_for(chrono::seconds(2));
}

int main()
{
ThreadPool pool(4);
for (int i = 0; i < 10; ++i)
{
auto func = bind(calc, i, i * 2);
pool.addTask(func);
}
getchar();
return 0;
}

上面代码的处理逻辑和C语言版线程池 以及C语言改C++版线程池 这两个版本的线程池是相同的,只不过是使用了C++11 中提供的类替换了C语言的API,整体上来看的话代码更简洁,可读性更强。

  • void worker(): 工作的线程的任务函数

    • 工作的线程在该函数内部通过while (!m_stop.load())循环重复的进行从任务队列取数据 -> 处理数据的动作
    • 如果线程池没关闭并且任务队列为空,工作的线程被条件变量阻塞
    • 通过addTask往任务队列中添加了新任务之后,可以唤醒被条件变量阻塞的线程
  • void manager():管理者线程的任务函数

    • 通过睡眠的方式每隔一段时间对线程池中的工作进行进行一次检测
      • 如果空闲线程数量 > 总线程数量的一半并且线程总数量大于最小线程数量,销毁两个工作的线程
      • 空闲线程为0并且线程总数量小于最大线程数,添加一个工作线程
  • void addTask(function<void()> f)

    • 添加新的任务到任务队列
    • 通过notify_one()唤醒一个被阻塞的工作线程
  • 关于 main() 函数中的测试代码

    • 通过添加任务的函数addTask可以得知任务函数返回值类型是void并且无参,但是测试程序中添加的任务函数是void calc(int x, int y)携带了两个参数,很显然和任务函数的参数是类型是不匹配的,程序中的解决方案是对函数和参数进行绑定:

      1
      auto func = bind(calc, i, i * 2);

      这样得到的可调用对象就是无参的。通过这种方式表面看起来只能存储无参函数的任务队列就可以存储带任意参数的有参函数了。

    • getchar();通过该函数阻塞主线程,程序运行完毕之后,按任意键解除阻塞,主线程执行完毕,程序(进程)也就退出了。

3. 线程异步

线程异步(Asynchronous Threading)是一种编程范式,用于执行任务或操作而不阻塞主线程或其他线程的执行。这种方法特别适用于需要同时处理多个操作或在后台执行长时间运行的任务的场景。线程异步的核心思想是将耗时的操作与主执行流程分离,使得系统能够继续处理其他任务,而无需等待耗时操作完成

  • 异步执行:与同步操作不同,异步操作不要求调用者在任务完成之前等待结果。异步操作通常会在后台线程中执行,主线程或其他线程可以继续执行其他任务。
  • 线程:在多线程编程中,异步操作通常通过创建新的线程来实现。新线程会执行异步任务,而主线程则继续进行其他操作。

在上面的线程池代码中,如果任务函数有返回值,我们就可以通过线程异步的方式,将子线程也就是工作的线程的返回值传递给主线程,核心操作就是修改添加任务的函数addTask

为了让线程池添加任务的函数更加灵活和通过以及实现线程异步,我们需要将其修改成一个模板函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#pragma once
#include <thread>
#include <mutex>
#include <vector>
#include <queue>
#include <atomic>
#include <functional>
#include <condition_variable>
#include <map>
#include <future>
using namespace std;

// 线程池类
class ThreadPool
{
public:
ThreadPool(int min, int max = thread::hardware_concurrency());
~ThreadPool();
template<typename F, typename... Args>
auto addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>
{
using returnType = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<returnType()>>(
bind(forward<F>(f), forward<Args>(args)...)
);
future<returnType> res = task->get_future();
{
unique_lock<mutex> lock(m_queueMutex);
m_tasks.emplace([task]() { (*task)(); });
}
m_condition.notify_one();
return res;
}

private:
void manager();
void worker();
private:
thread* m_manager;
map<thread::id, thread> m_workers;
vector<thread::id> m_ids;
int m_minThreads;
int m_maxThreads;
atomic<bool> m_stop;
atomic<int> m_curThreads;
atomic<int> m_idleThreads;
atomic<int> m_exitNumber;
queue<function<void()>> m_tasks;
mutex m_idsMutex;
mutex m_queueMutex;
condition_variable m_condition;
};

关于模板函数addTask的相关细节解释如下:

  • 模板参数 FArgs...
    • F 是一个类型参数,代表任务函数的类型或函数对象的类型。这个函数或函数对象将被传递给 addTask 函数来执行。
    • Args... 是可变参数模板,表示传递给 F 的参数类型。Args 可以是任何数量的参数类型。
  • auto 返回类型:
    • 返回类型是 future<typename result_of<F(Args...)>::type>,表示 addTask 函数会返回一个 future 对象,用于异步获取任务的结果。result_of<F(Args...)>::type 用于推导 F 运行后的返回类型。
    • 通过使用 typename,我们明确告诉编译std::result_of<F(Args...)>::type 是一个类型,而不是其他实体(例如静态成员)。
  • using returnType
    • 使用 result_of<F(Args...)>::type 来获取任务函数 F 执行后的返回类型,并将其命名为 returnType
  • make_shared<packaged_task<returnType()>>
    • std::make_shared 是一个模板函数,用于创建并返回一个 std::shared_ptr,它以一种异常安全的方式分配和构造对象。这里,std::make_shared 用于创建一个指向 std::packaged_task 的共享指针。
    • std::packaged_task 是一个模板类,用于包装一个可调用对象(如函数、lambda 表达式、函数对象等),使其可以异步执行,并允许获取其执行结果。std::packaged_task 提供了一个 std::future 对象,通过该对象可以在任务完成后获取其结果。
    • returnType() 表示 packaged_task 将封装一个返回类型为 returnType 的任务。
  • bind(forward<F>(f), forward<Args>(args)...) :
    • std::bind 是一个标准库函数模板,用于绑定参数到可调用对象上,返回一个新的可调用对象。这里,std::bind 绑定了传入的函数 f 和参数 args...,生成一个不接受参数的新函数对象。
    • std::forward 是一个模板函数,用于完美转发参数。addTask函数的参数(F&& f, Args&&... args)为未定引用类型,std::forward保留了参数的值类别(左值或右值),以确保参数在转发过程中不会被不必要地拷贝或移动。
  • task->get_future()
    • get_future 返回一个 future 对象,这个对象用于获取异步任务的结果。
  • 任务队列
    • unique_lock<mutex> lock(m_queueMutex) 用于加锁,确保线程安全地将任务加入任务队列。
    • m_tasks.emplace([task]() { (*task)(); }); 将任务添加到任务队列中。这里使用了一个 lambda 函数来调用 (*task)(),即执行封装的任务。
  • 通知条件变量
    • m_condition.notify_one() 用于通知等待的线程(如果有的话)任务队列中有新的任务可用。

然后再来看一下测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int calc(int x, int y)
{
int res = x + y;
//cout << "res = " << res << endl;
this_thread::sleep_for(chrono::seconds(2));
return res;
}

int main()
{
ThreadPool pool(4);
vector<future<int>> results;

for (int i = 0; i < 10; ++i)
{
results.emplace_back(pool.addTask(calc, i, i * 2));
}

// 等待并打印结果
for (auto&& res : results)
{
cout << "线程函数返回值: " << res.get() << endl;
}

return 0;
}

关于std::future 类的 get() 方法再来给大家详细说明一下:

  • 阻塞行为

    当调用 get() 方法时,如果异步操作还没有完成,调用线程会被阻塞,直到异步操作完成并且结果准备好为止。这意味着调用 get() 会暂停线程的执行,直到可以安全地获取到结果。

  • 返回值

    一旦异步操作完成,get() 方法会返回存储在 std::future 中的结果。如果异步操作抛出了异常,get() 方法会重新抛出那个异常,所以你需要准备好处理可能的异常。

  • 一次性调用

    get() 方法只能调用一次。调用一次后,future 对象会变得无效,如果你尝试再次调用 get(),程序会抛出 std::future_error 异常。

因此,在测试程序中我们通过vector<future<int>> results;容器存储了各个工作的线程返回的future对象,并通过它的get()方法将数据取出并打印了出来。