C++ 异步编程的奇妙之旅
笔者在这篇文章里,打算聊的是从古早的写法到现代版语言标准下,异步计算在多线程的演进。
一提到异步,大家可能想到的是多线程/进程编程,甚至分布式编程,这符合大方向,没错。不过,笔者在这篇文章里,打算聊的是从古早的写法到现代版语言标准下,异步计算在多线程的演进。
举个题目为例,假设有个比较费时的计算任务,需要放在后台线程(子线程)中执行,然后前台线程(当前线程)适时去读取计算结果,那么正在阅读本文的你会如何实现?
经典的味道
按照以前的思路(也就是古早写法),这需要首先创建新的线程,将任务函数和函数参数传入,还需要共享数据缓冲用于传递计算结果,再在跨线程中运用互斥量、条件变量、原子量等实现数据同步。
经典 C++ 时期,创建线程必须要依赖系统提供的线程库,比如 POSIX 的 pthread()。这种 API 都比较老旧,而且使用不方便,分分钟让你怀疑人生。
那么我们就来看看传统的写法:
#include <pthread.h>
#include <iostream>
#include <unistd.h>
struct data_t {
int a;
int b;
};
void *compute(void* arg) {
data_t *data = static_cast<data_t *>(arg);
int *sum = new int;
if (nullptr == sum) {
std::cerr << "create buff for task fail" << std::endl;
return nullptr;
}
*sum = data->a + data->b;
sleep(2); // 模拟耗时
return sum;
}
int main() {
pthread_t thread;
data_t data = {3, 4};
void *ret_val = nullptr;
if (pthread_create(&thread, nullptr, compute, &data) != 0) {
std::cerr << "create thread err!" << std::endl;
return -1;
}
if (pthread_join(thread, &ret_val) != 0) {
std::cerr << "get compute result err!" << std::endl;
return -2;
} else {
if (nullptr == ret_val) {
std::cerr << "get compute result fail!"
<< std::endl;
return -3;
} else {
std::cout << "get compute result:"
<< *static_cast<int*>(ret_val)
<< std::endl;
}
}
return 0;
}
compute 是任务函数,计算结果通过函数返回。为了模拟耗时动作,直接调用 sleep()。
POSIX 库提供的 pthread_create 用于创建子线程,传入第一个参数用于返回线程句柄,第二个参数是线程属性(空就是默认),第三个参数是固定类型为 void*(void*)
的线程函数(我们这里传入 compute),最后一个参数就是传递给线程函数的参数。
你看,线程函数都是固定格式的,传入的参数也被限制为空类型的指针,比较死板,所以任务函数的参数必须先封装在结构体内再以指针的形式传递。
pthread_join 用于等待线程函数返回并获取返回值。
从代码篇幅来看,有些啰里啰唆的。
运行输出:
get compute result:7
示例代码目标是对两个数(3 和 4)取和,运行结果 OK。
为什么 c++ 语言层面没有提供线程相关的集成类或者函数?
std::thread
进入 C++ 11 后,标准库提供了非常方便的类 std::thread 用来创建线程对象。
std::thread 又是如何使用?把上面的例子重写一遍:
#include <thread>
#include <chrono>
#include <iostream>
void compute(int a, int b, int **res) {
int *sum = new int;
*sum = a + b;
// 模拟耗时
std::this_thread::sleep_for(std::chrono::seconds(1));
if (nullptr == res) {
std::cerr << "can not transfer result" << std::endl;
} else {
*res = sum;
}
}
int main() {
int *ret_val = nullptr;
std::thread t(compute, 3, 4, &ret_val);
t.join();
if (ret_val != nullptr) {
std::cout << "get compute result:"
<< *ret_val
<< std::endl;
delete ret_val;
ret_val = nullptr;
}
return 0;
}
相比上一个使用系统 API 创建异步任务的做法,使用 std::thread 类要稍微简单一些,而且更灵活了,比如线程函数的参数变成了可变长参数。
但是,std::thread 提供的 join() 方法不能获取线程执行函数的返回值,所以需要利用其他方式获取异步计算结果。
为了获取异步计算的结果值,需要特意往子线程的执行函数传递缓存地址的指针 res,线程执行函数内会分配用于保存计算结果的堆缓存,堆缓存地址再通过 res 返回。
这样传递指针,虽然可以正常执行,但是会引入跨线程的数据共享问题,进而需要添加更多的同步措施。
当前的例子过于简单,没有过多的数据交互,仅需要在异步线程结束之后才读取缓存结果即可。如果面对多个线程竞争数据读写,那么编写的过程又会变得麻烦。
有没有更方便的特性可供使用?
std::promise 和 std::future
为了方便异步编程获取计算结果,C++ 11 同样提供了 std::promise 和 std::future 配合使用。
std::promise 是一个模板类,顾名思义就是承诺,用途是在异步任务中一旦计算完成,利用它履行赋值的承诺,通过 std::promise 对象的 set_value() 实现。
std::future 也是一个模板类,对象存储的是在将来会被赋值的结果。std::future 也提供了方法用于获取结果,比如 get(),在结果就绪之前,该方法会提供阻塞机制。结果是由 std::promise 对象提供,所以它俩是关联的。我们无需自己创建 std::future 对象,可通过 std::promise 对象的 get_future() 获取。
看到这里,你可能会瞬间感叹:标准库还有这等利器!
利器用上,赶紧把上一个例子重新实现一下,看效果:
#include <thread>
#include <future>
#include <utility>
#include <chrono>
#include <iostream>
void compute(int a, int b, std::promise<int> promise_) {
int sum = a + b;
// 模拟耗时
std::this_thread::sleep_for(std::chrono::seconds(1));
promise_.set_value(sum);
}
int main() {
std::promise<int> promise_;
std::future<int> future_ = promise_.get_future();
std::thread t(compute, 3, 4, std::move(promise_));
std::cout << "get compute result:"
<< future_.get()
<< std::endl;
t.join();
return 0;
}
在创建子线程之前,实例化 std::promise 的对象 promise_,利用 std::promise 对象的 get_future() 方法获取 std::promise 对象协助创建的 std::future 对象 future_。
创建 std::thread 对象时,在传入线程函数时,附带传入的函数参数中需要包括 promise_ 对象。我这里使用 std::move 将主线程中的 promise_ 对象转化为可移动的值,这样主线程中 main 函数的 promise_ 对象资源和状态都会被转移到线程函数中的新 promise_ 对象,包括和 future_ 对象的联系。
在线程函数中,一旦计算完成,只需要调用 promise_ 对象的 set_value() 方法,就可将结果赋值给 future_ 对象。
在主线程中,future_ 对象调用 get() 可返回被赋值的计算结果,如果计算结果未就绪,get() 会等待并阻塞当前线程。
std::promise::set_value 和 std::future::get() 仅能被调用一次,否则会抛出异常 std::future_error。如果你需要 get() 多次,可以改为使用 std::shared_future。
从上面的代码来看,std::promise 和 std::future 的使用减少了对线程间的数据同步问题的关心,大大简化互动过程。
但是,如果你细心看上面的示例代码,有个问题:为什么 std::thread 的 join() 必须在 future_.get() 之后才执行?如果我把它们的调用顺序反过来,等待任务线程结束并返回后,主线程才去读取 future_ 的值呢?
int main() {
std::promise<int> promise_;
std::future<int> future_ = promise_.get_future();
std::thread t(compute, 3, 4, std::move(promise_));
t.join();
std::cout << "get compute result:"
<< future_.get()
<< std::endl;
// t.join();
return 0;
}
编译输出:
get compute result:7
编译过程没有报错,计算结果也正常。
任务线程结束并返回后,和 future_ 对象关联的 promise_ 对象已经被释放和失效,可是这时 future_ 对象还能正常返回原来 promise_ 对象赋值的结果,说明 future_ 对象虽然由 promise_ 对象创建,但是内部资源不受 promise_ 对象同步释放。
std::packaged_task
在应用 std::promise 和 std::future 快速实现跨线程异步计算的实例代码里,其实有一个很不爽快的问题,任务函数的计算结果没有直观地通过 return 语句返回。
由于 std::thread 不能接受线程函数的返回值,导致需要将 std::promise 对象传递进入线程执行函数(任务函数)中,再利用这个 std::promise 对象传递计算结果给调用线程的 std::future 对象。
虽然目的是转存计算结果,但是任务函数变得冗余。简直就像,让你发现显示屏上多了根头发,怎么看怎么难受!
既然不喜欢这样的写法,那么我们就想办法优化。
任务函数的计算结果应该是直观地通过函数 return 语句返回:
int compute(int a, int b) {
int sum = a + b;
// 模拟耗时
std::this_thread::sleep_for(std::chrono::seconds(1));
return sum;
}
在创建 std::thread 对象时,原应传递的任务函数指针改为传递一个函数对象,其余参数照旧。这个函数对象应该内部调用了任务函数,并接收返回值,再通过 std::promise 传递到调用线程的 std::future 对象。
为了创建这个函数对象,可以封装一个类 my_packaged_task。这个类 my_packaged_task 也应该提供方法 get_future() 供调用线程获取与 std::promise 对象关联的 std::future 对象。
基于这种思路,原先的启动异步计算的过程可以如下:
int main() {
my_packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
std::thread t(std::move(task), 3, 4);
t.detach();
std::cout << "get compute result:"
<< f.get() << std::endl;
return 0;
}
这样,只要实现了类 my_packaged_task,std::promise 就可以被隐藏起来了,再也不用关心它到底如何赋值,在哪里赋值。
为了适应任务函数的多样性,包括返回值的类型、型参的类型等,my_packaged_task 应该被定义为模版类。
template <typename Func>
class my_packaged_task;
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
//...
};
先声明 my_packaged_task 的通用模板,再定义模板的特例化。Ret 代表任务函数的返回值类型,Args 代表任务函数的可变长参数的类型。
判断模版定义是不是特例化,可以依据模板类型后边是否有
<>
来决定。
实现 my_packaged_task 模板的特例化时,需要定义被隐藏的 std::promise 对象成员:
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
std::promise<Ret> promise_;
//...
};
为了能调用任务函数,也需要定义一个可调用对象成员:
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
//...
std::function<Ret(Args...)> func_;
public:
my_packaged_task(std::function<Ret(Args...)> func)
: func_(std::move(func)) {}
//...
};
如上,任务函数可以在实例化 my_packaged_task 类时指定。
std::thread 对象在启动线程执行函数(任务函数)时,会调用 my_packaged_task 函数对象的 () 操作符。所以可以在重写 my_packaged_task 类的 () 操作符时,调用任务函数,然后获得任务函数的返回值,再调用 std::promise 对象赋值。
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
//...
public:
//...
void operator()(Args&&... args) {
try {
promise_.set_value(func_(std::forward<Args&&>(args)...));
} catch (...) {
promise_.set_exception(std::current_exception());
}
}
//...
};
与 std::promise 对象关联的 std::future 对象需要通过成员 get_future() 返回:
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
//...
public:
//...
std::future<Ret> get_future() {
return promise_.get_future();
}
};
从以上的优化思路来看,my_packaged_task 就是一个任务函数和 std::thread 的适配器。
是的,在以上的过程中,我们实现了标准库里 std::packaged_task 的简化版。如果在 main 函数里直接把 my_packaged_task 替换成 std::packaged_task,可以直接编译,并且运行结果是一致的,所以我们可以直接使用 std::packaged_task 来简化任务函数的结果返回。
std::async
到目前为止,为了启动异步计算的过程还是需要创建一些和计算过程无关的对象,比如 std::packaged_task 和 std::thread 对象等,而真正有意义的就只有用于获取异步计算结果的 std::future 对象。
所以,我们可以大胆地设想一下,能不能继续优化,比如用一个函数或类对象就把 std::packaged_task 和 std::thread 对象等封装起来,对于用户来讲,无须关心它们,仅需要通过这个函数或者类获取 std::future 对象即可。
假设我们用函数 my_async 实现了上面的猜想,启动异步计算的过程就可简化成这样:
int main() {
std::future<int> f = my_async(compute, 3, 4);
std::cout << "get compute result:"
<< f.get() << std::endl;
return 0;
}
任务函数 compute 和前一个例子无异,已经是非常干净俐落的了,不打算再深究它。那么这个 my_async 到底该如何实现?
从最终简化的目标来看,启动异步计算的过程省略了这部份的代码:
std::packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
std::thread t(std::move(task), 3, 4);
t.detach();
那么是否可以直接把这段代码封装起来?应该是可以的,但是基于 compute 的形式是可变化的,比如上面的这段代码里 compute 的类型为 int(int, int),如果换成其它类型又如何?可见 my_async 应该是用模版函数实现:
template <typename Func, typename... Args>
std::future<typename std::result_of<Func(Args...)>::type>
my_async(Func&& func, Args&&... args) {
using Result_t = typename std::result_of<Func(Args...)>::type;
std::packaged_task<Result_t(Args...)> task(func);
std::future<Result_t> future = task.get_future();
std::thread t(std::move(task), args...);
t.detach();
return future;
}
这里为什么必须要用 t.detach()
呢?是否可换成 t.join()
?
my_async 的目标是启动异步计算,那么调用 my_async 的时候我们假设也是不能阻塞的,所以线程应该被放飞而不是等待线程函数执行返回。
事实上,标准库也提供了 std::async() 模版函数实现以上的优化,但是 std::async() 实现的功能更完善,不仅仅是提供立刻执行的异步处理,还提供了延迟的同步处理。
std::async() 提供的立刻执行的异步处理就和上面我们实现的 my_async 功能一样,使用形式还需要添加指明使用的策略 std::launch。
策略有三个:
-
std::launch::async,保证行为是异步的,任务函数是在子线程中被执行。
-
std::launch::deferred,不是异步行为,任务函数会被延迟执行,比如调用
std::future::get()
或std::future::wait()
时,才触发执行,而且不会创建新线程。总结起来,就是需要结果时才执行计算。 -
默认值,类似于 std::launch::async | std::launch::deferred,是否是异步执行,依赖于系统负载,用户无法控制。
更多
比较新的 C++ 20 还提供了 std::jthread 和协程(coroutine),边幅有限,暂不继续展开了,有空再聊这个话题。
祝大家假期愉快!