Index: src/java/org/apache/nutch/tools/CrawlDBScanner.java
===================================================================
--- src/java/org/apache/nutch/tools/CrawlDBScanner.java	(revision 1214303)
+++ src/java/org/apache/nutch/tools/CrawlDBScanner.java	(working copy)
@@ -26,24 +26,20 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.CrawlDb;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TimingUtil;
 
 /**
@@ -52,71 +48,66 @@
  * used as a new CrawlDB. The dump mechanism of the crawldb reader is not very
  * useful on large crawldbs as the ouput can be extremely large and the -url
  * function can't help if we don't know what url we want to have a look at.
- * 
+ *
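+ * <p>
+ * The regular expression and the optional CrawlDatum status to keep are handed
+ * to the map tasks through the configuration keys {@code CrawlDBScanner.regex}
+ * and {@code CrawlDBScanner.status}. A typical invocation looks roughly like
+ * the following (illustrative sketch only; the exact argument handling lives in
+ * {@code run(String[])}):
+ * <pre>
+ * bin/nutch org.apache.nutch.tools.CrawlDBScanner crawl/crawldb out_dir .+apache.+ -s db_fetched -text
+ * </pre>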
  * @author : Julien Nioche
  */
+public class CrawlDBScanner extends Configured implements Tool {
 
-public class CrawlDBScanner extends Configured implements Tool,
-    Mapper<Text,CrawlDatum,Text,CrawlDatum>, Reducer<Text,CrawlDatum,Text,CrawlDatum> {
-
   public static final Logger LOG = LoggerFactory.getLogger(CrawlDBScanner.class);
 
-  public CrawlDBScanner() {}
 
-  public CrawlDBScanner(Configuration conf) {
-    setConf(conf);
-  }
+  static class CrawlDBScannerMapper extends Mapper<Text,CrawlDatum,Text,CrawlDatum> {
+    private String regex = null;
+    private String status = null;
 
-  public void close() {}
+    @Override
+    public void setup(Context context) {
+      regex = context.getConfiguration().get("CrawlDBScanner.regex");
+      status = context.getConfiguration().get("CrawlDBScanner.status");
+    }
 
-  private String regex = null;
-  private String status = null;
+    @Override
+    public void map(Text url, CrawlDatum crawlDatum, Context context) throws IOException, InterruptedException {
+      // check status
+      if (status != null
+          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) return;
 
-  public void configure(JobConf job) {
-    regex = job.get("CrawlDBScanner.regex");
-    status = job.get("CrawlDBScanner.status");
-  }
-
-  public void map(Text url, CrawlDatum crawlDatum,
-      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
-
-    // check status
-    if (status != null
-        && !status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) return;
-
-    // if URL matched regexp dump it
-    if (url.toString().matches(regex)) {
-      output.collect(url, crawlDatum);
+      // if URL matched regexp dump it
+      if (url.toString().matches(regex)) {
+        context.write(url, crawlDatum);
+      }
     }
   }
 
-  public void reduce(Text key, Iterator<CrawlDatum> values,
-      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
-    while (values.hasNext()) {
-      CrawlDatum val = values.next();
-      output.collect(key, val);
+  static class CrawlDBScannerReducer extends Reducer<Text,CrawlDatum,Text,CrawlDatum> {
+    @Override
+    public void reduce(Text key, Iterable<CrawlDatum> values, Context context) throws IOException, InterruptedException {
+      // identity reduce: pass every datum through unchanged
+      for (CrawlDatum val : values) {
+        context.write(key, val);
+      }
     }
   }
 
   private void scan(Path crawlDb, Path outputPath, String regex, String status,
-      boolean text) throws IOException {
+      boolean text) throws Exception {
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("CrawlDB scanner: starting at " + sdf.format(start));
 
-    JobConf job = new NutchJob(getConf());
 
-    job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);
+    Configuration conf = getConf();
+    conf.set("CrawlDBScanner.regex", regex);
+    if (status != null) conf.set("CrawlDBScanner.status", status);
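+    // for a plain-text dump, disable output compression so the result stays human-readable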
+    if (text) conf.set("mapred.output.compress", "false");
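+    // suppress the _SUCCESS marker file so the output directory can be reused
+    // directly as a crawldb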
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
 
-    job.set("CrawlDBScanner.regex", regex);
-    if (status != null) job.set("CrawlDBScanner.status", status);
+    Job job = new Job(conf, "Scan : " + crawlDb + " for URLS matching : " + regex);
+    job.setJarByClass(CrawlDBScanner.class);
 
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
-    job.setMapperClass(CrawlDBScanner.class);
-    job.setReducerClass(CrawlDBScanner.class);
+    job.setMapperClass(CrawlDBScannerMapper.class);
+    job.setReducerClass(CrawlDBScannerReducer.class);
 
     FileOutputFormat.setOutputPath(job, outputPath);
 
@@ -124,14 +115,13 @@
     // in order to check something - better to use the text format and avoid
     // compression
     if (text) {
-      job.set("mapred.output.compress", "false");
-      job.setOutputFormat(TextOutputFormat.class);
+      job.setOutputFormatClass(TextOutputFormat.class);
     }
     // otherwise what we will actually create is a mini-crawlDB which can be
     // then used
     // for debugging
     else {
-      job.setOutputFormat(MapFileOutputFormat.class);
+      job.setOutputFormatClass(MapFileOutputFormat.class);
     }
 
     job.setMapOutputKeyClass(Text.class);
@@ -141,7 +131,7 @@
     job.setOutputValueClass(CrawlDatum.class);
 
     try {
-      JobClient.runJob(job);
+      if (!job.waitForCompletion(true)) {
+        throw new IOException("CrawlDBScanner job failed");
+      }
     } catch (IOException e) {
       throw e;
     }
