Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -29,8 +29,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.NutchWritable;
@@ -45,7 +44,7 @@
 
 
 /** The fetcher. Most of the work is done by plugins. */
-public class Fetcher extends ToolBase implements MapRunnable { 
+public class Fetcher extends Configured implements Tool, MapRunnable<WritableComparable, Writable, Text, NutchWritable> { 
 
   public static final Log LOG = LogFactory.getLog(Fetcher.class);
   
@@ -55,7 +54,7 @@
 
   public static final String PROTOCOL_REDIR = "protocol";
 
-  public static class InputFormat extends SequenceFileInputFormat {
+  public static class InputFormat extends SequenceFileInputFormat<WritableComparable, Writable> {
     /** Don't split inputs, to keep things polite. */
     public InputSplit[] getSplits(JobConf job, int nSplits)
       throws IOException {
@@ -63,14 +62,14 @@
       FileSystem fs = FileSystem.get(job);
       InputSplit[] splits = new InputSplit[files.length];
       for (int i = 0; i < files.length; i++) {
-        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
+        splits[i] = new FileSplit(files[i], 0, fs.getFileStatus(files[i]).getLen(), job);
       }
       return splits;
     }
   }
 
-  private RecordReader input;
-  private OutputCollector output;
+  private RecordReader<WritableComparable, Writable> input;
+  private OutputCollector<Text, NutchWritable> output;
   private Reporter reporter;
 
   private String segmentName;
@@ -456,7 +455,7 @@
     return conf.getBoolean("fetcher.store.content", true);
   }
 
-  public void run(RecordReader input, OutputCollector output,
+  public void run(RecordReader<WritableComparable, Writable> input, OutputCollector<Text, NutchWritable> output,
                   Reporter reporter) throws IOException {
 
     this.input = input;
@@ -530,7 +529,7 @@
 
   /** Run the fetcher. */
   public static void main(String[] args) throws Exception {
-    int res = new Fetcher().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/DeleteDuplicates.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/DeleteDuplicates.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/DeleteDuplicates.java	(working copy)
@@ -28,9 +28,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
@@ -63,8 +61,8 @@
  * 
  * @author Andrzej Bialecki
  */
-public class DeleteDuplicates extends ToolBase
-  implements Mapper, Reducer, OutputFormat {
+public class DeleteDuplicates extends Configured
+  implements Tool, Mapper<WritableComparable, Writable, Text, IntWritable>, Reducer<Text, IntWritable, WritableComparable, Writable>, OutputFormat<WritableComparable, Writable> {
   private static final Log LOG = LogFactory.getLog(DeleteDuplicates.class);
 
 //   Algorithm:
@@ -141,7 +139,7 @@
 
   }
 
-  public static class InputFormat extends InputFormatBase {
+  public static class InputFormat extends FileInputFormat<Text, IndexDoc> {
     private static final long INDEX_LENGTH = Integer.MAX_VALUE;
 
     /** Return each index as a split. */
@@ -155,7 +153,7 @@
       return splits;
     }
 
-    public class DDRecordReader implements RecordReader {
+    public class DDRecordReader implements RecordReader<Text, IndexDoc> {
 
       private IndexReader indexReader;
       private int maxDoc = 0;
@@ -174,7 +172,7 @@
         this.index = index;
       }
 
-      public boolean next(WritableComparable key, Writable value)
+      public boolean next(Text key, IndexDoc indexDoc)
         throws IOException {
         
         // skip empty indexes
@@ -189,9 +187,8 @@
         Document document = indexReader.document(doc);
 
         // fill in key
-        ((Text)key).set(document.get("url"));
+        key.set(document.get("url"));
         // fill in value
-        IndexDoc indexDoc = (IndexDoc)value;
         indexDoc.keep = true;
         indexDoc.url.set(document.get("url"));
         indexDoc.hash.setDigest(document.get("digest"));
@@ -226,11 +223,11 @@
         indexReader.close();
       }
       
-      public WritableComparable createKey() {
+      public Text createKey() {
         return new Text();
       }
       
-      public Writable createValue() {
+      public IndexDoc createValue() {
         return new IndexDoc();
       }
 
@@ -240,7 +237,7 @@
     }
     
     /** Return each index as a split. */
-    public RecordReader getRecordReader(InputSplit split,
+    public RecordReader<Text, IndexDoc> getRecordReader(InputSplit split,
                                         JobConf job,
                                         Reporter reporter) throws IOException {
       FileSplit fsplit = (FileSplit)split;
@@ -250,27 +247,27 @@
     }
   }
   
-  public static class HashPartitioner implements Partitioner {
+  public static class HashPartitioner implements Partitioner<MD5Hash, Writable> {
     public void configure(JobConf job) {}
     public void close() {}
-    public int getPartition(WritableComparable key, Writable value,
+    public int getPartition(MD5Hash key, Writable value,
                             int numReduceTasks) {
-      int hashCode = ((MD5Hash)key).hashCode();
+      int hashCode = key.hashCode();
       return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
     }
   }
 
-  public static class UrlsReducer implements Reducer {
+  public static class UrlsReducer implements Reducer<Text, IndexDoc, MD5Hash, IndexDoc> {
     
     public void configure(JobConf job) {}
     
     public void close() {}
     
-    public void reduce(WritableComparable key, Iterator values,
-        OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<IndexDoc> values,
+        OutputCollector<MD5Hash, IndexDoc> output, Reporter reporter) throws IOException {
       IndexDoc latest = null;
       while (values.hasNext()) {
-        IndexDoc value = (IndexDoc)values.next();
+        IndexDoc value = values.next();
         if (latest == null) {
           latest = value;
           continue;
@@ -296,7 +293,7 @@
     }
   }
   
-  public static class HashReducer implements Reducer {
+  public static class HashReducer implements Reducer<MD5Hash, IndexDoc, Text, IndexDoc> {
     boolean byScore;
     
     public void configure(JobConf job) {
@@ -304,12 +301,12 @@
     }
     
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter)
+    public void reduce(MD5Hash key, Iterator<IndexDoc> values,
+                       OutputCollector<Text, IndexDoc> output, Reporter reporter)
       throws IOException {
       IndexDoc highest = null;
       while (values.hasNext()) {
-        IndexDoc value = (IndexDoc)values.next();
+        IndexDoc value = values.next();
         // skip already deleted
         if (!value.keep) {
           LOG.debug("-discard " + value + " (already marked)");
@@ -355,7 +352,7 @@
   public void setConf(Configuration conf) {
     super.setConf(conf);
     try {
-      fs = FileSystem.get(conf);
+      if(conf != null) fs = FileSystem.get(conf);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -365,7 +362,7 @@
 
   /** Map [*,IndexDoc] pairs to [index,doc] pairs. */
   public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+                  OutputCollector<Text, IntWritable> output, Reporter reporter)
     throws IOException {
     IndexDoc indexDoc = (IndexDoc)value;
     // don't delete these
@@ -375,14 +372,14 @@
   }
 
   /** Delete docs named in values from index named in key. */
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
+  public void reduce(Text key, Iterator<IntWritable> values,
+                     OutputCollector<WritableComparable, Writable> output, Reporter reporter)
     throws IOException {
     Path index = new Path(key.toString());
     IndexReader reader = IndexReader.open(new FsDirectory(fs, index, false, getConf()));
     try {
       while (values.hasNext()) {
-        IntWritable value = (IntWritable)values.next();
+        IntWritable value = values.next();
         LOG.debug("-delete " + index + " doc=" + value);
         reader.deleteDocument(value.get());
       }
@@ -392,11 +389,11 @@
   }
 
   /** Write nothing. */
-  public RecordWriter getRecordWriter(final FileSystem fs,
+  public RecordWriter<WritableComparable, Writable> getRecordWriter(final FileSystem fs,
                                       final JobConf job,
                                       final String name,
                                       final Progressable progress) throws IOException {
-    return new RecordWriter() {                   
+    return new RecordWriter<WritableComparable, Writable>() {                   
         public void write(WritableComparable key, Writable value)
           throws IOException {
           throw new UnsupportedOperationException();
@@ -496,7 +493,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = new DeleteDuplicates().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new DeleteDuplicates(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexSorter.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexSorter.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexSorter.java	(working copy)
@@ -32,12 +32,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.*;
 
 /** Sort a Nutch index by page score.  Higher scoring documents are assigned
  * smaller document numbers. */
-public class IndexSorter extends ToolBase {
+public class IndexSorter extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(IndexSorter.class);
   
   private static class PostingMap implements Comparable<PostingMap> {
@@ -296,7 +296,7 @@
 
   /** */
   public static void main(String[] args) throws Exception {
-    int res = new IndexSorter().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new IndexSorter(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexMerger.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexMerger.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexMerger.java	(working copy)
@@ -25,8 +25,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.conf.*;
 
 import org.apache.nutch.util.LogUtil;
@@ -42,7 +41,7 @@
  * @author Doug Cutting
  * @author Mike Cafarella
  *************************************************************************/
-public class IndexMerger extends ToolBase {
+public class IndexMerger extends Configured implements Tool {
   public static final Log LOG = LogFactory.getLog(IndexMerger.class);
 
   public static final String DONE_NAME = "merge.done";
@@ -80,17 +79,17 @@
     Directory[] dirs = new Directory[indexes.length];
     for (int i = 0; i < indexes.length; i++) {
       if (LOG.isInfoEnabled()) { LOG.info("Adding " + indexes[i]); }
-      dirs[i] = new FsDirectory(fs, indexes[i], false, this.conf);
+      dirs[i] = new FsDirectory(fs, indexes[i], false, getConf());
     }
 
     //
     // Merge indices
     //
     IndexWriter writer = new IndexWriter(localOutput.toString(), null, true);
-    writer.setMergeFactor(conf.getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
-    writer.setMaxBufferedDocs(conf.getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
-    writer.setMaxMergeDocs(conf.getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
-    writer.setTermIndexInterval(conf.getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
+    writer.setMergeFactor(getConf().getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
+    writer.setMaxBufferedDocs(getConf().getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
+    writer.setMaxMergeDocs(getConf().getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
+    writer.setTermIndexInterval(getConf().getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
     writer.setInfoStream(LogUtil.getDebugStream(LOG));
     writer.setUseCompoundFile(false);
     writer.setSimilarity(new NutchSimilarity());
@@ -108,7 +107,7 @@
    * Create an index for the input files in the named directory. 
    */
   public static void main(String[] args) throws Exception {
-    int res = new IndexMerger().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new IndexMerger(), args);
     System.exit(res);
   }
   
@@ -122,7 +121,7 @@
     //
     // Parse args, read all index directories to be processed
     //
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.get(getConf());
     List<Path> indexDirs = new ArrayList<Path>();
 
     Path workDir = new Path("indexmerger-" + System.currentTimeMillis());  
@@ -151,7 +150,7 @@
       LOG.fatal("IndexMerger: " + StringUtils.stringifyException(e));
       return -1;
     } finally {
-      FileSystem.getLocal(conf).delete(workDir);
+      FileSystem.getLocal(getConf()).delete(workDir);
     }
   }
 }
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/Indexer.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/Indexer.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/Indexer.java	(working copy)
@@ -27,9 +27,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 import org.apache.nutch.parse.*;
 import org.apache.nutch.analysis.*;
 
@@ -51,7 +49,7 @@
 import org.apache.nutch.metadata.Nutch;
 
 /** Create indexes for segments. */
-public class Indexer extends ToolBase implements Reducer, Mapper {
+public class Indexer extends Configured implements Tool, Reducer<Text, NutchWritable, Text, Writable>, Mapper<Text, Writable, Text, NutchWritable> {
   
   public static final String DONE_NAME = "index.done";
 
@@ -85,8 +83,8 @@
 
   /** Unwrap Lucene Documents created by reduce and add them to an index. */
   public static class OutputFormat
-    extends org.apache.hadoop.mapred.OutputFormatBase {
-    public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,
+    extends org.apache.hadoop.mapred.OutputFormatBase<WritableComparable, LuceneDocumentWrapper> {
+    public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job,
                                         String name, final Progressable progress) throws IOException {
       final Path perm = new Path(job.getOutputPath(), name);
       final Path temp =
@@ -109,12 +107,12 @@
       writer.setUseCompoundFile(false);
       writer.setSimilarity(new NutchSimilarity());
 
-      return new RecordWriter() {
+      return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
           boolean closed;
 
-          public void write(WritableComparable key, Writable value)
+          public void write(WritableComparable key, LuceneDocumentWrapper value)
             throws IOException {                  // unwrap & index doc
-            Document doc = ((LuceneDocumentWrapper) value).get();
+            Document doc = value.get();
             NutchAnalyzer analyzer = factory.get(doc.get("lang"));
             if (LOG.isInfoEnabled()) {
               LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" +
@@ -174,8 +172,8 @@
 
   public void close() {}
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
+  public void reduce(Text key, Iterator<NutchWritable> values,
+                     OutputCollector<Text, Writable> output, Reporter reporter)
     throws IOException {
     Inlinks inlinks = null;
     CrawlDatum dbDatum = null;
@@ -183,7 +181,7 @@
     ParseData parseData = null;
     ParseText parseText = null;
     while (values.hasNext()) {
-      Writable value = ((NutchWritable)values.next()).get(); // unwrap
+      Writable value = values.next().get(); // unwrap
       if (value instanceof Inlinks) {
         inlinks = (Inlinks)value;
       } else if (value instanceof CrawlDatum) {
@@ -248,7 +246,7 @@
         fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
       }
       // run indexing filters
-      doc = this.filters.filter(doc, parse, (Text)key, fetchDatum, inlinks);
+      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
     } catch (IndexingException e) {
       if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
       return;
@@ -315,7 +313,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = new Indexer().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
     System.exit(res);
   }
   
@@ -341,8 +339,8 @@
     }
   }
 
-  public void map(WritableComparable key, Writable value,
-      OutputCollector output, Reporter reporter) throws IOException {
+  public void map(Text key, Writable value,
+      OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
     output.collect(key, new NutchWritable(value));
   }
 
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java	(working copy)
@@ -24,18 +24,18 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.NutchWritable;
 import org.apache.nutch.crawl.SignatureFactory;
@@ -65,11 +65,10 @@
  * <p>Arc files are tars of compressed gzips which are produced by both the
  * internet archive project and the grub distributed crawler project.</p>
  * 
- * TODO: This class needs to be changed to use ToolRunner instead of ToolBase.
  */
 public class ArcSegmentCreator
-  extends ToolBase
-  implements Mapper {
+  extends Configured
+  implements Tool, Mapper<Text, BytesWritable, Text, NutchWritable> {
 
   public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);
   public static final String URL_VERSION = "arc.url.version";
@@ -145,7 +144,7 @@
    * 
    * @return The result of the parse in a ParseStatus object.
    */
-  private ParseStatus output(OutputCollector output, String segmentName,
+  private ParseStatus output(OutputCollector<Text, NutchWritable> output, String segmentName,
     Text key, CrawlDatum datum, Content content, ProtocolStatus pstatus,
     int status) {
 
@@ -184,7 +183,7 @@
       // set the content signature
       if (parseResult == null) {
         byte[] signature = SignatureFactory.getSignature(getConf()).calculate(
-          content, new ParseStatus().getEmptyParse(conf));
+          content, new ParseStatus().getEmptyParse(getConf()));
         datum.setSignature(signature);
       }
 
@@ -266,12 +265,12 @@
    * segments.</p>
    * 
    * @param key The arc record header.
-   * @param value The arc record raw content bytes.
+   * @param bytes The arc record raw content bytes.
    * @param output The output collecter.
    * @param reporter The progress reporter.
    */
-  public void map(WritableComparable key, Writable value,
-    OutputCollector output, Reporter reporter)
+  public void map(Text key, BytesWritable bytes,
+    OutputCollector<Text, NutchWritable> output, Reporter reporter)
     throws IOException {
 
     String[] headers = key.toString().split("\\s+");
@@ -289,7 +288,6 @@
 
     // get the raw  bytes from the arc file, create a new crawldatum
     Text url = new Text();
-    BytesWritable bytes = (BytesWritable)value;
     CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, interval,
       1.0f);
     String segmentName = getConf().get(Nutch.SEGMENT_NAME_KEY);
@@ -371,7 +369,7 @@
 
   public static void main(String args[])
     throws Exception {
-    int res = new ArcSegmentCreator().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new ArcSegmentCreator(), args);
     System.exit(res);
   }
 
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/FreeGenerator.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/FreeGenerator.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/FreeGenerator.java	(working copy)
@@ -23,8 +23,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -36,7 +36,8 @@
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Generator;
 import org.apache.nutch.crawl.PartitionUrlByHost;
@@ -53,13 +54,13 @@
  * 
  * @author Andrzej Bialecki
  */
-public class FreeGenerator extends ToolBase {
+public class FreeGenerator extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(FreeGenerator.class);
   
   private static final String FILTER_KEY = "free.generator.filter";
   private static final String NORMALIZE_KEY = "free.generator.normalize";
 
-  public static class FG extends MapReduceBase implements Mapper, Reducer {
+  public static class FG extends MapReduceBase implements Mapper<WritableComparable, Text, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     private URLNormalizers normalizers = null;
     private URLFilters filters = null;
     private ScoringFilters scfilters;
@@ -78,7 +79,7 @@
       }
     }
 
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+    public void map(WritableComparable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       // value is a line of text
       String urlString = value.toString();
       try {
@@ -105,9 +106,9 @@
       output.collect(url, datum);
     }
 
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       // pick just one (discard duplicates)
-      output.collect(key, (Writable)values.next());
+      output.collect(key, values.next());
     }
   }
   
@@ -161,7 +162,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = new FreeGenerator().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new FreeGenerator(), args);
     System.exit(res);
   }
 }
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbReader.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbReader.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbReader.java	(working copy)
@@ -36,8 +36,6 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapFileOutputFormat;
@@ -83,13 +81,12 @@
     }
   }
 
-  public static class CrawlDbStatMapper implements Mapper {
+  public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
     LongWritable COUNT_1 = new LongWritable(1);
     public void configure(JobConf job) {}
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum cd, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
-      CrawlDatum cd = (CrawlDatum) value;
       output.collect(new Text("T"), COUNT_1);
       output.collect(new Text("status " + cd.getStatus()), COUNT_1);
       output.collect(new Text("retry " + cd.getRetriesSinceFetch()), COUNT_1);
@@ -97,19 +94,19 @@
     }
   }
   
-  public static class CrawlDbStatCombiner implements Reducer {
+  public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
     
     public CrawlDbStatCombiner() { }
     public void configure(JobConf job) { }
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
         throws IOException {
       val.set(0L);
-      String k = ((Text)key).toString();
+      String k = key.toString();
       if (!k.equals("s")) {
         while (values.hasNext()) {
-          LongWritable cnt = (LongWritable)values.next();
+          LongWritable cnt = values.next();
           val.set(val.get() + cnt.get());
         }
         output.collect(key, val);
@@ -130,66 +127,54 @@
     }
   }
 
-  public static class CrawlDbStatReducer implements Reducer {
+  public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
     public void configure(JobConf job) {}
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
 
-      String k = ((Text) key).toString();
+      String k = key.toString();
       if (k.equals("T")) {
         // sum all values for this key
         long sum = 0;
         while (values.hasNext()) {
-          sum += ((LongWritable) values.next()).get();
+          sum += values.next().get();
         }
         // output sum
         output.collect(key, new LongWritable(sum));
       } else if (k.startsWith("status") || k.startsWith("retry")) {
         LongWritable cnt = new LongWritable();
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           cnt.set(cnt.get() + val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("scx")) {
         LongWritable cnt = new LongWritable(Long.MIN_VALUE);
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           if (cnt.get() < val.get()) cnt.set(val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("scn")) {
         LongWritable cnt = new LongWritable(Long.MAX_VALUE);
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           if (cnt.get() > val.get()) cnt.set(val.get());
         }
         output.collect(key, cnt);
       } else if (k.equals("sct")) {
         LongWritable cnt = new LongWritable();
         while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+          LongWritable val = values.next();
           cnt.set(cnt.get() + val.get());
         }
         output.collect(key, cnt);
       }
     }
   }
-
-  public static class CrawlDbDumpReducer implements Reducer {
-
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-      while (values.hasNext()) {
-        output.collect(key, (Writable)values.next());
-      }
-    }
-
-    public void configure(JobConf job) {}
-    public void close() {}
-  }
-  
-  public static class CrawlDbTopNMapper implements Mapper {
+ 
+  public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
     
@@ -200,24 +185,23 @@
       }
     }
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum datum, OutputCollector<FloatWritable, Text> output, Reporter reporter)
             throws IOException {
-      CrawlDatum datum = (CrawlDatum)value;
       if (datum.getScore() < min) return; // don't collect low-scoring records
       fw.set(-datum.getScore()); // reverse sorting order
       output.collect(fw, key); // invert mapping: score -> url
     }
   }
   
-  public static class CrawlDbTopNReducer implements Reducer {
+  public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
     
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
       while (values.hasNext() && count < topN) {
         FloatWritable fw = (FloatWritable)key;
         fw.set(-fw.get());
-        output.collect(fw, (Writable)values.next());
+        output.collect(fw, values.next());
         count++;
       }
     }
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDb.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDb.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDb.java	(working copy)
@@ -30,8 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
@@ -41,7 +40,7 @@
 import org.apache.nutch.util.NutchJob;
 
 /** Maintains an inverted link map, listing incoming links for each url. */
-public class LinkDb extends ToolBase implements Mapper {
+public class LinkDb extends Configured implements Tool, Mapper<Text, ParseData, Text, Inlinks> {
 
   public static final Log LOG = LogFactory.getLog(LinkDb.class);
 
@@ -53,9 +52,7 @@
   private URLFilters urlFilters;
   private URLNormalizers urlNormalizers;
   
-  public LinkDb() {
-    
-  }
+  public LinkDb() {}
   
   public LinkDb(Configuration conf) {
     setConf(conf);
@@ -74,8 +71,8 @@
 
   public void close() {}
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(Text key, ParseData parseData,
+                  OutputCollector<Text, Inlinks> output, Reporter reporter)
     throws IOException {
     String fromUrl = key.toString();
     String fromHost = getHost(fromUrl);
@@ -96,7 +93,6 @@
       }
     }
     if (fromUrl == null) return; // discard all outlinks
-    ParseData parseData = (ParseData)value;
     Outlink[] outlinks = parseData.getOutlinks();
     Inlinks inlinks = new Inlinks();
     for (int i = 0; i < outlinks.length; i++) {
@@ -149,7 +145,7 @@
     Path[] files = fs.listPaths(segmentsDir, new PathFilter() {
       public boolean accept(Path f) {
         try {
-          if (fs.isDirectory(f)) return true;
+          if (fs.getFileStatus(f).isDir()) return true;
         } catch (IOException ioe) {};
         return false;
       }
@@ -255,7 +251,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = new LinkDb().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDb(), args);
     System.exit(res);
   }
   
@@ -271,7 +267,7 @@
       return -1;
     }
     Path segDir = null;
-    final FileSystem fs = FileSystem.get(conf);
+    final FileSystem fs = FileSystem.get(getConf());
     Path db = new Path(args[0]);
     ArrayList<Path> segs = new ArrayList<Path>();
     boolean filter = true;
@@ -283,7 +279,7 @@
         Path[] files = fs.listPaths(segDir, new PathFilter() {
           public boolean accept(Path f) {
             try {
-              if (fs.isDirectory(f)) return true;
+              if (fs.getFileStatus(f).isDir()) return true;
             } catch (IOException ioe) {};
             return false;
           }
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbMerger.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbMerger.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbMerger.java	(working copy)
@@ -24,10 +24,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapFileOutputFormat;
@@ -36,7 +36,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -58,7 +59,7 @@
  * 
  * @author Andrzej Bialecki
  */
-public class LinkDbMerger extends ToolBase implements Reducer {
+public class LinkDbMerger extends Configured implements Tool, Reducer<Text, Inlinks, Text, Inlinks> {
   private static final Log LOG = LogFactory.getLog(LinkDbMerger.class);
   
   private int maxInlinks;
@@ -71,12 +72,12 @@
     setConf(conf);
   }
 
-  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Inlinks> values, OutputCollector<Text, Inlinks> output, Reporter reporter) throws IOException {
 
     Inlinks result = new Inlinks();
 
     while (values.hasNext()) {
-      Inlinks inlinks = (Inlinks)values.next();
+      Inlinks inlinks = values.next();
 
       int end = Math.min(maxInlinks - result.size(), inlinks.size());
       Iterator<Inlink> it = inlinks.iterator();
@@ -135,7 +136,7 @@
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = new LinkDbMerger().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbMerger(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Injector.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Injector.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Injector.java	(working copy)
@@ -28,8 +28,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 
 import org.apache.nutch.net.*;
 import org.apache.nutch.scoring.ScoringFilterException;
@@ -39,12 +38,12 @@
 
 /** This class takes a flat file of URLs and adds them to the of pages to be
  * crawled.  Useful for bootstrapping the system. */
-public class Injector extends ToolBase {
+public class Injector extends Configured implements Tool {
   public static final Log LOG = LogFactory.getLog(Injector.class);
 
 
   /** Normalize and filter injected urls. */
-  public static class InjectMapper implements Mapper {
+  public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
     private URLNormalizers urlNormalizers;
     private int interval;
     private float scoreInjected;
@@ -65,12 +64,10 @@
 
     public void close() {}
 
-    public void map(WritableComparable key, Writable val,
-                    OutputCollector output, Reporter reporter)
+    public void map(WritableComparable key, Text value,
+                    OutputCollector<Text, CrawlDatum> output, Reporter reporter)
       throws IOException {
-      Text value = (Text)val;
       String url = value.toString();              // value is line of text
-      // System.out.println("url: " +url);
       try {
         url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
         url = filters.filter(url);             // filter the url
@@ -98,17 +95,17 @@
   }
 
   /** Combine multiple new entries for a url. */
-  public static class InjectReducer implements Reducer {
+  public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     public void configure(JobConf job) {}    
     public void close() {}
 
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<CrawlDatum> values,
+                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
       throws IOException {
       CrawlDatum old = null;
       CrawlDatum injected = null;
       while (values.hasNext()) {
-        CrawlDatum val = (CrawlDatum)values.next();
+        CrawlDatum val = values.next();
         if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
           injected = val;
           injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
@@ -124,9 +121,7 @@
     }
   }
 
-  public Injector() {
-    
-  }
+  public Injector() {}
   
   public Injector(Configuration conf) {
     setConf(conf);
@@ -179,7 +174,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = new Injector().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDb.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDb.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDb.java	(working copy)
@@ -28,8 +28,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 
 import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
@@ -39,7 +38,7 @@
  * This class takes the output of the fetcher and updates the
  * crawldb accordingly.
  */
-public class CrawlDb extends ToolBase {
+public class CrawlDb extends Configured implements Tool {
   public static final Log LOG = LogFactory.getLog(CrawlDb.class);
 
   public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed";
@@ -47,11 +46,8 @@
   public static final String CURRENT_NAME = "current";
   
   public static final String LOCK_NAME = ".locked";
-
   
-  public CrawlDb() {
-    
-  }
+  public CrawlDb() {}
   
   public CrawlDb(Configuration conf) {
     setConf(conf);
@@ -149,7 +145,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = new CrawlDb().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
     System.exit(res);
   }
 
@@ -184,7 +180,7 @@
         Path[] paths = fs.listPaths(new Path(args[++i]), new PathFilter() {
           public boolean accept(Path dir) {
             try {
-              return fs.isDirectory(dir);
+              return fs.getFileStatus(dir).isDir();
             } catch (IOException ioe) {
               return false;
             }
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbMerger.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbMerger.java	(working copy)
@@ -28,10 +28,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.conf.*;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -50,10 +49,10 @@
  * 
  * @author Andrzej Bialecki
  */
-public class CrawlDbMerger extends ToolBase {
+public class CrawlDbMerger extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(CrawlDbMerger.class);
 
-  public static class Merger extends MapReduceBase implements Reducer {
+  public static class Merger extends MapReduceBase implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     MapWritable meta = new MapWritable();
     private FetchSchedule schedule;
 
@@ -63,13 +62,13 @@
       schedule = FetchScheduleFactory.getFetchSchedule(conf);
     }
 
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter)
             throws IOException {
       CrawlDatum res = null;
       long resTime = 0L;
       meta.clear();
       while (values.hasNext()) {
-        CrawlDatum val = (CrawlDatum) values.next();
+        CrawlDatum val = values.next();
         if (res == null) {
           res = val;
           resTime = schedule.calculateLastFetchTime(res);
@@ -138,7 +137,7 @@
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = new CrawlDbMerger().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDbMerger(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java	(working copy)
@@ -27,7 +27,7 @@
 import org.apache.nutch.net.URLNormalizers;
 
 /** Partition urls by hostname. */
-public class PartitionUrlByHost implements Partitioner {
+public class PartitionUrlByHost implements Partitioner<Text, Writable> {
   private static final Log LOG = LogFactory.getLog(PartitionUrlByHost.class);
   
   private int seed;
@@ -41,9 +41,9 @@
   public void close() {}
 
   /** Hash by hostname. */
-  public int getPartition(WritableComparable key, Writable value,
+  public int getPartition(Text key, Writable value,
                           int numReduceTasks) {
-    String urlString = ((Text)key).toString();
+    String urlString = key.toString();
     try {
       urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
     } catch (Exception e) {
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Generator.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Generator.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Generator.java	(working copy)
@@ -29,8 +29,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -45,7 +44,7 @@
 import org.apache.nutch.util.NutchJob;
 
 /** Generates a subset of a crawl db to fetch. */
-public class Generator extends ToolBase {
+public class Generator extends Configured implements Tool {
 
   public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
   public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
@@ -81,7 +80,7 @@
   }
 
   /** Selects entries due for fetch. */
-  public static class Selector implements Mapper, Partitioner, Reducer {
+  public static class Selector implements Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
     private LongWritable genTime = new LongWritable(System.currentTimeMillis());
     private long curTime;
     private long limit;
@@ -89,7 +88,7 @@
     private HashMap<String, IntWritable> hostCounts =
       new HashMap<String, IntWritable>();
     private int maxPerHost;
-    private Partitioner hostPartitioner = new PartitionUrlByHost();
+    private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
     private URLFilters filters;
     private URLNormalizers normalizers;
     private ScoringFilters scfilters;
@@ -120,10 +119,10 @@
     public void close() {}
 
     /** Select & invert subset due for fetch. */
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value,
+                    OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
       throws IOException {
-      Text url = (Text)key;
+      Text url = key;
       if (filter) {
         // If filtering is on don't generate URLs that don't pass URLFilters
         try {
@@ -136,7 +135,7 @@
           }
         }
       }
-      CrawlDatum crawlDatum = (CrawlDatum)value;
+      CrawlDatum crawlDatum = value;
 
       // check fetch schedule
       if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
@@ -167,20 +166,20 @@
     }
 
     /** Partition by host. */
-    public int getPartition(WritableComparable key, Writable value,
+    public int getPartition(FloatWritable key, Writable value,
                             int numReduceTasks) {
       return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
                                           numReduceTasks);
     }
 
     /** Collect until limit is reached. */
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, Reporter reporter)
+    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
+                       OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
       throws IOException {
 
       while (values.hasNext() && count < limit) {
 
-        SelectorEntry entry = (SelectorEntry)values.next();
+        SelectorEntry entry = values.next();
         Text url = entry.url;
 
         if (maxPerHost > 0) {                     // are we counting hosts?
@@ -263,9 +262,9 @@
     }
   }
 
-  public static class SelectorInverseMapper extends MapReduceBase implements Mapper {
+  public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, CrawlDatum> {
 
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+    public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       SelectorEntry entry = (SelectorEntry)value;
       output.collect(entry.url, entry.datum);
     }
@@ -304,27 +303,27 @@
   /**
    * Update the CrawlDB so that the next generate won't include the same URLs.
    */
-  public static class CrawlDbUpdater extends MapReduceBase implements Mapper, Reducer {
+  public static class CrawlDbUpdater extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     long generateTime;
     
     public void configure(JobConf job) {
       generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
     }
     
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+    public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       if (key instanceof FloatWritable) { // tempDir source
         SelectorEntry se = (SelectorEntry)value;
         output.collect(se.url, se.datum);
       } else {
-        output.collect(key, value);
+        output.collect((Text)key, (CrawlDatum)value);
       }
     }
 
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
       CrawlDatum orig = null;
       LongWritable genTime = null;
       while (values.hasNext()) {
-        CrawlDatum val = (CrawlDatum)values.next();
+        CrawlDatum val = values.next();
         if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
           genTime = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
           if (genTime.get() != generateTime) {
@@ -340,13 +339,10 @@
         orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       }
       output.collect(key, orig);
-    }
-    
+    }    
   }
   
-  public Generator() {
-    
-  }
+  public Generator() {}
   
   public Generator(Configuration conf) {
     setConf(conf);
@@ -523,7 +519,7 @@
    * Generate a fetchlist from the crawldb.
    */
   public static void main(String args[]) throws Exception {
-    int res = new Generator().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbReader.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbReader.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbReader.java	(working copy)
@@ -23,12 +23,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.util.NutchConfiguration;
@@ -37,10 +37,10 @@
 import java.util.Iterator;
 
 /** . */
-public class LinkDbReader extends ToolBase implements Closeable {
+public class LinkDbReader extends Configured implements Tool, Closeable {
   public static final Log LOG = LogFactory.getLog(LinkDbReader.class);
 
-  private static final Partitioner PARTITIONER = new HashPartitioner();
+  private static final Partitioner<WritableComparable, Writable> PARTITIONER = new HashPartitioner<WritableComparable, Writable>();
 
   private FileSystem fs;
   private Path directory;
@@ -111,7 +111,7 @@
   }
   
   public static void main(String[] args) throws Exception {
-    int res = new LinkDbReader().doMain(NutchConfiguration.create(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbReader(), args);
     System.exit(res);
   }
   
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/parse/ParseSegment.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/parse/ParseSegment.java	(revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/parse/ParseSegment.java	(working copy)
@@ -23,7 +23,7 @@
 import org.apache.nutch.crawl.SignatureFactory;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.protocol.*;
@@ -37,7 +37,7 @@
 import java.util.Map.Entry;
 
 /* Parse content in a segment. */
-public class ParseSegment extends Configured implements Mapper, Reducer {
+public class ParseSegment extends Configured implements Tool, Mapper<WritableComparable, Content, Text, ParseImpl>, Reducer<Text, Writable, Text, Writable> {
 
   public static final Log LOG = LogFactory.getLog(Parser.class);
   
@@ -60,15 +60,14 @@
   
   private Text newKey = new Text();
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(WritableComparable key, Content content,
+                  OutputCollector<Text, ParseImpl> output, Reporter reporter)
     throws IOException {
     // convert on the fly from old UTF8 keys
     if (key instanceof UTF8) {
       newKey.set(key.toString());
       key = newKey;
     }
-    Content content = (Content) value;
 
     ParseResult parseResult = null;
     try {
@@ -111,8 +110,8 @@
     }
   }
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
+  public void reduce(Text key, Iterator<Writable> values,
+                     OutputCollector<Text, Writable> output, Reporter reporter)
     throws IOException {
     output.collect(key, (Writable)values.next()); // collect first value
   }
@@ -144,6 +143,11 @@
 
 
   public static void main(String[] args) throws Exception {
+	int res = ToolRunner.run(NutchConfiguration.create(), new ParseSegment(), args);
+	System.exit(res);
+  }
+	  
+  public int run(String[] args) throws Exception {
     Path segment;
 
     String usage = "Usage: ParseSegment segment";
@@ -151,11 +155,9 @@
     if (args.length == 0) {
       System.err.println(usage);
       System.exit(-1);
-    }
-      
+    }      
     segment = new Path(args[0]);
-
-    ParseSegment parseSegment = new ParseSegment(NutchConfiguration.create());
-    parseSegment.parse(segment);
+    parse(segment);
+    return 0;
   }
 }
