Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 1160196)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -391,18 +391,11 @@
     // called only once the feeder has stopped
     public synchronized int checkTimelimit() {
       int count = 0;
+
       if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
         // emptying the queues
-        for (String id : queues.keySet()) {
-          FetchItemQueue fiq = queues.get(id);
-          if (fiq.getQueueSize() == 0) continue;
-          LOG.info("* queue: " + id + " >> timelimit! ");
-          int deleted = fiq.emptyQueue();
-          for (int i = 0; i < deleted; i++) {
-            totalSize.decrementAndGet();
-          }
-          count += deleted;
-        }
+        count = emptyQueues();
+
         // there might also be a case where totalsize !=0 but number of queues
         // == 0
         // in which case we simply force it to 0 to avoid blocking
@@ -410,6 +403,25 @@
       }
       return count;
     }
+
+    // empties the queues (used by checkTimelimit() and the throughput threshold check)
+    public synchronized int emptyQueues() {
+      int count = 0;
+
+      for (String id : queues.keySet()) {
+        FetchItemQueue fiq = queues.get(id);
+        if (fiq.getQueueSize() == 0) continue;
+        LOG.info("* queue: " + id + " >> dropping! ");
+        int deleted = fiq.emptyQueue();
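+        // keep the shared totalSize counter in sync with the drained queue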
+        for (int i = 0; i < deleted; i++) {
+          totalSize.decrementAndGet();
+        }
+        count += deleted;
+      }
+
+      return count;
+    }
     
     /**
      * Increment the exception counter of a queue in case of an exception e.g.
@@ -460,7 +472,7 @@
     private FetchItemQueues queues;
     private int size;
     private long timelimit = -1;
-    
+
     public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
         FetchItemQueues queues, int size) {
       this.reader = reader;
@@ -967,15 +979,21 @@
   }
 
   
-  private void reportStatus() throws IOException {
+  private void reportStatus(int pagesLastSec, int bytesLastSec) throws IOException {
     String status;
     long elapsed = (System.currentTimeMillis() - start)/1000;
+
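+    // averages since the fetcher started; pagesLastSec/bytesLastSec cover the last second only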
+    float avgPagesSec = Math.round(((float)pages.get()*10)/elapsed)/10.0f;
+    int avgBytesSec = Math.round(((((float)bytes.get())*8)/1000)/elapsed);
+
     status = activeThreads + " threads, " +
      fetchQueues.getQueueCount() + " queues, "+
      fetchQueues.getTotalSize() + " URLs queued, "+
       pages+" pages, "+errors+" errors, "
-      + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
-      + Math.round(((((float)bytes.get())*8)/1000)/elapsed)+" kbits/s, ";
+      + avgPagesSec + " (" + pagesLastSec + ") pages/s, "
+      + avgBytesSec + " (" + bytesLastSec + ") kbits/s, ";
+
     reporter.setStatus(status);
   }
 
@@ -1034,19 +1052,72 @@
     // select a timeout that avoids a task timeout
     long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/timeoutDivisor;
 
+    // Used for the throughput threshold check: pages and bytes processed in the last second
+    int pagesLastSec;
+    int bytesLastSec;
+
+    // Set to true once the threshold has been exceeded for the first time
+    boolean throughputThresholdExceeded = false;
+    int throughputThresholdNumRetries = 0;
+
+    int throughputThresholdPages = getConf().getInt("fetcher.throughput.threshold.pages", -1);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
+    int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); }
+
     do {                                          // wait for threads to exit
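+      // snapshot the page and byte counters; after the one second sleep below,
+      // the difference gives the throughput over the last second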
+      pagesLastSec = pages.get();
+      bytesLastSec = (int)bytes.get();
+
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
 
-      reportStatus();
+      pagesLastSec = pages.get() - pagesLastSec;
+      bytesLastSec = (int)bytes.get() - bytesLastSec;
+
+      reportStatus(pagesLastSec, bytesLastSec);
+
       LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
           + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
       }
-      
+
+      // if throughput threshold is enabled
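+      // (like the timelimit check below, this runs only once the feeder
+      // has finished queueing URLs)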
+      if (!feeder.isAlive() && throughputThresholdPages != -1) {
+        // Record the first time the threshold of pages/second is exceeded
+        if (pagesLastSec > throughputThresholdPages && !throughputThresholdExceeded) {
+          LOG.info("Exceeding " + throughputThresholdPages + " pages/second");
+          throughputThresholdExceeded = true;
+        }
+
+        // Check if we're dropping below the threshold
+        if (throughputThresholdExceeded && pagesLastSec < throughputThresholdPages) {
+          throughputThresholdNumRetries++;
+          LOG.warn(throughputThresholdNumRetries + ": dropping below configured threshold of " + throughputThresholdPages + " pages per second");
+
+          // Empty the queues if we dropped below the threshold too many times
+          if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
+            LOG.warn("Dropped below threshold too many times, emptying the queues");
+
+            // Disable the threshold checker
+            throughputThresholdPages = -1;
+
+            // Empty the queues cleanly and get the number of items that were dropped
+            int hitByThroughputThreshold = fetchQueues.emptyQueues();
+
+            if (hitByThroughputThreshold != 0) reporter.incrCounter("FetcherStatus",
+              "hitByThroughputThreshold", hitByThroughputThreshold);
+          }
+        }
+      }
+
       // check timelimit
       if (!feeder.isAlive()) {
         int hitByTimeLimit = fetchQueues.checkTimelimit();
Index: conf/nutch-default.xml
===================================================================
--- conf/nutch-default.xml	(revision 1160196)
+++ conf/nutch-default.xml	(working copy)
@@ -695,6 +695,24 @@
   </description>
 </property>
 
+<property>
+  <name>fetcher.throughput.threshold.pages</name>
+  <value>-1</value>
+  <description>The minimum number of pages per second. If the fetcher downloads fewer pages
+  per second than this threshold for too long, its queues are emptied and the fetch cycle
+  ends early, preventing slow queues from stalling the overall throughput. The value must be
+  an integer. This can be useful when fetcher.timelimit.mins is hard to determine. The
+  default value of -1 disables this check.</description>
+</property>
+
+<property>
+  <name>fetcher.throughput.threshold.retries</name>
+  <value>5</value>
+  <description>The number of times the throughput may drop below fetcher.throughput.threshold.pages
+  before the queues are emptied. This setting prevents accidental slowdowns from ending the fetch early.
+  </description>
+</property>
+
 <!-- moreindexingfilter plugin properties -->
 
 <property>
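
A minimal usage sketch (illustrative, not part of the patch): to enable the
check on a crawl, both properties can be overridden in nutch-site.xml, e.g.:

<property>
  <name>fetcher.throughput.threshold.pages</name>
  <value>10</value>
</property>

<property>
  <name>fetcher.throughput.threshold.retries</name>
  <value>10</value>
</property>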
