Hadoop构建数据仓库实践
上QQ阅读APP看书,第一时间看更新

3.3 Hadoop基本组件

如图3-2所示,Hadoop实际是由三个不同的组件构成:

图3-2 Hadoop基本组件

● HDFS:Hadoop分布式文件系统。

● YARN:一个资源调度框架。

● MapReduce:一个分布式处理框架。

程序员可以联合使用这三个组件构建分布式系统。

3.3.1 HDFS

HDFS是一个运行在通用硬件设备之上的分布式文件系统。HDFS是高度容错的,在廉价的硬件上部署。HDFS提供以高吞吐量访问应用数据的能力,非常适合拥有大数据集的应用。HDFS放宽了一些POSIX的需求,允许对文件系统数据的流式访问。HDFS源自为Apache Nutch Web搜索引擎项目建立的框架,是Apache Hadoop的核心项目。

1. HDFS的目标

● 硬件容错。HDFS假定发生硬件故障是一个常态。硬件损坏的情况通常比预想出现的更加频繁。一个HDFS实例可能由成百上千的服务器组成,每个机器上存储文件系统的部分数据。事实上一个HDFS包含有大量的硬件组件,而在如此之多的硬件中,出现问题的概率就非常大了,也可以说,HDFS中总会有部分组件处于不可用状态。因此,检测硬件错误并从有问题的硬件快速自动恢复,就成为HDFS架构的核心目标。

● 流式数据访问。运行在HDFS上的应用程序需要流式访问它们的数据集。简单地说,流式访问就是对数据边读取边处理,而不是将整个数据集读取完成后再开始处理。这与运行在典型普通文件系统上的程序不同。HDFS被设计成更适合批处理操作,而不是让用户交互式地使用。它强调的是数据访问的吞吐量而不是低延时。POSIX的许多硬性要求并不适合HDFS上的应用程序,因为POSIX的某些关键语义影响了数据吞吐量的提升。

● 支持大数据集。部署在HDFS上的应用要处理很大的数据集。HDFS中一个典型文件的大小是几GB到几TB。HDFS需要支持大文件,它应该提供很大的数据带宽,能够在单一集群中扩展几百甚至数千个节点,并且一个HDFS实例应该能够支持几千万个文件。

● 简单的一致性模型。HDFS应用程序访问文件是一次写多次读模式。文件一旦被创建,对该文件只能执行追加或彻底清除操作。追加的内容只能写到文件尾部,而文件中已有的任何内容都不能被更新。这些设定简化了数据一致性问题并能使数据访问的吞吐量更高。MapReduce或Web爬虫应用都适合于这种模型。

● 移动计算而不是移动数据。一个应用的计算请求,在它所操作的数据附近执行时效率会更高,尤其是在数据集非常大的情况下更是如此。此时网络的竞争最小,系统整体的吞吐量会得到提高。通常,将计算移动到临近数据的位置,比把数据移动到应用运行的位置要好。HDFS为应用程序提供接口,把计算移动到数据所在位置。

● 便捷访问异构的软硬件平台。HDFS能够很容易地从一个平台迁移到另一个,这种便利性使HDFS为大量应用程序所采用。

2. HDFS架构

如图3-3所示,HDFS是主/从架构。一个HDFS集群有一个NameNode进程,它负责管理文件系统的命名空间,这里所说的命名空间是指一种层次化的文件组织形式。NameNode进程控制被客户端访问的文件,运行NameNode进程的节点是HDFS的主节点。HDFS还有许多DataNode进程,通常集群中除NameNode外的每个节点都运行一个DataNode进程,它管理所在节点上的存储。运行DataNode进程的节点是HDFS的从节点,又称工作节点。HDFS维护一个文件系统命名空间,并允许将用户数据存储到文件中。在系统内部,一个文件被分成多个数据块,这些数据块实际被存储到DataNode所在节点上。NameNode不仅执行文件系统命名空间上的打开文件、关闭文件、文件和目录重命名等操作,还要维护数据块到DataNode节点的映射关系。DataNode不仅负责响应文件系统客户端的读写请求,还依照NameNode下达的指令执行数据块的创建、删除和复制等操作。

图3-3 HDFS架构

NameNode和DataNode进程运行在通用的机器上,这些机器通常安装Linux操作系统。HDFS是用Java语言开发的,任何支持Java的机器都可以运行NameNode或DataNode进程。使用平台无关的Java语言,意味着HDFS可以部署在大范围的主机上。典型的部署是一台专用服务器作为主节点,只运行NameNode进程。集群中的其他机器作为从节点,每个上面运行一个DataNode进程。一台主机上不能同时运行多个DataNode进程。

集群中NameNode的存在极大地简化了系统架构。NameNode所在的主节点是HDFS的仲裁人和所有元数据的知识库。这样的系统设计下,用户数据永远不会存储在主节点上。

HDFS支持传统的层次形文件组织。用户或应用可以创建目录,也可以在目录中存储文件。HDFS命名空间的层次结构与其他文件系统类似,能执行创建、删除文件,把一个目录中的文件移动到另外的目录中,修改文件名称的操作。HDFS支持配置用户配额和访问权限,但不支持软连接和硬连接。命名空间及其属性的任何变化都被NameNode所记录。应用可以指定一个HDFS文件的副本数。文件的副本数被称为该文件的复制因子,这个信息被NameNode存储。

3.数据复制

HDFS可以保证集群中文件存储的可靠性。它把文件分解成一个由数据块构成的序列,每个数据块有多个副本,这种数据冗余对容错非常关键。当一个数据块损坏时,不会造成数据丢失。数据块的大小和复制因子对每个文件都是可配的。

一般情况下,HDFS中一个文件的所有数据块,除最后一个块外,都有同样的大小。但是,HDFS支持变长的数据块,就是说一个文件有可能包含两种大小的数据块。当用户重新配置了文件的块大小,然后向该文件中追加数据,这时HDFS不会填充文件的最后一个块,而是用新的尺寸创建新块存储追加的数据,这种情况下文件中就会同时存在两种大小的块。

应用可以指定一个文件的副本数,即复制因子。可以在文件创建时指定复制因子,这个复制因子的配置以后是可以改变的。除了追加和清除操作外,HDFS中的文件在任何时候都是严格地一次写入。

NameNode做出的所有操作,都会考虑数据块的复制。它周期性地接收集群中每个DataNode发出的心跳和块报告。接收到心跳说明DataNode工作正常。块报告包含该DataNode节点上所有数据块的列表。

HDFS使用所谓的“机架感知”策略放置数据块副本。这是一个需要进行大量实验并不断调整的特性,也是HDFS与其他分布式文件系统的主要区别。机架感知的目的是要提升数据可靠性、可用性和网络带宽的利用率。当前HDFS版本的实现只是实施副本放置策略的第一步,主要是为了验证该策略在生产系统上的有效性,同时收集更多的行为信息,以供继续研究和测试更好的策略。

在此简单说一下可靠性与可用性的区别。可靠性是指系统可以无故障地持续运行,而可用性指的是系统在任何给定的时刻都能工作。例如,如果系统每月崩溃1分钟,那么它的可用性是99.998%,但是它还是非常不可靠的。与之相反,如果一个系统从来不崩溃,但是每年要停机两星期,那么它是高度可靠的,但是可用性只有96%。

一个大型HDFS集群中会包含很多计算机,这些机器分布于多个机架上。位于不同机架上的两个节点通过网络交换机进行通信。大多数情况下,同一个机架上机器间的网络带宽会高于不同机架上的机器。

NameNode通过Hadoop机架感知策略确定每个DataNode所属的机架ID。一种简单的策略是在每个机架上放置一份数据块的副本,这种设计即使在整个一个机架(甚至多个机架)失效的情况下,也能防止数据丢失。该策略还有一个优点是,可以利用多个机架的带宽读取数据。将数据副本平均分布于集群的所有机架中,当集群中的一个组件(节点、机架等)失效时,重新负载均衡也很简单。但是很显然,写入数据时需要把一个数据块传输到每一个机架,这样做的写入成本太高了。

在一个复制因子为3的普通场景中,HDFS把数据块的第一个副本放置在本地机架的一个节点上,另一个副本放置在本地机架的另外一个节点上,最后一个副本放置在另外一个机架的节点上。这样只写了两个机架,节省了一个机架的写入流量,提升了写入性能。该策略的前提是认可这样一种假设:机架失效的可能性比机器失效的可能性小得多。因此这种策略并不会影响数据的可靠性和可用性。然而它却减少了读取数据的整体带宽,因为此时只能利用两个机架的带宽而不是三个。使用这种策略,一个文件的副本不是平均分布于所有机架,三分之一在同一个节点,三分之二在同一个机架,剩下的三分之一分布在其他机架上。该策略提升了写的性能,同时没有损害数据可靠性或读的性能。

如果复制因子大于3,第4个及其后面的副本被随机放置,但每个机架的副本数量要低于上限值,上限值的计算公式是:((副本数 -1)/(机架数 + 2))取整。

由于NameNode不允许一个DataNode上存在一个数据块的多份副本,因此一个数据块的最大副本数就是当时DataNode节点的个数。

在HDFS支持选择存储类型和存储策略后,NameNode实施策略时除了依照上面描述的机架感知外,还考虑到放置副本的其他问题。NameNode首先按机架感知策略选择存储节点,然后检查该候选节点是否满足文件的存储需求。如果候选节点不支持文件的存储类型,NameNode就会去寻找其他节点。如果在第一条查找路径上没有找到足够的节点来存放副本,那么NameNode会再选择第二条路径继续查找可用于存储该文件类型的节点。当前默认的副本放置策略就是这样工作的。

为了使全局的带宽消耗和读延迟降到最小,在选择副本时,HDFS总是选择距离读请求最近的存储节点。如果在读请求所在节点的同一个机架上有需要的数据副本,则HDFS尽量选择它来满足读请求。如果HDFS集群跨越多个数据中心,那么存储在本地数据中心的副本会优先于远程副本被选择。

当Hadoop的NameNode节点启动时,会进入一种称为安全模式的特殊状态。NameNode处于安全模式时不会进行数据块的复制操作。此时NameNode会接收来自DataNode的心跳和块报告消息。块报告中包含该DataNode节点所保存的数据块列表,每个数据块有一个特定的最小副本数。NameNode检测到一个数据块达到了最小副本数时,就认为该数据块是复制安全的。当检测到的复制安全的数据块达到一定比例(由dfs.safemode.threshold.pct参数指定)30秒后,NameNode退出安全模式。然后NameNode会确定一个没有达到最小副本条件的数据块列表,并将这些数据块复制到其他DataNode节点,直至达到最小副本数。

4.文件系统元数据持久化

HDFS命名空间的元数据由NameNode负责存储。NameNode使用一个叫做EditLog的事务日志持久化记录文件系统元数据的每次变化。例如,在HDFS中创建一个新文件,NameNode就会向EditLog中插入一条记录标识这个操作。同样,改变文件的复制因子也会向EditLog中插入一条记录。NameNode使用本地主机上的一个操作系统文件存储EditLog。整个文件系统的命名空间,包括数据块和文件的映射关系、文件系统属性等,存储在一个叫做FsImage的文件中。FsImage也是一个NameNode节点的本地操作系统文件。

NameNode在内存中保留一份完整的文件系统命名空间映像,其中包括文件和数据块的映射关系。启动或者达到配置的阈值触发了检查点时,NameNode把FsImage和EditLog从磁盘读取到内存,对内存中的FsImage应用EditLog里的事务,并将新版本的FsImage写回磁盘,然后清除老的EditLog事务条目,因为它们已经持久化到FsImage了。这个过程叫做检查点。检查点的目的是确认HDFS有一个文件系统元数据的一致性视图,这是通过建立一个文件系统元数据的快照并保存到FsImage实现的。尽管可以高效读取FsImage,但把每次FsImage的改变直接写到磁盘的效率是很低的。替代做法是将每次的变更持久化到Editlog中,在检查点期间再把FsImage刷新到磁盘。检查点有两种触发机制,按以秒为单位的时间间隔(dfs.namenode.checkpoint.period)触发,或者达到文件系统累加的事务值(dfs.namenode.checkpoint.txns)时触发。如果两个参数都设置,两种条件都会触发检查点。熟悉数据库的读者对检查点这一概念一定不会陌生,NameNode的FsImage和EditLog,其作用与关系数据库中的数据文件、重做日志文件非常类似。

DataNode把HDFS文件里的数据存储到本地文件系统,是联系本地文件系统和HDFS的纽带。DataNode将HDFS的每个数据块存到一个单独的本地文件中,这些本地文件并不都在同一个目录中。DataNode会根据实际情况决定一个目录中的文件数,并在适当的时候建立子目录。本地文件系统不能支持在一个目录里创建太多的文件。DataNode启动时会扫描本地文件系统,生成一个该节点上与本地文件对应的所有HDFS数据块的列表,并把列表上报给NameNode,这个报告就是前面所说的块报告。

5. HDFS示例

如图3-4所示,有一个256MB的文件,集群中有4个节点,那么默认情况下,当把文件上传到集群时,系统会自动做三件事情:

图3-4 HDFS示例

● HDFS会将此文件分成四个64MB的数据块。

● 每个块有三个复制。

● 数据块被分散到集群节点中,确保对于任意数据块,没有两个块复制在相同的节点上。

这个简单的数据分布算法是Hadoop成功的关键,它显著提高了HDFS集群在硬件失效时的可用性,并且使MapReduce计算框架成为可能。

3.3.2 MapReduce

MapReduce是一个分布式计算软件框架,支持编写处理大数据量(TB以上)的应用程序。MapReduce程序可以在几千个节点组成的集群上并行执行。集群节点使用通用的硬件,以硬件冗余保证系统的可靠性和可用性,而MapReduce框架则从软件上保证处理任务的可靠性和容错性。

在Hadoop中每个MapReduce应用程序被表示成一个作业,每个作业又被分成多个任务。应用程序向框架提交一个MapReduce作业,作业一般会将输入的数据集合分成彼此独立的数据块,然后由map任务以并行方式完成对数据分块的处理。框架对map的输出进行排序,之后输入到reduce任务。MapReduce作业的输入输出都存储在一个如HDFS的文件系统上。框架调度并监控任务的执行,当任务失败时框架会重新启动任务。

通常情况下,集群中的一个节点既是计算节点,又是存储节点。也就是说,MapReduce框架和HDFS共同运行在多个节点之上。这种设计效率非常高,框架可以在数据所在的节点上调度任务执行,大大节省了集群节点间的整体带宽。

Hadoop 0.20.0和之前的版本里,MapReduce框架由JobTracker和TaskTracker组成。JobTracker是一个运行在主节点上的后台服务进程,启动之后会一直监听并接收来自各个TaskTracker发送的心跳,包括资源使用情况和任务运行情况等信息。TaskTracker是运行在从节点上的进程,它一方面从JobTracker接收并执行各种命令,包括提交任务、运行任务、杀死任务等,另一方面将本地节点上各个任务的状态通过心跳,周期性地汇报给JobTracker。TaskTracker与JobTracker之间采用RPC协议进行通信。为解决MapReduce框架的性能瓶颈,从0.23.0版本开始,Hadoop的MapReduce框架完全重构,使用YARN管理资源,框架的组成变为三个部分:一个主节点上的资源管理器ResourceManager,每个从节点上的节点管理器NodeManager,每个应用程序对应的MRAppMaster。

一个最简单的MapReduce应用程序,只需要指定输入输出的位置,并实现适当的接口或抽象类,就可以提供map和reduce的功能。提交应用程序时,需要指定依赖的包、相关环境变量和可选的MapReduce作业配置参数。Hadoop作业客户端将程序提交的MapReduce作业及其相关配置发送给ResourceManager, ResourceManager把作业分解成任务,然后把任务和配置信息分发给工作节点,调度并监控任务的执行,同时向作业客户端提供任务状态和诊断信息。

尽管Hadoop框架是用Java语言实现的,但MapReduce应用程序却不一定要用Java来编写。Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现MapReduce。Hadoop Pipes是一个C++ API,允许用户使用C++语言编写MapReduce应用程序。

1.处理步骤

MapReduce数据处理分为Split、Map、Shuffle和Reduce 4个步骤。应用程序实现Map和Reduce步骤的逻辑,Split和Shuffle步骤由框架自动完成。

(1)Split步骤

在执行MapReduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(键/值对), map会依次处理每一个记录。引入split的概念是为了解决记录溢出问题。假设一个map任务处理一个块中的所有记录,那么当一个记录跨越了块边界时怎么办呢?HDFS的块大小是严格的64MB(默认值,当然也可能是配置的其他值),而且HDFS并不关心文件块中存储的内容是什么,因此HDFS无法评估何时一个记录跨越了多个块。

为了解决此问题,Hadoop使用了一种数据块的逻辑表示,叫做input splits。当MapReduce作业客户端计算input splits时,它会计算出块中第一个和最后一个完整记录的位置。如果最后一个记录是不完整的,input split中包含下一个块的位置信息,还有完整记录所需的字节偏移量。

MapReduce数据处理是由input splits概念驱动的。为特定应用计算出的input splits数量决定了mapper任务的数量。ResourceManager尽可能把每个map任务分配到存储input split的从节点上,以此来保证input splits被本地处理。

(2)Map步骤

一个MapReduce应用逐一处理input splits中的每一条记录。input splits在上一步骤被计算完成之后,map任务便开始处理它们,此时Resource Manager的调度器会给map任务分配它们处理数据所需的资源。

对于文本文件,默认为文件里的每一行是一条记录,一行的内容是键/值对中的值,从split的起始位置到每行的字节偏移量,是键/值对中的键。之所以不用行号当作键,是因为当一个大的文本文件被分成了许多数据块,当作很多splits处理时,行号的概念本身就是存在风险的。每个split中的行数不同,因此在处理一个split之前就计算出行数并不容易。但字节偏移量是精确的,因为每个数据块都有相同的固定的字节数。

map任务处理每一个记录时,会生成一个新的中间键/值对,这个键和值可能与输入对完全不同。map任务的输出就是这些中间键/值对的全部集合。为每个map任务生成最终的输出文件前,先会依据键进行分区,以便将同一分组的数据交给同一个reduce任务处理。在非常简单的应用场景下,可能只有一个reduce任务,此时map任务的所有输出都会被写入一个文件。但是在有多个reduce任务的情况下,每个map任务会基于分区键生成多个输出文件。框架默认的分区函数(HashPartitioner)满足大多数情况,但有时也需要定制自己的partitioner,例如需要对mapper的结果集进行二次排序时。

在应用程序中最好对map任务的输出文件进行压缩以获得更优的性能。

(3)Shuffle步骤

Map步骤之后,开始Reduce处理之前,还有一个重要的步骤叫做Shuffle。MapReduce保证每个reduce任务的输入都是按照键排好序的。系统对map任务的输出执行排序和转换,并映射为reduce任务的输入,此过程就是Shuffle,它是MapReduce的核心处理过程。在Shuffle中,会把map任务输出的一组无规则的数据尽量转换成一组具有一定规则的数据,然后把数据传递给reduce任务运行的节点。Shuffle横跨Map端和Reduce端,在Map端包括spill过程,在Reduce端包括copy和sort过程,如图3-5所示。

图3-5 Shuffle过程

需要注意的是,只有当所有的map任务都结束时,reduce任务才会开始处理。如果一个map任务运行在一个配置比较差的从节点上,它的滞后会影响MapReduce作业的性能。为了避免这种情况的发生,MapReduce框架使用了一种叫做推测执行的方法。所谓的推测执行,就是当所有task都开始运行之后,MRAppMaster会统计所有任务的平均进度,如果某个task所在的task node因为硬件配置比较低或者CPU load很高等原因,导致任务执行比总体任务的平均执行慢,此时MRAppMaster会启动一个新的任务(duplicate task),原有任务和新任务哪个先执行完就把另外一个kill。另外,根据mapreduce job幂等的特点,同一个task执行多次的结果是一样的,所以task只要有一次执行成功,job就是成功的,被kill的task对job的结果没有影响。如果你监测到任务执行成功,但是总有些任务被kill,或者map任务的数量比预期的多,可能就是此原因所在。

map任务的输出不写到HDFS,而是写入map任务所在从节点的本地磁盘,这个中间结果也不会在Hadoop集群间进行复制。

(4)Reduce步骤

Reduce步骤负责数据的计算归并,它处理Shuffle后的每个键及其对应值的列表,并将一系列键/值对返回给客户端应用。有些情况下只需要Map步骤的处理就可以为应用生成输出结果,这时就没有Reduce步骤。例如,将全部文本转换成大写这种基本的转化操作,或者从视频文件中抽取关键帧等。这些数据处理只要Map阶段就够了,因此又叫map-only作业。但在大多数情况下,到map任务输出结果只完成了一部分工作。剩下的任务是对所有中间结果进行归并、聚合等操作,最终生成一个汇总的结果。

与map任务类似,reduce任务也是逐条处理每一个键。通常reduce为每个处理的键返回单一键/值对,但这个结果键/值对可能会比原始输入的键/值对小得多。当reduce任务完成后,每个reduce任务的输出会写入一个结果文件,并将结果文件存储到HDFS中,HDFS会自动生成结果文件数据块的副本。

Resource Manager会尽量给map任务分配资源,确保input splits被本地处理,但这个策略不适用于reduce任务。Resource Manager假定map的结果集需要通过网络传输给reduce任务进行处理。这样实现的原因是,要对成百上千的map任务输出进行Shuffle,没有切实可行的方法为reduce实施相同的本地优先策略。

2.逻辑表示

MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Map函数和Reduce函数都是通过键/值对来操作数据的。Map函数将输入数据按数据的类型和一定的规则进行分解,并返回一个中间键/值对的列表,如下所示:

    Map(k1, v1) → list(k2, v2)

Reduce函数处理Map阶段产生的组,按键依次产生归并后的值的集合,如下所示:

    Reduce(k2, list (v2)) → list(v3)

通常一次Reduce调用会返回一个v3值或返回空,尽管允许一次调用返回多个值。所有Reduce调用的返回值集成在一起作为请求的结果列表。

为了实现MapReduce,仅仅有键/值对的抽象是不够的。MapReduce的分布式实现还需要一个Map和Reduce两个执行阶段的“连接器”,它可以是一个分布式文件系统,如HDFS,也可以是从mapper到reducer的数据流。

既然本书讲的是数据仓库,我们就来看一个SQL的例子。想象有一个11亿人口数据的数据库,要按年龄分组统计每个年龄的平均社会关系数。查询语句如下:

    selectage, avg(contacts)
      fromsocial.person
     group byage
     order byage;

使用MapReduce, K1键可以是1到1100的整数,每个整数表示一个100万条人口记录的批次号。K2键是人口的年龄。这个统计可以使用下面的Map/Reduce函数伪代码实现:

    function Map is
        input: integer K1 between 1 and 1100, representing a batch of 1 million social.person
    records
        for each social.person record in the K1 batch do
            let Y be the person's age
            let N be the number of contacts the person has
            produce one output record (Y, (N,1))
        repeat
    end function

    function Reduce is
        input: age (in years) Y
        for each input record (Y, (N, C)) do
            Accumulate in S the sum of N*C
            Accumulate in Cnew the sum of C
        repeat
        let A be S/Cnew
        produce one output record (Y, (A, Cnew))
    end function

MapReduce系统将线性增长到1100个Map进程,每个进程处理100万条输入记录。在Map步骤里,将产生11亿条(Y,(N,1))记录,Y表示年龄,假设取值范围在8到103之间。MapReduce系统将线性产生96个Reduce进程执行中间键/值对的Shuffle操作。每个Map进程产生的100万条记录,经过输出、排序、溢写、合并等map端的Shuffle操作,输出到96个Reduce进程,Reduce端再进行合并排序,计算我们实际需要的每个年龄的平均社会关系人数。Reduce步骤只会产生96条(Y, A)的输出记录,它们以Y值排序,被记录到最终的结果文件。

记住,尽管一个reduce任务可能已经获得了所有map任务的输出,但是只有在所有的map任务都结束后,reduce任务才开始执行,换句话说,要保持对map任务的计数。这一点至关重要,否则我们计算的平均值就是错误的。例如,经过map端Shuffle操作的输出如下:

    -- map output #1: age, quantity of contacts
    10, 9
    10, 9
    10, 9

    -- map output #2: age, quantity of contacts
    10, 9
    10, 9

    -- map output #3: age, quantity of contacts
    10, 10

如果在前两个map输出完成就开始reduce计算任务,此时得到的结果是:10岁的平均社会关系人数是9((9+9+9+9+9)/5):

    -- reduce step #1: age, average of contacts
    10, 9

这时第三个map输出完成,继续计算平均值时,我们得到的结果是9.5((9+10)/2),但这个数是错误的,正确的结果应该是9.166((9*3+9*2+10*1)/(3+2+1))。

3.应用程序定义

MapReduce框架中可以由应用程序定义的部分主要有:

● 输入程序:输入程序将输入的文件分解成适当大小的‘splits'(实践中典型的是64MB或128MB),框架为每一个split赋予一个Map任务。输入程序从稳定存储(一般是分布式文件系统)读取数据并生成键/值对。输入程序最常见的例子是读取一个目录下的所有文件,并将每一行作为一个记录返回。

● map函数:map函数处理输入的键/值对,生成零个或多个中间输入键/值对。map函数的输入与输出可以是不同的类型。例如单词计数应用,map函数分解每行的单词并输出每个单词的键/值对。单词是键,单词的实例数是值。

● 分区函数:每个map函数的输出通过应用定义的分区函数分配给特定的reduce任务。分区函数的输入是键、值和reduce任务的数量,输出reduce任务的索引值。典型的分区函数是取键的哈希值,或对键的哈希值用reduce任务数取模。选择适当的分区函数对于数据在reduce间的平均分布和负载均衡非常重要。

● 比较函数:通过应用的比较函数从Map运行的节点为reduce拉取数据并排序。

● reduce函数:框架按键的排序为每个唯一的键调用一次应用的reduce函数。reduce函数会在与键相关的多个值中迭代,然后生成零个或多个输出。例如单词计数应用,reduce函数获取到输入值,对它们进行汇总计算,并为每个单词及其计数值生成单个输出项。

● 输出程序:输出程序负责将reduce的输出写入稳定存储。

4. MapReduce示例

MapReduce是一个分布式编程模式。它的主要思想是,将数据Map为一个键/值对的集合,然后对所有键/值对按照相同键值进行Reduce。为了直观地理解这种编程模式,看一个在10TB的Web日志中计算“ERROR”个数的例子。假设Web日志输出到一系列文本文件中,文件中的每一行代表一个事件,以ERROR、WARN或INFO之一开头,表示事件级别。一行的其他部分由事件的时间戳及其描述组成,如图3-6所示。

图3-6 Web日志中的文本

我们可以非常容易地使用MapReduce模式计算“ERROR”的数量。如图3-7所示,在map阶段,识别出每个以“ERROR”开头的行并输出键值对<ERROR, 1>。在reduce阶段我们只需要对map阶段生成的<ERROR, 1>对进行计数。

图3-7 MapReduce统计‘ERROR’的个数

对这个例子稍微做一点扩展,现在想知道日志中ERROR、WARN、INFO分别的个数。如图3-8所示,在map阶段检查每一行并标识键值对,如果行以“INFO”开头,键值对为<INFO, 1>,如果以“WARN”开头,键值对为<WARN, 1>,如果以“ERROR”开头,键值对为<ERROR, 1>。在reduce阶段,对每个map阶段生成的唯一键值“INFO”“WARN”和“ERROR”进行计数。

图3-8 MapReduce分别统计‘ERROR'、‘WARN'、‘INFO’的个数

通过上面简单的示例我们已经初步理解了MapReduce编程模式是如何工作的,现在看一下MapReduce是怎么实现的。如图3-9所示,HadoopMapReduce的实现分为split、map、shuffle和reduce 4步。开发者只需要在Mappers和Reducers的Java类中编码map和reduce阶段的逻辑,框架完成其余的工作。

图3-9 MapReduce执行步骤

MapReduce的处理流程如下:

● HDFS分布数据。

● 向YARN请求资源以建立mapper实例。

● 在可用的节点上建立mapper实例。

● 对mappers的输出进行混洗,确保一个键对应的所有值都分配给相同的reducer。

● 向YARN请求资源以建立reducer实例。

● 在可用的节点上建立reducer实例。

表面上看,似乎MapReduce能处理的情况十分有限,但实际结果却是,正如前面统计平均社会关系人数的例子所示,大多数SQL操作都可以被表达成一连串的MapReduce操作,并且Hadoop生态圈的工具可以自动把SQL转化成MapReduce程序处理,所以对于熟悉SQL的开发者来说,不必再自己实现Mapper或者Reducer。

3.3.3 YARN

YARN是一种集群管理技术,全称是Yet Another Resource Negotiator。从图3-10可以看到,YARN是第二代Hadoop的一个关键特性。Apache开始对YARN的描述是,为MapReduce重新设计的一个资源管理器,经过不断地发展和改进,现在的YARN更像是一个支持大数据应用的分布式操作系统。

图3-10 Hadoop1.0与Hadoop2.0

2012年,YARN成为ApacheHadoop的子项目,有时也叫MapReduce2.0。它对老的MapReduce进行重构,将资源管理和调度功能与MapReduce的数据处理组件解耦,以使Hadoop可以支持更多的数据处理方法和更广泛的应用。例如,现在的Hadoop集群可以同时执行MapReduce批处理作业、交互式查询和流数据应用。最初的Hadoop1.x中,HDFS和MapReduce被紧密联系在一起,MapReduce并行执行Hadoop系统上的资源管理、作业调度和数据处理。YARN使用一个中心资源管理器给应用分配Hadoop系统资源,多个节点管理器监控集群中各个节点的操作处理情况。

1.第一代Hadoop的问题

第一代Hadoop是共享HDFS实例的MapReduce集群模型。这种共享计算架构的主要组件是JobTracker和TaskTracker。JobTracker是一个中央守护进程,负责运行集群上的所有作业。用户程序(JobClient)提交的作业信息会发送到JobTracker中,JobTracker与集群中的其他节点通过心跳定时通信,管理哪些任务应该运行在哪些节点上,还负责所有任务的失败重启操作。TaskTracker是系统里的从进程,它监视自己所在机器的资源情况,并根据JobTracker的指令来执行任务。TaskTracker同时监控当前机器的任务运行状况。TaskTracker需要把这些信息通过心跳发送给JobTracker, JobTracker会搜集这些信息以给新提交的作业分配运行资源。

第一代MapReduce的架构简单明了,刚推出时也有很多成功案例,但随着分布式系统的集群规模和工作负荷不断增长,使用原框架显露出以下问题:

● 可扩展性问题。JobTracker完成了太多的任务,造成了过多的资源消耗,当MapReduce作业非常多的时候,会产生很大的内存开销,同时也增加了JobTracker失败的风险。内存管理以及JobTracker中各特性的粗粒度锁问题成为可扩展性的显著瓶颈。将JobTracker扩展到4000个节点规模的集群被证明是极端困难的。

● 内存溢出问题。在TaskTracker端,以MapReduce任务的数目作为资源的表示过于简单,没有考虑到任务中CPU、内存的占用情况,如果几个大内存消耗的任务被调度到了一起,很容易出现内存溢出问题。

● 可靠性与可用性问题。JobTracker失败所引发的中断,不仅仅是丢失单独的一个作业,而是会丢失集群中所有的运行作业,并且要求用户手动重新提交并恢复他们的作业。从操作的角度来看,MapReduce框架在发生任何变化时(如修复缺陷、性能提升或增加特性),都会强制进行系统级别的升级更新。操作员必须协调好集群停机时间,关掉集群,部署新的二进制文件,验证升级,然后才允许提交新的作业。任何停机都会导致处理的积压,当作业被重新提交时,它们会给JobTracker造成明显的压力。更糟的是,升级强制让分布式集群系统的每一个客户端同时更新。这些更新会让用户为了验证他们之前的应用程序是否适用于新的Hadoop版本而浪费大量时间。

● 资源模型问题。在TaskTracker端,把资源强制划分为map任务槽位和reduce任务槽位,map和reduce的槽位数量是配置的固定值,因此闲置的map资源无法启动reduce任务,反之亦然。当系统中只有map任务或只有reduce任务的时候,也会造成资源的浪费。

2. YARN架构

为了解决第一代Hadoop的可扩展性、内存消耗、线程模型、可靠性和性能上的问题,Hadoop开发出新一代的MapReduce框架,命名为MapReduce V2或者叫YARN,其架构如图3-11所示。

图3-11 YARN架构

YARN的基本思想是将资源管理和调度及监控功能从MapReduce分离出来,用独立的后台进程实现。这个想法需要有一个全局的资源管理器(ResourceManager),每个应用还要有一个应用主管(ApplicationMaster)。应用可以是一个单独MapReduce作业,或者是一个作业的有向无环图(DAG)。

资源管理器和节点管理器(NodeManager)构成了分布式数据计算框架。资源管理器是系统中所有应用资源分配的最终仲裁者。节点管理器是框架中每个工作节点的代理,监控节点CPU、内存、磁盘、网络等资源的使用,并且报告给资源管理器。

每个应用对应的ApplicationMaster实际上是框架中一组特定的库,负责从资源管理器协调资源,并和节点管理器一起工作,共同执行和监控任务。

资源管理器有两个主要组件:调度器和应用管理器。调度器负责给多个正在运行的应用分配资源,比如对每个应用所能使用的资源做限制,按一定规则排队等。调度器只负责资源分配,它不监控或跟踪应用的状态。而且,当任务因为应用的错误或硬件问题而失败后,调度器不保证能重启它们。调度器根据应用对资源的需求执行其调度功能,这基于一个叫做资源容器的抽象概念。资源容器由内存、CPU、磁盘、网络等元素构成。调度器使用一个可插拔的调度策略,将集群资源分配给多个应用。当前支持的调度器如CapacityScheduler和FairScheduler就是可插拔调度器的例子。

应用管理器负责接收应用提交的作业,协调执行特定应用所需的资源容器,并在ApplicationMaster容器失败时提供重启服务。每个应用对应一个ApplicationMaster,它向调度器请求适当的资源容器,并跟踪应用的状态和资源使用情况。

Hadoop-2.x的MapReduce API保持与之前的稳定版本(Hadoop-1.x)兼容。这意味着老的MapReduce作业不需要做任何修改,只需要重新编译就可以在YARN上执行。

3. Capacity调度器

Capacity调度器以一种操作友好的方式,把Hadoop应用作为一个共享的、多租户集群来运行,并把集群利用率和吞吐量最大化。Capacity调度器允许多用户安全地共享一个大规模Hadoop集群,并保证它们的性能。其核心思想是,Hadoop集群中的可用资源为多个用户所共享,资源的多少是由他们的计算需求决定的。基于这种思想带来的一个好处是,只要资源没有被其他用户使用,一个用户就可以使用它,从而以一种具有成本效益的方式提供资源的弹性使用。

多个用户共享集群,必须要实现所谓的多租户(multi-tenancy)技术,这是因为集群中的每个用户任务都必须保证高性能和安全性。特别是集群中出现了某个用户或应用试图占用大量资源时,共享的集群必须做到不影响其他用户的使用。Capacity调度器提供了一套严格的限制机制,确保单一应用或用户不能消耗集群中不成比例的资源数量。并且,Capacity调度器可能限制或挂起一个异常应用,以保证整个集群的稳定。

Capacity调度器一个主要的抽象概念是队列(queues)。队列是Capacity的基础调度单元,管理员可以通过配置队列来影响共享集群的使用。为了提供更多的控制和可预测性,Capacity调度器支持层次队列,保证资源在允许其他队列使用之前,被一个用户的子队列优先共享,以此为特定应用提供资源亲和性。

4. Fair调度器

Fair调度是将资源公平分配给应用的方法,使得所有应用在平均情况下随着时间得到相等的份额。新一代Hadoop有能力调度多种资源类型。默认时Fair调度器只在内存上采用公平调度。

在Fair调度模型中,每个应用都属于某一个队列。YARNContainer的分配是选择使用了最少资源的队列,在这个队列中,再选择使用了最少资源的应用程序。默认情况下,所有的用户共享一个称为“default”的队列。如果一个应用程序在Container资源请求中指定了队列,则将请求提交到该队列中。另外,还可以将Fair调度器配置成根据请求中包含的用户名来分配队列。Fair调度器还支持许多功能,如队列的权重(权重大的队列获得更多的Container),最小份额,最大份额,以及队列内的FIFO策略,但基本思想是尽可能平均地共享资源。

在Fair调度器下,如果单个应用程序正在运行,该应用程序可以请求整个集群资源。若有其他程序提交,空闲的资源可以被公平地分配给新的应用程序,使每个应用程序最终可以获得大致相当的资源。Fair调度器也支持抢占的概念,从而可以从ApplicationMaster要回Container。根据配置和应用程序的设计,抢占和随后的资源分配可以是友好的或者强制的。

除了提供公平共享,Fair调度器还允许保证队列的最小份额,这是确保某些用户、组,或者应用程序总能得到的资源。当队列中有等待的应用程序,它至少可以获取其最小份额的资源。与此相反,当队列并不需要所有的保证份额,超出的部分可以分配给其他运行的应用程序。为了避免拥有数百个作业的单个用户充斥整个集群,Fair调度器可以通过配置文件限制用户和每个队列中运行应用程序的数量。若达到了该限制,用户应用程序将在队列中等待,直到前面提交的作业完成。

5. Container

在最基本的层面,Container是单个节点上内存、CPU核和磁盘等物理资源的集合。单个节点上可以有多个Container。系统中的每个节点可以认为是由内存和CPU最小容量的多个Container组成。ApplicationMaster可以请求任何Container来占据最小容量的整数倍的资源。因此Container代表了集群中单个节点上的一组资源(内存、CPU等),由节点管理器监控,由资源管理器调度。

每个应用程序从ApplicationMaster开始,它本身就是一个Container。一旦启动,ApplicationMaster就与资源管理器协商更多的Container。在运行过程中,可以动态地请求或者释放Container。例如,一个MapReduce作业可以请求一定数量的Map Container,当Map任务结束时,它可以释放这些Map Container,并请求更多的Reduce Container。

6. NodeManager

NodeManager是DataNode节点上的“工作进程”代理,管理Hadoop集群中独立的计算节点。其职责包括与ResourceManager保持通信、管理Container的生命周期、监控每个Container的资源使用情况、跟踪节点健康状况、管理日志和不同应用程序的附属服务(auxiliaryservices)等。

在启动时,NodeManager向ResourceManager注册,然后发送包含了自身状态的心跳,并等待来自ResourceManager的指令。它的主要目的是管理ResourceManager分配给它的应用程序Container。

7. ApplicationMaster

不同于YARN的其他组件,Hadoop 1.x中没有组件和ApplicationMaster相对应。本质上讲,ApplicationMaster所做的工作,就是原来JobTracker为每个应用所做的,但实现却是完全不同的。

运行在Hadoop集群上的每个应用程序,都有自己专用的Application Master实例,它实际上运行在每个从节点的一个Container进程中。而JobTracker是运行在主节点上的单个后台进程,跟踪所有应用的进行情况。

ApplicationMaster会周期性地向ResourceManager发送心跳消息,报告自身的状态和应用的资源使用情况。ResourceManager根据调度的结果,给特定从节点上的ApplicationMaster分配一个预留的Container的资源租约。ApplicationMaster监控一个应用的整个生命周期,从Container请求所需的资源开始,到ResourceManager将租约请求分配给NodeManager。

为Hadoop编写的每个应用框架都有自己的ApplicationMaster实现。在YARN的设计中,MapReduce只是一种应用程序框架,这种设计允许使用其他框架建立和部署分布式应用程序。例如,YARN附带了一个Distributed-Shell应用程序,它允许在YARN集群中的多个节点运行一个shell脚本。