Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 1163171)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -391,18 +391,11 @@
     // called only once the feeder has stopped
     public synchronized int checkTimelimit() {
       int count = 0;
+
       if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
         // emptying the queues
-        for (String id : queues.keySet()) {
-          FetchItemQueue fiq = queues.get(id);
-          if (fiq.getQueueSize() == 0) continue;
-          LOG.info("* queue: " + id + " >> timelimit! ");
-          int deleted = fiq.emptyQueue();
-          for (int i = 0; i < deleted; i++) {
-            totalSize.decrementAndGet();
-          }
-          count += deleted;
-        }
+        count = emptyQueues();
+
         // there might also be a case where totalsize !=0 but number of queues
         // == 0
         // in which case we simply force it to 0 to avoid blocking
@@ -410,6 +403,24 @@
       }
       return count;
     }
+
+    // empties the queues (used by timebomb and throughput threshold)
+    public synchronized int emptyQueues() {
+      int count = 0;
+
+      for (String id : queues.keySet()) {
+        FetchItemQueue fiq = queues.get(id);
+        if (fiq.getQueueSize() == 0) continue;
+        LOG.info("* queue: " + id + " >> dropping! ");
+        int deleted = fiq.emptyQueue();
+        for (int i = 0; i < deleted; i++) {
+          totalSize.decrementAndGet();
+        }
+        count += deleted;
+      }
+
+      return count;
+    }
     
     /**
      * Increment the exception counter of a queue in case of an exception e.g.
@@ -460,7 +471,7 @@
     private FetchItemQueues queues;
     private int size;
     private long timelimit = -1;
-    
+
     public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
         FetchItemQueues queues, int size) {
       this.reader = reader;
@@ -967,15 +978,20 @@
   }
 
   
-  private void reportStatus() throws IOException {
+  private void reportStatus(int pagesLastSec, int bytesLastSec) throws IOException {
     String status;
     long elapsed = (System.currentTimeMillis() - start)/1000;
+
+    float avgPagesSec = Math.round(((float)pages.get()*10)/elapsed)/10;
+    float avgBytesSec = Math.round(((((float)bytes.get())*8)/1000)/elapsed);
+
     status = activeThreads + " threads, " +
      fetchQueues.getQueueCount() + " queues, "+
      fetchQueues.getTotalSize() + " URLs queued, "+
       pages+" pages, "+errors+" errors, "
-      + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
-      + Math.round(((((float)bytes.get())*8)/1000)/elapsed)+" kbits/s, ";
+      + avgPagesSec + " (" + pagesLastSec + ") pages/s, "
+      + avgBytesSec + " (" + bytesLastSec + ") kbits/s, ";
+
     reporter.setStatus(status);
   }
 
@@ -1034,19 +1050,68 @@
     // select a timeout that avoids a task timeout
     long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/timeoutDivisor;
 
+    // Used for threshold check, holds pages and bytes processed in the last second
+    int pagesLastSec;
+    int bytesLastSec;
+
+    // Set to true whenever the threshold has been exceeded for the first time
+    boolean throughputThresholdExceeded = false;
+    int throughputThresholdNumRetries = 0;
+
+    int throughputThresholdPages = getConf().getInt("fetcher.throughput.threshold.pages", -1);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
+    int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); }
+
     do {                                          // wait for threads to exit
+      pagesLastSec = pages.get();
+      bytesLastSec = (int)bytes.get();
+
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
 
-      reportStatus();
+      pagesLastSec = pages.get() - pagesLastSec;
+      bytesLastSec = (int)bytes.get() - bytesLastSec;
+
+      reportStatus(pagesLastSec, bytesLastSec);
+
       LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
           + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
       }
-      
+
+      // if throughput threshold is enabled
+      if (!feeder.isAlive() && throughputThresholdPages != -1) {
+        // Have we reached the threshold of pages/second and threshold was not yet exceeded
+        if (pagesLastSec > throughputThresholdPages && !throughputThresholdExceeded) {
+          LOG.info("Exceding " + Integer.toString(throughputThresholdPages) + " pages/second");
+          throughputThresholdExceeded = true;
+        }
+
+        // Check if we're dropping below the threshold
+        if (throughputThresholdExceeded && pagesLastSec < throughputThresholdPages) {
+          throughputThresholdNumRetries++;
+          LOG.warn(Integer.toString(throughputThresholdNumRetries) + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) + " pages per second");
+
+          // Quit if we dropped below threshold too many times
+          if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
+            LOG.warn("Dropped below threshold too many times, killing!");
+
+            // Disable the threshold checker
+            throughputThresholdPages = -1;
+
+            // Empty the queues cleanly and get number of items that were dropped
+            int hitByThrougputThreshold = fetchQueues.emptyQueues();
+
+            if (hitByThrougputThreshold != 0) reporter.incrCounter("FetcherStatus",
+              "hitByThrougputThreshold", hitByThrougputThreshold);
+          }
+        }
+      }
+
       // check timelimit
       if (!feeder.isAlive()) {
         int hitByTimeLimit = fetchQueues.checkTimelimit();
@@ -1067,7 +1132,7 @@
     
   }
 
-  public void fetch(Path segment, int threads, boolean parsing)
+  public void fetch(Path segment, int threads)
     throws IOException {
 
     checkConfiguration();
@@ -1094,7 +1159,6 @@
 
     job.setInt("fetcher.threads.fetch", threads);
     job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
-    job.setBoolean("fetcher.parse", parsing);
 
     // for politeness, don't permit parallel execution of a single task
     job.setSpeculativeExecution(false);
@@ -1124,7 +1188,7 @@
   
   public int run(String[] args) throws Exception {
 
-    String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
+    String usage = "Usage: Fetcher <segment> [-threads n]";
 
     if (args.length < 1) {
       System.err.println(usage);
@@ -1139,15 +1203,13 @@
     for (int i = 1; i < args.length; i++) {       // parse command line
       if (args[i].equals("-threads")) {           // found -threads option
         threads =  Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-noParsing")) parsing = false;
+      }
     }
 
     getConf().setInt("fetcher.threads.fetch", threads);
-    if (!parsing) {
-      getConf().setBoolean("fetcher.parse", parsing);
-    }
+
     try {
-      fetch(segment, threads, parsing);
+      fetch(segment, threads);
       return 0;
     } catch (Exception e) {
       LOG.fatal("Fetcher: " + StringUtils.stringifyException(e));
