Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 596792)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -69,8 +69,8 @@
     }
   }
 
-  private RecordReader input;
-  private OutputCollector output;
+  private RecordReader<Text, CrawlDatum> input;
+  private OutputCollector<Text, NutchWritable> output;
   private Reporter reporter;
 
   private String segmentName;
Index: src/java/org/apache/nutch/fetcher/Fetcher2.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 596792)
+++ src/java/org/apache/nutch/fetcher/Fetcher2.java	(working copy)
@@ -83,7 +83,8 @@
  * 
  * @author Andrzej Bialecki
  */
-public class Fetcher2 extends Configured implements MapRunnable { 
+public class Fetcher2 extends Configured implements
+        MapRunnable<Text, CrawlDatum, Text, NutchWritable> { 
 
   public static final Log LOG = LogFactory.getLog(Fetcher2.class);
   
@@ -101,7 +102,7 @@
     }
   }
 
-  private OutputCollector output;
+  private OutputCollector<Text, NutchWritable> output;
   private Reporter reporter;
   
   private String segmentName;
@@ -376,11 +377,11 @@
    * items are consumed by FetcherThread-s.
    */
   private static class QueueFeeder extends Thread {
-    private RecordReader reader;
+    private RecordReader<Text, CrawlDatum> reader;
     private FetchItemQueues queues;
     private int size;
     
-    public QueueFeeder(RecordReader reader, FetchItemQueues queues, int size) {
+    public QueueFeeder(RecordReader<Text, CrawlDatum> reader, FetchItemQueues queues, int size) {
       this.reader = reader;
       this.queues = queues;
       this.size = size;
@@ -831,7 +832,7 @@
     return conf.getBoolean("fetcher.store.content", true);
   }
 
-  public void run(RecordReader input, OutputCollector output,
+  public void run(RecordReader<Text, CrawlDatum> input, OutputCollector<Text, NutchWritable> output,
                   Reporter reporter) throws IOException {
 
     this.output = output;
Index: src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
===================================================================
--- src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java	(revision 596792)
+++ src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java	(working copy)
@@ -26,7 +26,6 @@
 
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -38,18 +37,19 @@
 import org.apache.hadoop.util.Progressable;
 
 import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.parse.ParseOutputFormat;
 import org.apache.nutch.protocol.Content;
 
 /** Splits FetcherOutput entries into multiple map files. */
-public class FetcherOutputFormat implements OutputFormat {
+public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {
 
   public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
     if (fs.exists(new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME)))
       throw new IOException("Segment already fetched!");
   }
 
-  public RecordWriter getRecordWriter(final FileSystem fs,
+  public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
                                       final JobConf job,
                                       final String name,
                                       final Progressable progress) throws IOException {
@@ -65,9 +65,9 @@
       new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
           compType, progress);
     
-    return new RecordWriter() {
+    return new RecordWriter<Text, NutchWritable>() {
         private MapFile.Writer contentOut;
-        private RecordWriter parseOut;
+        private RecordWriter<Text, Writable> parseOut;
 
         {
           if (Fetcher.isStoringContent(job)) {
@@ -81,7 +81,7 @@
           }
         }
 
-        public void write(WritableComparable key, Writable value)
+        public void write(Text key, NutchWritable value)
           throws IOException {
 
           Writable w = ((NutchWritable)value).get();
Index: src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java
===================================================================
--- src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java	(revision 596792)
+++ src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java	(working copy)
@@ -52,7 +52,7 @@
  * 
  * @author Andrzej Bialecki
  */
-public class CrawlDbConverter extends ToolBase implements Mapper {
+public class CrawlDbConverter extends ToolBase implements Mapper<Text, CrawlDatum, Text, CrawlDatum> {
   private static final Log LOG = LogFactory.getLog(CrawlDbConverter.class);
   
   private static final String CONVERT_META_KEY = "db.converter.with.metadata";
@@ -66,12 +66,11 @@
     newKey = new Text();
   }
 
-  public void map(WritableComparable key, Writable value, OutputCollector output,
+  public void map(Text key, CrawlDatum value, OutputCollector<Text, CrawlDatum> output,
       Reporter reporter) throws IOException {
     newKey.set(key.toString());
     if (withMetadata) {
-      CrawlDatum datum = (CrawlDatum)value;
-      MapWritable meta = datum.getMetaData();
+      MapWritable meta = value.getMetaData();
       if (meta.size() > 0) {
         MapWritable newMeta = new MapWritable();
         Iterator it = meta.keySet().iterator();
@@ -84,7 +83,7 @@
           }
           newMeta.put(k, v);
         }
-        datum.setMetaData(newMeta);
+        value.setMetaData(newMeta);
       }
     }
     output.collect(newKey, value);
Index: src/java/org/apache/nutch/tools/arc/ArcRecordReader.java
===================================================================
--- src/java/org/apache/nutch/tools/arc/ArcRecordReader.java	(revision 596792)
+++ src/java/org/apache/nutch/tools/arc/ArcRecordReader.java	(working copy)
@@ -28,8 +28,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -50,7 +48,7 @@
  * @see http://www.grub.org/
  */
 public class ArcRecordReader
-  implements RecordReader {
+  implements RecordReader<Text, BytesWritable> {
 
   public static final Log LOG = LogFactory.getLog(ArcRecordReader.class);
 
@@ -123,15 +121,15 @@
   /**
    * Creates a new instance of the <code>Text</code> object for the key.
    */
-  public WritableComparable createKey() {
-    return (WritableComparable)ReflectionUtils.newInstance(Text.class, conf);
+  public Text createKey() {
+    return (Text)ReflectionUtils.newInstance(Text.class, conf);
   }
 
   /**
-   * Creates a new instance of the <code>BytesWritable</code> object for the key
+   * Creates a new instance of the <code>BytesWritable</code> object for the value.
    */
-  public Writable createValue() {
-    return (Writable)ReflectionUtils.newInstance(BytesWritable.class, conf);
+  public BytesWritable createValue() {
+    return (BytesWritable)ReflectionUtils.newInstance(BytesWritable.class, conf);
   }
 
   /**
@@ -175,7 +173,7 @@
    * 
    * @throws IOException If an error occurs while reading the record value.
    */
-  public boolean next(WritableComparable key, Writable value)
+  public boolean next(Text key, BytesWritable value)
     throws IOException {
 
     try {
Index: src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
===================================================================
--- src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java	(revision 596792)
+++ src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java	(working copy)
@@ -69,7 +69,7 @@
  */
 public class ArcSegmentCreator
   extends ToolBase
-  implements Mapper {
+  implements Mapper<Text, BytesWritable, Text, NutchWritable> {
 
   public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);
   public static final String URL_VERSION = "arc.url.version";
@@ -145,7 +145,7 @@
    * 
    * @return The result of the parse in a ParseStatus object.
    */
-  private ParseStatus output(OutputCollector output, String segmentName,
+  private ParseStatus output(OutputCollector<Text, NutchWritable> output, String segmentName,
     Text key, CrawlDatum datum, Content content, ProtocolStatus pstatus,
     int status) {
 
@@ -270,7 +270,7 @@
-   * @param output The output collector.
+   * @param output The output collector.
    * @param reporter The progress reporter.
    */
-  public void map(WritableComparable key, Writable value,
+  public void map(Text key, BytesWritable value,
-    OutputCollector output, Reporter reporter)
+    OutputCollector<Text, NutchWritable> output, Reporter reporter)
     throws IOException {
 
Index: src/java/org/apache/nutch/tools/arc/ArcInputFormat.java
===================================================================
--- src/java/org/apache/nutch/tools/arc/ArcInputFormat.java	(revision 596792)
+++ src/java/org/apache/nutch/tools/arc/ArcInputFormat.java	(working copy)
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -38,7 +40,7 @@
    * @param job The job configuration.
    * @param reporter The progress reporter.
    */
-  public RecordReader getRecordReader(InputSplit split, JobConf job,
+  public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split, JobConf job,
     Reporter reporter)
     throws IOException {
     reporter.setStatus(split.toString());
Index: src/java/org/apache/nutch/tools/FreeGenerator.java
===================================================================
--- src/java/org/apache/nutch/tools/FreeGenerator.java	(revision 596792)
+++ src/java/org/apache/nutch/tools/FreeGenerator.java	(working copy)
@@ -23,9 +23,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -59,7 +58,9 @@
   private static final String FILTER_KEY = "free.generator.filter";
   private static final String NORMALIZE_KEY = "free.generator.normalize";
 
-  public static class FG extends MapReduceBase implements Mapper, Reducer {
+  public static class FG extends MapReduceBase implements
+            Mapper<LongWritable, Text, Text, CrawlDatum>,
+            Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     private URLNormalizers normalizers = null;
     private URLFilters filters = null;
     private ScoringFilters scfilters;
@@ -78,7 +79,7 @@
       }
     }
 
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+    public void map(LongWritable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       // value is a line of text
       String urlString = value.toString();
       try {
@@ -105,9 +106,9 @@
       output.collect(url, datum);
     }
 
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       // pick just one (discard duplicates)
-      output.collect(key, (Writable)values.next());
+      output.collect(key, values.next());
     }
   }
   
Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReader.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/CrawlDbReader.java	(working copy)
@@ -36,8 +36,6 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapFileOutputFormat;
@@ -83,33 +81,32 @@
     }
   }
 
-  public static class CrawlDbStatMapper implements Mapper {
+  public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
     LongWritable COUNT_1 = new LongWritable(1);
     public void configure(JobConf job) {}
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
-      CrawlDatum cd = (CrawlDatum) value;
       output.collect(new Text("T"), COUNT_1);
-      output.collect(new Text("status " + cd.getStatus()), COUNT_1);
-      output.collect(new Text("retry " + cd.getRetriesSinceFetch()), COUNT_1);
-      output.collect(new Text("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
+      output.collect(new Text("status " + value.getStatus()), COUNT_1);
+      output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
     }
   }
   
-  public static class CrawlDbStatCombiner implements Reducer {
+  public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
     
     public CrawlDbStatCombiner() { }
     public void configure(JobConf job) { }
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
         throws IOException {
       val.set(0L);
-      String k = ((Text)key).toString();
+      String k = key.toString();
       if (!k.equals("s")) {
         while (values.hasNext()) {
-          LongWritable cnt = (LongWritable)values.next();
+          LongWritable cnt = values.next();
           val.set(val.get() + cnt.get());
         }
         output.collect(key, val);
@@ -118,7 +115,7 @@
         long min = Long.MAX_VALUE;
         long max = Long.MIN_VALUE;
         while (values.hasNext()) {
-          LongWritable cnt = (LongWritable)values.next();
+          LongWritable cnt = values.next();
           if (cnt.get() < min) min = cnt.get();
           if (cnt.get() > max) max = cnt.get();
           total += cnt.get();
@@ -130,46 +127,46 @@
     }
   }
 
-  public static class CrawlDbStatReducer implements Reducer {
+  public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
     public void configure(JobConf job) {}
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
 
-      String k = ((Text) key).toString();
+      String k = key.toString();
       if (k.equals("T")) {
         // sum all values for this key
         long sum = 0;
         while (values.hasNext()) {
-          sum += ((LongWritable) values.next()).get();
+          sum += values.next().get();
         }
         // output sum
         output.collect(key, new LongWritable(sum));
       } else if (k.startsWith("status") || k.startsWith("retry")) {
         LongWritable cnt = new LongWritable();
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           cnt.set(cnt.get() + val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("scx")) {
         LongWritable cnt = new LongWritable(Long.MIN_VALUE);
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           if (cnt.get() < val.get()) cnt.set(val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("scn")) {
         LongWritable cnt = new LongWritable(Long.MAX_VALUE);
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           if (cnt.get() > val.get()) cnt.set(val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("sct")) {
         LongWritable cnt = new LongWritable();
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           cnt.set(cnt.get() + val.get());
         }
         output.collect(key, cnt);
@@ -177,19 +174,7 @@
     }
   }
 
-  public static class CrawlDbDumpReducer implements Reducer {
-
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-      while (values.hasNext()) {
-        output.collect(key, (Writable)values.next());
-      }
-    }
-
-    public void configure(JobConf job) {}
-    public void close() {}
-  }
-  
-  public static class CrawlDbTopNMapper implements Mapper {
+  public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
     
@@ -200,24 +185,22 @@
       }
     }
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, Text> output, Reporter reporter)
             throws IOException {
-      CrawlDatum datum = (CrawlDatum)value;
-      if (datum.getScore() < min) return; // don't collect low-scoring records
-      fw.set(-datum.getScore()); // reverse sorting order
+      if (value.getScore() < min) return; // don't collect low-scoring records
+      fw.set(-value.getScore()); // reverse sorting order
       output.collect(fw, key); // invert mapping: score -> url
     }
   }
   
-  public static class CrawlDbTopNReducer implements Reducer {
+  public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
     
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
       while (values.hasNext() && count < topN) {
-        FloatWritable fw = (FloatWritable)key;
-        fw.set(-fw.get());
-        output.collect(fw, (Writable)values.next());
+        key.set(-key.get());
+        output.collect(key, values.next());
         count++;
       }
     }
@@ -318,7 +301,7 @@
     Text key = new Text(url);
     CrawlDatum val = new CrawlDatum();
     openReaders(crawlDb, config);
-    CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers, new HashPartitioner(), key, val);
+    CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers, new HashPartitioner<Text, CrawlDatum>(), key, val);
     return res;
   }
 
Index: src/java/org/apache/nutch/crawl/LinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/LinkDb.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/LinkDb.java	(working copy)
@@ -41,7 +41,7 @@
 import org.apache.nutch.util.NutchJob;
 
 /** Maintains an inverted link map, listing incoming links for each url. */
-public class LinkDb extends ToolBase implements Mapper {
+public class LinkDb extends ToolBase implements Mapper<Text, ParseData, Text, Inlinks> {
 
   public static final Log LOG = LogFactory.getLog(LinkDb.class);
 
@@ -74,8 +74,8 @@
 
   public void close() {}
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(Text key, ParseData value,
+                  OutputCollector<Text, Inlinks> output, Reporter reporter)
     throws IOException {
     String fromUrl = key.toString();
     String fromHost = getHost(fromUrl);
@@ -96,8 +96,7 @@
       }
     }
     if (fromUrl == null) return; // discard all outlinks
-    ParseData parseData = (ParseData)value;
-    Outlink[] outlinks = parseData.getOutlinks();
+    Outlink[] outlinks = value.getOutlinks();
     Inlinks inlinks = new Inlinks();
     for (int i = 0; i < outlinks.length; i++) {
       Outlink outlink = outlinks[i];
Index: src/java/org/apache/nutch/crawl/LinkDbMerger.java
===================================================================
--- src/java/org/apache/nutch/crawl/LinkDbMerger.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/LinkDbMerger.java	(working copy)
@@ -58,7 +58,7 @@
  * 
  * @author Andrzej Bialecki
  */
-public class LinkDbMerger extends ToolBase implements Reducer {
+public class LinkDbMerger extends ToolBase implements Reducer<Text, Inlinks, Text, Inlinks> {
   private static final Log LOG = LogFactory.getLog(LinkDbMerger.class);
   
   private int maxInlinks;
@@ -71,12 +71,12 @@
     setConf(conf);
   }
 
-  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Inlinks> values, OutputCollector<Text, Inlinks> output, Reporter reporter) throws IOException {
 
     Inlinks result = new Inlinks();
 
     while (values.hasNext()) {
-      Inlinks inlinks = (Inlinks)values.next();
+      Inlinks inlinks = values.next();
 
       int end = Math.min(maxInlinks - result.size(), inlinks.size());
       Iterator<Inlink> it = inlinks.iterator();
Index: src/java/org/apache/nutch/crawl/LinkDbFilter.java
===================================================================
--- src/java/org/apache/nutch/crawl/LinkDbFilter.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/LinkDbFilter.java	(working copy)
@@ -23,8 +23,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -38,7 +36,7 @@
  * 
  * @author Andrzej Bialecki
  */
-public class LinkDbFilter implements Mapper {
+public class LinkDbFilter implements Mapper<Text, Inlinks, Text, Inlinks> {
   public static final String URL_FILTERING = "linkdb.url.filters";
 
   public static final String URL_NORMALIZING = "linkdb.url.normalizer";
@@ -73,7 +71,7 @@
 
   public void close() {}
 
-  public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+  public void map(Text key, Inlinks value, OutputCollector<Text, Inlinks> output, Reporter reporter) throws IOException {
     String url = key.toString();
     Inlinks result = new Inlinks();
     if (normalize) {
@@ -93,8 +91,7 @@
       }
     }
     if (url == null) return; // didn't pass the filters
-    Inlinks inlinks = (Inlinks)value;
-    Iterator<Inlink> it = inlinks.iterator();
+    Iterator<Inlink> it = value.iterator();
     String fromUrl = null;
     while (it.hasNext()) {
       Inlink inlink = it.next();
Index: src/java/org/apache/nutch/crawl/Injector.java
===================================================================
--- src/java/org/apache/nutch/crawl/Injector.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/Injector.java	(working copy)
@@ -44,7 +44,8 @@
 
 
   /** Normalize and filter injected urls. */
-  public static class InjectMapper implements Mapper {
+  public static class InjectMapper implements
+            Mapper<LongWritable, Text, Text, CrawlDatum> {
     private URLNormalizers urlNormalizers;
     private int interval;
     private float scoreInjected;
@@ -65,8 +66,8 @@
 
     public void close() {}
 
-    public void map(WritableComparable key, Writable val,
-                    OutputCollector output, Reporter reporter)
+    public void map(LongWritable key, Text val, OutputCollector<Text, CrawlDatum> output, 
+            Reporter reporter)
       throws IOException {
       Text value = (Text)val;
       String url = value.toString();              // value is line of text
@@ -98,17 +99,17 @@
   }
 
   /** Combine multiple new entries for a url. */
-  public static class InjectReducer implements Reducer {
+  public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     public void configure(JobConf job) {}    
     public void close() {}
 
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<CrawlDatum> values,
+                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
       throws IOException {
       CrawlDatum old = null;
       CrawlDatum injected = null;
       while (values.hasNext()) {
-        CrawlDatum val = (CrawlDatum)values.next();
+        CrawlDatum val = values.next();
         if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
           injected = val;
           injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
Index: src/java/org/apache/nutch/crawl/CrawlDbMerger.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbMerger.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/CrawlDbMerger.java	(working copy)
@@ -53,7 +53,7 @@
 public class CrawlDbMerger extends ToolBase {
   private static final Log LOG = LogFactory.getLog(CrawlDbMerger.class);
 
-  public static class Merger extends MapReduceBase implements Reducer {
+  public static class Merger extends MapReduceBase implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     MapWritable meta = new MapWritable();
     private FetchSchedule schedule;
 
@@ -63,13 +63,13 @@
       schedule = FetchScheduleFactory.getFetchSchedule(conf);
     }
 
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter)
             throws IOException {
       CrawlDatum res = null;
       long resTime = 0L;
       meta.clear();
       while (values.hasNext()) {
-        CrawlDatum val = (CrawlDatum) values.next();
+        CrawlDatum val = values.next();
         if (res == null) {
           res = val;
           resTime = schedule.calculateLastFetchTime(res);
Index: src/java/org/apache/nutch/crawl/CrawlDbFilter.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbFilter.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/CrawlDbFilter.java	(working copy)
@@ -22,8 +22,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -37,7 +35,7 @@
  * 
  * @author Andrzej Bialecki
  */
-public class CrawlDbFilter implements Mapper {
+public class CrawlDbFilter implements Mapper<Text, CrawlDatum, Text, CrawlDatum> {
   public static final String URL_FILTERING = "crawldb.url.filters";
 
   public static final String URL_NORMALIZING = "crawldb.url.normalizers";
@@ -72,7 +70,7 @@
   
   private Text newKey = new Text();
 
-  public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+  public void map(Text key, CrawlDatum value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
 
     String url = key.toString();
     if (urlNormalizers) {
Index: src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
===================================================================
--- src/java/org/apache/nutch/crawl/PartitionUrlByHost.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/PartitionUrlByHost.java	(working copy)
@@ -27,7 +27,7 @@
 import org.apache.nutch.net.URLNormalizers;
 
 /** Partition urls by hostname. */
-public class PartitionUrlByHost implements Partitioner {
+public class PartitionUrlByHost implements Partitioner<Text, Writable> {
   private static final Log LOG = LogFactory.getLog(PartitionUrlByHost.class);
   
   private int seed;
@@ -41,7 +41,7 @@
   public void close() {}
 
   /** Hash by hostname. */
-  public int getPartition(WritableComparable key, Writable value,
+  public int getPartition(Text key, Writable value,
                           int numReduceTasks) {
     String urlString = ((Text)key).toString();
     try {
Index: src/java/org/apache/nutch/crawl/Generator.java
===================================================================
--- src/java/org/apache/nutch/crawl/Generator.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/Generator.java	(working copy)
@@ -81,7 +81,10 @@
   }
 
   /** Selects entries due for fetch. */
-  public static class Selector implements Mapper, Partitioner, Reducer {
+  public static class Selector implements
+            Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>,
+            Partitioner<WritableComparable, Writable>,
+            Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
     private LongWritable genTime = new LongWritable(System.currentTimeMillis());
     private long curTime;
     private long limit;
@@ -89,7 +92,7 @@
     private HashMap<String, IntWritable> hostCounts =
       new HashMap<String, IntWritable>();
     private int maxPerHost;
-    private Partitioner hostPartitioner = new PartitionUrlByHost();
+    private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
     private URLFilters filters;
     private URLNormalizers normalizers;
     private ScoringFilters scfilters;
@@ -120,10 +123,10 @@
     public void close() {}
 
     /** Select & invert subset due for fetch. */
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value,
+                    OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
       throws IOException {
-      Text url = (Text)key;
+      Text url = key;
       if (filter) {
         // If filtering is on don't generate URLs that don't pass URLFilters
         try {
@@ -151,7 +154,7 @@
       }
       float sort = 1.0f;
       try {
-        sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
+        sort = scfilters.generatorSortValue(key, crawlDatum, sort);
       } catch (ScoringFilterException sfe) {
         if (LOG.isWarnEnabled()) {
           LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
@@ -162,7 +165,7 @@
       // record generation time
       crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       entry.datum = crawlDatum;
-      entry.url = (Text)key;
+      entry.url = key;
       output.collect(sortValue, entry);          // invert for sort by score
     }
 
@@ -174,13 +177,13 @@
     }
 
     /** Collect until limit is reached. */
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter)
+    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
+                       OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
       throws IOException {
 
       while (values.hasNext() && count < limit) {
 
-        SelectorEntry entry = (SelectorEntry)values.next();
+        SelectorEntry entry = values.next();
         Text url = entry.url;
 
         if (maxPerHost > 0) {                     // are we counting hosts?
@@ -263,11 +266,11 @@
     }
   }
 
-  public static class SelectorInverseMapper extends MapReduceBase implements Mapper {
+  public static class SelectorInverseMapper extends MapReduceBase implements
+            Mapper<WritableComparable, SelectorEntry, Text, CrawlDatum> {
 
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
-      SelectorEntry entry = (SelectorEntry)value;
-      output.collect(entry.url, entry.datum);
+    public void map(WritableComparable key, SelectorEntry value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
+      output.collect(value.url, value.datum);
     }
   }
 
@@ -304,27 +307,29 @@
   /**
    * Update the CrawlDB so that the next generate won't include the same URLs.
    */
-  public static class CrawlDbUpdater extends MapReduceBase implements Mapper, Reducer {
+  public static class CrawlDbUpdater extends MapReduceBase implements
+            Mapper<WritableComparable, Writable, Text, CrawlDatum>,
+            Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     long generateTime;
     
     public void configure(JobConf job) {
       generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
     }
     
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+    public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       if (key instanceof FloatWritable) { // tempDir source
         SelectorEntry se = (SelectorEntry)value;
         output.collect(se.url, se.datum);
       } else {
-        output.collect(key, value);
+        output.collect((Text)key, (CrawlDatum)value);
       }
     }
 
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       CrawlDatum orig = null;
       LongWritable genTime = null;
       while (values.hasNext()) {
-        CrawlDatum val = (CrawlDatum)values.next();
+        CrawlDatum val = values.next();
         if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
           genTime = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
           if (genTime.get() != generateTime) {
Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(revision 596792)
+++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(working copy)
@@ -32,7 +32,7 @@
 import org.apache.nutch.scoring.ScoringFilters;
 
 /** Merge new page entries with existing entries. */
-public class CrawlDbReducer implements Reducer {
+public class CrawlDbReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
   public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class);
   
   private int retryMax;
@@ -55,8 +55,8 @@
 
   public void close() {}
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
+  public void reduce(Text key, Iterator<CrawlDatum> values,
+                     OutputCollector<Text, CrawlDatum> output, Reporter reporter)
     throws IOException {
 
     CrawlDatum fetch = null;
@@ -65,7 +65,7 @@
     linked.clear();
 
     while (values.hasNext()) {
-      CrawlDatum datum = (CrawlDatum)values.next();
+      CrawlDatum datum = values.next();
       if (CrawlDatum.hasDbStatus(datum)) {
         if (old == null) {
           old = datum;
@@ -141,10 +141,10 @@
       if (old != null) {                          // if old exists
         result.set(old);                          // use it
       } else {
-        result = schedule.initializeSchedule((Text)key, result);
+        result = schedule.initializeSchedule(key, result);
         result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
         try {
-          scfilters.initialScore((Text)key, result);
+          scfilters.initialScore(key, result);
         } catch (ScoringFilterException e) {
           if (LOG.isWarnEnabled()) {
             LOG.warn("Cannot filter init score for url " + key +
@@ -173,7 +173,7 @@
         }
       }
       // set the schedule
-      result = schedule.setFetchSchedule((Text)key, result, prevFetchTime,
+      result = schedule.setFetchSchedule(key, result, prevFetchTime,
           prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified);
       // set the result status and signature
       if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
@@ -202,7 +202,7 @@
       // NOTMODIFIED state, when the old fetched copy was already removed with
       // old segments.
       if (maxInterval < result.getFetchInterval())
-        result = schedule.forceRefetch((Text)key, result, false);
+        result = schedule.forceRefetch(key, result, false);
       break;
     case CrawlDatum.STATUS_SIGNATURE:
       if (LOG.isWarnEnabled()) {
@@ -217,7 +217,7 @@
       } else {
         result.setStatus(CrawlDatum.STATUS_DB_GONE);
       }
-      result = schedule.setPageRetrySchedule((Text)key, result, prevFetchTime,
+      result = schedule.setPageRetrySchedule(key, result, prevFetchTime,
           prevModifiedTime, fetch.getFetchTime());
       break;
 
@@ -225,7 +225,7 @@
       if (old != null)
         result.setSignature(old.getSignature());  // use old signature
       result.setStatus(CrawlDatum.STATUS_DB_GONE);
-      result = schedule.setPageGoneSchedule((Text)key, result, prevFetchTime,
+      result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
           prevModifiedTime, fetch.getFetchTime());
       break;
 
@@ -234,7 +234,7 @@
     }
 
     try {
-      scfilters.updateDbScore((Text)key, old, result, linked);
+      scfilters.updateDbScore(key, old, result, linked);
     } catch (Exception e) {
       if (LOG.isWarnEnabled()) {
         LOG.warn("Couldn't update score, key=" + key + ": " + e);
Index: src/java/org/apache/nutch/parse/ParseOutputFormat.java
===================================================================
--- src/java/org/apache/nutch/parse/ParseOutputFormat.java	(revision 596792)
+++ src/java/org/apache/nutch/parse/ParseOutputFormat.java	(working copy)
@@ -44,7 +44,7 @@
 import org.apache.hadoop.util.Progressable;
 
 /* Parse content in a segment. */
-public class ParseOutputFormat implements OutputFormat {
+public class ParseOutputFormat implements OutputFormat<Text, Writable> {
   private static final Log LOG = LogFactory.getLog(ParseOutputFormat.class);
 
   private URLFilters filters;
@@ -79,7 +79,7 @@
       throw new IOException("Segment already parsed!");
   }
 
-  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+  public RecordWriter<Text, Writable> getRecordWriter(FileSystem fs, JobConf job,
                                       String name, Progressable progress) throws IOException {
 
     this.filters = new URLFilters(job);
@@ -111,10 +111,10 @@
       SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
           compType, progress);
     
-    return new RecordWriter() {
+    return new RecordWriter<Text, Writable>() {
 
 
-        public void write(WritableComparable key, Writable value)
+        public void write(Text key, Writable value)
           throws IOException {
           
           Parse parse = (Parse)value;
Index: src/java/org/apache/nutch/parse/ParseSegment.java
===================================================================
--- src/java/org/apache/nutch/parse/ParseSegment.java	(revision 596792)
+++ src/java/org/apache/nutch/parse/ParseSegment.java	(working copy)
@@ -23,6 +23,7 @@
 import org.apache.nutch.crawl.SignatureFactory;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.*;
 import org.apache.nutch.metadata.Nutch;
@@ -33,11 +34,11 @@
 import org.apache.hadoop.fs.Path;
 
 import java.io.*;
-import java.util.*;
 import java.util.Map.Entry;
 
 /* Parse content in a segment. */
-public class ParseSegment extends Configured implements Mapper, Reducer {
+public class ParseSegment extends Configured implements
+        Mapper<WritableComparable, Content, Text, ParseImpl> {
 
   public static final Log LOG = LogFactory.getLog(Parser.class);
   
@@ -60,19 +61,17 @@
   
   private Text newKey = new Text();
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(WritableComparable key, Content value,
+                  OutputCollector<Text, ParseImpl> output, Reporter reporter)
     throws IOException {
     // convert on the fly from old UTF8 keys
     if (key instanceof UTF8) {
       newKey.set(key.toString());
       key = newKey;
     }
-    Content content = (Content) value;
-
     ParseResult parseResult = null;
     try {
-      parseResult = new ParseUtil(getConf()).parse(content);
+      parseResult = new ParseUtil(getConf()).parse(value);
     } catch (Exception e) {
       LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
       return;
@@ -94,12 +93,12 @@
 
       // compute the new signature
       byte[] signature = 
-        SignatureFactory.getSignature(getConf()).calculate(content, parse); 
+        SignatureFactory.getSignature(getConf()).calculate(value, parse); 
       parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
           StringUtil.toHexString(signature));
       
       try {
-        scfilters.passScoreAfterParsing(url, content, parse);
+        scfilters.passScoreAfterParsing(url, value, parse);
       } catch (ScoringFilterException e) {
         if (LOG.isWarnEnabled()) {
           e.printStackTrace(LogUtil.getWarnStream(LOG));
@@ -111,12 +110,6 @@
     }
   }
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
-    throws IOException {
-    output.collect(key, (Writable)values.next()); // collect first value
-  }
-
   public void parse(Path segment) throws IOException {
 
     if (LOG.isInfoEnabled()) {
@@ -131,7 +124,7 @@
     job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setMapperClass(ParseSegment.class);
-    job.setReducerClass(ParseSegment.class);
+    job.setReducerClass(IdentityReducer.class);
     
     job.setOutputPath(segment);
     job.setOutputFormat(ParseOutputFormat.class);
