Index: Injector.java
===================================================================
--- Injector.java (revision 1589699)
+++ Injector.java (working copy)
@@ -39,7 +39,7 @@
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
-/** This class takes a flat file of URLs and adds them to the of pages to be
+/** This class takes a folder of flat files of URLs and adds them to the list of pages to be
* crawled. Useful for bootstrapping the system.
* The URL files contain one URL per line, optionally followed by custom metadata
* separated by tabs with the metadata key separated from the corresponding value by '='.
@@ -59,7 +59,12 @@
/** metadata key reserved for setting a fixed custom fetchInterval for a specific URL */
public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";
- /** Normalize and filter injected urls. */
+ /**
+ * Parses the url line into the url and its metadata.
+ * Filters out urls based on the filter plugins.
+ * Produces a normalized URL and a CrawlDatum that has the following attributes:
+ * fetch interval, score, status=STATUS_INJECTED, and metadata from the url line
+ */
public static class InjectMapper implements Mapper, Text, Text, CrawlDatum> {
private URLNormalizers urlNormalizers;
private int interval;
@@ -68,7 +73,9 @@
private URLFilters filters;
private ScoringFilters scfilters;
private long curTime;
-
+/**
+ * Initialize values of the injection mapper
+ */
public void configure(JobConf job) {
this.jobConf = job;
urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
@@ -80,14 +87,14 @@
}
public void close() {}
-
+
public void map(WritableComparable> key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
String url = value.toString().trim(); // value is line of text
-
+
+ /* Ignore lines that start with # or empty lines */
if (url != null && ( url.length() == 0 || url.startsWith("#") ) ) {
- /* Ignore line that start with # */
return;
}
@@ -104,7 +111,7 @@
// find separation between name and value
int indexEquals = splits[s].indexOf("=");
if (indexEquals==-1) {
- // skip anything without a =
+ // skip anything without an =
continue;
}
String metaname = splits[s].substring(0, indexEquals);
@@ -114,16 +121,16 @@
customScore = Float.parseFloat(metavalue);}
catch (NumberFormatException nfe){}
}
- else if (metaname.equals(nutchFetchIntervalMDName)) {
- try {
- customInterval = Integer.parseInt(metavalue);}
- catch (NumberFormatException nfe){}
- }
- else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
- try {
- fixedInterval = Integer.parseInt(metavalue);}
- catch (NumberFormatException nfe){}
- }
+ else if (metaname.equals(nutchFetchIntervalMDName)) {
+ try {
+ customInterval = Integer.parseInt(metavalue);}
+ catch (NumberFormatException nfe){}
+ }
+ else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
+ try {
+ fixedInterval = Integer.parseInt(metavalue);}
+ catch (NumberFormatException nfe){}
+ }
else metadata.put(metaname,metavalue);
}
}
@@ -151,7 +158,7 @@
}
datum.setFetchTime(curTime);
- // now add the metadata
+ // Add the metadata
Iterator keysIter = metadata.keySet().iterator();
while (keysIter.hasNext()){
String keymd = keysIter.next();
@@ -180,7 +187,9 @@
private float scoreInjected;
private boolean overwrite = false;
private boolean update = false;
-
+ /**
+ * Initialize Injection reducer
+ */
public void configure(JobConf job) {
interval = job.getInt("db.fetch.interval.default", 2592000);
scoreInjected = job.getFloat("db.score.injected", 1.0f);
@@ -194,12 +203,21 @@
private CrawlDatum old = new CrawlDatum();
private CrawlDatum injected = new CrawlDatum();
-
+ /**
+ * Receives all the records that share the same url.
+ * Produces one datum that is one of the following:
+ * 1. Datum in crawlDB pre-injection
+ * 2. Datum in crawlDB, updated with new url metadata
+ * 3. Newly injected datum
+ * Depending on whether db.injector.overwrite or db.injector.update are set to true
+ */
public void reduce(Text key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException {
boolean oldSet = false;
boolean injectedSet = false;
+
+ // Find the newly injected record and the existing crawlDB record for this url
while (values.hasNext()) {
CrawlDatum val = values.next();
if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
@@ -214,18 +232,14 @@
CrawlDatum res = null;
- // Old default behaviour
+ // Take New Record (Injected record does not exist in crawlDB)
if (injectedSet && !oldSet) {
res = injected;
- } else {
+ } else { // Take Old Record (Injected record exists in crawlDB)
res = old;
}
- /**
- * Whether to overwrite, ignore or update existing records
- * @see https://issues.apache.org/jira/browse/NUTCH-1405
- */
- // Injected record already exists and update but not overwrite
+ // Update Record (Injected record already exists, update=true and overwrite=false)
if (injectedSet && oldSet && update && !overwrite) {
res = old;
old.putAllMetaData(injected);
@@ -233,7 +247,7 @@
old.setFetchInterval(injected.getFetchInterval() != interval ? injected.getFetchInterval() : old.getFetchInterval());
}
- // Injected record already exists and overwrite
+ // Overwrite Record (Injected record already exists and overwrite=true)
if (injectedSet && oldSet && overwrite) {
res = injected;
}
@@ -247,7 +261,14 @@
public Injector(Configuration conf) {
setConf(conf);
}
-
+ /**
+ * Main function of the injector that reads the urls in the urlDir,
+ * runs the url filters on them,
+ * normalizes them and injects them into the crawldb
+ * @param crawlDb the path to the crawl database (existing or new)
+ * @param urlDir the path to the url directory that contains the url files
+ * @throws IOException if the urlDir path doesn't exist
+ */
public void inject(Path crawlDb, Path urlDir) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
@@ -301,7 +322,10 @@
long end = System.currentTimeMillis();
LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
-
+/**
+ * Entry point for injector through the command line
+ * Expects two arguments: a crawldb directory and a URL directory
+ */
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
System.exit(res);