Index: src/java/org/apache/nutch/crawl/NutchWritable.java
===================================================================
--- src/java/org/apache/nutch/crawl/NutchWritable.java	(revision 1669125)
+++ src/java/org/apache/nutch/crawl/NutchWritable.java	(working copy)
@@ -47,7 +47,8 @@
       org.apache.nutch.parse.ParseStatus.class,
       org.apache.nutch.protocol.Content.class,
       org.apache.nutch.protocol.ProtocolStatus.class,
-      org.apache.nutch.scoring.webgraph.LinkDatum.class
+      org.apache.nutch.scoring.webgraph.LinkDatum.class,
+      org.apache.nutch.util.hostdb.HostDatum.class
     };
   }
 
Index: src/java/org/apache/nutch/util/hostdb/DumpHostDb.java
===================================================================
--- src/java/org/apache/nutch/util/hostdb/DumpHostDb.java	(revision 0)
+++ src/java/org/apache/nutch/util/hostdb/DumpHostDb.java	(working copy)
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util.hostdb;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Tool to dump entries of an existing HostDB as plain text: all hosts, hosts
+ * whose failure count exceeds a threshold, or their homepage URLs.
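+ *
+ * <p>Example invocations (paths and threshold values are illustrative):</p>
+ * <pre>
+ * bin/nutch dumphostdb crawl/hostdb hostdb-dump
+ * bin/nutch dumphostdb crawl/hostdb failed-hosts -dumpFailedHosts 3
+ * bin/nutch dumphostdb crawl/hostdb homepages -dumpHomepages -numPagesThreshold 10
+ * </pre>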
+ */
+public class DumpHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DumpHostDb.class);
+
+  public static final String HOSTDB_FAILURE_THRESHOLD = "hostdb.failure.threshold";
+  public static final String HOSTDB_DUMP_HOMEPAGES = "hostdb.dump.homepages";
+  public static final String HOSTDB_NUM_PAGES_THRESHOLD = "hostdb.num.pages.threshold";
+
+  static class DumpHostDbMapper extends Mapper<Text, HostDatum, Text, Text> {
+    protected Integer failureThreshold = -1;
+    protected Integer numPagesThreshold = -1;
+    protected boolean dumpHomepages = false;
+    protected Text emptyText = new Text();
+
+    public void setup(Context context) {
+      failureThreshold = context.getConfiguration().getInt(HOSTDB_FAILURE_THRESHOLD, -1);
+      numPagesThreshold = context.getConfiguration().getInt(HOSTDB_NUM_PAGES_THRESHOLD, -1);
+      dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
+    }
+
+    public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {
+      if (dumpHomepages) {
+        if (datum.numFailures() == 0) {
+          if (datum.hasHomepageUrl()) {
+            if (numPagesThreshold == -1 || (datum.getFetched() + datum.getNotModified()) >= numPagesThreshold) {
+              context.write(new Text(datum.getHomepageUrl()), emptyText);
+            }
+          } else {
+            LOG.info(key.toString() + " has no homepage.");
+          }
+        }
+      } else {
+        if (datum.numFailures() >= failureThreshold) {
+
+          // TODO: also write to external storage, i.e. memcache
+          context.write(key, emptyText);
+        }
+      }
+    }
+  }
+
+  // TODO: reduce unknown hosts to a single unknown domain if possible. Enable via configuration
+  // host_a.example.org,host_b.example.org ==> example.org
+//   static class DumpHostDbReducer extends Reducer<Text, Text, Text, Text> {
+//     public void setup(Context context) { }
+//
+//     public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException {
+//
+//     }
+//   }
+
+  private void dumpHostDb(Path hostDb, Path output, Integer failureThreshold, boolean dumpHomepages, Integer numPagesThreshold) throws Exception {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("DumpHostDb: starting at " + sdf.format(start));
+
+    Configuration conf = getConf();
+    conf.setInt(HOSTDB_FAILURE_THRESHOLD, failureThreshold);
+    conf.setInt(HOSTDB_NUM_PAGES_THRESHOLD, numPagesThreshold);
+    conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    conf.set("mapred.textoutputformat.separator", "\t");
+    
+    Job job = new Job(conf, "DumpHostDb");
+    job.setJarByClass(DumpHostDb.class);
+
+    FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
+    FileOutputFormat.setOutputPath(job, output);
+
+    if (!dumpHomepages && failureThreshold == -1) {
+      job.setMapperClass(Mapper.class);
+    } else {
+      job.setMapperClass(DumpHostDbMapper.class);
+    }
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(0);
+
+    job.waitForCompletion(true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("DumpHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new DumpHostDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err
+          .println("Usage: DumpHostDb <hostdb> <output> <[-dumpFailedHosts <threshold>] [-dumpHomepages [-numPagesThreshold <threshold>]]>");
+      return -1;
+    }
+
+    boolean dumpHomepages = false;
+    Integer failureThreshold = -1;
+    Integer numPagesThreshold = -1;
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-dumpHomepages")) {
+        LOG.info("DumpHostDB: dumping homepage URL's");
+        dumpHomepages = true;
+      }
+      if (args[i].equals("-numPagesThreshold")) {
+        numPagesThreshold = Integer.parseInt(args[i + 1]);
+        i++;
+      }
+      if (args[i].equals("-dumpFailedHosts")) {
+        failureThreshold = Integer.parseInt(args[i + 1]);
+        LOG.info("DumpHostDB: dumping failed hosts with " + failureThreshold+ " as threshold");
+      }
+    }
+
+    try {
+      dumpHostDb(new Path(args[0]), new Path(args[1]), failureThreshold, dumpHomepages, numPagesThreshold);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("DumpHostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+}
+
+
Index: src/java/org/apache/nutch/util/hostdb/HostDatum.java
===================================================================
--- src/java/org/apache/nutch/util/hostdb/HostDatum.java	(revision 0)
+++ src/java/org/apache/nutch/util/hostdb/HostDatum.java	(working copy)
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util.hostdb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable holding host-level information: aggregated CrawlDatum status
+ * counts, DNS and connection failure counters, score, last check date,
+ * homepage URL and arbitrary metadata.
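+ *
+ * <p>Minimal construction sketch (values are illustrative):</p>
+ * <pre>
+ * HostDatum datum = new HostDatum(1.0f, new Date(), "http://www.example.org/");
+ * datum.setFetched(10);
+ * datum.getMetaData().put(new Text("lang.en"), new IntWritable(10));
+ * </pre>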
+ */
+public class HostDatum implements Writable, Cloneable {
+  protected int failures = 0;
+  protected float score = 0;
+  protected Date lastCheck = new Date(0);
+  protected String homepageUrl = "";
+
+  protected MapWritable metaData = new MapWritable();
+
+  // Records the number of times a DNS look-up failed; may indicate the host no longer exists
+  protected int dnsFailures = 0;
+
+  // Records the number of connection failures; may indicate our network is being blocked by a firewall
+  protected int connectionFailures = 0;
+
+  protected int unfetched = 0;
+  protected int fetched = 0;
+  protected int notModified = 0;
+  protected int redirTemp = 0;
+  protected int redirPerm = 0;
+  protected int gone = 0;
+
+  public HostDatum() {
+  }
+
+  public HostDatum(float score) {
+    this(score, new Date());
+  }
+
+  public HostDatum(float score, Date lastCheck) {
+    this(score, lastCheck, "");
+  }
+
+  public HostDatum(float score, Date lastCheck, String homepageUrl) {
+    this.score =  score;
+    this.lastCheck = lastCheck;
+    this.homepageUrl = homepageUrl;
+  }
+
+  public void resetFailures() {
+    setDnsFailures(0);
+    setConnectionFailures(0);
+  }
+
+  public void setDnsFailures(Integer dnsFailures) {
+    this.dnsFailures = dnsFailures;
+  }
+
+  public void setConnectionFailures(Integer connectionFailures) {
+    this.connectionFailures = connectionFailures;
+  }
+
+  public void incDnsFailures() {
+    this.dnsFailures++;
+  }
+
+  public void incConnectionFailures() {
+    this.connectionFailures++;
+  }
+
+  public Integer numFailures() {
+    return getDnsFailures() + getConnectionFailures();
+  }
+
+  public Integer getDnsFailures() {
+    return dnsFailures;
+  }
+
+  public Integer getConnectionFailures() {
+    return connectionFailures;
+  }
+
+  public void setScore(float score) {
+    this.score = score;
+  }
+
+  public void setLastCheck() {
+    setLastCheck(new Date());
+  }
+
+  public void setLastCheck(Date date) {
+    lastCheck = date;
+  }
+
+  public boolean isEmpty() {
+    return lastCheck.getTime() == 0;
+  }
+
+  public float getScore() {
+    return score;
+  }
+
+  public Integer numRecords() {
+    return unfetched + fetched + gone + redirPerm + redirTemp + notModified;
+  }
+
+  public Date getLastCheck() {
+    return lastCheck;
+  }
+
+  public boolean hasHomepageUrl() {
+    return homepageUrl.length() > 0;
+  }
+
+  public String getHomepageUrl() {
+    return homepageUrl;
+  }
+
+  public void setHomepageUrl(String homepageUrl) {
+    this.homepageUrl = homepageUrl;
+  }
+
+  public void setUnfetched(int val) {
+    unfetched = val;
+  }
+
+  public int getUnfetched() {
+    return unfetched;
+  }
+
+  public void setFetched(int val) {
+    fetched = val;
+  }
+
+  public int getFetched() {
+    return fetched;
+  }
+
+  public void setNotModified(int val) {
+    notModified = val;
+  }
+
+  public int getNotModified() {
+    return notModified;
+  }
+
+  public void setRedirTemp(int val) {
+    redirTemp = val;
+  }
+
+  public int getRedirTemp() {
+    return redirTemp;
+  }
+
+  public void setRedirPerm(int val) {
+    redirPerm = val;
+  }
+
+  public int getRedirPerm() {
+    return redirPerm;
+  }
+
+  public void setGone(int val) {
+    gone = val;
+  }
+
+  public int getGone() {
+    return gone;
+  }
+
+  public void resetStatistics() {
+    setUnfetched(0);
+    setFetched(0);
+    setGone(0);
+    setRedirTemp(0);
+    setRedirPerm(0);
+    setNotModified(0);
+  }
+
+  public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) {
+    this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable);
+  }
+
+  /**
+   * Add all metadata from another HostDatum to this HostDatum.
+   *
+   * @param other the HostDatum to copy metadata from
+   */
+  public void putAllMetaData(HostDatum other) {
+    for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) {
+      getMetaData().put(e.getKey(), e.getValue());
+    }
+  }
+
+  /**
+   * Returns the MapWritable if it was set or read in {@link #readFields(DataInput)},
+   * returns an empty map in case the HostDatum was freshly created (lazily instantiated).
+   */
+  public org.apache.hadoop.io.MapWritable getMetaData() {
+    if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable();
+    return this.metaData;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    HostDatum result = (HostDatum)super.clone();
+    result.score = score;
+    result.lastCheck = lastCheck;
+    result.homepageUrl = homepageUrl;
+
+    result.dnsFailures = dnsFailures;
+    result.connectionFailures = connectionFailures;
+
+    result.unfetched = unfetched;
+    result.fetched = fetched;
+    result.notModified = notModified;
+    result.redirTemp = redirTemp;
+    result.redirPerm = redirPerm;
+    result.gone = gone;
+
+    // Copy the metadata map so the clone does not share state with the original
+    result.metaData = new org.apache.hadoop.io.MapWritable(metaData);
+
+    return result;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    score = in.readFloat();
+    lastCheck = new Date(in.readLong());
+    homepageUrl = Text.readString(in);
+
+    dnsFailures = in.readInt();
+    connectionFailures = in.readInt();
+
+    unfetched = in.readInt();
+    fetched = in.readInt();
+    notModified = in.readInt();
+    redirTemp = in.readInt();
+    redirPerm = in.readInt();
+    gone = in.readInt();
+
+    metaData = new org.apache.hadoop.io.MapWritable();
+    metaData.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(score);
+    out.writeLong(lastCheck.getTime());
+    Text.writeString(out, homepageUrl);
+
+    out.writeInt(dnsFailures);
+    out.writeInt(connectionFailures);
+
+    out.writeInt(unfetched);
+    out.writeInt(fetched);
+    out.writeInt(notModified);
+    out.writeInt(redirTemp);
+    out.writeInt(redirPerm);
+    out.writeInt(gone);
+
+    metaData.write(out);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(Integer.toString(getUnfetched()));
+    buf.append("\t");
+    buf.append(Integer.toString(getFetched()));
+    buf.append("\t");
+    buf.append(Integer.toString(getGone()));
+    buf.append("\t");
+    buf.append(Integer.toString(getRedirTemp()));
+    buf.append("\t");
+    buf.append(Integer.toString(getRedirPerm()));
+    buf.append("\t");
+    buf.append(Integer.toString(getNotModified()));
+    buf.append("\t");
+    buf.append(Integer.toString(numRecords()));
+    buf.append("\t");
+    buf.append(Integer.toString(getDnsFailures()));
+    buf.append("\t");
+    buf.append(Integer.toString(getConnectionFailures()));
+    buf.append("\t");
+    buf.append(Integer.toString(numFailures()));
+    buf.append("\t");
+    buf.append(Float.toString(score));
+    buf.append("\t");
+    buf.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck));
+    buf.append("\t");
+    buf.append(homepageUrl);
+    for (Entry<Writable, Writable> e : getMetaData().entrySet()) {
+      buf.append(e.getKey().toString());
+      buf.append(':');
+      buf.append(e.getValue().toString());
+      buf.append("|||");
+    }
+    return buf.toString();
+  }
+
+}
+
Index: src/java/org/apache/nutch/util/hostdb/UpdateHostDb.java
===================================================================
--- src/java/org/apache/nutch/util/hostdb/UpdateHostDb.java	(revision 0)
+++ src/java/org/apache/nutch/util/hostdb/UpdateHostDb.java	(working copy)
@@ -0,0 +1,930 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util.hostdb;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values
+ * by host and checks DNS entries for hosts.
+ *
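+ * <p>Example invocation (paths are illustrative):</p>
+ * <pre>
+ * bin/nutch updatehostdb -hostdb crawl/hostdb -crawldb crawl/crawldb -checkAll -filter -normalize
+ * </pre>
+ *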
+ * @author markus@apache.org
+ */
+public class UpdateHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDb.class);
+  public static final String LOCK_NAME = ".locked";
+
+  public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold";
+  public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads";
+  public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval";
+  public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed";
+  public static final String HOSTDB_CHECK_NEW = "hostdb.check.new";
+  public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known";
+  public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check";
+  public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter";
+  public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize";
+  public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields";
+  public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields";
+
+  /**
+   * Mapper ingesting HostDB and CrawlDB entries. Additionally it can also read
+   * host score info from a plain text key/value file generated by the
+   * Webgraph's NodeDumper tool.
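+   *
+   * <p>The host scores file is read with KeyValueTextInputFormat, i.e. one
+   * tab-separated host/score pair per line, for example (illustrative):</p>
+   * <pre>
+   * www.example.org	1.283
+   * </pre>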
+   */
+  public static class UpdateHostDbMapper
+    implements Mapper<Text, Writable, Text, NutchWritable> {
+    protected Text host = new Text();
+    protected HostDatum hostDatum = null;
+    protected CrawlDatum crawlDatum = null;
+    protected String reprUrl = null;
+    protected String buffer = null;
+    protected String[] args = null;
+    protected boolean filter = false;
+    protected boolean normalize = false;
+    protected boolean readingCrawlDb = false;
+    protected URLFilters filters = null;
+    protected URLNormalizers normalizers = null;
+
+    public void close() {}
+
+    /**
+     * Reads the filtering and normalizing settings from the job configuration.
+     *
+     * @param job the job configuration
+     */
+    public void configure(JobConf job) {
+      readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
+      filter = job.getBoolean(HOSTDB_URL_FILTERING, false);
+      normalize = job.getBoolean(HOSTDB_URL_NORMALIZING, false);
+
+      if (filter)
+        filters = new URLFilters(job);
+      if (normalize)
+        normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT);
+    }
+
+    /**
+     * Filters and/or normalizes the input host name.
+     *
+     * @param url the host name (turned into a URL internally) to filter and normalize
+     * @return the filtered and normalized host name, or null if it was filtered out
+     */
+    protected String filterNormalize(String url) {
+      // We actually receive a hostname here so let's make a URL
+      // TODO: we force shop.fcgroningen to be https, how do we know that here?
+      // http://issues.openindex.io/browse/SPIDER-40
+      url = "http://" + url + "/";
+
+      try {
+        if (normalize)
+          url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+        if (filter)
+          url = filters.filter(url);
+        if (url == null)
+          return null;
+      } catch (Exception e) {
+        return null;
+      }
+
+      // Turn back to host
+      return URLUtil.getHost(url);
+    }
+
+    /**
+     * Mapper ingesting records from the HostDB, CrawlDB and plaintext host
+     * scores file. Statistics and scores are passed on.
+     *
+     * @param key the record's key
+     * @param value a CrawlDatum, HostDatum or textual score value
+     * @param output the output collector
+     * @param reporter the progress reporter
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text,NutchWritable> output, Reporter reporter)
+      throws IOException {
+
+      // Get the key!
+      String keyStr = key.toString();
+
+      // Check if we process records from the CrawlDB
+      if (key instanceof Text && value instanceof CrawlDatum) {
+        // Get the normalized and filtered host of this URL
+        buffer = filterNormalize(URLUtil.getHost(keyStr));
+
+        // Filtered out?
+        if (buffer == null) {
+          reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+          LOG.info("UpdateHostDb: " + URLUtil.getHost(keyStr) + " crawldatum has been filtered");
+          return;
+        }
+
+        // Set the host of this URL
+        host.set(buffer);
+        crawlDatum = (CrawlDatum)value;
+        hostDatum = new HostDatum();
+
+        /*
+         * TODO: fix multi redirects: host_a => host_b/page => host_c/page/whatever
+         * http://www.ferienwohnung-armbruster.de/
+         * http://www.ferienwohnung-armbruster.de/website/
+         * http://www.ferienwohnung-armbruster.de/website/willkommen.php
+         *
+         * We cannot reresolve redirects for host objects as CrawlDatum metadata is
+         * not available. We also cannot reliably use the reducer in all cases
+         * since redirects may be across hosts or even domains. The example
+         * above has redirects that will end up in the same reducer. During that
+         * phase, however, we do not know which URL redirects to the next URL.
+         */
+        // Do not resolve homepages when the root URL is unfetched
+        if (crawlDatum.getStatus() != CrawlDatum.STATUS_DB_UNFETCHED) {
+          // Get the protocol
+          String protocol = URLUtil.getProtocol(keyStr);
+          
+          // Get the proposed homepage URL
+          String homepage = protocol + "://" + buffer + "/";
+
+          // Check if the current key equals the proposed homepage URL
+          if (keyStr.equals(homepage)) {
+            // Check if this is a redirect to the real home page
+            if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
+              crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+              // Obtain the repr url for this redirect via protocolstatus from the metadata
+              ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData().
+                get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+
+              // Get the protocol status' arguments
+              args = z.getArgs();
+
+              // ..and the possible redirect URL
+              reprUrl = args[0];
+
+              // Am I a redirect?
+              if (reprUrl != null) {
+                LOG.info("UpdateHostDb: homepage: " + keyStr + " redirects to: " + args[0]);
+                output.collect(host, new NutchWritable(hostDatum));
+                hostDatum.setHomepageUrl(reprUrl);
+              } else {
+                LOG.info("UpdateHostDb: homepage: " + keyStr + 
+                  " redirects to: " + args[0] + " but has been filtered out");
+              }
+            } else {
+              hostDatum.setHomepageUrl(homepage);
+              output.collect(host, new NutchWritable(hostDatum));
+              LOG.info("UpdateHostDb: homepage: " + homepage);
+            }
+          }
+        }
+
+        // Always emit crawl datum
+        output.collect(host, new NutchWritable(crawlDatum));
+      }
+
+      // Check if we got a record from the hostdb
+      if (key instanceof Text && value instanceof HostDatum) {
+        buffer = filterNormalize(keyStr);
+
+        // Filtered out?
+        if (buffer == null) {
+          reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+          LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered");
+          return;
+        }
+
+        // Get a HostDatum
+        hostDatum = (HostDatum)value;
+        key.set(buffer);
+
+        // If we're also reading CrawlDb entries, reset db_* statistics because
+        // we're aggregating them from CrawlDB anyway
+        if (readingCrawlDb) {
+          hostDatum.resetStatistics();
+        }
+
+        output.collect(key, new NutchWritable(hostDatum));
+      }
+
+      // Check if we got a record with host scores
+      if (key instanceof Text && value instanceof Text) {
+        buffer = filterNormalize(keyStr);
+
+        // Filtered out?
+        if (buffer == null) {
+          reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+          LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered");
+          return;
+        }
+
+        key.set(buffer);
+
+        output.collect(key,
+          new NutchWritable(new FloatWritable(Float.parseFloat(value.toString()))));
+      }
+    }
+  }
+
+  /**
+   * Reducer aggregating CrawlDatum statistics, HostDatum fields and scores
+   * per host, optionally scheduling a DNS check through a pool of resolver
+   * threads.
+   */
+  static class UpdateHostDbReducer implements Reducer<Text, NutchWritable, Text, HostDatum> {
+    protected ResolverThread resolverThread = null;
+
+    protected Integer numResolverThreads = 10;
+    protected static Integer purgeFailedHostsThreshold = -1;
+    protected static Integer recheckInterval = 86400000;
+    protected static boolean checkFailed = false;
+    protected static boolean checkNew = false;
+    protected static boolean checkKnown = false;
+    protected static boolean force = false;
+    protected static long now = new Date().getTime();
+    protected static String[] numericFields;
+    protected static String[] stringFields;
+
+    protected BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    protected ThreadPoolExecutor executor = null;
+
+    /**
+     * Configures the thread pool and prestarts all resolver threads.
+     *
+     * @param job the job configuration
+     */
+    public void configure(JobConf job) {
+      purgeFailedHostsThreshold = job.getInt(HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+      numResolverThreads = job.getInt(HOSTDB_NUM_RESOLVER_THREADS, 10);
+      recheckInterval = job.getInt(HOSTDB_RECHECK_INTERVAL, 86400) * 1000;
+      checkFailed = job.getBoolean(HOSTDB_CHECK_FAILED, false);
+      checkNew = job.getBoolean(HOSTDB_CHECK_NEW, false);
+      checkKnown = job.getBoolean(HOSTDB_CHECK_KNOWN, false);
+      force = job.getBoolean(HOSTDB_FORCE_CHECK, false);
+      numericFields = job.getStrings(HOSTDB_NUMERIC_FIELDS);
+      stringFields = job.getStrings(HOSTDB_STRING_FIELDS);
+
+      // Initialize the thread pool with our queue
+      executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
+        5, TimeUnit.SECONDS, queue);
+
+      // Run all threads in the pool
+      executor.prestartAllCoreThreads();
+    }
+
+    /**
+     * Aggregates all CrawlDatum, HostDatum and score values for a host into a
+     * single HostDatum and either writes it or hands it to a resolver thread.
+     */
+    public void reduce(Text key, Iterator<NutchWritable> values,
+      OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException {
+
+      Map<String,Map<String,Integer>> stringCounts = new HashMap<String,Map<String, Integer>>();
+      Map<String,Float> numericMaximums = new HashMap<String,Float>();
+      Map<String,Float> numericSums = new HashMap<String,Float>(); // used to calc averages
+      Map<String,Integer> numericCounts = new HashMap<String,Integer>(); // used to calc averages
+      Map<String,Float> numericMinimums = new HashMap<String,Float>();
+      
+      HostDatum hostDatum = new HostDatum();
+      float score = 0;
+      
+      // What fields do we need to collect metadata from
+      //String[] numericFields = {"_rs_", "adult"};
+      Text[] numericFieldWritables = new Text[numericFields.length];
+      for (int i = 0; i < numericFields.length; i++) {
+        numericFieldWritables[i] = new Text(numericFields[i]);
+      }
+      
+      //String[] stringFields = {"lang", "Content-Type"};
+      Text[] stringFieldWritables = new Text[stringFields.length];
+      for (int i = 0; i < stringFields.length; i++) {
+        stringFieldWritables[i] = new Text(stringFields[i]);
+        stringCounts.put(stringFields[i], new HashMap<String,Integer>());
+      }
+            
+      // Loop through all values; reuse an existing HostDatum if one is found,
+      // otherwise start from the empty one created above for a new host
+      while (values.hasNext()) {
+        Writable value = values.next().get();
+        
+        // Count crawl datum statuses and collect metadata from fields
+        if (value instanceof CrawlDatum) {
+          CrawlDatum buffer = (CrawlDatum)value;
+          
+          // Set the correct status field
+          switch (buffer.getStatus()) {
+            case CrawlDatum.STATUS_DB_UNFETCHED:
+              hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+              break;
+
+            case CrawlDatum.STATUS_DB_FETCHED:
+              hostDatum.setFetched(hostDatum.getFetched() + 1);
+              break;
+
+            case CrawlDatum.STATUS_DB_GONE:
+              hostDatum.setGone(hostDatum.getGone() + 1);
+              break;
+
+            case CrawlDatum.STATUS_DB_REDIR_TEMP:
+              hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+              break;
+
+            case CrawlDatum.STATUS_DB_REDIR_PERM:
+              hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+              break;
+
+            case CrawlDatum.STATUS_DB_NOTMODIFIED:
+              hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+              break;
+          }
+          
+          // Only gather metadata statistics for successfully fetched pages
+          if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {            
+            // Deal with the string fields
+            for (int i = 0; i < stringFields.length; i++) {
+              // Does this field exist?
+              if (buffer.getMetaData().get(stringFieldWritables[i]) != null) {
+                // Get it!
+                String metadataValue = null;
+                try {
+                  metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
+                } catch (Exception e) {
+                  LOG.error("Metadata field " + stringFields[i] + " is probably not a numeric value");
+                }
+              
+                // Does the value exist?
+                if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
+                  // Yes, increment it
+                  stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1);
+                } else {
+                  // Create it!
+                  stringCounts.get(stringFields[i]).put(metadataValue, 1);
+                }
+              }
+            }
+            
+            // Deal with the numeric fields
+            for (int i = 0; i < numericFields.length; i++) {
+              // Does this field exist?
+              if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
+                // Get it!
+                Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());
+              
+                // Does the minimum value exist?
+                if (numericMinimums.containsKey(numericFields[i])) {
+                  // Write if this is lower than existing value
+                  if (metadataValue < numericMinimums.get(numericFields[i])) {
+                    numericMinimums.put(numericFields[i], metadataValue);
+                  }
+                } else {
+                  // Create it!
+                  numericMinimums.put(numericFields[i], metadataValue);
+                }
+                
+                // Does the maximum value exist?
+                if (numericMaximums.containsKey(numericFields[i])) {
+                  // Write if this is higher than existing value
+                  if (metadataValue > numericMaximums.get(numericFields[i])) {
+                    numericMaximums.put(numericFields[i], metadataValue);
+                  }
+                } else {
+                  // Create it!
+                  numericMaximums.put(numericFields[i], metadataValue);
+                }
+                
+                // Sum it up!
+                if (numericSums.containsKey(numericFields[i])) {
+                  // Increment
+                  numericSums.put(numericFields[i], numericSums.get(numericFields[i]) + metadataValue);
+                  numericCounts.put(numericFields[i], numericCounts.get(numericFields[i]) + 1);
+                } else {
+                  // Create it!
+                  numericSums.put(numericFields[i], metadataValue);
+                  numericCounts.put(numericFields[i], 1);
+                }
+              }
+            }
+          }
+        }
+        
+        // Check if we got a HostDatum record from the HostDB
+        if (value instanceof HostDatum) {
+          HostDatum buffer = (HostDatum)value;
+
+          // Check homepage URL
+          if (buffer.hasHomepageUrl()) {
+            hostDatum.setHomepageUrl(buffer.getHomepageUrl());
+          }
+
+          // Check lastCheck timestamp
+          if (!buffer.isEmpty()) {
+            hostDatum.setLastCheck(buffer.getLastCheck());
+          }
+
+          // Check and set DNS failures
+          if (buffer.getDnsFailures() > 0) {
+            hostDatum.setDnsFailures(buffer.getDnsFailures());
+          }
+
+          // Check and set connection failures
+          if (buffer.getConnectionFailures() > 0) {
+            hostDatum.setConnectionFailures(buffer.getConnectionFailures());
+          }
+          
+          // Check metadata
+          if (!buffer.getMetaData().isEmpty()) {
+            hostDatum.setMetaData(buffer.getMetaData());
+          }
+
+          // Check and set score (score from Web Graph has precedence)
+          if (buffer.getScore() > 0) {
+            hostDatum.setScore(buffer.getScore());
+          }
+        }
+
+        // Check for the score
+        if (value instanceof FloatWritable) {
+          FloatWritable buffer = (FloatWritable)value;
+          score = buffer.get();
+        }
+      }
+
+      // Check if score was set from Web Graph
+      if (score > 0) {
+        hostDatum.setScore(score);
+      }
+      
+      // Set metadata
+      for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) {
+        for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) {
+          hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+        }
+      }
+      for (Map.Entry<String, Float> entry : numericMaximums.entrySet()) {
+        hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
+      }
+      for (Map.Entry<String, Float> entry : numericSums.entrySet()) {
+        hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / numericCounts.get(entry.getKey())));
+      }
+      for (Map.Entry<String, Float> entry : numericMinimums.entrySet()) {
+        hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
+      }
+      
+      reporter.incrCounter("UpdateHostDb", "total_hosts", 1);
+
+      // See if this record is to be checked
+      if (shouldCheck(hostDatum)) {
+        // Make an entry
+        resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter);
+
+        // Add the entry to the queue (blocking)
+        try {
+          queue.put(resolverThread);
+        } catch (InterruptedException e) {
+          LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+        }
+
+        // Do not proceed; the datum will be written by the resolver thread
+        return;
+      } else {
+        reporter.incrCounter("UpdateHostDb", "skipped_not_eligible", 1);
+        LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible");
+      }
+
+      // Write the host datum if it wasn't written by the resolver thread
+      output.collect(key, hostDatum);
+    }
+
+    /**
+     * Determines whether a record should be checked.
+     *
+     * @param datum the HostDatum to evaluate
+     * @return true if the record should be checked
+     */
+    protected boolean shouldCheck(HostDatum datum) {
+      // Whether a new record is to be checked
+      if (checkNew && datum.isEmpty()) {
+        return true;
+      }
+
+      // Whether existing known hosts should be rechecked
+      if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) {
+        return isEligibleForCheck(datum);
+      }
+
+      // Whether failed records are forced to be rechecked
+      if (checkFailed && datum.getDnsFailures() > 0) {
+        return isEligibleForCheck(datum);
+      }
+
+      // It seems this record is not to be checked
+      return false;
+    }
+
+    /**
+     * Determines whether a record is eligible for a recheck.
+     *
+     * @param datum the HostDatum to evaluate
+     * @return true if the record is eligible for a recheck
+     */
+    protected boolean isEligibleForCheck(HostDatum datum) {
+      // Whether an existing host, known or unknown, is forced to be rechecked,
+      // or its recheck interval (which grows with the number of DNS failures) has expired
+      if (force || datum.getLastCheck().getTime() +
+        (recheckInterval * (datum.getDnsFailures() + 1)) < now) {
+        return true;
+      }
+
+      return false;
+    }
+
+    /**
+     * Shut down all running threads and wait for completion.
+     */
+    public void close() {
+      LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");
+
+      // If we're here all keys have been fed and we can issue a shut down
+      executor.shutdown();
+
+      boolean finished = false;
+
+      // Wait until all resolvers have finished
+      while (!finished) {
+        try {
+          // Wait for the executor to shut down completely
+          if (!executor.isTerminated()) {
+            LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize()));
+            Thread.sleep(1000);
+          } else {
+            // All is well, get out
+            finished = true;
+          }
+        } catch (InterruptedException e) {
+          // Huh?
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
+    static class ResolverThread implements Runnable {
+      protected String host = null;
+      protected HostDatum datum = null;
+      protected Text hostText = new Text();
+      protected OutputCollector<Text,HostDatum> output;
+      protected Reporter reporter;
+
+      /**
+       * Constructor.
+       */
+      public ResolverThread(String host, HostDatum datum,
+        OutputCollector<Text,HostDatum> output, Reporter reporter) {
+
+        hostText.set(host);
+        this.host = host;
+        this.datum = datum;
+        this.output = output;
+        this.reporter = reporter;
+      }
+
+      /**
+       * Resolves the host name and writes the updated HostDatum.
+       */
+      public void run() {
+        // Resolve the host and act appropriately
+        try {
+          // Throws an exception if host is not found
+          InetAddress inetAddr = InetAddress.getByName(host);
+
+          if (datum.isEmpty()) {
+            reporter.incrCounter("UpdateHostDb", "new_known_host" ,1);
+            datum.setLastCheck();
+            LOG.info("UpdateHostDb: " + host + ": new_known_host " + datum);
+          } else if (datum.getDnsFailures() > 0) {
+            reporter.incrCounter("UpdateHostDb", "rediscovered_host" ,1);
+            datum.setLastCheck();
+            datum.setDnsFailures(0);
+            LOG.info("UpdateHostDb: " + host + ": rediscovered_host " + datum);
+          } else {
+            reporter.incrCounter("UpdateHostDb", "existing_known_host", 1);
+            datum.setLastCheck();
+            LOG.info("UpdateHostDb: " + host + ": existing_known_host " + datum);
+          }
+
+          // Write the host datum
+          output.collect(hostText, datum);
+        } catch (UnknownHostException e) {
+          try {
+            // If the datum is empty we'll initialize with date = today and 1 failure
+            if (datum.isEmpty()) {
+              datum.setLastCheck();
+              datum.setDnsFailures(1);
+              output.collect(hostText, datum);
+              reporter.incrCounter("UpdateHostDb", "new_unknown_host", 1);
+              LOG.info("UpdateHostDb: " + host + ": new_unknown_host " + datum);
+            } else {
+              datum.setLastCheck();
+              datum.incDnsFailures();
+
+              // Check if this host should be forgotten: purge it once it has
+              // failed more often than the configured threshold
+              if (purgeFailedHostsThreshold == -1 ||
+                datum.getDnsFailures() < purgeFailedHostsThreshold) {
+
+                output.collect(hostText, datum);
+                reporter.incrCounter("UpdateHostDb", "existing_unknown_host" ,1);
+                LOG.info("UpdateHostDb: " + host + ": existing_unknown_host " + datum);
+              } else {
+                reporter.incrCounter("UpdateHostDb", "purged_unknown_host" ,1);
+                LOG.info("UpdateHostDb: " + host + ": purged_unknown_host " + datum);
+              }
+            }
+
+            reporter.incrCounter("UpdateHostDb",
+              Integer.toString(datum.numFailures()) + "_times_failed", 1);
+          } catch (Exception ioe) {
+            LOG.warn(StringUtils.stringifyException(ioe));
+          }
+        } catch (Exception e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+        reporter.incrCounter("UpdateHostDb", "checked_hosts", 1);
+      }
+    }
+  }
+
+  public void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
+    boolean checkFailed, boolean checkNew, boolean checkKnown,
+    boolean force, boolean filter, boolean normalize, int waitCrawlDb) throws Exception {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("UpdateHostDb: starting at " + sdf.format(start));
+
+    JobConf job = new NutchJob(getConf());
+    boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+    job.setJarByClass(UpdateHostDb.class);
+    job.setJobName("UpdateHostDb");
+
+    // Check whether the urlfilter-domainblacklist plugin is loaded
+    if (filter && new String("urlfilter-domainblacklist").matches(job.get("plugin.includes"))) {
+      throw new Exception("domainblacklist-urlfilter must not be enabled");
+    }
+
+    // Check whether the urlnormalizer-host plugin is loaded
+    if (normalize && "urlnormalizer-host".matches(job.get("plugin.includes"))) {
+      throw new Exception("urlnormalizer-host must not be enabled");
+    }
+
+    FileSystem fs = FileSystem.get(job);
+    Path old = new Path(hostDb, "old");
+    Path current = new Path(hostDb, "current");
+    Path tempHostDb = new Path(hostDb, "hostdb-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+
+    if (topHosts != null) {
+      MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class);
+    }
+    if (crawlDb != null) {
+      // Tell the job we read from CrawlDB
+      job.setBoolean("hostdb.reading.crawldb", true);
+      MultipleInputs.addInputPath(job, new Path(crawlDb,
+        CrawlDb.CURRENT_NAME), SequenceFileInputFormat.class);
+        
+      // Are we waiting for crawldb lock file to be released?
+      if (waitCrawlDb > 0) {
+        // Get crawldb lock file
+        Path crawldbLock = new Path(crawlDb, LOCK_NAME);
+        boolean wait = true;
+        while (wait) {
+          if (fs.exists(crawldbLock)) {
+            LOG.info("UpdateHostDb: crawldb still locked, waiting for " + waitCrawlDb + " more seconds..");
+            waitCrawlDb--;
+            if (waitCrawlDb == 0) {
+              throw new Exception("UpdateHostDb: stop waiting!");
+            }
+            Thread.sleep(1000);
+          } else {
+            wait = false;
+          }
+        }
+      }
+    }
+    
+    // lock an existing hostdb to prevent multiple simultaneous updates
+    Path lock = new Path(hostDb, LOCK_NAME);
+    if (!fs.exists(current)) {
+      fs.mkdirs(current);
+    }
+    LockUtil.createLockFile(fs, lock, false);
+
+    FileOutputFormat.setOutputPath(job, tempHostDb);
+
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(HostDatum.class);
+    job.setMapperClass(UpdateHostDbMapper.class);
+    job.setReducerClass(UpdateHostDbReducer.class);
+
+    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    job.setSpeculativeExecution(false);
+    job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed);
+    job.setBoolean(HOSTDB_CHECK_NEW, checkNew);
+    job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown);
+    job.setBoolean(HOSTDB_FORCE_CHECK, force);
+    job.setBoolean(HOSTDB_URL_FILTERING, filter);
+    job.setBoolean(HOSTDB_URL_NORMALIZING, normalize);
+
+    try {
+      JobClient.runJob(job);
+
+      FSUtils.replace(fs, old, current, true);
+      FSUtils.replace(fs, current, tempHostDb, true);
+
+      if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
+    } catch (Exception e) {
+      if (fs.exists(tempHostDb)) {
+        fs.delete(tempHostDb, true);
+      }
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
+
+    LockUtil.removeLockFile(fs, lock);
+    long end = System.currentTimeMillis();
+    LOG.info("UpdateHostDb: finished at " + sdf.format(end) +
+      ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " +
+        "[-tophosts <tophosts>] [-crawldb <crawldb>] [-waitCrawlDb <seconds>] [-checkAll] [-checkFailed]" +
+        " [-checkNew] [-checkKnown] [-force] [-filter] [-normalize]");
+      return -1;
+    }
+
+    Path hostDb = null;
+    Path crawlDb = null;
+    Path topHosts = null;
+
+    boolean checkFailed = false;
+    boolean checkNew = false;
+    boolean checkKnown = false;
+    boolean force = false;
+
+    boolean filter = false;
+    boolean normalize = false;
+    
+    // Number of seconds to wait for release of the CrawlDB lock
+    int waitCrawlDb = -1;
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-hostdb")) {
+        hostDb = new Path(args[i + 1]);
+        LOG.info("UpdateHostDb: hostdb: " + hostDb);
+        i++;
+      }
+      if (args[i].equals("-crawldb")) {
+        crawlDb = new Path(args[i + 1]);
+        LOG.info("UpdateHostDb: crawldb: " + crawlDb);
+        i++;
+      }
+      if (args[i].equals("-tophosts")) {
+        topHosts = new Path(args[i + 1]);
+        LOG.info("UpdateHostDb: tophosts: " + topHosts);
+        i++;
+      }
+      
+      if (args[i].equals("-waitCrawlDb")) {
+        waitCrawlDb = Integer.parseInt(args[i + 1]);
+        LOG.info("UpdateHostDb: waiting for crawldb lock release: " + waitCrawlDb);
+        i++;
+      }
+
+      if (args[i].equals("-checkFailed")) {
+        LOG.info("UpdateHostDb: checking failed hosts");
+        checkFailed = true;
+      }
+      if (args[i].equals("-checkNew")) {
+        LOG.info("UpdateHostDb: checking new hosts");
+        checkNew = true;
+      }
+      if (args[i].equals("-checkKnown")) {
+        LOG.info("UpdateHostDb: checking known hosts");
+        checkKnown = true;
+      }
+      if (args[i].equals("-checkAll")) {
+        LOG.info("UpdateHostDb: checking all hosts");
+        checkFailed = true;
+        checkNew = true;
+        checkKnown = true;
+      }
+      if (args[i].equals("-force")) {
+        LOG.info("UpdateHostDb: forced check");
+        force = true;
+      }
+      if (args[i].equals("-filter")) {
+        LOG.info("UpdateHostDb: filtering enabled");
+        filter = true;
+      }
+      if (args[i].equals("-normalize")) {
+        LOG.info("UpdateHostDb: normalizing ensabled");
+        normalize = true;
+      }
+    }
+
+    if (hostDb == null) {
+      System.err.println("hostDb is mandatory");
+      return -1;
+    }
+
+    try {
+      updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew,
+        checkKnown, force, filter, normalize, waitCrawlDb);
+
+      return 0;
+    } catch (Exception e) {
+      LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
+
+
Index: src/bin/nutch
===================================================================
--- src/bin/nutch	(revision 1669125)
+++ src/bin/nutch	(working copy)
@@ -258,6 +258,10 @@
   CLASS=org.apache.nutch.scoring.webgraph.ScoreUpdater
 elif [ "$COMMAND" = "nodedumper" ] ; then
   CLASS=org.apache.nutch.scoring.webgraph.NodeDumper
+elif [ "$COMMAND" = "updatehostdb" ] ; then
+  CLASS=org.apache.nutch.util.hostdb.UpdateHostDb
+elif [ "$COMMAND" = "dumphostdb" ] ; then
+  CLASS=org.apache.nutch.util.hostdb.DumpHostDb
 elif [ "$COMMAND" = "plugin" ] ; then
   CLASS=org.apache.nutch.plugin.PluginRepository
 elif [ "$COMMAND" = "junit" ] ; then
Index: conf/log4j.properties
===================================================================
--- conf/log4j.properties	(revision 1669125)
+++ conf/log4j.properties	(working copy)
@@ -37,6 +37,8 @@
 log4j.logger.org.apache.nutch.indexer.IndexingFiltersChecker=INFO,cmdstdout
 log4j.logger.org.apache.nutch.tools.FreeGenerator=INFO,cmdstdout
 log4j.logger.org.apache.nutch.util.domain.DomainStatistics=INFO,cmdstdout
+log4j.logger.org.apache.nutch.util.hostdb.UpdateHostDb=INFO,cmdstdout
+log4j.logger.org.apache.nutch.util.hostdb.DumpHostDb=INFO,cmdstdout
 log4j.logger.org.apache.nutch.tools.CrawlDBScanner=INFO,cmdstdout
 log4j.logger.org.apache.nutch.plugin.PluginRepository=WARN
 
