Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 883295)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -208,6 +208,7 @@
     List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
     Set<FetchItem>  inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
     AtomicLong nextFetchTime = new AtomicLong();
+    AtomicInteger exceptionCounter = new AtomicInteger();
     long crawlDelay;
     long minCrawlDelay;
     int maxThreads;
@@ -221,7 +222,13 @@
       // ready to start
       setEndTime(System.currentTimeMillis() - crawlDelay);
     }
-    
+
+    public synchronized int emptyQueue() {
+      int presize = queue.size();
+      queue.clear();
+      return presize;
+    }
+
     public int getQueueSize() {
       return queue.size();
     }
@@ -229,8 +236,12 @@
     public int getInProgressSize() {
       return inProgress.size();
     }
-    
-    public void finishFetchItem(FetchItem it, boolean asap) {
+
+    public synchronized int incrementExceptionCounter() {
+      return exceptionCounter.incrementAndGet();
+    }
+
+    public synchronized void finishFetchItem(FetchItem it, boolean asap) {
       if (it != null) {
         inProgress.remove(it);
         setEndTime(System.currentTimeMillis(), asap);
@@ -299,8 +310,9 @@
     boolean byIP;
     long crawlDelay;
     long minCrawlDelay;
-    Configuration conf;    
-    
+    int maxNumberExceptionsPerQueue = -1;
+    Configuration conf;
+
     public FetchItemQueues(Configuration conf) {
       this.conf = conf;
       this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
@@ -308,6 +320,7 @@
       this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
       this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
       this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+      this.maxNumberExceptionsPerQueue = conf.getInt("fetcher.max.exceptions.per.queue", -1);
     }
     
     public int getTotalSize() {
@@ -370,7 +383,35 @@
       }
       return null;
     }
-    
+
+    /**
+     * increment the time out counter of a queue in case of an exception e.g.
+     * timeout when higher that a given threshold simply empty the queue
+     * 
+     * @param queueid
+     * @return
+     */
+    public synchronized int checkExceptionThreshold(String queueid) {
+      FetchItemQueue fiq = queues.get(queueid);
+      if (fiq == null)
+        return 0;
+      if (fiq.getQueueSize() == 0)
+        return 0;
+      int excCount = fiq.incrementExceptionCounter();
+      if (maxNumberExceptionsPerQueue!= -1 && excCount >= maxNumberExceptionsPerQueue) {
+        // we've allowed too many exceptions for
+        // this queue - purge it
+        int deleted = fiq.emptyQueue();
+        LOG.info("* queue: " + queueid + " >> removed " + deleted
+            + " URLs from queue because " + excCount + " exceptions occurred");
+        for (int i = 0; i < deleted; i++) {
+          totalSize.decrementAndGet();
+        }
+        return deleted;
+      }
+      return 0;
+    }
+
     public synchronized void dump() {
       for (String id : queues.keySet()) {
         FetchItemQueue fiq = queues.get(id);
