1.3 设计爬虫队列
通过前几节的介绍,可以看出,网络爬虫最关键的数据结构是URL队列——通常我们称之为爬虫队列。之前的几节一直使用内存数据结构,例如用链表或者队列来实现URL队列。但是,网络中需要我们抓取的链接成千上万,在一些大型的搜索引擎中,比如百度和Google,大概有十几亿的URL需要抓取。因此,内存数据结构并不适合这些应用。最合适的一种方法就是使用内存数据库,或者直接使用数据库来存储这些URL。本节将讲解爬虫队列的基本知识,并介绍一种非常流行的内存数据库——Berkeley DB,最后介绍一个成熟的开源爬虫软件——Heritrix是如何实现爬虫队列的。
1.3.1 爬虫队列
爬虫队列的设计是网络爬虫的关键。爬虫队列是用来保存URL的队列数据结构的。在大型爬虫应用中,构建通用的可以伸缩的爬虫队列非常重要。数以10亿计的URL地址,使用内存的链表或者队列来存储显然不够,因此,需要找到一种数据结构,这种数据结构具有以下几个特点。
● 能够存储海量数据,当数据超出内存限制的时候,能够把它固化在硬盘上。
● 存取数据速度非常快。
● 能够支持多线程访问(多线程技术能够大规模提升爬虫的性能,这点将在之后的章节中详细介绍)。
结合上面三点对存储速度的要求,使Hash成为存储结构的不二选择。
通常,在进行Hash存储的时候,key值都选取URL字符串,但是为了更节省空间,通常会对URL进行压缩。常用的是MD5压缩算法。
MD5算法描述
对MD5算法的简要叙述为:MD5以512位分组来处理输入的信息,且每一分组又被划分为16个32位子分组,经过一系列的处理后,算法的输出由4个32位分组组成,将这4个32位分组级联后将生成一个128位的散列值。
在MD5算法中,首先需要对信息进行填充,使其位长度(Bits Length)对512求余的结果等于448。因此,信息的位长度将被扩展至N×512+448,即N×64+56个字节(Bytes),N为一个正整数。填充的方法如下。
在信息的后面填充一个1和无数个0,直到满足上面的条件时才停止用0对信息的填充。在这个结果后面附加一个以64位二进制表示的填充前信息长度。经过这两步的处理,现在的信息字节长度=N×512+448+64=(N+1)×512,即长度恰好是512的整数倍。这样做的原因是为了满足后面处理中对信息长度的要求。
MD5中有4个32位被称作链接变量(Chaining Variable)的整数参数,它们分别为:A=0x01234567、B=0x89abcdef、C=0xfedcba98、D=0x76543210。
当设置好这4个链接变量后,就开始进入算法的四轮循环运算。循环的次数是信息中512位信息分组的数目。
将上面4个链接变量复制到另外4个变量中:A到a、B到b、C到c、D到d。
主循环有四轮(MD4只有三轮),每轮循环都很相似。第一轮进行16次操作。每次操作对a、b、c或d中的3个做一次非线性函数运算,然后将所得结果依次加上第四个变量、文本的一个子分组和一个常数,再将所得结果向右循环移一个不定的数,并加上a、b、c或d之一。最后用该结果取代a、b、c或d之一。
以下是每次操作中用到的4个非线性函数(每轮一个)。
F(X,Y,Z)=(X&Y)|((~X)&Z)
G(X,Y,Z)=(X&Z)|(Y&(~Z))
H(X,Y,Z)=X^Y^Z
I(X,Y,Z)=Y^(X|(~Z))
(&是与,|是或,~是非,^是异或)
这4个函数的说明:如果X、Y和Z的对应位是独立和均匀的,那么结果的每一位也应是独立和均匀的。F是一个逐位运算的函数。即:如果X,那么Y,否则Z。函数H是逐位奇偶操作符。
假设Mj表示消息的第j个子分组(0~15),FF(a,b,c,d,Mj,s,ti)表示a=b+(a+(F(b,c,d)+Mj+ti)),GG(a,b,c,d,Mj,s,ti)表示a=b+(a+(G(b,c,d)+Mj+ti)),HH(a,b,c,d,Mj,s,ti)表示a=b+(a+(H(b,c,d)+Mj+ti)),II(a,b,c,d,Mj,s,ti)表示a=b+(a+(I(b,c,d)+Mj+ti))。
这四轮(共64步)如下。
第一轮:
FF(a,b,c,d,M0,7,0xd76aa478)
FF(d,a,b,c,M1,12,0xe8c7b756)
FF(c,d,a,b,M2,17,0x242070db)
FF(b,c,d,a,M3,22,0xc1bdceee)
FF(a,b,c,d,M4,7,0xf57c0faf)
FF(d,a,b,c,M5,12,0x4787c62a)
FF(c,d,a,b,M6,17,0xa8304613)
FF(b,c,d,a,M7,22,0xfd469501)
FF(a,b,c,d,M8,7,0x698098d8)
FF(d,a,b,c,M9,12,0x8b44f7af)
FF(c,d,a,b,M10,17,0xffff5bb1)
FF(b,c,d,a,M11,22,0x895cd7be)
FF(a,b,c,d,M12,7,0x6b901122)
FF(d,a,b,c,M13,12,0xfd987193)
FF(c,d,a,b,M14,17,0xa679438e)
FF(b,c,d,a,M15,22,0x49b40821)
第二轮:
GG(a,b,c,d,M1,5,0xf61e2562)
GG(d,a,b,c,M6,9,0xc040b340)
GG(c,d,a,b,M11,14,0x265e5a51)
GG(b,c,d,a,M0,20,0xe9b6c7aa)
GG(a,b,c,d,M5,5,0xd62f105d)
GG(d,a,b,c,M10,9,0x02441453)
GG(c,d,a,b,M15,14,0xd8a1e681)
GG(b,c,d,a,M4,20,0xe7d3fbc8)
GG(a,b,c,d,M9,5,0x21e1cde6)
GG(d,a,b,c,M14,9,0xc33707d6)
GG(c,d,a,b,M3,14,0xf4d50d87)
GG(b,c,d,a,M8,20,0x455a14ed)
GG(a,b,c,d,M13,5,0xa9e3e905)
GG(d,a,b,c,M2,9,0xfcefa3f8)
GG(c,d,a,b,M7,14,0x676f02d9)
GG(b,c,d,a,M12,20,0x8d2a4c8a)
第三轮:
HH(a,b,c,d,M5,4,0xfffa3942)
HH(d,a,b,c,M8,11,0x8771f681)
HH(c,d,a,b,M11,16,0x6d9d6122)
HH(b,c,d,a,M14,23,0xfde5380c)
HH(a,b,c,d,M1,4,0xa4beea44)
HH(d,a,b,c,M4,11,0x4bdecfa9)
HH(c,d,a,b,M7,16,0xf6bb4b60)
HH(b,c,d,a,M10,23,0xbebfbc70)
HH(a,b,c,d,M13,4,0x289b7ec6)
HH(d,a,b,c,M0,11,0xeaa127fa)
HH(c,d,a,b,M3,16,0xd4ef3085)
HH(b,c,d,a,M6,23,0x04881d05)
HH(a,b,c,d,M9,4,0xd9d4d039)
HH(d,a,b,c,M12,11,0xe6db99e5)
HH(c,d,a,b,M15,16,0x1fa27cf8)
HH(b,c,d,a,M2,23,0xc4ac5665)
第四轮:
II(a,b,c,d,M0,6,0xf4292244)
II(d,a,b,c,M7,10,0x432aff97)
II(c,d,a,b,M14,15,0xab9423a7)
II(b,c,d,a,M5,21,0xfc93a039)
II(a,b,c,d,M12,6,0x655b59c3)
II(d,a,b,c,M3,10,0x8f0ccc92)
II(c,d,a,b,M10,15,0xffeff47d)
II(b,c,d,a,M1,21,0x85845dd1)
II(a,b,c,d,M8,6,0x6fa87e4f)
II(d,a,b,c,M15,10,0xfe2ce6e0)
II(c,d,a,b,M6,15,0xa3014314)
II(b,c,d,a,M13,21,0x4e0811a1)
II(a,b,c,d,M4,6,0xf7537e82)
II(d,a,b,c,M11,10,0xbd3af235)
II(c,d,a,b,M2,15,0x2ad7d2bb)
II(b,c,d,a,M9,21,0xeb86d391)
常数ti的选择:在第i步中,ti是4294967296*abs(sin(i))的整数部分,i的单位是弧度。(4294967296=232)
所有这些都完成之后,将A、B、C、D分别加上a、b、c、d。此后用下一分组数据继续运行算法,最后的输出是A、B、C和D的级联。
在Java中,java.security.MessageDigest中已经定义了MD5的计算,只需要简单地调用即可得到MD5的128位整数,然后将此128位16个字节转换成十六进制表示即可。下面是一段Java实现MD5压缩的代码。
MD5压缩算法代码:
/* 传入参数:一个字节数组 * 传出参数:字节数组的MD5 结果字符串 */ public class MD5 { public static String getMD5(byte[] source) { String s = null; char hexDigits[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; // 用来将字节转换成十六进制表示的字符 try { java.security.MessageDigest md = java.security.MessageDigest.getInstance( "MD5" ); md.update( source ); byte tmp[] = md.digest(); // MD5的计算结果是一个 128 位的长整数, // 用字节表示就是 16个字节 char str[] = new char[16 * 2]; // 每个字节用十六进制表示的话,使用两个字符, // 所以表示成十六进制需要 32个字符 int k = 0; // 表示转换结果中对应的字符位置 for (int i = 0; i < 16; i++) { // 从第一个字节开始,将 MD5的每一个字节 // 转换成十六进制字符 byte byte0 = tmp[i]; // 取第 i个字节 str[k++] = hexDigits[byte0 >>> 4 & 0xf]; // 取字节中高 4 位的数字转换, // >>> 为逻辑右移,将符号位一起右移 str[k++] = hexDigits[byte0 & 0xf]; // 取字节中低 4 位的数字转换 } s = new String(str); // 转换后的结果转换为字符串 }catch( Exception e ) { e.printStackTrace(); } return s; } }
Hash存储的Value值通常会对URL和相关的信息进行封装,封装成为一个对象进行存储。
综合前面分析的信息,选择一个可以进行线程安全、使用Hash存储,并且能够应对海量数据的内存数据库是存储URL最合适的数据结构。因此,由Oracle公司开发的内存数据库产品Berkeley DB就进入了我们的视线。下一节,将集中精力介绍Berkeley DB以及它的用法。
1.3.2 使用Berkeley DB构建爬虫队列
Berkeley DB是一个嵌入式数据库,适合于管理海量的简单的数据。例如,Google用Berkeley DB HA(High Availability)来管理他们的账户信息。Motorola在它的无线产品中用Berkeley DB跟踪移动单元。HP、Microsoft、Sun Microsystems等也都是它的大客户。Berkeley DB不能完全取代关系型数据库,但在某些方面,它却令关系数据库望尘莫及。
关键字/数据key/value是Berkeley DB用来进行数据库管理的基础。每个key/value对构成一条记录,而整个数据库实际上就是由许多这样的结构单元所构成的。通过使用这种方式,开发人员在使用Berkeley DB提供的API访问数据库时,只需提供关键字就能够访问到相应的数据。当然,也可以提供key和部分data来查询符合条件的相近数据。
Berkeley DB底层实现采用B树,可以看成能够存储大量数据的HashMap。BerkeleyDB简称BDB,官方网址是http://www.oracle.com/database/berkeley-db/index.html。Berkeley DB的C++版本首先出现,然后在此基础上又实现了Java本地版本。Berkeley DB是通过环境对象EnvironmentConfig来对数据库进行管理的,每个EnvironmentConfig对象可以管理多个数据库。新建一个EnvironmentConfig的代码如下:
EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setTransactional(false); envConfig.setAllowCreate(true); exampleEnv = new Environment(envDir, envConfig);
其中,envDir是用户指定的一个目录。只要是由同一个EnvironmentConfig指定的数据库的数据文件和日志文件,都会放在这个目录下。EnvironmentConfig也是一种资源,使用完毕,需要关闭。
exampleEnv.sync(); exampleEnv.close(); exampleEnv = null;
创建好环境之后,就可以用它创建数据库了。用Berkeley DB创建数据库时,需要指定数据库的属性,就好比在Oracle中创建数据库时要指定java_pool、buffer_size等属性一样。Berkeley DB使用DatabaseConfig来管理一个具体的DataBase。
String databaseName= "ToDoTaskList.db"; DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(false); // 打开用来存储类信息的数据库 //用来存储类信息的数据库不要求能够存储重复的关键字 dbConfig.setSortedDuplicates(false); Database myClassDb = exampleEnv.openDatabase(null, "classDb", dbConfig); //初始化用来存储序列化对象的catalog类 catalog = new StoredClassCatalog(myClassDb); TupleBinding keyBinding = TupleBinding.getPrimitiveBinding(String.class); // 把value作为对象的序列化方式存储 SerialBinding valueBinding = new SerialBinding(catalog, NewsSource.class); store = exampleEnv.openDatabase(null, databaseName, dbConfig);
当数据库建立起来之后,就要确定往数据库里面存储的数据类型(也就是确定key和value的值)。Berkeley DB数据类型是使用EntryBinding对象来确定的。
EntryBinding keyBinding =new SerialBinding(javaCatalog,String.class);
其中,SerialBinding表示这个对象能够序列化到磁盘上,因此,构造函数的第二个参数一定要是实现了序列化接口的对象。
最后,我们来创建一个以Berkeley DB为底层数据结构的Map。
// 创建数据存储的映射视图 this.map = new StoredSortedMap(store, keyBinding, valueBinding, true);
1.3.3 使用Berkeley DB构建爬虫队列示例
上一节我们讲述了Berkeley DB的基础知识,这一节要讲述如何使用Berkeley DB来构建一个完整的爬虫队列。
首先,用Berkeley DB存储一个key/value的结构,并且key和value对象都要实现Java序列化接口。因此,我们先来构建value对象,即一个封装了很多重要属性的URL类。
Berkeley DB中存储的Value类:
public class CrawlUrl implements Serializable { private static final long serialVersionUID = 7931672194843948629L; public CrawlUrl() { } private String oriUrl;// 原始URL的值,主机部分是域名 private String url; // URL的值,主机部分是IP,为了防止重复主机的出现 private int urlNo; // URL NUM private int statusCode; // 获取URL返回的结果码 private int hitNum; // 此URL被其他文章引用的次数 private String charSet; // 此URL对应文章的汉字编码 private String abstractText; // 文章摘要 private String author; // 作者 private int weight; // 文章的权重(包含导向词的信息) private String description; // 文章的描述 private int fileSize; // 文章大小 private Timestamp lastUpdateTime; // 最后修改时间 private Date timeToLive; // 过期时间 private String title; // 文章名称 private String type; // 文章类型 private String[] urlRefrences; // 引用的链接 private int layer; //爬取的层次,从种子开始,依次为第0层,第1层…… public int getLayer() { return layer; } public void setLayer(int layer) { this.layer = layer; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public int getUrlNo() { return urlNo; } public void setUrlNo(int urlNo) { this.urlNo = urlNo; } public int getStatusCode() { return statusCode; } public void setStatusCode(int statusCode) { this.statusCode = statusCode; } public int getHitNum() { return hitNum; } public void setHitNum(int hitNum) { this.hitNum = hitNum; } public String getCharSet() { return charSet; } public void setCharSet(String charSet) { this.charSet = charSet; } public String getAbstractText() { return abstractText; } public void setAbstractText(String abstractText) { this.abstractText = abstractText; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public int getFileSize() { return fileSize; } public void setFileSize(int fileSize) { this.fileSize = fileSize; } public Timestamp getLastUpdateTime() { return lastUpdateTime; } public void setLastUpdateTime(Timestamp lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; } public Date getTimeToLive() { return timeToLive; } public void setTimeToLive(Date timeToLive) { this.timeToLive = timeToLive; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String[] getUrlRefrences() { return urlRefrences; } public void setUrlRefrences(String[] urlRefrences) { this.urlRefrences = urlRefrences; } public final String getOriUrl() { return oriUrl; } public void setOriUrl(String oriUrl) { this.oriUrl = oriUrl; } }
写一个TODO表的接口:
public interface Frontier { public CrawlUrl getNext()throws Exception; public boolean putUrl(CrawlUrl url) throws Exception; //public boolean visited(CrawlUrl url); }
使用一个抽象类来封装对Berkeley DB的操作:
public abstract class AbstractFrontier { private Environment env; private static final String CLASS_CATALOG = "java_class_catalog"; protected StoredClassCatalog javaCatalog; protected Database catalogdatabase; protected Database database; public AbstractFrontier(String homeDirectory) throws DatabaseException, FileNotFoundException { // 打开env System.out.println("Opening environment in: " + homeDirectory); EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setTransactional(true); envConfig.setAllowCreate(true); env = new Environment(new File(homeDirectory), envConfig); // 设置DatabaseConfig DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setTransactional(true); dbConfig.setAllowCreate(true); // 打开 catalogdatabase = env.openDatabase(null, CLASS_CATALOG, dbConfig); javaCatalog = new StoredClassCatalog(catalogdatabase); // 设置DatabaseConfig DatabaseConfig dbConfig0 = new DatabaseConfig(); dbConfig0.setTransactional(true); dbConfig0.setAllowCreate(true); // 打开 database = env.openDatabase(null, "URL", dbConfig); } //关闭数据库,关闭环境 public void close() throws DatabaseException { database.close(); javaCatalog.close(); env.close(); } //put方法 protected abstract void put(Object key,Object value); //get方法 protected abstract Object get(Object key); //delete方法 protected abstract Object delete(Object key); }
实现真正的TODO表:
public class BDBFrontier extends AbstractFrontier implements Frontier { private StoredMap pendingUrisDB = null; //使用默认的路径和缓存大小构造函数 public BDBFrontier(String homeDirectory) throws DatabaseException, FileNotFoundException{ super(homeDirectory); EntryBinding keyBinding =new SerialBinding (javaCatalog,String.class); EntryBinding valueBinding =new SerialBinding(javaCatalog, CrawlUrl.class); pendingUrisDB = new StoredMap(database,keyBinding, valueBinding, true); } //获得下一条记录 public CrawlUrl getNext() throws Exception { CrawlUrl result = null; if(!pendingUrisDB.isEmpty()){ Set entrys = pendingUrisDB.entrySet(); System.out.println(entrys); Entry<String,CrawlUrl> entry=(Entry<String,CrawlUrl>)pendingUrisDB.entrySet().iterator().next(); result = entry.getValue(); delete(entry.getKey()); } return result; } //存入URL public boolean putUrl(CrawlUrl url){ put(url.getOriUrl(),url); return true; } // 存入数据库的方法 protected void put(Object key,Object value) { pendingUrisDB.put(key, value); } //取出 protected Object get(Object key){ return pendingUrisDB.get(key); } //删除 protected Object delete(Object key){ return pendingUrisDB.remove(key); } // 根据URL计算键值,可以使用各种压缩算法,包括MD5法 private String caculateUrl(String url) { return url; } // 测试函数 public static void main(String[] strs) { try { BDBFrontier bBDBFrontier = new BDBFrontier("c:\\bdb"); CrawlUrl url = new CrawlUrl(); url.setOriUrl("http://www.163.com"); bBDBFrontier.putUrl(url); System.out.println(((CrawlUrl)bBDBFrontier.getNext()).getOriUrl()); bBDBFrontier.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ } } }
以上就是一个使用Berkeley DB来实现TODO表的完整示例。
1.3.4 使用布隆过滤器构建Visited表
上一节内容介绍了如何实现TODO表,这一节来探讨如何在一个企业级的搜索引擎中实现Visited表。在企业级搜索引擎中,常用一个称为布隆过滤器(Bloom Filter)的算法来实现对已经抓取过的URL的过滤。下面首先介绍什么叫布隆过滤器算法。
在日常生活中,包括在设计计算机软件时,经常要判断一个元素是否在一个集合中。比如在字处理软件中,需要检查一个英语单词是否拼写正确(也就是判断它是否在已知的字典中);在FBI中,一个嫌疑人的名字是否已经在嫌疑名单上;在网络爬虫里,一个网址是否被访问过等。最直接的方法就是将集合中全部的元素存储在计算机中,当遇到一个新元素时,将它和集合中的元素直接比较即可。一般来讲,计算机中的集合是用哈希表(Hash Table)来存储的。它的好处是快速而准确,缺点是耗费存储空间。当集合比较小时,这个问题不显著,但是当集合巨大时,哈希表存储效率低的问题就显现出来了。比如说,一个像Yahoo、Hotmail和Gmail那样的公众电子邮件(E-mail)提供商,总是需要过滤来自发送垃圾邮件的人(spamer)的垃圾邮件。一个办法就是记录下那些发垃圾邮件的E-mail地址。由于那些发送者不停地在注册新的地址,全世界少说也有几十亿个发垃圾邮件的地址,将它们都存储起来则需要大量的网络服务器。如果用哈希表,每存储1亿个E-mail地址,就需要1.6GB的内存(用哈希表实现的具体办法是将每一个E-mail地址对应成一个8字节的信息指纹,然后将这个信息指纹存入哈希表,由于哈希表的存储效率一般只有50%,因此一个E-mail地址需要占用16个字节。1亿个地址大约要1.6GB,即16亿字节的内存),因此存储几十亿个邮件地址可能需要上百GB的内存。除非是超级计算机,否则一般服务器是无法存储的。
一种称作布隆过滤器的数学工具,它只需要哈希表1/8~1/4的大小就能解决同样的问题。
布隆过滤器是由巴顿·布隆于1970年提出的,它实际上是一个很长的二进制向量和一系列随机映射函数。我们通过下面的例子来说明其工作原理。
假定存储1亿个电子邮件地址,先建立一个16亿二进制常量,即2亿字节的向量,然后将这16亿个二进制位全部设置为0。对于每一个电子邮件地址X,用8个不同的随机数产生器(F1,F2,...,F8)产生8个信息指纹(f1,f2,...,f8)。再用一个随机数产生器G把这8个信息指纹映射到1亿~16亿中的8个自然数g1,g2,...,g8。现在我们把这8个位置的二进制位全部设置为1。当我们对这1亿个E-mail地址都进行这样的处理后,一个针对这些E-mail地址的布隆过滤器就建成了,如图1.7所示。
图1.7 布隆过滤器
现在,来看看布隆过滤器是如何检测一个可疑的电子邮件地址Y是否在黑名单中的。我们用8个随机数产生器(F1,F2,...,F8)对这个地址产生8个信息指纹S1,S2,…,S8,然后将这8个指纹对应到布隆过滤器的8个二进制位,分别是T1,T2,…,T8。如果Y在黑名单中,显然,T1,T2,…,T8对应的8个二进制位一定是1。这样在遇到任何黑名单中的电子邮件地址时,我们都能够准确地发现。
布隆过滤器绝对不会漏掉任何一个在黑名单中的可疑地址。但是,它有一个不足之处,也就是它有极小的可能将一个不在黑名单中的电子邮件地址判定为在黑名单中,因为有可能某个好的邮件地址正巧对应8个都被设置成1的二进制位。好在这种可能性很小。我们把它称为误识概率。在上面的例子中,误识概率大概在万分之一以下。常见的补救办法是建立一个小的白名单,存储那些可能误判的邮件地址。
下面是一个布隆过滤器的实现:
public class SimpleBloomFilter implements VisitedFrontier { private static final int DEFAULT_SIZE = 2 << 24; private static final int[] seeds = new int[] { 7, 11, 13, 31, 37, 61, }; private BitSet bits = new BitSet(DEFAULT_SIZE); private SimpleHash[] func = new SimpleHash[seeds.length]; public static void main(String[] args) { String value = "stone2083@yahoo.cn"; SimpleBloomFilter filter = new SimpleBloomFilter(); System.out.println(filter.contains(value)); filter.add(value); System.out.println(filter.contains(value)); } public SimpleBloomFilter() { for (int i = 0; i < seeds.length; i++) { func[i] = new SimpleHash(DEFAULT_SIZE, seeds[i]); } } // 覆盖方法,把URL添加进来 public void add(CrawlUrl value) { if (value != null) { add(value.getOriUrl()); } } // 覆盖方法,把URL添加进来 public void add(String value) { for (SimpleHash f : func) { bits.set(f.hash(value), true); } } // 覆盖方法,是否包含URL public boolean contains(CrawlUrl value) { return contains(value.getOriUrl()); } // 覆盖方法,是否包含URL public boolean contains(String value) { if (value == null) { return false; } boolean ret = true; for (SimpleHash f : func) { ret = ret && bits.get(f.hash(value)); } return ret; } public static class SimpleHash { private int cap; private int seed; public SimpleHash(int cap, int seed) { this.cap = cap; this.seed = seed; } public int hash(String value) { int result = 0; int len = value.length(); for (int i = 0; i < len; i++) { result = seed * result + value.charAt(i); } return (cap - 1) & result; } } }
如果想知道需要使用多少位才能降低错误概率,可以以表1.5所示的存储项目和位数比率估计布隆过滤器的误判率。
表1.5 布隆过滤器误判率表
为每个URL分配2字节就可以达到千分之几的冲突。比较保守的实现是,为每个URL分配4字节,项目和位数比是1∶32,误判率是0.00000021167340。对于5000万数量级的URL,布隆过滤器只占用200MB的空间,并且排重速度超快,一遍下来不到2分钟。
1.3.5 详解Heritrix爬虫队列
上一节介绍了Berkeley DB构建爬虫队列的方法和过程,在许多开源爬虫软件中,都是用Berkeley DB来实现爬虫队列。本节将分析一个常用的开源爬虫软件,以增加读者对爬虫队列的理解。
Heritrix是一个开源的可扩展的爬虫项目。它始于2003年,最初的目的是开发一个特殊的爬虫,对网上的资源进行归档。在Heritrix以及很多开源爬虫软件中,爬虫队列有一个非常好听的名字——Frontier。在Heritrix中,Frontier底层的数据结构也是使用了Berkeley DB。
Heritrix中的Frontier内部处理机制如图1.8所示。
图1.8 Heritrix中的Frontier架构
1. BdbMultipleWorkQueues
它是对Berkeley DB的简单封装。在内部有一个Berkeley Database,存放所有待处理的链接。代码如下:
public class BdbMultipleWorkQueues { // 存放所有待处理URL的数据库 private Database pendingUrisDB = null; // 由key获取一个链接 public CrawlURI get(DatabaseEntry headKey) throws DatabaseException { DatabaseEntry result = new DatabaseEntry(); // 由key获取相应的链接 OperationStatus status = getNextNearestItem(headKey, result); CrawlURI retVal = null; if (status != OperationStatus.SUCCESS) { LOGGER.severe("See '1219854 NPE je-2.0 " + "entryToObject '. OperationStatus " + " was not SUCCESS: " + status + ", headKey " + BdbWorkQueue.getPrefixClassKey(headKey.getData())); return null; } try { retVal = (CrawlURI) crawlUriBinding.entryToObject(result); } catch (RuntimeExceptionWrapper rw) { LOGGER.log(Level.SEVERE, "expected object missing in queue " + BdbWorkQueue.getPrefixClassKey(headKey.getData()), rw); return null; } retVal.setHolderKey(headKey); return retVal;// 返回链接 } // 从等待处理列表中获取一个链接 protected OperationStatus getNextNearestItem(DatabaseEntry headKey, DatabaseEntry result) throws DatabaseException { Cursor cursor = null; OperationStatus status; try { // 打开游标 cursor = this.pendingUrisDB.openCursor(null, null); status = cursor.getSearchKey(headKey, result, null); if (status != OperationStatus.SUCCESS || result.getData().length > 0) { throw new DatabaseException("bdb queue cap missing"); } status = cursor.getNext(headKey, result, null); } finally { if (cursor != null) { cursor.close(); } } return status; } /** * 添加URL到数据库 */ public void put(CrawlURI curi, boolean overwriteIfPresent) throws DatabaseException { DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey(); if (insertKey == null) { insertKey = calculateInsertKey(curi); curi.setHolderKey(insertKey); } DatabaseEntry value = new DatabaseEntry(); crawlUriBinding.objectToEntry(curi, value); if (LOGGER.isLoggable(Level.FINE)) { tallyAverageEntrySize(curi, value); } OperationStatus status; if (overwriteIfPresent) { // 添加 status = pendingUrisDB.put(null, insertKey, value); } else { status = pendingUrisDB.putNoOverwrite(null, insertKey, value); } if (status != OperationStatus.SUCCESS) { LOGGER.severe("failed; " + status + " " + curi); } } }
2. BdbWorkQueue
它代表一个链接队列,该队列中所有的链接都具有相同的键值。它实际上是通过调用BdbMultipleWorkQueues的get方法从等待处理的链接数据库中取得链接的。代码如下:
public class BdbWorkQueue extends WorkQueue implements Comparable, Serializabl { //获取一个URL protected CrawlURI peekItem(final WorkQueueFrontier frontier) throws IOException { // 关键:从BdbFrontier中返回pendingUris final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier) .getWorkQueues(); DatabaseEntry key = new DatabaseEntry(origin); CrawlURI curi = null; int tries = 1; while(true) { try { //获取链接 curi = queues.get(key); } catch (DatabaseException e) { LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e); } … } return curi; } }
3. WorkQueueFrontier
实现了最核心的方法:
public CrawlURI next() throws InterruptedException, EndedException { while (true) { long now = System.currentTimeMillis(); preNext(now); synchronized(readyClassQueues) { int activationsNeeded = targetSizeForReadyQueues() – rea dyClassQueues.size(); while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) { activateInactiveQueue(); activationsNeeded--; } } WorkQueue readyQ = null; Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MIL LISECONDS); if (key != null) { readyQ = (WorkQueue)this.allQueues.get(key); } if (readyQ != null) { while(true) { CrawlURI curi = null; synchronized(readyQ) { /**取出一个URL,最终从子类BdbFrontier的 *pendingUris中取出一个链接 */ curi = readyQ.peek(this); if (curi != null) { String currentQueueKey = getClassKey(curi); if (currentQueueKey.equals(curi.getClassKey())) { noteAboutToEmit(curi, readyQ); //加入正在处理队列中 inProcessQueues.add(readyQ); return curi; //返回 } curi.setClassKey(currentQueueKey); readyQ.dequeue(this);//出队列 decrementQueuedCount(1); curi.setHolderKey(null); } else { readyQ.clearHeld(); break; } } if(curi!=null) { // complete the requeuing begun earlier sendToQueue(curi); } } } else { if (key != null) { logger.severe("Key "+ key + " in readyClassQueues but not allQueues"); } } if(shouldTerminate) { throw new EndedException("shouldTerminate is true"); } if(inProcessQueues.size()==0) { this.alreadyIncluded.requestFlush(); } } } //将URL加入待处理队列 public void schedule(CandidateURI caUri) { String canon = canonicalize(caUri); if (caUri.forceFetch()) { alreadyIncluded.addForce(canon, caUri); } else { alreadyIncluded.add(canon, caUri); } }
4. BdbFrontier
继承了WorkQueueFrontier,是Heritrix唯一具有实际意义的链接工厂。代码如下:
public class BdbFrontier extends WorkQueueFrontier implements Serializable { /** 所有待抓取的链接*/ protected transient BdbMultipleWorkQueues pendingUris; //初始化pendingUris,父类为抽象方法 protected void initQueue() throws IOException { try { this.pendingUris = createMultipleWorkQueues(); } catch(DatabaseException e) { throw (IOException)new IOException(e.getMessage()).initCause(e); } } private BdbMultipleWorkQueues createMultipleWorkQueues() throws DatabaseException { return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(), this.controller.getBdbEnvironment().getClassCatalog(), this.controller.isCheckpointRecover()); } protected BdbMultipleWorkQueues getWorkQueues() { return pendingUris; } … }
5. BdbUriUniqFilter
它实际上是一个过滤器,用来检查一个要进入等待队列的链接是否已经被抓取过。
方法代码如下:
//添加URL protected boolean setAdd(CharSequence uri) { DatabaseEntry key = new DatabaseEntry(); LongBinding.longToEntry(createKey(uri), key); long started = 0; OperationStatus status = null; try { if (logger.isLoggable(Level.INFO)) { started = System.currentTimeMillis(); } //添加到数据库 status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY); if (logger.isLoggable(Level.INFO)) { aggregatedLookupTime +=(System.currentTimeMillis() - started); } } catch (DatabaseException e) { logger.severe(e.getMessage()); } if (status == OperationStatus.SUCCESS) { count++; if (logger.isLoggable(Level.INFO)) { final int logAt = 10000; if (count > 0 && ((count % logAt) == 0)) { logger.info("Average lookup " + (aggregatedLookupTime / logAt) + "ms."); aggregatedLookupTime = 0; } } } //如果存在,返回false if(status == OperationStatus.KEYEXIST) { return false; } else { return true; } }