Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java	(revision 1590576)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java	(working copy)
@@ -132,6 +132,8 @@
   private boolean parsing;
   FetchItemQueues fetchQueues;
   QueueFeeder feeder;
+  /** Fetcher threads started and not yet asked to halt; new threads are appended, halting removes from the end. */
+  LinkedList<FetcherThread> threads = new LinkedList<FetcherThread>();
 
   /**
    * This class described the item to be fetched.
@@ -593,6 +595,8 @@
 
     private int outlinksDepthDivisor;
     private boolean skipTruncated;
+    /** Stop request for this thread: once true, the fetch loop returns at its next isHalted() check. */
+    private boolean halted = false;
 
     public FetcherThread(Configuration conf) {
       this.setDaemon(true);                       // don't hang JVM on exit
@@ -635,6 +639,13 @@
       try {
 
         while (true) {
+          // check whether must be stopped
+          if (isHalted()) {
+            LOG.debug(getName() + " set to halted");
+            fit = null;
+            return;
+          }
+          
           fit = fetchQueues.getFetchItem();
           if (fit == null) {
             if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
@@ -648,6 +659,7 @@
               continue;
             } else {
               // all done, finish this thread
+              LOG.info("Thread " + getName() + " has no more work available");
               return;
             }
           }
@@ -664,8 +676,8 @@
             redirecting = false;
             redirectCount = 0;
             do {
-              if (LOG.isInfoEnabled()) {
-                LOG.info("fetching " + fit.url + " (queue crawl delay=" + 
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("fetching " + fit.url + " (queue crawl delay=" + 
                          fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); 
               }
               if (LOG.isDebugEnabled()) {
@@ -1097,6 +1109,14 @@
       return null;
     }
 
+    /** Sets the stop request; with true, the fetch loop returns the next time it checks isHalted(). */
+    public synchronized void setHalted(boolean halted) {
+      this.halted = halted;
+    }
+    /** Returns the stop request; synchronized on the same monitor as setHalted, so updates are visible. */
+    public synchronized boolean isHalted() {
+      return halted;
+    }
   }
 
   public Fetcher() { super(null); }
@@ -1178,7 +1198,9 @@
     getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
 
     for (int i = 0; i < threadCount; i++) {       // spawn threads
-      new FetcherThread(getConf()).start();
+      FetcherThread t = new FetcherThread(getConf());
+      threads.add(t);
+      t.start();
     }
 
     // select a timeout that avoids a task timeout
@@ -1197,7 +1219,13 @@
     int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5);
     if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); }
     long throughputThresholdTimeLimit = getConf().getLong("fetcher.throughput.threshold.check.after", -1);
 
+    // bandwidth-driven thread scaling; it stays disabled while either of the first two is -1
+    int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1);
+    int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", -1);
+    final int bandwidthTargetCheckEveryNSecs = getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
+    int bandwidthTargetCheckCounter = 0;
+    long bytesAtLastBWTCheck = 0L;
     do {                                          // wait for threads to exit
       pagesLastSec = pages.get();
       bytesLastSec = (int)bytes.get();
@@ -1214,7 +1242,7 @@
       reportStatus(pagesLastSec, bytesLastSec);
 
       LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
-          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ", fetchQueues.getQueueCount="+fetchQueues.getQueueCount());
 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
@@ -1242,7 +1270,58 @@
           }
         }
       }
+      
+      // Adjust the number of fetcher threads towards fetcher.bandwidth.target. The check runs
+      // once every bandwidthTargetCheckEveryNSecs passes of this loop (presumably one pass per
+      // second; the pacing is outside this hunk) and only if both target and thread cap are set.
+      bandwidthTargetCheckCounter++;
+      if (bandwidthTargetCheckEveryNSecs > 0   // also guards the division below
+          && bandwidthTargetCheckCounter >= bandwidthTargetCheckEveryNSecs
+          && targetBandwidth != -1 && maxNumThreads != -1) {
+        // read each shared counter once so every figure below describes the same snapshot
+        final long bytesNow = bytes.get();
+        final int liveThreads = activeThreads.get();
+        long bpsSinceLastCheck = ((bytesNow - bytesAtLastBWTCheck) * 8) / bandwidthTargetCheckEveryNSecs;
+        bytesAtLastBWTCheck = bytesNow;
+        bandwidthTargetCheckCounter = 0;
+
+        // plain long division (no float round-trip); stays 0 when no thread is active
+        long averageBdwPerThread = liveThreads > 0 ? bpsSinceLastCheck / liveThreads : 0;
 
+        LOG.info("averageBdwPerThread : "+averageBdwPerThread);
+
+        if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){
+          // check whether it is worth doing e.g. more queues than threads
+          if (fetchQueues.getQueueCount() > liveThreads){
+            LOG.info("Has space for more threads : "+bpsSinceLastCheck +" bits last sec vs target : "+targetBandwidth);
+            long remainingBdw = targetBandwidth - bpsSinceLastCheck;
+            int availableThreads = maxNumThreads - liveThreads;
+            // threads needed to close the gap, capped by the head-room and never negative
+            int additionalThreads = (int) Math.max(0, Math.min(availableThreads, remainingBdw / averageBdwPerThread));
+            LOG.info("adding new threads : "+additionalThreads);
+            // activate new threads
+            for (int i = 0; i < additionalThreads; i++) {
+              FetcherThread thread = new FetcherThread(getConf());
+              threads.add(thread);
+              thread.start();
+            }
+          }
+        }
+        else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread > 0){
+          // if the bandwidth we're using is greater than the target, we have to stop some threads
+          long excessBdw = bpsSinceLastCheck - targetBandwidth;
+          long excessThreads = excessBdw / averageBdwPerThread;
+          LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000/1000 +" vs "+(targetBandwidth/1000/1000)+" Mbps). \t=> excessThreads = "+excessThreads);
+          // keep at least one: halt at most threads.size() - 1 of the tracked threads
+          int threadsToHalt = (int) Math.min(excessThreads, threads.size() - 1);
+          // de-activates threads
+          for (int i = 0; i < threadsToHalt; i++) {
+            FetcherThread thread = threads.removeLast();
+            thread.setHalted(true);
+          }
+        }
+      }
+
       // check timelimit
       if (!feeder.isAlive()) {
         int hitByTimeLimit = fetchQueues.checkTimelimit();
