
3.1 Executor和ExecutionContext对象
第2章讨论过,虽然Scala程序中的线程比JVM进程要轻量级很多,但是创建线程的代价仍然比分配对象的大很多,因为创建线程需要用到监控器锁,或需要更新某个容器中的元素。
如果应用程序需要执行大量的微小并发任务,且对吞吐量的要求很高,那么为每个任务都创建一个全新线程的代价就过于大了。
启动一个线程需要为它的调用栈分配一个内存区域,还需要有一个上下文用于线程切换,切换甚至要比执行并发任务本身更耗时。出于这个考虑,大多数并发性框架会提供工具来维护一个线程集合,让这些线程处于待命状态,以随时处理新出现的并发性任务。这种工具一般称为线程池。
为帮助程序员封装并发任务的各种执行策略,Java 开发工具包(Java Development Kit,JDK)提供了一种称为Executor的抽象接口。Executor接口的使用比较简单,它只有一个execute方法,其参数是一个Runnable对象,在execute方法里可以调用这个Runnable对象的run方法。
Executor对象决定了由哪个线程在何时调用Runnable对象的run方法,它可为某次调用专门启动一个新线程,也可以直接在当前调用线程中执行这个Runnable对象。通常情况下,Executor对象会实现为一个线程池,它会从线程池中取出一个线程来执行Runnable对象,该线程与当前调用execute方法的线程并发执行。
JDK 7中新实现了一种Executor,称为ForkJoinPool,位于java.util. concurrent包中。Scala程序在JDK 6中也可以使用ForkJoinPool类,只不过这个类在scala.concurrent.forkjoin包中。
下面的代码片段展示了如何实例化一个 ForkJoinPool 类,然后向其提交一个异步任务。
import scala.concurrent._ import java.util.concurrent.ForkJoinPool object ExecutorsCreate extends App { val executor = new ForkJoinPool executor.execute(new Runnable { def run() = log("This task is run asynchronously.") }) Thread.sleep(500) }
从上面的代码可以看到,需要先导入包 scala.concurrent,后文的示例都会假定这个包已经被导入了。
然后,新建一个ForkJoinPool类的实例,并赋值给executor变量。实例化之后,executor的execute方法可接收一个Runnable对象,其任务是在标准输出中输出字符串。最后,还要调用sleep语句,以防止主线程在Runnable对象执行完run方法之前就结束了。注意,如果使用 SBT 运行此示例,且将 fork 设置为 false,则sleep语句可以省略。
那么,为什么一定需要Executor对象呢?从上面的示例可以看到,Executor对象和Runnable对象是独立的,Executor对象的代码改动不会对Runnable对象造成影响。因此,Executor 对象的作用是将并发计算定义和它的执行方式解耦合,从而,程序员可以集中精力确定哪些代码是可并发执行的,并将它们与调用方式(何时何地)分离开。
ForkJoinPool 类还实现了 Executor 接口的一个更复杂的子类型,称为ExecutorService,它扩展了几个接口方法,其中重要的是 shutdown 方法。shutdown方法用于保证Executor对象能够执行所有提交的任务,然后结束所有的工作线程,从而实现平滑的终止过程。这个方法可以不用显式调用,因为ForkJoinPool在终止线程方面做得比较好,它的线程默认都是后台线程,无须在线程结束时手动关闭。不过,程序员一般还是应该在创建ExecutorService之后调用其shutdown方法,特别是在程序终止之前。
ExecutorService对象的创建者有义务在不再需要此对象时调用其shutdown方法。
为确保所有提交到 ForkJoinPool 对象的任务都已经完成,还需要额外调用awaitTermination方法,并指定最大等待结束的时间。之前的sleep语句可以换成如下写法。
import java.util.concurrent.TimeUnit executor.shutdown() executor.awaitTermination(60, TimeUnit.SECONDS)
scala.concurrent包定义了ExecutionContext特质,和Executor对象的功能类似,只不过它只针对 Scala 程序。在后文中可以看到,很多 Scala 方法都用ExecutionContext对象作为其隐式参数。
ExecutionContext 实现了抽象的 execute 方法,对应了 Executor 接口的execute方法,它还实现了reportFailure方法,参数为一个Throwable对象,此方法在某个任务抛出异常时会被调用。ExecutionContext 中还带了一个默认的执行上下文对象,称为global,其内部使用了ForkJoinPool实例。
object ExecutionContextGlobal extends App { val ectx = ExecutionContext.global ectx.execute(new Runnable { def run() = log("Running on the execution context.") }) Thread.sleep(500) }
在前面的示例中,我们将从ForkJoinPool实例创建一个ExecutionContext对象,该实例的并行度为2。这意味着ForkJoinPool实例通常在其池中保留两个工作线程。
在下面的示例中,我们将依赖于全局ExecutionContext对象。为了使代码更简洁,我们将在本章的 package 对象中引入 execute 便捷方法,该方法在全局ExecutionContext对象上执行代码块。
def execute(body: =>Unit) = ExecutionContext.global.execute( new Runnable { def run() = body } )
Executor和ExecutionContext对象是一个不错的并发编程抽象,但不是灵丹妙药。它们可以通过对不同任务重用同一组线程来提高吞吐量,但是如果这些线程变得不可用,则它们将无法执行任务,因为所有线程都在忙于运行其他任务。在下面的示例中,我们声明32个独立的执行,每个执行持续2 s,并等待10 s完成。
object ExecutionContextSleep extends App { for (i<- 0 until 32) execute { Thread.sleep(2000) log(s"Task $i completed.") } Thread.sleep(10000) }
读者可能希望所有执行都在2 s后终止,但事实并非如此。相反,在具有超线程的四核CPU上,全局ExecutionContext对象在线程池中有8个线程,因此它以8个线程为一批执行工作任务。2 s后,将打印完成的8个任务的批处理,再过2 s后,将打印另一批任务,依此类推。这是因为全局ExecutionContext对象在内部维护着一个由8个工作线程组成的池,并且调用sleep会将它们全部置于定时等待状态。只有完成了这些工作线程中的sleep方法调用之后,才能执行另一批8个任务。情况可能更糟。我们可以启动8个任务,这些任务执行第2章介绍的保护块惯用法,而另一个任务是调用 notify 方法将其唤醒。由于ExecutionContext对象只能同时执行8个任务,因此在这种情况下,工作线程将永远被阻塞。我们说对ExecutionContext对象执行阻止操作可能会导致饥饿。
避免对ExecutionContext和Executor对象执行可能无限期阻塞的操作。
在了解了如何声明并发执行之后,我们将注意力转向通过处理程序数据来实现这些并发执行的交互方式。