Index: build.xml
===================================================================
--- build.xml	(revision 980427)
+++ build.xml	(working copy)
@@ -218,6 +218,27 @@
 	<fail if="pmd.stop">FAILURE: PMD shows ${pmd.failures} rule violations. See ${pmd.report} for details.</fail>
   </target>
 
+  <target name="proxy" depends="job, compile-core-test"> <!-- starts the TestbedProxy test HTTP proxy in a forked JVM -->
+    <java classname="org.apache.nutch.tools.proxy.TestbedProxy" fork="true">
+      <classpath refid="test.classpath"/>
+      <arg value="-fake"/> <!-- unknown URLs succeed with generated fake content -->
+<!-- Optional: uncomment for a random per-response delay (negative value = random, up to 200):
+      <arg value="-delay"/>
+      <arg value="-200"/>
+-->
+      <jvmarg line="-Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"/> <!-- pins the JAXP DocumentBuilderFactory implementation; NOTE(review): presumably to bypass another parser on the classpath, confirm -->
+    </java>
+  </target>
+
+  <target name="benchmark"> <!-- runs org.apache.nutch.tools.Benchmark in a forked JVM; NOTE(review): has no depends, so the test classes must already be built, confirm this is intended -->
+    <java classname="org.apache.nutch.tools.Benchmark" fork="true">
+      <classpath refid="test.classpath"/>
+      <jvmarg line="-Xmx512m -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"/> <!-- 512 MB max heap, same JAXP factory override as the proxy target -->
+      <arg value="-seeds"/> <!-- presumably the number of seed URLs to generate (10); verify against Benchmark.run -->
+      <arg value="10"/>
+    </java>
+  </target>
+
   <!-- ================================================================== -->
   <!-- Run unit tests                                                     --> 
   <!-- ================================================================== -->
Index: ivy/ivy.xml
===================================================================
--- ivy/ivy.xml	(revision 980427)
+++ ivy/ivy.xml	(working copy)
@@ -82,6 +82,8 @@
 
 		<dependency org="org.mortbay.jetty" name="jetty" rev="6.1.22"
 			conf="test->default" />
+		<dependency org="org.mortbay.jetty" name="jetty-client" rev="6.1.22"
+			conf="test->default" />
 		<dependency org="org.mortbay.jetty" name="jetty-util" rev="6.1.22"
 			conf="test->default" />
 
Index: src/test/org/apache/nutch/tools/proxy/SegmentHandler.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/SegmentHandler.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/SegmentHandler.java	(revision 0)
@@ -0,0 +1,235 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.mortbay.jetty.Request;
+
+/**
+ * XXX should turn this into a plugin?
+ */
+public class SegmentHandler extends AbstractTestbedHandler {
+  private static final Log LOG = LogFactory.getLog(SegmentHandler.class);
+  private Segment seg;
+  
+  private static HashMap<Integer,Integer> protoCodes = new HashMap<Integer,Integer>(); // ProtocolStatus code -> HTTP status code
+  // Translation table filled once at class load; handle() answers 200 for codes that have no entry here.
+  static {
+    protoCodes.put(ProtocolStatus.ACCESS_DENIED, HttpServletResponse.SC_UNAUTHORIZED);
+    protoCodes.put(ProtocolStatus.BLOCKED, HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+    protoCodes.put(ProtocolStatus.EXCEPTION, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    protoCodes.put(ProtocolStatus.FAILED, HttpServletResponse.SC_BAD_REQUEST);
+    protoCodes.put(ProtocolStatus.GONE, HttpServletResponse.SC_GONE);
+    protoCodes.put(ProtocolStatus.MOVED, HttpServletResponse.SC_MOVED_PERMANENTLY);
+    protoCodes.put(ProtocolStatus.NOTFETCHING, HttpServletResponse.SC_BAD_REQUEST);
+    protoCodes.put(ProtocolStatus.NOTFOUND, HttpServletResponse.SC_NOT_FOUND);
+    protoCodes.put(ProtocolStatus.NOTMODIFIED, HttpServletResponse.SC_NOT_MODIFIED);
+    protoCodes.put(ProtocolStatus.PROTO_NOT_FOUND, HttpServletResponse.SC_BAD_REQUEST);
+    protoCodes.put(ProtocolStatus.REDIR_EXCEEDED, HttpServletResponse.SC_BAD_REQUEST);
+    protoCodes.put(ProtocolStatus.RETRY, HttpServletResponse.SC_BAD_REQUEST);
+    protoCodes.put(ProtocolStatus.ROBOTS_DENIED, HttpServletResponse.SC_FORBIDDEN);
+    protoCodes.put(ProtocolStatus.SUCCESS, HttpServletResponse.SC_OK);
+    protoCodes.put(ProtocolStatus.TEMP_MOVED, HttpServletResponse.SC_MOVED_TEMPORARILY);
+    protoCodes.put(ProtocolStatus.WOULDBLOCK, HttpServletResponse.SC_BAD_REQUEST);
+  }
+  
+  /** Accepts only paths whose last component starts with "part-". */
+  private static class SegmentPathFilter implements PathFilter {
+    public static final SegmentPathFilter INSTANCE = new SegmentPathFilter();
+    @Override
+    public boolean accept(Path p) {
+      final String fileName = p.getName();
+      return fileName.startsWith("part-");
+    }
+  }
+  
+  /** Lazily opened, read-only view of one segment's fetch status and content MapFiles. */
+  private static class Segment implements Closeable {
+    private static final Partitioner PARTITIONER = new HashPartitioner();
+
+    private FileSystem fs;
+    private Path segmentDir;
+
+    private Object cLock = new Object();
+    private Object crawlLock = new Object();
+    private MapFile.Reader[] content;
+    private MapFile.Reader[] crawl;
+    private Configuration conf;
+
+    public Segment(FileSystem fs, Path segmentDir, Configuration conf) throws IOException {
+      this.fs = fs;
+      this.segmentDir = segmentDir;
+      this.conf = conf;
+    }
+
+    /** Looks up the CrawlDatum stored for url under CrawlDatum.FETCH_DIR_NAME. */
+    public CrawlDatum getCrawlDatum(Text url) throws IOException {
+      MapFile.Reader[] readers;
+      synchronized (crawlLock) {
+        if (crawl == null)
+          crawl = getReaders(CrawlDatum.FETCH_DIR_NAME);
+        readers = crawl; // snapshot under the lock: the field itself is not volatile
+      }
+      return (CrawlDatum)getEntry(readers, url, new CrawlDatum());
+    }
+
+    /** Looks up the Content stored for url under Content.DIR_NAME. */
+    public Content getContent(Text url) throws IOException {
+      MapFile.Reader[] readers;
+      synchronized (cLock) {
+        if (content == null)
+          content = getReaders(Content.DIR_NAME);
+        readers = content; // snapshot under the lock, as in getCrawlDatum
+      }
+      return (Content)getEntry(readers, url, new Content());
+    }
+
+    /** Opens one MapFile reader per "part-*" entry of the given segment subdirectory. */
+    private MapFile.Reader[] getReaders(String subDir) throws IOException {
+      Path dir = new Path(segmentDir, subDir);
+      FileSystem fs = dir.getFileSystem(conf);
+      Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, SegmentPathFilter.INSTANCE));
+      // sort names, so that hash partitioning works
+      Arrays.sort(names);
+      MapFile.Reader[] parts = new MapFile.Reader[names.length];
+      for (int i = 0; i < names.length; i++) {
+        parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
+      }
+      return parts;
+    }
+
+    private Writable getEntry(MapFile.Reader[] readers, Text url,
+                              Writable entry) throws IOException {
+      return MapFileOutputFormat.getEntry(readers, PARTITIONER, url, entry);
+    }
+
+    /** Closes all opened readers; the first failure is rethrown once every reader was tried. */
+    public void close() throws IOException {
+      IOException first = null;
+      for (MapFile.Reader[] readers : new MapFile.Reader[][] { content, crawl }) {
+        if (readers == null) continue;
+        for (MapFile.Reader reader : readers) {
+          try { reader.close(); } catch (IOException e) { if (first == null) first = e; }
+        }
+      }
+      if (first != null) throw first;
+    }
+  }
+  
+  public SegmentHandler(Configuration conf, Path name) throws Exception { // name: the segment directory to serve; its readers are opened on the first lookup
+    seg = new Segment(FileSystem.get(conf), name, conf);
+  }
+
+  /**
+   * Replays a page recorded in the segment. If the segment holds a CrawlDatum for the
+   * request URI, its ProtocolStatus (when present) is translated to an HTTP status,
+   * the stored metadata is replayed as response headers and the stored bytes are
+   * written as the body. URIs without a CrawlDatum are not marked as handled, and
+   * failures are logged and reported in a diagnostic header instead of being thrown.
+   */
+  @Override
+  public void handle(Request req, HttpServletResponse res, String target,
+          int dispatch) throws IOException, ServletException {
+    try {
+      String uri = req.getUri().toString();
+      LOG.info("URI: " + uri);
+      addMyHeader(res, "URI", uri);
+      Text url = new Text(uri);
+      CrawlDatum cd = seg.getCrawlDatum(url);
+      if (cd == null) {
+        addMyHeader(res, "Res", "not found");
+        LOG.info(" -not found " + url);
+        return;
+      }
+      addMyHeader(res, "Res", "found");
+      LOG.info("-got " + cd.toString());
+      ProtocolStatus ps = (ProtocolStatus)cd.getMetaData().get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+      Integer httpCode = (ps == null) ? null : protoCodes.get(ps.getCode());
+      // a missing status, or one without a mapping, is answered with 200
+      res.setStatus(httpCode != null ? httpCode.intValue() : HttpServletResponse.SC_OK);
+      if (ps != null) {
+        addMyHeader(res, "ProtocolStatus", ps.toString());
+      }
+      Content c = seg.getContent(url);
+      if (c == null) { // missing content
+        req.setHandled(true);
+        res.addHeader("X-Handled-By", getClass().getSimpleName());
+        return;
+      }
+      byte[] data = c.getContent();
+      LOG.debug("-data len=" + data.length);
+      Metadata meta = c.getMetadata();
+      String[] names = meta.names();
+      LOG.debug("- " + names.length + " meta");
+      for (String name : names) {
+        // a leading upper-case letter is a pretty good hint of a standard header;
+        // everything else (also an empty name) goes out through addMyHeader
+        boolean standard = name.length() > 0 && Character.isLetter(name.charAt(0))
+            && Character.isUpperCase(name.charAt(0));
+        for (String value : meta.getValues(name)) {
+          if (standard) {
+            res.addHeader(name, value);
+          } else {
+            addMyHeader(res, name, value);
+          }
+        }
+      }
+      req.setHandled(true);
+      res.addHeader("X-Handled-By", getClass().getSimpleName());
+      res.setContentType(meta.get(Metadata.CONTENT_TYPE));
+      res.setContentLength(data.length);
+      OutputStream os = res.getOutputStream();
+      os.write(data, 0, data.length);
+      res.flushBuffer();
+    } catch (Exception e) {
+      // log once, with the stack trace; the diagnostic header must stay on one line,
+      // so only the exception summary (without line breaks) is put into it
+      LOG.warn("Failed to serve request from segment", e);
+      addMyHeader(res, "Res", "Exception: " + e.toString().replaceAll("[\\r\\n]+", " "));
+    }
+  }
+
+}
Index: src/test/org/apache/nutch/tools/proxy/FakeHandler.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/FakeHandler.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/FakeHandler.java	(revision 0)
@@ -0,0 +1,97 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+
+import org.mortbay.jetty.HttpURI;
+import org.mortbay.jetty.Request;
+
+public class FakeHandler extends AbstractTestbedHandler {
+  Random r = new Random(1234567890L); // predictable
+
+  private static final String testA = 
+    "<html><body><h1>Internet Weather Forecast Accuracy</h1>\n" + 
+    "<p>Weather forecasting is a secure and popular online presence, which is understandable. The weather affects most everyone's life, and the Internet can provide information on just about any location at any hour of the day or night. But how accurate is this information? How much can we trust it? Perhaps it is just my skeptical nature (or maybe the seeming unpredictability of nature), but I've never put much weight into weather forecasts - especially those made more than three days in advance. That skepticism progressed to a new high in the Summer of 2004, but I have only now done the research necessary to test the accuracy of online weather forecasts. First the story, then the data.</p>" +
+    "<h2>An Internet Weather Forecast Gone Terribly Awry</h2>" +
+    "<p>It was the Summer of 2004 and my wife and I were gearing up for a trip with another couple to Schlitterbahn in New Braunfels - one of the (if not the) best waterparks ever created. As a matter of course when embarking on a 2.5-hour drive to spend the day in a swimsuit, and given the tendency of the area for natural disasters, we checked the weather. The temperatures looked ideal and, most importantly, the chance of rain was a nice round goose egg.</p>";
+  private static final String testB =
+    "<p>A couple of hours into our Schlitterbahn experience, we got on a bus to leave the 'old section' for the 'new section.' Along the way, clouds gathered and multiple claps of thunder sounded. 'So much for the 0% chance of rain,' I commented. By the time we got to our destination, lightning sightings had led to the slides and pools being evacuated and soon the rain began coming down in torrents - accompanied by voluminous lightning flashes. After at least a half an hour the downpour had subsided, but the lightning showed no sign of letting up, so we began heading back to our vehicles. A hundred yards into the parking lot, we passing a tree that had apparently been split in two during the storm (whether by lightning or wind, I'm not sure). Not but a few yards later, there was a distinct thud and the husband of the couple accompanying us cried out as a near racquetball sized hunk of ice rebounded off of his head and onto the concrete. Soon, similarly sized hail was falling all around us as everyone scampered for cover. Some cowered under overturned trashcans while others were more fortunate and made it indoors.</p>" +
+    "<p>The hail, rain and lightning eventually subsided, but the most alarming news was waiting on cell phone voicemail. A friend who lived in the area had called frantically, knowing we were at the park, as the local news was reporting multiple people had been by struck by lightning at Schlitterbahn during the storm.</p>" +
+    "<p>'So much for the 0% chance of rain,' I repeated.</p></body></html>";
+
+  /**
+   * Answers any request with a generated HTML page: the canned text, the request
+   * URI, ten outlinks below the current path, a link to a pseudo-random (fixed
+   * seed) non-existent host and a link to the site root.
+   * The request is always marked as handled, so later handlers do not see it;
+   * URIs ending in /robots.txt get an empty body.
+   */
+  @Override
+  public void handle(Request req, HttpServletResponse res, String target,
+          int dispatch) throws IOException, ServletException {
+    HttpURI u = req.getUri();
+    String uri = u.toString();
+    addMyHeader(res, "URI", uri);
+    // don't pass it down the chain
+    req.setHandled(true);
+    res.addHeader("X-Handled-By", getClass().getSimpleName());
+    if (uri.endsWith("/robots.txt")) {
+      return;
+    }
+    res.setContentType("text/html");
+    try {
+      OutputStream os = res.getOutputStream();
+      os.write(testA.getBytes("UTF-8"));
+      // record URI
+      os.write(("<p>URI: " + uri + "</p>\r\n").getBytes("UTF-8"));
+      // scheme://host[:port] of this request; the port belongs here, not into the path
+      String site = u.getScheme() + "://" + u.getHost();
+      if (u.getPort() != 80 && u.getPort() != -1) site += ":" + u.getPort();
+      // fake some links: strip the last 5 chars of the path to get their parent. The
+      // cut-off index must come from the path, the full URI is usually longer than it.
+      String path = u.getPath();
+      String base = path.length() > 5 ? path.substring(0, path.length() - 5) : path;
+      String prefix = site + (base.startsWith("/") ? "" : "/") + base;
+      if (!prefix.endsWith("/")) {
+        prefix += "/";
+      }
+      for (int i = 0; i < 10; i++) {
+        String link = "<p><a href='" + prefix + i + ".html'>outlink " + i + "</a></p>\r\n";
+        os.write(link.getBytes("UTF-8"));
+      }
+      // fake a link to a random nonexistent host
+      int h = r.nextInt(1000000); // 1 mln hosts
+      String link = "<p><a href='http://www.fake-" + h + ".com/'>fake host " + h + "</a></p>\r\n";
+      os.write(link.getBytes("UTF-8"));
+      // fake a link to the root URL
+      link = "<p><a href='" + site + "/'>site " + u.getHost() + "</a></p>\r\n";
+      os.write(link.getBytes("UTF-8"));
+      os.write(testB.getBytes("UTF-8"));
+      res.flushBuffer();
+    } catch (IOException ioe) {
+      // Best effort, as before: a failed write is not escalated. NOTE(review): presumably
+      // the client went away; there is no logger in this class to report it.
+    }
+  }
+
+}
Index: src/test/org/apache/nutch/tools/proxy/LogDebugHandler.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/LogDebugHandler.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/LogDebugHandler.java	(revision 0)
@@ -0,0 +1,59 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mortbay.jetty.Request;
+
+public class LogDebugHandler extends AbstractTestbedHandler implements Filter {
+  private static final Log LOG = LogFactory.getLog(LogDebugHandler.class);
+  /** Logs the request method, URI and request header fields; never marks the request handled. */
+  @Override
+  public void handle(Request req, HttpServletResponse res, String target,
+          int dispatch) throws IOException, ServletException {
+    final String requestLine = req.getMethod() + " " + req.getUri().toString();
+    LOG.info("-- " + requestLine + "\n" + req.getConnection().getRequestFields());
+  }
+
+  /** Tags the response with the AsyncProxyHandler marker headers, runs the chain, maps any failure to a 400. */
+  @Override
+  public void doFilter(ServletRequest req, ServletResponse res,
+          FilterChain chain) throws IOException, ServletException {
+    final HttpServletResponse httpRes = (HttpServletResponse)res;
+    httpRes.addHeader("X-Handled-By", "AsyncProxyHandler");
+    httpRes.addHeader("X-TestbedHandlers", "AsyncProxyHandler");
+    try {
+      chain.doFilter(req, res);
+    } catch (Throwable failure) {
+      httpRes.sendError(HttpServletResponse.SC_BAD_REQUEST, failure.toString());
+    }
+  }
+
+  @Override
+  public void init(FilterConfig arg0) throws ServletException { } // no filter configuration is needed
+}
Index: src/test/org/apache/nutch/tools/proxy/NotFoundHandler.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/NotFoundHandler.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/NotFoundHandler.java	(revision 0)
@@ -0,0 +1,39 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+
+import org.mortbay.jetty.Request;
+
+/** Answers every request it receives with 404 Not Found. */
+public class NotFoundHandler extends AbstractTestbedHandler {
+
+  @Override
+  public void handle(Request req, HttpServletResponse res, String target,
+          int dispatch) throws IOException, ServletException {
+    // claim the request, so that it is not passed down the chain
+    req.setHandled(true);
+    res.addHeader("X-Handled-By", getClass().getSimpleName());
+    final String requestedUri = req.getUri().toString();
+    addMyHeader(res, "URI", requestedUri);
+    res.sendError(HttpServletResponse.SC_NOT_FOUND, "Not found: " + requestedUri);
+  }
+}
Index: src/test/org/apache/nutch/tools/proxy/AbstractTestbedHandler.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/AbstractTestbedHandler.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/AbstractTestbedHandler.java	(revision 0)
@@ -0,0 +1,47 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.mortbay.jetty.HttpConnection;
+import org.mortbay.jetty.Request;
+import org.mortbay.jetty.handler.AbstractHandler;
+
+public abstract class AbstractTestbedHandler extends AbstractHandler {
+  protected boolean debug = false;
+
+  /** Adds this handler's simple class name as an X-TestbedHandlers header, then calls the Request-based handle(). */
+  @Override
+  public void handle(String target, HttpServletRequest req, HttpServletResponse res,
+          int dispatch) throws IOException, ServletException {
+    final Request jettyRequest = !(req instanceof Request) ? HttpConnection.getCurrentConnection().getRequest() : (Request)req;
+    res.addHeader("X-TestbedHandlers", this.getClass().getSimpleName());
+    handle(jettyRequest, res, target, dispatch);
+  }
+  /** Handler-specific processing of a request that was already resolved to a Jetty Request. */
+  public abstract void handle(Request req, HttpServletResponse res, String target,
+          int dispatch) throws IOException, ServletException;
+
+  /** Adds a response header whose name is "X-" + the handler's simple class name + "-" + name. */
+  public void addMyHeader(HttpServletResponse res, String name, String value) {
+    res.addHeader("X-" + this.getClass().getSimpleName() + "-" + name, value);
+  }
+}
Index: src/test/org/apache/nutch/tools/proxy/DelayHandler.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/DelayHandler.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/DelayHandler.java	(revision 0)
@@ -0,0 +1,55 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+
+import org.mortbay.jetty.Request;
+
+/** Sleeps for a fixed or pseudo-random number of milliseconds on every request and reports it in a response header. */
+public class DelayHandler extends AbstractTestbedHandler {
+  public static final long DEFAULT_DELAY = 2000;
+
+  private int delay;
+  private boolean random;
+  private Random r;
+  /** @param delay sleep time in ms; a negative value selects a random sleep of less than |delay| ms */
+  public DelayHandler(int delay) {
+    if (delay < 0) {
+      delay = (delay == Integer.MIN_VALUE) ? Integer.MAX_VALUE : -delay; // -MIN_VALUE overflows
+      random = true;
+      r = new Random(1234567890L); // repeatable random
+    }
+    this.delay = delay;
+  }
+
+  @Override
+  public void handle(Request req, HttpServletResponse res, String target,
+          int dispatch) throws IOException, ServletException {
+    int del = random ? r.nextInt(delay) : delay;
+    try {
+      Thread.sleep(del);
+      addMyHeader(res, "Delay", String.valueOf(del));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // do not swallow the interrupt: restore the flag for the caller
+    }
+  }
+}
Index: src/test/org/apache/nutch/tools/proxy/TestbedProxy.java
===================================================================
--- src/test/org/apache/nutch/tools/proxy/TestbedProxy.java	(revision 0)
+++ src/test/org/apache/nutch/tools/proxy/TestbedProxy.java	(revision 0)
@@ -0,0 +1,144 @@
+package org.apache.nutch.tools.proxy;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.handler.HandlerList;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.mortbay.proxy.AsyncProxyServlet;
+
+/** Command-line entry point that assembles the testbed proxy's handler chain and runs it in Jetty. */
+public class TestbedProxy {
+  private static final Log LOG = LogFactory.getLog(TestbedProxy.class);
+
+  /** @param args command-line options; the usage text is printed when none are given */
+  public static void main(String[] args) throws Exception {
+    if (args.length == 0) {
+      System.err.println("TestbedProxy [-seg <segment_name> | -segdir <segments>] [-port <nnn>] [-forward] [-fake] [-delay nnn] [-debug]");
+      System.err.println("-seg <segment_name>\tpath to a single segment (can be specified multiple times)");
+      System.err.println("-segdir <segments>\tpath to a parent directory of multiple segments (as above)");
+      System.err.println("-port <nnn>\trun the proxy on port <nnn> (special permissions may be needed for ports < 1024)");
+      System.err.println("-forward\tif specified, requests to all unknown urls will be passed to");
+      System.err.println("\t\toriginal servers. If false (default) unknown urls generate 404 Not Found.");
+      System.err.println("-delay\tdelay every response by nnn milliseconds. If delay is negative use a random value up to nnn");
+      System.err.println("-fake\tif specified, requests to all unknown urls will succeed with fake content");
+      System.err.println("-debug\tif specified, every request is logged with its headers");
+      System.exit(-1);
+    }
+
+    Configuration conf = NutchConfiguration.create();
+    int port = conf.getInt("segment.proxy.port", 8181);
+    boolean forward = false;
+    boolean fake = false;
+    boolean delay = false;
+    boolean debug = false;
+    int delayVal = 0;
+
+    HashSet<Path> segs = new HashSet<Path>();
+    for (int i = 0; i < args.length; i++) {
+      boolean takesValue = args[i].equals("-segdir") || args[i].equals("-port") || args[i].equals("-delay") || args[i].equals("-seg"); // these consume args[i + 1]
+      if (takesValue && i + 1 >= args.length) {
+        LOG.fatal("Missing value for argument: " + args[i]);
+        System.exit(-1);
+      }
+      if (args[i].equals("-segdir")) {
+        FileSystem fs = FileSystem.get(conf);
+        FileStatus[] fstats = fs.listStatus(new Path(args[++i]));
+        Path[] paths = HadoopFSUtil.getPaths(fstats);
+        segs.addAll(Arrays.asList(paths));
+      } else if (args[i].equals("-port")) {
+        port = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-forward")) {
+        forward = true;
+      } else if (args[i].equals("-delay")) {
+        delay = true;
+        delayVal = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-fake")) {
+        fake = true;
+      } else if (args[i].equals("-debug")) {
+        debug = true;
+      } else if (args[i].equals("-seg")) {
+        segs.add(new Path(args[++i]));
+      } else {
+        LOG.fatal("Unknown argument: " + args[i]);
+        System.exit(-1);
+      }
+    }
+
+    // Create the server
+    Server server = new Server();
+    SocketConnector connector = new SocketConnector();
+    connector.setPort(port);
+    connector.setResolveNames(false);
+    server.addConnector(connector);
+
+    // create a list of handlers
+    HandlerList list = new HandlerList();
+    server.addHandler(list);
+
+    if (debug) {
+      LOG.info("* Added debug handler.");
+      list.addHandler(new LogDebugHandler());
+    }
+    if (delay) {
+      LOG.info("* Added delay handler: " + (delayVal < 0 ? "random delay up to " + (-delayVal) : "constant delay of " + delayVal));
+      list.addHandler(new DelayHandler(delayVal));
+    }
+
+    // XXX alternatively, we can add the DispatchHandler as the first one,
+    // XXX to activate handler plugins and redirect requests to appropriate
+    // XXX handlers ... Here we always load these handlers
+    for (Path p : segs) {
+      try {
+        list.addHandler(new SegmentHandler(conf, p));
+        LOG.info("* Added segment handler for: " + p);
+      } catch (Exception e) {
+        LOG.warn("Skipping segment '" + p + "': " + StringUtils.stringifyException(e));
+      }
+    }
+    if (forward) {
+      LOG.info("* Adding forwarding proxy for all unknown urls ...");
+      ServletHandler servlets = new ServletHandler();
+      servlets.addServletWithMapping(AsyncProxyServlet.class, "/*");
+      servlets.addFilterWithMapping(LogDebugHandler.class, "/*", Handler.ALL);
+      list.addHandler(servlets);
+    }
+    if (fake) {
+      LOG.info("* Added fake handler for remaining URLs.");
+      list.addHandler(new FakeHandler());
+    }
+    list.addHandler(new NotFoundHandler());
+    // Start the http server
+    server.start();
+    server.join();
+  }
+}
Index: src/test/org/apache/nutch/tools/Benchmark.java
===================================================================
--- src/test/org/apache/nutch/tools/Benchmark.java	(revision 0)
+++ src/test/org/apache/nutch/tools/Benchmark.java	(revision 0)
@@ -0,0 +1,222 @@
+package org.apache.nutch.tools;
+
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.Crawl;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.indexer.solr.SolrDeleteDuplicates;
+import org.apache.nutch.indexer.solr.SolrIndexer;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * Command-line benchmark that runs a small crawl (inject, generate, fetch,
+ * parse, updatedb, invertlinks) over a generated seed list and logs the total
+ * elapsed time.
+ * <p>
+ * The configuration is pointed at an HTTP proxy on <code>localhost:8181</code>
+ * and the crawl data is written below <code>hadoop.tmp.dir</code>.
+ */
+public class Benchmark extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(Benchmark.class);
+
+  /**
+   * Runs the benchmark through {@link ToolRunner} and exits with its status.
+   *
+   * @param args command-line options, see {@link #run(String[])}
+   * @throws Exception if the benchmark fails
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    int res = ToolRunner.run(conf, new Benchmark(), args);
+    System.exit(res);
+  }
+
+  /** Returns the current time formatted as <code>yyyyMMddHHmmss</code> (currently unused). */
+  private static String getDate() {
+    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
+  }
+
+  /**
+   * Writes <code>count</code> generated URLs, one unique host per line, to the
+   * file <code>seeds</code> in <code>seedsDir</code>.
+   *
+   * @param fs file system on which the seed file is created
+   * @param seedsDir directory that receives the <code>seeds</code> file
+   * @param count number of seed URLs to write
+   * @throws Exception if the seed file cannot be created or written
+   */
+  private void createSeeds(FileSystem fs, Path seedsDir, int count) throws Exception {
+    OutputStream os = fs.create(new Path(seedsDir, "seeds"));
+    try {
+      for (int i = 0; i < count; i++) {
+        String url = "http://www.test-" + i + ".com/\r\n";
+        // explicit charset so the bytes do not depend on the platform default
+        os.write(url.getBytes("UTF-8"));
+      }
+      os.flush();
+    } finally {
+      // always release the stream, even when a write fails
+      os.close();
+    }
+  }
+
+  /**
+   * Prints the command-line help to <code>System.err</code>.
+   *
+   * @param defaultPlugins value reported as the default for <code>-plugins</code>
+   */
+  private static void printUsage(String defaultPlugins) {
+    System.err.println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-keep] [-plugins <regex>]");
+    System.err.println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)");
+    System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)");
+    System.err.println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)");
+    System.err.println("\t-keep\tkeep segment data (default: delete after updatedb)");
+    System.err.println("\t-plugins <regex>\toverride 'plugin.includes'.");
+    System.err.println("\tNOTE: if not specified, this is reset to: " + defaultPlugins);
+    System.err.println("\tNOTE: if 'default' is specified then a value set in nutch-default/nutch-site is used.");
+  }
+
+  /**
+   * Parses the options, prepares a fresh benchmark directory with a seed list
+   * and runs up to <code>-depth</code> crawl cycles.
+   *
+   * @param args command-line options; an empty array prints the usage
+   * @return 0 on success, -1 if the arguments are missing or invalid
+   * @throws Exception if any crawl step fails
+   */
+  public int run(String[] args) throws Exception {
+    String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
+    int seeds = 1;
+    int depth = 10;
+    int threads = 10;
+    boolean delete = true;
+    long topN = Long.MAX_VALUE;
+
+    if (args.length == 0) {
+      printUsage(plugins);
+      return -1;
+    }
+    for (int i = 0; i < args.length; i++) {
+      String opt = args[i];
+      if (opt.equals("-keep")) {
+        delete = false;
+        continue;
+      }
+      boolean numeric = opt.equals("-seeds") || opt.equals("-threads") || opt.equals("-depth");
+      if (!numeric && !opt.equals("-plugins")) {
+        LOG.fatal("Invalid argument: '" + opt + "'");
+        return -1;
+      }
+      // every remaining option takes a value; fail cleanly if it is missing
+      if (i + 1 >= args.length) {
+        LOG.fatal("Missing value for argument: '" + opt + "'");
+        return -1;
+      }
+      String value = args[++i];
+      if (!numeric) {
+        plugins = value;
+        continue;
+      }
+      int number;
+      try {
+        number = Integer.parseInt(value);
+      } catch (NumberFormatException e) {
+        LOG.fatal("Invalid number '" + value + "' for argument: '" + opt + "'");
+        return -1;
+      }
+      if (opt.equals("-seeds")) {
+        seeds = number;
+      } else if (opt.equals("-threads")) {
+        threads = number;
+      } else {
+        depth = number;
+      }
+    }
+    Configuration conf = getConf();
+    conf.set("http.proxy.host", "localhost");
+    conf.setInt("http.proxy.port", 8181);
+    conf.set("http.agent.name", "test");
+    if (!plugins.equals("default")) {
+      conf.set("plugin.includes", plugins);
+    }
+    JobConf job = new NutchJob(conf);
+    FileSystem fs = FileSystem.get(job);
+    Path dir = new Path(conf.get("hadoop.tmp.dir"), "bench-" + System.currentTimeMillis());
+    fs.mkdirs(dir);
+    Path rootUrlDir = new Path(dir, "seed");
+    fs.mkdirs(rootUrlDir);
+    createSeeds(fs, rootUrlDir, seeds);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("crawl started in: " + dir);
+      LOG.info("rootUrlDir = " + rootUrlDir);
+      LOG.info("threads = " + threads);
+      LOG.info("depth = " + depth);
+    }
+
+    Path crawlDb = new Path(dir, "crawldb");
+    Path linkDb = new Path(dir, "linkdb");
+    Path segments = new Path(dir, "segments");
+    long start = System.currentTimeMillis();
+    Injector injector = new Injector(conf);
+    Generator generator = new Generator(conf);
+    Fetcher fetcher = new Fetcher(conf);
+    ParseSegment parseSegment = new ParseSegment(conf);
+    CrawlDb crawlDbTool = new CrawlDb(conf);
+    LinkDb linkDbTool = new LinkDb(conf);
+    boolean parsing = Fetcher.isParsing(conf);
+
+    // initialize crawlDb
+    injector.inject(crawlDb, rootUrlDir);
+    int i;
+    for (i = 0; i < depth; i++) {             // generate new segment
+      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis());
+      if (segs == null) {
+        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
+        break;
+      }
+      fetcher.fetch(segs[0], threads, parsing);  // fetch it
+      if (!parsing) {
+        parseSegment.parse(segs[0]);    // parse it, if needed
+      }
+      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
+      // invert links now: by default the segments are deleted right below, so
+      // inverting once after the loop would find no segment data to read
+      linkDbTool.invert(linkDb, segs, true, true, false);
+      // delete data
+      if (delete) {
+        for (Path p : segs) {
+          fs.delete(p, true);
+        }
+      }
+    }
+    if (i == 0) {
+      LOG.warn("No URLs to fetch - check your seed list and URL filters.");
+    }
+    if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); }
+    long end = System.currentTimeMillis();
+    LOG.info("TOTAL TIME: " + (end - start)/1000 + " sec");
+    return 0;
+  }
+
+}

Property changes on: src/test/org/apache/nutch/tools/Benchmark.java
___________________________________________________________________
Added: svn:executable
   + *

