这篇文章主要讲了当前流行的异步编程以及协程语法在 Rust 和 C++ 的基本应用,怎么封装自己的协程,以及有哪些实现细节需要注意。
协程出现的时间其实很早,最早的操作系统运行程序就是使用协程的方式,这个时候操作系统和应用程序互相协作,处于平等的地位,如果应用程序不主动返回到操作系统,那么操作系统将永远没有机会执行其他操作。
协程其实是一个比较宽泛的概念,粗略地讲,普通函数调用后,所有的局部变量都会销毁,也无法保存状态,而协程在返回之后,会保持自己的状态,在下次调用的时候,可以根据这个状态从函数的中间继续运行。例如,编译器中的词法分析器(Lexer),在每次调用之后都会根据当前的分析状态,解析出下一个词法单元给语法分析器,在返回之后,会保存当前的位置信息提供给下一次调用。这个简单的例子也可以认为是一个协程:
int counter() {
static int count = 0;
return count++;
}
通过将 count
变量声明为 static
类型,在函数返回之后,count
的值不会被重置,我们可以多次调用 counter
函数,得到一个递增的整数序列。复杂一点来说,假如我们在编写 HTTP 服务器,那么我们可以写出一个这样的状态机函数:
void serve_http() {
static int step = 0;
static int fd = ...;
static char buffer[BUFFER];
int n = 0;
for (;;) {
switch (step) {
case 0: // Parse header
n = read(fd);
if (n == EWOULDBLOCK) return;
// ...
step = 1;
case 1: // Parse body
// ...
case 2: // Send response
// ...
}
}
}
可以看到,每次调用这个函数的时候,都会从 static
变量中读取之前保存的状态,看看目前应该是解析 Header 还是 Body,而如果发现还没有数据可以读的时候,就返回,让这个线程的别的函数有机会继续执行。
当然,如果使用 static
变量,那么同时只能处理一个请求,换个方法,我们可以先分配好保存状态所需的内存,然后在每次调用这个函数的时候,都将状态作为参数传入,这样我们就可以使用一个函数处理多个请求了:
struct client_state {
int step, fd;
char buffer[BUFFER];
};
void serve_http(struct client_state *state) {
// Use state...
}
而主函数其实也非常简单,通过 select
、epoll
等系统调用,在新的 I/O 事件发生的时候获取对应的状态指针,然后调用协程函数就可以了:
int main() {
for (;;) {
epoll_wait(...);
for (struct epoll_event* p = head; p != NULL; p = p->next) {
struct state* state = (struct state *) p->data;
serve_http(state);
}
}
return 0;
}
所以其实协程并不是什么很复杂深奥和神秘的东西,它就是方便我们异步编程的一种编程方式而已。
上面这种只保存需要的状态的协程,使用状态机的方式实现具体逻辑的,叫做无栈协程。无栈协程优势在于性能好、占用空间少,只需要保存需要的状态,但是可以看出其编程十分复杂,和平时我们直接调用函数相比有很大差别。另外,如果在协程中想要继续调用协程,则需要状态的嵌套。例如,假如发送回复的时候,我们想调用一个单独的 write_response
函数,里面又分为写 Header 和写 Body 两个步骤,那么我们可以写:
struct resp_state { int step, fd; int finished; /* ... */ };
void write_response(struct resp_state *state) {
for (;;) {
switch (state->step) {
case 0:
// ...
case 1:
// ...
}
}
}
那么我们要怎么在 serve_http
里面调用这个函数呢?一种办法是,我们在 client_state
里面保存一个 resp_state
指针,然后,在状态处于发送回复的时候,我们就在状态机中,调用 write_response
函数,然后看看 resp_state
里面的 finished
是否为 1
,是就说明发送回复的状态机也执行完了,可以进行下一步操作了,否则就直接返回,等待下一次 I/O 事件。这种方法就是自顶向下的方法,每次都要从 serve_http
这个最上层的协程开始执行,并通过它来驱动子协程。
另一种办法,则需要我们对代码的整体结构做一些修改,在调用 write_response
之后,其实我们没必要每次都从 serve_http
开始往下执行,我们可以告诉主循环,让它下次收到 I/O 事件的时候,直接调用 write_response
。但这时候,resp_state
就需要保存指向 client_state
的指针,在自己执行完成之后,调用 write_response
,并且重新设置相应的数据结构,在下次 I/O 事件的时候,调用 write_response
。
例如:
struct connection {
(void *f)(void *state);
void *state;
int fd;
}
struct resp_state { /* ... */ (void *parent)(void *state); void *parent_state; };
void write_response(struct resp_state *state) {
for (;;) {
switch (state->step) {
case 0:
// ...
case 1:
// ...
}
}
state->parent(state->parent_state);
}
int main() {
// ...
for (/* each event p*/) {
struct connection *c = (struct connection *) p->data;
c->f(c->state);
}
// ...
}
这种方法就是自底向上的方法,每次继续执行的时候,都从最后一个被调用的协程开始执行,并且子协程会保存返回点,在执行完成后返回到父协程。这样,就节省了从顶级协程一路调用下来的开销。
当然,既然有无栈协程,那么就有有栈协程。有栈协程相比无栈协程,就简单得多了。它就是在协程需要主动返回的时候,将当前的调用栈保存到堆内存中,这样就实现了状态的保存。具体来说,其实就是将系统调度线程的逻辑实现到了用户态中。而执行函数的切换,也和操作系统中切换线程进程的逻辑一样,设置 CPU 相关的寄存器,尤其是栈指针,然后跳转到指定的函数地址就完成了切换。那么问题就在于应该跳转到哪?如果像操作系统一样,那么所有协程不管如何调用,只要遇到需要挂起的情况,就将控制权转移给一个中心的调度器,调度器再选择下一个需要执行的协程继续执行,这就是对称协程。而像上面那种,在需要挂起的时候,一路返回到一开始调用的函数的,就是非对称协程了。
上面其实就是协程的一些基本概念了,但是比实际场景的协程还缺少了一些东西。例如,协程要怎么返回值呢?这里同样有多种解决办法,在自底向上的模型中,可以使用一个共享的变量来交换信息,例如在 client_state
里面添加一个 resp_ret
变量,write_response
函数可以设置这个变量,然后在继续执行 serve_http
的时候,就可以通过读取这个变量获得返回值。而在自顶向下的模型中,也可以直接使用函数的返回值。
对于无栈协程,如果全部都手写状态机的话,其实都是重复的工作,而且容易出错。所以,现代语言通常都会加入对于协程的原生支持,由编译器来生成这些状态机代码。对于有栈协程,则一般是通过库函数的方式来实现的。
如果是无栈协程,那一般在代码里会有 async
或者 await
这样的关键词,await
就是用来告诉编译器划分状态机的位置。而有栈协程,在编写程序的时候感知不到,只是需要调用协程库提供的协程创建函数,在可能阻塞的地方,协程库都会拦截系统调用,转移控制权。
例如,像 JavaScript、Python、C++、Rust 这些使用了 await
关键词的,就是使用的无栈协程,而像 Go 语言,用户像普通函数一样调用例如 read
这样的函数,只是需要使用 go
关键词来创建协程,就是有栈协程了。
通常来讲,无栈协程比有栈协程开销更小,性能更高,而非对称协程和对称协程在功能上没有区别。
无栈协程在有 GC 的语言里实现比较简单,而在没有 GC 的语言,例如 C++ 和 Rust 中,就需要注意非常多的细节。
C++
C++ 中需要实现的东西:
template<class T>
struct Task {
struct promise_type {
ReturnObject get_return_object() { return ReturnObject(coroutine_handle<promise_type>::from_promise(*this)); }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
std::suspend_always yield_value(unsigned value) {
value_ = value;
return {};
}
void unhandled_exception() {}
void return_value(T&& value) {
value_ = value;
}
// void return_void() {}
};
std::coroutine_handle<promise_type> h_;
T value_;
ReturnObject(std::coroutine_handle<> h) : h_(h) {};
operator std::coroutine_handle<promise_type>() const { return h_; }
};
template<class T>
struct Awaiter {
std::coroutine_handle<> *hp_;
// optional
Awaiter operator co_await();
bool await_ready() const noexcept { return false; }
bool await_suspend(std::coroutine_handle<> h) { *hp_ = h; return false; }
T await_resume() const noexcept {}
};
一共九个方法需要实现,看的头都大了。先来看一个简单的例子感受一下上面的代码分别有什么用:
Task counter() {
for (unsigned i = 0; i < 3; ++i)
co_yield i;
co_return 42;
}
int main() {
auto h = counter().h_;
auto &promise = h.promise();
while (!h.done()) { // Do NOT use while(h) (which checks h non-NULL)
std::cout << "counter: " << promise.value_ << std::endl;
h();
}
h.destroy();
return 0;
}
这是一个简单的计数器协程,每次运行都会递增值。前面的 Task
就代表一个协程,里面可以保存返回值,而 promise_type
就是协程对应的 Promise。Promise 和 Future 一般是成对出现的,Promise 用来写入一个未来的值,而 Future 则用来读取一个未来的值。所以,在上面的 promise_type
中,yield_value
和 return_value
就是用来写入值的方法,而我们可以使用 get
方法得到返回的值。一般来说,Future 的值可以有阻塞的和非阻塞的读取的方法,非阻塞就是无论 Promise 有没有已经写入值,都立刻返回,而阻塞的则是等到 Promise 写入值之后再返回。计数器在 Promise 中设置返回值,然后我们就可以在 Task 中读取这个值。C++ 标准说协程的代码相当于:
{
promise-type promise promise-constructor-arguments ;
try {
co_await promise.initial_suspend() ;
function-body
} catch ( ... ) {
if (!initial-await-resume-called)
throw ;
promise.unhandled_exception() ;
}
final-suspend :
co_await promise.final_suspend() ;
}
那么 co_await
又是如何驱动协程的运行呢?再来看一个复杂一点的例子:
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
awaitable(std::jthread* p_out) : p_out(p_out) {
std::cout << __FUNCTION__ << std::endl;
}
~awaitable() {
std::cout << __FUNCTION__ << std::endl;
}
bool await_ready() {
std::cout << __FUNCTION__ << std::endl;
return false;
}
void await_suspend(std::coroutine_handle<> h) {
std::cout << __FUNCTION__ << std::endl;
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
}
void await_resume() { std::cout << __FUNCTION__ << std::endl; }
};
return awaitable{&out};
}
struct task {
struct promise_type {
task get_return_object() {
std::cout << __FUNCTION__ << std::endl;
return {};
}
std::suspend_never initial_suspend() {
std::cout << __FUNCTION__ << std::endl;
return {};
}
std::suspend_never final_suspend() noexcept {
std::cout << __FUNCTION__ << std::endl;
return {};
}
void return_void() { std::cout << __FUNCTION__ << std::endl; }
void unhandled_exception() {}
};
task() {
std::cout << __FUNCTION__ << std::endl;
}
~task() {
std::cout << __FUNCTION__ << std::endl;
}
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id()
<< '\n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id()
<< '\n';
}
int main() {
std::jthread out;
auto t = resuming_on_new_thread(out);
std::cout << "Main finished" << std::endl;
}
使用支持 C++ 20 标准的编译器编译运行之后,可以得到以下的输出:
task::promise_type::get_return_object
task::task
task::promise_type::initial_suspend
Coroutine started on thread: 3804
switch_to_new_thread::awaitable::awaitable
switch_to_new_thread::awaitable::await_ready
switch_to_new_thread::awaitable::await_suspend
New thread ID: 2556
Main finished
task::~task
switch_to_new_thread::awaitable::await_resume
switch_to_new_thread::awaitable::~awaitable
Coroutine resumed on thread: 2556
task::promise_type::return_void
task::promise_type::final_suspend
这个例子里,resuming_on_new_thread
是一个协程,而 switch_to_new_thread
则是协程里 co_await
的一个函数,在 co_await
这个函数之后,协程剩余的部分就会在另一个线程中执行。
co_await
这个是一个操作符,在调用之后,首先会调用函数,得到一个 Awaitable
,然后,调用这个 Awaitable
的 co_await
操作符,获取一个 awaiter
,然后执行 awaiter
的 await_ready
,判断是否需要让出控制权,如果需要的话,就调用 await_suspend
,暂停执行,然后返回。在可以继续执行之后,会调用 await_resume
,获取返回值。用伪代码来描述就是:
auto awaiter = awaitable.operator co_await();
if (!awaiter.await_ready()) {
awaiter.await_suspend(current_coroutine_handle);
continue return;
}
auto value = awaiter.await_resume();
可以看到 await_suspend
这个方法,接受了一个 coroutine_handle
,并且在新开的线程中执行了。这个 coroutine_handle
,就是编译器打包好的,协程的状态以及还没有执行完成的部分,只要调用这个对象,就可以继续执行协程了。
协程能够运行起来,关键是 await_suspend
这个方法,这个方法需要负责设置回调之类的操作,并且在回调中,调用传进来的 Callable,这样协程才能继续运行,否则就卡住了。例如,在服务器程序中,调用 await_suspend
的时候,就将协程注册到 epoll 的 fd 列表里,然后设置在下一次 I/O 发生的时候调用传入的 coroutine_handle
以继续执行暂停的协程。
非常奇怪的是,await_suspend
的返回值类型会影响这个函数实际的行为。如果这个函数返回的是 void
,那么当前的协程(调用 co_await
的协程)会立即返回;如果这个协程返回的是 bool
,那么在返回 true
的时候,当前协程会立即返回,而如果返回 false
表明当前协程可以继续执行,不会返回;而如果这个协程返回的同样是一个 coroutine_handle
,那么就执行这个对象。
C++ 中如果想要嵌套调用协程,是需要自己在 await_suspend
里保存并调用 Callable 的:
struct task<T>::promise_type {
std::coroutine_handle<> continuation;
auto final_suspend() noexcept {
struct awaiter {
auto await_suspend(std::coroutine_handle suspended) {
if (suspended.promise().continuation)
return suspended.promise().continuation.resume();
else
return std::noop_coroutine{};
}
};
return awaiter{};
}
}
这里的 final_suspend
是看看当前协程在运行结束返回时,是否有需要返回的协程,有的话就返回到上一个协程,否则就啥都不做,结束执行。
auto task<T>::operator co_await() const {
struct awaiter {
std::coroutine_handle<promise_type> handle;
auto await_suspend(std::coroutine_handle<> suspended) {
handle.promise().continuation = suspended;
return handle;
}
};
return awaiter{*_coro};
}
这里的 operator co_await
函数则是为了能在一个 task
里面 co_await foo()
,其中 foo()
返回的是另一个 task
,可以看到这个函数做的东西很简单,就是设置了一下被调用的协程的返回协程为当前协程。
有了这些基础之后,只需要对系统的异步 API 进行简单的封装就可以使用协程编程了。例如 co_await read()
可以是在 read
的 await_suspend
中将连接的 fd 注册到 epoll fd 里,然后把协程的调用函数作为回调函数保存起来。
Rust
Rust 则需要实现 Future
这个 trait:
enum Poll { Ready(T), Pending }
trait Future {
type Output;
fn poll(
// Note the change from `&mut self` to `Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from `wake: fn()` to `cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
只有一个方法:poll()
,返回 Poll::Pending
表示协程还没有结束,这时候需要负责设置回调,获取 Context
里的 waker
,保存起来,然后在回调处理函数中调用 waker.wake()
,将协程重新加入调度队列中,而返回 Poll::Ready(())
表示协程完成了。
struct Context {
// ...
};
impl Context {
fn waker(&self) -> &Waker;
}
struct Waker {
// ...
};
impl Waker {
fn wake(self);
fn wake_by_ref(&self);
}
当前,Context
的唯一作用就是保存 Waker
对象,而 Waker
唯一作用就是用来 wake
,至于协程所需要的其他信息,可以保存在 Future
对象中。
pub fn waker_fn<F: Fn() + Send + Sync + 'static>(f: F) -> Waker {
let raw = Arc::into_raw(Arc::new(f)) as *const ();
let vtable = &Helper::<F>::VTABLE;
unsafe { Waker::from_raw(RawWaker::new(raw, vtable)) }
}
struct Helper<F>(F);
impl<F: Fn() + Send + Sync + 'static> Helper<F> {
const VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_waker,
Self::wake,
Self::wake_by_ref,
Self::drop_waker,
);
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const F));
mem::forget(arc.clone());
RawWaker::new(ptr, &Self::VTABLE)
}
unsafe fn wake(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const F);
(arc)();
}
unsafe fn wake_by_ref(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const F));
(arc)();
}
unsafe fn drop_waker(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const F));
}
}
值得一提的是,Waker
中保存的是一个 RawWaker
对象,这个对象需要实现 clone_waker
、wake
、wake_by_ref
、drop_waker
这四个方法。为了能够使 Waker
可以静态分配,这四个方法并不是通过实现 trait
的方式来实现的,而是通过手动构造虚函数表来实现的。上面就是一个简单的调用一个函数的 RawWaker
实现。
和 C++ 不同,Rust 中的协程,需要在函数声明前添加 async
关键词,而函数的返回类型并不是 Future,而是实际返回值的类型。
async fn write_header(conn: Connection) -> Result<usize>;
对比
Rust 中嵌套调用协程是编译器帮忙生成的,不需要自己做额外的操作。函数的签名需要加特殊的关键词,返回值的类型是实际返回值的类型。
C++ 的接口显然更复杂,但是它的模型是“自底向上”的,只有当协程能够继续运行的时候才会调用继续函数。函数不需要加特殊的关键词,而是返回一个特殊的包装了返回值类型的协程类型。C++ 也使得你对执行过程有更精细的控制权。
而 Rust 接口简洁不少,基于轮询,模型是“自顶向下”的,需要有一个执行器负责调用 Future
的 poll
接口,整个状态机才能推进,也就是说,Rust 的协程有时可以被不必要地轮询。
另外,Rust 中的 Future 状态类型,在编译的时候就已经确定了,可以当成普通对象一样放在栈上,但也造成有些时候一些变量保存过久,浪费空间;而 C++ 中的 coroutine_handle
则被类型擦除了,编译的时候无法知道具体的类型,也无法知道状态的具体大小,只有在优化结束之后才能确定,这样导致了协程的状态必须分配堆内存,好处是 C++ 的协程,仅会保存需要的状态,比如一个变量只在前面部分使用了,就没有必要保存到后面了。
事件循环逻辑
前面提到,协程在暂停之后,需要有其他代码主动调用协程函数才能让协程继续运行。而所有的协程,通常都是在程序的主函数中,通过一个事件循环来不停调用协程函数来驱动它们的。也就是上面的 HTTP 服务例子里的:
for (/* each event p */) {
p->data->state->event = p->event;
p->data->callback(p->data->state);
}
当然,要想正确的设置回调函数,需要追踪每一个 fd,这也是为什么在使用异步编程库的时候,需要使用库提供的 TCP 函数和连接类型等,因为这样它们最终负责调用系统函数的时候才能区分不同的连接上的不同事件。
对于事件循环,也有多种模式。一种是,类似于 Go 语言和 tokio 运行时,不同的协程可以在不同的操作系统线程上执行,这样的执行模型就像操作系统一样了,不同的进程可以在不同的 CPU 核心上执行,而且在一些核心空闲的时候,可以从其他核心那里“窃取”任务执行,提高 CPU 的利用率。
而另一种执行模型是 Loop Per Core,也就是每个核心单独执行一个单线程的事件循环,这样做的好处是程序的局部性大大提升,而且避免了协程在多核之间迁移执行时的同步开销(例如任务队列需要加锁),协程使用的数据结构也不必考虑多核同步的问题。相比于多核模型,执行效率会更高。但是如果出现了任务负载不均衡的现象,那么这种模型的 CPU 利用率就会下降,此时性能就没有多核模型好了。
所以,使用哪一种执行模型还是取决于应用程序本身的负载特性,并没有说哪一种模型有绝对的优势。
生命周期管理
和多线程编程一样,在使用协程经常犯的一个错误就是对象的生命周期问题。因为协程可以被暂停,然后再在之后继续执行,如果协程保存了对一个对象的引用或者指针,那么必须确保这个对象在协程执行完成之前都一直存活,否则将发生内存错误。在 GC 语言中,这个问题不存在,编程非常简单。而在 C++ 中,可以使用智能指针 unique_ptr
或者 shared_ptr
等来管理对象的生命周期,在 Rust 中,编译器的 Ownership 模型可以帮助程序员发现生命周期问题。
取消
另一个复杂的问题就是怎么样取消协程。例如,在一个耗时的操作中,可能用户已经等得不耐烦了,取消了请求,也有可能超时了,需要终止请求防止过载。
协程的取消难点在于需要追踪一切涉及到的资源,并在取消的时候正确地释放。例如,在 Go 语言中,所有需要取消的协程,参数一定会带一个 ctx context.Context
,并且在所有的异步操作中,同时 select
ctx.Done()
,在取消的之后,ctx
则会一路传播信号,通知涉及到的协程清理资源并退出。
而在 Rust 和 C++ 中则没有约定俗成的取消方法,但是原理也是类似的。在每一个 await
的调用的时候,都可能是这个协程的一个取消点。如果想要处理取消,那么我们可以在设置回调的时候,同时设置一个监听取消的回调,这样在取消信号发生的时候,协程就可以被唤醒,处理异常情况了。在 Rust 的 tokio
中提供了一个 select!
宏,用法和 Go 语言中的类似,原理就是轮流或者随机查询 select!
的每一个 Future,一旦其中一个 Ready
,就执行对应的代码。
锁
在实际应用场景中,一个协程在被暂停之后,可能会在另一个线程被调度执行,而普通的操作系统的锁,有可能需要解锁的线程和加锁的线程是同一个,那么如果在协程中一个锁加锁的时间可能横跨多个 await
点的时候,就不能直接使用普通的锁了,而是要使用支持追踪异步调用信息的异步锁,这样才能在正确的线程释放,避免错误。当然,异步锁的开销比普通的锁的开销要更大,如果加锁的时间没有横跨 await
,那么就不需要使用异步锁。
参考资料
https://www.jonathanmueller.dev/talk/accu2022/
https://www.scs.stanford.edu/~dm/blog/c++-coroutines.html
https://lewissbaker.github.io/2017/09/25/coroutine-theory
https://rust-lang.github.io/async-book/01_getting_started/01_chapter.html