diff --git src/java/org/apache/nutch/crawl/DbUpdateMapper.java src/java/org/apache/nutch/crawl/DbUpdateMapper.java
index 48e4913..1c4e1be 100644
--- src/java/org/apache/nutch/crawl/DbUpdateMapper.java
+++ src/java/org/apache/nutch/crawl/DbUpdateMapper.java
@@ -55,15 +55,13 @@ extends GoraMapper<String, WebPage, UrlWithScore, NutchWritable> {
   @Override
   public void map(String key, WebPage page, Context context)
   throws IOException, InterruptedException {
-
-    Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
-    if(!NutchJob.shouldProcess(mark,batchId)) {
+    if (Mark.GENERATE_MARK.checkMark(page) == null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
+        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; not generated yet");
       }
       return;
     }
-
+
     String url = TableUtil.unreverseUrl(key);
 
     scoreData.clear();
diff --git src/java/org/apache/nutch/crawl/DbUpdaterJob.java src/java/org/apache/nutch/crawl/DbUpdaterJob.java
index 6d59389..b0cf9bc 100644
--- src/java/org/apache/nutch/crawl/DbUpdaterJob.java
+++ src/java/org/apache/nutch/crawl/DbUpdaterJob.java
@@ -22,6 +22,8 @@ import java.util.HashSet;
 import java.util.Map;
 
 import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -30,6 +33,7 @@ import org.apache.nutch.crawl.UrlWithScore.UrlScoreComparator;
 import org.apache.nutch.crawl.UrlWithScore.UrlScoreComparator.UrlOnlyComparator;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.storage.Mark;
 import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.NutchConfiguration;
@@ -103,14 +107,28 @@ public class DbUpdaterJob extends NutchTool implements Tool {
     currentJob.setSortComparatorClass(UrlScoreComparator.class);
     currentJob.setGroupingComparatorClass(UrlOnlyComparator.class);
     
+    MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
     StorageUtils.initMapperJob(currentJob, fields, UrlWithScore.class,
-        NutchWritable.class, DbUpdateMapper.class);
+        NutchWritable.class, DbUpdateMapper.class, batchIdFilter);
     StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);
     currentJob.waitForCompletion(true);
     ToolUtil.recordJobStatus(null, currentJob, results);
     return results;
   }
-  
+
+  private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+    if (batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+      return null;
+    }
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    filter.setFieldName(WebPage.Field.MARKERS.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.setMapKey(Mark.GENERATE_MARK.getName());
+    filter.getOperands().add(new Utf8(batchId));
+    return filter;
+  }
+
   private int updateTable(String crawlId,String batchId) throws Exception {
     
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
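
The change above replaces the mapper-side batch-id comparison with a server-side Gora filter over the markers map. Below is a minimal standalone sketch of what that filter matches, assuming the backing store supports Gora query filters; the BatchFilterDemo class and the example batch id are hypothetical, while the StorageUtils, Mark and filter calls mirror the ones used in the patch. Rows whose markers map lacks the generate mark are dropped because of setFilterIfMissing(true), which is why the simplified checkMark(page) == null guard in the mappers stays sufficient.

// Sketch only (not part of the patch): query the web table directly with the
// same MapFieldValueFilter that getBatchIdFilter() builds, to show which rows
// the mapper will now receive. Class name, batch id and the main() driver are
// hypothetical.
import org.apache.avro.util.Utf8;
import org.apache.gora.filter.FilterOp;
import org.apache.gora.filter.MapFieldValueFilter;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.TableUtil;

public class BatchFilterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    DataStore<String, WebPage> store =
        StorageUtils.createWebStore(conf, String.class, WebPage.class);

    // Keep only rows whose markers map contains the generate mark with this
    // exact batch id; rows without the mark are dropped (setFilterIfMissing).
    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
    filter.setFieldName(WebPage.Field.MARKERS.toString());
    filter.setFilterOp(FilterOp.EQUALS);
    filter.setFilterIfMissing(true);
    filter.setMapKey(Mark.GENERATE_MARK.getName());
    filter.getOperands().add(new Utf8("1407852524-1234"));  // hypothetical batch id

    Query<String, WebPage> query = store.newQuery();
    query.setFilter(filter);
    Result<String, WebPage> result = store.execute(query);
    while (result.next()) {
      System.out.println(TableUtil.unreverseUrl(result.getKey()));
    }
    result.close();
    store.close();
  }
}
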
diff --git src/java/org/apache/nutch/fetcher/FetcherJob.java src/java/org/apache/nutch/fetcher/FetcherJob.java
index 7a94fbf..5010e8f 100644
--- src/java/org/apache/nutch/fetcher/FetcherJob.java
+++ src/java/org/apache/nutch/fetcher/FetcherJob.java
@@ -26,6 +26,8 @@ import java.util.Random;
 import java.util.StringTokenizer;
 
 import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -109,10 +111,10 @@ public class FetcherJob extends NutchTool implements Tool {
     @Override
     protected void map(String key, WebPage page, Context context)
         throws IOException, InterruptedException {
-      Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
-      if (!NutchJob.shouldProcess(mark, batchId)) {
+      if (Mark.GENERATE_MARK.checkMark(page) == null) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
+              + "; not generated yet");
         }
         return;
       }
@@ -184,8 +186,10 @@ public class FetcherJob extends NutchTool implements Tool {
     numJobs = 1;
     currentJob = new NutchJob(getConf(), "fetch");
     Collection<WebPage.Field> fields = getFields(currentJob);
+    MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
     StorageUtils.initMapperJob(currentJob, fields, IntWritable.class,
-        FetchEntry.class, FetcherMapper.class, FetchEntryPartitioner.class, false);
+        FetchEntry.class, FetcherMapper.class, FetchEntryPartitioner.class,
+        batchIdFilter, false);
     StorageUtils.initReducerJob(currentJob, FetcherReducer.class);
     if (numTasks == null || numTasks < 1) {
       currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",
@@ -198,7 +202,20 @@ public class FetcherJob extends NutchTool implements Tool {
     return results;
   }
 
-  /**
+  private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+    if (batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+      return null;
+    }
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    filter.setFieldName(WebPage.Field.MARKERS.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.setMapKey(Mark.GENERATE_MARK.getName());
+    filter.getOperands().add(new Utf8(batchId));
+    return filter;
+  }
+
+  /**
    * Run fetcher.
    * @param batchId batchId (obtained from Generator) or null to fetch all generated fetchlists
    * @param threads number of threads per map task
diff --git src/java/org/apache/nutch/indexer/IndexingJob.java src/java/org/apache/nutch/indexer/IndexingJob.java
index c8e649f..32433c7 100644
--- src/java/org/apache/nutch/indexer/IndexingJob.java
+++ src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -22,6 +22,8 @@ import java.util.HashSet;
 import java.util.Map;
 
 import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
 import org.apache.gora.mapreduce.GoraMapper;
 import org.apache.gora.mapreduce.StringComparator;
 import org.apache.gora.store.DataStore;
@@ -99,14 +101,12 @@ public class IndexingJob extends NutchTool implements Tool {
       }
 
       Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
-      if (!batchId.equals(REINDEX)) {
-        if (!NutchJob.shouldProcess(mark, batchId)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
-                + "; different batch id (" + mark + ")");
-          }
-          return;
+      if (mark == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
+              + "; not updated on db yet");
         }
+        return;
       }
 
       NutchDocument doc = indexUtil.index(key, page);
@@ -145,8 +145,9 @@ public class IndexingJob extends NutchTool implements Tool {
         StringComparator.class, RawComparator.class);
 
     Collection<WebPage.Field> fields = getFields(job);
+    MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
     StorageUtils.initMapperJob(job, fields, String.class, NutchDocument.class,
-        IndexerMapper.class);
+        IndexerMapper.class, batchIdFilter);
     job.setNumReduceTasks(0);
     job.setOutputFormatClass(IndexerOutputFormat.class);
 
@@ -155,6 +156,20 @@ public class IndexingJob extends NutchTool implements Tool {
     return results;
   }
 
+  private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+    if (batchId.equals(REINDEX.toString())
+        || batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+      return null;
+    }
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    filter.setFieldName(WebPage.Field.MARKERS.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.setMapKey(Mark.UPDATEDB_MARK.getName());
+    filter.getOperands().add(new Utf8(batchId));
+    return filter;
+  }
+
   public void index(String batchId) throws Exception {
     LOG.info("IndexingJob: starting");
 
diff --git src/java/org/apache/nutch/parse/ParserJob.java src/java/org/apache/nutch/parse/ParserJob.java
index ba17744..8301022 100644
--- src/java/org/apache/nutch/parse/ParserJob.java
+++ src/java/org/apache/nutch/parse/ParserJob.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 
 import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.MapFieldValueFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -47,6 +48,7 @@ import org.apache.nutch.util.StringUtil;
 import org.apache.nutch.util.TableUtil;
 import org.apache.nutch.util.TimingUtil;
 import org.apache.nutch.util.ToolUtil;
+import org.apache.gora.filter.FilterOp;
 import org.apache.gora.mapreduce.GoraMapper;
 
 public class ParserJob extends NutchTool implements Tool {
@@ -102,14 +105,14 @@ public class ParserJob extends NutchTool implements Tool {
     @Override
     public void map(String key, WebPage page, Context context)
         throws IOException, InterruptedException {
-      CharSequence mark = Mark.FETCH_MARK.checkMark(page);
       String unreverseKey = TableUtil.unreverseUrl(key);
       if (batchId.equals(REPARSE)) {
         LOG.debug("Reparsing " + unreverseKey);
       } else {
-        if (!NutchJob.shouldProcess(mark, batchId)) {
+        if (Mark.FETCH_MARK.checkMark(page) == null) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
+            LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
+                + "; not fetched yet");
           }
           return;
         }
@@ -247,8 +250,9 @@ public class ParserJob extends NutchTool implements Tool {
     currentJob = new NutchJob(getConf(), "parse");
     
     Collection<WebPage.Field> fields = getFields(currentJob);
-    StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
-        ParserMapper.class);
+    MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
+    StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
+        ParserMapper.class, batchIdFilter);
     StorageUtils.initReducerJob(currentJob, IdentityPageReducer.class);
     currentJob.setNumReduceTasks(0);
 
@@ -257,6 +261,20 @@ public class ParserJob extends NutchTool implements Tool {
     return results;
   }
 
+  private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+    if (batchId.equals(REPARSE.toString())
+        || batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+      return null;
+    }
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    filter.setFieldName(WebPage.Field.MARKERS.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.setMapKey(Mark.FETCH_MARK.getName());
+    filter.getOperands().add(new Utf8(batchId));
+    return filter;
+  }
+
   public int parse(String batchId, boolean shouldResume, boolean force) throws Exception {
     
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
diff --git src/java/org/apache/nutch/storage/Mark.java src/java/org/apache/nutch/storage/Mark.java
index d334304..d7e7302 100644
--- src/java/org/apache/nutch/storage/Mark.java
+++ src/java/org/apache/nutch/storage/Mark.java
@@ -55,4 +55,8 @@ public enum Mark {
     }
     return null;
   }
+
+  public Utf8 getName() {
+    return name;
+  }
 }
diff --git src/java/org/apache/nutch/storage/StorageUtils.java src/java/org/apache/nutch/storage/StorageUtils.java
index 4bdc775..f0a0d16 100644
--- src/java/org/apache/nutch/storage/StorageUtils.java
+++ src/java/org/apache/nutch/storage/StorageUtils.java
@@ -16,6 +16,7 @@
  ******************************************************************************/
 package org.apache.nutch.storage;
 
+import org.apache.gora.filter.Filter;
 import org.apache.gora.mapreduce.GoraMapper;
 import org.apache.gora.mapreduce.GoraOutputFormat;
 import org.apache.gora.mapreduce.GoraReducer;
@@ -115,17 +116,42 @@ public class StorageUtils {
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
       Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
-  throws ClassNotFoundException, IOException {
+      throws ClassNotFoundException, IOException {
+    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass,
+        partitionerClass, null, reuseObjects);
+  }
+
+  public static <K, V> void initMapperJob(Job job,
+      Collection<WebPage.Field> fields, Class<K> outKeyClass,
+      Class<V> outValueClass,
+      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
+      Class<? extends Partitioner<K, V>> partitionerClass,
+      Filter<String, WebPage> filter, boolean reuseObjects)
+      throws ClassNotFoundException, IOException {
     DataStore<String, WebPage> store = createWebStore(job.getConfiguration(),
         String.class, WebPage.class);
-    if (store==null) throw new RuntimeException("Could not create datastore");
+    if (store == null)
+      throw new RuntimeException("Could not create datastore");
     Query<String, WebPage> query = store.newQuery();
     query.setFields(toStringArray(fields));
-    GoraMapper.initMapperJob(job, query, store,
-        outKeyClass, outValueClass, mapperClass, partitionerClass, reuseObjects);
+    if (filter != null) {
+      query.setFilter(filter);
+    }
+    GoraMapper.initMapperJob(job, query, store, outKeyClass, outValueClass,
+        mapperClass, partitionerClass, reuseObjects);
     GoraOutputFormat.setOutput(job, store, true);
   }
 
+  public static <K, V> void initMapperJob(Job job,
+      Collection<WebPage.Field> fields, Class<K> outKeyClass,
+      Class<V> outValueClass,
+      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
+      Filter<String, WebPage> filter) throws ClassNotFoundException,
+      IOException {
+    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null,
+        filter, true);
+  }
+
   public static <K, V> void initReducerJob(Job job,
       Class<? extends GoraReducer<K, V, String, WebPage>> reducerClass)
   throws ClassNotFoundException, GoraException {

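
The new StorageUtils overloads make the filter an optional part of any Gora mapper job. A hedged usage sketch from a hypothetical downstream job follows; the job and mapper names are invented, and a null filter falls back to the old unfiltered scan, as the -all and reindex/reparse cases do above.

// Sketch only (not part of the patch): a hypothetical job wiring a batch-id
// filter through the new StorageUtils.initMapperJob(..., filter) overload.
import java.util.Collection;
import java.util.HashSet;

import org.apache.avro.util.Utf8;
import org.apache.gora.filter.FilterOp;
import org.apache.gora.filter.MapFieldValueFilter;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TableUtil;

public class BatchScanJob {

  /** Hypothetical mapper: once the filter is applied, only rows carrying the
   *  generate mark for the requested batch should reach map(). */
  public static class BatchScanMapper
      extends GoraMapper<String, WebPage, String, WebPage> {
    @Override
    protected void map(String key, WebPage page, Context context) {
      context.getCounter("BatchScan", "rows").increment(1);
      System.out.println(TableUtil.unreverseUrl(key));
    }
  }

  public static void main(String[] args) throws Exception {
    String batchId = args[0];
    NutchJob job = new NutchJob(NutchConfiguration.create(), "batch-scan");

    // Same construction as the getBatchIdFilter() helpers above.
    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
    filter.setFieldName(WebPage.Field.MARKERS.toString());
    filter.setFilterOp(FilterOp.EQUALS);
    filter.setFilterIfMissing(true);
    filter.setMapKey(Mark.GENERATE_MARK.getName());
    filter.getOperands().add(new Utf8(batchId));

    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>();
    fields.add(WebPage.Field.MARKERS);

    // Passing null instead of the filter falls back to a full table scan.
    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
        BatchScanMapper.class, filter);
    job.setNumReduceTasks(0);
    job.waitForCompletion(true);
  }
}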