Index: src/java/org/apache/nutch/fetcher/Fetcher2.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 0)
+++ src/java/org/apache/nutch/fetcher/Fetcher2.java	(revision 0)
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+
+
+/** The fetcher. Most of the work is done by plugins. */
+public class Fetcher2 extends Configured implements MapRunnable { 
+
+  public static final Log LOG = LogFactory.getLog(Fetcher2.class);
+  
+  public static final String SIGNATURE_KEY = "nutch.content.digest";
+  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
+  public static final String SCORE_KEY = "nutch.crawl.score";
+
+  public static class InputFormat extends SequenceFileInputFormat {
+    /** Don't split inputs, to keep things polite. */
+    public FileSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)
+      throws IOException {
+      Path[] files = listPaths(fs, job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]));
+      }
+      return splits;
+    }
+  }
+
+  private OutputCollector output;
+  private Reporter reporter;
+  
+  private String segmentName;
+  private AtomicInteger activeThreads = new AtomicInteger(0);
+  private AtomicInteger spinWaiting = new AtomicInteger(0);
+
+  private long start = System.currentTimeMillis(); // start time of fetcher run
+  private AtomicLong lastRequestStart = new AtomicLong(start);
+
+  private AtomicLong bytes = new AtomicLong(0);        // total bytes fetched
+  private AtomicInteger pages = new AtomicInteger(0);  // total pages fetched
+  private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+  private boolean storingContent;
+  private boolean parsing;
+  FetchItemQueues fetchQueues;
+  QueueFeeder feeder;
+  
+  private static class FetchItem {    
+    String queueID;
+    Text url;
+    URL u;
+    CrawlDatum datum;
+    
+    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+      this.url = url;
+      this.u = u;
+      this.datum = datum;
+      this.queueID = queueID;
+    }
+    
+    public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
+      String queueID;
+      URL u = null;
+      try {
+        u = new URL(url.toString());
+      } catch (Exception e) {
+        LOG.warn("Cannot parse url: " + url, e);
+        return null;
+      }
+      String proto = u.getProtocol().toLowerCase();
+      String host;
+      if (byIP) {
+        try {
+          InetAddress addr = InetAddress.getByName(u.getHost());
+          host = addr.getHostAddress();
+        } catch (UnknownHostException e) {
+          // unable to resolve it, so don't fall back to host name
+          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+          return null;
+        }
+      } else {
+        host = u.getHost();
+        if (host == null) {
+          LOG.warn("Unknown host for url: " + url + ", skipping.");
+          return null;
+        }
+        host = host.toLowerCase();
+      }
+      queueID = proto + "://" + host;
+      return new FetchItem(url, u, datum, queueID);
+    }
+
+    public CrawlDatum getDatum() {
+      return datum;
+    }
+
+    public String getQueueID() {
+      return queueID;
+    }
+
+    public Text getUrl() {
+      return url;
+    }
+    
+    public URL getURL2() {
+      return u;
+    }
+  }
+  
+  private static class FetchItemQueue {
+    List queue = Collections.synchronizedList(new LinkedList());
+    Set  inProgress = Collections.synchronizedSet(new HashSet());
+    AtomicLong endTime = new AtomicLong();
+    long crawlDelay;
+    long minCrawlDelay;
+    int maxThreads;
+    Configuration conf;
+    
+    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
+      this.conf = conf;
+      this.maxThreads = maxThreads;
+      this.crawlDelay = crawlDelay;
+      this.minCrawlDelay = minCrawlDelay;
+      // ready to start
+      this.endTime.set(System.currentTimeMillis() - crawlDelay);
+    }
+    
+    public int getQueueSize() {
+      return queue.size();
+    }
+    
+    public int getInProgressSize() {
+      return inProgress.size();
+    }
+    
+    public void finishFetchItem(FetchItem it) {
+      if (it != null) {
+        inProgress.remove(it);
+        endTime.set(System.currentTimeMillis());
+      }
+    }
+    
+    public void addFetchItem(FetchItem it) {
+      if (it == null) return;
+      queue.add(it);
+    }
+    
+    public void addInProgressFetchItem(FetchItem it) {
+      if (it == null) return;
+      inProgress.add(it);
+    }
+    
+    public FetchItem getFetchItem() {
+      if (inProgress.size() >= maxThreads) return null;
+      long now = System.currentTimeMillis();
+      long last = endTime.get() + (maxThreads > 1 ? crawlDelay : minCrawlDelay);
+      if (last > now) return null;
+      FetchItem it = null;
+      if (queue.size() == 0) return null;
+      try {
+        it = (FetchItem)queue.remove(0);
+        inProgress.add(it);
+      } catch (Exception e) {
+        
+      }
+      return it;
+    }
+    
+    public synchronized void dump() {
+      LOG.info("  maxThreads    = " + maxThreads);
+      LOG.info("  inProgress    = " + inProgress.size());
+      LOG.info("  crawlDelay    = " + crawlDelay);
+      LOG.info("  minCrawlDelay = " + minCrawlDelay);
+      LOG.info("  endTime       = " + endTime.get());
+      LOG.info("  now           = " + System.currentTimeMillis());
+      for (int i = 0; i < queue.size(); i++) {
+        FetchItem it = (FetchItem)queue.get(i);
+        LOG.info("  " + i + ". " + it.url);
+      }
+    }
+  }
+  
+  private static class FetchItemQueues {
+    public static final String DEFAULT_ID = "default";
+    Map queues = new HashMap();
+    AtomicInteger totalSize = new AtomicInteger(0);
+    int maxThreads;
+    boolean byIP;
+    long crawlDelay;
+    long minCrawlDelay;
+    Configuration conf;    
+    
+    public FetchItemQueues(Configuration conf) {
+      this.conf = conf;
+      this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
+      // backward-compatible default setting
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
+      this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+    }
+    
+    public int getTotalSize() {
+      return totalSize.get();
+    }
+    
+    public int getQueueCount() {
+      return queues.size();
+    }
+    
+    public void addFetchItem(Text url, CrawlDatum datum) {
+      FetchItem it = FetchItem.create(url, datum, byIP);
+      if (it != null) addFetchItem(it);
+    }
+    
+    public void addFetchItem(FetchItem it) {
+      FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+      fiq.addFetchItem(it);
+      totalSize.incrementAndGet();
+    }
+    
+    public void finishFetchItem(FetchItem it) {
+      FetchItemQueue fiq = (FetchItemQueue)queues.get(it.queueID);
+      if (fiq == null) {
+        LOG.warn("Attempting to finish item from unknown queue: " + it);
+        return;
+      }
+      fiq.finishFetchItem(it);
+    }
+    
+    public synchronized FetchItemQueue getFetchItemQueue(String id) {
+      FetchItemQueue fiq = (FetchItemQueue)queues.get(id);
+      if (fiq == null) {
+        // initialize queue
+        fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+        queues.put(id, fiq);
+      }
+      return fiq;
+    }
+    
+    public synchronized FetchItem getFetchItem() {
+      Iterator it = queues.values().iterator();
+      while (it.hasNext()) {
+        FetchItemQueue fiq = (FetchItemQueue)it.next();
+        FetchItem fit = fiq.getFetchItem();
+        if (fit != null) {
+          totalSize.decrementAndGet();
+          return fit;
+        }
+      }
+      return null;
+    }
+    
+    public void expireEmptyQueues() {
+      Iterator it = queues.values().iterator();
+    }
+    
+    public synchronized void dump() {
+      Iterator it = queues.keySet().iterator();
+      while (it.hasNext()) {
+        String id = (String)it.next();
+        FetchItemQueue fiq = (FetchItemQueue)queues.get(id);
+        if (fiq.getQueueSize() == 0) continue;
+        LOG.info("* queue: " + id);
+        fiq.dump();
+      }
+    }
+  }
+  
+  private static class QueueFeeder extends Thread {
+    private RecordReader reader;
+    private FetchItemQueues queues;
+    private int size;
+    
+    public QueueFeeder(RecordReader reader, FetchItemQueues queues, int size) {
+      this.reader = reader;
+      this.queues = queues;
+      this.size = size;
+      this.setDaemon(true);
+      this.setName("QueueFeeder");
+    }
+    
+    public void run() {
+      boolean hasMore = true;
+      int cnt = 0;
+      
+      while (hasMore) {
+        int feed = size - queues.getTotalSize();
+        if (feed <= 0) {
+          try {
+            Thread.sleep(1000);
+          } catch (Exception e) {};
+          continue;
+        } else {
+          LOG.info("-feeding " + feed + " input urls ...");
+          while (feed > 0 && hasMore) {
+            try {
+              Text url = new Text();
+              CrawlDatum datum = new CrawlDatum();
+              hasMore = reader.next(url, datum);
+              if (hasMore) {
+                queues.addFetchItem(url, datum);
+                cnt++;
+                feed--;
+              }
+            } catch (IOException e) {
+              LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
+              return;
+            }
+          }
+        }
+      }
+      LOG.info("QueueFeeder finished: total " + cnt + " records.");
+    }
+  }
+  
+  private class FetcherThread extends Thread {
+    private Configuration conf;
+    private URLFilters urlFilters;
+    private ScoringFilters scfilters;
+    private ParseUtil parseUtil;
+    private URLNormalizers normalizers;
+    private ProtocolFactory protocolFactory;
+    private boolean hasMoreInput = true;
+    private long maxCrawlDelay;
+    private boolean byIP;
+    private int maxRedirect;
+
+    public FetcherThread(Configuration conf) {
+      this.setDaemon(true);                       // don't hang JVM on exit
+      this.setName("FetcherThread");              // use an informative name
+      this.conf = conf;
+      this.urlFilters = new URLFilters(conf);
+      this.scfilters = new ScoringFilters(conf);
+      this.parseUtil = new ParseUtil(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+      this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+      // backward-compatible default setting
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
+      this.maxRedirect = conf.getInt("http.redirect.max", 3);
+    }
+
+    public void run() {
+      activeThreads.incrementAndGet(); // count threads
+      
+      FetchItem fit = null;
+      try {
+        
+        while (true) {
+          fit = fetchQueues.getFetchItem();
+          if (fit == null) {
+            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
+              // LOG.info(getName() + " spin-waiting ...");
+              // spin-wait.
+              spinWaiting.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (Exception e) {}
+              spinWaiting.decrementAndGet();
+              continue;
+            } else {
+              // all done, finish this thread
+              return;
+            }
+          }
+          lastRequestStart.set(System.currentTimeMillis());
+          try {
+            if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }
+
+            // fetch the page
+            boolean redirecting = false;
+            int redirectCount = 0;
+            do {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("redirectCount=" + redirectCount);
+              }
+              redirecting = false;
+              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
+              RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+              if (!rules.isAllowed(fit.u)) {
+                // unblock
+                fetchQueues.finishFetchItem(fit);
+                if (LOG.isTraceEnabled()) {
+                  LOG.info("Denied by robots.txt: " + fit.url);
+                }
+                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+                continue;
+              }
+              if (rules.getCrawlDelay() > 0) {
+                if (rules.getCrawlDelay() > maxCrawlDelay) {
+                  // unblock
+                  fetchQueues.finishFetchItem(fit);
+                  LOG.info("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
+                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+                  continue;
+                } else {
+                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                  fiq.crawlDelay = rules.getCrawlDelay();
+                }
+              }
+              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
+              ProtocolStatus status = output.getStatus();
+              Content content = output.getContent();
+              ParseStatus pstatus = null;
+              // unblock queue
+              fetchQueues.finishFetchItem(fit);
+
+              switch(status.getCode()) {
+                
+              case ProtocolStatus.WOULDBLOCK:
+                // unblock
+                fetchQueues.finishFetchItem(fit);
+                // retry ?
+                fetchQueues.addFetchItem(fit);
+                break;
+
+              case ProtocolStatus.SUCCESS:        // got a page
+                pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
+                updateStatus(content.getContent().length);
+                if (pstatus != null && pstatus.isSuccess() &&
+                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                  String newUrl = pstatus.getMessage();
+                  newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+                  newUrl = this.urlFilters.filter(newUrl);
+                  if (newUrl != null && !newUrl.equals(fit.url.toString())) {
+                    Text redirUrl = new Text(newUrl);
+                    redirecting = true;
+                    redirectCount++;
+                    fit = FetchItem.create(redirUrl, new CrawlDatum(), byIP);
+                    FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                    fiq.addInProgressFetchItem(fit);
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(" - content redirect to " + redirUrl);
+                    }
+                  } else if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - content redirect skipped: " +
+                             (newUrl != null ? "to same url" : "filtered"));
+                  }
+                }
+                break;
+
+              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.TEMP_MOVED:
+                String newUrl = status.getMessage();
+                newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+                newUrl = this.urlFilters.filter(newUrl);
+                if (newUrl != null && !newUrl.equals(fit.url.toString())) {
+                  Text redirUrl = new Text(newUrl);
+                  redirecting = true;
+                  redirectCount++;
+                  fit = FetchItem.create(redirUrl, new CrawlDatum(), byIP);
+                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                  fiq.addInProgressFetchItem(fit);
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - protocol redirect to " + redirUrl);
+                  }
+                } else if (LOG.isDebugEnabled()) {
+                  LOG.debug(" - protocol redirect skipped: " +
+                           (newUrl != null ? "to same url" : "filtered"));
+                }
+                break;
+
+              case ProtocolStatus.EXCEPTION:
+                logError(fit.url, status.getMessage());
+              case ProtocolStatus.RETRY:          // retry
+                fit.datum.setRetriesSinceFetch(fit.datum.getRetriesSinceFetch()+1);
+                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
+                break;
+                
+              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.NOTFOUND:
+              case ProtocolStatus.ACCESS_DENIED:
+              case ProtocolStatus.ROBOTS_DENIED:
+              case ProtocolStatus.NOTMODIFIED:
+                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+                break;
+
+              default:
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+                }
+                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+              }
+
+              if (redirecting && redirectCount >= maxRedirect) {
+                fetchQueues.finishFetchItem(fit);
+                if (LOG.isInfoEnabled()) {
+                  LOG.info(" - redirect count exceeded " + fit.url);
+                }
+                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
+              }
+
+            } while (redirecting && (redirectCount < maxRedirect));
+            
+          } catch (Throwable t) {                 // unexpected exception
+            // unblock
+            fetchQueues.finishFetchItem(fit);
+            logError(fit.url, t.toString());
+            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
+          }
+        }
+
+      } catch (Throwable e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      } finally {
+        if (fit != null) fetchQueues.finishFetchItem(fit);
+        activeThreads.decrementAndGet(); // count threads
+        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
+      }
+    }
+
+    private void logError(Text url, String message) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("fetch of " + url + " failed with: " + message);
+      }
+      errors.incrementAndGet();
+    }
+
+    private ParseStatus output(Text key, CrawlDatum datum,
+                        Content content, ProtocolStatus pstat, int status) {
+
+      datum.setStatus(status);
+      datum.setFetchTime(System.currentTimeMillis());
+      datum.getMetaData().put(new Text("_pstat_"), pstat);
+
+      if (content == null) {
+        String url = key.toString();
+        content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);
+      }
+      Metadata metadata = content.getMetadata();
+      // add segment to metadata
+      metadata.set(SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up.
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          e.printStackTrace(LogUtil.getWarnStream(LOG));
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+
+      Parse parse = null;
+      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+        ParseStatus parseStatus;
+        try {
+          parse = this.parseUtil.parse(content);
+          parseStatus = parse.getData().getStatus();
+        } catch (Exception e) {
+          parseStatus = new ParseStatus(e);
+        }
+        if (!parseStatus.isSuccess()) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+          }
+          parse = parseStatus.getEmptyParse(getConf());
+        }
+        // Calculate page signature. For non-parsing fetchers this will
+        // be done in ParseSegment
+        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
+        metadata.set(SIGNATURE_KEY, StringUtil.toHexString(signature));
+        datum.setSignature(signature);
+        // Ensure segment name and score are in parseData metadata
+        parse.getData().getContentMeta().set(SEGMENT_NAME_KEY, segmentName);
+        parse.getData().getContentMeta().set(SIGNATURE_KEY, StringUtil.toHexString(signature));
+        try {
+          scfilters.passScoreAfterParsing(key, content, parse);
+        } catch (Exception e) {
+          if (LOG.isWarnEnabled()) {
+            e.printStackTrace(LogUtil.getWarnStream(LOG));
+            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+          }
+        }
+        
+      }
+
+      try {
+        output.collect
+          (key,
+           new FetcherOutput(datum,
+                             storingContent ? content : null,
+                             parse != null ? new ParseImpl(parse) : null));
+      } catch (IOException e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      }
+      if (parse != null) return parse.getData().getStatus();
+      else return null;
+    }
+    
+  }
+
+  public Fetcher2() { super(null); }
+
+  public Fetcher2(Configuration conf) { super(conf); }
+
+  private void updateStatus(int bytesInPage) throws IOException {
+    pages.incrementAndGet();
+    bytes.addAndGet(bytesInPage);
+  }
+
+  
+  private void reportStatus() throws IOException {
+    String status;
+    long elapsed = (System.currentTimeMillis() - start)/1000;
+    status = 
+      pages+" pages, "+errors+" errors, "
+      + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
+      + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
+    reporter.setStatus(status);
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+
+    this.segmentName = job.get(SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+
+//    if (job.getBoolean("fetcher.verbose", false)) {
+//      LOG.setLevel(Level.FINE);
+//    }
+  }
+
+  public void close() {}
+
+  public static boolean isParsing(Configuration conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+
+  public static boolean isStoringContent(Configuration conf) {
+    return conf.getBoolean("fetcher.store.content", true);
+  }
+
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter) throws IOException {
+
+    this.output = output;
+    this.reporter = reporter;
+    this.fetchQueues = new FetchItemQueues(getConf());
+
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+
+    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
+    //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+    feeder.start();
+
+    // set non-blocking & no-robots mode for HTTP protocol plugins.
+    getConf().setBoolean("http.plugin.check.blocking", false);
+    getConf().setBoolean("http.plugin.check.robots", false);
+    
+    for (int i = 0; i < threadCount; i++) {       // spawn threads
+      new FetcherThread(getConf()).start();
+    }
+
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+
+    do {                                          // wait for threads to exit
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+
+      reportStatus();
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+
+      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+        fetchQueues.dump();
+      }
+      // some requests seem to hang, despite all intentions
+      if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Aborting with "+activeThreads+" hung threads.");
+        }
+        return;
+      }
+
+    } while (activeThreads.get() > 0);
+    LOG.info("-activeThreads=" + activeThreads);
+    
+  }
+
+  public void fetch(Path segment, int threads, boolean parsing)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: starting");
+      LOG.info("Fetcher: segment: " + segment);
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("fetch " + segment);
+
+    job.setInt("fetcher.threads.fetch", threads);
+    job.set(SEGMENT_NAME_KEY, segment.getName());
+    job.setBoolean("fetcher.parse", parsing);
+
+    // for politeness, don't permit parallel execution of a single task
+    job.setSpeculativeExecution(false);
+
+    job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+    job.setInputFormat(InputFormat.class);
+
+    job.setMapRunnerClass(Fetcher2.class);
+
+    job.setOutputPath(segment);
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(FetcherOutput.class);
+
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+  }
+
+
+  /** Run the fetcher. */
+  public static void main(String[] args) throws Exception {
+
+    String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+      
+    Path segment = new Path(args[0]);
+
+    Configuration conf = NutchConfiguration.create();
+
+    int threads = conf.getInt("fetcher.threads.fetch", 10);
+    boolean parsing = true;
+
+    for (int i = 1; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-threads")) {           // found -threads option
+        threads =  Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noParsing")) parsing = false;
+    }
+
+    conf.setInt("fetcher.threads.fetch", threads);
+    if (!parsing) {
+      conf.setBoolean("fetcher.parse", parsing);
+    }
+    Fetcher2 fetcher = new Fetcher2(conf);          // make a Fetcher
+    
+    fetcher.fetch(segment, threads, parsing);              // run the Fetcher
+
+  }
+}

Property changes on: src/java/org/apache/nutch/fetcher/Fetcher2.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/nutch/protocol/EmptyRobotRules.java
===================================================================
--- src/java/org/apache/nutch/protocol/EmptyRobotRules.java	(revision 0)
+++ src/java/org/apache/nutch/protocol/EmptyRobotRules.java	(revision 0)
@@ -0,0 +1,26 @@
+/*
+ * Created on Aug 4, 2006
+ * Author: Andrzej Bialecki &lt;ab@getopt.org&gt;
+ *
+ */
+package org.apache.nutch.protocol;
+
+import java.net.URL;
+
+public class EmptyRobotRules implements RobotRules {
+  
+  public static final RobotRules RULES = new EmptyRobotRules();
+
+  public long getCrawlDelay() {
+    return -1;
+  }
+
+  public long getExpireTime() {
+    return -1;
+  }
+
+  public boolean isAllowed(URL url) {
+    return true;
+  }
+
+}

Property changes on: src/java/org/apache/nutch/protocol/EmptyRobotRules.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/nutch/protocol/RobotRules.java
===================================================================
--- src/java/org/apache/nutch/protocol/RobotRules.java	(revision 0)
+++ src/java/org/apache/nutch/protocol/RobotRules.java	(revision 0)
@@ -0,0 +1,29 @@
+
+package org.apache.nutch.protocol;
+
+import java.net.URL;
+
+
+/**
+ * This class holds the rules which were parsed from a robots.txt file, and can
+ * test paths against those rules.
+ */
+public interface RobotRules {
+  /**
+   * Get expire time
+   */
+  public long getExpireTime();
+
+  /**
+   * Get Crawl-Delay, in milliseconds. This returns -1 if not set.
+   */
+  public long getCrawlDelay();
+
+  /**
+   * Returns <code>false</code> if the <code>robots.txt</code> file
+   * prohibits us from accessing the given <code>url</code>, or
+   * <code>true</code> otherwise.
+   */
+  public boolean isAllowed(URL url);
+  
+}

Property changes on: src/java/org/apache/nutch/protocol/RobotRules.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: src/java/org/apache/nutch/protocol/Protocol.java
===================================================================
--- src/java/org/apache/nutch/protocol/Protocol.java	(revision 478878)
+++ src/java/org/apache/nutch/protocol/Protocol.java	(working copy)
@@ -34,4 +34,12 @@
   /** Returns the {@link Content} for a fetchlist entry.
    */
   ProtocolOutput getProtocolOutput(Text url, CrawlDatum datum);
+
+  /**
+   * Retrieve robot rules applicable for this url.
+   * @param url url to check
+   * @param datum page datum
+   * @return robot rules (specific for this url or default), never null
+   */
+  RobotRules getRobotRules(Text url, CrawlDatum datum);
 }
Index: src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java
===================================================================
--- src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java	(revision 478878)
+++ src/plugin/protocol-file/src/java/org/apache/nutch/protocol/file/File.java	(working copy)
@@ -29,9 +29,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.EmptyRobotRules;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 
 import java.net.URL;
 
@@ -164,4 +166,8 @@
   public Configuration getConf() {
     return this.conf;
   }
+
+  public RobotRules getRobotRules(Text url, CrawlDatum datum) {
+    return EmptyRobotRules.RULES;
+  }
 }
Index: src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java
===================================================================
--- src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java	(revision 478878)
+++ src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/HttpBase.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.nutch.protocol.ProtocolException;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 import org.apache.nutch.util.GZIPUtils;
 import org.apache.nutch.util.LogUtil;
 
@@ -43,7 +44,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 
-
 /**
  * @author J&eacute;r&ocirc;me Charron
  */
@@ -130,6 +130,12 @@
   /** Skip page if Crawl-Delay longer than this value. */
   protected long maxCrawlDelay = -1L;
 
+  /** Plugin should handle host blocking internally. */
+  protected boolean checkBlocking = true;
+  
+  /** Plugin should handle robot rules checking internally. */
+  protected boolean checkRobots = true;
+
   /** Creates a new instance of HttpBase */
   public HttpBase() {
     this(null);
@@ -161,6 +167,8 @@
         this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
         this.useHttp11 = conf.getBoolean("http.useHttp11", false);
         this.robots.setConf(conf);
+        this.checkBlocking = conf.getBoolean("http.plugin.check.blocking", true);
+        this.checkRobots = conf.getBoolean("http.plugin.check.robots", true);
         logConf();
     }
 
@@ -177,36 +185,40 @@
     try {
       URL u = new URL(urlString);
       
-      try {
-        if (!robots.isAllowed(this, u)) {
-          return new ProtocolOutput(null, new ProtocolStatus(ProtocolStatus.ROBOTS_DENIED, url));
+      if (checkRobots) {
+        try {
+          if (!robots.isAllowed(this, u)) {
+            return new ProtocolOutput(null, new ProtocolStatus(ProtocolStatus.ROBOTS_DENIED, url));
+          }
+        } catch (Throwable e) {
+          // XXX Maybe bogus: assume this is allowed.
+          if (logger.isTraceEnabled()) {
+            logger.trace("Exception checking robot rules for " + url + ": " + e);
+          }
         }
-      } catch (Throwable e) {
-        // XXX Maybe bogus: assume this is allowed.
-        if (logger.isTraceEnabled()) {
-          logger.trace("Exception checking robot rules for " + url + ": " + e);
-        }
       }
       
       long crawlDelay = robots.getCrawlDelay(this, u);
       long delay = crawlDelay > 0 ? crawlDelay : serverDelay;
-      if (maxCrawlDelay >= 0 && delay > maxCrawlDelay) {
+      if (checkBlocking && maxCrawlDelay >= 0 && delay > maxCrawlDelay) {
         // skip this page, otherwise the thread would block for too long.
         LOGGER.info("Skipping: " + u + " exceeds fetcher.max.crawl.delay, max="
                 + (maxCrawlDelay / 1000) + ", Crawl-Delay=" + (delay / 1000));
         return new ProtocolOutput(null, ProtocolStatus.STATUS_WOULDBLOCK);
       }
-      String host;
-      try {
-        host = blockAddr(u, delay);
-      } catch (BlockedException be) {
-        return new ProtocolOutput(null, ProtocolStatus.STATUS_BLOCKED);
+      String host = null;
+      if (checkBlocking) {
+        try {
+          host = blockAddr(u, delay);
+        } catch (BlockedException be) {
+          return new ProtocolOutput(null, ProtocolStatus.STATUS_BLOCKED);
+        }
       }
       Response response;
       try {
         response = getResponse(u, datum, false); // make a request
       } finally {
-        unblockAddr(host, delay);
+        if (checkBlocking) unblockAddr(host, delay);
       }
       
       int code = response.getCode();
@@ -456,8 +468,12 @@
       logger.info("http.timeout = " + timeout);
       logger.info("http.content.limit = " + maxContent);
       logger.info("http.agent = " + userAgent);
-      logger.info("fetcher.server.delay = " + serverDelay);
-      logger.info("http.max.delays = " + maxDelays);
+      logger.info("http.plugin.check.blocking = " + checkBlocking);
+      logger.info("http.plugin.check.robots = " + checkRobots);
+      if (checkBlocking) {
+        logger.info("fetcher.server.delay = " + serverDelay);
+        logger.info("http.max.delays = " + maxDelays);
+      }
     }
   }
   
@@ -531,4 +547,8 @@
                                           boolean followRedirects)
     throws ProtocolException, IOException;
 
+  public RobotRules getRobotRules(Text url, CrawlDatum datum) {
+    return robots.getRobotRulesSet(this, url);
+  }
+
 }
Index: src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java
===================================================================
--- src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java	(revision 478878)
+++ src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/RobotRulesParser.java	(working copy)
@@ -34,9 +34,11 @@
 // Nutch imports
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.net.protocols.Response;
 import org.apache.nutch.protocol.ProtocolException;
+import org.apache.nutch.protocol.RobotRules;
 
 
 /**
@@ -70,7 +72,7 @@
    * This class holds the rules which were parsed from a robots.txt
    * file, and can test paths against those rules.
    */
-  public static class RobotRuleSet {
+  public static class RobotRuleSet implements RobotRules {
     ArrayList tmpEntries = new ArrayList();
     RobotsEntry[] entries = null;
     long expireTime;
@@ -142,6 +144,19 @@
       this.crawlDelay = crawlDelay;
     }
     
+    /**
+     *  Returns <code>false</code> if the <code>robots.txt</code> file
+     *  prohibits us from accessing the given <code>url</code>, or
+     *  <code>true</code> otherwise.
+     */
+    public boolean isAllowed(URL url) {
+      String path = url.getPath();                  // check rules
+      if ((path == null) || "".equals(path)) {
+        path= "/";
+      }
+      return isAllowed(path);
+    }
+    
     /** 
      *  Returns <code>false</code> if the <code>robots.txt</code> file
      *  prohibits us from accessing the given <code>path</code>, or
@@ -154,7 +169,7 @@
         // just ignore it- we can still try to match 
         // path prefixes
       }
-
+      
       if (entries == null) {
         entries= new RobotsEntry[tmpEntries.size()];
         entries= (RobotsEntry[]) 
@@ -415,6 +430,16 @@
     return rules;
   }
   
+  public RobotRuleSet getRobotRulesSet(HttpBase http, Text url) {
+    URL u = null;
+    try {
+      u = new URL(url.toString());
+    } catch (Exception e) {
+      return EMPTY_RULES;
+    }
+    return getRobotRulesSet(http, u);
+  }
+  
   private RobotRuleSet getRobotRulesSet(HttpBase http, URL url) {
 
     String host = url.getHost().toLowerCase(); // normalize to lower case
Index: src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java
===================================================================
--- src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java	(revision 478878)
+++ src/plugin/protocol-ftp/src/java/org/apache/nutch/protocol/ftp/Ftp.java	(working copy)
@@ -30,9 +30,11 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.EmptyRobotRules;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
 
 import java.net.URL;
 
@@ -238,4 +240,8 @@
     return this.conf;
   }
 
+  public RobotRules getRobotRules(Text url, CrawlDatum datum) {
+    return EmptyRobotRules.RULES;
+  }
+
 }
