使用协程提高流水线利用率

CppCon 2018: G. Nishanov “Nano-coroutines to the Rescue! (Using Coroutines TS, of Course)” 的笔记。

无栈协程切换的开销很小,几乎等于一次函数调用,而在 LLC Miss 的时候,访问内存通常需要 60 ns 以上的延迟,而如果访存命令后的命令都依赖于访问的数据,那么此时 CPU 流水线就被阻塞了,无法充分并行执行指令。而 CPU 通常提供了 prefetch 指令,内存系统也提供了一定程度的并行度,可以先发出预取命令,然后切换到其他的请求执行,使得多个访存指令重叠,充分利用流水线。这时候协程也可以看成是软件实现的指令级并行调度技术。而如果使用线程切换,那线程切换的开销会比请求内存的开销还大,所以不太合适。而无栈协程的低开销则可以完美用来重叠访存和计算。

总体的思路是,把 prefetch 当成一个类似 socket 编程中的异步 read 调用,发出 prefetch 指令之后就暂停当前的协程,切换到别的协程运行。在并发请求足够多的时候,同时可以发出足够多的 prefetch 指令,而且,大概率在切换到一个需要真正访问数据的协程的时候,数据已经在 Cache 中,此时访问延迟将大大降低。

template <typename Iterator, typename Value, typename Found, typename NotFound>
auto binary_search(Iterator first, Iterator last, Value val, Found on_found, NotFound on_not_found) -> root_task {
  auto len = last - first;
  while (len > 0) {
    auto half = len / 2;
    auto middle = first + half;
    auto middle_key = co_await prefetch(*middle);
    if (middle_key < val) {
      first = middle + 1;
      len = len - half -1;
    } else {
      len = half;
    }
    if (middle_key == half) {
      co_return on_found(val, middle);
    }
  }
  co_return on_not_found(val);
}

上面是一个二分查找的例子,在数组很大的时候,可能不能完全放入 Cache 中,而且 middle 的访问模式接近于随机。这时候,如果并发请求很多(比如在一个数据库中的 Join 操作),那么就可以先发出 prefetch 指令,然后切换到另一个协程。而这个 prefetch 函数也很简单,只需要返回一个 Awaitable,在暂停协程的时候发出真正的 prefetch 指令,然后在协程恢复运行的时候发起读取内存的请求并返回值即可。

template <typename T>
struct prefetch_awaitable {
  T& value;
  prefetch_awaitable(T& value) : value(value) {}

  bool await_ready() { return false; }

  auto await_suspend(coroutine_handle<> h) {
    _mm_prefetch(static_cast<char const *>(std::addressof(value))), _MM_HINT_NTA);
    scheduler.push_back(h);
    return scheduler.pop_front();
  }

  T& await_resume() { return value; }
};

然后,对于一大堆的请求,我们生成很多协程,然后运行即可,当然,硬件能够支持的并发访存请求有限,我们需要限制并发数,否则可能导致 thrashing:

void parallel_lookup(std::vector<int> const& arr, std::vector<int> const& lookups, int concurrency) {
  size_t found = 0;
  size_t not_found = 0;

  throttler t(concurrency);
  for (auto key : lookups) {
    t.spawn(binary_search(v.begin(), v.end(), key, [&](auto) { ++found; }), [&](auto) { ++not_found; });
  }
  t.join();
}

接下来还是需要解决怎么让这些协程运行起来的问题,也就是 throttler 怎么实现。总的来说就是在生成任务的时候查看并行的协程是否达到了限制,有的话就优先执行队列里面的,否则就加入任务队列。

struct throttler {
  size_t limit;
  explicit throttler(size_t limit) : limit(limit) {}

  void spawn(root_task t) {
    if (limit == 0) {
      scheduler.pop_front().resume();
    }
    auto h = t.set_owner(this);
    scheduler.push_back(h);
    --limit;
  }

  void on_task_done() { ++limit; }
  void join() { scheduler.run(); }
  ~throttler() { join(); }
};

为了能够在运行结束的时候反馈给 throttler,协程的 task 类型需要提供接口设置对应的 throttler。而为了在生成大量协程的时候提高效率,可以使用自定义的内存分配器提高内存分配效率。

struct root_task {
  struct promise_type {
    throttler *owner = nullptr;
    suspend_never final_suspend() { owner->on_task_done(); return {}; }
    void *operator new(size_t sz) { return allocator.alloc(sz); }
    void operator delete(void *p, size_t sz) { allocator.free(p, sz); }
    /* ... */
  };

  auto set_owner(throttler *owner) {
    auto result = h;
    h.promise().owner = owner;
    h = nullptr;
    return result;
  }

  ~root_task() { if (h) h.destroy(); }
private:
  coroutine_handle<promise_type> h;
};

上面的代码在演讲者的服务器上,每次查找平均只要 7.56 ns,达到了 1.46 的 IPC,效率非常高。他还和自己手写状态机的代码进行了对比,手写状态机的实现平均每次查找需要 10 ns。他觉得这可能是由于编译器在编译协程的时候,会先进行一次函数优化,然后再编译成状态机,在编译为状态机后,针对每一个状态再进行优化。而对于手写的状态机,编译器可能就比较难优化。所以,协程甚至有“Negative-cost Abstraction” 的效果。

这个技术有一个缺点,那就是并发度应该设置多少需要手动调整, 和硬件支持的并发度相关,需要通过不断测试调参数。

参考资料

https://www.youtube.com/watch?v=j9tlJAqMV7U

视频里提到的两篇论文:

Interleaving with coroutines: a practical approach for robust index joins (VLDB 2018)

Exploiting coroutines to attack the "killer nanoseconds" (VLDB 2017)

VLDB 2021 上有个利用了相同思想的 CoroBase:GitHub – sfu-dis/corobase: Coroutine-Oriented Main-Memory Database Engine, VLDB 2021.