2019年05月25日(星期六)  农历:己亥年四月廿一
  • 首页
  • JAVA
  • 开源JAVA爬虫crawler4j源码分析

作者:三年。分类: JAVA

爬虫在工作过程中,会有大量的URL需要存储和分配,如何高效的管理这些URL,是一个爬虫系统的重中之重。

crawler4j默认运行最多每小时解析几千个URL,在修改过后可以达到每小时几十万个(后面的文章中介绍),这么多的URL,应该如何管理呢?

crawler4j使用嵌入式数据库Berkeley DB JE 进行URL的临时存储和分配管理,关于Berkeley DB JE ,我在另一篇文章里做了简单介绍:

海量简单数据不想用SQL?试试高效的嵌入式数据库Berkeley DB JE吧!

WebURL:

还是先从BasicCrawlController的main函数开始,看程序是如何添加入口URL的:

controller.addSeed("http://www.ics.uci.edu/");

controller.addSeed("http://www.ics.uci.edu/~lopes/");

controller.addSeed("http://www.ics.uci.edu/~welling/");

再看CrawlController的addSeed()方法:

public void addSeed(String pageUrl) {

addSeed(pageUrl, -1);

}

public void addSeed(String pageUrl, int docId) {

String canonicalUrl = URLCanonicalizer.getCanonicalURL(pageUrl);

if (canonicalUrl == null) {

logger.error("Invalid seed URL: " + pageUrl);

return;

}

if (docId < 0) {

docId = docIdServer.getDocId(canonicalUrl);

if (docId > 0) {

// This URL is already seen.

return;

}

docId = docIdServer.getNewDocID(canonicalUrl);

} else {

try {

docIdServer.addUrlAndDocId(canonicalUrl, docId);

} catch (Exception e) {

logger.error("Could not add seed: " + e.getMessage());

}

}

WebURL webUrl = new WebURL();

webUrl.setURL(canonicalUrl);

webUrl.setDocid(docId);

webUrl.setDepth((short) 0);

if (!robotstxtServer.allows(webUrl)) {

logger.info("Robots.txt does not allow this seed: " + pageUrl);

} else {

frontier.schedule(webUrl);

}

}

这里定义了一个WebURL作为URL的Model类,存储了一些URL的属性:域、子域、路径、锚、URL地址,这些在调用setURL方法 时就会被解析出来,setURL主要是字符串的截取,还用到了TLDList.getInstance().contains(domain),就是从域 名列表文件tld-names.txt里查找判断URL里哪部分是域名,因为域名包括的部分可能不太一样, 如.cn、.com.cn、.gov、.gov.cn;还有一些爬虫属性:分配的ID、父URLID、父URL、深度、优先级,这些会在爬虫工作时指定, 所谓父URL就是在哪个页面发现的该地址,深度是第几级被发现的,如入口URL是0,从入口URL页面发现的地址是1,从1发现的新的是2,依此类推,优 先级高的(数字小的)会优先分配爬取。

DocIDServer:

addSeed里面setDocid是给URL分配一个惟一的ID,默认是从1开始自动增长:1 2 3 4 5... 虽然这里可以使用JAVA自带的集合类来管理和存储这些ID,但是为了确保惟一且保证在ID增长到了几十上百万时依然高效,crawler4j使用了前面 说的BDB JE来存储,当然还有一个原因是为了可恢复,即系统挂了恢复后爬虫可以继续,但我并不打算讨论这种情况,因为在这种情况下,crawler4j的运行效率 相当低!

用docIdServer.getDocId()来检查该URL是否已经存储,如果没有则docId = docIdServer.getNewDocID(canonicalUrl);获取新ID。看下docIdServer是怎么工作的,首先在 CrawlController构造函数中初始化并传入Environment(关于Env,请参考文章开头BDB JE链接):

docIdServer = new DocIDServer(env, config);

DocIdServer类只负责管理URL的ID,构造函数:

public DocIDServer(Environment env, CrawlConfig config) throws DatabaseException {

super(config);

DatabaseConfig dbConfig = new DatabaseConfig();

dbConfig.setAllowCreate(true);

dbConfig.setTransactional(config.isResumableCrawling());

dbConfig.setDeferredWrite(!config.isResumableCrawling());

docIDsDB = env.openDatabase(null, "DocIDs", dbConfig);

if (config.isResumableCrawling()) {

int docCount = getDocCount();

if (docCount > 0) {

logger.info("Loaded " + docCount + " URLs that had been detected in previous crawl.");

lastDocID = docCount;

}

} else {

lastDocID = 0;

}

}

这里只是简单的创建了一个名叫DocIDs的DB(有关可恢复不做讨论,这里和下面涉及resumable都是false)。这个DB是以URL为key,以ID为value存储的,因为key的惟一性,可保证URL不重复,且更好的用URL来进行ID查询。

再看getDocId():

public int getDocId(String url) {

synchronized (mutex) {

if (docIDsDB == null) {

return -1;

}

OperationStatus result;

DatabaseEntry value = new DatabaseEntry();

try {

DatabaseEntry key = new DatabaseEntry(url.getBytes());

result = docIDsDB.get(null, key, value, null);

if (result == OperationStatus.SUCCESS && value.getData().length > 0) {

return Util.byteArray2Int(value.getData());

}

} catch (Exception e) {

e.printStackTrace();

}

return -1;

}

}

因为是多线程访问,所以这里用了synchronized (mutex) 保证线程安全。如果能从DB中查询出key是指定的URL的话,则返回相应的ID value,否则返回-1说明没有找到。

public int getNewDocID(String url) {

synchronized (mutex) {

try {

// Make sure that we have not already assigned a docid for this URL

int docid = getDocId(url);

if (docid > 0) {

return docid;

}

lastDocID++;

docIDsDB.put(null, new DatabaseEntry(url.getBytes()), new DatabaseEntry(Util.int2ByteArray(lastDocID)));

return lastDocID;

} catch (Exception e) {

e.printStackTrace();

}

return -1;

}

}

用getNewDocID生成新的ID并将它和URL存入DB。

addUrlAndDocId()是当你不想自动生成ID而想自己指定一个ID时使用,一般不建议用,除非是第二次使用并想用和之前一样的ID,但如果这样的话得先查出前一次的ID,效率不高,且真的没多大必要!

DocIDServer主要就这两个方法了,逻辑很简单,功能也很单一。

Frontier

回到addSeed方法,最后一句frontier.schedule(webUrl);将指定URL加入队列,只有加入队列之后爬虫线程才能对该URL进行解析。

Frontier有两个重要的新属性,一个是计数器Counters,另一个是URL队列WorkQueues:

protected WorkQueues workQueues = new WorkQueues(env, "PendingURLsDB", config.isResumableCrawling());

protected Counters counters = new Counters(env, config);

计数器Counters实现比较简单,用一个HashMap存储,目前只存储了两个值:已加入队列的URL数和已爬取完成的URL数。

URL队列WorkQueues保存当前已发现的但是又还没有分配给爬虫线程的WebURL,用BDB JE存储,创建了一个名为PendingURLsDB的数据库:

public WorkQueues(Environment env, String dbName, boolean resumable) throws DatabaseException {

this.env = env;

this.resumable = resumable;

DatabaseConfig dbConfig = new DatabaseConfig();

dbConfig.setAllowCreate(true);

dbConfig.setTransactional(resumable);

dbConfig.setDeferredWrite(!resumable);

urlsDB = env.openDatabase(null, dbName, dbConfig);

webURLBinding = new WebURLTupleBinding();

}

自定义了一个WebURLTupleBinding,可以在JE中保存WebURL的各个属性。如果你需要给WebURL添加一些属性,比如锚 的标签名是a,img还是iframe,除了要在WebURL里面添加外,也需要修改WebURLTupleBinding,否则不会被存入DB,线程取 出的时候该属性就会为空!

WorkQueues使用put, delete, get方法来实现增删查,以6位byte作为key,第一位是WebURL的priority属性,第二位是WebURL的深度属性,剩下4位是用 WebURL的ID转换成byte;用WebURLTupleBinding中定义的内容作为value。因为数据库是以key为索引存储的,所以优先级 高的即数字小的会排在前面,接着深度小的也会排在前面。

关于优先级,crawler4j有个小BUG,就是WebURL的priority属性默认就是最小0,这使得如果你想优先爬取某URL就不可能了,解决方法是在WebURL构造函数或setURL里为priority赋上默认值,至于赋什么值好,就看着办吧嘿嘿!

Frontier提供两个方法添加URL到队列:

public void scheduleAll(List urls) {

int maxPagesToFetch = config.getMaxPagesToFetch();

synchronized (mutex) {

int newScheduledPage = 0;

for (WebURL url : urls) {

if (maxPagesToFetch > 0 && (scheduledPages + newScheduledPage) >= maxPagesToFetch) {

break;

}

try {

workQueues.put(url);

newScheduledPage++;

} catch (DatabaseException e) {

logger.error("Error while puting the url in the work queue.");

}

}

if (newScheduledPage > 0) {

scheduledPages += newScheduledPage;

counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES, newScheduledPage);

}

synchronized (waitingList) {

waitingList.notifyAll();

}

}

}

public void schedule(WebURL url) {

int maxPagesToFetch = config.getMaxPagesToFetch();

synchronized (mutex) {

try {

if (maxPagesToFetch < 0 || scheduledPages < maxPagesToFetch) {

workQueues.put(url);

scheduledPages++;

counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES);

}

} catch (DatabaseException e) {

logger.error("Error while puting the url in the work queue.");

}

}

}

单个添加和批量添加,添加到队列的同时设置计数器,多个逻辑有各自的实现类,实现分离,Frontier负责组合这些逻辑,外部只需调用Fontier即可!Frontier还有一方法就是获取队列中的数据,一次可获取多条:

public void getNextURLs(int max, List result) {

while (true) {

synchronized (mutex) {

if (isFinished) {

return;

}

try {

List curResults = workQueues.get(max);

workQueues.delete(curResults.size());

if (inProcessPages != null) {

for (WebURL curPage : curResults) {

inProcessPages.put(curPage);

}

}

result.addAll(curResults);

} catch (DatabaseException e) {

logger.error("Error while getting next urls: " + e.getMessage());

e.printStackTrace();

}

if (result.size() > 0) {

return;

}

}

try {

synchronized (waitingList) {

waitingList.wait();

}

} catch (InterruptedException ignored) {

// Do nothing

}

if (isFinished) {

return;

}

}

}

爬虫线程每次调用这个方法领取50个URL,领取完就从队列删除,开始解析,解析完后重新调用领取。如果队列是空的,线程将会在这个方法里面等 待wait(),其它线程也会在synchronized处排队,直到scheduleAll方法被调用,线程才会重新被激活notifyAll()。

以上就是crawler4j爬虫存储和分配URL的代码分析,涉及的类都被放在了edu.uci.ics.crawler4j.frontier包,该包还有一个类InProcessPagesDB是用来作可恢复爬取的,不做讨论。

温馨提示如有转载或引用以上内容之必要,敬请将本文链接作为出处标注,谢谢合作!

已有 0/1628 人参与

发表评论:



手Q扫描加入Java初学者群