C++-异步编程

引入

通常,我们需要异步执行一个函数时,通常使用std::thread创建一个线程,但当我们想要获取这个子线程返回的结果时,可能就需要使用引用或全局变量的方式。

这里引出std::asyncstd::async是一个函数模板,此会启动一个异步任务,执行任务,并最终将结果返回至std::future当中。

std::async相比std::thread有更好的控制性,可以进行延迟创建。

详细

std::future

首先,我们先看一下std::async的返回值std::futurestd::future是一个模板类,cppreference中对其的描述如下:
类模板std::future提供访问异步操作结果的机制:

  • (通过std::asyncstd::packaged_taskstd::promise创建的)异步操作能提供一个std::future对象给该异步操作的创建者。
  • 然后,异步操作的创建者能用各种方法查询、等待或从std::future提取值。若异步操作仍未提供值,则这些方法可能阻塞。
  • 异步操作准备好发送结果给创建者时,它能通过修改链接到创建者的std::future的共享状态(例如std::promise::set_value)进行。
    注意,std::future所引用的共享状态不与另一异步返回对象共享(与std::shared_future相反)。

成员函数

此处介绍几个常用函数

1
2
3
4
5
6
7
8
9
10
11
12
// 获取结果
get:等待直至future拥有合法结果并(依赖于使用哪个模板)获取它。它等效地调用wait等待结果。get的三个版本仅在返回类型有别。
// 状态
valid:检查 future 是否拥有共享状态。
wait:等待结果变得可用;阻塞直至结果变得可用;调用后valid() == true;若调用此函数前valid() == false则行为未定义。
wait_for:等待结果变得可用。阻塞直至经过指定的timeout_duration,或结果变为可用,两者的先到来者。返回值鉴别结果的状态。
wait_until:等待结果变为可用。它阻塞直至抵达指定的timeout_time,或结果变为可用,两者的先到来者。返回值指示wait_until为何返回。

上述两个函数的返回值:
- future_status::deferred:共享状态持有的函数正在延迟运行,结果将仅在显式请求时计算;
- future_status::ready:共享状态就绪;
- future_status::timeout:共享状态在经过指定的等待时间内仍未就绪;

使用

1
2
3
4
5
6
... 
std::future<int> fu = std::async(fun, 1);
// 使用get获取结果,如果操作未结束,则会阻塞并等待结果。
// wait只是等待操作结束,并不能获取结果。
std::cout << fu.get() << std::endl;
...

std::packaged_task

类模板std::packaged_task包装任何可调用(Callable)目标(函数、 lambda表达式、bind表达式或其他函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过std::future对象访问的共享状态中。

成员函数

此处介绍几个常用函数

1
2
3
4
5
6
7
// 获取结果
get_future:返回与承诺的结果关联的std::future。
// 执行
operator():以args为参数调用存储的任务。任务返回值或任何抛出的异常被存储于共享状态。令共享状态就绪,并解除阻塞任何等待此操作的线程。
make_ready_at_thread_exit:以转发的args为参数调用存储的任务。任务返回值或任何抛出的异常被存储于 *this 的共享状态。
在当前线程退出,并销毁所有线程局域存储期对象后,才令共享状态就绪。
reset:重置状态,抛弃先前执行的结果。构造共享状态。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <future>
#include <iostream>
#include <thread>

int fib(int n)
{
if (n < 3) return 1;
else return fib(n-1) + fib(n-2);
}

int main()
{
// 包装
std::packaged_task<int(int)> fib_task(&fib);
std::cout << "starting task\n";
// 获取future对象
auto result = fib_task.get_future();
// 创建线程
std::thread t(std::move(fib_task), 42);
std::cout << "waiting for task to finish..." << std::endl;
std::cout << result.get() << '\n';

std::cout << "task complete\n";
t.join();
}

std::promise

类模板std::promise提供存储值或异常的设施,之后通过std::promise对象所创建的std::future对象异步获得结果。简单来说,它的作用是在不同的线程中实现数据的同步,与future结合使用,也间接实现了future在不同线程间的同步。

成员函数

此处介绍几个常用函数

1
2
3
4
5
6
7
8
9
// 获取结果
get_future:返回与承诺的结果关联的future。
// 设置结果
set_value:设置结果为指定值,立即分发提醒。
set_value_at_thread_exit:设置结果为指定值,同时仅在线程退出时分发提醒。
set_exception:设置结果为指示异常,立即分发提醒。
set_exception_at_thread_exit:设置结果为指示异常,同时仅在线程退出时分发提醒。

set_value、set_exception、set_value_at_thread_exit和set_exception_at_thread_exit的操作表现类似。在更新promise对象时获得单个与promise对象关联的互斥。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
// 提醒future
accumulate_promise.set_value(sum);
}

void do_work(std::promise<void> barrier)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
barrier.set_value();
}

int main()
{
// 演示用 promise<int> 在线程间传递结果。
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));

// future::get() 将等待直至该 future 拥有合法结果并取得它
// 无需在 get() 前调用 wait()
//accumulate_future.wait(); // 等待结果
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion

// 演示用 promise<void> 在线程间对状态发信号
std::promise<void> barrier;
std::future<void> barrier_future = barrier.get_future();
std::thread new_work_thread(do_work, std::move(barrier));
barrier_future.wait();
new_work_thread.join();
}

需要注意的是在使用的过程中不能多次set_value,也不能多次get_future和多次get,因为一个promise对象只能和一个对象相关联,否则就会抛出异常。

std::async

函数模板async异步地运行函数f(潜在地在可能是线程池一部分的分离线程中),并返回最终将保有该函数调用结果的std::future
按照特定的执行策略policy,以参数args调用函数f

  • 若设置async标志(即(policy & std::launch::async) != 0),则async在新的执行线程(初始化所有线程局域对象后)执行可调用对象f
  • 若设置deferred标志(即(policy & std::launch::deferred) != 0),则async以同std::thread构造函数的方式转换fargs...,但不产出新的执行线程。对同一std::future的所有后续访问都会立即返回结果(future调用getwait时创建线程)。
  • 如果没有使用这个两个参数,也就是第一个参数为空的话,那么第一个参数默认为std::launch::async | std::launch::deferred,这个就不可控了,由操作系统根据当时的运行环境来确定是当前创建线程还是延迟创建线程。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>

std::mutex m;
struct X {
void foo(int i, const std::string& str)
{
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str)
{
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i)
{
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};

int main()
{
X x;
// 以默认策略调用 x.foo(42, "Hello") :
// 可能同时打印 "Hello 42" 或延迟执行
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// 以 deferred 策略调用 x.bar("world!")
// 调用 a2.get() 或 a2.wait() 时打印 "world!"
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// 以 async 策略调用 X()(43) :
// 同时打印 "43"
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // 打印 "world!"
std::cout << a3.get() << '\n'; // 打印 "53"
} // 若 a1 在此点未完成,则 a1 的析构函数在此打印 "Hello 42"