并行算法

1. 什么是并行算法

在 C++17 之前,当我们使用标准库算法(如 std::sort, std::accumulate)时,程序是串行执行的。

想象一下我们要洗一盆子的脏衣服:

  • 串行执行:一件一件地洗,洗完第一件再洗第二件。即使有十个空闲的水槽,也只用了一个。
  • 并行执行:有五个朋友(CPU 核心),大家一人抓几件衣服同时洗,速度自然快得多。

以前,如果想利用多核加速排序或遍历,必须手动写复杂的线程代码,不仅难写,还容易出错。C++17 为几乎所有的标准算法(约 70 个)引入了执行策略参数。现在,只需要在调用算法时多加一个参数,编译器和标准库就会自动利用多核 CPU 来帮我们加速计算,完全不需要自己写线程代码。

要使用并行算法,需要包含头文件:

1
2
3
4
#include <algorithm>    // 算法头文件
#include <execution> // 并行执行策略头文件
// 执行策略位于std::execution命名空间
using namespace std::execution;

C++17 定义了三种主要的执行策略,执行策略是一个枚举类型,分为以下几类:

  1. std::execution::seq (Sequential)
    • 含义:串行执行,默认行为
    • 行为:算法在单个线程上按顺序执行,保证函数调用的顺序与顺序迭代器的顺序一致。
    • 用途:作为默认行为的显式声明,或者在调试时使用。
  2. std::execution::par (Parallel)
    • 含义:并行执行。
    • 行为:算法会将任务拆分,分配到多个线程上并行执行,线程间需要同步访问共享数据。
    • 核心重点:线程之间的执行顺序是不确定的,且元素索引的访问顺序与函数调用顺序没有关系。
  3. std::execution::par_unseq (Parallel + Vectorized)
    • 含义:并行执行 + 向量化执行。
    • 行为:不仅使用多线程,还允许使用单指令多数据指令集进行硬件级别的加速。
    • 限制:对代码中的函数体要求最严,不能包含锁或内存分配等操作(对元素的访问不能有数据竞争)。

C++17标准库中支持并行执行的算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 排序算法
std::sort
std::stable_sort
std::partial_sort

// 查找算法
std::find
std::find_if
std::find_if_not
std::search
std::search_n

// 计数算法
std::count
std::count_if

// 变换算法
std::transform
std::for_each
std::for_each_n

// 归约算法
std::reduce
std::transform_reduce

// 扫描算法
std::inclusive_scan
std::exclusive_scan
std::transform_inclusive_scan
std::transform_exclusive_scan

// 其他算法
std::fill
std::generate
std::replace
std::replace_if
std::copy
std::copy_if
std::move
std::unique
std::reverse
std::rotate

开启算法并行语法非常简单:在算法的第一个参数位置传入执行策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <vector>
#include <algorithm> // 算法
#include <execution> // 执行策略

int main()
{
std::vector<int> v(10000000);
// 填充数据...
for(int i=0; i<v.size(); ++i) v[i] = i;

// --- 旧方式 (串行) ---
// 单线程排序
std::sort(v.begin(), v.end());

// --- 新方式 (并行) ---
// 多线程排序,自动利用多核 CPU
// 编译器会自动根据 CPU 核心数分配线程
std::sort(std::execution::par, v.begin(), v.end());

return 0;
}

2. 必须警惕的巨坑

因为并行算法会同时修改数据,如果我们不注意线程安全,程序会崩溃或产生错误结果。

2.1 数据竞争

std::execution::par 模式下,传递给算法的函数(如 Lambda 表达式)可能会被多个线程同时调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <vector>
#include <algorithm>
#include <execution>
#include <iostream>

int main()
{
std::vector<int> v = {1, 2, 3, 4, 5, 6, 7, 8, 9};
int sum = 0;

std::for_each(std::execution::par, v.begin(), v.end(), [&](int x) {
sum += x; // 危险!!!
});

std::cout << "Sum: " << sum << std::endl;
return 0;
}

在上面的示例中多个线程同时执行 sum += x,这是一个经典的竞态条件,结果是不确定的。正确的处理方式是使用 std::atomic 或者使用 std::reduce 代替 for_each 进行求和。

  • 使用原子操作

    1
    2
    3
    4
    5
    6
    #include <atomic>
    // ...
    std::atomic<int> sum = 0; // 使用原子变量保证线程安全
    std::for_each(std::execution::par, v.begin(), v.end(), [&](int x) {
    sum += x;
    });
  • 使用专门的归约算法std::reduce

    std::reduce 是 C++17 新增的,专门用于并行环境下的求和(见下文)。

2.2 死锁与互斥锁

在并行策略中,代码会在多个线程中运行。如果在 Lambda 中加锁:

1
2
3
4
5
std::mutex mtx;
std::for_each(std::execution::par, v.begin(), v.end(), [&](int& x) {
std::lock_guard<std::mutex> lock(mtx);
// 访问 x
});

这段代码存在非常严重的问题,它会导致并行算法完全失效,甚至比单线程执行更慢,并可能带来严重的性能问题甚至死锁风险。以下是详细分析:

  1. 并行算法失效(串行化执行)
    • std::for_eachstd::execution::par 策略会将任务分发给多个线程并行执行。
    • Lambda 中使用 std::lock_guard<std::mutex> 会导致所有线程串行化竞争同一把锁(mtx),因为每次迭代都需要先获取锁才能访问 x
    • 这相当于把并行任务变成了串行执行,完全失去了并行的意义。
  2. 锁竞争导致性能下降
    • 如果 v 很大(例如 100 万个元素),线程会频繁竞争锁,导致上下文切换开销极大。
    • 测量时间通常会发现,这段代码的执行时间比单线程版本更慢(因为锁竞争的开销)。
  3. 可能的死锁风险
    • 如果在锁内又有其他需要同步的操作(例如调用外部函数而该函数内部也尝试加锁),可能导致死锁
    • 即使没有死锁,锁的长时间持有也会进一步加剧性能问题。

使用 std::execution::par 时,除了 std::for_each(它保留了先后顺序的语义但不保证调用顺序)等极少数算法外,大多数算法对元素的访问顺序是不确定的。

2.3 par_unseq 的特殊限制

par_unseq 的特殊限制是不允许阻塞,这是最隐晦的特性。当使用 par_unseq 时,意味着我们的代码可能会在 单指令多数据向量中被执行。此时,如果代码中包含:

  1. 互斥锁
  2. 内存分配 (new, malloc) —— 分配器内部可能有锁
  3. 任何可能阻塞线程或挂起的函数调用

程序将直接崩溃或产生未定义行为。par_unseq 要求代码是可向量化(简单来说,就是允许编译器或运行时系统将循环转换为向指令,从而极大地提高计算密集型任务的性能)的。

3. 新增算法

std::accumulate 是我们熟悉的旧式求和算法,但在并行环境下,它有一个问题:它要求必须严格按照从左到右的顺序相加(例如 (((1+2)+3)+4))。这种顺序依赖导致它很难并行化。

C++17 引入了 std::reduce允许以任意顺序结合元素(例如((1+2) + (3+4)) + ...),这种方式可以将大数组切分成小块,分配给不同线程分别计算,最后再合并结果。

3.1 std::reduce

在 C++ 标准库中,std::reduce 有多个重载版本,函数原型:

1
2
3
4
5
6
7
8
9
10
11
//位于 <numeric> 头文件
template< class ExecutionPolicy, class ForwardIt, class T, class BinaryOp >
T reduce( ExecutionPolicy&& policy,
ForwardIt first, ForwardIt last,
T init,
BinaryOp op );

template<class ExecutionPolicy, class T, class InputIt>
T reduce(ExecutionPolicy&& exec,
InputIt first, InputIt last,
T init); // 注意:这里没有 BinaryOp 参数
  • ExecutionPolicy: 执行策略(如 std::execution::seq, par, par_unseq)。
  • ForwardIt: 迭代器范围。
  • T: 初始值的类型。
  • BinaryOp: 可调用对象(二元操作符),签名等效于 T fun(const T1&, const T2&)

当省略最后一个 BinaryOp 参数时,编译器会自动调用没有该参数的重载函数。这个简化的重载函数内部,等价于使用了 std::plus<>{} 对范围内的所有元素进行累加。

使用 std::reduce 时,操作必须满足以下数学性质,否则结果是未定义的(特别是并行运行时)。

  1. 结合律:操作必须可以重新分组 –(a op b) op c 必须等于 a op (b op c)

    • 加法(1+2)+3 = 1+(2+3),可以用 std::reduce
    • 乘法:同上。
    • 减法(10-5)-2 = 3,但 10-(5-2) = 7不可用!
    • 除法:不可用。
  2. 单位元:这是与 std::accumulate 最容易混淆的地方。

    如果没有显式提供初始值,std::reduce 会默认使用 T()(即值初始化,通常是 0)。这个初始值不仅要作为起点,还可能被多次用于合并子节点的结果。在并行树状结构中,某个叶子节点可能实际上执行的是 init op element。因此,init op x 结果必须等于 x

    • 对于加法,初始值 00 + x = x。正确。
    • 对于乘法,初始值 11 * x = x。正确。

    警告:在做自定义操作(如矩阵乘法或字符串拼接)时,请务必手动传入正确的 init 值,不要依赖默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <vector>
#include <numeric> // std::reduce
#include <execution> // 执行策略

int main()
{
// 创建一个包含 1 到 100 的 vector
std::vector<int> v(100);
std::iota(v.begin(), v.end(), 1); // 填充 1, 2, ..., 100
// --- 多线程并行执行 (性能提升) ---
int sum_par = std::reduce(
std::execution::par,
v.begin(), v.end(),
0 // 初始值
);
std::cout << "Parallel Sum: " << sum_par << std::endl; // 5050

// 计算乘积
std::vector<int> v1 = {1, 2, 3, 4, 5};
// 使用 std::multiplies 函数对象
auto product = std::reduce(
std::execution::par,
v1.begin(), v1.end(),
1, // 乘法的单位元
std::multiplies<int>()
);
std::cout << "Product: " << product << std::endl; // 输出: 120

// 比较长度,保留更长者
std::vector<std::string> words = {
"Apple", "Banana", "Pear", "Watermelon", "Kiwi"
};
auto longer_string = std::reduce(
std::execution::par,
words.begin(), words.end(),
std::string(""), // 初始值:空字符串
[](const std::string& a, const std::string& b) {
return a.size() > b.size() ? a : b;
}
);
std::cout << "Longest word: " << longer_string << std::endl;
}
  • 计算乘法满足结合律,因此可以安全使用 std::reduce。注意初始值必须是 1(乘法的单位元)。
  • 在寻找最长字符串的时候,为了处理空vector的情况,初始值通常设为空字符串,或者你期望的默认值。

3.2 std::transform_reduce

std::transform_reduce 是 C++17 引入的一个非常强大的算法,可以简单理解为 std::transform(变换) 和 std::reduce(归约) 的结合体,或者说是映射-归约模式的 C++ 标准库实现。

它的核心作用是:先对范围内的每个元素进行一次操作(变换),然后将变换后的结果进行归约(累加等)。

相比于先写一个 transform 循环再写一个 reduce 循环,transform_reduce 通常更高效,特别是在使用并行策略(par / par_unseq)时,因为它可以减少内存访问次数并允许编译器进行更深度的优化(如融合循环)。

std::transform_reduce 有两种截然不同的重载形式。

  1. Map-Reduce 模式(变换后归约):这是最常用的形式。

    1
    2
    3
    4
    5
    6
    7
    template<class ExecutionPolicy, class ForwardIt1, class ForwardIt2, class T, class BinaryOp1, class BinaryOp2>
    T transform_reduce(ExecutionPolicy&& policy,
    ForwardIt1 first1, ForwardIt1 last1,
    ForwardIt2 first2,
    T init,
    BinaryOp1 binary_op1, // 归约操作
    BinaryOp2 binary_op2); // 变换操作
    • 应用场景:如果有两个范围(例如两个向量)时,需要先对每一对元素做某种操作(比如相乘),然后把所有结果加起来(比如求内积)。
    • 参数
      • init: 归约的初始值。
      • binary_op1: 归约函数(通常是 std::plus),必须满足结合律。
      • binary_op2: 变换函数(例如 std::multiplies),作用于每一对元素 *(first1+i)*(first2+i)

    计算向量的内积是 Map-Reduce 模式的典型应用。向量内积用最通俗的话说,它的意思是:两个向量中对应位置的元素相乘,然后把所有乘积累加起来,得到一个总和。

    假设有两个容器里边分别记录了水果的数量和单价,我们要计算这些水果算总价:

    • vector A(数量):3个苹果,2个香蕉,5个橘子。 -> vector A = [3, 2, 5]
    • vector B(单价):5元/苹果,3元/香蕉,2元/橘子。 -> vector B = [5, 3, 2]

    计算内积的过程就是算总价的过程:

    1. 苹果总价 = 数量 × 单价 = 3 × 5 = 15
    2. 香蕉总价 = 数量 × 单价 = 2 × 3 = 6
    3. 橘子总价 = 数量 × 单价 = 5 × 2 = 10
    4. 内积(总花费) = 15 + 6 + 10 = 31
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    #include <iostream>
    #include <vector>
    #include <numeric>
    #include <execution>

    int main()
    {
    std::vector<double> v1 = {1.0, 2.0, 3.0, 4.0};
    std::vector<double> v2 = {5.0, 6.0, 7.0, 8.0};

    // 使用 std::plus 归约,使用 std::multiplies 变换
    // 策略:par (并行)
    double dp = std::transform_reduce(
    std::execution::par,
    v1.begin(), v1.end(), v2.begin(), // 范围1 和 范围2的起点
    0.0, // 初始值
    std::plus<>(), // 归约:加法
    std::multiplies<>() // 变换:乘法
    );

    std::cout << "Dot Product: " << dp << std::endl;
    // 结果: 1*5 + 2*6 + 3*7 + 4*8 = 70
    }
  2. 单范围变换后归约:不需要第二个范围。

    1
    2
    3
    4
    5
    6
    template<class ExecutionPolicy, class UnaryOp, class T, class BinaryOp>
    T transform_reduce(ExecutionPolicy&& policy,
    ForwardIt first, ForwardIt last,
    T init,
    BinaryOp binary_op1, // 归约操作
    UnaryOp unary_op); // 变换操作

    应用场景:对容器中每个元素应用一元函数(如 x * x),然后累加。

    计算数组元素的平方和用到的就是单范围变换后归约,示例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    #include <iostream>
    #include <vector>
    #include <numeric>
    #include <execution>

    int main()
    {
    std::vector<int> v = {1, 2, 3, 4, 5};

    // 目标:1*1 + 2*2 + 3*3 + ...
    auto sum_of_squares = std::transform_reduce(
    std::execution::par,
    v.begin(), v.end(),
    0, // 初始值
    std::plus<>(), // 归约:累加
    [](int x) { return x * x; } // 变换:平方 (Lambda)
    );

    std::cout << "Sum of Squares: " << sum_of_squares << std::endl; // 55
    }