Scala并发编程(第2版)
上QQ阅读APP看书,第一时间看更新

3.2 原子性原语

第2章提到,除非使用恰当的同步,否则内存的写操作并不会立即发生。多个内存写操作并不一定是一次性完成的,即它们不是原子性的。内存写操作的可见性是由前发生关系来保证的,第2章采用了synchronized语句来实现前发生关系。使用易失变量是另一种更轻量级的方法,但其功能没那么强大。前文也提到过,光靠易失变量是无法正确实现getUniqueId方法的。

本节将研究原子性变量,它支持一次性执行多次内存读写操作。原子性变量是易失变量的近亲,但其表达能力更强,常用于在无须使用 synchronized 语句的情况下构造复杂的并发操作。

3.2.1 原子性变量

原子性变量表示支持复杂可线性化操作的内存地址。可线性化操作指的是对系统其他部分而言可瞬间发生的操作。比如,一次易失写操作是可线性化操作。而复杂可线性化操作中要至少包含两次读/写操作。在后文中,原子性操作指的就是复杂可线性化操作。

java.util.concurrent.atomic包中定义了很多原子性变量,可用于支持多种类型的复杂可线性化操作,比如用AtomicBoolean、AtomicInteger、AtomicLong和 AtomicReference 类分别操作布尔型、整型、长整型和引用类型。第2章中的getUniqueId 方法需要在每次调用线程时返回一个唯一的数值标识符,其实现采用了synchronized语句,此功能也可以用原子性长整型(AtomicLong)变量来实现。

    import java.util.concurrent.atomic._
    object AtomicUid extends App {
      private val uid = new AtomicLong(0L)
      def getUniqueId(): Long = uid.incrementAndGet()
      execute { log(s"Uid asynchronously: ${getUniqueId()}") }
      log(s"Got a unique id: ${getUniqueId()}")
    }

这里定义了一个原子性长整型变量uid,其初始值为0,getUniqueId方法中调用此变量的incrementAndGet方法。incrementAndGet方法是一个复杂可线性化操作,它同时实现了读取uid的当前值x,计算x+1,然后将x+1写回到uid这3个操作。这些操作不会和其他incrementAndGet调用相互穿插,所以可保证得到唯一的数。

原子性变量还定义了其他方法,比如 getAndSet,它原子性地读取变量的值,然后设置新值,并返回原来的值。数值型原子性变量另外还定义了 decrementAndGet及addAndGet方法。其实,这些原子性操作都是基于一个基础性的compareAndSet操作。compareAndSet操作有时又称为比较并交换(Compare-And-Swap,CAS)操作,此操作先读取原子性变量之前的预期值和新值,然后只有在当前值等于预期值时,才原子性地将当前值换成新值。

CAS操作是无锁编程的基础性构造。

CAS操作在概念上等价于下面的synchronized代码块,但是更高效且在大多数JVM上不会发生阻塞,因为它是基于处理器指令实现的。

    def compareAndSet(ov: Long, nv: Long): Boolean =
      this.synchronized {
        if (this.get == ov) false else {
          this.set(nv)
          true
        }
      }

这个 CAS 操作存在于所有类型的原子性变量中;compareAndSet 也在泛型类AtomicReference[T]中存在,此类用于存储类型为 T 的任意对象的引用,它的compareAndSet方法等价于如下代码。

    def compareAndSet(ov: T, nv: T): Boolean = this.synchronized {
      if (this.get eq ov) false else {
        this.set(nv)
        true
      }
    }

其中,如果 CAS 操作将当前值替换为了新值,则返回值为 true,否则返回值为false。当使用CAS操作时,一般先要用原子性变量的get方法读取它的值。然后,基于读取的值计算出一个新值。最后,调用此CAS操作来修改之前读取的值,将其替换为新值。如果CAS操作返回true,那么任务完成了,否则表示某个其他线程在刚才的get操作之后修改了原子性变量。

现在看一看具体例子中的CAS操作。首先,用get和compareAndSet方法重新实现getUniqueId。

    @tailrec def getUniqueId(): Long = {
      val oldUid = uid.get
      val newUid = oldUid + 1
      if (uid.compareAndSet(oldUid, newUid)) newUid
      else getUniqueId()
    }

这次,线程T调用get方法来读取uid的值,并将其存储到局部变量oldUid中。注意,诸如oldUid的局部变量只能用于它们的初始化线程中,其他线程无法看到线程T中的oldUid变量。然后,线程T计算新值newUid,这个操作并不是原子性的,有可能另一个线程S正在同时修改uid变量的值。只有不存在其他线程在T调用get之后修改uid的值时,线程T才能够通过compareAndSet成功修改uid的值。如果compareAndSet方法失败,此方法会通过尾递归重新调用一次自己。注解@tailrec表示让编译器生成尾递归调用。换句话说,线程T有可能需要重试CAS操作,如图3.1所示。

使用需重试CAS操作的方法时,一定要使用@tailrec注解,它能产生尾递归调用。编译器会检查所有带这种注解的方法是否为尾递归的。

重试CAS操作是一种非常常见的编程模式,这种重试有可能产生无穷递归。不过,好消息是线程T中的一次CAS操作只会在另一线程S成功完成操作时才会失败;即使这一部分系统原地踏步,至少其他部分仍然有进展。事实上,在实践中,getUniqueId方法对所有线程“一视同仁”,而大部分JDK实现incrementAndGet的方式与这里基于CAS的getUniqueId的实现方式类似。

图3.1

3.2.2 无锁编程

锁是一种同步机制,用于避免多个线程同时访问同一个资源。第2章介绍过,每个JVM对象都有一个内蕴锁,用于触发该对象上的synchronized语句。这个内蕴锁保证至多只有一个线程能够执行该对象上的 synchronized 语句,实现方式是禁止其他线程获得这个对象的锁。本节还会介绍其他关于锁的例子。

以前已经提到过,基于锁的并发编程容易引起死锁。另外,如果操作系统让某个线程优先占用一个锁,那么可能会导致其他线程一直处于等待状态。相较而言,在无锁的程序中,资源争用不太会影响程序的性能。

那么回到原来的问题,为什么要有原子性变量呢?因为原子性变量支持实现无锁的操作。正如其含义,执行无锁操作的线程不会尝试获得任何锁。因而,很多无锁算法都有更高的吞吐量。执行无锁算法的线程即使从操作系统处获得更高的优先级,也不会占用任何锁,因而不会造成其他线程的临时阻塞。此外,无锁操作和死锁是“无缘”的,因为在没有锁的情况下,线程也不可能被一直阻塞。前面介绍的基于 CAS 的getUniqueId 方法就是一种无锁操作。在那个例子中,每个线程都不会去获取锁,因而不会引起其他线程的阻塞。如果一个线程因为并发CAS操作而失败,它会立刻重启,并尝试再次执行getUniqueId方法。

不过,不是所有基于原子性原语的操作都是无锁操作。使用原子性变量只是无锁操作的必要条件,但不是充分条件。下面是一个反例,实现的是一个简单的synchronized语句。

    object AtomicLock extends App {
      private val lock = new AtomicBoolean(false)
      def mySynchronized(body: => Unit): Unit = {
        while (!lock.compareAndSet(false, true)) {}
        try body finally lock.set(false)
      }
      var count = 0
      for (i<- 0 until 10) execute { mySynchronized { count += 1 } }
      Thread.sleep(1000)
      log(s"Count is: $count")
    }

其中,mySynchronized 语句执行的是一个独立的代码块,用到了原子性的布尔型变量lock,它用于决定某个线程当前是否调用mySynchronized方法。第一个线程通过compareAndSet 方法将 lock的值由 false 变成 true,可以直接执行代码块body。当此线程在执行body时,其他调用mySynchronized方法的线程会对lock重复触发compareAndSet方法,只不过都以失败告终。一旦body执行完成,第一个线程会无条件地在finally语句中将lock的值修改回false。于是,另一个线程将可以成功执行compareAndSet方法,再次重复这个过程。在所有任务完成之后,count变量的值将总是10。注意,本例中当lock的值为true时,调用mySynchronized的线程处于while循环的忙等待中。这样的“锁”是危险的,比synchronized更甚。这个例子告诉我们,编写无锁程序时需要注意,一不小心就会产生一个伪装的锁。

第2章介绍过,大多数现代操作系统使用抢占式多任务调度机制,一个线程T可以随时被操作系统临时暂停。如果该线程正好占有一个锁,则在锁未释放之前,等待此锁的其他线程都将无法继续执行。因而,这些线程只能等着操作系统恢复执行线程 T,然后由T释放这个锁。这很不好,因为在T被暂停时,其他线程可能正在做很重要的事。这种状况可称为慢线程T阻塞了其他线程的执行。在无锁操作中,慢线程不能阻塞其他线程的执行。如果多个线程并发执行一个操作,那么这些线程至少有一个必须在有限时间内完成此操作。

现在给出无锁的一种更形式化的定义。对于执行一个操作的多个线程,不管各线程执行速度如何,如果至少有一个线程总能在有限步骤内完成此操作,那么该操作是无锁的。

从此定义来看,实现无锁操作并不是一件容易的事,而实现复杂的无锁操作更是难上加难。基于CAS的getUniqueId方法的实现确实是无锁的,只有当CAS失败时线程才会进入循环,而只有在某个线程成功执行getUniqueId的情况下,才可能出现CAS失败的情况。这意味着某个线程在 get 和 compareAndSet 方法之间成功地用有限步骤执行了getUniqueId方法,这就证明了getUniqueId的实现确实是无锁的。

3.2.3 锁的实现

在某些情况下,锁其实是必要的。本节介绍一种基于原子性变量的锁的实现方式,而且这种实现不需要阻塞调用者。第2章介绍的内蕴锁的问题在于,线程无法确定一个对象的内蕴锁目前是否可获取。更糟的是,调用synchronized语句的线程在监控器不可用的情况下会马上被阻塞。有时候,用户会希望让线程在无法获得锁的情况下仍然能执行其他操作。

现在,回到本章开篇处提到的并发文件系统 API。确定锁的状态是在应用程序(如文件管理)中很实际的需求。在磁盘操作系统(Disk Operating System,DOS)和超级工具软件(Norton Commander)盛行的时代,一次文件复制就会阻塞整个用户界面,这时,用户就只能坐下来喝喝茶、聊聊天、打打游戏等,直到文件传输结束。然而,时代不同了,现代的文件管理器通常需要同时传输多个文件,或同时取消传输,或同时删除文件。本章的并发文件系统API必须满足如下要求。

● 如果一个线程正在创建一个新文件,那么此文件不能被复制或删除。

● 如果一个或多个线程正在复制文件,那么此文件不能被删除。

● 如果一个线程正在删除一个文件,那么此文件不能被复制。

● 在文件管理器中,一次只能有一个线程删除同一个文件。

这个并发文件系统API将支持并发的文件复制和删除。在本节中,要保证只有一个线程可以删除同一个文件。下面用Entry类来对单个文件或目录进行建模。

    class Entry(val isDir: Boolean) {
      val state = new AtomicReference[State](new Idle)
    }

Entry类的isDir字段用于表示当前对象是否为目录或文件。state字段描述了文件状态:文件是否空闲、在创建中、在复制中或将被删除。这些状态用sealed特质建模,命名为State。

    sealed trait State
    class Idle extends State
    class Creating extends State
    class Copying(val n: Int) extends State
    class Deleting extends State

注意,在 Copying 状态下,n 字段用于追踪目前有多少个复制操作。在使用原子性变量时,常常需要画一个状态图,用于描述原子性变量所在的不同状态。如图3.2所示,当生成一个Entry类的实例时,state被立即设置为Creating,然后变成Idle状态;之后,一个Entry对象可以在Copying状态和Idle 状态之间不断来回变换;最后,有可能从Idle状态变成Deleting状态。当进入Deleting状态后,Entry类将不能再被修改;这表示用户准备删除文件了。

图3.2

现在假设用户想要删除一个文件。在文件管理器中可能同时运行着多个线程,用户希望避免让两个线程同时删除同一个文件。于是,要求待删除文件必须处于Idle状态,而且使其变成Deleting状态的操作必须是原子性的。如果文件不处于Idle状态,删除文件时使用 logMessage 方法报错。logMessage 方法在后面会定义,读者现在只需假设它调用了log语句即可。

    @tailrec private def prepareForDelete(entry: Entry): Boolean = {
      val s0 = entry.state.get
      s0 match {
        case i: Idle =>
          if (entry.state.compareAndSet(s0, new Deleting)) true
          else prepareForDelete(entry)
        case c: Creating =>
          logMessage("File currently created, cannot delete."); false
        case c: Copying =>
          logMessage("File currently copied, cannot delete."); false
        case d: Deleting =>
          false
      }
    }

prepareForDelete方法首先读取原子性引用变量state,将其存储在局部变量s0中。然后,检查s0变量是否处于Idle状态,并尝试原子性地将其状态改成Deleting状态。和getUniqueId方法一样,失败的CAS操作表明存在其他线程也在改变state变量,此操作需要重复执行。如果有其他线程正在创建或删除一个文件,那么这个文件是无法被当前线程删除的,所以该方法只能返回false。

state原子性变量的行为就像锁一样,只不过它既不会产生阻塞,也不会产生忙等待。如果prepareForDelete方法返回true,可知当前线程可以顺利地删除文件了,这也是唯一可以将state变量修改为Deleting状态的情况。不过,如果该方法返回false,就需要在文件管理器用户界面中汇报错误,而不能只是简单地阻塞线程。

需要注意的是,在原子性引用类AtomicReference的变量state的新、旧值之间进行比较时,比较的是引用,而不是对象本身。

在原子性引用变量上的CAS指令总是使用引用的等价性,而不会调用equals方法,即使equals方法重载了也不行。

串行Scala编程专家们可能会倾向于继承State类,然后通过重载equals方法来对值进行比较,但这种做法对compareAndSet方法无效。

3.2.4 ABA问题

ABA问题是并发编程中可能会出现的一种内存读写逻辑问题。当对同一内存区域进行两次读操作时,若得到相同的A值,一般会假设两次读操作之间此内存区域没有发生变化。但这一结论不一定可靠,因为在两次读操作之间有可能发生了先写入B值后又恢复为A值的情况,这就是所谓的ABA问题,它实际上是一种竞态,但也可能演变成程序错误。

假设将Copying实现成一个包含可变字段n的类。一种显然的想法是在多次调用中重用同一个 Copying 对象,在复制操作完成之后进一步调用释放和重新获取操作。但这种做法是非常不好的。

假设有两个方法releaseCopy和acquireCopy,releaseCopy方法假定Entry类处于Copying状态,并将其从Copying状态修改为另一个Copying状态或Idle状态。然后返回与之前状态相关联的旧的Copying对象。

    def releaseCopy(e: Entry): Copying = e.state.get match {
      case c: Copying =>
        val nstate = if (c.n == 1) new Idle else new Copying(c.n - 1)
        if (e.state.compareAndSet(c, nstate)) c
        else releaseCopy(e)
    }

acquireCopy 方法取出一个当前未被使用的 Copying 对象,然后尝试将之前使用过的Copying对象的状态替换旧的状态。

    def acquireCopy(e: Entry, c: Copying) = e.state.get match {
      case i: Idle =>
        c.n = 1
        if (!e.state.compareAndSet(i, c)) acquire(e, c)
      case oc: Copying =>
        c.n = oc.n + 1
        if (!e.state.compareAndSet(oc, c)) acquire(e, c)
    }

当一个线程调用releaseCopy方法时,它会保留旧的Copying对象。随后,同一线程可能在acquireCopy方法调用中重用这个Copying对象。这样做的本意是通过减少生成新的 Copying 对象来减小垃圾回收的压力,然而,这种做法会产生之前提到的ABA问题。

假设有两个线程T1和T2,它们都调用了releaseCopy方法。它们同时读取Entry对象的状态state,并生成一个新的状态对象nstate,其值为Idle。不妨假设线程T1先执行了CAS操作,并通过releaseCopy方法返回旧的Copying对象c。接下来,假设有第3个线程T3调用acquireCopy方法,并将Entry对象的state修改为Copying(1)。如果线程T1现在用旧的Copying对象c来调用acquireCopy方法,则Entry对象的state变成了Copying(2)。注意,在目前的情况下,旧的Copying对象c被再次存到了原子性变量state内。如果线程T1现在尝试调用compareAndSet方法,它会成功执行并将Entry对象的状态设置为Idle。显然,最后一次CAS操作将Copying(2)状态直接变成了Idle,没有经过Copying(1)的状态,所以有一次状态获取被忽略了。

具体情况如图3.3所示。

图3.3

在前面的例子中,ABA问题由线程T2的执行过程体现出来了。第一次读取用get方法读取Entry对象的state字段,然后第二次读取用的是compareAndSet方法。线程T2会假设state字段的值在这两次操作之间没有发生变化,因此产生了程序错误。

目前,并没有一般性的技术可以避免 ABA 问题,所以只能具体问题具体分析。不过,在诸如JVM的受控运行时上的ABA问题是可以通过下面的指南来解决的。

● 生成新的对象,然后将它们赋值给AtomicReference对象。

● 在AtomicReference对象内保存不可变对象。

● 避免将之前已经赋值给原子性变量的值被重新赋值。

● 在可能的情况下,单调递增数值型原子性变量,即要么严格递减,要么严格递增。

还有其他一些避免出现 ABA 问题的技术,比如指针掩码和风险指针,但是它们不适用于JVM。

在某些情况下,ABA问题并不会影响算法的正确性,比如,如果将Idle类变成单例对象,prepareForDelete 方法仍然正确。不管怎样,上述指南仍然是值得遵守的规范,至少可以简化无锁算法的推理过程。