Index: src/java/org/apache/nutch/crawl/CrawlDbFilter.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbFilter.java	(revision 1692822)
+++ src/java/org/apache/nutch/crawl/CrawlDbFilter.java	(working copy)
@@ -21,6 +21,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -36,7 +37,7 @@
  * @author Andrzej Bialecki
  */
 public class CrawlDbFilter implements
-    Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+    Mapper<Text, Writable, Text, NutchWritable> {
   public static final String URL_FILTERING = "crawldb.url.filters";
 
   public static final String URL_NORMALIZING = "crawldb.url.normalizers";
@@ -76,36 +77,47 @@
 
   private Text newKey = new Text();
 
-  public void map(Text key, CrawlDatum value,
-      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-      throws IOException {
+  public void map(Text key, Writable value,
+      OutputCollector<Text, NutchWritable> output,
+      Reporter reporter) throws IOException {
 
     String url = key.toString();
-
-    // https://issues.apache.org/jira/browse/NUTCH-1101 check status first,
-    // cheaper than normalizing or filtering
-    if (url404Purging && CrawlDatum.STATUS_DB_GONE == value.getStatus()) {
-      url = null;
+    
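+    // Inlinks read from the linkdb are passed through unchanged; the reducer
+    // uses their presence to decide whether a page has become orphaned.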
+    if (value instanceof Inlinks) {
+      Inlinks v = (Inlinks)value;
+      output.collect(key, new NutchWritable(v));
     }
-    if (url != null && urlNormalizers) {
-      try {
-        url = normalizers.normalize(url, scope); // normalize the url
-      } catch (Exception e) {
-        LOG.warn("Skipping " + url + ":" + e);
+    
+    if (value instanceof CrawlDatum) {
+      CrawlDatum crawlDatum = (CrawlDatum)value;
+      
+      // https://issues.apache.org/jira/browse/NUTCH-1101 check status first,
+      // cheaper than normalizing or filtering
+      if (url404Purging && CrawlDatum.STATUS_DB_GONE == crawlDatum.getStatus()) {
         url = null;
       }
-    }
-    if (url != null && urlFiltering) {
-      try {
-        url = filters.filter(url); // filter the url
-      } catch (Exception e) {
-        LOG.warn("Skipping " + url + ":" + e);
-        url = null;
+      if (url != null && urlNormalizers) {
+        try {
+          url = normalizers.normalize(url, scope); // normalize the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          url = null;
+        }
       }
+      if (url != null && urlFiltering) {
+        try {
+          url = filters.filter(url); // filter the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          url = null;
+        }
+      }
+      if (url != null) { // if it passes
+        newKey.set(url); // collect it
+        output.collect(newKey, new NutchWritable(crawlDatum));
+      }
     }
-    if (url != null) { // if it passes
-      newKey.set(url); // collect it
-      output.collect(newKey, value);
-    }
   }
 }
Index: src/java/org/apache/nutch/crawl/CrawlDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDb.java	(revision 1692822)
+++ src/java/org/apache/nutch/crawl/CrawlDb.java	(working copy)
@@ -68,6 +68,16 @@
   public void update(Path crawlDb, Path[] segments, boolean normalize,
       boolean filter, boolean additionsAllowed, boolean force)
       throws IOException {
+    update(crawlDb, segments, normalize, filter, additionsAllowed, force, null);
+  }
+      
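+  /**
+   * Updates the CrawlDb from the given segments. When {@code linkDb} is non-null,
+   * its inlink information can be used to detect and eventually purge orphaned pages.
+   */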
+  public void update(Path crawlDb, Path[] segments, boolean normalize,
+      boolean filter, boolean additionsAllowed, boolean force, Path linkDb)
+      throws IOException {
     FileSystem fs = FileSystem.get(getConf());
     Path lock = new Path(crawlDb, LOCK_NAME);
     LockUtil.createLockFile(fs, lock, force);
@@ -89,6 +95,10 @@
       LOG.info("CrawlDb update: URL normalizing: " + normalize);
       LOG.info("CrawlDb update: URL filtering: " + filter);
       LOG.info("CrawlDb update: 404 purging: " + url404Purging);
+      
+      if (linkDb != null) {
+        LOG.info("CrawlDb update: linkdb: " + linkDb);
+      }
     }
 
     for (int i = 0; i < segments.length; i++) {
@@ -180,7 +190,7 @@
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
       System.err
-          .println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]");
+          .println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions] [-linkdb <linkdb>]");
       System.err.println("\tcrawldb\tCrawlDb to update");
       System.err
           .println("\t-dir segments\tparent directory containing all segments to update from");
@@ -194,6 +204,8 @@
           .println("\t-filter\tuse URLFilters on urls in CrawlDb and segment");
       System.err
           .println("\t-noAdditions\tonly update already existing URLs, don't add any newly discovered URLs");
+      System.err
+          .println("\t-linkdb <linkdb>\tpath to the linkdb");
 
       return -1;
     }
@@ -204,6 +216,7 @@
         true);
     boolean force = false;
     final FileSystem fs = FileSystem.get(getConf());
+    Path linkDb = null;
     HashSet<Path> dirs = new HashSet<Path>();
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-normalize")) {
@@ -214,6 +227,8 @@
         force = true;
       } else if (args[i].equals("-noAdditions")) {
         additionsAllowed = false;
+      } else if (args[i].equals("-linkdb")) {
+        linkDb = new Path(args[++i]);
       } else if (args[i].equals("-dir")) {
         FileStatus[] paths = fs.listStatus(new Path(args[++i]),
             HadoopFSUtil.getPassDirectoriesFilter(fs));
@@ -224,7 +239,7 @@
     }
     try {
       update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize,
-          filter, additionsAllowed, force);
+          filter, additionsAllowed, force, linkDb);
       return 0;
     } catch (Exception e) {
       LOG.error("CrawlDb update: " + StringUtils.stringifyException(e));
Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(revision 1692822)
+++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(working copy)
@@ -36,7 +36,7 @@
 
 /** Merge new page entries with existing entries. */
 public class CrawlDbReducer implements
-    Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+    Reducer<Text, NutchWritable, Text, CrawlDatum> {
   public static final Logger LOG = LoggerFactory
       .getLogger(CrawlDbReducer.class);
 
@@ -47,7 +47,16 @@
   private boolean additionsAllowed;
   private int maxInterval;
   private FetchSchedule schedule;
+  
+  private boolean readingLinkdb = false;
+  private boolean skipOrphans = false;
+  private int removeOrphansAfter = FetchSchedule.SECONDS_PER_DAY * 45;
 
+  public static Text WRITABLE_ORPHAN_KEY = new Text("orphan");
+  public static Text WRITABLE_HOMEPAGE_KEY = new Text("homepage");
+  public static Text WRITABLE_TRUE_KEY = new Text("true");
+
   public void configure(JobConf job) {
     retryMax = job.getInt("db.fetch.retry.max", 3);
     scfilters = new ScoringFilters(job);
@@ -56,12 +64,18 @@
     schedule = FetchScheduleFactory.getFetchSchedule(job);
     int maxLinks = job.getInt("db.update.max.inlinks", 10000);
     linked = new InlinkPriorityQueue(maxLinks);
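+    // Orphan handling: db.process.inlinks enables marking pages that no longer
+    // have any inlinks as orphans; db.skip.orphans additionally removes them
+    // once db.remove.orphans.after seconds (default 45 days) have passed.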
+    skipOrphans = job.getBoolean("db.skip.orphans", false);
+    removeOrphansAfter = job.getInt("db.remove.orphans.after", FetchSchedule.SECONDS_PER_DAY * 45);
+    readingLinkdb = job.getBoolean("db.process.inlinks", false);
   }
 
   public void close() {
   }
 
-  public void reduce(Text key, Iterator<CrawlDatum> values,
+  public void reduce(Text key, Iterator<NutchWritable> values,
       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
       throws IOException {
 
@@ -74,64 +85,108 @@
     boolean multiple = false; // avoid deep copy when only single value exists
     linked.clear();
     org.apache.hadoop.io.MapWritable metaFromParse = null;
+    boolean isOrphaned = false;
+    Inlinks inlinks = null;
 
     while (values.hasNext()) {
-      CrawlDatum datum = values.next();
-      if (!multiple && values.hasNext())
-        multiple = true;
-      if (CrawlDatum.hasDbStatus(datum)) {
-        if (!oldSet) {
-          if (multiple) {
-            old.set(datum);
+      Writable value = values.next().get();
+      
+      if (value instanceof Inlinks) {
+        inlinks = (Inlinks)value;
+      }
+      
+      if (value instanceof CrawlDatum) {
+        CrawlDatum datum = (CrawlDatum)value;
+        if (!multiple && values.hasNext())
+          multiple = true;
+        if (CrawlDatum.hasDbStatus(datum)) {
+          if (!oldSet) {
+            if (multiple) {
+              old.set(datum);
+            } else {
+              // no need for a deep copy - this is the only value
+              old = datum;
+            }
+            oldSet = true;
           } else {
-            // no need for a deep copy - this is the only value
-            old = datum;
+            // always take the latest version
+            if (old.getFetchTime() < datum.getFetchTime())
+              old.set(datum);
           }
-          oldSet = true;
-        } else {
-          // always take the latest version
-          if (old.getFetchTime() < datum.getFetchTime())
-            old.set(datum);
+          continue;
         }
-        continue;
-      }
 
-      if (CrawlDatum.hasFetchStatus(datum)) {
-        if (!fetchSet) {
+        if (CrawlDatum.hasFetchStatus(datum)) {
+          if (!fetchSet) {
+            if (multiple) {
+              fetch.set(datum);
+            } else {
+              fetch = datum;
+            }
+            fetchSet = true;
+          } else {
+            // always take the latest version
+            if (fetch.getFetchTime() < datum.getFetchTime())
+              fetch.set(datum);
+          }
+          continue;
+        }
+
+        switch (datum.getStatus()) { // collect other info
+        case CrawlDatum.STATUS_LINKED:
+          CrawlDatum link;
           if (multiple) {
-            fetch.set(datum);
+            link = new CrawlDatum();
+            link.set(datum);
           } else {
-            fetch = datum;
+            link = datum;
           }
-          fetchSet = true;
-        } else {
-          // always take the latest version
-          if (fetch.getFetchTime() < datum.getFetchTime())
-            fetch.set(datum);
+          linked.insert(link);
+          break;
+        case CrawlDatum.STATUS_SIGNATURE:
+          signature = datum.getSignature();
+          break;
+        case CrawlDatum.STATUS_PARSE_META:
+          metaFromParse = datum.getMetaData();
+          break;
+        default:
+          LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
         }
-        continue;
       }
+    }
+    
+    long currentTime = System.currentTimeMillis();
+    
+    // Handle orphan pages
+    if (oldSet) {    
+      // First check whether the URL has been rediscovered after being marked as orphan
+      if (readingLinkdb && inlinks != null && old.getMetaData().get(WRITABLE_ORPHAN_KEY) != null) {
+        // Remove any existing orphan key
+        old.getMetaData().remove(WRITABLE_ORPHAN_KEY);
+        fetch.getMetaData().remove(WRITABLE_ORPHAN_KEY);
+      }
+      
+      // Check whether this orphaned page is now due for removal
+      if (skipOrphans && old.getMetaData().get(WRITABLE_ORPHAN_KEY) != null) {
 
-      switch (datum.getStatus()) { // collect other info
-      case CrawlDatum.STATUS_LINKED:
-        CrawlDatum link;
-        if (multiple) {
-          link = new CrawlDatum();
-          link.set(datum);
-        } else {
-          link = datum;
+        Writable orphanWritable = old.getMetaData().get(WRITABLE_ORPHAN_KEY);
+        long timeToDelete = ((LongWritable) orphanWritable).get();
+        
+        // Due for deletion?
+        if (timeToDelete < currentTime) {
+          LOG.info("Orphaned page " + key + " is not removed.");
+          return;
         }
-        linked.insert(link);
-        break;
-      case CrawlDatum.STATUS_SIGNATURE:
-        signature = datum.getSignature();
-        break;
-      case CrawlDatum.STATUS_PARSE_META:
-        metaFromParse = datum.getMetaData();
-        break;
-      default:
-        LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
       }
+      
+      // Mark the page as orphan and as GONE so an indexer can remove it later
+      if (readingLinkdb && inlinks == null && old.getMetaData().get(WRITABLE_ORPHAN_KEY) == null) {
+        isOrphaned = true;
+        long timeToDelete = currentTime + (removeOrphansAfter * 1000L);
+        old.getMetaData().put(WRITABLE_ORPHAN_KEY, new LongWritable(timeToDelete));
+        reporter.incrCounter("CrawlDB status", "orphans", 1);
+        LOG.info(key + " has no inlinks, marked as orphan.");
+      }
     }
 
     // copy the content of the queue into a List
@@ -314,6 +369,12 @@
         LOG.warn("Couldn't update score, key=" + key + ": " + e);
       }
     }
+    
+    // Regardless of other status, mark as GONE if this is an orphan
+    if (isOrphaned) {
+      result.setStatus(CrawlDatum.STATUS_DB_GONE);
+    }
+
     // remove generation time, if any
     result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
     output.collect(key, result);
