第十章 并发性(和并行性)

通过这一章,我希望为你提供所有你需要的信息和工具,以便在你的 Rust 程序中有效地利用并发性,在你的库中实现对并发性的支持,并正确使用 Rust 的并发性原语。我不会直接教你如何实现一个并发的数据结构或编写一个高性能的并发应用程序。相反,我的目标是让你充分了解底层机制,使你有能力在你可能需要的情况下自己使用它们。

并发有三种类型:单线程并发(如我们在第 8 章讨论的 async/await),单核多线程并发,以及多核并发,产生真正的并行。每种类型都允许你的程序中的并发任务以不同的方式交错执行。如果你考虑到操作系统调度和抢占的细节,甚至还有更多的子类型,但我们不会太深入地讨论这个问题。

在类型层面上,Rust 只表示并发的一个方面:多线程。一个类型要么可以被一个以上的线程安全使用,要么就不可以。即使你的程序有多个线程(所以是并发的),但只有一个核心(所以不是并行的),Rust 必须假设,如果有多个线程,就可能有并行性。我们将要讨论的大多数类型和技术都同样适用于两个线程是否真的并行执行,所以为了保持语言的简单性,我将在本章中使用 "事情或多或少在同一时间运行 "这一非正式意义上的并发性一词。当区别很重要的时候,我就会指出来。

Rust 基于类型的安全多线程方法的特别之处在于,它不是编译器的一个功能,而是一个库的功能,开发人员可以扩展到开发复杂的并发合约。由于线程安全在类型系统中是通过 SendSync 的实现和约束来表达的,它一直传播到应用程序代码中,整个程序的线程安全仅通过类型检查来检查。

Rust 编程语言已经涵盖了大多数涉及到并发的基础知识,包括 SendSync 特性、ArcMutex 以及通道。因此,我不会在这里重申很多内容,除非在其他主题的背景下值得特别重复的地方。相反,我们将看看是什么让并发变得困难,以及一些常见的并发模式,旨在处理这些困难。在深入研究如何使用原子操作来实现较低级别的并发操作之前,我们还将探讨并发和异步如何相互作用(以及它们如何不相互作用)。最后,我将以一些建议来结束本章,这些建议是关于如何在使用并发代码时保持理智。

并发的麻烦

在我们深入研究并发编程的良好模式和 Rust 的并发机制的细节之前,值得花一些时间来理解为什么并发首先是个挑战。也就是说,为什么我们需要为并发代码提供特殊的模式和机制?

正确性(Correctness)

并发的主要困难是协调对一个在多个线程之间共享的资源的访问--特别是写入访问。如果很多线程想共享一个资源,仅仅是为了读取它,那么这通常很容易:你把它放在一个 Arc 里,或者把它放在你可以得到一个 'static 的东西里,然后你就完成了。 但是一旦任何线程想写,各种问题都会出现,通常是以数据竞争的形式出现。简而言之,当一个线程在更新共享状态的同时,另一个线程也在访问该状态,无论是读取还是更新,都会发生数据竞争。如果没有额外的保障措施,第二个线程可能会读取部分被覆盖的状态,破坏第一个线程所写的部分内容,或者根本看不到第一个线程所写的内容。一般来说,所有的数据竞争都被认为是未定义的行为。

数据竞争是一类更广泛的问题的一部分,这些问题主要(尽管不是唯一)发生在并发环境中:竞争条件。当一个指令序列可能出现多种结果时,就会出现竞争条件,这取决于系统中其他事件的相对时间。这些事件可以是线程执行一段特定的代码,也可以是定时器响起,也可以是网络数据包进来,还可以是其他任何时间可变的事件。 与数据竞争不同,竞争条件本身并不坏,也不被认为是未定义的行为。然而,当特别奇特的竞争发生时,它们是产生错误的温床,正如你在本章中看到的那样。

性能

通常情况下,开发人员在他们的程序中引入并发性,希望能够提高性能。或者,更准确地说,他们希望并发性将使他们能够通过利用更多的硬件资源在每秒内执行更多的操作。这可以通过让一个线程运行而另一个线程在等待来实现,也可以通过让线程在每个核心上同时进行工作来跨越多个核心,否则这些工作会在一个核心上连续发生。大多数开发者在谈论并发性时指的是后一种性能增益,而并发性通常是以可扩展性为框架的。 在这种情况下,可扩展性意味着 "这个程序的性能随着核心数量的增加而扩展",这意味着如果你给你的程序更多的核心,它的性能就会提高。

虽然实现这样的提速是可能的,但它比看起来要难。可扩展性的最终目标是线性可扩展性,即内核数量增加一倍,你的程序在每单位时间内完成的工作量就增加一倍。线性可扩展性通常也被称为完美可扩展性。然而,在现实中,很少有并发程序能够实现这样的速度提升。亚线性扩展更为常见,当你从一个核心到两个核心时,吞吐量几乎是线性增长的,但增加更多的核心会产生递减的回报。有些程序甚至会出现负扩展,即让程序访问更多的内核会降低吞吐量,通常是因为许多线程都在争夺一些共享资源。

想一想一群人试图弹出一块气泡膜上的所有气泡,可能会有帮助--增加更多的人最初会有帮助,但在某些时候你会得到递减的回报,因为拥挤使得任何一个人的工作都更难。如果参与的人特别没有效率,你的小组最终可能会站在一起讨论谁应该弹出下一个泡泡,而根本没有弹出任何泡泡这种本应平行执行的任务之间的干扰被称为争夺,是良好扩展的克星。争论可能以多种方式出现,但主要的犯罪者是相互排斥、共享资源耗尽和虚假共享。

相互排斥(Mutual Exclusion)

当只有一个并发任务被允许在任何时候执行一段特定的代码时,我们说该段代码的执行是相互排斥的--如果一个线程执行它,其他线程就不能同时这样做。这方面的典型例子是互斥锁,或称互斥器,它明确规定在任何时候只有一个线程可以进入你的程序代码的特定关键部分。然而,相互排斥也可以隐性地发生。例如,如果你建立一个线程来管理一个共享资源,并通过 mpsc 通道向它发送作业,该线程有效地实现了相互排斥,因为每次只有一个这样的作业可以执行。·

在调用操作系统或库调用时,也会出现相互排斥的情况,这些调用在内部强制要求单线程访问一个关键部分。例如,多年来,标准的内存分配器对一些分配要求相互排斥,这使得内存分配成为在其他高度并行的程序中产生重大争论的操作。同样地,许多操作系统的操作看起来应该是独立的,比如在同一目录下创建两个不同名字的文件,最终可能不得不在内核中顺序进行。

注意:可扩展的并发分配是 jemalloc 内存分配器存在的理由!

互斥是并行加速的最明显的障碍,因为根据定义,它迫使你的程序的某些部分串行执行。即使你让你的程序的其余部分完美地与内核的数量相匹配,你能达到的总速度也会受到互斥、串行部分的长度的限制。请注意你的互斥部分,并设法将它们限制在严格必要的地方。

注意:对于理论上的人来说,可以用阿姆达尔定律来计算由于代码的相互排斥部分而导致的可实现的速度提升的极限。

共享资源耗尽

不幸的是,即使你在任务中实现了完美的并发性,这些任务需要与之互动的环境本身也可能不是完全可扩展的。内核在给定的 TCP 套接字上每秒只能处理这么多的发送,内存总线一次只能做这么多的读取,而你的 GPU 的并发能力是有限的。这是没办法解决的。环境通常是完美的可扩展性在实践中崩溃的地方,对这种情况的修复往往需要大量的重新设计(甚至是新的硬件!),所以我们不会在本章中多谈这个话题。请记住,可扩展性很少是你可以 "实现 "的,而更多的是你要努力争取的东西。

虚假共享(False Sharing)

当两个不应该互相争夺的操作发生争夺时,就会发生虚假共享,从而阻止了有效的同时执行。这通常是因为这两个操作碰巧在某些共享资源上相交,尽管它们使用的是该资源的不相关部分。

最简单的例子就是锁的过度共享,在这种情况下,一个锁守护着一些复合状态,而两个原本独立的操作都需要使用这个锁来更新它们的特定状态部分。这反过来意味着这些操作必须以串行方式执行,而不是并行方式。在某些情况下,有可能将单个锁分成两个,每个不相干的部分都有一个,这样就可以使操作并行进行。然而,像这样分割一个锁并不总是直接的,状态可能共享一个锁,因为第三个操作需要锁定状态的所有部分。通常情况下,你仍然可以分割锁,但你必须注意不同线程获取分割锁的顺序,以避免当两个操作试图以不同的顺序获取锁时可能发生的死锁(如果你好奇的话,可以查一下哲学家就餐问题)。另外,对于某些问题,你可以通过使用底层算法的无锁版本来完全避免关键部分,尽管这些算法也是很难搞好的。归根结底,虚假共享是一个很难解决的问题,并没有一个单一的万能解决方案,但识别问题是一个好的开始。

一个更微妙的虚假共享的例子发生在 CPU 层面,正如我们在第二章中简要讨论的那样。CPU 在内部对内存的操作是以缓存线--内存中连续字节的较长序列--而不是单个字节为单位,以分摊内存访问的成本。例如,在大多数英特尔处理器上,高速缓存线的大小是 64 字节。这意味着,每一个内存操作最终都要读取或写入 64 字节的某个倍数。当两个内核想要更新两个不同的字节的值,而这两个字节恰好在同一条高速缓存线上时,错误的共享就开始发挥作用;这些更新必须按顺序执行,尽管这些更新在逻辑上是不相干的。

想象一下,你分配了一个整数数组来表示每个线程完成了多少操作,但是这些整数都在同一个缓存线内--现在,所有的并行线程都会为他们的每一个操作争夺这一个缓存线。如果操作相对较快的话,大部分的执行时间可能都会花在争夺这些计数器上。

避免错误的缓存行共享的诀窍是把你的值填充到一个缓存行的大小。这样一来,两个相邻的值总是落在不同的缓存线上。当然,这也会增加你的数据结构的大小,所以只有在基准测试表明有问题时才使用这种方法。

可扩展性的成本

你应该注意并发性的一个正交方面,即首先引入并发性的成本。编译器在优化单线程代码方面非常出色--毕竟他们已经做了很长时间了--而且单线程代码往往比并发代码可以使用更少的昂贵保障措施(比如锁、通道或原子指令)。总的来说,并发的各种成本可以使一个并行程序比它的单线程对应的程序慢,给定任何数量的核心这就是为什么在优化和并行化之前和之后都要进行测量的原因:结果可能会让你吃惊。

如果你对这个话题感到好奇,我强烈建议你阅读 Frank McSherry 的 2015 年论文 "Scalability! 但代价是什么?"(https://www.frankmcsherry.org/assets/COST.pdf),其中揭露了一些特别恶劣的 "昂贵的扩展 "的例子。

并发模型

Rust 有三种模式可以为你的程序添加并发性,你会经常遇到这种情况:共享内存并发性、工作者池和角色 (actors)。仔细研究每一种你可以添加并发的方式将需要一本自己的书,所以在这里我将专注于这三种模式。

共享内存

从概念上讲,共享内存并发是非常直接的:线程通过操作它们之间共享的内存区域进行合作。这可能采取由突变器保护的状态的形式,或者存储在支持许多线程并发访问的 HashMap 中。许多线程可能在不相干的数据片断上执行相同的任务,例如许多线程在一个 Vec 的不相干的子范围上执行某个功能,或者它们可能在执行需要一些共享状态的不同任务,例如在一个数据库中,一个线程处理用户对一个表的查询,而另一个线程在后台优化用于存储该表的数据结构。

当你使用共享内存并发时,你对数据结构的选择是非常重要的,特别是当所涉及的线程需要非常紧密地合作时。一个普通的 mutex 可能会阻止超过非常少的核心数量的扩展,一个读者/写者锁可能会允许更多的并发读取,代价是更慢的写入,而一个分片的读者/写者锁可能允许完全可扩展的读取,代价是使写入高度混乱。同样地,一些并发散列图旨在实现良好的全面性能,而另一些则专门针对,比如说,在写入很少的情况下,并发读取。一般来说,在共享内存并发中,你想使用专门为你的目标用例而设计的数据结构,这样你就可以利用优化,将你的应用程序不关心的性能方面与它关心的方面进行交换。

共享内存并发很适合线程需要共同更新一些共享状态的用例,而这些共享状态是不相通的。也就是说,如果一个线程需要用某个函数 f 来更新状态 s,而另一个线程需要用某个函数 g 来更新状态,并且 f(g(s)) != g(f(s)),那么共享内存并发可能是必要的。如果不是这种情况,其他两种模式可能更适合,因为它们倾向于导致更简单和更高性能的设计。

注意:有些问题有已知的算法,可以在不使用锁的情况下提供并发的共享内存操作。随着内核数量的增加,这些无锁算法可能会比基于锁的算法有更好的扩展性,尽管由于它们的复杂性,它们的单核性能也往往较慢。像往常一样,对于性能问题,首先要进行基准测试,然后寻找其他的解决方案。

工作池(Worker Pools)

在工人池模型中,许多相同的线程从一个共享的工作队列中接收工作,然后它们完全独立地执行。 例如,网络服务器通常有一个工人池来处理进入的连接,异步代码的多线程运行时往往使用工人池来集体执行一个应用程序的所有 futures(或者更准确地说,它的顶级任务)。

共享内存并发和工作者池之间的界限往往是模糊的,因为工作者池倾向于使用共享内存并发来协调它们如何从队列中获取作业,以及如何将未完成的作业返回到队列中。例如,假设你正在使用数据并行库 rayon,对一个向量的每个元素并行地执行一些函数。在幕后,rayon 启动了一个工作池,将向量分割成子范围,然后将子范围分配给池中的线程。当池中的线程完成一个范围时,rayon 会安排它开始工作于下一个未处理的子范围。向量在所有工作线程之间共享,各线程通过一个共享内存队列式的数据结构进行协调,该结构支持工作偷窃。

工作窃取是大多数工作池的一个关键特征。其基本前提是,如果一个线程提前完成其工作,并且没有更多未分配的工作可用,该线程可以窃取已经分配给不同工作线程但尚未启动的工作。并非所有的工作都需要相同的时间来完成,所以即使每个工作者被赋予相同数量的工作,一些工作者最终可能比其他工作者更快地完成他们的工作。 与其坐等那些吸引了长期工作的线程完成,那些提前完成工作的线程应该帮助那些落伍者,以便整个操作更快完成。

要实现一个支持这种工作窃取的数据结构,同时又不会因为线程不断试图从对方那里窃取工作而产生大量的开销,这是一项相当艰巨的任务,但这个功能对于高性能的工作池来说是至关重要的。如果你发现自己需要一个工人池,你最好的选择通常是使用一个已经做了很多工作的工人池,或者至少是重用现有工人池的数据结构,而不是自己从头开始写一个。

当每个线程执行的工作相同,但执行的数据不同时,工人池就很适合。在 Rayon 并行映射操作中,每个线程执行相同的映射计算;他们只是在底层数据的不同子集上执行。在多线程异步运行时,每个线程都简单地调用 Future::poll;它们只是在不同的 futures 上调用它。如果你开始不得不区分你的线程池中的线程,那么不同的设计可能更合适。

连接池

连接池是一种共享内存结构,它保存一组已建立的连接,并将其分配给需要连接的线程。这是管理与外部服务连接的库中的一种常见设计模式。如果一个线程需要一个连接,但没有可用的连接,要么建立一个新的连接,要么强迫该线程阻塞。当一个线程用完一个连接后,它就会把这个连接返回到池子里,从而使它对其他可能在等待的线程可用。

通常情况下,连接池最难的任务是管理连接的生存期。一个连接可以以最后使用它的线程所处的任何状态返回到池中。因此,连接池必须确保与连接相关的任何状态,无论是在客户端还是在服务器上,都已被重置,以便当连接随后被其他线程使用时,该线程可以像被赋予一个新的、专用的连接一样。

actors

角色并发模型在许多方面与工作者池模型相反。worker 池有许多共享作业队列的相同线程,而 actor 模型有许多独立的作业队列,每个作业主题对应一个。每个作业队列提供给一个特定的角色,该角色处理属于应用程序状态子集的所有作业。该状态可能是数据库连接、文件、指标集合数据结构,或任何其他您可以想象到的许多线程可能需要能够访问的结构。无论它是什么,单个角色拥有该状态,如果某个任务想要与该状态交互,它需要向拥有该状态的角色发送一条消息,总结它希望执行的操作。当拥有该消息的角色收到该消息时,它将执行指定的操作,并使用操作的结果(如果相关的话)响应查询任务。由于角色对其内部资源具有独占访问权,因此除了消息传递所需之外,不需要任何锁或其他同步机制。

角色模式的一个关键点是角色之间相互交谈。例如,如果负责记录日志的角色需要写入文件和数据库表,它可能会向负责这些的角色发送消息,要求他们执行各自的操作,然后继续进行下一个日志事件。这样,角色模型比轮子上的辐条,更像一个 web 用户请求到一个 web 服务器可能开始作为一个单独的请求的演员负责连接但也可能产生数十,数百,甚至数千演员更深层次的信息系统满足用户的请求之前。

角色模型中没有任何内容要求每个角色都是自己的线程。相反,大多数角色系统建议应该有大量的角色,因此每个角色应该映射到一个任务而不是一个线程。毕竟,角色只有在执行时才需要独占访问其包裹的资源,并不关心它们是否在自己的线程上。事实上,角色模型经常与工作者池模型结合使用--例如,一个使用多线程异步运行时 Tokio 的应用程序可以为每个角色生成一个异步任务,然后 Tokio 会将每个角色的执行作为其工作者池中的一个工作。因此,一个给定的角色的执行可能会在工作池中从线程移动到线程,因为角色的产生和恢复,但每次角色的执行都会保持对其包裹资源的独占访问。

当你有许多可以相对独立运行的资源,并且每个资源内几乎没有并发的机会时,角色并发模型就很适合。例如,一个操作系统可能有一个负责每个硬件设备的角色,而一个网络服务器可能有一个负责每个后端数据库连接的角色。如果你只需要几个角色,如果工作在角色之间有明显的倾斜,或者如果一些角色成长得很大,那么角色模型就不那么好用了--在所有这些情况下,你的应用程序最终可能会被系统中单个角色的执行速度所瓶颈。 而且,由于角色都期望独占他们的一小块世界,你不能轻易地将这个瓶颈角色的执行并行化。

异步和并行

正如我们在第 8 章中所讨论的,Rust 中的异步可以实现无并行的并发--我们可以使用 selectsjoins 让一个线程轮询多个 future,并在其中一个、一些或所有 futures 完成时继续进行。因为没有涉及到并行性,所以使用 future 的并发性从根本上来说并不要求这些 future 被发送。即使 spawn 一个 future ,作为一个额外的顶级任务运行,从根本上说也不需要 Send,因为一个执行线程可以同时管理许多 future 的轮询。

然而,在大多数情况下,应用程序希望同时拥有并发性和并行性。例如,如果一个网络应用为每个传入的连接构建一个 future,因此同时有许多活动连接,它可能希望异步执行器能够利用主机上的一个以上的核心。这不会自然发生:你的代码必须明确地告诉执行器哪些 future 可以并行运行,哪些不能。

特别是,必须向执行者提供两个信息,让它知道它可以将 future 中的工作分散到一个工人线程池中。首先是有关的 future 是 Send--如果不是,执行者不允许将 future 发送给其他线程进行处理,而且不可能有并行;只有构建这种 future 的线程可以轮询它们。

第二条信息是如何将 future 分割成可以独立运行的任务。这与第 8 章中关于任务与 future 的讨论有关:如果一个巨大的 Future 包含一些 Future 实例,这些实例本身对应于可以并行运行的任务,那么执行者仍然必须在顶层 Future 上调用 poll,而且必须从一个单线程中调用,因为 poll 需要 &mut self。因此,为了实现与 Futures 的并行,你必须明确地生成你希望能够并行运行的 Futures。同时,由于第一个要求,你用于这样做的执行器函数将要求传入的 Future 是 Send

异步同步原语

大多数存在于阻塞代码中的同步原语(想想 std::sync)也有异步的对应物。有异步的通道、互斥、读者/写者锁、障碍以及其他各种类似结构的变体。我们需要这些,因为正如第 8 章中所讨论的,在未来程序中进行阻塞会耽误执行者可能需要做的其他工作,所以是不可取的。

然而,这些基元的异步版本往往比同步版本慢,因为执行必要的唤醒需要额外的机械。出于这个原因,只要不存在阻塞执行器的风险,即使在异步上下文中也可能要使用同步基元。例如,虽然一般来说,获取一个 Mutex 可能会阻塞很长时间,但对于一个特定的 Mutex 来说,这可能不是真的,因为它可能只是很少被获取,而且只在很短的时间内。在这种情况下,阻断很短的时间,直到 Mutex 再次变得可用,实际上可能不会造成任何问题。你要确保在持有 MutexGuard 的时候永远不要屈服或执行其他长期运行的操作,但除此之外,你应该不会遇到问题。

不过,与此类优化一样,请确保首先进行测量,如果同步原语能带来明显的性能改进,则只选择同步原语。如果没有,那么在异步上下文中使用同步基元所带来的额外脚步声可能就不值得了。

低级别的并发性(Lower-Level Concurrency)

标准库提供了 std::sync::atomic 模块,它提供了对底层 CPU 基元的访问,像通道和互斥这样的高级结构是用它构建的。这些基元以原子类型的形式出现,名字以 Atomic--AtomicUsizeAtomicI32AtomicBoolAtomicPtr 等开头,还有两个名为 fencecompiler_fence 的函数。我们将在接下来的几节中分别讨论这些问题。

这些类型是用来构建任何需要在线程之间进行通信的代码的块。互斥、通道、屏障、并发哈希表、无锁栈以及所有其他同步构造最终都依赖于这几个基元来完成其工作。它们在线程之间的轻量级合作中也很方便,因为像互斥这样的重量级同步是过度的(没必要的)--例如,增加一个共享计数器或将一个共享布尔值设置为真。

原子类型很特别,因为它们对多个线程试图并发访问它们时发生的情况有定义的语义。这些类型都支持(大部分)相同的 API:loadstorefetch_*compare_exchange。在本节的其余部分,我们将研究这些类型的作用,如何正确使用它们,以及它们的用途。但首先,我们必须谈谈低级别的内存操作和内存排序。

内存操作

非正式地,我们经常把访问变量称为 "读" 或 "写" 内存。实际上,在代码使用变量和访问内存硬件的实际 CPU 指令之间有很多机制。为了理解并发内存访问的行为,至少在高层次上理解这种机制是很重要的。

编译器决定当你的程序读取一个变量的值或给它分配一个新的值时发出什么指令。它被允许对你的代码进行各种转换和优化,并可能最终重新排列你的程序语句,消除它认为多余的操作,或使用 CPU 寄存器而不是实际内存来存储中间计算。编译器对这些转换有一些限制,但最终只有一部分变量访问实际上是作为内存访问指令而结束的。

在 CPU 层面,内存指令有两种主要形式:加载(loads)和存储(stores)。加载是将字节从内存中的某个位置拉到 CPU 寄存器中,存储是将字节从 CPU 寄存器中存储到内存中的某个位置。加载和存储一次只能操作一小块内存:在现代 CPU 上通常是 8 字节或更少。如果一个变量的访问跨度超过了单次加载或存储所能访问的字节数,编译器会根据情况自动将其变成多条加载或存储指令。CPU 在如何执行程序指令方面也有一些回旋余地,以更好地利用硬件和提高程序性能。例如,现代 CPU 经常并行执行指令,甚至不按顺序执行,当它们之间没有相互依赖关系时。在每个 CPU 和你的计算机的 DRAM 之间还有几层缓存,这意味着对一个给定的内存位置的加载,不一定能看到该内存位置的最新存储,按壁时钟时间计算。

在大多数代码中,编译器和 CPU 只允许以不影响结果程序语义的方式来转换代码,所以这些转换对程序员来说是不可见的。 然而,在并行执行的背景下,这些转换会对应用行为产生重大影响。因此,CPU 通常提供多个不同的加载和存储指令的变体,每个指令对 CPU 如何重新排序以及如何与其他 CPU 上的并行操作交错进行不同的保证。同样,编译器(或者说,编译器编译的语言)提供了不同的注释,你可以用它来强制执行其内存访问的一些子集的特定执行约束。在 Rust 中,这些注释是以原子类型和它们的方法的形式出现的,我们将在本节剩下的时间里把它们挑出来。

原子类型

Rust 的原子类型之所以被称为原子类型,是因为它们可以被原子地访问--也就是说,原子类型变量的值是一次性写入的,绝不会使用多次存储来写入,这就保证了对该变量的加载不能观察到只有组成该值的一些字节发生了变化,而其他的字节还没有变化(尚未)。通过与非原子类型的对比,这一点最容易理解。例如,为一个类型为 (i64,i64) 的元组重新分配一个新的值,通常需要两条 CPU 存储指令,每个 8 字节的值一条。如果一个线程执行这两条存储指令,另一个线程可能会(如果我们暂时忽略借用检查器)在第一次存储后但在第二次存储前读取元组的值,从而最终得到元组值的不一致的观点。它最终会读取第一个元素的新值和第二个元素的旧值,一个从未被任何线程实际存储的值。

CPU 只能以原子方式访问特定大小的值,所以只有几种原子类型,所有这些都在原子模块中。每个原子类型都是 CPU 支持的原子访问的大小之一,有多种变化,比如值是否被签名,以及区分原子的 usize 和指针(与 usize 大小相同)。此外,原子类型有明确的方法来加载和存储它们所持有的值,还有一些更复杂的方法,我们将在后面讨论,这样,程序员编写的代码和产生的 CPU 指令之间的映射就更清楚了。例如,AtomicI32::load 执行一个有符号的 32 位值的单一加载,AtomicPtr::store 执行一个指针大小(64 位平台上的 64 位)值的单一存储。

内存排序

大多数关于原子类型的方法都有一个 Ordering 类型的参数,它决定了原子操作所受到的内存排序限制。在不同的线程中,一个原子值的加载和存储可能会被编译器和 CPU 排序,而这些排序与该原子值上的每个原子操作所要求的内存排序相一致。在接下来的几节中,我们将看到一些例子,说明为什么对排序的控制很重要,而且对于从编译器和 CPU 那里获得预期的语义是必要的。

内存排序往往是反直觉的,因为我们人类喜欢从上到下阅读程序,并想象它们是逐行执行的,但当代码进入硬件时,实际上不是这样执行的。内存访问可以被重新排序,甚至完全被忽略,一个线程的写入可能不会立即被其他线程看到,即使后来的程序顺序写入已经被观察到。

可以这样想:每个内存位置都会看到来自不同线程的修改序列,而不同内存位置的修改序列是独立的。如果两个线程 T1 和 T2 都写到内存位置 M,那么即使用户用秒表测量到 T1 先执行,T2 对 M 的写入仍然可能在两个线程的执行之间没有任何其他限制的情况下先发生。从本质上讲,计算机在确定某个内存位置的值时并不考虑壁钟时间,重要的是程序员对什么是有效的执行的约束。例如,如果 T1 写到 M,然后产生线程 T2,T2 再写到 M,计算机必须承认 T1 的写先发生,因为 T2 的存在取决于 T1。

如果这很难理解,不要着急,内存排序可能是令人费解的,而语言规范往往使用非常精确但不太直观的措辞来描述它。我们可以通过关注底层硬件架构来构建一个更容易掌握的心智模型,即使有点简化。基本上,计算机内存的结构是一个树状的存储层次,叶子是 CPU 寄存器,根部是物理内存芯片上的存储,通常称为主内存。两者之间有几层缓存,层次结构的不同层可以驻扎在不同的硬件上。当一个线程执行存储到一个内存位置时,真正发生的情况是,CPU 开始对特定 CPU 寄存器中的值进行写入请求,然后必须在内存层次结构中向主内存前进。当一个线程执行负载时,请求在层次结构上流动,直到它遇到一个有可用值的层,并从那里返回。问题就出在这里:在所有写入的内存位置的缓存被更新之前,写入的内容在任何地方都是不可见的,但是其他 CPU 可以在同一时间对同一内存位置执行指令,奇怪的事情随之而来。那么,内存排序是一种要求精确语义的方式,即当多个 CPU 访问一个特定的内存位置进行特定操作时,会发生什么。

考虑到这一点,让我们来看看 Ordering 类型,它是我们作为程序员可以规定哪些并发执行是有效的额外约束的主要机制。

Ordering 被定义为一个枚举,其变体在清单 10-1 中显示。

#![allow(unused)]
fn main() {
enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst
}

// 清单 10-1:Ordering 的定义
}

每一种都对从源代码到执行语义的映射施加了不同的限制,我们将在本节的剩余部分依次探讨每一种。

Relaxed Ordering

放松的排序基本上不能保证对值的并发访问,除了访问是原子性的这一事实。特别是,宽松排序对不同线程之间的内存访问的相对顺序没有保证。这是内存排序的最弱形式。清单 10-2 显示了一个简单的程序,其中两个线程使用 Ordering::Relaxed 访问两个原子变量。

#![allow(unused)]
fn main() {
static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
let t1 = spawn(|| {
    let r1 = Y.load(Ordering::Relaxed); // (1)
    X.store(r1, Ordering::Relaxed);     // (2)
});
let t2 = spawn(|| {
    let r2 = X.load(Ordering::Relaxed); // (3)
    Y.store(true, Ordering::Relaxed);   // (4)
});

// 清单 10-2:使用 Ordering::Relaxed 的两个竞争线程
}

看一下作为 t2 产生的线程,你可能会期望 r2 永远不可能为真,因为所有的值都是假的,直到同一个线程在读取 X 之后的那一行将 Y 赋值为真。然而,在宽松的内存排序下,这种结果是完全可能的。原因是,CPU 被允许重新排列所涉及的加载和存储。让我们来看看这里到底发生了什么,使 r2=true 成为可能。

首先,CPU 注意到 4 不必发生在 3 之后,因为 4 不使用 3 的任何输出或副作用,也就是说,43 没有执行依赖性。所以,CPU 决定重新安排它们的顺序,因为 *挥手* 的原因会使你的程序走得更快。因此,CPU 先执行 4,设置 Y=true,尽管 3 还没有运行。然后,t2 被操作系统置入睡眠状态,线程 t1 执行几条指令,或者 t1 只是在另一个核心上执行。在 t1 中,编译器必须先运行 1,然后再运行 2,因为 2 依赖于在 1 中读取的值。因此,t1Y(由 4 写入)中读取真,然后将其写回 X

宽松的内存排序允许这种执行,因为它没有对并发执行施加额外的约束。也就是说,在宽松的内存排序下,编译器必须确保对任何特定线程的执行依赖性得到尊重(就像不涉及原子学一样);它不需要对并发操作的交错做出任何承诺。重排 34 在单线程执行中是允许的,所以在宽松排序下也是允许的。

在某些情况下,这种重新排序的方式是很好的。例如,如果你有一个计数器,它只是记录指标,那么相对于其他指令,它到底什么时候执行并不重要,Ordering::Relaxed 就可以了。在其他情况下,这可能是灾难性的:例如,如果你的程序使用 r2 来计算是否已经设置了安全保护,从而最终错误地认为它们已经设置了。

在编写没有花哨地使用原子学的代码时,你一般不会注意到这种重新排序--CPU 必须保证所写的代码和每个线程实际执行的代码之间没有可观察到的差异,所以一切看起来都是按照你写的顺序运行的。这被称为尊重程序顺序或评估顺序;这些术语是同义词。

获取/释放排序(Acquire/Release Ordering)

在内存排序层次的下一步,我们有 Ordering::AcquireOrdering::ReleaseOrdering::AcqRelAcquireRelease)。在高层次上,它们在一个线程的存储和另一个线程的加载之间建立了执行依赖关系,然后限制了如何对加载和存储进行重新排序的操作。最重要的是,这些依赖关系不仅建立了一个存储和一个单一值的负载之间的关系,而且还对所涉及的线程中的其他负载和存储进行了排序限制。这是因为每次执行都必须尊重程序的顺序;如果线程 B 中的负载与线程 A 中的某个存储有依赖关系(A 中的存储必须在 B 中的负载之前执行),那么在该负载之后 B 中的任何读或写也必须发生在 A 中的存储之后。

注意:Acquire 内存排序只能应用于加载,Release 只能应用于存储,而 AcqRel 只能应用于同时加载和存储的操作(如 fetch_add)。

具体来说,这些内存排序对执行有以下限制:

  1. 使用 Ordering::Release,加载和存储不能向前移动,超过一个存储。
  2. 通过 Ordering::Acquire,加载和存储不能在加载之前被移回。
  3. 一个变量的 Ordering::Acquire 加载必须看到发生在 Ordering::Release 存储之前的所有存储,该存储存储了加载的内容。

为了看看这些内存顺序是如何改变的,清单 10-3 再次显示了清单 10-2,但是将内存顺序换成了获取和释放。

#![allow(unused)]
fn main() {
static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
let t1 = spawn(|| {
    let r1 = Y.load(Ordering::Acquire);
    X.store(r1, Ordering::Release);
});
let t2 = spawn(|| {
    let r2 = X.load(Ordering::Acquire); // (1)
    Y.store(true, Ordering::Release);   // (2)
});

// 清单 10-3:清单 10-2 中的获取/释放内存排序
}

这些额外的限制意味着 t2 不再有可能看到 r2=true。要知道为什么,请考虑清单 10-2 中奇怪结果的主要原因:12 的重新排序。第一个限制,即对有 Ordering::Releasestore 的限制,决定了我们不能把 1 移到 2 下面,所以我们一切都很好!

但这些规则的作用超出了这个简单的例子。例如,想象一下,你实现了一个互斥锁。你想确保一个线程在持有锁的时候所运行的任何加载和存储都只在它实际持有锁的时候执行,并且对后来取得锁的任何线程都是可见的。这正是 ReleaseAcquire 所能做到的。通过执行 Release 存储来释放锁,以及 Acquire 加载来获取锁,你可以保证关键部分的加载和存储不会被移动到锁被实际获取之前或被释放之后!

注意:在一些 CPU 架构上,比如 x86,Acquire/Release 排序是由硬件保证的,使用 Ordering::ReleaseOrdering::Acquire 而不是 Ordering::Relaxed 没有额外的成本。 在其他架构上,情况并非如此,如果你对可以容忍较弱内存排序保证的原子操作切换到 Relaxed,你的程序可能会看到速度的提升。

顺序一致的排序(Sequentially Consistent Ordering)

顺序一致的排序(Ordering::SeqCst)是我们能接触到的最强的内存排序。它的确切保证有点难以确定,但从广义上讲,它不仅要求每个线程看到的结果与 Acquire/Release 一致,而且要求所有线程看到的排序都是一样的。具体来说,Acquire/Release 排序并不能保证,如果两个线程 A 和 B 原子化地加载另外两个线程 X 和 Y 写的值,A 和 B 会看到 X 相对于 Y 写的一致模式。之后,我们将看到顺序一致的排序如何避免这种特殊的意外结果。

#![allow(unused)]
fn main() {
static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);
let t1 = spawn(|| {
    X.store(true, Ordering::Release);
});
let t2 = spawn(|| {
    Y.store(true, Ordering::Release);
});
let t3 = spawn(|| {
    while (!X.load(Ordering::Acquire)) {}
    if (Y.load(Ordering::Acquire)) { // (1)
        Z.fetch_add(1, Ordering::Relaxed); }
});
let t4 = spawn(|| {
    while (!Y.load(Ordering::Acquire)) {}
    if (X.load(Ordering::Acquire)) { // (2)
        Z.fetch_add(1, Ordering::Relaxed); }
});

// 清单 10-4:用 `Acquire/Release` 排序的奇怪结果
}

两个线程 t1t2 分别将 XY 设置为真。线程 t3 等待 X 为真;一旦 X 为真,它就检查 Y 是否为真,如果是,就给 Z 加 1。线程 t4 则等待 Y 为真,然后检查 X 是否为真,如果是,就给 Z 加 1。在我告诉你答案之前,考虑到上一节中 ReleaseAcquire 排序的定义,试着用你的方式解决这个问题。

首先,让我们回顾一下 Z 被递增的条件。 如果线程 t3 在观察到 X 是真的之后看到 Y 是真的,那么它就会递增 Z,这只有在 t2t3 评估 1 处的负载之前运行时才会发生。相反,如果线程 t4 在观察到 Y 是真的之后看到 X 是真的,就会增加 Z,所以只有当 t1t4 评估 `2 处的负载之前运行时才会发生。为了简化解释,我们暂且假设每个线程一旦运行就会运行到完成。

从逻辑上讲,如果线程按 1、2、3、4 的顺序运行,Z 可以被递增两次--XY 都被设置为真,然后 t3t4 运行,发现它们递增 Z 的条件被满足。同样地,如果线程按 1、3、2、4 的顺序运行,Z 也只需递增一次。然而,让 Z 为 0 似乎是不可能的:如果我们想阻止 t3 递增 Zt2 必须在 t3 之后运行。由于 t3 只在 t1 之后运行,这意味着 t2t1 之后运行。然而,t4t2 运行后才会运行,所以 t1t4 运行时一定已经运行并将 X 设置为真,所以 t4 会增加 Z

我们无法让 Z 为 0,主要是因为我们人类倾向于线性解释;这个发生了,然后这个发生了,然后这个发生了。计算机没有同样的限制,没有必要把所有的事件都框在一个单一的全局秩序中。在 "释放 "和 "获取 "的规则中,并没有说 t3 必须观察到 t1t2 的执行顺序与 t4 观察到的相同。 就计算机而言,让 t3 观察到 t1 先执行,而让 t4 观察到 t2 先执行,是可以的。考虑到这一点,在一个执行过程中,t3 在观察到 X 是真的之后观察到 Y 是假的(意味着 t2 在 t1 之后运行),而在同一个执行过程中,t4 在观察到 Y 是真的之后观察到 X 是假的(意味着 t2t1 之前运行),这是完全合理的,即使这在我们普通人看来是很过分的。

正如我们前面所讨论的,Acquire/Release 只要求 Ordering::Acquire 对一个变量的加载必须看到发生在 Ordering::Release 存储之前的所有存储,这些存储存储了加载的东西。在刚才讨论的排序中,计算机确实坚持了这一属性:t3 看到了 X == true,并且确实看到了 t1 在设置 X = true 之前的所有存储--没有。它也看到了 Y == false,这是由主线程在程序启动时存储的,所以没有任何相关的存储需要被关注。同样地,t4 看到了 Y = true,也看到了 t2 在设置 Y = true 之前的所有存储--同样,没有任何存储。它还看到了 X == false,它是由主线程存储的,没有前面的存储。没有任何规则被破坏,但这似乎是错误的,不知为何。

我们的直观期望是,我们可以把线程放在一些全局性的顺序中,使每个线程看到的和做的都有意义,但在这个例子中,Acquire/Release 的顺序不是这样的。 为了实现更接近直观期望的东西,我们需要顺序一致性。顺序一致性要求所有参与原子操作的线程进行协调,以确保每个线程观察到的东西对应于(或至少看起来对应于)一些单一的、共同的执行顺序。这使它更容易推理,但也使它成本很高。

标有 Ordering::SeqCst 的原子加载和存储指示编译器采取任何额外的预防措施(例如使用特殊的 CPU 指令)来保证这些加载和存储的顺序一致性。围绕这一点的确切形式是相当复杂的,但顺序一致性基本上可以确保,如果你从所有线程中查看所有相关的 SeqCst 操作,你可以把线程的执行放在某种顺序中,这样被加载和存储的值就会全部匹配起来。

如果我们用 SeqCst 替换清单 10-4 中的所有内存排序参数,那么在所有线程都退出后,Z 不可能是 0,就像我们最初预期的那样。在顺序一致性下,必须可以说 t1 肯定在 t2 之前运行,或者 t2 肯定在 t1 之前运行,所以 t3t4 看到不同顺序的执行是不允许的,因此 Z 不可能为 0。

比较和交换(Compare and Exchange)

除了 loadstore 之外,所有 Rust 的原子类型都提供了一个叫做 compare_exchange 的方法。这个方法是用来原子地和有条件地替换一个值的。你向 compare_exchange 提供你最后观察到的原子变量的值,以及你想用它替换原始值的新值,只有当它仍然与你最后观察到的值相同时,它才会替换该值。要知道为什么这一点很重要,请看清单 10-5 中互斥锁的(破损)实现。这个实现在静态原子变量 LOCK 中记录了锁是否被持有。 我们使用布尔值 true 来表示锁被持有。为了获得锁,线程等待 LOCK 为假,然后再次将其设置为真;然后进入其关键部分,将 LOCK 设置为假,以便在其工作(f)完成后释放锁。

#![allow(unused)]
fn main() {
static LOCK: AtomicBool = AtomicBool::new(false);
fn mutex(f: impl FnOnce()) {
    // Wait for the lock to become free (false).
    while LOCK.load(Ordering::Acquire)
    { /* .. TODO: avoid spinning .. */ }
    // Store the fact that we hold the lock.
    LOCK.store(true, Ordering::Release);
    // Call f while holding the lock.
    f();
    // Release the lock.
    LOCK.store(false, Ordering::Release);
}

// 清单 10-5:互斥锁的不正确实现
}

这基本上是可行的,但它有一个可怕的缺陷--两个线程可能同时看到 LOCK == false,并且都离开了 while 循环。 然后它们都将 LOCK 设置为 true,并且都进入了临界区,这正是互斥函数应该防止的事情!这就是所谓的临界区。

清单 10-5 中的问题是,在我们加载原子变量的当前值和随后更新它之间有一个间隙,在这期间,另一个线程可能会运行并读取或触摸它的值。compare_exchange 解决的正是这个问题--它只在原子变量的值仍然与之前的读取值相匹配的情况下,才会交换其背后的值,否则就会通知你该值已经改变。清单 10-6 显示了使用 compare_exchange 的正确实现。

#![allow(unused)]
fn main() {
static LOCK: AtomicBool = AtomicBool::new(false);
fn mutex(f: impl FnOnce()) {
// Wait for the lock to become free (false).
loop {
    let take = LOCK.compare_exchange(
        false,
        true,
        Ordering::AcqRel,
        Ordering::Relaxed
    );
    match take {
        Ok(false) => break,
        Ok(true) | Err(false) => unreachable!(),
        Err(true) => { /* .. TODO: avoid spinning .. */ }
        }
    } 
    // Call f while holding the lock.
    f();
    // Release the lock.
    LOCK.store(false, Ordering::Release);
}

// 清单 10-6:相互排斥锁的修正实现
}

这一次,我们在循环中使用了 compare_exchange,它同时负责检查当前是否有锁,并存储真值以适当地获取锁。这是通过 compare_exchange 的第一个和第二个参数实现的:在本例中,先是 false,然后是 true。你可以把这个调用理解为 "只有当当前值为 false 时才存储 true"。compare_exchange 方法返回一个 Result,表示该值被成功更新(Ok)或不能被更新(Err)。在任何一种情况下,它都会返回当前值。这对 AtomicBool 来说不是太有用,因为我们知道如果操作失败,值必须是什么,但是对于像 AtomicI32 这样的东西,更新的当前值可以让你快速地重新计算要存储的内容,然后再试一次,而不需要再次加载。

注意:注意,compare_exchange 只检查值是否与作为当前值传入的值相同。如果其他线程修改了原子变量的值,然后再次将其重置为原始值,那么对该变量的 compare_exchange 仍然会成功。这通常被称为 A-B-A 问题。

与简单的加载和存储不同,compare_exchange 需要两个订购参数。第一个是 "成功排序",它决定了在值被成功更新的情况下,compare_exchange 代表的加载和存储应该使用什么内存排序。第二个是 "失败排序",它决定了在加载的值与预期的当前值不一致的情况下,加载的内存排序。这两种排序是分开的,这样开发者就可以给 CPU 留有余地,在适当的时候通过重新排列负载和存储的顺序来提高执行性能,但在成功时仍然可以得到正确的排序。在这种情况下,在锁获取循环的失败迭代中重新排列负载和存储是可以的,但在关键部分内重新排列负载和存储是不可以的,因为它们最终会在关键部分之外。

尽管它的接口很简单,但是 compare_exchange 是一个非常强大的同步原语--以至于理论上已经证明,你可以只用 compare_exchange 来构建所有其他的分布式共识原语。由于这个原因,当你真正深入研究实现细节时,它是许多(如果不是大多数)同步构造的主力。

不过要注意的是,compare_exchange 要求单个 CPU 独占访问底层数值,因此它是硬件层面上的一种互斥形式。这又意味着 compare_exchange 很快就会成为一个可扩展性瓶颈:每次只有一个 CPU 可以取得进展,所以有一部分代码不会随着核心数量的增加而扩展。事实上,情况可能比这更糟糕--CPU 必须进行协调,以确保每次只有一个 CPU 成功地对一个变量进行 compare_exchange(如果你对其工作原理感到好奇,可以看一下 MESI 协议),而这种协调的成本会随着 CPU 数量的增加而呈四级增长。

比较_交换_弱化 (COMPARE_EXCHANGE_WEAK)

细心的读者会注意到 compare_exchange 有一个名字可疑的表弟,compare_exchange_weak,并想知道有什么区别。compare_exchange 的弱变体允许失败,即使原子变量的值仍然与用户传递的预期值一致,而强变体在这种情况下必须成功。

这可能看起来很奇怪--除了值已经改变之外,原子值交换会失败吗?答案是系统架构中没有原生的 compare_exchange 操作。例如,ARM 处理器有锁定加载和条件存储操作,如果相关的锁定加载所读取的值在加载后没有被写入,那么条件存储就会失败。Rust 标准库在 ARM 上实现了 compare_exchange,它在一个循环中调用这对指令,并且只在条件存储成功后返回。这使得清单 10-6 中的代码不必要地低效--我们最终得到了一个嵌套的循环,这需要更多的指令,而且更难优化。由于我们在这种情况下已经有了一个循环,我们可以改用 compare_exchange_weak,删除 Err(false) 上的 unreachable!(),在 ARM 上得到更好的机器代码,在 x86 上得到同样的编译代码!

Fetch 方法

Fetch 方法(fetch_addfetch_subfetch_and 等)被设计为允许更有效地执行交换的原子操作--也就是说,无论执行顺序如何,这些操作都有意义的语义。这样做的动机是,compare_exchange 方法很强大,但也很昂贵--如果两个线程都想更新一个原子变量,一个会成功,而另一个会失败,不得不重试。如果涉及到许多线程,它们都必须调解对底层值的顺序访问,而且在线程失败重试时,会有大量的旋转。

对于那些简单的操作,我们可以告诉 CPU 要对原子变量执行什么操作,而不是因为另一个线程修改了数值而失败和重试。然后,当 CPU 最终获得独占访问权时,它将对当前值执行该操作。想想看,一个 AtomicUsize 可以计算出一个线程池完成的操作数量。如果两个线程同时完成了一项工作,只要它们的增量都被计算在内,哪一个先更新计数器并不重要。

fetch 方法实现了这类交换性操作。它们在一个步骤中执行读取和存储操作,并保证存储操作是在原子变量上执行的,当时它正好持有该方法返回的值。举个例子,AtomicUsize::fetch_add(1, Ordering::Relaxed) 永远不会失败--它总是把 1 加到 AtomicUsize 的当前值上,不管它是什么,并准确地返回 AtomicUsize 的值,当这个线程的 1 被添加时。

fetch 方法往往比 compare_exchange 更有效率,因为当多个线程争夺对一个变量的访问时,它们不需要线程失败和重试。一些硬件架构甚至有专门的 fetch 方法实现,随着参与的 CPU 数量的增加,它的扩展性会更好。 然而,如果有足够多的线程试图对同一个原子变量进行操作,由于需要协调,这些操作会开始变慢,并表现出亚线性的扩展性。一般来说,显著提高并发算法性能的最好方法是将争夺的变量分割成更多的原子变量,而不是从 compare_exchange 切换到 fetch 方法。

注意:fetch_update 方法的名字有点欺骗性--在幕后,它实际上只是一个 compare_exchange_weak 的循环,所以它的性能概况会比其他 fetch 方法更接近 compare_exchange

理智的并发性(Sane Concurrency)

编写正确和高性能的并发代码比编写顺序代码更难;你不仅要考虑可能的执行交错,还要考虑你的代码如何与编译器、CPU 和内存子系统互动。面对如此多的脚步声,你很容易想把你的手扔到空中,完全放弃并发性。在本节中,我们将探讨一些技术和工具,这些技术和工具可以帮助确保你在编写正确的并发代码时没有(那么多)恐惧。

简单开始

生活中的一个事实是,简单、直接、易懂的代码更有可能是正确的。这个原则也适用于并发代码--总是从你能想到的最简单的并发设计开始,然后测量,只有当测量发现有性能问题时,你才应该优化你的算法。

要在实践中遵循这个提示,一开始就要采用不需要复杂地使用原子或大量细粒度锁的并发模式。从运行顺序代码并通过通道进行通信的多个线程开始,或者通过锁进行合作,然后用你关心的工作负载对结果的性能进行基准测试。与实施花哨的无锁算法或将锁分割成一千块以避免错误的共享相比,这样做犯错误的可能性要小得多。对于许多用例来说,这些设计已经足够快了;事实证明,为了使通道和锁表现良好,已经花费了大量的时间和精力如果简单的方法对你的用例来说已经足够快了,为什么还要引入更复杂和容易出错的代码呢?

如果你的基准测试表明有性能问题,那么要弄清楚你的系统到底是哪一部分扩展得不好。在可能的情况下,专注于修复这个瓶颈,并尽可能地通过小的调整来实现。也许把一个锁分成两个,而不是转移到一个并发的哈希表,或者引入另一个线程和一个通道,而不是实现一个无锁的工作窃取队列就足够了。如果是这样,就这么做。

即使你必须直接与原子学和类似的东西打交道,也要保持事情的简单性,直到证明有优化的需要--一开始使用 Ordering::SeqCstcompare_exchange,如果你发现具体的证据表明这些东西正在成为必须处理的瓶颈,再进行迭代。

编写压力测试

作为作者,你对你的代码中可能隐藏的错误有很多洞察力,但不一定知道这些错误是什么(无论如何,还没有)。编写压力测试是一个很好的方法,可以把一些隐藏的 bug 抖出来。压力测试不一定要执行复杂的步骤序列,而是有很多线程并行地进行相对简单的操作。

例如,如果你正在编写一个并发的 HashMap,一个压力测试可能是让 N 个线程插入或更新键,M 个线程读取键,这样,这些 M+N 线程可能经常选择相同的键。这样的测试并不是为了测试一个特定的结果或值,而是试图触发许多可能的操作交错,希望有问题的交错能暴露出来。

压力测试在很多方面都类似于模糊测试;模糊测试会对一个给定的函数产生许多随机输入,而压力测试则会产生许多随机的线程和内存访问时间表。就像模糊测试一样,压力测试也只能和你代码中的断言一样好;它们不能告诉你一个没有以某种容易发现的方式表现出来的错误,比如断言失败或其他类型的恐慌。出于这个原因,在你的低级并发代码中加入断言是个好主意,如果你担心在特别热的循环中的运行时间成本,可以使用 debug_assert_*

使用并发测试工具

编写并发代码的主要挑战是处理不同线程执行的所有可能的交错方式。 正如我们在清单 10-4 中的 Ordering::SeqCst 例子中所看到的,重要的不仅仅是线程调度,还有特定线程在任何特定时间点可能观察到的内存值。 编写执行所有可能的合法执行的测试不仅繁琐而且困难--你需要非常低级的控制,即哪些线程何时执行以及它们的读取返回什么值,操作系统可能不提供这些。

用 Loom 进行模型检查

幸运的是,已经有一个工具可以以 Loom crate 的形式为你简化这种执行探索。考虑到本书和 Rust crate 的相对发布周期,我不会在这里给出如何使用 Loom 的例子,因为当你阅读本书时,这些例子很可能已经过时了,但我将概述它的作用。

Loom 希望你以闭包的形式编写专门的测试用例,并将其传递到 Loom 模型中。该模型会跟踪所有的跨线程交互,并试图通过多次执行测试用例闭包来智能地探索这些交互的所有可能迭代。为了检测和控制线程交互,Loom 为标准库中所有允许线程相互协调的类型提供了替换类型;这包括 std::syncstd::thread 下的大多数类型,以及 UnsafeCell 和其他一些类型。Loom 希望你的应用程序在运行 Loom 测试时能使用这些替换类型。替换类型与 Loom 执行器相连,并执行双重功能:它们作为重新安排的点,以便 Loom 可以在每个可能的线程交互点之后选择下一个要运行的操作,并且它们告知 Loom 要考虑的新的可能的交织方式。从本质上讲,Loom 为每个可能存在多种执行交错的点建立了一个所有可能的未来执行的树,然后尝试一个接一个地执行所有这些执行。

Loom 试图完全探索你提供的测试用例的所有可能的执行情况,这意味着它可以发现只在极其罕见的执行情况下出现的错误,而压力测试在一百年内也不会发现。虽然这对较小的测试用例来说是很好的,但一般来说,将这种严格的测试应用于较大的测试用例是不可行的,这些测试用例涉及更多的操作序列或需要许多线程同时运行。Loom 会花费太多时间来获得代码的适当覆盖。因此,在实践中,你可能想告诉 Loom 只考虑可能执行的子集,Loom 的文档中有更多的细节。

就像压力测试一样,Loom 只能捕捉到表现为恐慌的 bug,所以这又是一个花时间在并发代码中放置战略断言的理由!在很多情况下,甚至值得在并发代码中添加额外的状态跟踪和记账指令,以提供更好的断言。在许多情况下,甚至值得在你的并发代码中添加额外的状态跟踪和记账指令,以提供更好的断言。

使用 ThreadSanitizer 进行运行时检查

对于较大的测试案例,你最好的选择是在谷歌优秀的 ThreadSanitizer(也称为 TSan)下通过几个迭代来运行测试。TSan 通过在每次内存访问之前放置额外的记账指令来自动增强你的代码。然后,当你的代码运行时,这些记账指令会更新并检查一个特殊的状态机,该状态机会标记出任何表明有问题的竞赛条件的并发内存操作。例如,如果线程 B 写入某个原子值 X,但没有与写入 X 的前一个值的线程同步(这里有很多挥手的动作),这表明有写/写竞赛,这几乎总是一个错误。

由于 TSan 只观察你的代码运行,而不像 Loom 那样反复执行,它通常只给你的程序运行时间增加一个恒定因素的开销。虽然这个因素可能很重要(在撰写本文时为 5-15 倍),但它仍然很小,甚至可以在合理的时间内执行最复杂的测试案例。

在写这篇文章的时候,要使用 TSan,你需要使用 Rust 编译器的夜间版本,并传入 -Zsanitizer=thread 命令行参数(或在 RUSTFLAGS 中设置),尽管希望在未来这将成为一个标准的支持选项。其他的净化器也是可用的,它们可以检查诸如越界内存访问、释放后使用、内存泄漏和未初始化内存的读取,你可能也想通过这些来运行你的并发测试套件!

HEISENBUGS

Heisenbugs 是在你试图研究它们时似乎消失的 bug。在试图调试高并发代码时,这种情况经常发生;调试问题的额外工具改变了并发事件的相对时间,可能导致触发错误的执行交错不再发生。

导致并发性错误消失的一个特别常见的原因是使用打印语句,这是迄今为止最常见的调试技术之一。有两个原因导致打印语句对并发性错误有如此大的影响。 第一个,也许是最明显的,是相对而言,打印东西到用户的终端(或标准输出的地方)需要相当长的时间,特别是当你的程序产生大量的输出时。写入终端至少需要往返于操作系统内核以执行写入,但写入也可能需要等待终端本身从进程的输出读入它自己的缓冲区。所有这些额外的时间可能会大大延迟之前与其他线程中的操作竞争的操作,以至于竞争条件消失了。

打印语句干扰并发执行模式的第二个原因是,对标准输出的写入(通常)是由锁来保护的。如果你看一下标准库中的 Stdout 类型,你会发现它持有一个 Mutex,用来保护对输出流的访问。这样做是为了在多个线程试图同时写入时,输出不会出现严重的混乱,如果没有锁,某一行可能会有多个线程写入的字符穿插在一起,但有了锁,线程就会轮流写入。 不幸的是,获取输出锁是另一个线程同步点,而且每个打印线程都会参与其中。这意味着,如果你的代码之前因为两个线程之间缺少同步而被破坏,或者只是因为两个线程之间的特定竞赛是可能的,添加打印语句可能会作为一个副作用来修复这个 bug!

一般来说,当你发现一个看起来像 Heisenbug 的东西时,试着找到其他方法来缩小问题。这可能涉及到使用 Loom 或 TSan,使用 gdb 或 ldb,或使用一个只在最后打印的每线程内存日志。许多日志框架也努力避免在发布日志事件的关键路径上的同步点,所以切换到其中之一可能会使你的生活更轻松。作为额外的奖励,你在修复了某个特定的 bug 后留下的好的日志可能会在以后派上用场。就我个人而言,我是追踪 crate 的忠实粉丝,但也有很多好的选择。

总结

在这一章中,我们首先介绍了并发 Rust 中常见的正确性和性能陷阱,以及一些成功的并发应用程序倾向于使用的高级并发模式来解决这些问题。我们还探讨了异步 Rust 是如何实现无并行的并发的,以及如何在异步 Rust 代码中明确引入并行。然后,我们深入研究了 Rust 的许多不同的底层并发原语,包括它们是如何工作的,它们有什么不同,以及它们都是为了什么。最后,我们探讨了编写更好的并发代码的技术,并研究了 Loom 和 TSan 等工具,它们可以帮助你审查这些代码。在下一章中,我们将继续我们在 Rust 低层的旅程,深入研究外部函数接口,它允许 Rust 代码直接与其他语言编写的代码相连接。