Index: src/java/org/apache/nutch/fetcher/Fetcher2.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 0)
+++ src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 0)
@@ -0,0 +1,754 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+// Commons Logging imports
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.net.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.*;
+
+
+/** The fetcher. Most of the work is done by plugins. */
+public class Fetcher2 extends Configured implements MapRunnable { 
+
+  public static final Log LOG = LogFactory.getLog(Fetcher2.class);
+  
+  public static final String SIGNATURE_KEY = "nutch.content.digest";
+  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
+  public static final String SCORE_KEY = "nutch.crawl.score";
+
+  public static class InputFormat extends SequenceFileInputFormat {
+    /** Don't split inputs, to keep things polite: one task per fetchlist
+     * file keeps each host's urls together for per-host rate limiting. */
+    public FileSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)
+      throws IOException {
+      Path[] files = listPaths(fs, job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]));
+      }
+      return splits;
+    }
+  }
+
+  private RecordReader input;
+  private OutputCollector output;
+  private Reporter reporter;
+  
+  private HashMap queues;
+  private boolean byIP;
+  private long serverDelay;
+  private int queueSize;
+  private int maxDelays;
+  private int maxThreadsPerHost;
+  
+  /**
+   * Maps from host to a Long naming the time it should be unblocked.
+   * The Long is zero while the host is in use, then set to now+wait when
+   * a request finishes.  This way only one thread at a time accesses a
+   * host.
+   */
+  private static HashMap BLOCKED_ADDR_TO_TIME = new HashMap();
+  
+  /**
+   * Maps a host to the number of threads accessing that host.
+   */
+  private static HashMap THREADS_PER_HOST_COUNT = new HashMap();
+  
+  /**
+   * Queue of blocked hosts.  This contains all of the non-zero entries
+   * from BLOCKED_ADDR_TO_TIME, ordered by increasing time.
+   */
+  private static HostQueue BLOCKED_ADDR_QUEUE = new HostQueue(500);
+  
+
+  private String segmentName;
+  private int activeThreads;
+  private int maxRedirect;
+
+  private long start = System.currentTimeMillis(); // start time of fetcher run
+  private long lastRequestStart = start;
+
+  private long bytes;                             // total bytes fetched
+  private int pages;                              // total pages fetched
+  private int errors;                             // total pages errored
+
+  private boolean storingContent;
+  private boolean parsing;
+
+  private class FetcherThread extends Thread {
+    private Configuration conf;
+    private URLFilters urlFilters;
+    private ScoringFilters scfilters;
+    private ParseUtil parseUtil;
+    private UrlNormalizer normalizer;
+    private ProtocolFactory protocolFactory;
+    private boolean hasMoreInput = true;
+
+    public FetcherThread(Configuration conf) {
+      this.setDaemon(true);                       // don't hang JVM on exit
+      this.setName("FetcherThread");              // use an informative name
+      this.conf = conf;
+      this.urlFilters = new URLFilters(conf);
+      this.scfilters = new ScoringFilters(conf);
+      this.parseUtil = new ParseUtil(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizer = new UrlNormalizerFactory(conf).getNormalizer();
+    }
+
+    public void run() {
+      synchronized (Fetcher2.this) {activeThreads++;} // count threads
+      
+      try {
+        UTF8 key = new UTF8();
+        CrawlDatum datum = new CrawlDatum();
+        
+        while (true) {
+          // TODO : NUTCH-258 ...
+          // If something bad happened, then exit
+          // if (conf.getBoolean("fetcher.exit", false)) {
+          //   break;
+          // }
+          
+          // check the queues
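+          // prefer a queued url whose politeness wait has expired;
+          // fall back to the input when none is ready yet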
+          if (!getFromQueue(key, datum, false)) {
+            try {                                   // get next entry from input
+              if (!hasMoreInput || !input.next(key, datum)) {
+                hasMoreInput = false;
+                if (getQueueSize() > 0) {
+                  try {
+                    Thread.sleep(serverDelay);
+                  } catch (Exception e) {}
+                  continue;
+                } else {
+                  break;                              // at eof, exit
+                }
+              }
+            } catch (IOException e) {
+              if (LOG.isFatalEnabled()) {
+                e.printStackTrace(LogUtil.getFatalStream(LOG));
+                LOG.fatal("fetcher caught:"+e.toString());
+              }
+              break;
+            }
+          }
+
+          synchronized (Fetcher2.this) {
+            lastRequestStart = System.currentTimeMillis();
+          }
+
+          // url may be changed through redirects.
+          UTF8 url = new UTF8();
+          url.set(key);
+          try {
+            if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); }
+
+            // fetch the page
+            boolean redirecting;
+            int redirectCount = 0;
+            do {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("redirectCount=" + redirectCount);
+              }
+              redirecting = false;
+              URL u = null;
+              try {
+                u = new URL(url.toString());
+              } catch (Exception e) {
+                LOG.warn("Cannot parse url: " + url, e);
+                continue;
+              }
+              String proto = u.getProtocol().toLowerCase();
+              String host;
+              if (byIP) {
+                try {
+                  InetAddress addr = InetAddress.getByName(u.getHost());
+                  host = addr.getHostAddress();
+                } catch (UnknownHostException e) {
+                  // unable to resolve it, so don't fall back to host name
+                  throw new Exception(e);
+                }
+              } else {
+                host = u.getHost();
+                if (host == null)
+                  throw new Exception("Unknown host for url: " + url);
+              }
+              host = host.toLowerCase();
+              String protoHost = proto + "://" + host;
+              Protocol protocol = this.protocolFactory.getProtocol(url.toString());
+              RobotRules rules = protocol.getRobotRules(url, datum);
+              if (!rules.isAllowed(u)) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("Denied by robots.txt: " + url);
+                }
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                continue;
+              }
+              long delay = serverDelay;
+              if (rules.getCrawlDelay() > 0) delay = rules.getCrawlDelay();
+              // block address
+              long wait = blockAddr(protoHost, delay);
+              if (wait > 0) {
+                putToQueue(protoHost, url, datum, wait);
+                continue;
+              }
+              ProtocolOutput output;
+              try {
+                output = protocol.getProtocolOutput(url, datum);
+              } finally {
+                unblockAddr(protoHost, delay); // release the host even if the protocol throws
+              }
+              ProtocolStatus status = output.getStatus();
+              Content content = output.getContent();
+              ParseStatus pstatus = null;
+
+              switch(status.getCode()) {
+                
+              case ProtocolStatus.WOULDBLOCK:
+                putToQueue(protoHost, url, datum, status);
+                break;
+
+              case ProtocolStatus.SUCCESS:        // got a page
+                pstatus = output(url, datum, content, CrawlDatum.STATUS_FETCH_SUCCESS);
+                updateStatus(content.getContent().length);
+                if (pstatus != null && pstatus.isSuccess() &&
+                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                  String newUrl = pstatus.getMessage();
+                  newUrl = normalizer.normalize(newUrl);
+                  newUrl = this.urlFilters.filter(newUrl);
+                  if (newUrl != null && !newUrl.equals(url.toString())) {
+                    url = new UTF8(newUrl);
+                    redirecting = true;
+                    redirectCount++;
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(" - content redirect to " + url);
+                    }
+                  } else if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - content redirect skipped: " +
+                             (newUrl != null ? "to same url" : "filtered"));
+                  }
+                }
+                break;
+
+              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.TEMP_MOVED:
+                String newUrl = status.getMessage();
+                newUrl = normalizer.normalize(newUrl);
+                newUrl = this.urlFilters.filter(newUrl);
+                if (newUrl != null && !newUrl.equals(url.toString())) {
+                  url = new UTF8(newUrl);
+                  redirecting = true;
+                  redirectCount++;
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - protocol redirect to " + url);
+                  }
+                } else if (LOG.isDebugEnabled()) {
+                  LOG.debug(" - protocol redirect skipped: " +
+                           (newUrl != null ? "to same url" : "filtered"));
+                }
+                break;
+
+              case ProtocolStatus.EXCEPTION:
+                logError(url, status.getMessage());
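+                // fall through: retry after an exception as well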
+              case ProtocolStatus.RETRY:          // retry
+                datum.setRetriesSinceFetch(datum.getRetriesSinceFetch()+1);
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
+                break;
+                
+              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.NOTFOUND:
+              case ProtocolStatus.ACCESS_DENIED:
+              case ProtocolStatus.ROBOTS_DENIED:
+              case ProtocolStatus.NOTMODIFIED:
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                break;
+
+              default:
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+                }
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+              }
+
+              if (redirecting && redirectCount >= maxRedirect) {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info(" - redirect count exceeded " + url);
+                }
+                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+              }
+
+            } while (redirecting && (redirectCount < maxRedirect));
+
+            
+          } catch (Throwable t) {                 // unexpected exception
+            logError(url, t.toString());
+            output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
+            
+          }
+        }
+
+      } catch (Throwable e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      } finally {
+        synchronized (Fetcher2.this) {activeThreads--;} // count threads
+      }
+    }
+
+    private void logError(UTF8 url, String message) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("fetch of " + url + " failed with: " + message);
+      }
+      synchronized (Fetcher2.this) {               // record failure
+        errors++;
+      }
+    }
+
+    private ParseStatus output(UTF8 key, CrawlDatum datum,
+                        Content content, int status) {
+
+      datum.setStatus(status);
+      datum.setFetchTime(System.currentTimeMillis());
+
+      if (content == null) {
+        String url = key.toString();
+        content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);
+      }
+      Metadata metadata = content.getMetadata();
+      // add segment to metadata
+      metadata.set(SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up.
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          e.printStackTrace(LogUtil.getWarnStream(LOG));
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+
+      Parse parse = null;
+      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+        ParseStatus parseStatus;
+        try {
+          parse = this.parseUtil.parse(content);
+          parseStatus = parse.getData().getStatus();
+        } catch (Exception e) {
+          parseStatus = new ParseStatus(e);
+        }
+        if (!parseStatus.isSuccess()) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+          }
+          parse = parseStatus.getEmptyParse(getConf());
+        }
+        // Calculate page signature. For non-parsing fetchers this will
+        // be done in ParseSegment
+        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
+        metadata.set(SIGNATURE_KEY, StringUtil.toHexString(signature));
+        datum.setSignature(signature);
+        // Ensure segment name and score are in parseData metadata
+        parse.getData().getContentMeta().set(SEGMENT_NAME_KEY, segmentName);
+        parse.getData().getContentMeta().set(SIGNATURE_KEY, StringUtil.toHexString(signature));
+        try {
+          scfilters.passScoreAfterParsing(key, content, parse);
+        } catch (Exception e) {
+          if (LOG.isWarnEnabled()) {
+            e.printStackTrace(LogUtil.getWarnStream(LOG));
+            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+          }
+        }
+        
+      }
+
+      try {
+        output.collect
+          (key,
+           new FetcherOutput(datum,
+                             storingContent ? content : null,
+                             parse != null ? new ParseImpl(parse) : null));
+      } catch (IOException e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      }
+      if (parse != null) return parse.getData().getStatus();
+      else return null;
+    }
+    
+  }
+
+  public Fetcher2() { super(null); }
+
+  public Fetcher2(Configuration conf) { super(conf); }
+
+  private synchronized void updateStatus(int bytesInPage) throws IOException {
+    pages++;
+    bytes += bytesInPage;
+  }
+
+  /**
+   * Attempts to block a combination of protocol + host (name or IP,
+   * depending on the config), taking into account the specified crawl delay
+   * between successive requests, and maximum number of concurrent requests.
+   * 
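+   * <p>Intended usage, mirroring {@link FetcherThread#run()} (sketch):
+   * <pre>
+   *   long wait = blockAddr(protoHost, delay);
+   *   if (wait > 0) {
+   *     putToQueue(protoHost, url, datum, wait);   // host busy - re-queue for later
+   *   } else {
+   *     try {
+   *       // ... fetch from the host ...
+   *     } finally {
+   *       unblockAddr(protoHost, delay);           // always release the host
+   *     }
+   *   }
+   * </pre>
+   *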
+   * @param protoHost protocol + host (e.g. "http://www.example.com")
+   * @param crawlDelay number of milliseconds between consecutive requests
+   * @return 0 if the address was successfully blocked for this thread (the
+   * caller may fetch now, and must unblock the address afterwards); otherwise
+   * the absolute time, in milliseconds, at which the next attempt should be
+   * made.
+   * @throws ProtocolException
+   */
+  private long blockAddr(String protoHost, long crawlDelay) throws ProtocolException {
+    
+    cleanExpiredServerBlocks();                 // free held addresses
+    
+    Long time;
+    synchronized (BLOCKED_ADDR_TO_TIME) {
+      time = (Long) BLOCKED_ADDR_TO_TIME.get(protoHost);
+      if (time == null) {                       // address is free
+        
+        // get # of threads already accessing this addr
+        Integer counter = (Integer)THREADS_PER_HOST_COUNT.get(protoHost);
+        int count = (counter == null) ? 0 : counter.intValue();
+        
+        count++;                              // increment & store
+        THREADS_PER_HOST_COUNT.put(protoHost, new Integer(count));
+        
+        if (count >= maxThreadsPerHost) {
+          BLOCKED_ADDR_TO_TIME.put(protoHost, new Long(0)); // block it
+        }
+        return 0;
+      }
+    }
+    
+    long done = time.longValue();
+    long now = System.currentTimeMillis();
+    long sleep = 0;
+    if (done == 0) {                            // address is still in use
+      sleep = crawlDelay;                      // wait at least delay
+      
+    } else if (now < done) {                    // address is on hold
+      sleep = done - now;                       // wait until it's free
+    }
+    
+    return System.currentTimeMillis() + sleep;
+  }
+  
+  private void unblockAddr(String host, long crawlDelay) {
+    synchronized (BLOCKED_ADDR_TO_TIME) {
+      int addrCount = ((Integer)THREADS_PER_HOST_COUNT.get(host)).intValue();
+      if (addrCount == 1) {
+        long fetchTime = System.currentTimeMillis() + crawlDelay;
+        THREADS_PER_HOST_COUNT.remove(host);
+        BLOCKED_ADDR_QUEUE.insert(new HostQueueEntry(null, null, 
+                                                     host, fetchTime));
+        BLOCKED_ADDR_TO_TIME.put(host, new Long(fetchTime));
+      } else {
+        THREADS_PER_HOST_COUNT.put(host, new Integer(addrCount - 1));
+      }
+    }
+  }
+  
+  private static void cleanExpiredServerBlocks() {
+    synchronized (BLOCKED_ADDR_TO_TIME) {
+      long currentTime = System.currentTimeMillis();
+      HostQueueEntry hqe = (HostQueueEntry) BLOCKED_ADDR_QUEUE.top();
+      while (hqe != null && hqe.fetchTime <= currentTime) {
+        BLOCKED_ADDR_TO_TIME.remove(hqe.protoHost);
+        BLOCKED_ADDR_QUEUE.pop();
+        hqe = (HostQueueEntry) BLOCKED_ADDR_QUEUE.top(); // advance, or the loop never ends
+      }
+    }
+  }
+  
+  private synchronized void putToQueue(String protoHost, UTF8 u, CrawlDatum datum,
+          ProtocolStatus status) throws Exception {
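+    // A WOULDBLOCK status may carry the absolute time of the next attempt
+    // in its message; otherwise fall back to now + serverDelay.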
+    long fetchTime = System.currentTimeMillis();
+    if (status == null) {
+      fetchTime += serverDelay;
+    } else {
+      String msg = status.getMessage();
+      if (msg != null) {
+        try {
+          fetchTime = Long.parseLong(msg);
+        } catch (Exception e) {
+          fetchTime += serverDelay;
+        }
+      } else fetchTime += serverDelay;
+    }
+    putToQueue(protoHost, u, datum, fetchTime);
+  }
+  
+  private synchronized void putToQueue(String protoHost, UTF8 u, CrawlDatum datum,
+          long fetchTime) throws Exception {
+    HostQueue hostQueue = (HostQueue)queues.get(protoHost);
+    if (hostQueue == null) {
+      hostQueue = new HostQueue(500);
+      queues.put(protoHost, hostQueue);
+    }
+    HostQueueEntry hqe = new HostQueueEntry(u, datum, protoHost, fetchTime);
+    hostQueue.insert(hqe);
+    queueSize++;
+    LOG.info("PUT: protoHost=" + protoHost + ", url=" + u + ", size=" + queueSize);
+  }
+  
+  private static class HostQueueEntry {
+    UTF8 url;
+    String protoHost;
+    CrawlDatum datum;
+    long fetchTime;
+    
+    public HostQueueEntry(UTF8 url, CrawlDatum datum, String protoHost, long fetchTime) {
+      this.url = url;
+      this.datum = datum;
+      this.fetchTime = fetchTime;
+      this.protoHost = protoHost;
+    }
+  }
+  
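+  /**
+   * A min-heap of {@link HostQueueEntry} instances, ordered by increasing
+   * fetchTime, so that top() yields the entry that becomes fetchable first.
+   */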
+  private static class HostQueue extends PriorityQueue {
+    
+    public HostQueue(int size) {
+      initialize(size);
+    }
+
+    protected boolean lessThan(Object o1, Object o2) {
+      HostQueueEntry hqe1 = (HostQueueEntry)o1;
+      HostQueueEntry hqe2 = (HostQueueEntry)o2;
+      return hqe1.fetchTime < hqe2.fetchTime;
+    }
+    
+  }
+  
+  private synchronized boolean getFromQueue(UTF8 key, CrawlDatum datum, boolean asap) {
+    if (getQueueSize() == 0) return false;
+    // find the entry with the earliest fetchTime among the queue tops
+    Iterator it = queues.values().iterator();
+    HostQueueEntry hqe = new HostQueueEntry(null, null, null, Long.MAX_VALUE);
+    while (it.hasNext()) {
+      HostQueue hq = (HostQueue) it.next();
+      HostQueueEntry tmp = (HostQueueEntry) hq.top();
+      if (tmp != null && (tmp.fetchTime < hqe.fetchTime)) {
+        hqe = tmp;
+      }
+    }
+    if (hqe.fetchTime == Long.MAX_VALUE) {
+      LOG.info("GET: size=" + queueSize + " but tops count == 0");
+      return false;
+    }
+
+    // take the entry if the caller can't wait, or if its fetch time has come
+    if (asap || hqe.fetchTime < System.currentTimeMillis()) {
+      key.set(hqe.url);
+      datum.set(hqe.datum);
+      HostQueue hq = (HostQueue) queues.get(hqe.protoHost);
+      hq.pop();
+      if (hq.size() == 0) {
+        queues.remove(hqe.protoHost);
+      }
+      queueSize--;
+      LOG.info("GET: protoHost=" + hqe.protoHost + ", url=" + hqe.url + ", size=" + queueSize);
+      return true;
+    } else {
+      LOG.info("GET: not yet, protoHost=" + hqe.protoHost + ", url=" + hqe.url + ", size=" + queueSize);
+      return false;
+    }
+  }
+  
+  private synchronized int getQueueSize() {
+    return queueSize;
+  }
+  
+  private void reportStatus() throws IOException {
+    String status;
+    synchronized (this) {
+      long elapsed = (System.currentTimeMillis() - start)/1000;
+      status = 
+        pages+" pages, "+errors+" errors, "+queueSize+" in queue, "
+        + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
+        + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
+    }
+    reporter.setStatus(status);
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+
+    this.segmentName = job.get(SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+    this.queues = new HashMap();
+    this.queueSize = 0;
+    this.maxDelays = job.getInt("http.max.delays", 3);
+    this.maxThreadsPerHost = job.getInt("fetcher.threads.per.host", 1);
+    // backward-compatible default setting
+    this.byIP = job.getBoolean("fetcher.threads.per.host.by.ip", true);
+    this.serverDelay = (long) (job.getFloat("fetcher.server.delay", 1.0f) * 1000);
+
+//    if (job.getBoolean("fetcher.verbose", false)) {
+//      LOG.setLevel(Level.FINE);
+//    }
+  }
+
+  public void close() {}
+
+  public static boolean isParsing(Configuration conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+
+  public static boolean isStoringContent(Configuration conf) {
+    return conf.getBoolean("fetcher.store.content", true);
+  }
+
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter) throws IOException {
+
+    this.input = input;
+    this.output = output;
+    this.reporter = reporter;
+
+    this.maxRedirect = getConf().getInt("http.redirect.max", 3);
+    
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+
+    for (int i = 0; i < threadCount; i++) {       // spawn threads
+      new FetcherThread(getConf()).start();
+    }
+
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+
+    do {                                          // wait for threads to exit
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+
+      reportStatus();
+
+      // some requests seem to hang, despite all intentions
+      synchronized (this) {
+        if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Aborting with "+activeThreads+" hung threads.");
+          }
+          return;
+        }
+      }
+
+    } while (activeThreads > 0);
+    
+  }
+
+  public void fetch(Path segment, int threads, boolean parsing)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: starting");
+      LOG.info("Fetcher: segment: " + segment);
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("fetch " + segment);
+
+    job.setInt("fetcher.threads.fetch", threads);
+    job.set(SEGMENT_NAME_KEY, segment.getName());
+    job.setBoolean("fetcher.parse", parsing);
+
+    // for politeness, don't permit parallel execution of a single task
+    job.setSpeculativeExecution(false);
+
+    job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+    job.setInputFormat(InputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setMapRunnerClass(Fetcher2.class);
+
+    job.setOutputPath(segment);
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(FetcherOutput.class);
+
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+  }
+
+
+  /** Run the fetcher. */
+  public static void main(String[] args) throws Exception {
+
+    String usage = "Usage: Fetcher2 <segment> [-threads n] [-noParsing]";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+      
+    Path segment = new Path(args[0]);
+
+    Configuration conf = NutchConfiguration.create();
+
+    int threads = conf.getInt("fetcher.threads.fetch", 10);
+    boolean parsing = true;
+
+    for (int i = 1; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-threads")) {           // found -threads option
+        threads =  Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noParsing")) parsing = false;
+    }
+
+    conf.setInt("fetcher.threads.fetch", threads);
+    if (!parsing) {
+      conf.setBoolean("fetcher.parse", parsing);
+    }
+    Fetcher2 fetcher = new Fetcher2(conf);          // make a Fetcher
+    
+    fetcher.fetch(segment, threads, parsing);              // run the Fetcher
+
+  }
+}
Index: src/java/org/apache/nutch/protocol/EmptyRobotRules.java
===================================================================
--- src/java/org/apache/nutch/protocol/EmptyRobotRules.java	(revision 0)
+++ src/java/org/apache/nutch/protocol/EmptyRobotRules.java	(revision 0)
@@ -0,0 +1,26 @@
+/*
+ * Created on Aug 4, 2006
+ * Author: Andrzej Bialecki <ab@getopt.org>
+ *
+ */
+package org.apache.nutch.protocol;
+
+import java.net.URL;
+
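+/**
+ * Robot rules that allow everything, report no Crawl-Delay and never
+ * expire. Useful for protocols (such as file or ftp) where robots.txt
+ * does not apply.
+ */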
+public class EmptyRobotRules implements RobotRules {
+  
+  public static final RobotRules RULES = new EmptyRobotRules();
+
+  public long getCrawlDelay() {
+    return -1;
+  }
+
+  public long getExpireTime() {
+    return -1;
+  }
+
+  public boolean isAllowed(URL url) {
+    return true;
+  }
+
+}
Index: src/java/org/apache/nutch/protocol/RobotRules.java
===================================================================
--- src/java/org/apache/nutch/protocol/RobotRules.java	(revision 0)
+++ src/java/org/apache/nutch/protocol/RobotRules.java	(revision 0)
@@ -0,0 +1,29 @@
+
+package org.apache.nutch.protocol;
+
+import java.net.URL;
+
+
+/**
+ * Holds the rules which were parsed from a robots.txt file, and can
+ * test urls against those rules.
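+ *
+ * <p>Typical use, as in Fetcher2 (sketch):
+ * <pre>
+ *   RobotRules rules = protocol.getRobotRules(url, datum);
+ *   if (!rules.isAllowed(u)) {
+ *     // denied by robots.txt - skip this url
+ *   }
+ *   long delay = rules.getCrawlDelay() > 0 ?
+ *       rules.getCrawlDelay() : serverDelay;  // serverDelay: caller's default
+ * </pre>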
+ */
+public interface RobotRules {
+  /**
+   * Get the expiration time of these rules, in milliseconds. This returns
+   * -1 if not set.
+   */
+  public long getExpireTime();
+
+  /**
+   * Get Crawl-Delay, in milliseconds. This returns -1 if not set.
+   */
+  public long getCrawlDelay();
+
+  /**
+   * Returns <code>false</code> if the <code>robots.txt</code> file
+   * prohibits us from accessing the given <code>url</code>, or
+   * <code>true</code> otherwise.
+   */
+  public boolean isAllowed(URL url);
+  
+}
Index: src/java/org/apache/nutch/protocol/Protocol.java
===================================================================
--- src/java/org/apache/nutch/protocol/Protocol.java	(revision 441424)
+++ src/java/org/apache/nutch/protocol/Protocol.java	(working copy)
@@ -30,6 +30,14 @@
   /** The name of the extension point. */
   public final static String X_POINT_ID = Protocol.class.getName();
 
+  /**
+   * Retrieve robot rules applicable for this url.
+   * @param url url to check
+   * @param datum page datum
+   * @return robot rules (specific for this url or default), never null
+   */
+  RobotRules getRobotRules(UTF8 url, CrawlDatum datum);
+  
   /** Returns the {@link Content} for a fetchlist entry.
    */
   ProtocolOutput getProtocolOutput(UTF8 url, CrawlDatum datum);
Index: src/java/org/apache/nutch/protocol/ProtocolStatus.java
===================================================================
--- src/java/org/apache/nutch/protocol/ProtocolStatus.java	(revision 441424)
+++ src/java/org/apache/nutch/protocol/ProtocolStatus.java	(working copy)
@@ -60,6 +60,10 @@
   public static final int NOTFETCHING          = 20;
   /** Unchanged since the last fetch. */
   public static final int NOTMODIFIED          = 21;
+  /** Request was refused by the protocol plugin, because it would block.
+   * The absolute time (in milliseconds) at which the request may be retried
+   * can be provided in the status arguments. */
+  public static final int WOULDBLOCK          = 22;
   
   // Useful static instances for status codes that don't usually require any
   // additional arguments.
@@ -93,6 +97,7 @@
     codeToName.put(new Integer(REDIR_EXCEEDED), "redir_exceeded");
     codeToName.put(new Integer(NOTFETCHING), "notfetching");
     codeToName.put(new Integer(NOTMODIFIED), "notmodified");
+    codeToName.put(new Integer(WOULDBLOCK), "wouldblock");
   }
   
   public ProtocolStatus() {
Index: src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java
===================================================================
--- src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java	(revision 441424)
+++ src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java	(working copy)
@@ -28,9 +28,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.EmptyRobotRules;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 
 import java.net.URL;
 
@@ -163,4 +165,8 @@
   public Configuration getConf() {
     return this.conf;
   }
+
+  public RobotRules getRobotRules(UTF8 url, CrawlDatum datum) {
+    return EmptyRobotRules.RULES;
+  }
 }
Index: src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java
===================================================================
--- src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java	(revision 441424)
+++ src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java	(working copy)
@@ -35,6 +35,7 @@
 import org.apache.nutch.protocol.ProtocolException;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 import org.apache.nutch.util.GZIPUtils;
 import org.apache.nutch.util.LogUtil;
 
@@ -70,21 +71,6 @@
   /** The length limit for downloaded content, in bytes. */
   protected int maxContent = 64 * 1024; 
 
-  /** The number of times a thread will delay when trying to fetch a page. */
-  protected int maxDelays = 3;
-
-  /**
-   * The maximum number of threads that should be allowed
-   * to access a host at one time.
-   */
-  protected int maxThreadsPerHost = 1; 
-
-  /**
-   * The number of seconds the fetcher will delay between
-   * successive requests to the same server.
-   */
-  protected long serverDelay = 1000;
-
   /** The Nutch 'User-Agent' request header */
   protected String userAgent = getAgentString(
                         "NutchCVS", null, "Nutch",
@@ -92,25 +78,6 @@
                         "nutch-agent@lucene.apache.org");
 
     
-  /**
-   * Maps from host to a Long naming the time it should be unblocked.
-   * The Long is zero while the host is in use, then set to now+wait when
-   * a request finishes.  This way only one thread at a time accesses a
-   * host.
-   */
-  private static HashMap BLOCKED_ADDR_TO_TIME = new HashMap();
-  
-  /**
-   * Maps a host to the number of threads accessing that host.
-   */
-  private static HashMap THREADS_PER_HOST_COUNT = new HashMap();
-  
-  /**
-   * Queue of blocked hosts.  This contains all of the non-zero entries
-   * from BLOCKED_ADDR_TO_TIME, ordered by increasing time.
-   */
-  private static LinkedList BLOCKED_ADDR_QUEUE = new LinkedList();
-  
   /** The default logger */
   private final static Log LOGGER = LogFactory.getLog(HttpBase.class);
 
@@ -120,9 +87,6 @@
   /** The nutch configuration */
   private Configuration conf = null;
   
-  /** Do we block by IP addresses or by hostnames? */
-  private boolean byIP = true;
- 
   /** Do we use HTTP/1.1? */
   protected boolean useHttp11 = false;
 
@@ -147,13 +111,8 @@
         this.useProxy = (proxyHost != null && proxyHost.length() > 0);
         this.timeout = conf.getInt("http.timeout", 10000);
         this.maxContent = conf.getInt("http.content.limit", 64 * 1024);
-        this.maxDelays = conf.getInt("http.max.delays", 3);
-        this.maxThreadsPerHost = conf.getInt("fetcher.threads.per.host", 1);
         this.userAgent = getAgentString(conf.get("http.agent.name"), conf.get("http.agent.version"), conf
                 .get("http.agent.description"), conf.get("http.agent.url"), conf.get("http.agent.email"));
-        this.serverDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
-        // backward-compatible default setting
-        this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
         this.useHttp11 = conf.getBoolean("http.http11", false);
         this.robots.setConf(conf);
         logConf();
@@ -171,28 +130,8 @@
     String urlString = url.toString();
     try {
       URL u = new URL(urlString);
-      
-      try {
-        if (!robots.isAllowed(this, u)) {
-          return new ProtocolOutput(null, new ProtocolStatus(ProtocolStatus.ROBOTS_DENIED, url));
-        }
-      } catch (Throwable e) {
-        // XXX Maybe bogus: assume this is allowed.
-        if (logger.isTraceEnabled()) {
-          logger.trace("Exception checking robot rules for " + url + ": " + e);
-        }
-      }
-      
-      long crawlDelay = robots.getCrawlDelay(this, u);
-      long delay = crawlDelay > 0 ? crawlDelay : serverDelay;
-      String host = blockAddr(u, delay);
-      Response response;
-      try {
-        response = getResponse(u, datum, false); // make a request
-      } finally {
-        unblockAddr(host, delay);
-      }
-      
+
+      Response response = getResponse(u, datum, false); // make a request
       int code = response.getCode();
       byte[] content = response.getContent();
       Content c = new Content(u.toString(), u.toString(),
@@ -280,18 +219,6 @@
     return maxContent;
   }
 
-  public int getMaxDelays() {
-    return maxDelays;
-  }
-
-  public int getMaxThreadsPerHost() {
-    return maxThreadsPerHost;
-  }
-
-  public long getServerDelay() {
-    return serverDelay;
-  }
-
   public String getUserAgent() {
     return userAgent;
   }
@@ -300,94 +227,6 @@
     return useHttp11;
   }
   
-  private String blockAddr(URL url, long crawlDelay) throws ProtocolException {
-    
-    String host;
-    if (byIP) {
-      try {
-        InetAddress addr = InetAddress.getByName(url.getHost());
-        host = addr.getHostAddress();
-      } catch (UnknownHostException e) {
-        // unable to resolve it, so don't fall back to host name
-        throw new HttpException(e);
-      }
-    } else {
-      host = url.getHost();
-      if (host == null)
-        throw new HttpException("Unknown host for url: " + url);
-    }
-    host = host.toLowerCase();
-    
-    int delays = 0;
-    while (true) {
-      cleanExpiredServerBlocks();                 // free held addresses
-      
-      Long time;
-      synchronized (BLOCKED_ADDR_TO_TIME) {
-        time = (Long) BLOCKED_ADDR_TO_TIME.get(host);
-        if (time == null) {                       // address is free
-          
-          // get # of threads already accessing this addr
-          Integer counter = (Integer)THREADS_PER_HOST_COUNT.get(host);
-          int count = (counter == null) ? 0 : counter.intValue();
-          
-          count++;                              // increment & store
-          THREADS_PER_HOST_COUNT.put(host, new Integer(count));
-          
-          if (count >= maxThreadsPerHost) {
-            BLOCKED_ADDR_TO_TIME.put(host, new Long(0)); // block it
-          }
-          return host;
-        }
-      }
-      
-      if (delays == maxDelays)
-        throw new HttpException("Exceeded http.max.delays: retry later.");
-      
-      long done = time.longValue();
-      long now = System.currentTimeMillis();
-      long sleep = 0;
-      if (done == 0) {                            // address is still in use
-        sleep = crawlDelay;                      // wait at least delay
-        
-      } else if (now < done) {                    // address is on hold
-        sleep = done - now;                       // wait until its free
-      }
-      
-      try {
-        Thread.sleep(sleep);
-      } catch (InterruptedException e) {}
-      delays++;
-    }
-  }
-  
-  private void unblockAddr(String host, long crawlDelay) {
-    synchronized (BLOCKED_ADDR_TO_TIME) {
-      int addrCount = ((Integer)THREADS_PER_HOST_COUNT.get(host)).intValue();
-      if (addrCount == 1) {
-        THREADS_PER_HOST_COUNT.remove(host);
-        BLOCKED_ADDR_QUEUE.addFirst(host);
-        BLOCKED_ADDR_TO_TIME.put
-                (host, new Long(System.currentTimeMillis() + crawlDelay));
-      } else {
-        THREADS_PER_HOST_COUNT.put(host, new Integer(addrCount - 1));
-      }
-    }
-  }
-  
-  private static void cleanExpiredServerBlocks() {
-    synchronized (BLOCKED_ADDR_TO_TIME) {
-      while (!BLOCKED_ADDR_QUEUE.isEmpty()) {
-        String host = (String) BLOCKED_ADDR_QUEUE.getLast();
-        long time = ((Long) BLOCKED_ADDR_TO_TIME.get(host)).longValue();
-        if (time <= System.currentTimeMillis()) {
-          BLOCKED_ADDR_TO_TIME.remove(host);
-          BLOCKED_ADDR_QUEUE.removeLast();
-        }
-      }
-    }
-  }
-  
   private static String getAgentString(String agentName,
                                        String agentVersion,
                                        String agentDesc,
@@ -440,8 +279,6 @@
       logger.info("http.timeout = " + timeout);
       logger.info("http.content.limit = " + maxContent);
       logger.info("http.agent = " + userAgent);
-      logger.info("fetcher.server.delay = " + serverDelay);
-      logger.info("http.max.delays = " + maxDelays);
     }
   }
   
@@ -510,4 +347,8 @@
                                           boolean followRedirects)
     throws ProtocolException, IOException;
 
+  public RobotRules getRobotRules(UTF8 url, CrawlDatum datum) {
+    return this.robots.getRobotRulesSet(this, url);
+  }
+
 }
Index: src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java
===================================================================
--- src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java	(revision 441424)
+++ src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java	(working copy)
@@ -33,9 +33,11 @@
 // Nutch imports
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.UTF8;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.net.protocols.Response;
 import org.apache.nutch.protocol.ProtocolException;
+import org.apache.nutch.protocol.RobotRules;
 
 
 /**
@@ -69,7 +71,7 @@
    * This class holds the rules which were parsed from a robots.txt
    * file, and can test paths against those rules.
    */
-  public static class RobotRuleSet {
+  public static class RobotRuleSet implements RobotRules {
     ArrayList tmpEntries = new ArrayList();
     RobotsEntry[] entries = null;
     long expireTime;
@@ -143,10 +145,18 @@
     
     /** 
      *  Returns <code>false</code> if the <code>robots.txt</code> file
-     *  prohibits us from accessing the given <code>path</code>, or
+     *  prohibits us from accessing the given <code>url</code>, or
      *  <code>true</code> otherwise.
      */ 
-    public boolean isAllowed(String path) {
+    public boolean isAllowed(URL url) {
+      String path = url.getPath();                  // check rules
+      if ((path == null) || "".equals(path)) {
+        path= "/";
+      }
+      return isAllowed(path);
+    }
+    
+    boolean isAllowed(String path) {
       try {
         path= URLDecoder.decode(path, CHARACTER_ENCODING);
       } catch (Exception e) {
@@ -414,7 +424,17 @@
     return rules;
   }
   
-  private RobotRuleSet getRobotRulesSet(HttpBase http, URL url) {
+  public RobotRuleSet getRobotRulesSet(HttpBase http, UTF8 url) {
+    URL u = null;
+    try {
+      u = new URL(url.toString());
+    } catch (Exception e) {
+      return EMPTY_RULES;
+    }
+    return getRobotRulesSet(http, u);
+  }
+  
+  public RobotRuleSet getRobotRulesSet(HttpBase http, URL url) {
 
     String host = url.getHost().toLowerCase(); // normalize to lower case
 
@@ -446,12 +466,8 @@
 
   public boolean isAllowed(HttpBase http, URL url)
       throws ProtocolException, IOException {
-    String path = url.getPath();                  // check rules
-    if ((path == null) || "".equals(path)) {
-      path= "/";
-    }
 
-    return getRobotRulesSet(http, url).isAllowed(path);
+    return getRobotRulesSet(http, url).isAllowed(url);
   }
   
   public long getCrawlDelay(HttpBase http, URL url)
Index: src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java
===================================================================
--- src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java	(revision 441424)
+++ src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java	(working copy)
@@ -29,9 +29,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.EmptyRobotRules;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 
 import java.net.URL;
 
@@ -237,4 +239,8 @@
     return this.conf;
   }
 
+  public RobotRules getRobotRules(UTF8 url, CrawlDatum datum) {
+    return EmptyRobotRules.RULES;
+  }
+
 }
Index: src/plugin/protocol-httpclient/src/java/org/apache/nutch/protocol/httpclient/Http.java
===================================================================
--- src/plugin/protocol-httpclient/src/java/org/apache/nutch/protocol/httpclient/Http.java	(revision 441424)
+++ src/plugin/protocol-httpclient/src/java/org/apache/nutch/protocol/httpclient/Http.java	(working copy)
@@ -110,11 +110,6 @@
     params.setSendBufferSize(BUFFER_SIZE);
     params.setReceiveBufferSize(BUFFER_SIZE);
     params.setMaxTotalConnections(maxThreadsTotal);
-    if (maxThreadsTotal > maxThreadsPerHost) {
-      params.setDefaultMaxConnectionsPerHost(maxThreadsPerHost);
-    } else {
-      params.setDefaultMaxConnectionsPerHost(maxThreadsTotal);
-    }
 
     HostConfiguration hostConf = client.getHostConfiguration();
     ArrayList headers = new ArrayList();
