Index: src/java/org/apache/nutch/crawl/CrawlStatus.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlStatus.java	(revision 1550034)
+++ src/java/org/apache/nutch/crawl/CrawlStatus.java	(working copy)
@@ -34,6 +34,8 @@
   public static final byte STATUS_RETRY          = 0x22;
   /** Fetching successful - page is not modified. */
   public static final byte STATUS_NOTMODIFIED    = 0x26;
+  /** Page is a duplicate of another page. */
+  public static final byte STATUS_DUPLICATED    = 0x27;
   
   private static final Map<Byte, String> NAMES = new HashMap<Byte, String>();
   
@@ -45,6 +47,7 @@
     NAMES.put(STATUS_REDIR_PERM, "status_redir_perm");
     NAMES.put(STATUS_RETRY, "status_retry");
     NAMES.put(STATUS_NOTMODIFIED, "status_notmodified");
+    NAMES.put(STATUS_DUPLICATED, "status_duplicated");
   }
   
   public static String getName(byte status) {
Index: src/java/org/apache/nutch/crawl/DeduplicationJob.java
===================================================================
--- src/java/org/apache/nutch/crawl/DeduplicationJob.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/DeduplicationJob.java	(working copy)
@@ -0,0 +1,187 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.crawl;
+
+import org.apache.gora.mapreduce.GoraMapper;
+import org.apache.gora.mapreduce.GoraReducer;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.fetcher.FetchEntry;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.ToolUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+
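+/**
+ * Marks pages that share the same content signature as duplicates
+ * ({@link CrawlStatus#STATUS_DUPLICATED}): for each group of pages with an
+ * identical signature, the page with the highest score is kept, ties being
+ * broken by the most recent fetch time and then by the shorter URL.
+ *
+ * <pre>
+ * Usage: DeduplicationJob [-crawlId &lt;id&gt;]
+ * </pre>
+ */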
+public class DeduplicationJob extends NutchTool implements Tool {
+  public static final Logger LOG = LoggerFactory.getLogger(DeduplicationJob.class);
+
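+  /** Only the fields needed for deduplication are loaded for each page. */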
+  private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  static {
+    FIELDS.add(WebPage.Field.STATUS);
+    FIELDS.add(WebPage.Field.SIGNATURE);
+    FIELDS.add(WebPage.Field.SCORE);
+    FIELDS.add(WebPage.Field.FETCH_TIME);
+  }
+
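+  /**
+   * Emits pages keyed by their signature so that pages with identical content
+   * are grouped into the same reduce call. Pages without a signature, or with
+   * a status other than fetched/notmodified/duplicated, are skipped.
+   */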
+  public static class DbFilter extends GoraMapper<String, WebPage, BytesWritable, FetchEntry> {
+    @Override
+    protected void map(String key, WebPage page, Context context) throws IOException, InterruptedException {
+      byte status = (byte) page.getStatus();
+      if (status == CrawlStatus.STATUS_FETCHED || status == CrawlStatus.STATUS_DUPLICATED
+          || status == CrawlStatus.STATUS_NOTMODIFIED) {
+        
+        ByteBuffer signature = page.getSignature();
+        if (signature == null) {
+          return;
+        }
+        // copy exactly the signature bytes: the ByteBuffer's backing array
+        // can be larger than its content, which would break the grouping
+        byte[] sigBytes = new byte[signature.remaining()];
+        signature.duplicate().get(sigBytes);
+        BytesWritable sig = new BytesWritable(sigBytes);
+        // reduce on the signature
+        context.write(sig, new FetchEntry(context.getConfiguration(), key, page));
+      }
+    }
+  }
+
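+  /**
+   * Receives all pages sharing a signature and keeps the "best" one: highest
+   * score, then most recent fetch time, then shortest URL. Every other page
+   * is written back with {@link CrawlStatus#STATUS_DUPLICATED}.
+   */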
+  public static class DedupReducer extends GoraReducer<BytesWritable, FetchEntry, String, WebPage> {
+    @Override
+    protected void reduce(BytesWritable sig, Iterable<FetchEntry> pages, Context context) throws IOException,
+        InterruptedException {
+      WebPage existingPage = null;
+      String existingKey = null;
+
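+      // the first entry becomes the provisional "best" page; each later entry
+      // is compared against it and the loser is marked as duplicate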
+      for (FetchEntry page : pages) {
+        if (existingPage == null) {
+          existingPage = page.getWebPage();
+          existingKey = page.getKey();
+          continue;
+        }
+
+        // compare based on score
+        if (existingPage.getScore() < page.getWebPage().getScore()) {
+          writeOutAsDuplicate(existingKey, existingPage, context);
+          existingPage = page.getWebPage();
+          existingKey = page.getKey();
+          continue;
+        } else if (existingPage.getScore() > page.getWebPage().getScore()) {
+          writeOutAsDuplicate(page.getKey(), page.getWebPage(), context);
+          continue;
+        }
+
+        // same score? keep the most recently fetched page
+        if (existingPage.getFetchTime() > page.getWebPage().getFetchTime()) {
+          // the current entry is older; mark it as duplicate
+          writeOutAsDuplicate(page.getKey(), page.getWebPage(), context);
+          continue;
+        } else if (existingPage.getFetchTime() < page.getWebPage().getFetchTime()) {
+          writeOutAsDuplicate(existingKey, existingPage, context);
+          existingPage = page.getWebPage();
+          existingKey = page.getKey();
+          continue;
+        }
+        // same fetch time? keep the page with the shorter URL
+        if (existingKey.length() < page.getKey().length()) {
+          writeOutAsDuplicate(page.getKey(), page.getWebPage(), context);
+          continue;
+        } else if (existingKey.length() > page.getKey().length()) {
+          writeOutAsDuplicate(existingKey, existingPage, context);
+          existingPage = page.getWebPage();
+          existingKey = page.getKey();
+          continue;
+        }
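+        // identical score, fetch time and URL length: keep the existing page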
+      }
+      // the surviving page may have been marked as a duplicate in a previous
+      // run; reset its status so that it is treated as unique again
+      byte status = (byte) existingPage.getStatus();
+      if (status == CrawlStatus.STATUS_DUPLICATED) {
+        existingPage.setStatus(CrawlStatus.STATUS_FETCHED);
+        context.getCounter("DeduplicationJobStatus", "Documents marked as non-duplicate").increment(1);
+        context.write(existingKey, existingPage);
+      }
+    }
+
+    private void writeOutAsDuplicate(String key, WebPage page, Context context) throws IOException, InterruptedException {
+      page.setStatus(CrawlStatus.STATUS_DUPLICATED);
+      context.getCounter("DeduplicationJobStatus", "Documents marked as duplicate").increment(1);
+      context.write(key, page);
+    }
+  }
+
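+  /**
+   * Runs the deduplication MapReduce job over the webpage store: the mapper
+   * groups pages by signature, the reducer marks the duplicates.
+   */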
+  @Override
+  public Map<String, Object> run(Map<String, Object> args) throws Exception {
+    currentJob = new NutchJob(getConf(), "deduplication");
+    StorageUtils.initMapperJob(currentJob, FIELDS, BytesWritable.class, FetchEntry.class,
+        DbFilter.class, null, true);
+    StorageUtils.initReducerJob(currentJob, DedupReducer.class);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    return results;
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    String usage = "Usage: DeduplicationJob [-crawlId <id>]\n"
+        + "    -crawlId <id> - the id to prefix the schemas to operate on,\n"
+        + "                    (default: storage.crawl.id)\n";
+
+    for (int i = 0; i < args.length; i++) {
+      if ("-crawlId".equals(args[i])) {
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
+      } else {
+        System.err.println(usage);
+        throw new IllegalArgumentException("arg " + args[i] + " not recognized");
+      }
+    }
+    
+    return deduplicate();
+  }
+  
+  private int deduplicate() throws Exception {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("DeduplicationJob: starting at " + sdf.format(start));
+    
+    run(ToolUtil.toArgMap());
+    
+    long finish = System.currentTimeMillis();
+    LOG.info("DeduplicationJob: finished at " + sdf.format(finish) + ", time elapsed: " + TimingUtil.elapsedTime(start, finish));
+    
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new DeduplicationJob(), args);
+    System.exit(res);
+  }
+}
