Index: src/java/org/apache/nutch/fetcher/Fetcher2.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 646481)
+++ src/java/org/apache/nutch/fetcher/Fetcher2.java	(working copy)
@@ -61,11 +61,11 @@
  * is less than a fixed number (currently set to a multiple of the number of
  * threads).
  * 
- * <p>As items are consumed from the queues, the QueueFeeder continues to add new
- * input items, so that their total count stays fixed (FetcherThread-s may also
- * add new items to the queues e.g. as a results of redirection) - until all
+ * <p>As FetchItem-s are consumed from the queues, the QueueFeeder continues to add new
+ * FetchItem-s, so that their total count stays fixed (FetcherThread-s may also
+ * add new FetchItem-s to the queues e.g. as a result of redirection) - until all
  * input items are exhausted, at which point the number of items in the queues
- * begins to decrease. When this number reaches 0 fetcher will finish.
+ * begins to decrease. When this number reaches 0, the fetcher will finish.
  * 
  * <p>This fetcher implementation handles per-host blocking itself, instead
  * of delegating this work to protocol-specific plugins.
@@ -119,7 +119,7 @@
   private boolean parsing;
   FetchItemQueues fetchQueues;
   QueueFeeder feeder;
-  
+
   /**
    * This class described the item to be fetched.
    */
@@ -128,7 +128,7 @@
     Text url;
     URL u;
     CrawlDatum datum;
-    
+
     public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
       this.url = url;
       this.u = u;
@@ -202,7 +202,16 @@
     long minCrawlDelay;
     int maxThreads;
     Configuration conf;
-    
+    long start = System.currentTimeMillis(); // start time of FetchItemQueue creation
+    AtomicLong bytes = new AtomicLong(0);        // total bytes fetched for this queue
+    AtomicInteger processed = new AtomicInteger(0);
+    AtomicInteger dropped = new AtomicInteger(0);
+    AtomicInteger exceptions = new AtomicInteger(0);
+    AtomicInteger timeouts = new AtomicInteger(0);
+    AtomicInteger kbps = new AtomicInteger(0);
+    boolean tooSlow = false;
+    boolean tooManyErrors = false;
+
     public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
       this.conf = conf;
       this.maxThreads = maxThreads;
@@ -238,25 +247,57 @@
     }
     
     public FetchItem getFetchItem() {
+      FetchItem it = null;
       if (inProgress.size() >= maxThreads) return null;
+      if (queue.size() == 0) return null;
+      // if the queue/server is too slow or has too many errors, return the item quickly
+      if (tooSlow || tooManyErrors) {
+        it = queue.remove(0);
+        inProgress.add(it);
+        processed.incrementAndGet();
+        return it;
+      }
       long now = System.currentTimeMillis();
       if (nextFetchTime.get() > now) return null;
-      FetchItem it = null;
-      if (queue.size() == 0) return null;
       try {
         it = queue.remove(0);
         inProgress.add(it);
+        processed.incrementAndGet();
       } catch (Exception e) {
         LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
       }
+
+      // calculate download speed for this queue/server
+      long elapsed = (System.currentTimeMillis() - start)/1000;
+      kbps.set(Math.round(((((float)bytes.get())*8)/1024)/elapsed));
+      // if we processed enough URLs from this server and the download speed is too low
+      // TODO: OG: ideally we'd mark the server as too slow only if we knew we have a lot more of its URLs to process
+      // 
+      if (kbps.get() < 25 && processed.get() > 20 && queue.size() > 30) {
+        tooSlow = true;
+        LOG.info("SLOW: KBPS: " + kbps + " Exceptions: " + exceptions + ", Timeouts: " + timeouts + 
+            ", Processed: " + processed + ", " + (float)timeouts.get()/(float)processed.get() + " " + it.url);
+      }
+      
+      // TODO: OG: 10 could be a percentage of gen max per host or some N maybe
+//      int maxPerHost = conf.getInt("generate.max.per.host", 20);
+      if (timeouts.get() > 10 && (float)timeouts.get()/(float)processed.get() > 0.20) {
+        tooManyErrors = true;
+        LOG.info("TIMEOUTS: KBPS: " + kbps + " Exceptions: " + exceptions + ", Timeouts: " + timeouts +
+            ", Processed: " + processed + ", " + (float)timeouts.get()/(float)processed.get() + " " + it.url);
+      }
       return it;
     }
-    
+
     public synchronized void dump() {
       LOG.info("  maxThreads    = " + maxThreads);
       LOG.info("  inProgress    = " + inProgress.size());
       LOG.info("  crawlDelay    = " + crawlDelay);
       LOG.info("  minCrawlDelay = " + minCrawlDelay);
+      LOG.info("  kbps          = " + kbps);
+      LOG.info("  processed     = " + processed);
+      LOG.info("  exceptions    = " + exceptions);
+      LOG.info("  timeouts      = " + timeouts);
       LOG.info("  nextFetchTime = " + nextFetchTime.get());
       LOG.info("  now           = " + System.currentTimeMillis());
       for (int i = 0; i < queue.size(); i++) {
@@ -338,6 +379,7 @@
         // initialize queue
         fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
         queues.put(id, fiq);
+        if (LOG.isDebugEnabled()) { LOG.debug("FetchItemQueue created: " + id); }
       }
       return fiq;
     }
@@ -349,7 +391,8 @@
         FetchItemQueue fiq = it.next().getValue();
         // reap empty queues
         if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
-          it.remove();
+          // OG: commented out to avoid resetting the queue stats (timeouts, exceptions, kbps)
+//          it.remove();
           continue;
         }
         FetchItem fit = fiq.getFetchItem();
@@ -386,6 +429,7 @@
       this.size = size;
       this.setDaemon(true);
       this.setName("QueueFeeder");
+      if (LOG.isInfoEnabled()) { LOG.info("QueueFeeder: size: " + size); }
     }
     
     public void run() {
@@ -465,8 +509,8 @@
           fit = fetchQueues.getFetchItem();
           if (fit == null) {
             if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
-              LOG.debug(getName() + " spin-waiting ...");
-              // spin-wait.
+              LOG.debug(getName() + " spin-waiting (no FetchItem ready)...");
+              // spin-wait
               spinWaiting.incrementAndGet();
               try {
                 Thread.sleep(500);
@@ -478,6 +522,16 @@
               return;
             }
           }
+          // if this queue is too slow or has too many errors, drop the URL and record it as gone
+          FetchItemQueue q = fetchQueues.getFetchItemQueue(fit.queueID);
+          if (q.tooSlow || q.tooManyErrors) {
+            q.dropped.incrementAndGet();
+            LOG.info("Dropped: " + q.tooSlow + "/" + q.tooManyErrors + " " + fit.url);
+            // unblock
+            fetchQueues.finishFetchItem(fit, true);
+            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_GONE, CrawlDatum.STATUS_FETCH_GONE);
+            continue;
+          }
           lastRequestStart.set(System.currentTimeMillis());
           Text reprUrlWritable =
             (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
@@ -539,6 +593,7 @@
               case ProtocolStatus.SUCCESS:        // got a page
                 pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
                 updateStatus(content.getContent().length);
+                q.bytes.addAndGet(content.getContent().length);
                 if (pstatus != null && pstatus.isSuccess() &&
                         pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                   String newUrl = pstatus.getMessage();
@@ -608,6 +663,14 @@
                 break;
 
               case ProtocolStatus.EXCEPTION:
+                // TODO: OG: look for java.net.Socket*
+                q.exceptions.incrementAndGet();
+//                LOG.info("EXCEPTION: " + q.exceptions + " " + fit.url);
+                if (status.getMessage().startsWith("java.net.SocketTimeoutException")) {
+                  q.timeouts.incrementAndGet();
+//                  LOG.info("TIMEOUT: " + q.timeouts + " " + fit.url);
+                }
+
                 logError(fit.url, status.getMessage());
                 /* FALLTHROUGH */
               case ProtocolStatus.RETRY:          // retry
@@ -818,8 +881,7 @@
     bytes.addAndGet(bytesInPage);
   }
 
-  
-  private void reportStatus() throws IOException {
+  private String reportStatus() throws IOException {
     String status;
     long elapsed = (System.currentTimeMillis() - start)/1000;
     status = activeThreads + " threads, " +
@@ -827,6 +889,7 @@
       + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
       + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
     reporter.setStatus(status);
+    return status;
   }
 
   public void configure(JobConf job) {
@@ -860,7 +923,6 @@
 
     int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
     if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
-
     feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
     //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
     feeder.start();
@@ -882,9 +944,11 @@
       } catch (InterruptedException e) {}
 
       reportStatus();
+//      LOG.info(status);
       LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
-          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
-
+          + ", fetchQueues=" + fetchQueues.getQueueCount()
+          + ", totalFetchItemsQueued=" + fetchQueues.getTotalSize());
+ 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
       }
@@ -898,7 +962,13 @@
 
     } while (activeThreads.get() > 0);
     LOG.info("-activeThreads=" + activeThreads);
-    
+
+    // dump queue/server information
+    for (String id : fetchQueues.queues.keySet()) {
+      FetchItemQueue fiq = fetchQueues.queues.get(id);
+      LOG.info("STAT: " + id + " " + fiq.processed + " " + fiq.dropped + " "
+          + fiq.exceptions + " " + fiq.timeouts + " " + fiq.kbps);
+    }
   }
 
   public void fetch(Path segment, int threads, boolean parsing)
