
GeneratorJob in Nutch 2

Version: Nutch 2.2.1

Class: GeneratorJob

Source: src/java/org/apache/nutch/crawl/GeneratorJob.java

GeneratorJob reads WebPage records from the database, runs them through several processing steps, and updates the corresponding database records.

The crux is this single call:

//file: src/java/org/apache/nutch/crawl/GeneratorJob.java
//GeneratorJob.run(Map<String,Object>) line: 196   
    StorageUtils.initMapperJob(currentJob, fields, SelectorEntry.class,
        WebPage.class, GeneratorMapper.class, SelectorEntryPartitioner.class, true);
           

The implementation of initMapperJob:

//file: src/java/org/apache/nutch/storage/StorageUtils.java
//StorageUtils.initMapperJob(Job, Collection<Field>, Class<K>, Class<V>, Class<GoraMapper<String,WebPage,K,V>>, Class<Partitioner<K,V>>, boolean) line: 113
public static <K, V> void initMapperJob(Job job,
      Collection<WebPage.Field> fields,
      Class<K> outKeyClass, Class<V> outValueClass,
      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
      Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
  throws ClassNotFoundException, IOException {

    DataStore<String, WebPage> store = createWebStore(job.getConfiguration(), String.class, WebPage.class);
    if (store==null) throw new RuntimeException("Could not create datastore");
    Query<String, WebPage> query = store.newQuery();
    query.setFields(toStringArray(fields));
    GoraMapper.initMapperJob(job, query, store, outKeyClass, outValueClass, mapperClass, partitionerClass, reuseObjects);
    GoraOutputFormat.setOutput(job, store, true);
  }
           

1. Building the Query

GoraMapper.initMapperJob() takes a query parameter. Tracing how that query is built quickly leads to this:

//file:  src/java/org/apache/nutch/crawl/GeneratorJob.java
//line: 157
  public Collection<WebPage.Field> getFields(Job job) {
    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
    fields.addAll(FetchScheduleFactory.getFetchSchedule(job.getConfiguration()).getFields());
    return fields;
  }
           

Four of the fields are defined below:

//file:  src/java/org/apache/nutch/crawl/GeneratorJob.java
//line: 65
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();

  static {
    FIELDS.add(WebPage.Field.FETCH_TIME);
    FIELDS.add(WebPage.Field.SCORE);
    FIELDS.add(WebPage.Field.STATUS);
    FIELDS.add(WebPage.Field.MARKERS);
  }
           

The other three fields come from here:

//file: src/java/org/apache/nutch/crawl/AbstractFetchSchedule.java
//line: 43
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();

  static {
    FIELDS.add(WebPage.Field.FETCH_TIME);
    FIELDS.add(WebPage.Field.RETRIES_SINCE_FETCH);
    FIELDS.add(WebPage.Field.FETCH_INTERVAL);
  }
           

AbstractFetchSchedule implements the FetchSchedule interface. Note that FETCH_TIME appears in both sets; since getFields() collects everything into a HashSet, the duplicate is dropped and six distinct fields remain.
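
To make that count concrete, a toy illustration of the union (plain strings stand in for WebPage.Field):

//file: (illustrative sketch, not part of the Nutch source)
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FieldUnion {
  public static void main(String[] args) {
    // GeneratorJob.FIELDS
    Set<String> fields = new HashSet<>(Arrays.asList(
        "fetchTime", "score", "status", "markers"));
    // AbstractFetchSchedule.FIELDS -- fetchTime is the duplicate
    fields.addAll(Arrays.asList(
        "fetchTime", "retriesSinceFetch", "fetchInterval"));
    System.out.println(fields.size());  // 6
  }
}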

The query object is passed along as a job parameter via DefaultStringifier; a separate post on the various ways of passing parameters in Hadoop is planned.
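
For reference, a minimal sketch of the DefaultStringifier mechanism itself, assuming nothing about the key Gora actually uses ("demo.key" and the Text payload are placeholders):

//file: (illustrative sketch, not part of the Nutch source)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Text;

public class StringifierDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Serialize the object and store its encoded form under a string key.
    DefaultStringifier.store(conf, new Text("some-state"), "demo.key");
    // Any task that sees this configuration can rebuild the object later.
    Text restored = DefaultStringifier.load(conf, "demo.key", Text.class);
    System.out.println(restored);
  }
}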

2. Executing the Query

As shown above, a DataStore is created in initMapperJob; it is used first to read the WebPage records:

//file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
//SqlStore<K,T>.execute(Query<K,T>) line: 419   
{
...
      statement = getConnection().prepareStatement(select.toString());

      setParametersForPreparedStatement(statement, query);
...
}
           

Printing select.toString() gives:

SELECT id,fetchInterval,retriesSinceFetch,markers,status,score,fetchTime FROM webpage LIMIT 10000;
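
For comparison, a standalone sketch of issuing such a query through the Gora API; this is not Nutch code and assumes a properly configured gora.properties on the classpath:

//file: (illustrative sketch, not part of the Nutch source)
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.storage.WebPage;

public class QueryDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DataStore<String, WebPage> store =
        DataStoreFactory.getDataStore(String.class, WebPage.class, conf);
    Query<String, WebPage> query = store.newQuery();
    // Restrict the columns, as query.setFields(toStringArray(fields)) does above.
    query.setFields("fetchTime", "score", "status", "markers");
    query.setLimit(10000);               // shows up as LIMIT 10000 in the SQL
    Result<String, WebPage> result = query.execute();
    while (result.next()) {
      System.out.println(result.getKey());
    }
    result.close();
    store.close();
  }
}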
           

3. WebPage Processing

Map phase (GeneratorMapper.class)

1. Check whether the markers contain "_gnmrk_"; if so, the page was already generated in an earlier batch and must not be generated again.

2. Check the page depth (dist) recorded in the markers against generate.max.distance (default -1, meaning unlimited).

3. Filter the URL against the configured URL filter rules.

4. Check whether the page is due for fetching (fetchTime <= curTime).

5. Set the url and score on the SelectorEntry and emit it (a condensed sketch of these checks follows the list).
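
A self-contained paraphrase of those five checks; Page, the plain-string marker keys, and select() are simplified stand-ins for WebPage and the real GeneratorMapper, not the actual Nutch classes:

//file: (illustrative sketch, not part of the Nutch source)
import java.util.Map;
import java.util.function.Predicate;

public class MapSideChecks {

  // Hypothetical stand-in for the Gora WebPage bean.
  static class Page {
    Map<String, String> markers;
    long fetchTime;
    float score;
  }

  // Mirrors steps 1-5 above; returns false when the page is skipped.
  static boolean select(String url, Page page, long curTime,
                        int maxDistance, Predicate<String> urlFilter) {
    // 1. Already generated in an earlier batch ("_gnmrk_" marker present).
    if (page.markers.containsKey("_gnmrk_")) return false;
    // 2. Depth limit from generate.max.distance (-1 means unlimited).
    String dist = page.markers.get("dist");
    if (maxDistance >= 0 && dist != null
        && Integer.parseInt(dist) > maxDistance) return false;
    // 3. URL filtering.
    if (!urlFilter.test(url)) return false;
    // 4. Not yet due for fetching.
    if (page.fetchTime > curTime) return false;
    // 5. The real mapper now copies url and score onto a SelectorEntry and emits it.
    return true;
  }
}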

Reduce phase (GeneratorReducer.class)

1. Check whether the number of entries generated so far has reached topN (Long.MAX_VALUE when no topN is set).

2. Check whether the number of entries for a single domain or host exceeds maxCount (generate.max.count, default -1, meaning unlimited); see the sketch after this list.

3. Set the batchId (a standalone generator run creates the batchId in its run method; when crawl is run directly, the batchId is "*").

4. Update the WebPage in the database; for that process see: http://blog.csdn.net/itufo/article/details/20727597
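
A matching paraphrase of the reduce-side bookkeeping; the counters and the admit() helper are illustrative, not the actual GeneratorReducer fields:

//file: (illustrative sketch, not part of the Nutch source)
import java.util.HashMap;
import java.util.Map;

public class ReduceSideLimits {

  long generated = 0;                  // entries emitted so far
  final long limit;                    // topN, or Long.MAX_VALUE if unset
  final long maxCount;                 // generate.max.count, -1 = unlimited
  final Map<String, Long> perKeyCounts = new HashMap<>();

  ReduceSideLimits(long limit, long maxCount) {
    this.limit = limit;
    this.maxCount = maxCount;
  }

  // True when an entry for the given domain/host may still be generated;
  // the real reducer would then set the batchId mark and write the page back.
  boolean admit(String hostOrDomain) {
    if (generated >= limit) return false;                   // check 1
    long n = perKeyCounts.merge(hostOrDomain, 1L, Long::sum);
    if (maxCount >= 0 && n > maxCount) return false;        // check 2
    generated++;
    return true;
  }
}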