Index: src/java/org/apache/nutch/crawl/NutchWritable.java
===================================================================
--- src/java/org/apache/nutch/crawl/NutchWritable.java	(revision 1550985)
+++ src/java/org/apache/nutch/crawl/NutchWritable.java	(working copy)
@@ -48,6 +48,7 @@
       org.apache.nutch.protocol.Content.class,
       org.apache.nutch.protocol.ProtocolStatus.class,
       org.apache.nutch.scoring.webgraph.LinkDatum.class,
+      org.apache.nutch.util.hostdb.HostDatum.class,
     };
   }
 
Index: src/java/org/apache/nutch/util/hostdb/DumpHostDb.java
===================================================================
--- src/java/org/apache/nutch/util/hostdb/DumpHostDb.java	(revision 0)
+++ src/java/org/apache/nutch/util/hostdb/DumpHostDb.java	(working copy)
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util.hostdb;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * A utility to dump the contents of the HostDB, optionally filtering hosts by the number of fetched pages or failures.
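+ * <p>
+ * Typical invocation via the {@code bin/nutch} wrapper:
+ * {@code bin/nutch readhostdb <hostdb> <output> [-numPagesThreshold <threshold>] [-dumpFailedHosts <threshold>]}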
+ */
+public class DumpHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DumpHostDb.class);
+
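+  // Job configuration keys carrying the -dumpFailedHosts and -numPagesThreshold values to the mapper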
+  public static final String HOSTDB_FAILURE_THRESHOLD = "hostdb.failure.threshold";
+  public static final String HOSTDB_NUM_PAGES_THRESHOLD = "hostdb.num.pages.threshold";
+  public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
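+  /**
+   * Emits HostDB entries unchanged, skipping hosts that do not meet the configured
+   * page or failure thresholds.
+   */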
+  static class DumpHostDbMapper extends Mapper<Text, HostDatum, Text, HostDatum> {
+    protected Integer failureThreshold = -1;
+    protected Integer numPagesThreshold = -1;
+
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      failureThreshold = conf.getInt(HOSTDB_FAILURE_THRESHOLD, -1);
+      numPagesThreshold = conf.getInt(HOSTDB_NUM_PAGES_THRESHOLD, -1);
+    }
+
+    public void map(Text key, HostDatum datum, Context context)
+        throws IOException, InterruptedException {
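+      // Skip hosts that have fewer fetched (or not-modified) pages than -numPagesThreshold
+      // or fewer DNS/connection failures than -dumpFailedHosts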
+      boolean filter = false;
+
+      if (numPagesThreshold != -1 && (datum.getStat(CrawlDatum.STATUS_DB_FETCHED) +
+          datum.getStat(CrawlDatum.STATUS_DB_NOTMODIFIED)) < numPagesThreshold)
+        filter = true;
+      if (failureThreshold != -1 && datum.numFailures() < failureThreshold)
+        filter = true;
+
+      if (!filter)
+        context.write(key, datum);
+    }
+  }
+
+  private void dumpHostDb(Path hostDb, Path output, Integer failureThreshold,
+                          Integer numPagesThreshold) throws Exception {
+
+    long start = System.currentTimeMillis();
+    LOG.info("HostDb dump: starting at " + sdf.format(start));
+
+    Configuration conf = getConf();
+    conf.setInt(HOSTDB_FAILURE_THRESHOLD, failureThreshold);
+    conf.setInt(HOSTDB_NUM_PAGES_THRESHOLD, numPagesThreshold);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    Job job = new Job(conf, "DumpHostDb");
+    job.setJarByClass(DumpHostDb.class);
+
+    FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setMapperClass(DumpHostDbMapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(HostDatum.class);
+
+    job.waitForCompletion(true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("HostDb dump: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new DumpHostDb(), args);
+    System.exit(res);
+  }
+
+  public static void usage() {
+    System.err.println("Usage: DumpHostDb <hostdb> <output> [-numPagesThreshold <threshold>] [-dumpFailedHosts <threshold>]");
+    System.err.println("\t<hostdb>\tdirectory name where hostdb is located");
+    System.err.println("\t<output>\toutput location where the dump will be produced");
+    System.err.println("\n Optional arguments:");
+    System.err.println("\t[-dumpFailedHosts <threshold>]\tlist status sorted by host");
+    System.err.println("\t[-numPagesThreshold <threshold>]\tthreshold for fetched pages of the hosts");
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    Path hostdb = new Path(args[0]);
+    Path output = new Path(args[1]);
+
+    Integer failureThreshold = -1;
+    Integer numPagesThreshold = -1;
+
+    for (int i = 2; i < args.length; i++) {
+      if (args[i].equals("-dumpFailedHosts")) {
+        failureThreshold = Integer.parseInt(args[++i]);
+        LOG.info("HostDb dump: dumping failed hosts with a threshold of " + failureThreshold);
+      }
+      else if (args[i].equals("-numPagesThreshold")) {
+        numPagesThreshold = Integer.parseInt(args[++i]);
+        LOG.info("HostDb dump: dumping hosts with page threshold of " + numPagesThreshold );
+      }
+      else {
+        System.err.println("HostDb dump: Found invalid argument : \"" + args[i] + "\"\n");
+        usage();
+        return -1;
+      }
+    }
+
+    try {
+      dumpHostDb(hostdb, output, failureThreshold, numPagesThreshold);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("HostDb dump: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/util/hostdb/HostDb.java
===================================================================
--- src/java/org/apache/nutch/util/hostdb/HostDb.java	(revision 0)
+++ src/java/org/apache/nutch/util/hostdb/HostDb.java	(working copy)
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util.hostdb;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values by host and checks
+ * DNS entries for hosts.
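+ * <p>
+ * Typical invocation via the {@code bin/nutch} wrapper:
+ * {@code bin/nutch hostdb <hostdb> [-crawldb <crawldb>] [-tophosts <tophosts>] [-checkAll] [-checkFailed] [-checkNew] [-checkKnown] [-force] [-noFilter] [-noNormalize]}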
+ */
+public class HostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(HostDb.class);
+  public static final String LOCK_NAME = ".locked";
+
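+  // Configuration keys read in HostDbMapper.configure() and HostDbReducer.configure();
+  // the check/force/filter/normalize flags are set from the command line in run()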
+  public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold";
+  public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads";
+  public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval";
+  public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed";
+  public static final String HOSTDB_CHECK_NEW = "hostdb.check.new";
+  public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known";
+  public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check";
+  public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter";
+  public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize";
+
+  public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  /**
+   * Mapper ingesting HostDB and CrawlDB entries. It can also read host score info
+   * from a plain-text key/value file generated by the WebGraph NodeDumper tool.
+   */
+  public static class HostDbMapper implements Mapper<Text, Writable, Text, NutchWritable> {
+    private Text host = new Text();
+    private HostDatum hostDatum = null;
+    private CrawlDatum crawlDatum = null;
+    private String reprUrl = null;
+    private String buffer = null;
+    private boolean filter = false;
+    private boolean normalize = false;
+    private boolean readingCrawlDb = false;
+    private URLFilters filters = null;
+    private URLNormalizers normalizers = null;
+
+    public void close() {}
+
+    public void configure(JobConf job) {
+      readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
+      filter = job.getBoolean(HOSTDB_URL_FILTERING, false);
+      normalize = job.getBoolean(HOSTDB_URL_NORMALIZING, false);
+
+      if (filter)
+        filters = new URLFilters(job);
+
+      if (normalize)
+        normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT);
+    }
+
+    /**
+     * Filters and/or normalizes the input host name; returns null if the host is filtered out.
+     */
+    private String filterNormalize(String url) {
+      // We actually receive a hostname here so let's make a URL
+      String httpUrl = "http://" + url + "/";
+
+      try {
+        if (normalize)
+          httpUrl = normalizers.normalize(httpUrl, URLNormalizers.SCOPE_DEFAULT);
+
+        if (filter)
+          httpUrl = filters.filter(httpUrl);
+
+        if (httpUrl == null) {
+          // Some hosts only allow the HTTPS scheme, so if the HTTP URL was filtered
+          // out, retry with HTTPS before giving up. Note that this is a workaround
+          // and does not cover FTP or FILE schemes.
+          String httpsUrl = "https://" + url + "/";
+          if (normalize)
+            httpsUrl = normalizers.normalize(httpsUrl, URLNormalizers.SCOPE_DEFAULT);
+
+          if (filter)
+            httpsUrl = filters.filter(httpsUrl);
+
+          if(httpsUrl == null)
+            return null;
+        }
+      } catch (Exception e) {
+        return null;
+      }
+      return url;
+    }
+
+    /**
+     * Mapper ingesting records from the HostDB, CrawlDB and plain-text host scores
+     * file. Statistics and scores are passed on.
+     */
+    public void map(Text key, Writable value,
+                    OutputCollector<Text,NutchWritable> output, Reporter reporter) throws IOException {
+
+      if (value instanceof CrawlDatum) {
+        // This is a record from the CrawlDB
+        // Get the normalized and filtered host of this URL
+        buffer = filterNormalize(URLUtil.getHost(key.toString()));
+
+        // Filtered out?
+        if (buffer == null) {
+          reporter.incrCounter("HostDb", "filtered_records", 1);
+          LOG.info(URLUtil.getHost(key.toString()) + " crawldatum has been filtered");
+          return;
+        }
+
+        // Set the host of this URL
+        host.set(buffer);
+        crawlDatum = (CrawlDatum)value;
+        hostDatum = new HostDatum();
+
+        /*
+         * Known limitation:
+         * multi redirects: host_a => host_b/page => host_c/page/whatever
+         *
+         * We cannot re-resolve redirects for host objects because the CrawlDatum
+         * metadata is not available, and the reducer cannot be used reliably either
+         * since redirects may cross hosts or even domains. This is left for future
+         * work, as multi-redirects are not very common on the web.
+         */
+
+        // Check whether the current key is the host's homepage URL
+        if (key.toString().equals("http://" + buffer + "/")) {
+          // Check if this is a redirect to the real home page
+          if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
+              crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+            // Obtain the repr url for this redirect via the protocol status from the metadata
+            ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData().get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+
+            // Get the redirect target from the protocol status' arguments, if available
+            reprUrl = (z != null && z.getArgs() != null && z.getArgs().length > 0) ? z.getArgs()[0] : null;
+
+            if (reprUrl != null) {
+              LOG.info("Homepage: " + key.toString() + " redirects to: " + reprUrl);
+              hostDatum.setHomepageUrl(reprUrl);
+            } else {
+              LOG.info("Homepage: " + key.toString() +
+                  " redirects, but no redirect target could be determined");
+            }
+          } else {
+            hostDatum.setHomepageUrl("http://" + buffer + "/");
+            LOG.info("Homepage: " + "http://" + buffer + "/");
+          }
+        }
+
+        hostDatum.setStat(crawlDatum.getStatus(), 1);
+        output.collect(host, new NutchWritable(hostDatum));
+      }
+      else if (value instanceof HostDatum) {       // we got a record from the hostdb
+        buffer = filterNormalize(key.toString());
+
+        // Filtered out?
+        if (buffer == null) {
+          reporter.incrCounter("HostDb", "filtered_records", 1);
+          LOG.info(key.toString() + " hostdatum has been filtered");
+          return;
+        }
+
+        // Get a HostDatum
+        hostDatum = (HostDatum)value;
+        key.set(buffer);
+
+        // If we're also reading CrawlDb entries, reset db_* statistics because
+        // we're aggregating them from CrawlDB anyway
+        if (readingCrawlDb)
+          hostDatum.resetStatistics();
+
+        output.collect(key, new NutchWritable(hostDatum));
+      }
+      else if (value instanceof Text) {         // we got a record with host scores
+        buffer = filterNormalize(key.toString());
+
+        // Filtered out?
+        if (buffer == null) {
+          reporter.incrCounter("HostDb", "filtered_records", 1);
+          LOG.info(key.toString() + " score has been filtered");
+          return;
+        }
+
+        key.set(buffer);
+        output.collect(key,
+            new NutchWritable(new FloatWritable(Float.parseFloat(value.toString()))));
+      }
+    }
+  }
+
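+  /**
+   * Reducer merging CrawlDatum statistics, existing HostDatum fields and WebGraph
+   * scores per host. Hosts eligible for a (re)check are handed to a pool of
+   * ResolverThreads which perform the DNS lookup and write the resulting HostDatum
+   * themselves; all other hosts are written directly.
+   */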
+  static class HostDbReducer implements Reducer<Text, NutchWritable, Text, HostDatum> {
+    private ResolverThread resolverThread = null;
+
+    private Integer numResolverThreads = 10;
+    private static Integer purgeFailedHostsThreshold = -1;
+    private static Integer recheckInterval = 86400000;
+    private static boolean checkFailed = false;
+    private static boolean checkNew = false;
+    private static boolean checkKnown = false;
+    private static boolean force = false;
+    private static long now = new Date().getTime();
+
+    private BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    private ThreadPoolExecutor executor = null;
+
+    /**
+     * Configures the thread pool and prestarts all resolver threads.
+     */
+    public void configure(JobConf job) {
+      purgeFailedHostsThreshold = job.getInt(HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+      numResolverThreads = job.getInt(HOSTDB_NUM_RESOLVER_THREADS, 10);
+      recheckInterval = job.getInt(HOSTDB_RECHECK_INTERVAL, 86400) * 1000;
+      checkFailed = job.getBoolean(HOSTDB_CHECK_FAILED, false);
+      checkNew = job.getBoolean(HOSTDB_CHECK_NEW, false);
+      checkKnown = job.getBoolean(HOSTDB_CHECK_KNOWN, false);
+      force = job.getBoolean(HOSTDB_FORCE_CHECK, false);
+
+      // Initialize the thread pool with our queue
+      executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads, 5, TimeUnit.SECONDS, queue);
+
+      // Run all threads in the pool
+      executor.prestartAllCoreThreads();
+    }
+
+    public void reduce(Text key, Iterator<NutchWritable> values,
+                       OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException {
+
+      HostDatum hostDatum = new HostDatum();
+      float score = 0;
+
+      // Loop through all values: add statistics while the accumulated HostDatum is
+      // still empty, and merge fields (homepage, last check, failures, score) from
+      // an existing HostDatum or a WebGraph score
+      while (values.hasNext()) {
+        Writable value = values.next().get();
+
+        if (value instanceof HostDatum) {
+          HostDatum buffer = (HostDatum) value;
+
+          // Increment statistics only if this is not an existing HostDatum
+          if (hostDatum.isEmpty()) {
+            hostDatum.addStat(CrawlDatum.STATUS_DB_UNFETCHED, buffer);
+            hostDatum.addStat(CrawlDatum.STATUS_DB_FETCHED, buffer);
+            hostDatum.addStat(CrawlDatum.STATUS_DB_GONE, buffer);
+            hostDatum.addStat(CrawlDatum.STATUS_DB_REDIR_PERM, buffer);
+            hostDatum.addStat(CrawlDatum.STATUS_DB_REDIR_TEMP, buffer);
+            hostDatum.addStat(CrawlDatum.STATUS_DB_NOTMODIFIED, buffer);
+          }
+
+          // Check homepage URL
+          if (buffer.hasHomepageUrl())
+            hostDatum.setHomepageUrl(buffer.getHomepageUrl());
+
+          // Check lastCheck timestamp
+          if (!buffer.isEmpty())
+            hostDatum.setLastCheck(buffer.getLastCheck());
+
+          // Check and set failures
+          if (buffer.getDnsFailures() > 0)
+            hostDatum.setDnsFailures(buffer.getDnsFailures());
+
+          // Check and set connection failures
+          if (buffer.getConnectionFailures() > 0)
+            hostDatum.setConnectionFailures(buffer.getConnectionFailures());
+
+          // Check and set score (score from Web Graph has precedence)
+          if (buffer.getScore() > 0)
+            hostDatum.setScore(buffer.getScore());
+        }
+
+        // Check for the score
+        if (value instanceof FloatWritable) {
+          FloatWritable buffer = (FloatWritable)value;
+          score = buffer.get();
+        }
+      }
+
+      // Check if score was set from Web Graph
+      if (score > 0)
+        hostDatum.setScore(score);
+
+      reporter.incrCounter("HostDb", "total_hosts", 1);
+
+      // See if this record is to be checked
+      if (shouldCheck(hostDatum)) {
+        // Make an entry
+        resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter);
+
+        // Add the entry to the queue (blocking)
+        try {
+          queue.put(resolverThread);
+        } catch (InterruptedException e) {
+          LOG.error("HostDb: " + StringUtils.stringifyException(e));
+        }
+
+        // Do not write the datum here; the resolver thread will write it
+        return;
+      } else {
+        reporter.incrCounter("HostDb", "skipped_not_eligible", 1);
+        LOG.info(key.toString() + ": skipped_not_eligible");
+      }
+
+      // Write the host datum if it wasn't written by the resolver thread
+      output.collect(key, hostDatum);
+    }
+
+    /**
+     * Determines whether a record should be checked.
+     */
+    private boolean shouldCheck(HostDatum datum) {
+      // Whether a new record is to be checked
+      if (checkNew && datum.isEmpty()) {
+        return true;
+      }
+
+      // Whether existing known hosts should be rechecked
+      if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) {
+        return isEligibleForCheck(datum);
+      }
+
+      // Whether failed records are forced to be rechecked
+      if (checkFailed && datum.getDnsFailures() > 0) {
+        return isEligibleForCheck(datum);
+      }
+
+      // It seems this record is not to be checked
+      return false;
+    }
+
+    /**
+     * Determines whether a record is eligible for recheck
+     */
+    private boolean isEligibleForCheck(HostDatum datum) {
+      // An existing host, known or unknown, is rechecked if forced, or if the recheck
+      // interval (scaled up with the number of DNS failures as a back-off) has elapsed
+      return (force || datum.getLastCheck().getTime() +
+          (recheckInterval * (datum.getDnsFailures() + 1)) < now);
+    }
+
+    /**
+     * Shut down all running threads and wait for completion.
+     */
+    public void close() {
+      LOG.info("Feeder finished, waiting for shutdown");
+
+      // If we're here all keys have been fed and we can issue a shut down
+      executor.shutdown();
+
+      boolean finished = false;
+
+      while (!finished) {
+        try {
+          // Wait for the executor to shut down completely
+          if (!executor.isTerminated()) {
+            LOG.info("Threads waiting: " + Integer.toString(executor.getPoolSize()));
+            Thread.sleep(1000);
+          } else {
+            // All is well, get out
+            finished = true;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
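+    /**
+     * Resolves a single host via DNS, updates its failure counters and writes the
+     * resulting HostDatum to the job output.
+     */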
+    static class ResolverThread implements Runnable {
+      private String host = null;
+      private HostDatum datum = null;
+      private Text hostText = new Text();
+      private OutputCollector<Text,HostDatum> output;
+      private Reporter reporter;
+
+      public ResolverThread(String host, HostDatum datum,
+                            OutputCollector<Text,HostDatum> output, Reporter reporter) {
+
+        hostText.set(host);
+        this.host = host;
+        this.datum = datum;
+        this.output = output;
+        this.reporter = reporter;
+      }
+
+      public void run() {
+        // Resolve the host and act appropriately
+        datum.setLastCheck();
+        try {
+          // Throws an exception if host is not found
+          InetAddress.getByName(host);
+
+          if (datum.isEmpty()) {
+            reporter.incrCounter("HostDb", "new_known_host" ,1);
+            LOG.info(host + ": new_known_host " + datum);
+          } else if (datum.getDnsFailures() > 0) {
+            reporter.incrCounter("HostDb", "rediscovered_host" ,1);
+            datum.setDnsFailures(0);
+            LOG.info(host + ": rediscovered_host " + datum);
+          } else {
+            reporter.incrCounter("HostDb", "existing_known_host", 1);
+            LOG.info(host + ": existing_known_host " + datum);
+          }
+          // Write the host datum
+          output.collect(hostText, datum);
+        } catch (UnknownHostException e) {
+          try {
+            // If the datum is empty we'll initialize it with lastCheck = now and 1 DNS failure
+            if (datum.isEmpty()) {
+              datum.setDnsFailures(1);
+              output.collect(hostText, datum);
+              reporter.incrCounter("HostDb", "new_unknown_host", 1);
+              LOG.info(host + ": new_unknown_host " + datum);
+            } else {
+              datum.incDnsFailures();
+
+              // Check if this host should be forgotten because it failed too often
+              if (purgeFailedHostsThreshold == -1 ||
+                  datum.getDnsFailures() < purgeFailedHostsThreshold) {
+                output.collect(hostText, datum);
+                reporter.incrCounter("HostDb", "existing_unknown_host" ,1);
+                LOG.info(host + ": existing_unknown_host " + datum);
+              } else {
+                reporter.incrCounter("HostDb", "purged_unknown_host" ,1);
+                LOG.info(host + ": purged_unknown_host " + datum);
+              }
+            }
+
+            reporter.incrCounter("HostDb",
+                Integer.toString(datum.numFailures()) + "_times_failed", 1);
+          } catch (Exception ioe) {
+            LOG.warn(StringUtils.stringifyException(ioe));
+          }
+        } catch (Exception e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+        reporter.incrCounter("HostDb", "checked_hosts", 1);
+      }
+    }
+  }
+
+  private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
+                            boolean checkFailed, boolean checkNew, boolean checkKnown,
+                            boolean force, boolean filter, boolean normalize) throws Exception {
+
+    long start = System.currentTimeMillis();
+    LOG.info("HostDb: starting at " + sdf.format(start));
+
+    JobConf job = new NutchJob(getConf());
+    job.setJarByClass(HostDb.class);
+    job.setJobName("HostDb");
+    boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+
+    // Check whether the urlfilter-domainblacklist plugin is loaded
+    if ("urlfilter-domainblacklist".matches(job.get("plugin.includes"))) {
+      throw new Exception("domainblacklist-urlfilter must not be enabled");
+    }
+
+    // Check whether the urlnormalizer-host plugin is loaded
+    if ("urlnormalizer-host".matches(job.get("plugin.includes"))) {
+      throw new Exception("urlnormalizer-host must not be enabled");
+    }
+
+    FileSystem fs = FileSystem.get(job);
+    Path old = new Path(hostDb, "old");
+    Path current = new Path(hostDb, "current");
+    Path tempHostDb = new Path(hostDb, "hostdb-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // lock an existing hostdb to prevent multiple simultaneous updates
+    Path lock = new Path(hostDb, LOCK_NAME);
+    if (!fs.exists(current)) {
+      fs.mkdirs(current);
+    }
+    LockUtil.createLockFile(fs, lock, false);
+
+    MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+
+    if (topHosts != null) {
+      MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class);
+    }
+    if (crawlDb != null) {
+      // Tell the job we read from CrawlDB
+      job.setBoolean("hostdb.reading.crawldb", true);
+      MultipleInputs.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME),
+          SequenceFileInputFormat.class);
+    }
+
+    FileOutputFormat.setOutputPath(job, tempHostDb);
+
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(HostDatum.class);
+    job.setMapperClass(HostDbMapper.class);
+    job.setReducerClass(HostDbReducer.class);
+
+    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    job.setSpeculativeExecution(false);
+    job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed);
+    job.setBoolean(HOSTDB_CHECK_NEW, checkNew);
+    job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown);
+    job.setBoolean(HOSTDB_FORCE_CHECK, force);
+    job.setBoolean(HOSTDB_URL_FILTERING, filter);
+    job.setBoolean(HOSTDB_URL_NORMALIZING, normalize);
+
+    try {
+      JobClient.runJob(job);
+
+      FSUtils.replace(fs, old, current, true);
+      FSUtils.replace(fs, current, tempHostDb, true);
+
+      if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
+    } catch (Exception e) {
+      if (fs.exists(tempHostDb)) {
+        fs.delete(tempHostDb, true);
+      }
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
+
+    LockUtil.removeLockFile(fs, lock);
+    long end = System.currentTimeMillis();
+    LOG.info("HostDb: finished at " + sdf.format(end) +
+        ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new HostDb(), args);
+    System.exit(res);
+  }
+
+  public static void usage() {
+    System.err.println("Usage: HostDb <hostdb> " +
+        "[-crawldb <crawldb>] [-tophosts <tophosts>] [-checkAll] [-checkFailed]" +
+        " [-checkNew] [-checkKnown] [-force] [-noFilter] [-noNormalize]");
+    System.err.println("\t<hostdb>\tdirectory name where hostdb is located");
+    System.err.println("\t-crawldb <crawldb>\tpath to a crawldb directory");
+    System.err.println("\t-tophosts <tophosts>\tkey-value text file generated by the Webgraph's NodeDumper tool having score");
+    System.err.println("\t-checkAll\tApply DNS check to resolve all hosts");
+    System.err.println("\t-checkFailed\tApply DNS check to resolve only on hosts which had failed DNS check earlier");
+    System.err.println("\t-checkNew\tApply DNS check to resolve only new hosts");
+    System.err.println("\t-checkKnown\tApply DNS check to resolve only known hosts");
+    System.err.println("\t-force\t\tforce hosts to be rechecked. With earlier args, check is done on host only if 'recheckInterval' has elapsed.");
+    System.err.println("\t-noFilter\tturn off URLFilters on urls");
+    System.err.println("\t-noNormalize\tturn off URLNormalizer on urls");
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    Path hostDb = new Path(args[0]);
+    Path crawlDb = null;
+    Path topHosts = null;
+
+    boolean checkFailed = false;
+    boolean checkNew = false;
+    boolean checkKnown = false;
+    boolean force = false;
+
+    boolean filter = true;
+    boolean normalize = true;
+
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-crawldb")) {
+        crawlDb = new Path(args[++i]);
+        LOG.info("HostDb: crawldb: " + crawlDb);
+      }
+      else if (args[i].equals("-tophosts")) {
+        topHosts = new Path(args[++i]);
+        LOG.info("HostDb: tophosts: " + topHosts);
+      }
+      else if (args[i].equals("-checkFailed")) {
+        LOG.info("HostDb: checking failed hosts");
+        checkFailed = true;
+      }
+      else if (args[i].equals("-checkNew")) {
+        LOG.info("HostDb: checking new hosts");
+        checkNew = true;
+      }
+      else if (args[i].equals("-checkKnown")) {
+        LOG.info("HostDb: checking known hosts");
+        checkKnown = true;
+      }
+      else if (args[i].equals("-checkAll")) {
+        LOG.info("HostDb: checking all hosts");
+        checkFailed = true;
+        checkNew = true;
+        checkKnown = true;
+      }
+      else if (args[i].equals("-force")) {
+        LOG.info("HostDb: forced check");
+        force = true;
+      }
+      else if (args[i].equals("-noFilter")) {
+        LOG.info("HostDb: filtering disabled");
+        filter = false;
+      }
+      else if (args[i].equals("-noNormalize")) {
+        LOG.info("HostDb: normalizing disabled");
+        normalize = false;
+      }
+      else {
+        LOG.info("HostDb: Found invalid argument \"" + args[i] + "\"\n");
+        usage();
+        return -1;
+      }
+    }
+
+    try {
+      updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew,
+          checkKnown, force, filter, normalize);
+
+      return 0;
+    } catch (Exception e) {
+      LOG.error("HostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/util/hostdb/HostDatum.java
===================================================================
--- src/java/org/apache/nutch/util/hostdb/HostDatum.java	(revision 0)
+++ src/java/org/apache/nutch/util/hostdb/HostDatum.java	(working copy)
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util.hostdb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.crawl.CrawlDatum;
+
+/**
+ * Aggregated per-host information kept in the HostDB: homepage URL, score, last check
+ * date, DNS and connection failure counts, and per-status page counts.
+ */
+public class HostDatum implements Writable, Cloneable {
+  private static final String EMPTY_STRING = "";
+  private static final byte CUR_VERSION = 1;
+  private static final Date DEFAULT_DATE = new Date(0);
+
+  private float score = 0;
+  private Date lastCheck = DEFAULT_DATE;
+  private String homepageUrl = EMPTY_STRING;
+
+  // Records the number of times DNS look-up failed, may indicate host no longer exists
+  private int dnsFailures = 0;
+
+  // Records the number of connection failures, may indicate our network being blocked by firewall
+  private int connectionFailures = 0;
+
+  // Counts for various url statuses
+  private HashMap<Byte, Integer> statCounts = new HashMap<Byte, Integer>();
+
+  private MapWritable metaData = new MapWritable();
+
+  public HostDatum() {
+    resetStatistics();
+  }
+
+  public boolean isEmpty() {
+    return lastCheck.getTime() == 0;
+  }
+
+  public float getScore() { return score; }
+  public void setScore(float score) { this.score = score; }
+
+  public Date getLastCheck() { return lastCheck; }
+  public void setLastCheck() { setLastCheck(new Date()); }
+  public void setLastCheck(Date date) { lastCheck = date; }
+
+  public boolean hasHomepageUrl() { return homepageUrl.compareTo(EMPTY_STRING) != 0; }
+  public String getHomepageUrl() { return homepageUrl; }
+  public void setHomepageUrl(String homepageUrl) { this.homepageUrl = homepageUrl; }
+
+  public int getDnsFailures() { return dnsFailures; }
+  public void incDnsFailures() { this.dnsFailures++; }
+  public void setDnsFailures(int i) { this.dnsFailures = i; }
+
+  public int getConnectionFailures() { return connectionFailures; }
+  public void setConnectionFailures(int i) { this.connectionFailures = i; }
+  public void incConnectionFailures() { this.connectionFailures++; }
+  public int numFailures() { return getDnsFailures() + getConnectionFailures(); }
+
+  // Returns 0 for statuses that have not been recorded yet
+  public Integer getStat(byte key) { Integer val = statCounts.get(key); return val == null ? 0 : val; }
+  public void setStat(byte key, int val) { statCounts.put(key, val); }
+
+  public void addStat(byte key, HostDatum other) {
+    setStat(key, getStat(key) + other.getStat(key));
+  }
+
+  public Integer numRecords() {
+    return statCounts.get(CrawlDatum.STATUS_DB_UNFETCHED) +
+        statCounts.get(CrawlDatum.STATUS_DB_FETCHED) +
+        statCounts.get(CrawlDatum.STATUS_DB_NOTMODIFIED) +
+        statCounts.get(CrawlDatum.STATUS_DB_REDIR_PERM) +
+        statCounts.get(CrawlDatum.STATUS_DB_REDIR_TEMP) +
+        statCounts.get(CrawlDatum.STATUS_DB_GONE);
+  }
+
+  public void resetStatistics() {
+    statCounts.put(CrawlDatum.STATUS_DB_UNFETCHED,    0);
+    statCounts.put(CrawlDatum.STATUS_DB_FETCHED,      0);
+    statCounts.put(CrawlDatum.STATUS_DB_NOTMODIFIED,  0);
+    statCounts.put(CrawlDatum.STATUS_DB_REDIR_PERM,   0);
+    statCounts.put(CrawlDatum.STATUS_DB_REDIR_TEMP,   0);
+    statCounts.put(CrawlDatum.STATUS_DB_GONE,         0);
+  }
+
+  /**
+   * Returns the metadata MapWritable if it was set or read in {@link #readFields(DataInput)}.
+   * Returns an empty map in case the HostDatum was freshly created (lazily instantiated).
+   */
+  public MapWritable getMetaData() {
+    if (this.metaData == null) this.metaData = new MapWritable();
+    return this.metaData;
+  }
+
+  /**
+   * Add all metadata from other HostDatum to this HostDatum.
+   */
+  public void putAllMetaData(HostDatum other) {
+    for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) {
+      getMetaData().put(e.getKey(), e.getValue());
+    }
+  }
+
+  public void setMetaData(MapWritable mapWritable) {
+    this.metaData = new MapWritable(mapWritable);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte version = in.readByte();
+    if (version > CUR_VERSION)          // check version
+      throw new VersionMismatchException(CUR_VERSION, version);
+
+    score = in.readFloat();
+    lastCheck = new Date(in.readLong());
+    homepageUrl = Text.readString(in);
+
+    dnsFailures = in.readInt();
+    connectionFailures = in.readInt();
+
+    statCounts.put(CrawlDatum.STATUS_DB_UNFETCHED, in.readInt());
+    statCounts.put(CrawlDatum.STATUS_DB_FETCHED, in.readInt());
+    statCounts.put(CrawlDatum.STATUS_DB_NOTMODIFIED, in.readInt());
+    statCounts.put(CrawlDatum.STATUS_DB_REDIR_PERM, in.readInt());
+    statCounts.put(CrawlDatum.STATUS_DB_REDIR_TEMP, in.readInt());
+    statCounts.put(CrawlDatum.STATUS_DB_GONE, in.readInt());
+
+    metaData = new MapWritable();
+    metaData.readFields(in);
+  }
+
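+  // NOTE: write() must emit fields in exactly the order readFields() reads them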
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(CUR_VERSION);           // store current version
+    out.writeFloat(score);
+    out.writeLong(lastCheck.getTime());
+    Text.writeString(out, homepageUrl);
+
+    out.writeInt(dnsFailures);
+    out.writeInt(connectionFailures);
+
+    out.writeInt(statCounts.get(CrawlDatum.STATUS_DB_UNFETCHED));
+    out.writeInt(statCounts.get(CrawlDatum.STATUS_DB_FETCHED));
+    out.writeInt(statCounts.get(CrawlDatum.STATUS_DB_NOTMODIFIED));
+    out.writeInt(statCounts.get(CrawlDatum.STATUS_DB_REDIR_PERM));
+    out.writeInt(statCounts.get(CrawlDatum.STATUS_DB_REDIR_TEMP));
+    out.writeInt(statCounts.get(CrawlDatum.STATUS_DB_GONE));
+
+    metaData.write(out);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("Version: " + CUR_VERSION + "\n");
+    buf.append("Homepage url: " + homepageUrl + "\n");
+    buf.append("Score: " + score + "\n");
+
+    if (!isEmpty())
+      buf.append("Last check: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck) + "\n");
+    else
+      buf.append("Last check: \n");
+    buf.append("Total records: " + numRecords() + "\n");
+    buf.append("  Unfetched: " + statCounts.get(CrawlDatum.STATUS_DB_UNFETCHED) + "\n");
+    buf.append("  Fetched: " + statCounts.get(CrawlDatum.STATUS_DB_FETCHED) + "\n");
+    buf.append("  Gone: " + statCounts.get(CrawlDatum.STATUS_DB_GONE) + "\n");
+    buf.append("  Perm redirect: " + statCounts.get(CrawlDatum.STATUS_DB_REDIR_PERM) + "\n");
+    buf.append("  Temp redirect: " + statCounts.get(CrawlDatum.STATUS_DB_REDIR_TEMP) + "\n");
+    buf.append("  Not modified: " + statCounts.get(CrawlDatum.STATUS_DB_NOTMODIFIED) + "\n");
+
+    buf.append("Total failures: " + numFailures() + "\n");
+    buf.append("  DNS failures: " + getDnsFailures() + "\n");
+    buf.append("  Connection failures: " + getConnectionFailures() + "\n");
+
+    return buf.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HostDatum))
+      return false;
+
+    HostDatum other = (HostDatum) o;
+    if (this.score == other.score &&
+        this.lastCheck.equals(other.lastCheck) &&
+        this.homepageUrl.equals(other.homepageUrl) &&
+        this.dnsFailures == other.dnsFailures &&
+        this.connectionFailures == other.connectionFailures) {
+      for (Byte key : statCounts.keySet()) {
+        if (other.getStat(key) == null || !statCounts.get(key).equals(other.getStat(key)))
+          return false;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return dnsFailures ^
+        homepageUrl.hashCode() ^
+        lastCheck.hashCode() ^
+        connectionFailures ^
+        Float.valueOf(score).hashCode() ^
+        statCounts.get(CrawlDatum.STATUS_DB_UNFETCHED) ^
+        statCounts.get(CrawlDatum.STATUS_DB_FETCHED) ^
+        statCounts.get(CrawlDatum.STATUS_DB_NOTMODIFIED) ^
+        statCounts.get(CrawlDatum.STATUS_DB_REDIR_PERM) ^
+        statCounts.get(CrawlDatum.STATUS_DB_REDIR_TEMP) ^
+        statCounts.get(CrawlDatum.STATUS_DB_GONE);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    HostDatum result = (HostDatum)super.clone();
+
+    // Primitive and immutable fields are already copied by super.clone();
+    // make defensive copies of the mutable containers so the clone is independent
+    result.lastCheck = (Date)lastCheck.clone();
+    result.statCounts = new HashMap<Byte, Integer>(statCounts);
+    result.metaData = new MapWritable(metaData);
+
+    return result;
+  }
+}
Index: src/bin/nutch
===================================================================
--- src/bin/nutch	(revision 1550985)
+++ src/bin/nutch	(working copy)
@@ -66,6 +66,8 @@
   echo "  solrdedup         remove duplicates from solr - DEPRECATED use the dedup command instead"
   echo "  solrclean         remove HTTP 301 and 404 documents from solr - DEPRECATED use the clean command instead"
   echo "  clean             remove HTTP 301 and 404 documents and duplicates from indexing backends configured via plugins"
+  echo "  hostdb            create a HostDB (or update an earlier one) from the CrawlDB"
+  echo "  readhostdb        dumps HostDB data"
   echo "  parsechecker      check the parser for a given url"
   echo "  indexchecker      check the indexing filters for a given url"
   echo "  domainstats       calculate domain statistics from crawldb"
@@ -236,6 +238,10 @@
   shift; shift
 elif [ "$COMMAND" = "clean" ] ; then
   CLASS=org.apache.nutch.indexer.CleaningJob
+elif [ "$COMMAND" = "hostdb" ] ; then
+  CLASS=org.apache.nutch.util.hostdb.HostDb
+elif [ "$COMMAND" = "readhostdb" ] ; then
+  CLASS=org.apache.nutch.util.hostdb.DumpHostDb
 elif [ "$COMMAND" = "parsechecker" ] ; then
   CLASS=org.apache.nutch.parse.ParserChecker
 elif [ "$COMMAND" = "indexchecker" ] ; then
