key: 79 title: 用 C 语言实现协程
协程与线程(进程)1都是模拟多任务(routine)并发执行的软件实现。操作系统线程在一个 CPU 线程中模拟并发执行多个任务,而协程(coroutine)在一个操作系统线程中模拟并发执行多个任务。 因此协程也被称为“用户态线程”、“轻量级线程”。之所以说“模拟并发执行”,是因为任务实际并没有同时运行,而是通过来回切换实现的。
线程切换由操作系统负责,而协程切换通常由程序员直接控制。程序员通过 resume/yield 操作控制协程切换。resume 操作唤醒一个指定协程;yield 操作挂起当前协程,切换回唤醒它的协程。如果你用过 Lua 的协程,就会很熟悉这套流程。不过 Lua 是基于 Lua 虚拟机(LVM)的脚本语言,它只需要 LVM 中执行“虚拟的”上下文切换。本文介绍如何用 C 语言(和一点点汇编)实现一个 native 协程,执行真正的上下文切换。这个实现非常简单,总共不到 200 行代码。我参考了 libco 的实现。本文的完整代码见 toy-coroutine。
所谓的上下文,就是一段程序之前做了什么,接下来要做什么,以及做事情过程的中间产物。例如我们有函数 f
。f
需要知道下一个指令是什么才能接着往下执行,便是“接下来要做什么”。f
函数还需要知道之前是谁调用了它,以便把结果返回给调用者,便是“之前做了什么”。在 f
函数执行过程中,局部变量要存好(不能被写坏),接下来的指令才能正确执行。这便是“过程的中间产物”。
在 x86-64 下,“之前做了什么” 存储在栈里。函数调用会执行 call
指令,把当前函数的下一个指令的地址压入栈顶,然后再跳转到被调用函数。被调用函数返回时执行 ret
指令,从栈顶取出调用者的返回点地址,然后跳转到返回点。因此栈上存有所有前序调用者的返回点地址。
函数的局部变量通常储存在 16 个通用寄存器中,如果寄存器不够用,就存在栈里(只要在函数返回前将它们弹出,让栈顶是返回点地址即可)。函数调用的参数也是局部变量,存在约定的 6 个通用寄存器里。如果不够用,也存在栈里。
至于“接下来要做什么”,其实也在栈里。上下文切换不过是调用一个函数,调用者在调用它之前已经把下一个指令的地址压栈了。当上下文切换函数返回,ret
指令自然会跳转到接下来要执行的指令。所以上下文就是 16 个通用寄存器 + 栈。
所有的协程共享同一个 CPU,也就共享同样的 16 个通用寄存器。如果我们要把 A 协程切换成 B 协程,就要把当前 16 个通用寄存器的值存在 A 协程的数据结构里;然后再从 B 协程的数据结构里取出 B 协程的寄存器的值,写回通用寄存器中。我们还要处理栈。不过栈与寄存器不同,x86-64 规定 %rsp
寄存器(也是通用寄存器之一)存的值便是栈顶的地址。不同的协程不必共享栈,它们可以分配各自的栈,上下文切换时将 %rsp
指向各自的栈顶即可。
实际上我们不必存储全部的 16 个通用寄存器,它们有些是暂存寄存器(Scratch Registers),是允许被写坏的。这些寄存器的值可能在执行一次函数调用后就变了(被被调用函数写坏的)。编译器也不会在暂存寄存器里存储函数调用后还要用的值。参考 libco 的实现,我们存储 13 个寄存器:
enum {
CO_R15 = 0,
CO_R14,
CO_R13,
CO_R12,
CO_R9,
CO_R8,
CO_RBP,
CO_RDI,
CO_RSI,
CO_RDX,
CO_RCX,
CO_RBX,
CO_RSP,
};
struct co_context {
void *regs[13];
};
有些寄存器有特殊的用途。这里我们只需要知道这三个:
%rsp
: 栈寄存器,指向栈顶。%rdi
, %rsi
: 第一参数寄存器和第二参数寄存器,调用函数前将第一个参数存在 %rdi
里,第二个存在 %rsi
里(剩下的四个依次是 %rdx
, %rcx
, %r8
, %r9
, 不过这里我们用不上),然后执行 call
指令。接着我们定义一个函数做上下文切换,把当前通用寄存器的值保存在 curr
中,再把 next
中保存的寄存器的值写回各个通用寄存器。
extern void co_ctx_swap(struct co_context *curr, struct co_context *next);
Emm,这个函数没法用 C 语言实现,我们得用到一点点汇编了。其实非常简单,我们只需要用 movq
指令存取寄存器。代码如下:
.globl co_ctx_swap
co_ctx_swap:
movq %rsp, 96(%rdi)
movq %rbx, 88(%rdi)
movq %rcx, 80(%rdi)
movq %rdx, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 48(%rsi), %rbp
movq 56(%rsi), %rdi
movq 72(%rsi), %rdx
movq 80(%rsi), %rcx
movq 88(%rsi), %rbx
movq 96(%rsi), %rsp
movq 64(%rsi), %rsi
ret
不懂汇编没关系(其实我也不是很懂),只需要知道 movq
指令将第一个操作数的值复制到第二个操作数中。%
开头的标识符为寄存器。%rsp
这样不带括号的,表示存取寄存器的值。(%rdi)
这种带括号的,表示去内存里存取地址为 %rdi
的数据。如果括号前面有数字几,就表示这个地址要加几。movq
存取数据的长度为 8 字节,寄存器的长度也是 8 字节。
还记得前面说过,%rdi
是第一个参数,%rsi
是第二个参数吗?所以 %rdi
就是 struct co_context *curr
。96(%rdi)
就是 curr->regs[12]
,88(%rdi)
就是 curr->regs[11]
,……,(%rdi)
就是 curr->regs[0]
。上半部分把 13 个通用寄存器的值全部存到了 curr
里。同理 %rsi
就是 struct co_context *next
。(%rsi)
就是 next->regs[0]
、8(%rsi)
就是 next->regs[1]
,依次类推。于是下半部分把 next
中保存的寄存器的值写回寄存器中。最后执行 ret
指令返回。
注意 29 行写入 %rsp
的值就是上次挂起时第 4 行保存的值,这个值我们原封未动,也没有做任何栈操作。因此最后 ret
返回时,栈顶就是 co_ctx_swap
的调用者设置的返回点地址。一个协程调用 co_ctx_swap
将自己挂起,便陷入沉睡。当 co_ctx_swap
返回之时,便是其它协程调用 co_ctx_swap
将它唤醒之时。此时寄存器被还原、栈被还原、也回到了返回点。它便知道自己之前做了什么、接下来要做什么、中间产物是怎样的。
struct co_context
仅存储协程的上下文。我们还需要维护给协程分配的栈空间、记录入口函数地址等。我们定义 struct coroutine
表示协程对象。
typedef void (*start_coroutine)();
struct coroutine {
struct co_context ctx;
char *stack;
size_t stack_size;
start_coroutine start;
};
struct coroutine *co_new(start_coroutine start, size_t stack_size) {
struct coroutine *co = malloc(sizeof(struct coroutine));
memset(&co->ctx, 0, sizeof(co->ctx));
if (stack_size) {
co->stack = malloc(stack_size);
} else {
co->stack = NULL;
}
co->stack_size = stack_size;
co->start = start;
return co;
}
void co_free(struct coroutine *co) {
free(co->stack);
free(co);
}
co_new
创建一个新协程,接受两个参数:start
协程入口函数指针,和 stack_size
栈大小;这类似于 pthread_create
。co_new
分配协程的栈空间并设置好各个字段。
要把主线程切换到我们新创建的协程,这里有两个问题。一是主线程并不是一个协程,新协程跟谁交换上下文呢?二是新创建的协程的上下文是空的(19 行),切换过去肯定跑不起来。
第一个问题很简单:创建一个就行。因为主线程已经跑起来了,要切换到新协程,主线程只需要一个“容器”把它的上下文装进去。直接执行 main = co_new(NULL, 0)
创建主协程,调用 co_ctx_swap(&main->ctx, &new->ctx)
便可切换到新协程。此时主线(协)程的上下文保存在 main
中,当新协程反向调用 co_ctx_swap(&new->ctx, &main->ctx)
,便又切换回主协程了。
为了解决第二个问题,我们需要对新协程初始化。co_ctx_swap
将新协程的上下文复制到 CPU 后,执行 ret
返回栈顶记录的地址。因此我们要将栈顶置为协程入口函数的地址,这样在 co_ctx_swap
返回后便跳转到协程入口函数了。
void co_ctx_make(struct coroutine *co) {
char *sp = co->stack + co->stack_size - sizeof(void*);
sp = (char*)((intptr_t)sp & -16LL);
*(void**)sp = (void*)co->start;
co->ctx.regs[CO_RSP] = sp;
}
因为 x86 的栈是从高地址向低地址增长的,初始栈为空,所以栈顶应该指向 co->stack
的最末尾。又因为 x86 的栈必须 16 字节对齐,所以执行 (intptr_t)sp & -16LL
(-16 低 4 位为 0,其它都为 1)得到栈顶地址。然后将栈顶置为 co->start
,也就是入口函数的地址。最后我们把保存的 rsp 寄存器的值设置为栈顶地址,这个值会在 co_ctx_swap
被复制到寄存器 %rsp
中。
现在我们的协程已经可以跑起来了。写一个简单的例子试试:
struct coroutine *main_co, *new_co;
void foo() {
printf("here is the new coroutine\n");
co_ctx_swap(&new_co->ctx, &main_co->ctx);
printf("new coroutine resumed\n");
co_ctx_swap(&new_co->ctx, &main_co->ctx);
}
int main() {
main_co = co_new(NULL, 0);
new_co = co_new(foo, 1024 * 1024);
co_ctx_make(new_co);
co_ctx_swap(&main_co->ctx, &new_co->ctx);
printf("main coroutine here\n");
co_ctx_swap(&main_co->ctx, &new_co->ctx);
return 0;
}
把上面所有的 C 代码复制到文件 co.c
,汇编代码存为 co.S
,然后执行 gcc -o co co.c co.S
编译,运行试试!
现在的协程虽然可以跑,但是使用起来很不方便,需要手动交换上下文,也容易出错。我们需要实现 resume/yield 操作。resume 操作唤醒指定协程,也就是当前协程与指定协程交换。yield 挂起当前协程,将当前协程与上次唤醒它的协程交换。因此我们需要记录当前运行的协程;而对于每个协程,要保存唤醒它的协程的指针。
协程切换要遵循这几条规则:
基于此,我们给协程定义五个状态:
enum {
CO_STATUS_INIT, // 初始状态
CO_STATUS_PENDING, // 执行 yield 操作进入的挂起状态
CO_STATUS_NORMAL, // 执行 resume 操作进入的挂起状态
CO_STATUS_RUNNING, // 运行状态
CO_STATUS_DEAD, // 死亡状态
};
我们使用全局变量 g_curr_co
记录当前协程。每个协程还要记录当前状态和唤醒自己的协程。
struct coroutine {
struct co_context ctx;
char *stack;
size_t stack_size;
int status; // 协程状态
struct coroutine *prev; // 唤醒者
start_coroutine start;
};
struct coroutine *g_curr_co = NULL; // 当前协程
struct coroutine *co_new(start_coroutine start, size_t stack_size) {
struct coroutine *co = malloc(sizeof(struct coroutine));
...
co->status = CO_STATUS_INIT;
co->prev = NULL;
return co;
}
void check_init() {
if (!g_curr_co) { // 初始化主协程
g_curr_co = co_new(NULL, 0);
g_curr_co->status = CO_STATUS_RUNNING; // 主协程状态初始为 RUNNING
}
}
接着实现 resume 操作和 yield 操作。根据上面描述的思路,实现起来很容易。
int co_resume(struct coroutine *next) {
check_init();
switch (next->status) {
case CO_STATUS_INIT: // 初始状态,需要执行 co_ctx_make 初始化
co_ctx_make(next);
case CO_STATUS_PENDING: // 只有处于 INIT 和 PENDING 状态的协程可以被 resume 唤醒
break;
default:
return -1;
}
struct coroutine *curr = g_curr_co;
g_curr_co = next; // g_curr_co 指向新协程
next->prev = curr; // 设置新协程的唤醒者为当前协程
curr->status = CO_STATUS_NORMAL; // 当前协程进入 NORMAL 状态
next->status = CO_STATUS_RUNNING; // 新协程进入 RUNNING 状态
co_ctx_swap(&curr->ctx, &next->ctx);
return 0;
}
int co_yield() {
check_init();
struct coroutine *curr = g_curr_co;
struct coroutine *prev = curr->prev;
if (!prev) { // 没有唤醒者,不能执行 yield 操作
return -1;
}
g_curr_co = prev; // g_curr_co 指向当前协程的唤醒者
if (curr->status != CO_STATUS_DEAD) {
curr->status = CO_STATUS_PENDING; // 当前协程进入 PENDING 状态
}
prev->status = CO_STATUS_RUNNING; // 唤醒者进入 RUNNING 状态
co_ctx_swap(&curr->ctx, &prev->ctx);
return 0;
}
除主协程外的协程结束运行时一定要执行 yield 操作将自己切出,否则它不知道该返回到哪儿。为了不让使用者手动执行这个操作,我们将协程入口函数封装一层。
void co_entrance(struct coroutine *co) {
co->start(); // 执行入口函数
co->status = CO_STATUS_DEAD;
co_yield(); // 已经置为 DEAD 状态了,切出后不会有人唤醒它了。这里 co_yield 永远不会返回
// 不会走到这里来
}
void co_ctx_make(struct coroutine *co) {
char *sp = co->stack + co->stack_size - sizeof(void*);
sp = (char*)((intptr_t)sp & -16LL);
*(void**)sp = (void*)co_entrance; // 设置入口地址为 co_entrance
co->ctx.regs[CO_RSP] = sp;
co->ctx.regs[CO_RDI] = co; // rdi 为第一参数寄存器,将它的值置为 co,这样 co_entrance 就能拿到它的参数了
}
这样我们的协程用起来就更方便了:
void foo() {
printf("here is the new coroutine\n");
co_yield();
printf("new coroutine resumed\n");
}
int main() {
struct coroutine *co = co_new(foo, 1024 * 1024);
co_resume(co);
printf("main coroutine here\n");
co_resume(co);
return 0;
}
resume/yield 可以用于传递参数。运行上面的例子,我们发现 co_yield
返回之时便是其它协程调用 co_resume
之时;而 co_resume
返回之时又是其它协程调用 co_yield
之时。因此 resume 操作接受参数,传递给 yield 返回;yield 操作接受参数,传递给 resume 返回。这样方便在协程之间传递数据。
我们在 struct coroutine
中新增一个 data
字段用于传递参数。协程切换时,如果要给目标协程传递参数,就对目标协程的 data
字段赋值。协程切换后,就能从 data
字段中取出上一个协程传递的参数。
struct coroutine {
...
void *data; // 参数
};
int co_resume(struct coroutine *next, void *param, void **result) {
...
next->data = param; // 切换到 next 协程,给 next 协程的参数
co_ctx_swap(&curr->ctx, &next->ctx);
if (result) {
*result = curr->data;
}
return 0;
}
int co_yield(void *result, void **param) {
...
prev->data = result; // 切回 prev 协程,给 prev 协程的结果
co_ctx_swap(&curr->ctx, &prev->ctx);
if (param) {
*param = curr->data; // 其它协程唤醒它时给它的参数
}
return 0;
}
我们重新定义协程入口函数,让它接受参数和返回值。第一次 resume 的参数传给入口函数;入口函数的返回值在最后一次 yield 时传出去。
typedef void *(*start_coroutine)(void *);
static void co_entrance(struct coroutine *co) {
void *result = co->start(co->data);
co->status = CO_STATUS_DEAD;
co_yield(result, NULL); // 协程的最后一次 yield 操作,将入口函数的返回值传出去
}
现在,我们的协程库已经完全实现了。我们可以写一些例子测试一下。比如说我们可以创建一个源源不断生成以 n 开头的自然数的协程:
void *number(void *param) {
intptr_t i = (intptr_t)param;
co_yield(NULL, NULL); // 初始化后立刻 yield
while (1) {
co_yield((void*)i, NULL);
++i;
}
}
int main() {
struct coroutine *num = co_new(number, 1024 * 1024);
co_resume(num, (void*)0, NULL); // 初始化为以 0 开头的自然数流
for (int i = 0; i < 10; ++i) {
intptr_t n;
co_resume(num, NULL, (void**)&n);
printf("%ld ", n);
}
co_free(num);
return 0;
}
运行结果就是
0 1 2 3 4 5 6 7 8 9
这个协程就是一个无限流。我们还可以写一个将两个无限流逐项相加的协程:
void *add(void *param) {
struct coroutine **cov = param, *co0 = cov[0], *co1 = cov[1]; // cov 指向前序协程的栈,这里要立刻将其中的数据取出来
co_yield(NULL, NULL); // 同样,初始化后立刻 yield
while (1) {
intptr_t a, b;
co_resume(co0, NULL, (void**)&a);
co_resume(co1, NULL, (void**)&b);
co_yield((void*)(a + b), NULL);
}
}
然后将 0 开头的自然数流与 1 开头的自然数流逐项相加,得到奇数无限流(0 + 1 = 1, 1 + 2 = 3, 2 + 3 = 5, ...)
int main() {
struct coroutine *num0 = co_new(number, 1024 * 1024);
struct coroutine *num1 = co_new(number, 1024 * 1024);
struct coroutine *co_add = co_new(add, 1024 * 1024);
co_resume(num0, (void*)0, NULL); // 以 0 开头的自然数流
co_resume(num1, (void*)1, NULL); // 以 1 开头的自然数流
struct coroutine *cov[] = {num0, num1};
co_resume(co_add, cov, NULL); // 初始化 add 协程
for (int i = 0; i < 10; ++i) {
intptr_t s;
co_resume(co_add, NULL, (void**)&s);
printf("%ld ", s);
}
co_free(num0), co_free(num1), co_free(co_add);
return 0;
}
运行结果就是
1 3 5 7 9 11 13 15 17 19
当然还有更好玩的。我们可以实现一个斐波那契数列生成器。斐波那契数列可以自我定义:令 f(i) 是以第 i 项开头的斐波那契数列,f(a) + f(b) 表示将两个数列逐项相加,那么如下所示,f(2) = f(0) + f(1)。
0 1 1 2 3 5
+ 1 1 2 3 5 8
-----------------------
1 2 3 5 8 13
所以我们可以这样做
void *fib(void *param) {
co_yield((void*)0, NULL); // 第 0 项
co_yield((void*)1, NULL); // 第 1 项
struct coroutine *f0 = co_new(fib, 1024 * 1024);
struct coroutine *f1 = co_new(fib, 1024 * 1024);
co_resume(f1, NULL, NULL); // f1 先走一步,让它成为以第 1 项开头的斐波那契数列
struct coroutine *co_add = co_new(add, 1024 * 1024);
struct coroutine *cov[] = {f0, f1};
co_resume(co_add, cov, NULL); // 将 f0 与 f1 逐项相加
while (1) {
intptr_t s;
co_resume(co_add, NULL, (void**)&s);
co_yield((void*)s, NULL);
}
}
int main() {
struct coroutine *f = co_new(fib, 1024 * 1024);
for (int i = 0; i < 10; ++i) {
intptr_t s;
co_resume(f, NULL, (void**)&s);
printf("%ld ", s);
}
}
运行结果便是斐波那契数列:
0 1 1 2 3 5 8 13 21 34
不过这种写法会创建大量协程,性能很低。仅供演示<span style="background-color: var(--post-text-color)">~~(炫技)~~</span>。
对于 Linux 内核而言,线程和进程是同一个东西↩