Index: src/java/org/apache/nutch/crawl/HostDatum.java
===================================================================
--- src/java/org/apache/nutch/crawl/HostDatum.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/HostDatum.java	(revision 0)
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Holds data about a host.
+ *
+ * @author Otis Gospodnetic
+ * @version $Id$
+ */
+public class HostDatum implements WritableComparable, Cloneable {
+  private final static byte CUR_VERSION = 1;
+  private MapWritable metaData;
+
+  public HostDatum() {
+    metaData = new MapWritable();
+  }
+
+  public void setMetaData(MapWritable mapWritable) { this.metaData = mapWritable; }
+
+  /**
+   * Returns the metadata map if it was set or read in, or an empty map
+   * (lazily instantiated) if this HostDatum was freshly created.
+   * @see #readFields(DataInput)
+   */
+  public MapWritable getMetaData() {
+    if (this.metaData == null) this.metaData = new MapWritable();
+    return this.metaData;
+  }
+
+
+  //
+  // writable methods
+  //
+
+  public static HostDatum read(DataInput in) throws IOException {
+    HostDatum result = new HostDatum();
+    result.readFields(in);
+    return result;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    byte version = in.readByte();                 // read version
+    if (version > CUR_VERSION)                    // check version
+      throw new VersionMismatchException(CUR_VERSION, version);
+
+    metaData.clear();
+    if (in.readBoolean()) {
+      metaData.readFields(in);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(CUR_VERSION);                   // store current version
+    if (metaData != null && metaData.size() > 0) {
+      out.writeBoolean(true);
+      metaData.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  /** Copy the contents of another instance into this instance. */
+  public void set(HostDatum that) {
+    this.metaData = new MapWritable(that.metaData); // make a deep copy
+  }
+
+
+  //
+  // compare methods
+  //
+
+  /** Sort order is not yet defined. */
+  public int compareTo(Object o) {
+    // TODO: define a real ordering; always returning 1 breaks the compareTo contract
+    return 1;
+  }
+
+
+  //
+  // basic methods
+  //
+
+  public String toString() {
+    StringBuffer buf = new StringBuffer();
+    buf.append("Version: " + CUR_VERSION + "\n");
+    buf.append("Metadata: " + (metaData != null ? metaData.toString() : "null") + "\n");
+    return buf.toString();
+  }
+
+  public boolean equals(Object o) {
+    if (!(o instanceof HostDatum))
+      return false;
+    HostDatum other = (HostDatum)o;
+    // allow zero-sized metadata to be equal to null metadata
+    boolean thisEmpty = this.metaData == null || this.metaData.size() == 0;
+    boolean otherEmpty = other.metaData == null || other.metaData.size() == 0;
+    if (thisEmpty || otherEmpty) return thisEmpty && otherEmpty;
+    return this.metaData.equals(other.metaData);
+  }
+
+  public int hashCode() {
+    int res = 0;
+    if (metaData != null) res ^= metaData.hashCode();
+    return res;
+  }
+
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
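For illustration only (not part of the patch): a minimal sketch of how a HostDatum might be populated and round-tripped through its Writable methods. The class name, counter values, and keys below are hypothetical; the key names simply mirror the counters used by HostDb.ReadReducer further down.

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.HostDatum;

public class HostDatumRoundTrip {
  public static void main(String[] args) throws Exception {
    // populate per-host counters in the metadata map (getMetaData() returns the live map)
    HostDatum datum = new HostDatum();
    MapWritable meta = datum.getMetaData();
    meta.put(new Text("Requests"), new LongWritable(100));
    meta.put(new Text("Successes"), new LongWritable(95));

    // serialize: write() stores a version byte and a presence flag before the map
    DataOutputBuffer out = new DataOutputBuffer();
    datum.write(out);

    // deserialize into a fresh instance
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    HostDatum copy = HostDatum.read(in);

    System.out.println(copy);   // prints the version and the metadata map
  }
}

Because getMetaData() hands back the live map, puts go straight into the datum; write() records only the version byte plus a boolean flag when the metadata is empty, so an empty HostDatum costs two bytes on disk.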
Index: src/java/org/apache/nutch/crawl/HostDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/HostDb.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/HostDb.java	(revision 0)
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * NOTE: the layout below is only an illustration; the records are actually (Text, MapWritable) tuples.
+ *
+ * host[:port] attempts successes failures exceptions timeouts unknownhost speed
+ * e.g.
+ * <pre>
+ * www.good.com 100 100 0 0 0 0 50          // 100 of 100 requests succeeded, 50 kbps
+ * www.flaky.com 100 35 65 5 10 0 20        // 35 of 100 requests succeeded, 65 failed (5 exceptions, 10 timeouts)
+ * www.timeouty.com 100 0 100 0 100 0 0     // all 100 requests failed (all 100 were timeouts, hence 0 kbps)
+ * www.slow.com 100 95 5 0 0 0 5            // 95 requests succeeded, 5 failed, but d/l speed is only 5 kbps
+ * www.unknown.com 100 0 100 0 0 100 0      // all 100 requests failed (all 100 were DNS lookup failures, hence 0 kbps)
+ * </pre>
+ *
+ * @author Otis Gospodnetic
+ * @version $Id$
+ */
+public class HostDb extends Configured implements Tool {
+  public static final Log LOG = LogFactory.getLog(HostDb.class);
+
+  public static final String CURRENT_NAME = "current"; // FIXME
+  public static final String LOCK_NAME = ".locked";
+
+  public HostDb() {}
+  public HostDb(Configuration conf) {
+    setConf(conf);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: HostDb <hostdb> (-dir <segments> | <seg1> <seg2> ...)");
+      System.err.println("\thostdb\tHostDb to read or update");
+      System.err.println("\t-dir segments\tparent directory containing all segments to update from");
+      System.err.println("\tseg1 seg2 ...\tlist of segment names to update from");
+      return -1;
+    }
+    boolean update = false; 
+    FileSystem fs = FileSystem.get(getConf());
+    HashSet<Path> dirs = new HashSet<Path>();
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-dir")) {
+        update = true;
+        FileStatus[] paths = fs.listStatus(new Path(args[++i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
+        dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
+      } else {
+        dirs.add(new Path(args[i]));
+      }
+    }
+    try {
+      if (update)
+        update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]));
+      else
+        read(new Path(args[0]));
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("HostDb update: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  //////////////////////////////////////
+
+  public void read(Path hostDb) throws IOException {
+    FileSystem fs = FileSystem.get(getConf());
+    LOG.info("HostDb read: starting");
+    LOG.info("HostDb read: db: " + hostDb);
+
+    // createReadJob already adds <hostDb>/current as the job input
+    JobConf job = createReadJob(getConf(), hostDb);
+
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      if (fs.exists(job.getOutputPath())) fs.delete(job.getOutputPath());
+      throw e;
+    }
+  }
+
+  public static JobConf createReadJob(Configuration config, Path hostDb) throws IOException {
+    Path outDir = new Path(hostDb, Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    System.out.println("Output: " + outDir);     // FIXME: pass in -outDir value
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("hostdb read " + hostDb + " to " + outDir);
+
+    Path current = new Path(hostDb, CURRENT_NAME);
+    if (FileSystem.get(job).exists(current)) {
+      job.addInputPath(current);
+    }
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputPath(outDir);
+
+    job.setMapperClass(ReadMapper.class);
+    job.setReducerClass(ReadReducer.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(HostDatum.class); // ReadMapper emits HostDatum values
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(MapWritable.class);
+
+    return job;
+  }
+
+  public static class ReadMapper extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, HostDatum> {
+    public void map(WritableComparable key, Writable value, OutputCollector<Text, HostDatum> output, Reporter reporter)
+    throws IOException {
+      Text host = (Text)key;
+      HostDatum datum = (HostDatum)value;
+      output.collect(host, datum);
+    }
+  }
+
+  public static class ReadReducer extends MapReduceBase implements Reducer<Text, HostDatum, Text, MapWritable> {
+    final MapWritable meta = new MapWritable();
+    final Text REQUESTS = new Text("Requests");
+    final Text SUCCESSES = new Text("Successes");
+    final Text FAILURES = new Text("Failures");
+    final Text EXCEPTIONS = new Text("Exceptions");
+    final Text TIMEOUTS = new Text("Timeouts");
+    final Text DNS_FAILURES = new Text("DnsFailures");
+    final Text DOWNLOAD_SPEED = new Text("DownloadSpeed");
+
+    /** Null-safe read of a counter; missing keys count as zero. */
+    private static long getLong(MapWritable map, Text key) {
+      LongWritable value = (LongWritable)map.get(key);
+      return value == null ? 0L : value.get();
+    }
+
+    public void reduce(Text key, Iterator<HostDatum> values, OutputCollector<Text, MapWritable> output, Reporter reporter)
+    throws IOException {
+      meta.clear();
+      long requests = 0;
+      long successes = 0;
+      long failures = 0;
+      long exceptions = 0;
+      long timeouts = 0;
+      long dnsFailures = 0;
+      float dlSpeed = 0f;
+
+      while (values.hasNext()) {
+        HostDatum datum = values.next();
+        MapWritable map = datum.getMetaData();
+        // TODO: OG: figure out what to do here for each MapWritable key
+        requests += getLong(map, REQUESTS);
+        successes += getLong(map, SUCCESSES);
+        failures += getLong(map, FAILURES);
+        exceptions += getLong(map, EXCEPTIONS);
+        timeouts += getLong(map, TIMEOUTS);
+        dnsFailures += getLong(map, DNS_FAILURES);
+        // dlSpeed - FIXME: what to do with this puppy, what to do, what to do...
+      }
+
+      meta.put(REQUESTS, new LongWritable(requests));
+      meta.put(SUCCESSES, new LongWritable(successes));
+      meta.put(FAILURES, new LongWritable(failures));
+      meta.put(EXCEPTIONS, new LongWritable(exceptions));
+      meta.put(TIMEOUTS, new LongWritable(timeouts));
+      meta.put(DNS_FAILURES, new LongWritable(dnsFailures));
+      meta.put(DOWNLOAD_SPEED, new FloatWritable(dlSpeed));
+      output.collect(key, meta);
+    }
+  }
+
+  //////////////////////////////////////
+
+  public void update(Path hostDb, Path[] segments) throws IOException {
+    FileSystem fs = FileSystem.get(getConf());
+    Path lock = new Path(hostDb, LOCK_NAME);
+    LockUtil.createLockFile(fs, lock, false);   // FIXME: force
+    if (LOG.isInfoEnabled()) {
+      LOG.info("HostDb update: starting");
+      LOG.info("HostDb update: db: " + hostDb);
+      LOG.info("HostDb update: segments: " + Arrays.asList(segments));
+    }
+
+    JobConf job = createUpdateJob(getConf(), hostDb);
+    for (int i = 0; i < segments.length; i++) {
+      Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
+      if (fs.exists(fetch)) {
+        job.addInputPath(fetch);
+      } else {
+        LOG.info(" - skipping invalid segment " + segments[i]);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("HostDb update: Merging segment data into db.");
+    }
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(job.getOutputPath())) fs.delete(job.getOutputPath());
+      throw e;
+    }
+
+    install(job, hostDb);
+    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb update: done"); }
+  }
+
+  public static JobConf createUpdateJob(Configuration config, Path hostDb) throws IOException {
+    Path newHostDb = new Path(hostDb, Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("hostdb update " + hostDb);
+
+    Path current = new Path(hostDb, CURRENT_NAME);
+    if (FileSystem.get(job).exists(current)) {
+      job.addInputPath(current);
+    }
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(MapFileOutputFormat.class);     // TODO: OG: is this correct?
+    job.setOutputPath(newHostDb);
+
+    job.setMapperClass(UpdateMapper.class);
+    job.setReducerClass(UpdateReducer.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(MapWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(HostDatum.class);
+
+    return job;
+  }
+
+  public static void install(JobConf job, Path hostDb) throws IOException {
+    Path newHostDb = job.getOutputPath();
+    FileSystem fs = new JobClient(job).getFs();
+    Path old = new Path(hostDb, "old");
+    Path current = new Path(hostDb, CURRENT_NAME);
+    if (fs.exists(current)) {
+      if (fs.exists(old)) fs.delete(old);
+      fs.rename(current, old);
+    }
+    fs.mkdirs(hostDb);
+    fs.rename(newHostDb, current);
+    if (fs.exists(old)) fs.delete(old);
+    Path lock = new Path(hostDb, LOCK_NAME);
+    LockUtil.removeLockFile(fs, lock);
+  }
+
+
+  //////////////////////////////////////
+
+  /**
+   * Passes the metadata of existing HostDatum entries through, keyed by host.
+   */
+  public static class UpdateMapper extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, MapWritable> {
+
+    public void map(WritableComparable key, Writable value, OutputCollector<Text, MapWritable> output, Reporter reporter)
+    throws IOException {
+      Text host = (Text)key;
+      HostDatum datum = (HostDatum)value;
+      // pass the existing host metadata through so the reducer can merge it
+      // TODO: handle segment (crawl_fetch) input, whose records are (Text url, CrawlDatum)
+      output.collect(host, datum.getMetaData());
+    }
+  }
+
+  /**
+   * Merges the metadata collected for a host into a single HostDatum.
+   */
+  public static class UpdateReducer extends MapReduceBase implements Reducer<Text, MapWritable, Text, HostDatum> {
+
+    public void reduce(Text key, Iterator<MapWritable> values, OutputCollector<Text, HostDatum> output, Reporter reporter)
+    throws IOException {
+      MapWritable meta = new MapWritable();
+      while (values.hasNext()) {
+        MapWritable map = values.next();
+        // TODO: OG: figure out what to do here for each MapWritable key
+        meta.putAll(map);                         // placeholder merge: last value wins
+      }
+      HostDatum datum = new HostDatum();
+      datum.setMetaData(meta);
+      output.collect(key, datum);                 // matches setOutputValueClass(HostDatum.class)
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new HostDb(), args);
+    System.exit(res);
+  }
+}
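For illustration only (not part of the patch): a minimal sketch of how an installed hostdb could be queried for a single host, assuming the update job has produced (Text host, HostDatum) MapFiles under <hostdb>/current. The class name, paths, and host below are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.nutch.crawl.HostDatum;
import org.apache.nutch.crawl.HostDb;
import org.apache.nutch.util.NutchConfiguration;

public class HostDbLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path current = new Path("crawl/hostdb", HostDb.CURRENT_NAME);   // hypothetical hostdb location

    // open one MapFile reader per db partition and look up a single host
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, current, conf);
    Text host = new Text("www.example.com");
    HostDatum datum = new HostDatum();
    HostDatum found = (HostDatum)MapFileOutputFormat.getEntry(
        readers, new HashPartitioner<Text, HostDatum>(), host, datum);
    System.out.println(host + ": " + (found == null ? "not in hostdb" : found.toString()));
    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }
}

This mirrors the way CrawlDbReader looks up individual URLs in a crawldb, which is presumably the motivation for using MapFileOutputFormat (rather than a plain SequenceFile output) in createUpdateJob.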
