
2.2 监控器和同步
本节将会详细探讨如何用 synchronized 语句进行线程间通信。如前文所述,synchronized语句既保证了不同线程的写操作的可见性,还限制了共享内存的并发访问。一般来说,对共享资源的访问进行限制的同步机制称为锁。锁还被用于保证没有两个线程同时执行同一段代码,即这两个线程对这段代码的执行是互斥的。
前文提到过,JVM上的每个对象都有一个特殊的内置监控器锁,也称为内蕴锁。当一个线程访问对象x上的synchronized语句时,若没有其他线程拥有x上的监控器,那么它就会获得此监控器。否则,该线程会等待监控器被释放。在获得监控器的同时,该线程也能看到释放该监控器的前一线程的所有内存写操作。
synchronized 语句的一个自然而然的性质是它可以嵌套。一个线程可以同时拥有属于不同对象的多个监控器。这对基于简单组件的大型系统而言是很有用的。用户并不能提前预知不同软件组件使用了哪些监控器。比如设计一个记录资金流水的在线银行系统,在系统中维护一个资金流水链表,用可变ArrayBuffer来实现。这个银行系统不会直接修改流水,但是会用一个logTransfer方法来添加新消息,此方法的调用与资金变化同步。ArrayBuffer 是针对单线程而设计的容器,所以需要对它进行并发写保护。下面的代码定义了一个logTransfer方法。
object SynchronizedNesting extends App { import scala.collection._ private val transfers = mutable.ArrayBuffer[String]() def logTransfer(name: String, n: Int) = transfers.synchronized { transfers += s"transfer to account '$name' = $n" } }
银行系统中除了日志模块之外,还用Account类来表示账户。Account对象中保存了账户所有者name和资金数目money。为了往账户中存钱,系统使用add方法来获得Account对象的监控器,并修改其money字段。银行的业务流程要求对大宗交易进行特殊处理,即如果转账数目超过10个货币单元,就需要记录日志。下面的代码定义了Account类及其add方法,add方法用于在Account对象上添加数目为n的货币单元。
class Account(val name: String, var money: Int) def add(account: Account, n: Int) = account.synchronized { account.money += n if (n > 10) logTransfer(account.name, n) }
add方法在synchronized语句内部调用logTransfer,而logTransfer会首先获得 transfers 的监控器。值得注意的是,这个过程不需要释放account 的监控器。如果transfers的监控器由另一个线程所有,当前线程将进入阻塞状态,而且它不会释放之前获得的监控器。
下面的示例中,应用程序创建两个独立的账户,由3个线程执行转账操作。一旦所有线程完成转账操作,主线程就将在日志中记录。
// 银行系统示例代码(续) val jane = new Account("Jane", 100) val john = new Account("John", 200) val t1 = thread { add(jane, 5) } val t2 = thread { add(john, 50) } val t3 = thread { add(jane, 70) } t1.join(); t2.join(); t3.join() log(s"--- transfers ---\n$transfers")
此例中的synchronized语句避免了线程t1和t3并发修改Jane的账户。线程t2和t3还会访问transfers日志。这个简单的例子表明了嵌套的好处,因为用户并不知道银行系统中还有哪些组件可能使用了transfers日志。为了封装代码并提高代码的可重用性,独立的软件组件不应该显式地对银行转账日志操作进行同步;相反,同步应该隐藏在logTransfer方法内。
2.2.1 死锁
在上述银行系统的示例中,一个比较好的地方是 logTransfer 方法绝不会尝试获取transfers的监控器之外的其他锁。一旦获得了监控器,线程就开始修改transfers日志,然后释放监控器;在这种嵌套锁的栈中,transfers总是最后出现。由于logTransfer方法是唯一对transfers进行同步的方法,因此它在同步transfers时不会无限阻塞其他线程。
死锁是一种经常出现的情况,两个或多个执行过程互相等待对方完成各自的操作。等待的原因在于每个执行过程都获得了某个资源的唯一访问权,而其他执行过程又恰好需要对方占有的那个资源。以日常生活为例,假设两位同事坐在咖啡馆中开始吃午餐(需要同时使用刀和叉子),一个同事拿叉子,另一个同事拿刀。双方都在等对方吃完饭,但又不交出自己的餐具,于是陷入了死锁,两个人都无法吃完午餐。至少,在领导来之前这个问题是无解的。
在并发编程中,两个线程同时获得两个不同的监控器,然后尝试获得对方的监控器时,死锁就发生了。双方都不释放自己的监控器,于是这两个线程进入阻塞状态,直到其中一个监控器被释放。
使用logTransfer方法绝不会造成死锁,因为多个线程在处理多个账户时也只会尝试获得同一个监控器,而这个监控器终究是会被释放掉的。现在,扩展介绍前面的银行系统示例,支持两个账户之间转账,代码如下。
object SynchronizedDeadlock extends App { import SynchronizedNesting.Account def send(a: Account, b: Account, n: Int) = a.synchronized { b.synchronized { a.money -= n b.money += n } } }
这里从前文的示例中导入了Account类。send方法是原子性的,它将数目为n的钱从账户 a 转给账户 b。要实现这一点,需要同时在两个账户上触发 synchronized语句,以确保没有其他线程可以并发地修改其中任意一个账户。代码如下所示。
val a = new Account("Jack", 1000) val b = new Account("Jill", 2000) val t1 = thread { for (i<- 0 until 100) send(a, b, 1) } val t2 = thread { for (i<- 0 until 100) send(b, a, 1) } t1.join(); t2.join() log(s"a = ${a.money}, b = ${b.money}")
现在,假设有两个新的银行客户 Jack和 Jill,他们在开户之后很喜欢新的电子银行平台,于是登录之后互相转账小笔金额进行测试,他们“狂按”了100次转账键。很快,问题就来了。线程t1和t2分别执行Jack和Jill的请求,同时触发send方法,只不过转账的方向是反的。比如线程t1锁住账户a,而t2锁住账户b,但都不能锁住对方的账户。让Jack和Jill惊讶的是,新的转账系统并没有看上去那么美好。读者如果运行这个示例,也只能以关闭终端而告终,然后重启SBT。
当两个或多个线程获得资源控制权,在不释放自己的资源的同时却又循环申请对方的资源时,死锁就发生了。
那么,如何防止死锁发生呢?回忆一下,在银行系统的最初版本中,申请监控器的顺序是良定义的。单个账户的监控器被一个线程获得之后,transfers 的监控器才有可能被其他线程获得。有理由相信,只要资源的访问存在确定的顺序,就不会有发生死锁的风险。在线程S获得资源X之后,线程T要想访问X就只能等待,而此时S绝不会尝试访问T已经获得的任何资源Y,因为Y < X,所以S只会尝试获取资源Z(Z > X)。资源之间的访问顺序打破了潜在的死循环,这是避免死锁的必要条件。
因此,需要在所有资源之间建立一个全序,这可以保证不会出现几个线程循环互相等待其他线程已经获得的资源的情况。
在上面的例子中,同样需要在不同账户之间建立顺序。一种方法是使用之前定义的getUniqueId方法。
import SynchronizedProtectedUid.getUniqueId class Account(val name: String, var money: Int) { val uid = getUniqueId() }
这个新定义的Account类保证了没有两个账户拥有同样的uid字段,不论账户是哪个线程创建的。下面的send方法就是根据uid字段的顺序来申请资源的,这样就可以避免造成死锁。
def send(a1: Account, a2: Account, n: Int) { def adjust() { a1.money -= n a2.money += n } if (a1.uid < a2.uid) a1.synchronized { a2.synchronized { adjust() } } else a2.synchronized { a1.synchronized { adjust() } } }
经过银行软件工程师的快速改进之后,Jack 和 Jill 又可以开心地互相转账了,阻塞的线程循环再也没有出现。在任何并发系统中,只要多个线程不释放自己已经获得的资源却又无限等待其他资源,就不可避免会造成死锁。不过,虽然死锁需要尽量避免,但是死锁并没有想象中那么可怕。从死锁的定义来看,值得安慰的是,出现死锁的系统不会再进一步执行了。开发者可以通过保存运行中的JVM实例的堆数据,分析线程的栈,然后快速解决Jack和Jill的问题;至少,死锁问题是很容易发现的,即使是在生产环境中也是如此。但竞态条件下的错误就不一样了,系统运行很长时间之后,其影响才会逐渐显现出来。
2.2.2 保护块
创建新线程的代价比创建 Account 之类的轻量级对象要大得多。高性能的银行系统要求能够快速响应,若对每个请求都创建新线程会拖慢系统的运行速度,特别在需要1 s内同时处理数千个请求时更是如此。同一线程应该能够被多个请求重用,这种可重用的线程的集合通常称为线程池。
在下列示例中,将定义一种特殊的称为工作(worker)线程的特殊线程,它将响应其他线程的请求,执行一个代码块。这里使用Scala标准库collection包中的可变类Queue来存储被调度的代码块。
import scala.collection._ object SynchronizedBadPool extends App { private val tasks = mutable.Queue[() => Unit]()
这里的代码块被表示为() => Unit类型的函数。worker线程会反复执行poll方法,它会对tasks进行同步,以检查队列是否为空。从poll方法的定义可以看出,synchronized语句也可以有返回值。在本例中,若还有任务未完成,就返回一个Some类型的可选值,否则返回None。Some对象包含了待执行的代码块。
val worker = new Thread { def poll(): Option[() => Unit] = tasks.synchronized { if (tasks.nonEmpty) Some(tasks.dequeue()) else None } override def run() = while (true) poll() match { case Some(task) => task() case None => } } worker.setName("Worker") worker.setDaemon(true) worker.start()
在上面的代码中,worker线程在启动之前被设置为守护线程。一般而言,JVM进程并不会在主线程终止时结束,而是会等所有守护线程全部结束。当 asynchronous方法向tasks中发送任务后,worker线程会执行tasks中未完成的代码块,所以要将worker线程设置为守护线程。
def asynchronous(body: => Unit) = tasks.synchronized { tasks.enqueue(() => body) } asynchronous { log("Hello") } asynchronous { log(" world!")} Thread.sleep(5000)
执行上面的示例,可以看到worker线程会输出Hello和world!。同时读者可以听一听自己计算机的风扇的声音,这会儿应该开始响一会儿了。打开Windows操作系统的任务管理器,或在UNIX操作系统的终端中执行top命令。可以发现一个CPU几乎被一个java进程占满了。原因应该很清楚了,等到worker线程完成任务,它会继续检查队列中是否有任务。我们称worker线程这样的状态为忙等待。忙等待是不必要的,因为这会无止境地占用处理器资源。不过,主线程结束时这些守护进程难道不应该也终止吗?一般情况下是这样的,但是本示例是在SBT所在的JVM进程中执行的,而SBT本身并未终止。而且SBT也有自己的非守护进程,所以这里的worker线程不会结束。为了让SBT在新进程中执行run命令,输入下列命令。
set fork := true
再次执行上述示例,这次主线程结束时worker线程也会跟着结束。但是忙等待的问题仍然存在,因为在大型系统中主线程不会很快结束。重复创建新线程是比较浪费资源的,而忙等待线程只会更浪费资源。只需几个这样的线程就能很快降低系统性能。忙等待只在极少数情况下是合理的,如果读者还是不确定它是否一定很危险,可以在自己的笔记本上执行上述示例,然后观察一下电池的耗尽速度。在这么做之前,记得保存好正在编辑中的文件,因为突然断电会导致数据丢失。
worker 线程更好的一种状态是休眠状态,类似于调用 join 之后线程的状态。worker只会在tasks队列不为空时才需要被唤醒。
Scala对象(以及一般的JVM对象)支持两个特殊的方法,称为wait和notify,它们分别用于让线程休眠和唤醒休眠线程。当前线程只有拥有对象x的监控器,才允许执行x的这两个方法。换句话说,当线程T调用某对象的wait方法时,它会释放x的监控器,然后进入休眠状态,直到另一线程S调用同一对象的notify方法。线程S通常用于为 T 准备数据。如下面的示例,主线程传递 Some 类型的消息,然后 greeter线程将其输出。
object SynchronizedGuardedBlocks extends App { val lock = new AnyRef var message: Option[String] = None val greeter = thread { lock.synchronized { while (message == None) lock.wait() log(message.get) } } lock.synchronized { message = Some("Hello!") lock.notify() } greeter.join() }
上面的代码中出现了一种新的 AnyRef 类型的锁 lock(映射到 java.lang. Object类),线程使用这个锁的监控器。线程greeter首先会申请获得这个锁的监控器,并检查message是否被设置为None。如果为None,则什么也不需要输出,然后线程greeter会调用lock上的wait方法,此时lock的监控器被释放。而主线程(之前在synchronized语句中被阻塞)则获得lock的监控器的所有权,它会设置message的值,然后调用notify方法。当主线程离开synchronized代码块时,它会释放lock。这会导致greeter被唤醒、获得lock,并检查是否又有消息了,如果有的话就输出。因为greeter尝试获得的监控器就是主线程之前释放掉的,主线程对message的设置发生在greeter线程查看消息之前。于是,可以看出线程greeter将会看到主线程设置的消息。在此例中,无论哪个线程先执行synchronized代码块,线程greeter都将输出Hello!。
wait的一个重要性质是它会引起虚假唤醒。有时候,JVM允许在没有调用notify的情况下唤醒一个执行了wait的休眠线程。为了防止出现这种情况,需要用一个while循环反复检查状态,然后结合 wait 使用,如上面代码所示。使用一个 if 语句是不够的,因为即使message的值为None,一个虚假唤醒也将允许线程执行message.get。
当线程发现满足唤醒条件时,它会获得监控器的所有权,这样就可以保证检查操作的原子性。注意,检查条件的那个线程必须获得监控器才能被唤醒。如果没有立即得到监控器,它会进入阻塞状态。
若synchronized语句在调用wait之前反复检查条件,那这个synchronized语句称为保护块。下面,就可以用保护块来提前避免Worker线程进入忙等待状态。使用监控器的Worker线程的完整代码如下所示。
object SynchronizedPool extends App { private val tasks = mutable.Queue[() => Unit]() object Worker extends Thread { setDaemon(true) def poll() = tasks.synchronized { while (tasks.isEmpty) tasks.wait() tasks.dequeue() } override def run() = while (true) { val task = poll() task() } } Worker.start() def asynchronous(body: =>Unit) = tasks.synchronized { tasks.enqueue(() => body) tasks.notify() } asynchronous { log("Hello ") } asynchronous { log("World!") } Thread.sleep(500) }
在上面的示例中,声明的 Worker 线程是应用程序中的一个单例对象。和之前不一样的是,poll方法在tasks对象上调用了wait,然后一直等到主线程在asynchronous方法中往tasks中加了一个代码块,并调用notify。执行这个示例,再看一看CPU使用情况。如果在执行忙等待的示例后被迫重启了 SBT(假设现在电池还有电),则可以看到Java进程的CPU使用量是0。
2.2.3 线程中断和平滑关闭
在上一个示例中,Worker线程在它的run方法中无穷循环,永不终止。读者可能会不以为意,反正Worker在休眠时并没有使用CPU,而且Worker是一个守护线程,总会在程序退出时结束。
不过,守护线程的栈空间在程序退出之前都会一直存在。如果休眠线程太多,内存就会被用完。结束休眠线程的一种方法是将它中断,代码如下所示。
Worker.interrupt()
当一个线程等待或计时等待时,调用其 interrupt 方法会抛出一个 InterruptedExption异常。此异常可以被捕获和处理,但在这里,它的作用是终止线程Worker。不过,如果对运行中的线程调用这个方法,这个异常就不会产生,而是设置线程的interrupt 标志。对于不阻塞的线程必须周期性地用 isInterrupted 方法查询interrupt标志。
另一种结束线程的方法称为平滑关闭。在平滑关闭中,一个线程设置终止条件,然后调用notify来唤醒工作线程。然后,工作线程会释放它所有的资源,并顺利地结束。定义一个称为terminated的变量,如果其值为true,就需要结束线程。在等待tasks之前,poll方法会额外地检查此变量,如果Worker线程应该继续运行,poll方法会选择性地返回一个任务。代码如下所示。
object Worker extends Thread { var terminated = false def poll(): Option[() => Unit] = tasks.synchronized { while (tasks.isEmpty && !terminated) tasks.wait() if (!terminated) Some(tasks.dequeue()) else None } }
下面重新定义run方法,它会在模式匹配中检查poll方法是否返回Some(task)。在此run方法中,不再使用while循环,而是在poll返回Some(task)时使用尾递归调用run方法。
import scala.annotation.tailrec @tailrec override def run() = poll() match { case Some(task) => task(); run() case None => } def shutdown() = tasks.synchronized { terminated = true tasks.notify() }
然后,主线程就可以在Worker线程上调用同步化的shutdown方法,从而发送终止线程的请求。这里不再需要将Worker线程设置为守护线程,Worker总是会自己结束运行的。
为了确保这些工具线程能够不进入竞态条件,并正确地结束,可以使用平滑关闭的思想。
如果出现无法用notify唤醒线程的情况,则应该使用interrupt方法,而不用平滑关闭这一方法。比如,线程在一个InterruptibleChannel对象上阻塞I/O,这时,被此线程调用wait方法的那个对象是隐藏的。
Thread类还定义一个 stop方法,这个方法不推荐使用,它会立刻终止线程,并抛出ThreadDeath异常。用户要避免使用这个方法,因为它会中断线程在任意点的运行,容易让程序数据处于不一致的状态。