Index: src/test/org/apache/nutch/crawl/TestMapWritable.java
===================================================================
--- src/test/org/apache/nutch/crawl/TestMapWritable.java	(revision 735743)
+++ src/test/org/apache/nutch/crawl/TestMapWritable.java	(working copy)
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nutch.crawl;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.crawl.MapWritable;
-import org.apache.nutch.util.NutchConfiguration;
-
-public class TestMapWritable extends TestCase {
-
-  private Configuration configuration = NutchConfiguration.create();
-
-  public void testMap() throws Exception {
-    MapWritable map = new MapWritable();
-    assertTrue(map.isEmpty());
-    for (int i = 0; i < 100; i++) {
-      Text key = new Text("" + i);
-      IntWritable value = new IntWritable(i);
-      map.put(key, value);
-      assertEquals(i + 1, map.size());
-      assertTrue(map.containsKey(new Text("" + i)));
-      assertTrue(map.containsValue(new IntWritable(i)));
-      map.remove(key);
-      assertEquals(i, map.size());
-      map.put(key, value);
-      assertEquals(value, map.get(key));
-      assertFalse(map.isEmpty());
-      assertTrue(map.keySet().contains(key));
-      assertEquals(i + 1, map.values().size());
-      assertTrue(map.values().contains(value));
-    }
-    testWritable(map);
-    MapWritable map2 = new MapWritable();
-    testWritable(map2);
-    map2.putAll(map);
-    assertEquals(100, map2.size());
-    testWritable(map2);
-
-    map.clear();
-    assertTrue(map.isEmpty());
-    assertEquals(0, map.size());
-    assertFalse(map.containsKey(new Text("" + 1)));
-
-  }
-
-  public void testWritable() throws Exception {
-    MapWritable datum1 = new MapWritable();
-    for (int i = 0; i < 100; i++) {
-      datum1.put(new LongWritable(i), new Text("" + 1));
-    }
-    assertEquals(100, datum1.size());
-    testWritable(datum1);
-
-    MapWritable datum2 = new MapWritable();
-    for (int i = 0; i < 100; i++) {
-      datum2.put(new DummyWritable(i), new DummyWritable(i));
-    }
-    assertEquals(100, datum2.size());
-    testWritable(datum2);
-
-    CrawlDatum c = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, 1);
-    c.setMetaData(new MapWritable());
-    for (int i = 0; i < 100; i++) {
-      c.getMetaData().put(new LongWritable(i), new Text("" + 1));
-    }
-    testWritable(c);
-  }
-  
-  public void testEquals() {
-    MapWritable map1 = new MapWritable();
-    MapWritable map2 = new MapWritable();
-    map1.put(new Text("key1"), new Text("val1"));
-    map1.put(new Text("key2"), new Text("val2"));
-    map2.put(new Text("key2"), new Text("val2"));
-    map2.put(new Text("key1"), new Text("val1"));
-    assertTrue(map1.equals(map2));
-  }
-
-  public void testPerformance() throws Exception {
-    FileSystem fs = FileSystem.get(configuration);
-    Path file = new Path(System.getProperty("java.io.tmpdir"), "mapTestFile");
-    fs.delete(file, false);
-    org.apache.hadoop.io.SequenceFile.Writer writer = SequenceFile.createWriter(
-        fs, configuration, file, IntWritable.class, MapWritable.class);
-    // write map
-    System.out.println("start writing map's");
-    long start = System.currentTimeMillis();
-    IntWritable key = new IntWritable();
-    MapWritable map = new MapWritable();
-    LongWritable mapValue = new LongWritable();
-    for (int i = 0; i < 1000000; i++) {
-      key.set(i);
-      mapValue.set(i);
-      map.put(key, mapValue);
-      writer.append(key, map);
-    }
-    long needed = System.currentTimeMillis() - start;
-    writer.close();
-    System.out.println("needed time for writing map's: " + needed);
-
-    // read map
-
-    org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(
-        fs, file, configuration);
-    System.out.println("start reading map's");
-    start = System.currentTimeMillis();
-    while (reader.next(key, map)) {
-
-    }
-    reader.close();
-    needed = System.currentTimeMillis() - start;
-    System.out.println("needed time for reading map's: " + needed);
-    fs.delete(file);
-
-    // Text
-    System.out.println("start writing Text's");
-    writer = SequenceFile.createWriter(fs, configuration, file, IntWritable.class, Text.class);
-    // write map
-    start = System.currentTimeMillis();
-    key = new IntWritable();
-    Text value = new Text();
-    String s = "15726:15726";
-    for (int i = 0; i < 1000000; i++) {
-      key.set(i);
-      value.set(s);
-      writer.append(key, value);
-    }
-    needed = System.currentTimeMillis() - start;
-    writer.close();
-    System.out.println("needed time for writing Text's: " + needed);
-
-    // read map
-    System.out.println("start reading Text's");
-    reader = new SequenceFile.Reader(fs, file, configuration);
-    start = System.currentTimeMillis();
-    while (reader.next(key, value)) {
-
-    }
-    needed = System.currentTimeMillis() - start;
-    System.out.println("needed time for reading Text: " + needed);
-    fs.delete(file, false);
-  }
-
-  /** Utility method for testing writables, from hadoop code */
-  public void testWritable(Writable before) throws Exception {
-    DataOutputBuffer dob = new DataOutputBuffer();
-    before.write(dob);
-
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(dob.getData(), dob.getLength());
-
-    Writable after = (Writable) before.getClass().newInstance();
-    after.readFields(dib);
-
-    assertEquals(before, after);
-  }
-
-  public void testRecycling() throws Exception {
-    Text value = new Text("value");
-    Text key1 = new Text("a");
-    Text key2 = new Text("b");
-
-    MapWritable writable = new MapWritable();
-    writable.put(key1, value);
-    assertEquals(writable.get(key1), value);
-    assertNull(writable.get(key2));
-
-    DataOutputBuffer dob = new DataOutputBuffer();
-    writable.write(dob);
-    writable.clear();
-    writable.put(key1, value);
-    writable.put(key2, value);
-    assertEquals(writable.get(key1), value);
-    assertEquals(writable.get(key2), value);
-
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(dob.getData(), dob.getLength());
-    writable.readFields(dib);
-    assertEquals(writable.get(key1), value);
-    assertNull(writable.get(key2));
-  }
-
-  public static void main(String[] args) throws Exception {
-    TestMapWritable writable = new TestMapWritable();
-    writable.testPerformance();
-  }
-
-}
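
Note (illustrative, not part of this patch): the deleted test exercised put/get semantics and serialization round-trips for the Nutch-specific MapWritable, which this patch deprecates. A minimal sketch of an equivalent round-trip check against org.apache.hadoop.io.MapWritable, should that coverage be wanted again (the class name and JUnit 3 scaffolding are assumptions, not taken from the patch):

import junit.framework.TestCase;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class TestHadoopMapWritable extends TestCase {

  /** Serialize a populated map and read it back into a fresh instance. */
  public void testRoundTrip() throws Exception {
    MapWritable before = new MapWritable();
    for (int i = 0; i < 100; i++) {
      before.put(new Text("key" + i), new IntWritable(i));
    }

    DataOutputBuffer out = new DataOutputBuffer();
    before.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());

    MapWritable after = new MapWritable();
    after.readFields(in);

    // Compare entry sets rather than the maps themselves, mirroring the
    // metadataEquals() helper this patch adds to CrawlDatum.
    assertEquals(before.entrySet(), after.entrySet());
  }
}
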
Index: src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java
===================================================================
--- src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java	(revision 735743)
+++ src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
@@ -44,7 +45,6 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.CrawlDb;
-import org.apache.nutch.crawl.MapWritable;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
Index: src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java
===================================================================
--- src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java	(revision 735743)
+++ src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java	(working copy)
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -37,7 +38,6 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.CrawlDb;
-import org.apache.nutch.crawl.MapWritable;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.scoring.webgraph.Node;
 import org.apache.nutch.util.FSUtils;
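
Note (illustrative, not part of this patch): CrawlDbConverter and ReprUrlFixer above need nothing beyond the import swap; every unqualified MapWritable reference in those files then resolves to the Hadoop class, which offers the same put/get/write/readFields operations. Downstream code should migrate the same way (equality semantics differ between the two classes, which the CrawlDatum changes below account for):

// before: the class deprecated by this patch
import org.apache.nutch.crawl.MapWritable;

// after: the replacement
import org.apache.hadoop.io.MapWritable;
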
Index: src/java/org/apache/nutch/crawl/CrawlDatum.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDatum.java	(revision 735743)
+++ src/java/org/apache/nutch/crawl/CrawlDatum.java	(working copy)
@@ -19,17 +19,18 @@
 
 import java.io.*;
 import java.util.*;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.io.*;
 import org.apache.nutch.util.*;
 
 /* The crawl state of a url. */
-public class CrawlDatum implements WritableComparable, Cloneable {
+public class CrawlDatum implements WritableComparable<CrawlDatum>, Cloneable {
   public static final String GENERATE_DIR_NAME = "crawl_generate";
   public static final String FETCH_DIR_NAME = "crawl_fetch";
   public static final String PARSE_DIR_NAME = "crawl_parse";
 
-  private final static byte CUR_VERSION = 6;
+  private final static byte CUR_VERSION = 7;
 
   /** Compatibility values for on-the-fly conversion from versions < 5. */
   private static final byte OLD_STATUS_SIGNATURE = 0;
@@ -118,7 +119,7 @@
   private float score = 1.0f;
   private byte[] signature = null;
   private long modifiedTime;
-  private MapWritable metaData;
+  private org.apache.hadoop.io.MapWritable metaData;
   
   public static boolean hasDbStatus(CrawlDatum datum) {
     if (datum.status <= STATUS_DB_MAX) return true;
@@ -131,10 +132,11 @@
   }
 
   public CrawlDatum() {
-    metaData = new MapWritable();
+    metaData = new org.apache.hadoop.io.MapWritable();
   }
 
   public CrawlDatum(int status, int fetchInterval) {
+    this();
     this.status = (byte)status;
     this.fetchInterval = fetchInterval;
   }
@@ -201,14 +203,16 @@
     this.signature = signature;
   }
   
-   public void setMetaData(MapWritable mapWritable) {this.metaData = mapWritable; }
+   public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) {
+     this.metaData = mapWritable;
+   }
 
   /**
    * returns a MapWritable if it was set or read in @see readFields(DataInput), 
    * returns empty map in case CrawlDatum was freshly created (lazily instantiated).
    */
-  public MapWritable getMetaData() {
-    if (this.metaData == null) this.metaData = new MapWritable();
+  public org.apache.hadoop.io.MapWritable getMetaData() {
+    if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable();
     return this.metaData;
   }
   
@@ -223,7 +227,6 @@
     return result;
   }
 
-
   public void readFields(DataInput in) throws IOException {
     byte version = in.readByte();                 // read version
     if (version > CUR_VERSION)                   // check version
@@ -244,10 +247,20 @@
         in.readFully(signature);
       } else signature = null;
     }
+    metaData = new org.apache.hadoop.io.MapWritable();
     if (version > 3) {
-      metaData.clear();
-      if (in.readBoolean()) {
-        metaData.readFields(in);
+      if (version < 7) {
+        MapWritable oldMetaData = new MapWritable();
+        if (in.readBoolean()) {
+          oldMetaData.readFields(in);
+        }
+        for (Writable key : oldMetaData.keySet()) {
+          metaData.put(key, oldMetaData.get(key));
+        }
+      } else {
+        if (in.readBoolean()) {
+          metaData.readFields(in);
+        }
       }
     }
     // translate status codes
@@ -278,7 +291,7 @@
       out.writeByte(signature.length);
       out.write(signature);
     }
-    if (metaData != null && metaData.size() > 0) {
+    if (metaData.size() > 0) {
       out.writeBoolean(true);
       metaData.write(out);
     } else {
@@ -295,7 +308,7 @@
     this.score = that.score;
     this.modifiedTime = that.modifiedTime;
     this.signature = that.signature;
-    this.metaData = new MapWritable(that.metaData); // make a deep copy
+    this.metaData = new org.apache.hadoop.io.MapWritable(that.metaData); // make a deep copy
   }
 
 
@@ -304,8 +317,7 @@
   //
   
   /** Sort by decreasing score. */
-  public int compareTo(Object o) {
-    CrawlDatum that = (CrawlDatum)o; 
+  public int compareTo(CrawlDatum that) {
     if (that.score != this.score)
       return (that.score - this.score) > 0 ? 1 : -1;
     if (that.status != this.status)
@@ -367,7 +379,7 @@
   //
 
   public String toString() {
-    StringBuffer buf = new StringBuffer();
+    StringBuilder buf = new StringBuilder();
     buf.append("Version: " + CUR_VERSION + "\n");
     buf.append("Status: " + getStatus() + " (" + getStatusName(getStatus()) + ")\n");
     buf.append("Fetch time: " + new Date(getFetchTime()) + "\n");
@@ -377,9 +389,23 @@
         (getFetchInterval() / FetchSchedule.SECONDS_PER_DAY) + " days)\n");
     buf.append("Score: " + getScore() + "\n");
     buf.append("Signature: " + StringUtil.toHexString(getSignature()) + "\n");
-    buf.append("Metadata: " + (metaData != null ? metaData.toString() : "null") + "\n");
+    buf.append("Metadata: ");
+    for (Entry<Writable, Writable> e : metaData.entrySet()) {
+      buf.append(e.getKey());
+      buf.append(": ");
+      buf.append(e.getValue()).append(' ');
+    }
+    buf.append('\n');
     return buf.toString();
   }
+  
+  private boolean metadataEquals(org.apache.hadoop.io.MapWritable otherMetaData) {
+    HashSet<Entry<Writable, Writable>> set1 =
+      new HashSet<Entry<Writable,Writable>>(metaData.entrySet());
+    HashSet<Entry<Writable, Writable>> set2 =
+      new HashSet<Entry<Writable,Writable>>(otherMetaData.entrySet());
+    return set1.equals(set2);
+  }
 
   public boolean equals(Object o) {
     if (!(o instanceof CrawlDatum))
@@ -394,18 +420,7 @@
       (SignatureComparator._compare(this.signature, other.signature) == 0) &&
       (this.score == other.score);
     if (!res) return res;
-    // allow zero-sized metadata to be equal to null metadata
-    if (this.metaData == null) {
-      if (other.metaData != null && other.metaData.size() > 0) return false;
-      else return true;
-    } else {
-      if (other.metaData == null) {
-        if (this.metaData.size() == 0) return true;
-        else return false;
-      } else {
-        return this.metaData.equals(other.metaData);
-      }
-    }
+    return metadataEquals(other.metaData);
   }
 
   public int hashCode() {
@@ -416,7 +431,7 @@
                 signature[i+2] << 8 + signature[i+3]);
       }
     }
-    if (metaData != null) res ^= metaData.hashCode();
+    res ^= metaData.entrySet().hashCode();
     return
       res ^ status ^
       ((int)fetchTime) ^
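
Note (illustrative, not part of this patch): the readFields() change above is what keeps existing crawl dbs readable after the version bump: metadata written before version 7 is read through the deprecated org.apache.nutch.crawl.MapWritable and copied entry by entry into the Hadoop map, while version 7 data is read directly. A minimal sketch of the new-format round trip (the class name and standalone main() are assumptions):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;

public class CrawlDatumRoundTrip {
  public static void main(String[] args) throws Exception {
    CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, 1);
    datum.getMetaData().put(new Text("k"), new Text("v"));

    DataOutputBuffer out = new DataOutputBuffer();
    datum.write(out);                 // emits CUR_VERSION (now 7), then the metadata map

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());

    CrawlDatum copy = new CrawlDatum();
    copy.readFields(in);              // version == 7, so the direct (non-converting) branch runs

    // equals() now compares metadata entry sets via metadataEquals()
    System.out.println(datum.equals(copy)); // expected: true
  }
}
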
Index: src/java/org/apache/nutch/crawl/MapWritable.java
===================================================================
--- src/java/org/apache/nutch/crawl/MapWritable.java	(revision 735743)
+++ src/java/org/apache/nutch/crawl/MapWritable.java	(working copy)
@@ -57,6 +57,8 @@
  * into the header of each MapWritable that uses these types.
  *
  * @author Stefan Groschupf
+ * @deprecated Use org.apache.hadoop.io.MapWritable instead.
  */
+@Deprecated
 public class MapWritable implements Writable {
 
Index: src/java/org/apache/nutch/crawl/CrawlDbMerger.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbMerger.java	(revision 735743)
+++ src/java/org/apache/nutch/crawl/CrawlDbMerger.java	(working copy)
@@ -53,7 +53,7 @@
   private static final Log LOG = LogFactory.getLog(CrawlDbMerger.class);
 
   public static class Merger extends MapReduceBase implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
-    private MapWritable meta = new MapWritable();
+    private org.apache.hadoop.io.MapWritable meta = new org.apache.hadoop.io.MapWritable();
     private CrawlDatum res = new CrawlDatum();
     private FetchSchedule schedule;
 

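
Note (illustrative, not part of this patch): the fully qualified org.apache.hadoop.io.MapWritable in CrawlDatum and CrawlDbMerger is deliberate. Both classes live in package org.apache.nutch.crawl, where the deprecated MapWritable is a package member and therefore shadows anything brought in by a wildcard import such as CrawlDatum's import org.apache.hadoop.io.*;. A single-type import would also shadow the package-local class, but the fully qualified name keeps every reference unambiguous while both classes exist:

package org.apache.nutch.crawl;  // hypothetical compilation unit in the same package

import org.apache.hadoop.io.*;   // MapWritable from this import is shadowed by the package member

public class Example {
  // An unqualified "MapWritable" here would resolve to the deprecated
  // org.apache.nutch.crawl.MapWritable, so the patch spells out the
  // Hadoop class wherever it is meant:
  private org.apache.hadoop.io.MapWritable meta =
      new org.apache.hadoop.io.MapWritable();
}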