Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReader.java	(revision 1214303)
+++ src/java/org/apache/nutch/crawl/CrawlDbReader.java	(working copy)
@@ -31,6 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Closeable;
@@ -39,25 +40,19 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.HashPartitioner;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.StringUtil;
 
 /**
@@ -66,7 +61,7 @@
  * @author Andrzej Bialecki
  * 
  */
-public class CrawlDbReader implements Closeable {
+public class CrawlDbReader extends Configured implements Closeable {
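+  // extending Configured lets callers inject a Hadoop Configuration via setConf()
+  // instead of threading one through every method call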
 
   public static final Logger LOG = LoggerFactory.getLogger(CrawlDbReader.class);
   
@@ -78,7 +73,7 @@
   private void openReaders(String crawlDb, Configuration config) throws IOException {
     if (readers != null) return;
-    FileSystem fs = FileSystem.get(config);
-    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
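+    // the new-API getReaders() takes (Path, Configuration) and resolves the
+    // FileSystem from the path, so the explicit FileSystem argument is gone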
+    readers = MapFileOutputFormat.getReaders(new Path(crawlDb,
         CrawlDb.CURRENT_NAME), config);
   }
   
@@ -94,7 +89,7 @@
   }
   
   public static class CrawlDatumCsvOutputFormat extends FileOutputFormat<Text,CrawlDatum> {
-    protected static class LineRecordWriter implements RecordWriter<Text,CrawlDatum> {
+    protected static class LineRecordWriter extends RecordWriter<Text,CrawlDatum> {
       private DataOutputStream out;
 
       public LineRecordWriter(DataOutputStream out) {
@@ -133,155 +128,148 @@
           out.writeByte('\n');
       }
 
-      public synchronized void close(Reporter reporter) throws IOException {
+      public synchronized void close(TaskAttemptContext context) throws IOException {
         out.close();
       }
     }
 
-    public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name,
-        Progressable progress) throws IOException {
-      Path dir = FileOutputFormat.getOutputPath(job);
-      DataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+    public RecordWriter<Text,CrawlDatum> getRecordWriter(TaskAttemptContext context) throws IOException {
+      // the new API derives the per-task output file from the OutputCommitter's
+      // work directory via getDefaultWorkFile() instead of a caller-supplied name
+      Path file = getDefaultWorkFile(context, "");
+      FileSystem fs = file.getFileSystem(context.getConfiguration());
+      DataOutputStream fileOut = fs.create(file, false);
       return new LineRecordWriter(fileOut);
    }
   }
 
-  public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
+  public static class CrawlDbStatMapper extends Mapper<Text, CrawlDatum, Text, LongWritable> {
     LongWritable COUNT_1 = new LongWritable(1);
     private boolean sort = false;
-    public void configure(JobConf job) {
-      sort = job.getBoolean("db.reader.stats.sort", false );
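+    // setup() replaces the old configure(JobConf); settings are read from the task Context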
+    public void setup(Context context) {
+      sort = context.getConfiguration().getBoolean("db.reader.stats.sort", false );
     }
-    public void close() {}
-    public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
-            throws IOException {
-      output.collect(new Text("T"), COUNT_1);
-      output.collect(new Text("status " + value.getStatus()), COUNT_1);
-      output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
-      output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
+    public void map(Text key, CrawlDatum value, Context context)
+            throws IOException, InterruptedException {
+      context.write(new Text("T"), COUNT_1);
+      context.write(new Text("status " + value.getStatus()), COUNT_1);
+      context.write(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      context.write(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
       if(sort){
         URL u = new URL(key.toString());
         String host = u.getHost();
-        output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1);
+        context.write(new Text("status " + value.getStatus() + " " + host), COUNT_1);
       }
     }
   }
   
-  public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
+  public static class CrawlDbStatCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
-    
     public CrawlDbStatCombiner() { }
-    public void configure(JobConf job) { }
-    public void close() {}
-    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
-        throws IOException {
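+    // new-API reducers receive the values as an Iterable rather than an Iterator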
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
       val.set(0L);
       String k = ((Text)key).toString();
       if (!k.equals("s")) {
-        while (values.hasNext()) {
-          LongWritable cnt = (LongWritable)values.next();
-          val.set(val.get() + cnt.get());
-        }
-        output.collect(key, val);
+          for (LongWritable cnt : values) {
+            val.set(val.get() + cnt.get());
+          }
+          context.write(key, val);
       } else {
         long total = 0;
         long min = Long.MAX_VALUE;
         long max = Long.MIN_VALUE;
-        while (values.hasNext()) {
-          LongWritable cnt = (LongWritable)values.next();
+        for (LongWritable cnt : values) {
           if (cnt.get() < min) min = cnt.get();
           if (cnt.get() > max) max = cnt.get();
           total += cnt.get();
         }
-        output.collect(new Text("scn"), new LongWritable(min));
-        output.collect(new Text("scx"), new LongWritable(max));
-        output.collect(new Text("sct"), new LongWritable(total));
+        context.write(new Text("scn"), new LongWritable(min));
+        context.write(new Text("scx"), new LongWritable(max));
+        context.write(new Text("sct"), new LongWritable(total));
       }
     }
   }
 
-  public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
-    public void configure(JobConf job) {}
-    public void close() {}
-    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
-            throws IOException {
+  public static class CrawlDbStatReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+            throws IOException, InterruptedException {
 
       String k = ((Text) key).toString();
       if (k.equals("T")) {
         // sum all values for this key
         long sum = 0;
-        while (values.hasNext()) {
-          sum += ((LongWritable) values.next()).get();
+        for (LongWritable val : values) {
+          sum += val.get();
         }
         // output sum
-        output.collect(key, new LongWritable(sum));
+        context.write(key, new LongWritable(sum));
       } else if (k.startsWith("status") || k.startsWith("retry")) {
         LongWritable cnt = new LongWritable();
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+        for (LongWritable val : values) {
           cnt.set(cnt.get() + val.get());
         }
-        output.collect(key, cnt);
+        context.write(key, cnt);
       } else if (k.equals("scx")) {
         LongWritable cnt = new LongWritable(Long.MIN_VALUE);
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+        for (LongWritable val : values) {
           if (cnt.get() < val.get()) cnt.set(val.get());
         }
-        output.collect(key, cnt);
+        context.write(key, cnt);
       } else if (k.equals("scn")) {
         LongWritable cnt = new LongWritable(Long.MAX_VALUE);
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+        for (LongWritable val : values) {
           if (cnt.get() > val.get()) cnt.set(val.get());
         }
-        output.collect(key, cnt);
+        context.write(key, cnt);
       } else if (k.equals("sct")) {
         LongWritable cnt = new LongWritable();
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable)values.next();
+        for (LongWritable val : values) {
           cnt.set(cnt.get() + val.get());
         }
-        output.collect(key, cnt);
+        context.write(key, cnt);
       }
     }
   }
 
-  public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
+  public static class CrawlDbTopNMapper extends Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
     
-    public void configure(JobConf job) {
-      long lmin = job.getLong("db.reader.topn.min", 0);
+    public void setup(Context context) {
+      long lmin = context.getConfiguration().getLong("db.reader.topn.min", 0);
       if (lmin != 0) {
         min = (float)lmin / 1000000.0f;
       }
     }
-    public void close() {}
-    public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, Text> output, Reporter reporter)
-            throws IOException {
+    public void map(Text key, CrawlDatum value, Context context)
+            throws IOException, InterruptedException {
       if (value.getScore() < min) return; // don't collect low-scoring records
       fw.set(-value.getScore()); // reverse sorting order
-      output.collect(fw, key); // invert mapping: score -> url
+      context.write(fw, key); // invert mapping: score -> url
     }
   }
   
-  public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
+  public static class CrawlDbTopNReducer extends Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
     
-    public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
-      while (values.hasNext() && count < topN) {
+    public void reduce(FloatWritable key, Iterable<Text> values, Context context)
+        throws IOException, InterruptedException {
+      for (Text val : values) {
+        // stop once this reducer has emitted its share of the top N entries
+        if (count >= topN) {
+          break;
+        }
         key.set(-key.get());
-        output.collect(key, values.next());
+        context.write(key, val);
         count++;
       }
     }
 
-    public void configure(JobConf job) {
-      topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks();
+    public void setup(Context context) {
+      topN = context.getConfiguration().getLong("db.reader.topn", 100) / context.getNumReduceTasks();
     }
-    
-    public void close() {}
   }
 
@@ -289,7 +277,7 @@
     closeReaders();
   }
   
-  public void processStatJob(String crawlDb, Configuration config, boolean sort) throws IOException {
+  public void processStatJob(String crawlDb, Configuration conf, boolean sort) throws Exception {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb statistics start: " + crawlDb);
@@ -297,31 +285,32 @@
     
     Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());
 
-    JobConf job = new NutchJob(config);
-    job.setJobName("stats " + crawlDb);
-    job.setBoolean("db.reader.stats.sort", sort);
+    // https://issues.apache.org/jira/browse/NUTCH-1029
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    conf.setBoolean("db.reader.stats.sort", sort);
+    Job job = new Job(conf, "stats " + crawlDb);
+    job.setJarByClass(CrawlDbReader.class);
 
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
     job.setMapperClass(CrawlDbStatMapper.class);
     job.setCombinerClass(CrawlDbStatCombiner.class);
     job.setReducerClass(CrawlDbStatReducer.class);
 
     FileOutputFormat.setOutputPath(job, tmpFolder);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(LongWritable.class);
 
-    // https://issues.apache.org/jira/browse/NUTCH-1029
-    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("job failed: " + job.getJobName());
+    }
 
-    JobClient.runJob(job);
-
     // reading the result
-    FileSystem fileSystem = FileSystem.get(config);
-    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, tmpFolder);
+    FileSystem fileSystem = FileSystem.get(conf);
 
+    // the new-API SequenceFileOutputFormat does not expose getReaders(), so fall
+    // back to the old-API helper, which simply opens the part files under tmpFolder
+    SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
+        .getReaders(conf, tmpFolder);
+
     Text key = new Text();
     LongWritable value = new LongWritable();
 
@@ -385,7 +374,7 @@
     return res;
   }
 
-  public void readUrl(String crawlDb, String url, Configuration config) throws IOException {
+  public void readUrl(String crawlDb, String url, Configuration config) throws Exception {
     CrawlDatum res = get(crawlDb, url, config);
     System.out.println("URL: " + url);
     if (res != null) {
@@ -395,7 +384,7 @@
     }
   }
   
-  public void processDumpJob(String crawlDb, String output, Configuration config, int format) throws IOException {
+  public void processDumpJob(String crawlDb, String output, Configuration conf, int format) throws Exception {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb dump: starting");
@@ -404,23 +393,24 @@
     
     Path outFolder = new Path(output);
 
-    JobConf job = new NutchJob(config);
-    job.setJobName("dump " + crawlDb);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    Job job = new Job(conf, "dump " + crawlDb);
+    job.setJarByClass(CrawlDbReader.class);
 
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
     FileOutputFormat.setOutputPath(job, outFolder);
-    if(format == CSV_FORMAT) job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
-    else job.setOutputFormat(TextOutputFormat.class);
+    if(format == CSV_FORMAT) job.setOutputFormatClass(CrawlDatumCsvOutputFormat.class);
+    else job.setOutputFormatClass(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
 
-    JobClient.runJob(job);
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("job failed: " + job.getJobName());
+    }
     if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
   }
 
-  public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException {
+  public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws Exception {
     
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
@@ -433,49 +423,53 @@
                "/readdb-topN-temp-"+
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
-    JobConf job = new NutchJob(config);
-    job.setJobName("topN prepare " + crawlDb);
+    // use the Configuration passed in by the caller; getConf() may never have been set
+    config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    // XXX hmmm, no setFloat() in the API ... :(
+    config.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
+
+    Job job = new Job(config, "topN prepare " + crawlDb);
+    job.setJarByClass(CrawlDbReader.class);
+
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setMapperClass(CrawlDbTopNMapper.class);
-    job.setReducerClass(IdentityReducer.class);
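+    // in the new API the base Reducer class is an identity reducer
+    // (stand-in for the removed lib.IdentityReducer)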
+    job.setReducerClass(Reducer.class);
 
     FileOutputFormat.setOutputPath(job, tempDir);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(FloatWritable.class);
     job.setOutputValueClass(Text.class);
-
-    // XXX hmmm, no setFloat() in the API ... :(
-    job.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
-    JobClient.runJob(job); 
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("job failed: " + job.getJobName());
+    }
     
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: collecting topN scores.");
     }
-    job = new NutchJob(config);
-    job.setJobName("topN collect " + crawlDb);
-    job.setLong("db.reader.topn", topN);
 
+    conf.setLong("db.reader.topn", topN);
+    job = new Job(config, "topN collect " + crawlDb);
+    job.setJarByClass(CrawlDbReader.class);
+
     FileInputFormat.addInputPath(job, tempDir);
-    job.setInputFormat(SequenceFileInputFormat.class);
-    job.setMapperClass(IdentityMapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
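+    // in the new API the base Mapper class is an identity mapper
+    // (stand-in for the removed lib.IdentityMapper)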
+    job.setMapperClass(Mapper.class);
     job.setReducerClass(CrawlDbTopNReducer.class);
 
     FileOutputFormat.setOutputPath(job, outFolder);
-    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
     job.setOutputKeyClass(FloatWritable.class);
     job.setOutputValueClass(Text.class);
 
     job.setNumReduceTasks(1); // create a single file.
 
-    JobClient.runJob(job);
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("job failed: " + job.getJobName());
+    }
     FileSystem fs = FileSystem.get(config);
     fs.delete(tempDir, true);
     if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); }
 
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws Exception {
     CrawlDbReader dbr = new CrawlDbReader();
 
     if (args.length < 1) {
