2.6 Nutch中的分布式
前面介绍了很多分布式的内容,具体在网络爬虫中如何应用这些知识呢?下面以一款开源的搜索引擎为例,介绍如何设计一个分布式爬虫。
和Heritrix一样,Nutch是一个用Java语言开发的开源搜索引擎。现在,越来越多的人从Heritrix转向了Nutch的学习与开发。与Heritrix相比,Nutch的爬虫具有以下特点:
● 简单。Nutch的代码简单易懂、容易修改,核心的爬虫类不超过10个,而Heritrix比较复杂。读懂Nutch的代码轻而易举,而要想读懂Heritrix的代码则要费很大的功夫。
● Nutch支持分布式爬虫。Nutch最新版本的底层实现使用Hadoop。在云计算风起云涌的今天,确实得到众多开发者的青睐。
2.6.1 Nutch爬虫详解
Nutch爬虫使用宽度优先搜索的技术进行抓取。Nutch爬虫的设计着重两个方面:存储与爬虫过程。
首先,看一下Nutch的存储。Nutch存储主要使用数据文件,它的数据文件有三类,分别是Web database、segment和index。
Web database,也叫WebDB,用于存储爬虫抓取的网页之间的链接结构信息,WebDB只在爬虫中使用。WebDB内存储了两种实体的信息:Page和Link。Page实体描述互联网中网页的特征信息,主要包括网页内的链接数目,抓取此网页的时间等相关抓取信息,对此网页的重要度评分等。Link实体描述的是两个Page实体之间的链接关系。WebDB中存储了一个所抓取网页的链接结构图,在链接结构图中,Page实体是图的节点,而Link实体则代表图的边。
一次爬行会产生很多个段(segment),段存储的是爬虫在一次抓取过程中抓到的网页以及这些网页的索引。爬虫爬行时会根据WebDB中的链接关系按照一定的爬行策略生成每次抓取循环所需的预取列表(fetch list),然后Fetcher类通过预取列表中的URL抓取这些网页并索引,并将其存入段中。段是有时效的,网页被爬虫重新抓取后,先前抓取产生的段就作废了。存储时,段文件夹是以产生时间命名的,方便用户删除作废的segments以节省存储空间。
index是爬虫抓取的所有网页的索引,它是将所有segments中的索引合并处理后得到的。Nutch利用Lucene技术进行索引。需要注意的是,Lucene中的段和Nutch中的段不同,Lucene中的段是索引的一部分,而Nutch中的段和索引是各自独立的。
在分析了爬虫工作中设计的文件之后,接下来研究Nutch爬虫的抓取流程以及这些文件在抓取中扮演的角色。Nutch爬虫的工作原理主要是:首先根据WebDB生成一个待抓取网页的URL集合——预取列表,接着下载线程Fetcher类开始根据预取列表进行网页抓取。如果下载线程有很多个,那么就生成很多个预取列表,也就是一个Fetcher类的线程对应一个预取列表。爬虫根据抓取回来的网页更新WebDB,并且根据更新后的WebDB生成新的预取列表,接着下一轮抓取循环重新开始。这个循环过程可以叫作“产生/抓取/更新”循环。
指向同一个主机上Web资源的URL通常被分配到同一个预取列表中,这样可以防止过多的Fetcher线程对同一个主机同时进行抓取而导致主机负担过重。另外Nutch遵守Robots协议,网站可以通过自定义Robots.txt控制Nutch爬虫的抓取。
在Nutch中,抓取操作的实现是通过实现一系列子操作来完成的。Nutch提供了子命令行,可以单独调用这些子操作。下面就是这些子操作的功能描述以及对应的命令行,其中命令行写在括号中。
(1)创建一个新的WebDB(admin db -create),并且将起始URL写入WebDB(inject)。
(2)根据WebDB生成预取列表并写入相应的segments(generate)。
(3)根据预取列表中的URL抓取网页(fetch)。
(4)解析(parse)获得的网页。
(5)根据网页内的URL更新WebDb(updatedb)。
(6)循环进行2~5步直至预先设定的抓取深度。
(7)根据WebDB得到的网页评分和链接更新segments(updatesegs)。
(8)对所抓取的网页进行索引(index)。
(9)在索引中丢弃有重复内容的网页和重复的URL(dedup)。
(10)将segments中的索引进行合并,生成用于检索的最终index(merge)。
Nutch爬虫的详细工作流程是:在创建一个WebDB之后——步骤(1),根据一些种子URL开始启动“产生/抓取/更新”循环——步骤(2)~(6)。当这个循环彻底结束,爬虫根据抓取中生成的segments创建索引——步骤(7)~(10)。在重复URL清除——步骤(9)之前,每个segment的索引都是独立的——步骤(8)。最终,各个独立的segment索引被合并为一个最终的索引index——步骤(10)。整个Nutch的流程图如图2.18所示。
图2.18 Nutch工作流程
其中,标出来的(1)到(5)步是Nutch爬虫的过程。
下面,结合Nutch的源代码看一下Nutch爬虫的实现。Nutch的爬虫代码部分主要集中在packageorg.apache.nutch.fetcher和插件protocol-file、Protocol-ftp、protocol-http、protocol-httpclient以及相应的Parser插件中。最主要的类是Fetcher,它控制了整个爬虫的工作流程,我们从它入手一步步跟踪整个代码。
Fetcher类是一个线程类,它在run()中采用多线程运行FetcherThread类,并调用恰当的Protocol插件(支持http、ftp等协议)获取内容,当内容被获取之后,调用恰当的Parser插件将内容分析为文本,然后把这些文本内容放到FetcherOutput类里,最后由FetcherOutputFormat类写到段中。下面从run()函数开始分析,它的关键代码如下:
for (int i = 0; i < threadCount; i++) { FetcherThread thread = new FetcherThread(THREAD_GROUP_NAME+i); thread.start(); }
在上面的run()函数的关键代码中,建立了多个FetcherThread线程来抓取网页,threadCount可以配置或者使用默认值。在这段代码之后是一个while(true)的循环:
int n = group.activeCount(); Thread[] list = new Thread[n]; group.enumerate(list); boolean noMoreFetcherThread = true; for (int i = 0; i < n; i++) { if (list[i] == null) continue; String tname = list[i].getName(); if (tname.startsWith(THREAD_GROUP_NAME)) noMoreFetcherThread = false; if (LOG.isLoggable(Level.FINE)) LOG.fine(list[i].toString()); } if (noMoreFetcherThread) { if (LOG.isLoggable(Level.FINE)) LOG.fine("number of active threads: "+n); if (pages == pages0 && errors == errors0 && bytes == bytes0) break; status(); pages0 = pages; errors0 = errors; bytes0 = bytes; }
上面这段代码相当于维护一个线程池,并在日志中输入抓取页面的速度、状态之类的信息。下面看看抓取的线程FetcherThread是如何工作的。
FetchListEntry fle = new FetchListEntry();
首先,建立一个抓取列表类,然后又是一个while(true)循环。
if (fetchList.next(fle) == null) break; url = fle.getPage().getURL().toString();
从当前的FetchListEntry中获得一个要抓取的URL,然后进行抓取。
if (!fle.getFetch()) { if (LOG.isLoggable(Level.FINE)) LOG.fine("not fetching " + url); handleFetch(fle, new ProtocolOutput(null, ProtocolStatus.STATUS_NOTFETCHING)); continue; } if (fetchList.next(fle) == null) break; url = fle.getPage().getURL().toString();
上面代码表明,如果不需要抓取,则在handleFetch类中进行相应的处理。接着又是一个do…while循环,用来处理抓取过程中重定向指定的次数:整个循环的条件是需要重新抓取并且重定向次数没有超出最大次数。
接下来,使用ProtocolFactory工厂类创建Protocol类实例。
Protocol protocol = ProtocolFactory.getProtocol(url);
Protocol的实现是以插件的形式提供的,可以从Protocol中获取Fetch的输出流。
ProtocolOutput output = protocol.getProtocolOutput(fle);
通过输出流可以获取抓取的状态ProtocolStatus和抓取的内容Content。
ProtocolStatus pstat = output.getStatus(); Content content = output.getContent(); ProtocolStatus pstat = output.getStatus(); Content content = output.getContent();
之后判断抓取的状态:
switch(pstat.getCode())
如果抓取成功
case ProtocolStatus.SUCCESS: if (content != null)
如果抓取到的内容不为空则修改抓取的页数、抓取的字节数,并且每抓取100页,就在日志中记录抓取的速度等信息。
synchronized (Fetcher.this) { pages++; bytes += content.getContent().length; if ((pages % 100) == 0) { status(); } }
在handleFetch中进行相应的处理。
ParseStatus ps = handleFetch(fle, output);
如果处理返回的状态不为空,并且成功地重定向:
if (ps != null &&ps.getMinorCode() == ParseStatus.SUCCESS_REDIRECT)
获取重定向的链接并进行过滤。
String newurl = ps.getMessage(); newurl = URLFilters.filter(newurl);
如果重定向的链接newurl不为空并且和现在的URL不同:
if (newurl != null &&!newurl.equals(url)) refetch = true; url = newurl; redirCnt++;
创建当前页面的FetchListEntry。
fle = new FetchListEntry(true, new Page(url, NEW_INJECTED_PAGE_SCORE), new String[0]);
如果链接页面已经转移或者临时转移:
case ProtocolStatus.MOVE D: case ProtocolStatus.TEMP_MOVED:
立即重定向,处理抓取的结果。
handleFetch(fle, output);
获取重定向的URL:
String newurl = pstat.getMessage(); newurl = URLFilters.filter(newurl); if (newurl != null && !newurl.equals(url)) { refetch = true; url = newurl; redirCnt++; fle = new FetchListEntry(true, new Page(url, NEW_INJECTED_PAGE_SCORE), new String[0]); }
整个获取重定向URL的过程和上面的重定向类似。
如果获得的状态是以下几种状态之一,直接交由handleFetch类来处理。
case ProtocolStatus.GONE: case ProtocolStatus.NOTFOUND: case ProtocolStatus.ACCESS_DENIED: case ProtocolStatus.ROBOTS_DENIED: case ProtocolStatus.RETRY: case ProtocolStatus.NOTMODIFIED: case ProtocolStatus.GONE: case ProtocolStatus.NOTFOUND: case ProtocolStatus.ACCESS_DENIED: case ProtocolStatus.ROBOTS_DENIED: case ProtocolStatus.RETRY: case ProtocolStatus.NOTMODIFIED:
如果发生异常,则在日志中记录异常信息,然后交给handleFetch类处理。
case ProtocolStatus.EXCEPTION: logError(url, fle, new Exception(pstat.getMessage())); handleFetch(fle, output);
其他情况为未知状态,在日志中记录当前的状态,然后交给handleFetch处理。
default: LOG.warning("Unknown ProtocolStatus: " + pstat.getCode()); handleFetch(fle, output);
循环结束。
最后,如果完成的线程数等于threadCount,则关闭所有的插件。
synchronized (Fetcher.this) { atCompletion++; if (atCompletion == threadCount) { try { PluginRepository.getInstance().finalize(); } catch (java.lang.Throwable t) { // do nothing } } }
从上面的代码分析中可以看到,获取页面后大多数的处理都交给了handleFetch类。下面就来看看private ParseStatus handleFetch(FetchListEntry fle, ProtocolOutput output)的代码。
根据output获取到内容和URL:
Content content = output.getContent(); MD5Hash hash = null; String url = fle.getPage().getURL().toString();
如果content为null,新建content:
if (content == null) { content = new Content(url, url, new byte[0], "", new Properties()); hash = MD5Hash.digest(url); } else { hash = MD5Hash.digest(content.getContent()); }
在获取ProtocolStatus时
ProtocolStatus protocolStatus = output.getStatus();
如果Fetcher不进行解析(parse),直接把抓取的页面写入磁盘。
if (!Fetcher.this.parsing) { outputPage(new FetcherOutput(fle, hash, protocolStatus), content, null, null); return null; }
如果Fetcher需要进行解析,则首先获取页面contentType,以便根据正确编码进行解析。
String contentType = content.getContentType();
下面是使用Parser类进行页面解析的过程:
Parse parse = null; ParseStatus status = null; try { parser = ParserFactory.getParser(contentType, url); parse = parser.getParse(content); status = parse.getData().getStatus(); } catch (Exception e) { e.printStackTrace(); status = new ParseStatus(e); }
如果提取页面成功
if (status.isSuccess())
将FetcherOutput提取的内容以及状态保存。
outputPage(new FetcherOutput(fle, hash, protocolStatus), content, new ParseText(parse.getText()), parse.getData());
否则将FetcherOutput和空的内容保存。
LOG.info("fetch okay, but can't parse " + url + ", reason: " + status.toString()); outputPage(new FetcherOutput(fle, hash, protocolStatus), content, new ParseText(""), new ParseData(status, "", new Outlink[0], new Properties()));
在抓取过程中使用的Protocol采用了Nutch的插件机制,任何实现Protocol接口的实现类都可以负责抓取数据。这使得Nutch可以使用更多的网络协议获得数据,例如:HTTP、FTP等。有关插件机制的代码,这里不再一一讲述,有兴趣的读者可以参考相关资料。
2.6.2 Nutch中的分布式
本节将讲述Nutch中的分布式机制。首先,Nutch对抓取的数据和索引是采用分布式存储的;其次,在爬虫和建立索引的许多环节,Nutch都采用了MapReduce算法进行分布式计算。本节将从这两个方面对Nutch的分布式进行讲解。
1. Nutch的分布式文件系统
Nutch抓取内容之后就要开始对文件进行管理。Nutch分布式文件系统的基础架构是Hadoop文件系统NDFS。
Nutch的整个分布式文件系统工作架构如图2.19所示。
图2.19 Nutch分布式文件系统架构
2. Nutch中的MapReduce算法
在介绍Nutch中的MapReduce算法应用之前,先介绍Nutch中的CrawlDB目录。
Nutch中的CrawlDB目录中存储着一系列的文件,这些文件中每一行都存储<URL,CrawlDatum>数据结构。其中,CrawlDatum数据结构保存了对应URL的一系列属性,包括抓取时间、状态、抓取时间间隔、链接数目等。
上一节讲述的Nutch运行过程的10个步骤中有6个步骤包括插入URL列表(inject)、生成抓取列表(generate)、抓取内容(fetch)、分析处理内容(parse)、更新Crawl DB库(updatedb)和建立索引(index)都是采用MapReduce算法来完成的,具体技术实现细节如下。
1)插入URL列表(inject)
MapReduce程序1:
目标:转换input输入为CrawlDatum格式
输入:URL文件
步骤:
(1)Map(line)→<url,CrawlDatum>。
(2)Reduce()合并多重的URL。
输出:临时的CrawlDatum文件
MapReduce程序2:
目标:合并上一步产生的临时文件到新的CrawlDB
输入:上次MapReduce输出的CrawlDatum
步骤:
(1)Map()过滤重复的URL。
(2)Reduce:合并两个CrawlDatum到一个新的CrawlDB。
输出:CrawlDatum
2)生成抓取列表(Generate)
MapReduce程序:
目标:选择抓取列表
输入:CrawlDB文件
步骤:
(1)map():如果抓取当前时间大于现在时间,转换成<CrawlDatum,URL>格式。
(2)Reduce:取最顶部的N个链接。
输出:<URL,CrawlDatum>文件
3)抓取内容(Fetch)
MapReduce:
目标:抓取内容
输入:<URL,CrawlDatum>,按主机划分,按Hash排序
步骤:
(1)map(URL,CrawlDatum):输出<URL,FetcherOutput>。
(2)多线程,调用Nutch的抓取协议插件,抓取输出<CrawlDatum, Content>。
输出:<URL,CrawlDatum>和<URL,Content>两个文件
4)分析处理内容(Parse)
MapReduce程序:
目标:处理抓取的内容
输入:抓取的<URL, Content>
步骤:
(1)map(URL,Content):<URL,Parse>。
(2)Raduce()函数调用Nutch的解析插件,输出处理完的格式是<ParseText,ParseData>。
输出:<URL,ParseText>、<URL,ParseData>、<URL,CrawlDatum>
5)更新CrawlDB库(updatedB)
MapReduce程序:
目标:将fetch和parse整合到DB中
输入:<URL, CrawlDatum>现有的DB加上fetch和parse的输出,合并上面3个DB为一个新的DB
输出:新的抓取DB
6)建立索引(index)
MapReduce程序:
目标:生成Lucene索引
输入:多种文件格式
步骤:
(1)parse处理完的<URL,ParseData>提取title、metadata信息等。
(2)parse处理完的<URL,ParseText>提取text内容。
(3)转换链接处理完的<URL,Inlinks>提取anchors。
(4)抓取内容处理完的<URL,CrawlDatum>提取抓取时间。
(5)map()函数用ObjectWritable包裹上面的内容。
(6)reduce()函数调用Nutch的索引插件,生成Lucene Document文档。
输出:输出Lucene索引