为了适配 async 模型,tokio 重新实现了标准库中的 fs,net,channel 等模块,提供想对应的 async 方法。

本文以 TcpListener 为例,剖析 tokio 如何实现异步版本的 TcpListener,以及如何与 Reactor 整合。

数据结构关系-自顶向下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pub struct TcpListener {
    io: PollEvented<mio::net::TcpListener>,
}
// 表示一个与 Reactor 关联的 IO 资源
pub(crate) struct PollEvented<E: Source> {
    io: Option<E>,
    registration: Registration,
}
// 表示一个已经注册到了 Reactor Io 资源
// handle 表示注册的 Reactor Driver handle
// shared 表示注册的 Io 相关信息引用,可用了观测 IO 状态
pub(crate) struct Registration {
    /// Handle to the associated driver.
    handle: Handle,

    /// Reference to state stored by the driver.
    shared: slab::Ref<ScheduledIo>,
}
// 注册的 IO source 信息,包括就绪 events等
pub(crate) struct ScheduledIo {
    /// Packs the resource's readiness with the resource's generation.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

PollEvented 是一个可注册到 Reactor 的 Source 通用实现。实现了 mio Source Trait 的资源都可以借助此类型实现异步版本的方法。

Registration 是与 Reactor 交互的关键,可通过 handle 访问 Reactor(Driver), 可通过 shared 访问自身注册的事件状态。

accept 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
    let (mio, addr) = self
    .io
    .registration()
    .async_io(Interest::READABLE, || self.io.accept())
    .await?;

    let stream = TcpStream::new(mio)?;
    Ok((stream, addr))
}

pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
    loop {
        let event = self.readiness(interest).await?;

        match f() {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.clear_readiness(event);
            }
            x => return x,
        }
    }
}

fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
    Readiness {
        scheduled_io: self,
        state: State::Init,
        waiter: UnsafeCell::new(Waiter {
            pointers: linked_list::Pointers::new(),
            waker: None,
            is_ready: false,
            interest,
            _p: PhantomPinned,
        }),
    }
}

Registration 提供的 async_io 抽象了异步 io。通过传入 Interest 和 执行函数,可以搭建需要的异步方法。

ScheduledIo 提供的 readiness_fut 函数,返回实现了 Future 的 Readiness 结构,作为最底层的叶子。

1
2
3
4
5
6
7
8
9
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    // 记录状态机的状态
    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

那注册的事件 Ready 时,如何唤醒 Readiness Future 呢?

在 poll 无法返回 Ready 时,需要让 Reactor 更新 wake 函数。通过设置 Readiness.waiter 的 waker 函数,并将 waiter 加入ScheduledIo.waiters 中。而 ScheduledIo 是 Reactor 和 PollEvented 共享的,从而实现了 Reactor 中注册的 Resouce 的 waker 函数更新。

在这里, Readiness.waiter 代表了 await 在 readiness 上的上层 Future,即 PollEvented。对 Reactor 来说,即 waite 在相关事件上的某种东西,具体是什么 Reactor 不关心,只在 Ready 时,唤醒所有的 waiter。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    ...
     // Safety: called while locked
    unsafe {
        (*waiter.get()).waker = Some(cx.waker().clone());
    }
    ...
     // Insert the waiter into the linked list
    // safety: pointers from `UnsafeCell` are never null.
    waiters
    .list
    .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
    ...
}

在 Reator 的实现中, Ready 事件 dispatch 实现,会遍历所有 waiter,并调用 wake 函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
    ...
    let io = match resources.get(addr) {
        Some(io) => io,
        None => return,
    };
    ...
    io.wake(ready);
}

fn ScheduledIo::wake0(&self, ready: Ready, shutdown: bool) {
    ...
    wakers.wake_all();
    ...
}

在 Reactor dispatch 时,会通过 set_readiness 函数设置 ScheduledIo 的 readiness。在 PollEvented 第一次被唤醒时,会先检查

ScheduledIo.readiness, 然后才检查 Waiter 自己的 is_ready 状态。因为第一次 poll 时,ScheduledIo.readiness 就是最新的状态。

其他异步 io 实现

fs

并非所有的异步版本 io 资源都需要 Reactor,比如文件系统。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
    let path = path.as_ref().to_owned();
    let std = asyncify(|| StdFile::open(path)).await?;

    Ok(File::from_std(std))
}

pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T>
where
    F: FnOnce() -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    match spawn_blocking(f).await {
        Ok(res) => res,
        Err(_) => Err(io::Error::new(
            io::ErrorKind::Other,
            "background task failed",
        )),
    }
}

通过 spawn_blocking 返回一个 JoinHandle,即返回了一个可以 await 的 future,实现了将 open 操作转移到了 block 线程池,但自身确实非阻塞的。

Channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pub struct UnboundedReceiver<T> {
    /// The channel receiver
    chan: chan::Rx<T, Semaphore>,
}

pub async fn UnboundedReceiver::recv(&mut self) -> Option<T> {
    use crate::future::poll_fn;

    poll_fn(|cx| self.poll_recv(cx)).await
}

/// Future for the [`poll_fn`] function.
pub struct PollFn<F> {
    f: F,
}

impl<F> Unpin for PollFn<F> {}

/// Creates a new future wrapping around a function returning [`Poll`].
pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    PollFn { f }
}

UnboundedReceiver 本身没有实现 Future,但是通过 poll_fn 函数,返回的 PollFn 实现了 Future。

UnboundedReceiver 的成员函数作为了 poll 函数的一部分。即 UnboundedReceiver 自身保存了future 的状态。

在 ready 时,如何通知 await 的 future 呢?

在往 channel push,调用 rx_waker.wake()。同时,在 recv 无法 ready 时,也会更新 waker 函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
impl<T, S> Chan<T, S> {
    fn send(&self, value: T) {
        // Push the value
        self.tx.push(value);

        // Notify the rx task
        self.rx_waker.wake();
    }
    
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        ...
        self.inner.rx_waker.register_by_ref(cx.waker());
        ...
    }
}

time

time 是最上层的 driver,本身采用时间轮的实现方式。park 时,会得到最近的一次唤醒时间,然后调用 park_timeout 在下层 driver。 最下层的 driver 是 io driver,io driver park_timeout 时调用 mio 的 poll 方法,并指定 timeout,从而实现 timeout 后唤醒 driver 树。

mio 如何唤醒 driver 树

如果 runtime 添加了新的 task,而此时 driver 还可能处于 park 状态。spawn task 时,会先将任务加入 run_queue, 然后调用 driver 的 unpark。unpark 会一直调用到 io driver 的 unpark,然后调用 mio 的 waker,从 poll 中返回。

底层的 task,mio 事件是何时添加的?

在创建的时候注册到 mio,通过 PollEvented 完成注册。poll 时,只会去检查事件是否 ready,不会再 poll 时注册。

1
2
3
4
pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
   let io = PollEvented::new(listener)?;
   Ok(TcpListener { io })
}