第八章 异步编程

异步编程,顾名思义,就是不同步的编程。在高水平上,异步操作是一种在后台执行的操作,程序不会等待异步操作完成,而是会立即继续下一行代码。如果你还不熟悉异步编程,这个定义可能会觉得不够充分,因为它实际上并没有解释异步编程是什么。要真正理解异步编程模型以及它在 Rust 中的工作原理,我们首先要挖掘出什么是替代方案。也就是说,在理解异步编程模型之前,我们需要先理解同步编程模型。这对于澄清概念和展示使用异步编程的代价都很重要:异步的解决方案并不总是正确的 在这一章的开始,我们将快速浏览一下异步编程作为一个概念的动机;然后我们将深入探讨 Rust 中的异步在引擎盖下的实际工作情况。

异步是怎么回事?(What’s the Deal with Asynchrony?)

在我们讨论同步和异步编程模型的细节之前,我们首先需要快速浏览一下你的计算机在运行你的程序时究竟在做什么。 计算机的速度很快。非常快。事实上,如此之快,以至于它们大部分时间都在等待事情发生。除非你在解压文件、编码音频或计算数字,否则你的 CPU 有可能大部分时间是闲置的,等待操作完成。它在等待一个网络数据包的到来,等待鼠标的移动,等待磁盘完成一些字节的写入,甚至可能只是等待从主内存的读取完成。从 CPU 的角度来看,大多数这样的事件都是在几个月后发生的。当一个事件发生时,CPU 会多运行几条指令,然后再回去等待。看看你的 CPU 利用率--它可能在某个低的个位数,这可能是它大部分时间徘徊的地方。

同步接口

同步接口允许你的程序(或者说,你程序中的单个线程)一次只执行一个操作;每个操作都要等上一个同步操作完成后才能运行。你在野外看到的大多数接口都是同步的:你调用它们,它们去做一些事情,最后它们在操作完成后返回,你的程序可以继续运行。正如我们在本章后面所看到的,其原因是使一个操作异步化需要相当多的额外机制。除非你需要异步的好处,否则坚持同步模式需要更少的繁文缛节。

同步接口隐藏了所有这些等待;应用程序调用一个函数,说 "把这些字节写到这个文件",一段时间后,该函数完成,下一行代码执行。 在幕后,真正发生的是操作系统排队向磁盘写操作,然后让应用程序进入睡眠状态,直到磁盘报告说它已经完成写入。应用程序将此体验为函数需要很长的时间来执行,但实际上它根本没有真正执行,只是在等待。

以这种方式顺序执行操作的接口也常常被称为阻塞,因为接口中的操作必须等待某些外部事件的发生才能取得进展,在该事件发生之前,会阻止进一步的执行。无论你把一个接口称为同步还是阻塞,其基本思想都是一样的:在当前操作完成之前,应用程序不会继续前进。当操作在等待时,应用程序也在等待。

同步接口通常被认为是容易操作和简单推理的,因为你的代码一次只执行一行。但它们也允许应用程序一次只做一件事。这意味着,如果你想让你的程序等待用户输入或网络数据包,你就不走运了,除非你的操作系统专门为此提供了一个操作。 同样,即使你的程序可以在磁盘写文件时做一些其他有用的工作,它也没有这个选择,因为文件写入操作会阻止执行。

多线程

到目前为止,允许并发执行的最常见的解决方案是使用多线程。在多线程程序中,每个线程负责执行一个特定的独立的阻塞操作序列,操作系统在各线程之间进行多路复用,这样,如果任何线程可以取得进展,就会取得进展。如果一个线程阻塞,其他一些线程可能仍然可以运行,因此应用程序可以继续做有用的工作。

通常情况下,这些线程使用同步基元(primitive)(如锁或通道)相互通信,以便应用程序仍然可以协调它们的工作。例如,你可能有一个线程在等待用户输入,一个线程在等待网络数据包,另一个线程在等待这些线程中的任何一个在所有三个线程之间共享的通道上发送一个消息。

多线程为你提供了并发性--在任何时候都可以执行多个独立操作的能力。由运行应用程序的系统(在这种情况下,操作系统)在没有被阻塞的线程中进行选择,并决定下一步执行哪一个。如果一个线程被阻塞了,它可以选择运行另一个可以取得进展的线程来代替。

多线程与阻塞接口相结合,可以让你走得更远,大量可用于生产的软件都是以这种方式构建的。但这种方法也不是没有缺点的。首先,对所有这些线程的跟踪很快就会变得很麻烦;如果你必须为每一个并发任务(包括像等待键盘输入这样的简单任务)建立一个线程,那么线程的数量就会迅速增加,而跟踪所有这些线程的交互、通信和协调所需的额外复杂性也会增加。

第二,线程越多,切换成本就越高。每当一个线程停止运行,另一个线程重新启动时,你需要在操作系统调度器上做一次往返,而这不是免费的。在一些平台上,生成新的线程也是一个相当沉重的过程。 具有高性能需求的应用程序通常通过重复使用线程和使用操作系统调用来减轻这种成本,允许你在许多相关的操作上进行阻塞,但最终你留下了同样的问题:阻塞接口要求你有与你想要进行的阻塞调用数量一样多的线程。

最后,线程在你的程序中引入了并行性。并发和并行的区别很微妙,但很重要:并发意味着任务的执行是交错进行的,而并行意味着多个任务同时执行。如果你有两个任务,它们的执行用 ASCII 表示可能看起来像_--(并发性)与=====(并行性)。多线程不一定意味着并行,即使你有很多线程,你可能只有一个核心,所以在给定的时间内只有一个线程在执行,但这两者通常是相辅相成的。你可以通过使用 Mutex 或其他同步原语使两个线程在执行中相互排斥,但这也会引入额外的复杂性线程想要并行运行。虽然并行通常是一件好事,但谁不想让自己的程序在更多的内核上运行得更快呢,这也意味着你的程序必须处理对共享数据结构的真正同步访问。这意味着要从 RcCellRefCell 转移到功能更强大但也更慢的 ArcMutex。虽然你可能想在你的并发程序中使用后一种类型以实现并行性,但线程迫使你使用它们。我们将在第 10 章中更详细地研究多线程。

异步接口

现在我们已经探索了同步接口,我们可以看看另一个选择:异步或非阻塞接口。一个异步接口可能不会直接产生结果,而是表明结果将在稍后的时间内出现。这让调用者有机会在此期间做其他事情,而不是在该特定操作完成之前必须睡觉。在 Rust 的术语中,非同步接口是一个返回 Poll 的方法,如清单 8-1 中定义。

#![allow(unused)]
fn main() {
enum Poll<T> {
    Ready(T),
    Pending
}

// 清单 8-1:异步的核心:"你在这里或稍后再来 "的类型
}

Poll 通常出现在名称以 poll 开头的函数的返回类型中--这些方法表明它们可以在不阻塞的情况下尝试进行操作。我们将在本章后面讨论它们是如何做到这一点的,但一般来说,它们会在正常阻塞之前尽可能多地执行操作,然后返回。最重要的是,他们会记住他们离开的地方,这样他们就可以在以后恢复执行,并再次取得更多进展。

这些非阻塞函数使我们能够轻松地同时执行多个任务。例如,如果你想从网络或用户的键盘上读取信息,无论哪一个先有事件,你所要做的就是在一个循环中轮询这两者,直到其中一个返回 Poll::Ready。不需要任何额外的线程或同步!

这里的循环一词应该让你有点紧张。你不希望你的程序在一秒钟内通过 30 亿次的循环,而在下一次输入发生之前,它可能是几分钟。在阻塞接口的世界里,这不是一个问题,因为操作系统只是让线程进入睡眠状态,然后在相关事件发生时将其唤醒,但在这个勇敢的非阻塞的新世界里,我们如何避免在等待时燃烧循环? 这就是本章剩余部分的内容。

标准化的轮询

为了达到一个每个库都能以非阻塞方式使用的世界,我们可以让每个库的作者编写他们自己的轮询方法,所有这些方法的名称、签名和返回类型都略有不同,但这很快就会变得不方便。相反,在 Rust 中,轮询是通过 Future trait 标准化的。清单 8-2 中显示了 Future 的简化版本(我们将在本章的后面回到真正的版本)。

#![allow(unused)]
fn main() {
trait Future {
    type Output;
    fn poll(&mut self) -> Poll<Self::Output>;
}

// 清单 8-2: Future trait 的简化视图
}

实现 Future 特性的类型被称为 futures,代表可能还没有的值。一个 future 可以代表一个网络数据包的下一次到来,鼠标光标的下一次移动,或者仅仅是某个时间点的过去。你可以把 Future<Output = Foo> 理解为 "一个在 future 会产生 Foo 的类型"。像这样的类型在其他语言中经常被称为承诺--它们承诺最终会产生指定的类型。当一个 future 最终返回 Poll::Ready(T) 时,我们说这个 future 被解析为一个 T

有了这个特性,我们可以概括提供 poll 方法的模式。与使用 poll_recvpoll_keypress 这样的方法不同,我们可以使用 recvkeypress 这样的方法,它们都返回具有适当 Output 类型的 impl Future。这并不会改变你必须轮询它们的事实(我们稍后会处理这个问题),但这确实意味着至少有一个标准化的接口来处理这些挂起的值,我们不需要到处使用 poll_ 前缀。

注意:在一般情况下,在一个 future 返回 Poll::Ready 后,你不应该再次轮询。如果你这样做了,那么这个 future 就有权利恐慌了。一个在返回 Ready 后可以安全轮询的 future,有时被称为融合的 future。

人体工程学的 Futures

以我到目前为止所描述的方式编写一个实现 Future 的类型是相当麻烦的。要知道为什么,首先看一下清单 8-3 中相当直接的异步代码块,它只是试图将消息从输入通道 rx 转发到输出通道 tx

#![allow(unused)]
fn main() {
async fn forward<T>(rx: Receiver<T>, tx: Sender<T>) {
    while let Some(t) = rx.next().await {
        tx.send(t).await;
    }
}

// 清单 8-3:使用`async`和`await`实现一个通道转发的 future
}

这段使用 asyncawait 语法编写的代码,看起来与同等的同步代码非常相似,很容易阅读。我们简单地在一个循环中发送我们收到的每一条消息,直到没有更多的消息为止,每一个 await 点都对应着一个同步变体可能阻塞的地方。现在想想,如果你不得不通过手动实现 Future trait 来表达这段代码,由于每次调用 poll 都是从函数的顶部开始的,你需要打包必要的状态,以便从代码最后产生的地方继续下去。结果是相当怪异的,正如清单 8-4 所展示的。

#![allow(unused)]
fn main() {
enum Forward<T> { // (1)
    WaitingForReceive(ReceiveFuture<T>, Option<Sender<T>>),
    WaitingForSend(SendFuture<T>, Option<Receiver<T>>),
}
impl<T> Future for Forward<T> {
    type Output = (); // (2)
    fn poll(&mut self) -> Poll<Self::Output> {
        match self { // (3)
            Forward::WaitingForReceive(recv, tx) => {
                if let Poll::Ready((rx, v)) = recv.poll() {
                    if let Some(v) = v {
                        let tx = tx.take().unwrap(); // (4)
                        *self = Forward::WaitingForSend(tx.send(v), Some(rx)); // (5)
                        // Try to make progress on sending.
                        return self.poll(); // (6)
                    } else {
                        // No more items.
                        Poll::Ready(())
                    }
                } else {
                    Poll::Pending
                }
            }
            Forward::WaitingForSend(send, rx) => {
                if let Poll::Ready(tx) = send.poll() {
                    let rx = rx.take().unwrap();
                    *self = Forward::WaitingForReceive(rx.receive(), Some(tx));
                    // Try to make progress on receiving.
                    return self.poll();
                } else {
                    Poll::Pending
                }
            }
        }
    }
}

// 清单 8-4:手动实现通道转发的 future
}

在 Rust 中,你很少需要写这样的代码,但它让我们了解到事情是如何运作的,所以让我们来看看它。首先,我们将我们的 future 类型定义为枚举 (1),我们将用它来跟踪我们当前正在等待的东西。 这是由于当我们返回 Poll::Pending 时,对 poll 的下一次调用将从函数的顶部开始。我们需要一些方法来知道我们正在做什么,这样我们就知道该继续哪个操作了。此外,我们需要根据我们正在做的事情来跟踪不同的信息:如果我们正在等待一个接收完成,我们需要保留那个 ReceiveFuture(其定义在本例中没有显示),这样我们就可以在下次自己被轮询时轮询它,SendFuture 也是如此。这里的选项可能也会让你觉得很奇怪;我们很快就会回到这些问题上。

当我们为 Forward 实现 Future 时,我们将其输出类型声明为 (2),因为这个 future 实际上并不返回任何东西。相反,当它完成了从输入通道到输出通道的所有转发后,future 就会解析(没有结果)。在一个更完整的例子中,我们的转发类型的 Output 可能是一个 Result ,这样它就可以把来自 receive()send() 的错误从堆栈中传回给正在轮询转发完成情况的函数。但是这段代码已经够复杂了,所以我们改天再讨论这个问题。

Forward 被轮询时,它需要恢复到它最后离开的地方,我们通过匹配当前在 self (3) 中持有的枚举变量来找到它。无论我们进入哪个分支,第一步都是轮询阻止当前操作进展的 future;如果我们试图接收,我们轮询 ReceiveFuture,如果我们试图发送,我们轮询 SendFuture。如果对 poll 的调用返回 Poll::Pending,那么我们就不能取得任何进展,并且我们自己也返回 Poll::Pending。但是,如果当前的 future 解决了,我们就有工作要做了!

当内部 futures 之一解决时,我们需要通过切换存储在 self 中的枚举变量来更新当前的操作是什么。为了做到这一点,我们必须从 self 中移出,调用 Receiver::receiveSender::send--但我们不能这样做,因为我们只有&mut self。因此,我们把要移动的状态存储在一个 Option 中,然后用 Option::take (4) 移出。这样做很傻,因为我们将要覆盖 self (5),因此 OptionS 总是 Some,但有时需要一些技巧来使借用检查器高兴。

最后,如果我们确实取得了进展,我们会再次轮询 self (6),这样如果我们可以立即在待定的发送或接收上取得进展,我们就会这样做。这对于实现真正的 Future 特性时的正确性来说是必要的,我们稍后会回到这个问题上,但现在可以把它看作是一种优化。

我们只是手写了一个状态机:一个有许多可能的状态并在它们之间移动以响应特定事件的类型。这只是一个相当简单的状态机。想象一下,在更复杂的用例中,你不得不写这样的代码,因为你有额外的中间步骤

除了编写笨重的状态机之外,我们还必须知道 Sender::sendReceiver::receive 所返回的 future 的类型,这样我们才能将它们存储在我们的类型中。如果这些方法返回的是 impl Future,我们就没有办法为我们的变量写出类型。 sendreceive 方法还必须拥有发送者和接收者的所有权;如果它们不这样做,它们返回的 future 的生存期就会与self 的借用相联系,当我们从轮询中返回时,它就会结束。但这是行不通的,因为我们正试图将这些 future 存储在 self 中。

注意:你可能已经注意到 Receiver 看起来很像 Iterator 的异步版本。其他人也注意到了同样的事情,标准库正准备为能够有意义地实现 poll_next 的类型添加一个 trait。下一步,这些异步迭代器(通常被称为流)最终可能会得到一流的语言支持,比如直接在它们上面循环的能力。

归根结底,这段代码很难写,很难读,也很难改。例如,如果我们想增加错误处理,代码的复杂性将大大增加。幸运的是,有一个更好的方法!

async/await

Rust 1.39 给了我们 async 关键字和密切相关的 await 后缀操作符,我们在清单 8-3 的原始例子中使用了它们。它们一起为编写像清单 8-5 中的异步状态机提供了更方便的机制。 具体来说,它们可以让你以这样的方式编写代码,甚至看起来都不像一个状态机!

#![allow(unused)]
fn main() {
async fn forward<T>(rx: Receiver<T>, tx: Sender<T>) {
    while let Some(t) = rx.next().await {
        tx.send(t).await;
    }
}

// 清单 8-5:使用 async 和 await 实现一个通道转发的 future,与清单 8-3 重复
}

如果你对 asyncawait 没有太多的经验,清单 8-4 和清单 8-5 之间的区别可能会让你明白为什么 Rust 社区看到它们的登陆会如此兴奋。 但由于这是一本中级书,让我们再深入一点,了解这一小段代码是如何取代更长的手工实现的。要做到这一点,我们首先要谈一谈生成器--实现 asyncawait 的机制。

生成器(Generators)

简单地说,生成器是一段带有一些额外的编译器生成位的代码,这些位使生成器能够在执行过程中停止或生成,然后从最后生成的位置继续执行。以清单 8-3 中的 forward 函数为例。假设它到达了要发送的调用,但是通道当前已满。这个函数不能取得任何进展,但它也不能阻塞(毕竟这是非阻塞代码),所以它需要返回。现在假设通道最终清除,我们想继续发送。如果我们再次从顶部调用 forward,它将再次调用 next,我们之前试图发送的项目将丢失,所以这是不好的。相反,我们变成了一个生成器。

每当 forward 生成器无法继续执行时,它需要将其当前状态存储在某个地方,以便当它最终恢复执行时,它将以正确的状态在正确的位置继续执行。它通过编译器生成的相关数据结构保存状态,该结构包含生成器在给定时间点的所有状态。该数据结构上的一个方法(也是生成的)允许生成器从其当前状态恢复,存储在 &mut self 中,并在生成器再次无法继续时更新状态。

这种 "返回但允许我稍后恢复 "的操作被称为让步 (yielding),这实际上意味着它在返回的同时保留了一些额外的状态在一边。当我们以后想恢复对 forward 的调用时,我们调用已知的进入生成器的入口(resume 方法,这是异步生成器的轮询),生成器检查先前存储在 self 中的状态以决定下一步该做什么。 这与我们在清单 8-4 中手动做的事情完全一样换句话说,清单 8-5 中的代码松散地描述了清单 8-6 中的假设代码。

#![allow(unused)]
fn main() {
generator fn forward<T>(rx: Receiver<T>, tx: Sender<T>) {
    loop {
        let mut f = rx.next();
        let r = if let Poll::Ready(r) = f.poll() { r } else { yield };
        if let Some(t) = r {
            let mut f = tx.send(t);
            let _ = if let Poll::Ready(r) = f.poll() { r } else { yield };
        } else { break Poll::Ready(()); }
    }
}

// 清单 8-6:将 async/await 解构为一个生成器
}

在写这篇文章的时候,生成器实际上在 Rust 中是不能使用的--它们只是被编译器内部用来实现 async/await--但这在将来可能会改变。生成器在很多情况下都很方便,比如实现迭代器而不需要随身携带一个 struct ,或者实现一个 impl Iterator 它可以计算出如何一次产生一个项目。

如果你仔细看一下列表 8-5 和 8-6,一旦你知道每一个 awaityield 实际上是一个函数的返回,它们就会显得有些神奇。毕竟,在函数中有几个局部变量,而且不清楚当我们以后恢复时如何恢复它们。这就是编译器产生的生成器部分发挥作用的地方。编译器透明地注入代码,将这些变量持久化,并在执行时从生成器的相关数据结构中读取这些变量,而不是栈。因此,如果你声明、写入或从某个局部变量 a 中读取,你实际上是在操作类似于 self.a 的东西。这一切真的很奇妙。

手动 forward 实现和异步/等待版本之间微妙但重要的区别是,后者可以跨让步(yield)点保持引用。这使得清单 8-5 中的 Receiver::nextSender::send 等函数可以使用 &mut self,而不是清单 8-4 中的 self。如果我们试图在手动状态机的实现中为这些方法使用 &mut self 接收器,借用检查器将没有办法强制执行存储在 Forward 中的 RreceiverReceiver::next 被调用和它返回的 future 被解析之间不能被引用,因此它将拒绝该代码。只有将 Receiver 移到 future 中,我们才能让编译器相信 Receiver 在其他方面是不可访问的。同时,通过 async/await,借用检查器可以在编译器将其变成状态机之前检查代码,并验证 rx 确实没有被再次访问,直到 future 被丢弃之后,当它的 await 返回时。

生成器的大小

用来支持生成器状态的数据结构必须能够容纳任何一个 yield 点的组合状态。如果你的 async fn 包含,比如说,一个 [u8; 8192],这 8KiB 必须存储在发生器本身。即使你的 async fn 只包含较小的局部变量,它也必须包含它所等待的任何 future,因为它需要在稍后调用 poll 时能够轮询这样的 future。

这种嵌套意味着生成器,以及基于异步函数和块的 future,可以变得相当大,而在你的代码中却没有任何可见的迹象表明其大小增加。 这反过来又会影响你的程序的运行性能,因为这些巨大的生成器可能必须在函数调用之间以及在数据结构中复制,这相当于相当数量的内存复制。事实上,你通常可以通过在你的应用程序的性能档案中寻找在 memcpy 函数中花费的过多时间来识别你的基于生成器的 future 的大小是如何影响性能的!

然而,找到这些大型 future 并不容易,往往需要手动识别长的或复杂的异步函数链。Clippy 将来可能会在这方面提供帮助,但在写这篇文章的时候,你只能靠自己了。当你发现一个特别大的 future 时,你有两个选择:你可以尝试减少异步函数需要的本地状态的数量,或者你可以把 future 移到堆里(用 Box::pin),这样,移动 future 只需要移动它的指针。后者是迄今为止最简单的方法,但它也引入了一个额外的分配和一个指针指示。你最好的选择通常是把有问题的 future 时放在堆上,测量你的性能,然后用你的性能基准来指导你。

PinUnpin

我们还没有完全完成。虽然生成器很整洁,但到目前为止,我所描述的技术出现了一个挑战。特别是,如果生成器中的代码(或者相当于异步块)需要一个对局部变量的引用,会发生什么并不清楚。在清单 8-5 的代码中,如果下一条消息没有立即可用,rx.next() 返回的 future 必须持有对 rx 的引用,这样它就知道在生成器下一次恢复时应该在哪里再试。当生成器停止工作时,future 和 future 所包含的引用就被藏在生成器中。但是,如果生成器被移动,现在会发生什么呢?具体来说,请看清单 8-7 中的代码,它调用了 forward

#![allow(unused)]
fn main() {
async fn try_forward<T>(rx: Receiver<T>, tx: Sender<T>) -> Option<impl Future> {
    let mut f = forward(rx, tx);
    if f.poll().is_pending() { Some(f) } else { None }
}

// 清单 8-7:在轮询后移动一个 future
}

try_forward 函数只进行一次轮询 forward,以尽可能多地转发消息而不阻塞。如果接收器仍然可能产生更多的消息(也就是说,如果它返回 Poll::Pending 而不是 Poll::Ready(None)),这些消息将被推迟到以后的某个时间转发,方法是将转发 future 返回给调用者,调用者可以选择在它认为合适的时候再次轮询。

让我们用我们到目前为止对 asyncawait 的了解来研究一下这里发生了什么。当我们轮询前向生成器时,它经过了不知多少次的 while 循环,如果接收方结束了,最终返回 Poll::Ready(()),否则返回 Poll::Pending。如果它返回 Poll::Pending,生成器包含一个从 rx.next()tx.send(t) 返回的 future。这些 future 都包含对最初提供给 forward 的参数之一的引用(分别是 rxtx),这些参数也必须存储在生成器中。但是当 try_forward 返回整个生成器时,生成器的字段也会移动。因此,rxtx 不再位于内存中的相同位置,而存储在停顿的 future 中的引用也不再指向正确的数据了

我们在这里遇到的是一个自引用的数据结构的情况:一个既持有数据又对该数据进行引用的结构。有了生成器,这些自引用结构很容易构建,如果不能支持它们,将是对人机工程学的重大打击,因为这意味着你将无法在任何 yield 点上持有引用。在 Rust 中支持自引用数据结构的(巧妙的)解决方案是以 Pin 类型和 Unpin 特性的形式出现的。简而言之,Pin 是一个封装类型,它可以防止被封装的类型被(安全地)移动,而 Unpin 是一个标记特质,表示实现的类型可以安全地从 Pin 中移除。

Pin

这里有很多细微的差别,所以让我们从 Pin 包装器的一个具体使用开始。清单 8-2 给了你一个简化的 Future trait,但我们现在准备剥开简化的一部分。清单 8-8 显示了 Future trait 在某种程度上更接近其最终形式。

#![allow(unused)]
fn main() {
trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>) -> Poll<Self::Output>;
}

// 清单 8-8:一个不那么简化的关于 `future` trait 的视图,其中包括 `Pin`
}

特别是,这个定义要求你在 Pin<&mut Self >上调用 poll。一旦你在 Pin 后面有一个值,这就构成了一个契约,这个值将永远不会再移动。这意味着你可以随心所欲地在内部构建自我引用,完全按照你对生成器的要求。

注意:虽然 Future 使用了 Pin,但 Pin 并不与 Future trait 相联系--你可以将 Pin 用于任何自引用的数据结构。

但你如何让一个 Pin 调用轮询?以及 Pin 如何确保包含的值不会移动?为了看看这个魔法是如何运作的,让我们看看 std::pin::Pin 的定义和它的一些关键方法,如清单 8-9 所示。

#![allow(unused)]
fn main() {
struct Pin<P> { pointer: P }
impl<P> Pin<P> where P: Deref {
    pub unsafe fn new_unchecked(pointer: P) -> Self;
}
impl<'a, T> Pin<&'a mut T> {
    pub unsafe fn get_unchecked_mut(self) -> &'a mut T;
}
impl<P> Deref for Pin<P> where P: Deref {
    type Target = P::Target;
    fn deref(&self) -> &Self::Target;
}

// 清单 8-9:std::pin::Pin 和它的关键方法
}

这里有很多东西需要解读,我们要把清单 8-9 中的定义复习几遍,才能让所有的部分都有意义,所以请耐心等待。

首先,你会注意到,Pin 持有一个指针类型。也就是说,它不是直接持有某个 T,而是持有一个通过 Deref 推断到 T 的类型 P。这意味着你不是拥有一个 Pin<MyType>,而是拥有一个 Pin<Box<MyType> 或者 Pin<Rc<MyType> 或者 Pin<&mut MyType>。这样设计的原因很简单--Pin 的主要目标是确保一旦你把一个 T 放在 Pin 后面,这个 T 就不会移动,因为这样做可能会使存储在 T 中的自我引用失效。 在本节的其余部分,我将把 P 称为指针类型,T 称为目标类型。

接下来,注意到 Pin 的构造函数 new_unchecked 是不安全的。这是因为编译器没有办法实际检查指针类型是否确实承诺了被指向的(目标)类型不会再移动。例如,考虑到栈中的变量 foo。如果 Pin 的构造函数是安全的,我们可以做 Pin::new(&mut foo),调用一个需要 Pin<&mut Self> 的方法(从而假设 Self 不会再移动),然后放弃 Pin。在这一点上,我们可以随心所欲地修改 foo,因为它不再是借来的了--包括移动它!然后我们可以再次钉住它,并调用 Self 的方法。然后我们可以再次钉住它,并调用同样的方法,这样它就不会知道它第一次构建的任何自引用指针现在都是无效的了。

Pin 构造函数的安全性

Pin 的构造函数不安全的另一个原因是,它的安全性取决于本身是安全的 traits 的实现。例如,Pin<P>实现 get_unchecked_mut 的方式是使用 DerefMut::deref_mutP 的实现。虽然对 get_unchecked_mut 的调用是不安全的,但 impl DerefMut for P 却不是。然而它收到了一个 &mut self,因此可以自由地(并且没有不安全的代码)移动 T。因此,对 Pin::new_unchecked 的安全要求不仅是指针类型不会让目标类型再次被移动(就像在 Pin<&mut T>的例子中),而且它的 DerefDerefMutDrop 实现也不会将指向的值移动到它们所接收的 &mut self 后面。

然后我们进入 get_unchecked_mut 方法,它给你一个 Pin 指针类型背后的 T 的可变引用。这个方法也是不安全的,因为一旦我们给出了一个 &mut T,调用者就必须保证不会用这个 &mut T 来移动这个 T 或以其他方式使其内存失效,以免任何自引用被失效。如果这个方法不是不安全的,调用者可以调用一个接收 Pin<&mut Self> 的方法,然后在两个 Pin<&mut _> 上调用 get_unchecked_mut 的安全变量,然后使用 mem::swap 来交换 Pin 后面的值。如果我们再在任何一个 Pin 上调用一个接收 Pin<&mut Self> 的方法,它关于 Self 没有移动的假设就会被违反,它所存储的任何内部引用都会失效。

也许令人惊讶的是,Pin<P> 总是实现 Deref<Target = T>,而这是完全安全的。原因是 &T 并不能让你在不写其他不安全代码的情况下移动 T(例如 UnsafeCell,我们将在第 9 章讨论)。这是一个很好的例子,说明为什么不安全块的范围会超出它所包含的代码。如果你在应用程序的某个部分写了一些代码,使用 UnsafeCell(不安全地)替换了一个&后面的 T,那么这个&T 最初可能来自一个 Pin<&mut T>,你现在已经违反了 Pin 后面的 T 可能永远不会移动的不变性,即使你不安全地替换 &T 的地方甚至没有提到 Pin

注意:如果你在阅读本章时浏览了 Pin 文档,你可能已经注意到 Pin::set,它接收一个&mut self 和一个<P as Deref>::Target,并安全地改变 Pin 后面的值。这是有可能的,因为 set 并不返回之前被钉住的值--它只是把它丢在原地,并把新的值存储在那里。因此,它并不违反钉子的不变性:旧的值被放置在钉子之后,从未在钉子之外被访问过。

Unpin: 安全 Pinning 的关键

在这一点上,你可能会问:鉴于获取一个可变的引用是不安全的,为什么不直接让 Pin 持有一个 T?也就是说,与其要求通过指针类型进行转接,不如为 get_unchecked_mut 制定契约,即只有在你没有移动 Pin 的情况下才可以安全调用它。这个问题的答案就在于指针设计中对 Pin 的安全使用。回想一下,我们首先需要 Pin 的全部原因是,我们可以有可能包含对自己的引用的目标类型(比如生成器),并给他们的方法一个保证,即目标类型没有移动,因此内部的自我引用仍然有效。Pin 让我们使用类型系统来执行这一保证,这很好。但不幸的是,就目前的设计而言,Pin 的工作非常不容易。这是因为它总是需要不安全的代码,即使你正在处理一个不包含任何自我引用的目标类型,因此并不关心它是否被移动。

这就是标记属性 Unpin 发挥作用的地方。一个类型的 Unpin 实现简单地断言,当被用作目标类型时,该类型可以安全地从 Pin 中移动出来。也就是说,该类型承诺,当被用作目标类型时,它将永远不会使用任何关于参照物不再移动的 Pin 的保证,因此这些保证可能被破坏。Unpin 是一个自动 trait,就像 SendSync 一样,所以对于任何只包含 Unpin 成员的类型,编译器都会自动实现。只有那些明确选择不使用 Unpin 的类型(比如生成器)和包含这些类型的类型才是 !Unpin

对于 Unpin 的目标类型,我们可以提供一个更简单的安全接口给 Pin,如清单 8-10 所示。

#![allow(unused)]
fn main() {
impl<P> Pin<P> where P: Deref, P::Target: Unpin {
    pub fn new(pointer: P) -> Self;
}
impl<P> DerefMut for Pin<P> where P: DerefMut, P::Target: Unpin {
    fn deref_mut(&mut self) -> &mut Self::Target;
}

// 清单 8-10:对 Unpin 目标类型的安全 API
}

为了理解清单 8-10 中的安全 API,请思考清单 8-9 中不安全方法的安全要求:函数 Pin::new_unchecked 是不安全的,因为调用者必须保证参照物不能被移动到 Pin 之外,而且指针类型的 DerefDerefMutDrop 的实现不会通过它们接收的参照物移动参照物。这些要求是为了确保一旦我们给一个 T 发出一个 Pin,我们就不会再移动这个 T。但是如果这个 TUnpin,它已经声明它不关心它是否被移动,即使它之前被钉住了,所以如果调用者不满足这些要求也没关系!

类似地,get_unchecked_mut 是不安全的,因为调用者必须保证它不会将 T&mut T 中移出--但是在 T:Unpin 中,T 已经声明它在被钉住后也可以被移动,所以这个安全要求不再重要了。这意味着对于 Pin<P>P::Target:Unpin,我们可以简单地提供这两个方法的安全变体(DerefMutget_unchecked_mut 的安全版本)。事实上,我们甚至可以提供一个 Pin::into_inner,如果目标类型是 Unpin,它可以简单地返回拥有的 P,因为 Pin 本质上是不相关的。

获取 Pin 的方法

有了对 PinUnpin 的新理解,我们现在可以在使用清单 8-8 中要求 Pin<&mut Self> 的新 Future 定义上取得进展。第一步是构建所需的类型。 如果 Future 的类型是 Unpin,这一步就很容易了,我们只需使用 Pin::new(&mut future)。如果它不是 Unpin,我们可以通过两种主要方式之一钉住 future:钉住堆或钉住栈。

让我们从钉在堆里开始。Pin 的主要契约是,一旦某个东西被钉住,它就不能移动。Pining API 负责为 Pin 上的所有方法和特性遵守这一契约,所以任何构造 Pin 的函数的主要作用是确保如果 Pin 本身移动,引用值也不会移动。确保这一点的最简单的方法是将参照物放在堆上,然后在 Pin 中放置一个指向参照物的指针。然后你可以随心所欲地移动 Pin,但目标将保持在原来的位置。这就是 Box::pin(安全)方法背后的原理,它接收一个 T 并返回一个 Pin<Box<T>>。它没有任何魔力;它只是断言 Box 遵循 Pin 的构造函数、DerefDrop 契约。

UNPIN BOX

当我们在讨论 Box 的时候,看看 BoxUnpin 的实现吧。Box 类型无条件地对任何 T 实现了 Unpin,即使该 T 不是 Unpin。这可能会让你感到奇怪,因为前面说过,Unpin 是一个自动特性,通常只有在一个类型的所有成员都是 Unpin 的情况下才会为该类型实现。Box 是一个例外,因为它可以提供一个安全的 Pin 构造函数:如果你移动一个 Box<T>,你不会移动 T。 换句话说,无条件的实现断言你可以将一个 Box<T> 移出 Pin,即使 T 不能被移出 Pin。 然而请注意,这并不能使你将一个是 !UnpinT 移出 Pin<Box<T>>

另一个选项,即钉在栈上,就比较麻烦了,在写这篇文章的时候需要一点不安全的代码。我们必须确保被钉住的值在带有&mutPin 被丢弃后不能被访问。如清单 8-11 中的宏所示,我们通过遮蔽(shadowing)该值来实现这一目标,或者通过使用提供该宏的某个 crate 来实现。有一天,它甚至可能被纳入标准库中。

#![allow(unused)]
fn main() {
macro_rules! pin_mut {
    ($var:ident) => {
        let mut $var = $var;
        let mut $var = unsafe { Pin::new_unchecked(&mut $var) };
    }
}

// 清单 8-11:用于钉住栈的宏程序
}

通过获取要钉在栈上的变量名称,该宏确保调用者已经在栈的某个地方得到了它想要钉的值。$var 的遮蔽确保了调用者不能放弃 Pin 并继续使用未被钉住的值(这将违反任何目标类型的!!Unpin 契约)。通过移动存储在$var 中的值,该宏还确保调用者不能在不放弃原始变量的情况下放弃绑定宏声明的$var。具体来说,如果没有这一行,调用者可以写(注意额外的范围):

#![allow(unused)]
fn main() {
let foo = /* */; { pin_mut!(foo); foo.poll() }; foo.mut_self_method();
}

在这里,我们把 foo 的一个钉住的实例给了 poll,但后来我们又对 foo 使用了一个没有 Pin 的 &mut,这违反了引脚契约。另一方面,有了额外的重新赋值,这段代码也会把 foo 移到新的作用域中,使它在作用域结束后无法使用。

因此,在栈上的钉子需要不安全的代码,与 Box::pin 不同,但避免了 Box 引入的额外分配,也可以在 no_std 环境下工作。

返回 Future

我们现在有了钉住的 future,而且我们知道这意味着什么。但你可能已经注意到,在你用 asyncawait 编写的大多数异步代码中,这些重要的钉住东西都没有显示出来。 这是因为编译器把它隐藏起来了。

回想一下我们讨论清单 8-5 的时候,我告诉你 <expr>.await 脱糖会变成类似的东西:

#![allow(unused)]
fn main() {
loop { if let Poll::Ready(r) = expr.poll() { break r } else { yield } }
}

这是一个非常轻微的简化,因为正如我们所看到的,只有当你有一个 Pin<&mut Self> 作为 future 的时候,你才能调用 Future::poll。 解码实际上要复杂一些,如清单 8-12 所示。

#![allow(unused)]
fn main() {
match expr { // (1)
    mut pinned => loop {
    match unsafe { Pin::new_unchecked(&mut pinned) }.poll() { // (2)
            Poll::Ready(r) => break r,
            Poll::Pending => yield,
        }
    }
}

// 清单 8-12:对 `<expr>.await` 进行更准确的解读。
}

匹配 (1) 是一个巧妙的速记,不仅可以确保扩展仍然是一个有效的表达式,而且可以将表达式的结果移到一个变量中,然后我们可以将其钉在栈上。除此之外,主要的新增内容是对 Pin::new_unchecked (2) 的调用。这个调用是安全的,因为对于包含异步块的轮询,由于 Future::poll 的签名,它必须已经被钉住了。而这个异步块是被轮询过的,以便我们能达到对 Pin::new_unchecked 的调用,所以生成器的状态是被钉住的。由于 pinned 存储在与异步块相对应的生成器中(必须如此才能正确恢复 yield),我们知道 pinned 不会再移动。此外,除了通过 Pin 之外,一旦我们进入循环,pinned 就不能被访问,所以没有任何代码能够从 pinned 的值中移动出来。因此,我们满足了 Pin::new_unchecked 的所有安全要求,并且代码是安全的。

进入睡眠(Going to Sleep)

我们在 Pin 的问题上走得很深,但现在我们从另一边出来了,还有一个围绕着 future 的问题可能已经让你的大脑发痒了。如果对 Future::poll 的调用返回 Poll::Pending,你需要在以后的时间里再次调用 poll,以检查你是否可以取得进展。这个东西通常被称为执行器 (executor)。你的执行器可以是一个简单的循环,轮询所有你正在等待的 future,直到它们都返回 Poll::Ready,但这将消耗大量的 CPU 周期,你可能会用在其他更有用的地方,比如运行你的网页浏览器。相反,我们希望执行器做任何它能做的有用的工作,然后进入睡眠状态。它应该保持睡眠状态,直到其中一个 future 能够取得进展,然后才会醒来做另一个程序,然后再去睡觉。

唤醒(Waking Up)

决定何时对一个给定的 future 进行回访的条件差别很大。它可能是 "当一个网络包到达这个端口时","当鼠标光标移动时","当有人在这个通道上发送时","当 CPU 收到一个特定的中断时",甚至是 "在这么多时间过去后"。在此基础上,开发者可以编写自己的 future,包裹多个其他 future,因此,他们可能有多个唤醒条件。一些 future 甚至可以引入他们自己完全自定义的唤醒事件。

为了适应这些众多的用例,Rust 引入了 Waker 的概念:一种唤醒执行器的方式,以示可以取得进展。Waker 是使整个 future 机制发挥作用的原因。执行者构建了一个 Waker,它与执行者用来进入睡眠状态的机制结合在一起,并将 Waker 传递给它所轮询的任何 future。怎么做?通过 Future::poll 的额外参数,到目前为止我还没有告诉你。这一点很抱歉。清单 8-13 给出了 Future 的最终和真正的定义--不再有谎言了!

#![allow(unused)]
fn main() {
trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
`
// 清单 8-13:实际的 Future trait
}

&mut Context 包含 Waker。参数是一个 Context,而不是直接的 Waker,所以如果认为有必要的话,我们可以用额外的上下文来增强异步生态系统的 futures。

Waker 的主要方法是 wake(以及通过引用的变体 wake_by_ref),它应该在 future 可以再次取得进展时被调用。wake 方法不需要任何参数,其效果完全由构建 Waker 的执行者定义。你看,Waker 在秘密地对执行者进行通用。或者,更准确地说,不管是什么构造的 Waker 都可以决定 Waker::wake 被调用时,Waker 被克隆时,Waker 被放弃时,会发生什么。 这一切都通过一个手动实现的 vtable 发生,其功能类似于我们在第二章讨论的动态调度。

构造一个 Waker 的过程有点复杂,它的机制对于使用一个 Waker 来说并不是那么重要,但是你可以在标准库中的 RawWakerVTable 类型中看到构建模块。它有一个构造函数,接收 wakewake_by_ref 以及 CloneDrop 的函数指针。RawWakerVTable 通常在一个执行者的所有 Waker 中共享,它与一个原始指针捆绑在一起,用于保存每个 Waker 实例的特定数据(比如它是为哪个 future 服务的),并被转化为一个 RawWaker。这又被传递给 Waker::from_raw,产生一个安全的 Waker,可以传递给 Future::poll

履行轮询契约(Fulfilling the Poll Contract)

到目前为止,我们已经绕过了 future 对唤醒器的实际作用。 这个想法相当简单:如果 Future::poll 返回 Poll::Pending,那么 future 就有责任确保在 future 下次能够取得进展时调用所提供的唤醒器。大多数 future 维护这一属性的方法是,只有当其他一些 future 也返回 Poll::Pending 时,才返回 Poll::Pending;以这种方式,它琐碎地履行了 poll 的契约,因为内部 future 必须遵循同样的契约。但不可能一路都是乌龟。在某些时候,你会遇到一个不轮询其他 future 的 future,而是做一些事情,比如写到网络套接字或试图在一个通道上接收。这些通常被称为叶子 future,因为它们没有孩子。一个叶子 future 没有内部 future,而是直接代表一些可能还没有准备好返回结果的资源。

注意:轮询契约是清单 8-4 中的递归轮询调用 (6) 对于正确性来说是必要的原因。

叶子 future 通常有两种形式:一种是等待同一进程中的事件(如通道接收器),另一种是等待进程外部的事件(如 TCP 数据包读取)。那些等待内部事件的程序都倾向于遵循相同的模式:将唤醒你的代码存放在可以找到它的地方,当它产生相关事件时,让代码调用唤醒 Waker。例如,考虑一个需要在内存通道中等待消息的叶子 future。它将其 Waker 存储在发送方和接收方共享的通道部分,然后返回 Poll::Pending。当一个发送者出现并向通道中注入消息时,它注意到等待的接收者留在那里的唤醒器,并在从 send 返回之前调用唤醒器。现在接收方被唤醒了,并且轮询合同得到了维护。

处理外部事件的叶子 future 涉及的内容更多,因为产生它们所等待的事件的代码对 future 或 waker 一无所知。最常见的生成代码是操作系统内核,它知道磁盘何时准备好或计时器何时到期,但它也可能是一个 C 语言库,当一个操作完成时调用一个回调到 Rust 或其他类似的外部实体。像这样包裹外部资源的叶子 future 可以旋转一个线程,执行一个阻塞的系统调用(或等待 C 的回调),然后使用内部唤醒机制,但这将是一种浪费;你会在每次操作必须等待时旋转一个线程,并留下许多单一用途的线程坐在周围等待事情。

相反,执行者倾向于提供叶子 future 的实现,在幕后与执行者沟通,安排与操作系统的适当互动。具体如何安排取决于执行器和操作系统,但大致上说,执行器会跟踪所有它应该在下次睡眠时监听的事件源。当叶子的 future 意识到它必须等待一个外部事件时,它就会更新该执行器的状态(它知道这个状态,因为它是由执行器 crate 提供的),以包括该外部事件源和它的 Waker。当执行器不能再取得进展时,它就会收集各种待定的叶子 future 所等待的所有事件源,并对操作系统进行一个大的阻塞调用,告诉它在叶子 future 所等待的任何资源有新的事件时返回。在 Linux 上,这通常是通过 epoll 系统调用实现的;Windows、BSD、macOS 和几乎所有其他操作系统都提供类似的机制。当该调用返回时,执行者在所有与操作系统报告的事件源相关的唤醒者上调用唤醒,因此,轮询合约得到了履行。

注意:反应器(reactor)是执行器的一部分,叶子 future 用它来注册事件源,当它没有更多有用的工作要做时,执行器就会等待它。将执行器和反应器分开是可能的,但将它们捆绑在一起通常会提高性能,因为两者可以更容易地共同优化。

叶子 future 和执行器之间紧密结合的一个连锁反应是,来自一个执行器 crate 的叶子 future 往往不能用于不同的执行器。或者至少,除非叶子 future 的执行器也在运行,否则它们不能被使用。当叶子 future 去存储它的 Waker 并注册它所等待的事件源时,它所对应的执行器需要有该状态的设置并需要运行,这样事件源才会被实际监控并最终被唤醒。有一些方法可以解决这个问题,比如让叶子 future 生成一个执行器,如果一个执行器还没有运行的话,但这并不总是可取的,因为这意味着一个应用程序可以透明地最终有多个执行器同时运行,这可能会降低性能,并意味着你在调试时必须检查多个执行器的状态。

希望支持多个执行器的库 crate 必须对其叶子资源进行通用。例如,一个库可以存储一个泛型的 T:AsyncRead + AsyncWrite,而不是使用特定执行器的 TcpStreamFile future 类型。然而,生态系统还没有确定这些特征到底应该是什么样子的,以及哪些特征是需要的,所以就目前而言,要使代码在执行器上真正通用是相当困难的。例如,虽然 AsyncReadAsyncWrite 在整个生态系统中是比较常见的(或者在必要时可以很容易地进行调整),但目前没有任何 trait 用于在后台运行一个 future(spwning,我们将在后面讨论)或用于表示一个计时器。

醒着是用词不当(Waking Is a Misnomer)

你可能已经意识到 Waker::wake 似乎不一定能唤醒任何东西。例如,对于外部事件(如上一节所述),执行者已经被唤醒了,而它在属于该执行者的 Waker 上调用唤醒,这似乎很愚蠢实际情况是,Waker::wake` 是一个错误的名称--实际上,它是一个特定 future 可运行的信号。 也就是说,它告诉执行器,当它有机会时,应该确保轮询这个特定的 future,而不是再去睡觉,因为这个 future 可以取得进展。如果执行器目前正在睡觉,这可能会唤醒它,这样它就会去轮询那个 future,但这更像是一种副作用,而不是其主要目的。

对于执行器来说,知道哪些 future 是可运行的是很重要的,原因有二。首先,它需要知道什么时候可以停止轮询一个 future 并进入睡眠状态;仅仅轮询每个 future 直到它返回 Poll::Pending 是不够的,因为轮询一个较晚的 future 可能会让一个较早的 future 取得进展。考虑到这样的情况:两个 future 在通道上来回跳动消息,相互之间。当你轮询一个时,另一个就准备好了,反之亦然。在这种情况下,执行器不应该进入睡眠状态,因为总是有更多的工作要做。

第二,知道哪些 future 是可运行的,可以让执行者避免不必要地轮询 future。如果一个执行器管理着数以千计的待定 future,它不应该仅仅因为一个事件使其中一个 future 可运行而轮询所有的 future。如果是这样,执行异步代码确实会变得非常慢。

任务和子执行器(Tasks and Subexecutors)

异步程序中的 future 形成了一棵树:一个 future 可以包含任何数量的其他 future,这些 future 又可以包含其他 future,一直到与 waker 交互的叶子 future。每棵树的根是你给执行者的主要 "运行" 函数的 future。这些根 future 被称为任务,它们是执行器和 future 树之间的唯一联系点。执行者在任务上调用轮询,从那时起,每个包含的 future 的代码必须弄清楚哪些内部 future 需要轮询,一直到相关叶子。

执行者通常为他们轮询的每个任务构建一个单独的 Waker,这样当 wake 被调用时,他们就知道哪个任务是可运行的,并能将其标记为可运行。这就是 RawWaker 中的原始指针的作用--在共享各种 Waker 方法的代码时区分不同的任务。

当执行者最终轮询一个任务时,该任务从其实现的 Future::poll 的顶部开始运行,并且必须从那里决定如何到达更深的 future,现在可以取得进展。由于每个 future 只知道它自己的字段,而不知道整个树的情况,这一切都通过调用轮询来发生,每个轮询都要穿越树中的一条边。

选择哪一个内部 future 进行轮询通常是显而易见的,但并非总是如此。在 async/await 的情况下,要轮询的 future 是我们被阻塞的那个。但是在一个等待几个 future 中的第一个取得进展的 future(通常称为选择),或者等待一组 future 中的所有 future(通常称为连接),有许多选择。一个必须做出这种选择的 future 基本上是一个子执行器。它可以轮询其内部的所有 future,但这样做可能是相当浪费的。相反,这些子执行器在对任何内部 future 调用轮询之前,通常会用它们自己的 Waker 类型来包装它们在轮询的 Context 中收到的 Waker。在包装代码中,他们在对原始 Waker 调用唤醒之前,在自己的状态中把刚刚轮询的 future 标记为可运行。这样,当执行者最终再次轮询子执行者的 future 时,子执行者可以查阅自己的内部状态,以找出哪些内部 future 导致了当前调用的轮询,然后只轮询这些 future。

异步代码中的阻塞

对于从异步代码中调用同步代码,你必须小心谨慎,因为执行器线程在执行当前任务时所花费的任何时间都是它没有在运行其他任务时花费的时间。如果一个任务长期占据当前线程而不回馈给执行者,这可能发生在执行一个阻塞的系统调用(如 std::sync::sleep),运行一个偶尔不回馈的子执行器,或者运行在一个没有等待的紧密循环中,那么当前执行者线程负责的其他任务就不能在这段时间运行。通常情况下,这表现为在某些任务可以取得进展时(比如客户端连接时)和它们实际执行时之间的长时间延迟。

一些多线程执行器实现了工作窃取技术,闲置的执行器线程从繁忙的执行器线程中窃取任务,但这更像是一种缓解措施,而不是一种解决方案。最终,你可能会出现这样的情况:所有的执行器线程都被阻塞了,因此在其中一个阻塞操作完成之前,没有任务可以运行。

一般来说,在执行计算密集型操作或调用可能在异步上下文中阻塞的函数时,应该非常小心。这样的操作应该尽可能转换为异步操作,或者在专用线程上执行,然后使用支持异步的基元(primitive)进行通信,如通道。一些执行器还提供了一些机制,用于指示异步代码的特定部分可能会阻塞,或者用于在循环的上下文中自愿 yield,否则可能不会 yield,这可以构成解决方案的一部分。一个好的经验法则是,任何 future 都不应该能够运行超过 1 毫秒而不返回 Poll::Pending。

用 spawn 把这一切联系起来(Tying It All Together with spawn)

在使用异步执行器时,你可能会遇到一个产生 future 的操作。我们现在可以探索这意味着什么了!让我们通过例子来做。让我们通过举例的方式来进行。首先,考虑清单 8-14 中的简单服务器实现

#![allow(unused)]
fn main() {
async fn handle_client(socket: TcpStream) -> Result<()> {
    // Interact with the client over the given socket.
} 

async fn server(socket: TcpListener) -> Result<()> {
    while let Some(stream) = socket.accept().await? {
        handle_client(stream).await?;
    }
}

// 清单 8-14:按顺序处理连接
}

顶层的服务器函数本质上是一个大的 future,它监听新的连接,并在新的连接到来时做一些事情。你把这个 future 交给执行器并说 "运行这个",由于你不希望你的程序立即退出,你可能会让执行器在这个 future 上阻塞。 也就是说,调用执行器来运行服务器 future 将不会返回,直到服务器 future 解决,这可能永远不会(另一个客户端可能稍后到达)。

现在,每当一个新的客户端连接进来时,清单 8-14 中的代码就会建立一个新的 future(通过调用 handle_client)来处理这个连接。因为处理本身就是一个 future,所以我们 await 它,然后转到下一个客户端连接。

这种方法的缺点是,我们每次只处理一个连接,没有并发性。一旦服务器接受了一个连接,就会调用 handle_client 函数,由于我们在等待它,所以我们不会再绕过这个循环,直到 handle_client 的返回 future 解决(估计是在该客户离开后)。

我们可以在此基础上进行改进,保留一组所有的客户 future,让服务器接受新连接的循环也检查所有的客户 future,看看是否有客户可以取得进展。 清单 8-15 显示了这可能是什么样子。

#![allow(unused)]
fn main() {
async fn server(socket: TcpListener) -> Result<()> {
    let mut clients = Vec::new();
    loop {
        poll_client_futures(&mut clients)?;
        if let Some(stream) = socket.try_accept()? {
            clients.push(handle_client(stream));
        }
    }
}

// 清单 8-15:处理与手动执行器的连接
}

这至少可以同时处理许多连接,但这是很复杂的。它的效率也不高,因为代码现在是忙循环,在处理我们已有的连接和接受新的连接之间切换。而且它每次都要检查每个连接,因为它不知道哪些连接可以取得进展(如果有的话)。它也不能在任何时候 await,因为这将阻止其他 future 取得进展。你可以实现你自己的 wakers,以确保代码只轮询可以取得进展的 future,但最终这是在开发你自己的迷你执行器的路上。

坚持只用一个服务器任务的另一个缺点是,服务器最终是单线程的,因为它内部包含了所有客户端连接的 future。只有一个任务,为了对其进行轮询,代码必须持有对该任务的 future 的独占引用(poll 取得 Pin<&mut Self>),而在同一时间只有一个线程可以持有。

解决办法是让每个客户的 future 成为自己的任务,让执行者在所有任务之间进行复用。你猜对了,你是通过生成 future 来做的。执行器将继续在服务器的 future 任务上进行阻塞,但如果它不能在该 future 任务上取得进展,它将使用其执行机制在幕后同时在其他任务上取得进展。最重要的是,如果执行器是多线程的,而你的客户端 future 是 Send 的,它可以并行地运行它们,因为它可以同时持有独立任务的&mutS。清单 8-16 给出了一个例子,说明这可能是什么样子。

#![allow(unused)]
fn main() {
async fn server(socket: TcpListener) -> Result<()> {
    while let Some(stream) = socket.accept().await? {
    // Spawn a new task with the Future that represents this client.
    // The current task will continue to just poll for more connections
    // and will run concurrently (and possibly in parallel) with handle_client.
    spawn(handle_client(stream));
}

// 清单 8-16: 生成 future 以创建更多可以并发轮询的任务
}

当你 spawn 一个 future,从而使其成为一个任务,这有点像 spawn 一个线程。future 继续在后台运行,并与交给执行器的任何其他任务同时复用。然而,与生成的线程不同,生成的任务仍然依赖于被执行器轮询。如果执行器停止运行,无论是因为你放弃它,还是因为你的代码不再运行执行器的代码,那些生成的任务将停止进展。在服务器的例子中,想象一下,如果主服务器的 future 由于某种原因解决了,会发生什么。由于执行器已将控制权交还给你的代码,它不能继续做,嗯,任何事情。多线程执行器通常会产生后台线程,即使执行器将控制权交还给用户的代码,也会继续轮询任务,但并不是所有的执行器都会这样做,所以在你依赖这种行为之前,请检查你的执行器。

总结

在本章中,我们看了一下 Rust 中可用的异步结构的幕后。我们看到了编译器是如何实现生成器和自引用类型的,以及为什么这项工作对于支持我们现在所知道的 async/await 是必要的。 我们还探讨了 future 是如何执行的,以及当任何特定时刻只有部分任务可以取得进展时,wakers 是如何允许执行者在任务之间进行复用的。在下一章中,我们将解决可能是 Rust 中最深层和讨论最多的领域:不安全代码。深吸一口气,然后翻开这一页。