Index: src/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LinkDatum.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LinkDatum.java	(revision 0)
@@ -0,0 +1,124 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class for holding link information including the url, anchor text, a score,
+ * the timestamp of the link and a link type.
+ */
+public class LinkDatum
+  implements Writable {
+
+  public final static byte INLINK = 1;
+  public final static byte OUTLINK = 2;
+
+  private String url = null;
+  private String anchor = "";
+  private float score = 0.0f;
+  private long timestamp = 0L;
+  private byte linkType = 0;
+
+  /**
+   * Default constructor, no url, timestamp, score, or link type.
+   */
+  public LinkDatum() {
+
+  }
+
+  /**
+   * Creates a LinkDatum with a given url. Timestamp is set to current time.
+   * 
+   * @param url The link url.
+   */
+  public LinkDatum(String url) {
+    this(url, "", System.currentTimeMillis());
+  }
+
+  /**
+   * Creates a LinkDatum with a url and an anchor text. Timestamp is set to
+   * current time.
+   * 
+   * @param url The link url.
+   * @param anchor The link anchor text.
+   */
+  public LinkDatum(String url, String anchor) {
+    this(url, anchor, System.currentTimeMillis());
+  }
+
+  public LinkDatum(String url, String anchor, long timestamp) {
+    this.url = url;
+    this.anchor = anchor;
+    this.timestamp = timestamp;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public String getAnchor() {
+    return anchor;
+  }
+
+  public void setAnchor(String anchor) {
+    this.anchor = anchor;
+  }
+
+  public float getScore() {
+    return score;
+  }
+
+  public void setScore(float score) {
+    this.score = score;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public byte getLinkType() {
+    return linkType;
+  }
+
+  public void setLinkType(byte linkType) {
+    this.linkType = linkType;
+  }
+
+  public void readFields(DataInput in)
+    throws IOException {
+    url = Text.readString(in);
+    anchor = Text.readString(in);
+    score = in.readFloat();
+    timestamp = in.readLong();
+    linkType = in.readByte();
+  }
+
+  public void write(DataOutput out)
+    throws IOException {
+    Text.writeString(out, url);
+    Text.writeString(out, anchor != null ? anchor : "");
+    out.writeFloat(score);
+    out.writeLong(timestamp);
+    out.writeByte(linkType);
+  }
+
+  public String toString() {
+    String type = (linkType == INLINK ? "inlink"
+      : (linkType == OUTLINK ? "outlink" : "unknown"));
+    return "url: " + url + ", anchor: " + anchor + ", score: " + score
+      + ", timestamp: " + timestamp + ", link type: " + type;
+  }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java	(revision 0)
@@ -0,0 +1,434 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.scoring.webgraph.Loops.LoopSet;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * The LinkDumper tool creates a database of node to inlink information that can
+ * be read using the nested Reader class.  This allows the inlink and scoring 
+ * state of a single url to be reviewed quickly to determine why a given url is 
+ * ranking a certain way.  This tool is to be used with the LinkRank analysis.
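+ * 
+ * The tool itself is run with a -webgraphdb argument pointing at an existing
+ * WebGraph database; it only builds the dump database, individual urls are
+ * then inspected with the nested Reader class.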
+ */
+public class LinkDumper
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(LinkDumper.class);
+  public static final String DUMP_DIR = "linkdump";
+
+  /**
+   * Reader class which will print out the url and all of its inlinks to system 
+   * out.  Each inlink will be displayed with its node information including
+   * score and number of in and outlinks.
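+   * 
+   * Usage: the main method expects two arguments, the webgraphdb directory
+   * and the url to look up.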
+   */
+  public static class Reader {
+
+    public static void main(String[] args)
+      throws Exception {
+
+      // open the readers for the linkdump directory
+      Configuration conf = NutchConfiguration.create();
+      FileSystem fs = FileSystem.get(conf);
+      Path webGraphDb = new Path(args[0]);
+      String url = args[1];
+      MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
+        webGraphDb, DUMP_DIR), conf);
+
+      // get the link nodes for the url
+      Text key = new Text(url);
+      LinkNodes nodes = new LinkNodes();
+      MapFileOutputFormat.getEntry(readers,
+        new HashPartitioner<Text, LinkNodes>(), key, nodes);
+
+      // print out the link nodes
+      LinkNode[] linkNodesAr = nodes.getLinks();
+      System.out.println(url + ":");
+      for (LinkNode node : linkNodesAr) {
+        System.out.println("  " + node.getUrl() + " - "
+          + node.getNode().toString());
+      }
+
+      // close the readers
+      FSUtils.closeReaders(readers);
+    }
+  }
+
+  /**
+   * Bean class which holds url to node information.
+   */
+  public static class LinkNode
+    implements Writable {
+
+    private String url = null;
+    private Node node = null;
+
+    public LinkNode() {
+
+    }
+
+    public LinkNode(String url, Node node) {
+      this.url = url;
+      this.node = node;
+    }
+
+    public String getUrl() {
+      return url;
+    }
+
+    public void setUrl(String url) {
+      this.url = url;
+    }
+
+    public Node getNode() {
+      return node;
+    }
+
+    public void setNode(Node node) {
+      this.node = node;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+      url = in.readUTF();
+      node = new Node();
+      node.readFields(in);
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+      out.writeUTF(url);
+      node.write(out);
+    }
+
+  }
+
+  /**
+   * Writable class which holds an array of LinkNode objects.
+   */
+  public static class LinkNodes
+    implements Writable {
+
+    private LinkNode[] links;
+
+    public LinkNodes() {
+
+    }
+
+    public LinkNodes(LinkNode[] links) {
+      this.links = links;
+    }
+
+    public LinkNode[] getLinks() {
+      return links;
+    }
+
+    public void setLinks(LinkNode[] links) {
+      this.links = links;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+      int numLinks = in.readInt();
+      links = new LinkNode[numLinks];
+      for (int i = 0; i < numLinks; i++) {
+        LinkNode node = new LinkNode();
+        node.readFields(in);
+        links[i] = node;
+      }
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+      int numLinks = (links != null) ? links.length : 0;
+      out.writeInt(numLinks);
+      for (int i = 0; i < numLinks; i++) {
+        links[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * Inverts outlinks from the WebGraph to inlinks and attaches node
+   * information.
+   */
+  public static class Inverter
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, LinkNode> {
+
+    private JobConf conf;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Wraps all values in ObjectWritables.
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Inverts outlinks to inlinks while attaching node information to the 
+     * outlink.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, LinkNode> output, Reporter reporter)
+      throws IOException {
+
+      String fromUrl = key.toString();
+      List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+      Node node = null;
+      LoopSet loops = null;
+
+      // loop through all values aggregating outlinks, saving node and loopset
+      while (values.hasNext()) {
+        ObjectWritable write = values.next();
+        Object obj = write.get();
+        if (obj instanceof Node) {
+          node = (Node)obj;
+        }
+        else if (obj instanceof LinkDatum) {
+          outlinks.add((LinkDatum)WritableUtils.clone((LinkDatum)obj, conf));
+        }
+        else if (obj instanceof LoopSet) {
+          loops = (LoopSet)obj;
+        }
+      }
+
+      // only collect if there are outlinks
+      int numOutlinks = node.getNumOutlinks();
+      if (numOutlinks > 0) {
+
+        Set<String> loopSet = (loops != null) ? loops.getLoopSet() : null;
+        for (int i = 0; i < outlinks.size(); i++) {
+          LinkDatum outlink = outlinks.get(i);
+          String toUrl = outlink.getUrl();
+          
+          // remove any url that is in the loopset, same as LinkRank
+          if (loopSet != null && loopSet.contains(toUrl)) {
+            continue;
+          }
+          
+          // collect the outlink as an inlink with the node 
+          output.collect(new Text(toUrl), new LinkNode(fromUrl, node));
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Merges LinkNode objects into a single array value per url.  This allows 
+   * all values to be quickly retrieved and printed via the Reader tool.
+   */
+  public static class Merger
+    implements Reducer<Text, LinkNode, Text, LinkNodes> {
+
+    private JobConf conf;
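+    // cap on the number of inlinks kept per url, bounding the size of a
+    // single LinkNodes record (hardcoded in this patch, not read from conf)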
+    private int maxInlinks = 50000;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Aggregate all LinkNode objects for a given url.
+     */
+    public void reduce(Text key, Iterator<LinkNode> values,
+      OutputCollector<Text, LinkNodes> output, Reporter reporter)
+      throws IOException {
+
+      List<LinkNode> nodeList = new ArrayList<LinkNode>();
+      int numNodes = 0;
+
+      while (values.hasNext()) {
+        LinkNode cur = values.next();
+        if (numNodes < maxInlinks) {
+          nodeList.add((LinkNode)WritableUtils.clone(cur, conf));
+          numNodes++;
+        }
+        else {
+          break;
+        }
+      }
+
+      LinkNode[] linkNodesAr = nodeList.toArray(new LinkNode[nodeList.size()]);
+      LinkNodes linkNodes = new LinkNodes(linkNodesAr);
+      output.collect(key, linkNodes);
+    }
+
+    public void close() {
+
+    }
+  }
+
+  /**
+   * Runs the inverter and merger jobs of the LinkDumper tool to create the 
+   * url to inlink node database.
+   */
+  public void dumpLinks(Path webGraphDb)
+    throws IOException {
+
+    LOG.info("NodeDumper: starting");
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    Path linkdump = new Path(webGraphDb, DUMP_DIR);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path loopSetDb = new Path(webGraphDb, Loops.LOOPS_DIR);
+    boolean loopsExists = fs.exists(loopSetDb);
+    Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+
+    // run the inverter job
+    Path tempInverted = new Path(webGraphDb, "inverted-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    JobConf inverter = new NutchJob(conf);
+    inverter.setJobName("LinkDumper: inverter");
+    FileInputFormat.addInputPath(inverter, nodeDb);
+    if (loopsExists) {
+      FileInputFormat.addInputPath(inverter, loopSetDb);
+    }
+    FileInputFormat.addInputPath(inverter, outlinkDb);
+    inverter.setInputFormat(SequenceFileInputFormat.class);
+    inverter.setMapperClass(Inverter.class);
+    inverter.setReducerClass(Inverter.class);
+    inverter.setMapOutputKeyClass(Text.class);
+    inverter.setMapOutputValueClass(ObjectWritable.class);
+    inverter.setOutputKeyClass(Text.class);
+    inverter.setOutputValueClass(LinkNode.class);
+    FileOutputFormat.setOutputPath(inverter, tempInverted);
+    inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+    try {
+      LOG.info("LinkDumper: running inverter");
+      JobClient.runJob(inverter);
+      LOG.info("LinkDumper: finished inverter");
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // run the merger job
+    JobConf merger = new NutchJob(conf);
+    merger.setJobName("LinkDumper: merger");
+    FileInputFormat.addInputPath(merger, tempInverted);
+    merger.setInputFormat(SequenceFileInputFormat.class);
+    merger.setReducerClass(Merger.class);
+    merger.setMapOutputKeyClass(Text.class);
+    merger.setMapOutputValueClass(LinkNode.class);
+    merger.setOutputKeyClass(Text.class);
+    merger.setOutputValueClass(LinkNodes.class);
+    FileOutputFormat.setOutputPath(merger, linkdump);
+    merger.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      LOG.info("LinkDumper: running merger");
+      JobClient.runJob(merger);
+      LOG.info("LinkDumper: finished merger");
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    fs.delete(tempInverted, true);
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(),
+      args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the LinkDumper tool.  This simply creates the database; to read the
+   * values the nested Reader tool must be used.
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg()
+      .withDescription("the web graph database to use").create("webgraphdb");
+    options.addOption(helpOpts);
+    options.addOption(webGraphDbOpts);
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("LinkDumper", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      dumpLinks(new Path(webGraphDb));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("LinkDumper: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LinkRank.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LinkRank.java	(revision 0)
@@ -0,0 +1,665 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.scoring.webgraph.Loops.LoopSet;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
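+/**
+ * The LinkRank tool runs an iterative, PageRank-style link analysis over the
+ * WebGraph. It first counts the nodes in the graph to derive a "rank one"
+ * score for pages with no inlinks, then repeatedly inverts outlinks to
+ * inlinks and recomputes each url's score as
+ * (1 - dampingFactor) + dampingFactor * totalInlinkScore, and finally
+ * replaces the NodeDb of the WebGraph with the converged scores.
+ */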
+public class LinkRank
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(LinkRank.class);
+  private static final String NUM_NODES = "_num_nodes_";
+
+  /**
+   * Runs the counter job. The counter job determines the number of nodes in
+   * the webgraph. This is used during analysis to derive the rank one score.
+   * 
+   * @param fs The job file system.
+   * @param webGraphDb The web graph database to use.
+   * 
+   * @return The number of nodes in the web graph.
+   * @throws IOException If an error occurs while running the counter job.
+   */
+  private int runCounter(FileSystem fs, Path webGraphDb)
+    throws IOException {
+
+    // configure the counter job
+    Path numLinksPath = new Path(webGraphDb, NUM_NODES);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    JobConf counter = new NutchJob(getConf());
+    counter.setJobName("LinkRank Counter");
+    FileInputFormat.addInputPath(counter, nodeDb);
+    FileOutputFormat.setOutputPath(counter, numLinksPath);
+    counter.setInputFormat(SequenceFileInputFormat.class);
+    counter.setMapperClass(Counter.class);
+    counter.setCombinerClass(Counter.class);
+    counter.setReducerClass(Counter.class);
+    counter.setMapOutputKeyClass(Text.class);
+    counter.setMapOutputValueClass(LongWritable.class);
+    counter.setOutputKeyClass(Text.class);
+    counter.setOutputValueClass(LongWritable.class);
+    counter.setNumReduceTasks(1);
+    counter.setOutputFormat(TextOutputFormat.class);
+
+    // run the counter job, outputs to a single reduce task and file
+    LOG.info("Starting link counter job");
+    try {
+      JobClient.runJob(counter);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished link counter job");
+
+    // read the first (and only) line from the file which should be the
+    // number of nodes in the web graph
+    LOG.info("Reading numlinks temp file");
+    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
+    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    String numLinksLine = buffer.readLine();
+    readLinks.close();
+
+    // delete temp file and convert and return the number of links as an int
+    LOG.info("Deleting numlinks temp file");
+    fs.delete(numLinksPath, true);
+    String numLinks = numLinksLine.split("\\s+")[1];
+    return Integer.parseInt(numLinks);
+  }
+
+  /**
+   * Runs the initializer job. The initializer job sets up the nodes with a
+   * default starting score for link analysis.
+   * 
+   * @param nodeDb The node database to use.
+   * @param output The job output directory.
+   * 
+   * @throws IOException If an error occurs while running the initializer job.
+   */
+  private void runInitializer(Path nodeDb, Path output)
+    throws IOException {
+
+    // configure the initializer
+    JobConf initializer = new NutchJob(getConf());
+    initializer.setJobName("LinkAnalysis Initializer");
+    FileInputFormat.addInputPath(initializer, nodeDb);
+    FileOutputFormat.setOutputPath(initializer, output);
+    initializer.setInputFormat(SequenceFileInputFormat.class);
+    initializer.setMapperClass(Initializer.class);
+    initializer.setMapOutputKeyClass(Text.class);
+    initializer.setMapOutputValueClass(Node.class);
+    initializer.setOutputKeyClass(Text.class);
+    initializer.setOutputValueClass(Node.class);
+    initializer.setOutputFormat(MapFileOutputFormat.class);
+
+    // run the initializer
+    LOG.info("Starting initialization job");
+    try {
+      JobClient.runJob(initializer);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished initialization job.");
+  }
+
+  /**
+   * Runs the inverter job. The inverter job flips outlinks to inlinks to be
+   * passed into the analysis job.
+   * 
+   * The inverter job takes a link loops database if it exists. It is an
+   * optional component of link analysis due to its extreme computational and
+   * space requirements, but it can be very useful in weeding out and
+   * eliminating link farms and other spam pages.
+   * 
+   * @param nodeDb The node database to use.
+   * @param outlinkDb The outlink database to use.
+   * @param loopDb The loop database to use if it exists.
+   * @param output The output directory.
+   * 
+   * @throws IOException If an error occurs while running the inverter job.
+   */
+  private void runInverter(Path nodeDb, Path outlinkDb, Path loopDb, Path output)
+    throws IOException {
+
+    // configure the inverter
+    JobConf inverter = new NutchJob(getConf());
+    inverter.setJobName("LinkAnalysis Inverter");
+    FileInputFormat.addInputPath(inverter, nodeDb);
+    FileInputFormat.addInputPath(inverter, outlinkDb);
+
+    // add the loop database if it exists, isn't null
+    if (loopDb != null) {
+      FileInputFormat.addInputPath(inverter, loopDb);
+    }
+    FileOutputFormat.setOutputPath(inverter, output);
+    inverter.setInputFormat(SequenceFileInputFormat.class);
+    inverter.setMapperClass(Inverter.class);
+    inverter.setReducerClass(Inverter.class);
+    inverter.setMapOutputKeyClass(Text.class);
+    inverter.setMapOutputValueClass(ObjectWritable.class);
+    inverter.setOutputKeyClass(Text.class);
+    inverter.setOutputValueClass(LinkDatum.class);
+    inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+    // run the inverter job
+    LOG.info("Starting inverter job");
+    try {
+      JobClient.runJob(inverter);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished inverter job.");
+  }
+
+  /**
+   * Runs the link analysis job. The link analysis job applies the link rank
+   * formula to create a score per url and stores that score in the NodeDb.
+   * 
+   * Typically the link analysis job is run a number of times to allow the link
+   * rank scores to converge.
+   * 
+   * @param nodeDb The node database from which we are getting previous link
+   * rank scores.
+   * @param inverted The inverted inlinks
+   * @param output The link analysis output.
+   * @param iteration The current iteration number.
+   * @param numIterations The total number of link analysis iterations
+   * 
+   * @throws IOException If an error occurs during link analysis.
+   */
+  private void runAnalysis(Path nodeDb, Path inverted, Path output,
+    int iteration, int numIterations, float rankOne)
+    throws IOException {
+
+    JobConf analyzer = new NutchJob(getConf());
+    analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
+    analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+      + " of " + numIterations);
+    FileInputFormat.addInputPath(analyzer, nodeDb);
+    FileInputFormat.addInputPath(analyzer, inverted);
+    FileOutputFormat.setOutputPath(analyzer, output);
+    analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
+    analyzer.setMapOutputKeyClass(Text.class);
+    analyzer.setMapOutputValueClass(ObjectWritable.class);
+    analyzer.setInputFormat(SequenceFileInputFormat.class);
+    analyzer.setMapperClass(Analyzer.class);
+    analyzer.setReducerClass(Analyzer.class);
+    analyzer.setOutputKeyClass(Text.class);
+    analyzer.setOutputValueClass(Node.class);
+    analyzer.setOutputFormat(MapFileOutputFormat.class);
+
+    LOG.info("Starting analysis job");
+    try {
+      JobClient.runJob(analyzer);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished analysis job.");
+  }
+
+  /**
+   * The Counter job that determines the total number of nodes in the WebGraph.
+   * This is used to determine a rank one score for pages with zero inlinks but
+   * that contain outlinks.
+   */
+  private static class Counter
+    implements Mapper<Text, Node, Text, LongWritable>,
+    Reducer<Text, LongWritable, Text, LongWritable> {
+
+    private JobConf conf;
+    private static Text numNodes = new Text(NUM_NODES);
+    private static LongWritable one = new LongWritable(1L);
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Outputs one for every node.
+     */
+    public void map(Text key, Node value,
+      OutputCollector<Text, LongWritable> output, Reporter reporter)
+      throws IOException {
+      output.collect(numNodes, one);
+    }
+
+    /**
+     * Totals the node number and outputs a single total value.
+     */
+    public void reduce(Text key, Iterator<LongWritable> values,
+      OutputCollector<Text, LongWritable> output, Reporter reporter)
+      throws IOException {
+
+      long total = 0;
+      while (values.hasNext()) {
+        total += values.next().get();
+      }
+      output.collect(numNodes, new LongWritable(total));
+    }
+
+    public void close() {
+    }
+  }
+
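+  /**
+   * Assigns each node an initial inlink score (the link.analyze.initial.score
+   * setting, 1.0 by default) before the first analysis iteration.
+   */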
+  private static class Initializer
+    implements Mapper<Text, Node, Text, Node> {
+
+    private JobConf conf;
+    private float initialScore = 1.0f;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      initialScore = conf.getFloat("link.analyze.initial.score", 1.0f);
+    }
+
+    public void map(Text key, Node node, OutputCollector<Text, Node> output,
+      Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      Node outNode = (Node)WritableUtils.clone(node, conf);
+      outNode.setInlinkScore(initialScore);
+
+      output.collect(new Text(url), outNode);
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Inverts outlinks and attaches current score from the NodeDb of the
+   * WebGraph. The link analysis process consists of inverting, analyzing and
+   * scoring, in a loop for a given number of iterations.
+   */
+  private static class Inverter
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, LinkDatum> {
+
+    private JobConf conf;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Convert values to ObjectWritable
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Inverts outlinks to inlinks, attaches current score for the outlink from
+     * the NodeDb of the WebGraph and removes any outlink that is contained
+     * within the loopset.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, LinkDatum> output, Reporter reporter)
+      throws IOException {
+
+      String fromUrl = key.toString();
+      List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+      Node node = null;
+      LoopSet loops = null;
+
+      // aggregate outlinks, assign other values
+      while (values.hasNext()) {
+        ObjectWritable write = values.next();
+        Object obj = write.get();
+        if (obj instanceof Node) {
+          node = (Node)obj;
+        }
+        else if (obj instanceof LinkDatum) {
+          outlinks.add((LinkDatum)WritableUtils.clone((LinkDatum)obj, conf));
+        }
+        else if (obj instanceof LoopSet) {
+          loops = (LoopSet)obj;
+        }
+      }
+
+      // get the number of outlinks and the current inlink and outlink scores
+      // from the node of the url
+      int numOutlinks = node.getNumOutlinks();
+      float inlinkScore = node.getInlinkScore();
+      float outlinkScore = node.getOutlinkScore();
+      LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
+
+      // can't invert if no outlinks
+      if (numOutlinks > 0) {
+
+        Set<String> loopSet = (loops != null) ? loops.getLoopSet() : null;
+        for (int i = 0; i < outlinks.size(); i++) {
+          LinkDatum outlink = outlinks.get(i);
+          String toUrl = outlink.getUrl();
+
+          // remove any url that is contained in the loopset
+          if (loopSet != null && loopSet.contains(toUrl)) {
+            LOG.debug(fromUrl + ": Skipping inverting inlink from loop "
+              + toUrl);
+            continue;
+          }
+          outlink.setUrl(fromUrl);
+          outlink.setScore(outlinkScore);
+
+          // collect the inverted outlink
+          output.collect(new Text(toUrl), outlink);
+          LOG.debug(toUrl + ": inverting inlink from " + fromUrl
+            + " origscore: " + inlinkScore + " numOutlinks: " + numOutlinks
+            + " inlinkscore: " + outlinkScore);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Runs a single link analysis iteration.
+   */
+  private static class Analyzer
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, Node> {
+
+    private JobConf conf;
+    private float dampingFactor = 0.85f;
+    private float rankOne = 0.0f;
+    private int itNum = 0;
+    private boolean limitPages = true;
+    private boolean limitDomains = true;
+
+    /**
+     * Configures the job, sets the damping factor, rank one score, and other
+     * needed values for analysis.
+     */
+    public void configure(JobConf conf) {
+
+      try {
+        this.conf = conf;
+        this.dampingFactor = conf.getFloat("link.analyze.damping.factor", 0.85f);
+        this.rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
+        this.itNum = conf.getInt("link.analyze.iteration", 0);
+        limitPages = conf.getBoolean("link.ignore.limit.page", true);
+        limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+      }
+      catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new IllegalArgumentException(e);
+      }
+    }
+
+    /**
+     * Convert values to ObjectWritable
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(WritableUtils.clone(value, conf));
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Performs a single iteration of link analysis. The resulting scores are
+     * stored in a temporary NodeDb which replaces the NodeDb of the WebGraph.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, Node> output, Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      Set<String> domains = new HashSet<String>();
+      Set<String> pages = new HashSet<String>();
+      Node node = null;
+
+      // a page with zero inlinks has a score of rankOne
+      int numInlinks = 0;
+      float totalInlinkScore = rankOne;
+
+      while (values.hasNext()) {
+
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof Node) {
+          node = (Node)value;
+        }
+        else if (value instanceof LinkDatum) {
+
+          LinkDatum linkDatum = (LinkDatum)value;
+          float scoreFromInlink = linkDatum.getScore();
+          String inlinkUrl = linkDatum.getUrl();
+          String inLinkDomain = URLUtil.getDomainName(inlinkUrl);
+          String inLinkPage = URLUtil.getPage(inlinkUrl);
+
+          // limit counting duplicate inlinks by pages or domains
+          if ((limitPages && pages.contains(inLinkPage))
+            || (limitDomains && domains.contains(inLinkDomain))) {
+            LOG.debug(url + ": ignoring " + scoreFromInlink + " from "
+              + inlinkUrl + ", duplicate page or domain");
+            continue;
+          }
+
+          // aggregate total inlink score
+          numInlinks++;
+          totalInlinkScore += scoreFromInlink;
+          domains.add(inLinkDomain);
+          pages.add(inLinkPage);
+          LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+            + ", total: " + totalInlinkScore);
+        }
+      }
+
+      // calculate linkRank score formula
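+      // damped sum over the collected inlink scores, PageRank-style; urls
+      // whose inlinks were all filtered out keep the rankOne seed from above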
+      float linkRankScore = (1 - this.dampingFactor)
+        + (this.dampingFactor * totalInlinkScore);
+
+      LOG.info(url + ": score: " + linkRankScore + " num inlinks: "
+        + numInlinks + " iteration: " + itNum + "\n");
+
+      // store the score in a temporary NodeDb
+      Node outNode = (Node)WritableUtils.clone(node, conf);
+      outNode.setInlinkScore(linkRankScore);
+      output.collect(key, outNode);
+    }
+
+    public void close()
+      throws IOException {
+    }
+  }
+
+  /**
+   * Default constructor.
+   */
+  public LinkRank() {
+    super();
+  }
+
+  /**
+   * Configurable constructor.
+   */
+  public LinkRank(Configuration conf) {
+    super(conf);
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Runs the complete link analysis job. The complete job determines the rank
+   * one score, then runs through a given number of invert and analyze
+   * iterations (10 by default), and finally replaces the NodeDb in the
+   * WebGraph with the link rank output.
+   * 
+   * @param webGraphDb The WebGraph to run link analysis on.
+   * 
+   * @throws IOException If an error occurs during link analysis.
+   */
+  public void analyze(Path webGraphDb)
+    throws IOException {
+
+    // store the link rank under the webgraphdb temporarily, final scores get
+    // updated into the nodedb
+    Path linkRank = new Path(webGraphDb, "linkrank");
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create the linkrank directory if needed
+    if (!fs.exists(linkRank)) {
+      fs.mkdirs(linkRank);
+    }
+
+    // the webgraph outlink and node database paths
+    Path wgOutlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+    Path wgNodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path nodeDb = new Path(linkRank, WebGraph.NODE_DIR);
+    Path loopDb = new Path(webGraphDb, Loops.LOOPS_DIR);
+    if (!fs.exists(loopDb)) {
+      loopDb = null;
+    }
+
+    // get the number of total nodes in the webgraph, used for rank one, then
+    // initialize all urls with a default score
+    int numLinks = runCounter(fs, webGraphDb);
+    runInitializer(wgNodeDb, nodeDb);
+    float rankOneScore = (1f / (float)numLinks);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Number of links " + numLinks);
+      LOG.info("Rank One " + rankOneScore);
+    }
+
+    // run invert and analysis for a given number of iterations to allow the
+    // link rank scores to converge
+    int numIterations = conf.getInt("link.analyze.num.iterations", 10);
+    for (int i = 0; i < numIterations; i++) {
+
+      // the input to inverting is always the previous output from analysis
+      LOG.info("Running iteration " + (i + 1) + " of " + numIterations);
+      Path tempRank = new Path(linkRank + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+      fs.mkdirs(tempRank);
+      Path tempInverted = new Path(tempRank, "inverted");
+      Path tempNodeDb = new Path(tempRank, WebGraph.NODE_DIR);
+
+      // run invert and analysis
+      runInverter(nodeDb, wgOutlinkDb, loopDb, tempInverted);
+      runAnalysis(nodeDb, tempInverted, tempNodeDb, i, numIterations,
+        rankOneScore);
+
+      // replace the temporary NodeDb with the output from analysis
+      LOG.info("Installing new link scores");
+      FSUtils.replace(fs, linkRank, tempRank, true);
+      LOG.info("Finished analysis iteration " + (i + 1) + " of "
+        + numIterations);
+    }
+
+    // replace the NodeDb in the WebGraph with the final output of analysis
+    LOG.info("Installing web graph nodes");
+    FSUtils.replace(fs, wgNodeDb, nodeDb, true);
+
+    // remove the temporary link rank folder
+    fs.delete(linkRank, true);
+    LOG.info("Finished analysis");
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the LinkRank tool.
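+   * 
+   * Command line usage: -webgraphdb <webgraphdb>; pass -help for the option
+   * summary.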
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webgraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+      "the web graph db to use").create("webgraphdb");
+    options.addOption(helpOpts);
+    options.addOption(webgraphOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("LinkRank", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+
+      analyze(new Path(webGraphDb));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("LinkAnalysis: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/LoopReader.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LoopReader.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LoopReader.java	(revision 0)
@@ -0,0 +1,107 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.nutch.scoring.webgraph.Loops.LoopSet;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * The LoopReader tool prints the loopset information for a single url.
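+ * For this tool to work the Loops job must already have been run on the
+ * WebGraph; it is invoked with -webgraphdb and -url arguments.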
+ */
+public class LoopReader {
+
+  private Configuration conf;
+  private FileSystem fs;
+  private MapFile.Reader[] loopReaders;
+
+  /**
+   * Prints the loopset for a single url.  The loopset information will show
+   * any outlink url that eventually forms a link cycle.
+   * 
+   * @param webGraphDb The WebGraph to check for loops
+   * @param url The url to check.
+   * 
+   * @throws IOException If an error occurs while printing loopset information.
+   */
+  public void dumpUrl(Path webGraphDb, String url)
+    throws IOException {
+
+    // open the readers
+    conf = NutchConfiguration.create();
+    fs = FileSystem.get(conf);
+    loopReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+      Loops.LOOPS_DIR), conf);
+
+    // get the loopset for a given url, if any
+    Text key = new Text(url);
+    LoopSet loop = new LoopSet();
+    MapFileOutputFormat.getEntry(loopReaders,
+      new HashPartitioner<Text, LoopSet>(), key, loop);
+
+    // print out each loop url in the set
+    System.out.println(url + ":");
+    for (String loopUrl : loop.getLoopSet()) {
+      System.out.println("  " + loopUrl);
+    }
+
+    // close the readers
+    FSUtils.closeReaders(loopReaders);
+  }
+
+  /**
+   * Runs the LoopReader tool.  For this tool to work the loops job must have
+   * already been run on the corresponding WebGraph.
+   */
+  public static void main(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webGraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg()
+      .withDescription("the webgraphdb to use").create("webgraphdb");
+    Option urlOpts = OptionBuilder.withArgName("url").hasOptionalArg()
+      .withDescription("the url to dump").create("url");
+    options.addOption(helpOpts);
+    options.addOption(webGraphOpts);
+    options.addOption(urlOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+        || !line.hasOption("url")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("WebGraphReader", options);
+        return;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      String url = line.getOptionValue("url");
+      LoopReader reader = new LoopReader();
+      reader.dumpUrl(new Path(webGraphDb), url);
+      return;
+    }
+    catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
+  }
+
+}
\ No newline at end of file
Index: src/java/org/apache/nutch/scoring/webgraph/Loops.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/Loops.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/Loops.java	(revision 0)
@@ -0,0 +1,590 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * The Loops job identifies link cycles inside of the web graph. This is then
+ * used in the LinkRank program to remove those links from consideration
+ * during link analysis.
+ * 
+ * This job will identify both reciprocal links and cycles of 2+ links up to a
+ * configured depth. The Loops job is expensive in both computational and
+ * space terms. Because it checks outlinks of outlinks of outlinks for cycles,
+ * its intermediate output can be extremely large even if the end output is
+ * rather small. Because of this the Loops job is optional, and if its output
+ * doesn't exist it won't be factored into the LinkRank program.
+ */
+public class Loops
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(Loops.class);
+  public static final String LOOPS_DIR = "loops";
+  public static final String ROUTES_DIR = "routes";
+
+  /**
+   * A link path or route looking to identify a link cycle.
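+   * The lookingFor field holds the start url of the route, outlinkUrl the
+   * next hop being followed, and found is set once the start url is reached
+   * again.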
+   */
+  public static class Route
+    implements Writable {
+
+    private String outlinkUrl = null;
+    private String lookingFor = null;
+    private boolean found = false;
+
+    public Route() {
+
+    }
+
+    public String getOutlinkUrl() {
+      return outlinkUrl;
+    }
+
+    public void setOutlinkUrl(String outlinkUrl) {
+      this.outlinkUrl = outlinkUrl;
+    }
+
+    public String getLookingFor() {
+      return lookingFor;
+    }
+
+    public void setLookingFor(String lookingFor) {
+      this.lookingFor = lookingFor;
+    }
+
+    public boolean isFound() {
+      return found;
+    }
+
+    public void setFound(boolean found) {
+      this.found = found;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+
+      outlinkUrl = Text.readString(in);
+      lookingFor = Text.readString(in);
+      found = in.readBoolean();
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+      Text.writeString(out, outlinkUrl);
+      Text.writeString(out, lookingFor);
+      out.writeBoolean(found);
+    }
+  }
+
+  /**
+   * A set of loops.
+   */
+  public static class LoopSet
+    implements Writable {
+
+    private Set<String> loopSet = new HashSet<String>();
+
+    public LoopSet() {
+
+    }
+
+    public Set<String> getLoopSet() {
+      return loopSet;
+    }
+
+    public void setLoopSet(Set<String> loopSet) {
+      this.loopSet = loopSet;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+
+      int numNodes = in.readInt();
+      loopSet = new HashSet<String>();
+      for (int i = 0; i < numNodes; i++) {
+        String url = Text.readString(in);
+        loopSet.add(url);
+      }
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+
+      int numNodes = (loopSet != null ? loopSet.size() : 0);
+      out.writeInt(numNodes);
+      for (String loop : loopSet) {
+        Text.writeString(out, loop);
+      }
+    }
+
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      for (String loop : loopSet) {
+        builder.append(loop).append(",");
+      }
+      if (builder.length() > 0) {
+        builder.setLength(builder.length() - 1);
+      }
+      return builder.toString();
+    }
+  }
+
+  /**
+   * Initializes the Loop routes.
+   */
+  public static class Initializer
+    extends Configured
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, Route> {
+
+    private JobConf conf;
+
+    /**
+     * Default constructor.
+     */
+    public Initializer() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public Initializer(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configure the job.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Wraps values in ObjectWritable.
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Takes any node that has inlinks and sets up a route for all of its
+     * outlinks. These routes will then be followed to a maximum depth inside of
+     * the Looper job.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, Route> output, Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      Node node = null;
+      List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
+
+      // collect all outlinks and assign node
+      while (values.hasNext()) {
+        ObjectWritable objWrite = values.next();
+        Object obj = objWrite.get();
+        if (obj instanceof LinkDatum) {
+          outlinkList.add((LinkDatum)obj);
+        }
+        else if (obj instanceof Node) {
+          node = (Node)obj;
+        }
+      }
+
+      // has to have inlinks otherwise cycle not possible
+      if (node != null) {
+
+        int numInlinks = node.getNumInlinks();
+        if (numInlinks > 0) {
+
+          // initialize and collect a route for every outlink
+          for (LinkDatum datum : outlinkList) {
+            String outlinkUrl = datum.getUrl();
+            Route route = new Route();
+            route.setFound(false);
+            route.setLookingFor(url);
+            route.setOutlinkUrl(outlinkUrl);
+            output.collect(new Text(outlinkUrl), route);
+          }
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Follows a route path looking for the start url of the route. If the start
+   * url is found then the route is a cyclical path.
+   */
+  public static class Looper
+    extends Configured
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, Route> {
+
+    private JobConf conf;
+    private boolean last = false;
+
+    /**
+     * Default constructor.
+     */
+    public Looper() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public Looper(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configure the job.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
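+      // "last" is set by the driver on the final Looper pass; when true,
+      // unfinished routes are no longer fanned out to further outlinks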
+      this.last = conf.getBoolean("last", false);
+    }
+
+    /**
+     * Wrap values in ObjectWritable.
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      Writable cloned = null;
+      if (value instanceof LinkDatum) {
+        cloned = new Text(((LinkDatum)value).getUrl());
+      }
+      else {
+        cloned = WritableUtils.clone(value, conf);
+      }
+      objWrite.set(cloned);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Performs a single pass looking for loop cycles within routes. If this
+     * is not the last pass, unfinished routes are passed along to each
+     * outlink url for further checking.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, Route> output, Reporter reporter)
+      throws IOException {
+
+      List<Route> routeList = new ArrayList<Route>();
+      Set<String> outlinkUrls = new LinkedHashSet<String>();
+      int numValues = 0;
+
+      // aggregate all routes and outlinks for a given url
+      while (values.hasNext()) {
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof Route) {
+          routeList.add((Route)WritableUtils.clone((Route)value, conf));
+        }
+        else if (value instanceof Text) {
+          String outlinkUrl = ((Text)value).toString();
+          if (!outlinkUrls.contains(outlinkUrl)) {
+            outlinkUrls.add(outlinkUrl);
+          }
+        }
+
+        // specify progress, could be a lot of routes
+        numValues++;
+        if (numValues % 100 == 0) {
+          reporter.progress();
+        }
+      }
+
+      // loop through the route list
+      Iterator<Route> routeIt = routeList.listIterator();
+      while (routeIt.hasNext()) {
+
+        // remove the route as we go for space concerns, there could be a lot
+        // of routes; if the route is already found, meaning it is a loop,
+        // just collect it; unfound routes for urls with no outlinks fall off
+        Route route = routeIt.next();
+        routeIt.remove();
+        if (route.isFound()) {
+          output.collect(key, route);
+        }
+        else {
+
+          // if the route start url is found, set route to found and collect
+          String lookingFor = route.getLookingFor();
+          if (outlinkUrls.contains(lookingFor)) {
+            route.setFound(true);
+            output.collect(key, route);
+          }
+          else if (!last) {
+
+            // setup for next pass through the loop
+            for (String outlink : outlinkUrls) {
+              output.collect(new Text(outlink), route);
+            }
+          }
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Finishes the Loops job by aggregating and collecting any found routes.
+   */
+  public static class Finalizer
+    extends Configured
+    implements Mapper<Text, Route, Text, Route>,
+    Reducer<Text, Route, Text, LoopSet> {
+
+    private JobConf conf;
+
+    /**
+     * Default constructor.
+     */
+    public Finalizer() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public Finalizer(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configures the job.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Maps out any found routes; these will be the link cycles.
+     */
+    public void map(Text key, Route value, OutputCollector<Text, Route> output,
+      Reporter reporter)
+      throws IOException {
+
+      if (value.isFound()) {
+        String lookingFor = value.getLookingFor();
+        output.collect(new Text(lookingFor), value);
+      }
+    }
+
+    /**
+     * Aggregates all found routes for a given start url into a loopset and 
+     * collects the loopset.
+     */
+    public void reduce(Text key, Iterator<Route> values,
+      OutputCollector<Text, LoopSet> output, Reporter reporter)
+      throws IOException {
+
+      LoopSet loops = new LoopSet();
+      while (values.hasNext()) {
+        Route route = values.next();
+        loops.getLoopSet().add(route.getOutlinkUrl());
+      }
+      output.collect(key, loops);
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Runs the various loop jobs.
+   */
+  public void findLoops(Path webGraphDb)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Loops: starting");
+      LOG.info("Loops: webgraphdb: " + webGraphDb);
+    }
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path routes = new Path(webGraphDb, ROUTES_DIR);
+    Path tempRoute = new Path(webGraphDb, ROUTES_DIR + "-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // run the initializer
+    JobConf init = new NutchJob(conf);
+    init.setJobName("Initializer: " + webGraphDb);
+    FileInputFormat.addInputPath(init, outlinkDb);
+    FileInputFormat.addInputPath(init, nodeDb);
+    init.setInputFormat(SequenceFileInputFormat.class);
+    init.setMapperClass(Initializer.class);
+    init.setReducerClass(Initializer.class);
+    init.setMapOutputKeyClass(Text.class);
+    init.setMapOutputValueClass(ObjectWritable.class);
+    init.setOutputKeyClass(Text.class);
+    init.setOutputValueClass(Route.class);
+    FileOutputFormat.setOutputPath(init, tempRoute);
+    init.setOutputFormat(SequenceFileOutputFormat.class);
+
+    try {
+      LOG.info("Initializer: running");
+      JobClient.runJob(init);
+      LOG.info("Initializer: installing " + routes);
+      FSUtils.replace(fs, routes, tempRoute, true);
+      LOG.info("Initializer: finished");
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // run the loops job for a maxdepth, default 2, which will find a 3 link
+    // loop cycle
+    int depth = conf.getInt("link.loops.depth", 2);
+    for (int i = 0; i < depth; i++) {
+
+      JobConf looper = new NutchJob(conf);
+      looper.setJobName("Looper: " + (i + 1) + " of " + depth);
+      FileInputFormat.addInputPath(looper, outlinkDb);
+      FileInputFormat.addInputPath(looper, routes);
+      looper.setInputFormat(SequenceFileInputFormat.class);
+      looper.setMapperClass(Looper.class);
+      looper.setReducerClass(Looper.class);
+      looper.setMapOutputKeyClass(Text.class);
+      looper.setMapOutputValueClass(ObjectWritable.class);
+      looper.setOutputKeyClass(Text.class);
+      looper.setOutputValueClass(Route.class);
+      FileOutputFormat.setOutputPath(looper, tempRoute);
+      looper.setOutputFormat(SequenceFileOutputFormat.class);
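+      // flag the final pass so the Looper stops mapping unresolved routes out
+      // for another round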
+      looper.setBoolean("last", i == (depth - 1));
+
+      try {
+        LOG.info("Looper: running");
+        JobClient.runJob(looper);
+        LOG.info("Looper: installing " + routes);
+        FSUtils.replace(fs, routes, tempRoute, true);
+        LOG.info("Looper: finished");
+      }
+      catch (IOException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw e;
+      }
+    }
+
+    // run the finalizer
+    JobConf finalizer = new NutchJob(conf);
+    finalizer.setJobName("Finalizer: " + webGraphDb);
+    FileInputFormat.addInputPath(finalizer, routes);
+    finalizer.setInputFormat(SequenceFileInputFormat.class);
+    finalizer.setMapperClass(Finalizer.class);
+    finalizer.setReducerClass(Finalizer.class);
+    finalizer.setMapOutputKeyClass(Text.class);
+    finalizer.setMapOutputValueClass(Route.class);
+    finalizer.setOutputKeyClass(Text.class);
+    finalizer.setOutputValueClass(LoopSet.class);
+    FileOutputFormat.setOutputPath(finalizer, new Path(webGraphDb, LOOPS_DIR));
+    finalizer.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      LOG.info("Finalizer: running");
+      JobClient.runJob(finalizer);
+      LOG.info("Finalizer: finished");
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new Loops(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the Loops tool.
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+      "the web graph database to use").create("webgraphdb");
+    options.addOption(helpOpts);
+    options.addOption(webGraphDbOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("Loops", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      findLoops(new Path(webGraphDb));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("Loops: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/nutch/scoring/webgraph/Node.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/Node.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/Node.java	(revision 0)
@@ -0,0 +1,89 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.metadata.Metadata;
+
+/**
+ * A class which holds the number of inlinks and outlinks for a given url along
+ * with an inlink score from a link analysis program and any metadata.  
+ * 
+ * The Node is the core unit of the NodeDb in the WebGraph.
+ */
+public class Node
+  implements Writable {
+
+  private int numInlinks = 0;
+  private int numOutlinks = 0;
+  private float inlinkScore = 1.0f;
+  private Metadata metadata = new Metadata();
+
+  public Node() {
+
+  }
+
+  public int getNumInlinks() {
+    return numInlinks;
+  }
+
+  public void setNumInlinks(int numInlinks) {
+    this.numInlinks = numInlinks;
+  }
+
+  public int getNumOutlinks() {
+    return numOutlinks;
+  }
+
+  public void setNumOutlinks(int numOutlinks) {
+    this.numOutlinks = numOutlinks;
+  }
+
+  public float getInlinkScore() {
+    return inlinkScore;
+  }
+
+  public void setInlinkScore(float inlinkScore) {
+    this.inlinkScore = inlinkScore;
+  }
+
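+  /**
+   * Returns the outlink score: the inlink score divided evenly among the
+   * outlinks, or the inlink score itself if there are no outlinks.
+   */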
+  public float getOutlinkScore() {
+    return (numOutlinks > 0) ? inlinkScore / numOutlinks : inlinkScore;
+  }
+
+  public Metadata getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Metadata metadata) {
+    this.metadata = metadata;
+  }
+
+  public void readFields(DataInput in)
+    throws IOException {
+
+    numInlinks = in.readInt();
+    numOutlinks = in.readInt();
+    inlinkScore = in.readFloat();
+    metadata.clear();
+    metadata.readFields(in);
+  }
+
+  public void write(DataOutput out)
+    throws IOException {
+
+    out.writeInt(numInlinks);
+    out.writeInt(numOutlinks);
+    out.writeFloat(inlinkScore);
+    metadata.write(out);
+  }
+
+  public String toString() {
+    return "num inlinks: " + numInlinks + ", num outlinks: " + numOutlinks
+      + ", inlink score: " + inlinkScore + ", outlink score: "
+      + getOutlinkScore() + ", metadata: " + metadata.toString();
+  }
+
+}
Index: src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java	(revision 0)
@@ -0,0 +1,248 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * A tool that dumps out the top urls by number of inlinks, number of outlinks,
+ * or by score, to a text file. One of the major uses of this tool is to check
+ * the top scoring urls of a link analysis program such as LinkRank.
+ * 
+ * For number of inlinks or number of outlinks the WebGraph program will need to
+ * have been run. For link analysis score a program such as LinkRank will need
+ * to have been run which updates the NodeDb of the WebGraph.
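+ * 
+ * A typical invocation, with illustrative paths, might look like:
+ * <pre>
+ * bin/nutch org.apache.nutch.scoring.webgraph.NodeDumper -webgraphdb crawl/webgraphdb \
+ *   -scores -topn 100 -output crawl/webgraphdb/dump/scores
+ * </pre>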
+ */
+public class NodeDumper
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(NodeDumper.class);
+
+  private static enum DumpType {
+    INLINKS,
+    OUTLINKS,
+    SCORES
+  }
+
+  /**
+   * Outputs the top urls sorted in descending order. Depending on the flag set
+   * on the command line, the top urls could be for number of inlinks, for
+   * number of outlinks, or for link analysis score.
+   */
+  public static class Sorter
+    extends Configured
+    implements Mapper<Text, Node, FloatWritable, Text>,
+    Reducer<FloatWritable, Text, Text, FloatWritable> {
+
+    private JobConf conf;
+    private boolean inlinks = false;
+    private boolean outlinks = false;
+    private boolean scores = false;
+    private long topn = Long.MAX_VALUE;
+
+    /**
+     * Configures the job, sets the flag for type of content and the topN number
+     * if any.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      this.inlinks = conf.getBoolean("inlinks", false);
+      this.outlinks = conf.getBoolean("outlinks", false);
+      this.scores = conf.getBoolean("scores", true);
+      this.topn = conf.getLong("topn", Long.MAX_VALUE);
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Outputs the url keyed by the appropriate number of inlinks, outlinks,
+     * or link analysis score.
+     */
+    public void map(Text key, Node node,
+      OutputCollector<FloatWritable, Text> output, Reporter reporter)
+      throws IOException {
+
+      float number = 0;
+      if (inlinks) {
+        number = node.getNumInlinks();
+      }
+      else if (outlinks) {
+        number = node.getNumOutlinks();
+      }
+      else {
+        number = node.getInlinkScore();
+      }
+
+      // the number is collected negated so that sorting is descending
+      output.collect(new FloatWritable(-number), key);
+    }
+
+    /**
+     * Flips and collects the url and numeric sort value.
+     */
+    public void reduce(FloatWritable key, Iterator<Text> values,
+      OutputCollector<Text, FloatWritable> output, Reporter reporter)
+      throws IOException {
+
+      // negate the key to recover the original value, handling 0 separately
+      // to avoid emitting -0.0
+      float val = key.get();
+      FloatWritable number = new FloatWritable(val == 0 ? 0 : -val);
+      long numCollected = 0;
+
+      // collect all values, this time with the url as key
+      while (values.hasNext() && (numCollected < topn)) {
+        Text url = (Text)WritableUtils.clone(values.next(), conf);
+        output.collect(url, number);
+        numCollected++;
+      }
+    }
+  }
+
+  /**
+   * Runs the process to dump the top urls out to a text file.
+   * 
+   * @param webGraphDb The WebGraph from which to pull values.
+   * 
+   * @param type The type of value to dump: inlinks, outlinks, or scores.
+   * @param topN The maximum number of urls to dump.
+   * @param output The output directory for the text dump.
+   * 
+   * @throws IOException If an error occurs while dumping the top values.
+   */
+  public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output)
+    throws IOException {
+
+    LOG.info("NodeDumper: starting");
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Configuration conf = getConf();
+
+    JobConf dumper = new NutchJob(conf);
+    dumper.setJobName("NodeDumper: " + webGraphDb);
+    FileInputFormat.addInputPath(dumper, nodeDb);
+    dumper.setInputFormat(SequenceFileInputFormat.class);
+    dumper.setMapperClass(Sorter.class);
+    dumper.setReducerClass(Sorter.class);
+    dumper.setMapOutputKeyClass(FloatWritable.class);
+    dumper.setMapOutputValueClass(Text.class);
+    dumper.setOutputKeyClass(Text.class);
+    dumper.setOutputValueClass(FloatWritable.class);
+    FileOutputFormat.setOutputPath(dumper, output);
+    dumper.setOutputFormat(TextOutputFormat.class);
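+    // a single reducer produces one text file globally sorted by the negated
+    // score key, i.e. urls in descending order of the chosen value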
+    dumper.setNumReduceTasks(1);
+    dumper.setBoolean("inlinks", type == DumpType.INLINKS);
+    dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
+    dumper.setBoolean("scores", type == DumpType.SCORES);
+    dumper.setLong("topn", topN);
+
+    try {
+      LOG.info("NodeDumper: running");
+      JobClient.runJob(dumper);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new NodeDumper(),
+      args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the node dumper tool.
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+      "the web graph database to use").create("webgraphdb");
+    Option inlinkOpts = OptionBuilder.withArgName("inlinks").withDescription(
+      "show highest inlinks").create("inlinks");
+    Option outlinkOpts = OptionBuilder.withArgName("outlinks").withDescription(
+      "show highest outlinks").create("outlinks");
+    Option scoreOpts = OptionBuilder.withArgName("scores").hasOptionalArg().withDescription(
+      "show highest scores").create("scores");
+    Option topNOpts = OptionBuilder.withArgName("topn").hasOptionalArg().withDescription(
+      "show topN scores").create("topn");
+    Option outputOpts = OptionBuilder.withArgName("output").hasArg().withDescription(
+      "the output directory to use").create("output");
+    options.addOption(helpOpts);
+    options.addOption(webGraphDbOpts);
+    options.addOption(inlinkOpts);
+    options.addOption(outlinkOpts);
+    options.addOption(scoreOpts);
+    options.addOption(topNOpts);
+    options.addOption(outputOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("NodeDumper", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      boolean inlinks = line.hasOption("inlinks");
+      boolean outlinks = line.hasOption("outlinks");
+      boolean scores = line.hasOption("scores");
+      long topN = (line.hasOption("topn")
+        ? Long.parseLong(line.getOptionValue("topn")) : Long.MAX_VALUE);
+
+      // get the correct dump type
+      String output = line.getOptionValue("output");
+      DumpType type = (inlinks ? DumpType.INLINKS : outlinks
+        ? DumpType.OUTLINKS : DumpType.SCORES);
+
+      dumpNodes(new Path(webGraphDb), type, topN, new Path(output));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("NodeDumper: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/NodeReader.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/NodeReader.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/NodeReader.java	(revision 0)
@@ -0,0 +1,106 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * Reads and prints to system out information for a single node from the NodeDb 
+ * in the WebGraph.
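+ * 
+ * A typical invocation, with illustrative paths, might look like:
+ * <pre>
+ * bin/nutch org.apache.nutch.scoring.webgraph.NodeReader -webgraphdb crawl/webgraphdb \
+ *   -url http://www.example.com/
+ * </pre>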
+ */
+public class NodeReader {
+
+  private Configuration conf;
+  private FileSystem fs;
+  private MapFile.Reader[] nodeReaders;
+
+  /**
+   * Prints the content of the Node represented by the url to system out.
+   * 
+   * @param webGraphDb The webgraph from which to get the node.
+   * @param url The url of the node.
+   * 
+   * @throws IOException If an error occurs while getting the node.
+   */
+  public void dumpUrl(Path webGraphDb, String url)
+    throws IOException {
+
+    conf = NutchConfiguration.create();
+    fs = FileSystem.get(conf);
+    nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+      WebGraph.NODE_DIR), conf);
+
+    // open the readers, get the node, print out the info, and close the readers
+    Text key = new Text(url);
+    Node node = new Node();
+    MapFileOutputFormat.getEntry(nodeReaders,
+      new HashPartitioner<Text, Node>(), key, node);
+    System.out.println(url + ":");
+    System.out.println("  inlink score: " + node.getInlinkScore());
+    System.out.println("  outlink score: " + node.getOutlinkScore());
+    System.out.println("  num inlinks: " + node.getNumInlinks());
+    System.out.println("  num outlinks: " + node.getNumOutlinks());
+    FSUtils.closeReaders(nodeReaders);
+  }
+
+  /**
+   * Runs the NodeReader tool.  The command line arguments must contain a 
+   * webgraphdb path and a url.  The url must match the normalized url that is
+   * contained in the NodeDb of the WebGraph.
+   */
+  public static void main(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webGraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg()
+      .withDescription("the webgraphdb to use").create("webgraphdb");
+    Option urlOpts = OptionBuilder.withArgName("url").hasOptionalArg()
+      .withDescription("the url to dump").create("url");
+    options.addOption(helpOpts);
+    options.addOption(webGraphOpts);
+    options.addOption(urlOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      // command line must take a webgraphdb and a url
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+        || !line.hasOption("url")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("WebGraphReader", options);
+        return;
+      }
+
+      // dump the values to system out and return
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      String url = line.getOptionValue("url");
+      NodeReader reader = new NodeReader();
+      reader.dumpUrl(new Path(webGraphDb), url);
+      
+      return;
+    }
+    catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
+  }
+
+}
\ No newline at end of file
Index: src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java	(revision 0)
@@ -0,0 +1,226 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * Updates the score from the WebGraph node database into the crawl database.
+ * Any score that is not in the node database is set to the clear score in the 
+ * crawl database.
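+ * 
+ * A typical invocation, with illustrative paths, might look like:
+ * <pre>
+ * bin/nutch org.apache.nutch.scoring.webgraph.ScoreUpdater -crawldb crawl/crawldb \
+ *   -webgraphdb crawl/webgraphdb
+ * </pre>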
+ */
+public class ScoreUpdater
+  extends Configured
+  implements Tool, Mapper<Text, Writable, Text, ObjectWritable>,
+  Reducer<Text, ObjectWritable, Text, CrawlDatum> {
+
+  public static final Log LOG = LogFactory.getLog(ScoreUpdater.class);
+
+  private JobConf conf;
+  private float clearScore = 0.0f;
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f);
+  }
+
+  /**
+   * Wraps input Node and CrawlDatum values in ObjectWritables so that both can
+   * be joined on the url key in the reducer.
+   */
+  public void map(Text key, Writable value,
+    OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+    throws IOException {
+
+    ObjectWritable objWrite = new ObjectWritable();
+    objWrite.set(value);
+    output.collect(key, objWrite);
+  }
+
+  /**
+   * Creates new CrawlDatum objects with the updated score from the NodeDb or
+   * with a cleared score.
+   */
+  public void reduce(Text key, Iterator<ObjectWritable> values,
+    OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+    throws IOException {
+
+    String url = key.toString();
+    Node node = null;
+    CrawlDatum datum = null;
+
+    // set the node and the crawl datum, should be one of each unless no node
+    // for url in the crawldb
+    while (values.hasNext()) {
+      ObjectWritable next = values.next();
+      Object value = next.get();
+      if (value instanceof Node) {
+        node = (Node)value;
+      }
+      else if (value instanceof CrawlDatum) {
+        datum = (CrawlDatum)value;
+      }
+    }
+
+    // datum should almost never be null, it could only happen if the url was
+    // somehow normalized or changed after being pulled from the crawldb
+    if (datum != null) {
+
+      if (node != null) {
+        
+        // set the inlink score in the nodedb
+        float inlinkScore = node.getInlinkScore();
+        datum.setScore(inlinkScore);
+        LOG.debug(url + ": setting to score " + inlinkScore);
+      }
+      else {
+        
+        // clear out the score in the crawldb
+        datum.setScore(clearScore);
+        LOG.debug(url + ": setting to clear score of " + clearScore);
+      }
+
+      output.collect(key, datum);
+    }
+    else {
+      LOG.debug(url + ": no datum");
+    }
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Updates the inlink score from the web graph node database into the crawl
+   * database.
+   * 
+   * @param crawlDb The crawl database to update
+   * @param webGraphDb The webgraph database to use.
+   * 
+   * @throws IOException If an error occurs while updating the scores.
+   */
+  public void update(Path crawlDb, Path webGraphDb)
+    throws IOException {
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create a temporary crawldb with the new scores
+    LOG.info("Running crawldb update " + crawlDb);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+    Path newCrawlDb = new Path(crawlDb,
+      Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // run the updater job outputting to the temp crawl database
+    JobConf updater = new NutchJob(conf);
+    updater.setJobName("Update CrawlDb from WebGraph");
+    FileInputFormat.addInputPath(updater, crawlDbCurrent);
+    FileInputFormat.addInputPath(updater, nodeDb);
+    FileOutputFormat.setOutputPath(updater, newCrawlDb);
+    updater.setInputFormat(SequenceFileInputFormat.class);
+    updater.setMapperClass(ScoreUpdater.class);
+    updater.setReducerClass(ScoreUpdater.class);
+    updater.setMapOutputKeyClass(Text.class);
+    updater.setMapOutputValueClass(ObjectWritable.class);
+    updater.setOutputKeyClass(Text.class);
+    updater.setOutputValueClass(CrawlDatum.class);
+    updater.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      JobClient.runJob(updater);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      
+      // remove the temp crawldb on error
+      if (fs.exists(newCrawlDb)) {
+        fs.delete(newCrawlDb, true);
+      }
+      throw e;
+    }
+
+    // install the temp crawl database
+    LOG.info("Installing new crawldb " + crawlDb);
+    CrawlDb.install(updater, crawlDb);
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
+      args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the ScoreUpdater tool.
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option crawlDbOpts = OptionBuilder.withArgName("crawldb").hasArg().withDescription(
+      "the crawldb to use").create("crawldb");
+    Option webGraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+      "the webgraphdb to use").create("webgraphdb");
+    options.addOption(helpOpts);
+    options.addOption(crawlDbOpts);
+    options.addOption(webGraphOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+        || !line.hasOption("crawldb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("ScoreUpdater", options);
+        return -1;
+      }
+
+      String crawlDb = line.getOptionValue("crawldb");
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      update(new Path(crawlDb), new Path(webGraphDb));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("ScoreUpdater: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/WebGraph.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/WebGraph.java	(revision 0)
@@ -0,0 +1,629 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Creates three databases, one for inlinks, one for outlinks, and a node
+ * database that holds the number of in and outlinks to a url and the current
+ * score for the url.
+ * 
+ * The score is set by an analysis program such as LinkRank. The WebGraph is an
+ * update-able database. Outlinks are stored by their fetch time or by the
+ * current system time if no fetch time is available. Only the most recent
+ * version of outlinks for a given url is stored. As more crawls are executed
+ * and the WebGraph updated, newer Outlinks will replace older Outlinks. This
+ * allows the WebGraph to adapt to changes in the link structure of the web.
+ * 
+ * The Inlink database is created from the Outlink database and is regenerated
+ * when the WebGraph is updated. The Node database is created from both the
+ * Inlink and Outlink databases. Because the Node database is overwritten when
+ * the WebGraph is updated and because the Node database holds current scores
+ * for urls, it is recommended that a crawl cycle (one or more full crawls) fully
+ * complete before the WebGraph is updated and some type of analysis, such as
+ * LinkRank, is run to update scores in the Node database in a stable fashion.
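+ * 
+ * A typical invocation, with illustrative paths, might look like:
+ * <pre>
+ * bin/nutch org.apache.nutch.scoring.webgraph.WebGraph -webgraphdb crawl/webgraphdb \
+ *   -segment crawl/segments/20080101000000
+ * </pre>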
+ */
+public class WebGraph
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(WebGraph.class);
+  public static final String LOCK_NAME = ".locked";
+  public static final String INLINK_DIR = "inlinks";
+  public static final String OUTLINK_DIR = "outlinks";
+  public static final String NODE_DIR = "nodes";
+
+  /**
+   * The OutlinkDb creates a database of all outlinks. Outlinks to internal urls
+   * by domain and host can be ignored. The number of Outlinks out to a given
+   * page or domain can also be limited.
+   */
+  public static class OutlinkDb
+    extends Configured
+    implements Mapper<Text, Writable, Text, LinkDatum>,
+    Reducer<Text, LinkDatum, Text, LinkDatum> {
+
+    // ignoring internal domains, internal hosts
+    private boolean ignoreDomain = true;
+    private boolean ignoreHost = true;
+
+    // limiting urls out to a page or to a domain
+    private boolean limitPages = true;
+    private boolean limitDomains = true;
+
+    // url normalizers and job configuration
+    private URLNormalizers urlNormalizers;
+    private JobConf conf;
+
+    /**
+     * Normalizes and trims extra whitespace from the given url.
+     * 
+     * @param url The url to normalize.
+     * 
+     * @return The normalized url.
+     */
+    private String normalizeUrl(String url) {
+
+      String normalized = null;
+      if (urlNormalizers != null) {
+        try {
+
+          // normalize and trim the url
+          normalized = urlNormalizers.normalize(url,
+            URLNormalizers.SCOPE_DEFAULT);
+          normalized = normalized.trim();
+        }
+        catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          normalized = null;
+        }
+      }
+      return normalized;
+    }
+
+    /**
+     * Returns the fetch time from the parse data or the current system time if
+     * the fetch time doesn't exist.
+     * 
+     * @param data The parse data.
+     * 
+     * @return The fetch time as a long.
+     */
+    private long getFetchTime(ParseData data) {
+
+      // default to current system time
+      long fetchTime = System.currentTimeMillis();
+      String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY);
+      try {
+
+        // get the fetch time from the parse data
+        fetchTime = Long.parseLong(fetchTimeStr);
+      }
+      catch (Exception e) {
+        fetchTime = System.currentTimeMillis();
+      }
+      return fetchTime;
+    }
+
+    /**
+     * Default constructor.
+     */
+    public OutlinkDb() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public OutlinkDb(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configures the OutlinkDb job. Sets up internal links and link limiting.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
+      ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
+      limitPages = conf.getBoolean("link.ignore.limit.page", true);
+      limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+      urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+    }
+
+    /**
+     * Passes through existing LinkDatum objects from an existing OutlinkDb and
+     * maps out new LinkDatum objects from the ParseData of new crawls.
+     */
+    public void map(Text key, Writable value,
+      OutputCollector<Text, LinkDatum> output, Reporter reporter)
+      throws IOException {
+
+      // normalize url, stop processing if null
+      String url = normalizeUrl(key.toString());
+      if (url == null) {
+        return;
+      }
+
+      if (value instanceof ParseData) {
+
+        // get the parse data and the outlinks from the parse data, along with
+        // the fetch time for those links
+        ParseData data = (ParseData)value;
+        long fetchTime = getFetchTime(data);
+        Outlink[] outlinkAr = data.getOutlinks();
+        Map<String, String> outlinkMap = new LinkedHashMap<String, String>();
+
+        // normalize urls and put into map
+        if (outlinkAr != null && outlinkAr.length > 0) {
+          for (int i = 0; i < outlinkAr.length; i++) {
+            Outlink outlink = outlinkAr[i];
+            String toUrl = normalizeUrl(outlink.getToUrl());
+
+            // only put the url into the map if it isn't already there, or if
+            // it is there with a null anchor, in which case the anchor is
+            // replaced
+            boolean existingUrl = outlinkMap.containsKey(toUrl);
+            if (toUrl != null
+              && (!existingUrl || (existingUrl && outlinkMap.get(toUrl) == null))) {
+              outlinkMap.put(toUrl, outlink.getAnchor());
+            }
+          }
+        }
+
+        // collect the outlinks under the fetch time
+        for (String outlinkUrl : outlinkMap.keySet()) {
+          String anchor = outlinkMap.get(outlinkUrl);
+          LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
+          output.collect(key, datum);
+        }
+      }
+      else if (value instanceof LinkDatum) {
+
+        // collect existing outlinks from existing OutlinkDb
+        output.collect(key, (LinkDatum)value);
+      }
+    }
+
+    public void reduce(Text key, Iterator<LinkDatum> values,
+      OutputCollector<Text, LinkDatum> output, Reporter reporter)
+      throws IOException {
+
+      // aggregate all outlinks, get the most recent timestamp for a fetch
+      // which should be the timestamp for all of the most recent outlinks
+      long mostRecent = 0L;
+      List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
+      while (values.hasNext()) {
+
+        // loop through, change out most recent timestamp if needed
+        LinkDatum next = values.next();
+        long timestamp = next.getTimestamp();
+        if (mostRecent == 0L || mostRecent < timestamp) {
+          mostRecent = timestamp;
+        }
+        outlinkList.add((LinkDatum)WritableUtils.clone(next, conf));
+      }
+
+      // get the url, domain, and host for the url
+      String url = key.toString();
+      String domain = URLUtil.getDomainName(url);
+      String host = URLUtil.getHost(url);
+
+      // setup checking sets for domains and pages
+      Set<String> domains = new HashSet<String>();
+      Set<String> pages = new HashSet<String>();
+
+      // loop through the link datums
+      for (LinkDatum datum : outlinkList) {
+
+        // get the url, host, domain, and page for each outlink
+        String toUrl = datum.getUrl();
+        String toDomain = URLUtil.getDomainName(toUrl);
+        String toHost = URLUtil.getHost(toUrl);
+        String toPage = URLUtil.getPage(toUrl);
+        datum.setLinkType(LinkDatum.OUTLINK);
+
+        // outlinks must be the most recent and conform to the internal url and
+        // limiting rules; if they do, collect them
+        if (datum.getTimestamp() == mostRecent
+          && (!limitPages || (limitPages && !pages.contains(toPage)))
+          && (!limitDomains || (limitDomains && !domains.contains(toDomain)))
+          && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
+          && (!ignoreDomain || (ignoreDomain && !toDomain.equalsIgnoreCase(domain)))) {
+          output.collect(key, datum);
+          pages.add(toPage);
+          domains.add(toDomain);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
+   * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
+   * updated.
+   */
+  private static class InlinkDb
+    extends Configured
+    implements Mapper<Text, LinkDatum, Text, LinkDatum> {
+
+    private JobConf conf;
+    private long timestamp;
+
+    /**
+     * Default constructor.
+     */
+    public InlinkDb() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public InlinkDb(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configures job. Sets timestamp for all Inlink LinkDatum objects to the
+     * current system time.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      timestamp = System.currentTimeMillis();
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
+     * new system timestamp, a new link type, and the to and from urls switched.
+     */
+    public void map(Text key, LinkDatum datum,
+      OutputCollector<Text, LinkDatum> output, Reporter reporter)
+      throws IOException {
+
+      // get the to and from url and the anchor
+      String fromUrl = key.toString();
+      String toUrl = datum.getUrl();
+      String anchor = datum.getAnchor();
+
+      // flip the from and to url and set the new link type
+      LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
+      inlink.setLinkType(LinkDatum.INLINK);
+      output.collect(new Text(toUrl), inlink);
+    }
+  }
+
+  /**
+   * Creates the Node database which consists of the number of in and outlinks
+   * for each url and a score slot for analysis programs such as LinkRank.
+   */
+  private static class NodeDb
+    extends Configured
+    implements Reducer<Text, LinkDatum, Text, Node> {
+
+    private JobConf conf;
+
+    /**
+     * Default constructor.
+     */
+    public NodeDb() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public NodeDb(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configures job.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Counts the number of inlinks and outlinks for each url and sets a default
+     * score of 0.0 for each url (node) in the webgraph.
+     */
+    public void reduce(Text key, Iterator<LinkDatum> values,
+      OutputCollector<Text, Node> output, Reporter reporter)
+      throws IOException {
+
+      Node node = new Node();
+      int numInlinks = 0;
+      int numOutlinks = 0;
+
+      // loop through counting number of in and out links
+      while (values.hasNext()) {
+        LinkDatum next = values.next();
+        if (next.getLinkType() == LinkDatum.INLINK) {
+          numInlinks++;
+        }
+        else if (next.getLinkType() == LinkDatum.OUTLINK) {
+          numOutlinks++;
+        }
+      }
+
+      // set the in and outlinks and a default score of 0
+      node.setNumInlinks(numInlinks);
+      node.setNumOutlinks(numOutlinks);
+      node.setInlinkScore(0.0f);
+      output.collect(key, node);
+    }
+  }
+
+  /**
+   * Creates the three different WebGraph databases, Outlinks, Inlinks, and
+   * Node. If a current WebGraph exists then it is updated, if it doesn't exist
+   * then a new WebGraph database is created.
+   * 
+   * @param webGraphDb The WebGraph to create or update.
+   * @param segments The array of segments used to update the WebGraph. Newer
+   * segments and fetch times will overwrite older segments.
+   * 
+   * @throws IOException If an error occurs while processing the WebGraph.
+   */
+  public void createWebGraph(Path webGraphDb, Path[] segments)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("WebGraphDb: starting");
+      LOG.info("WebGraphDb: webgraphdb: " + webGraphDb);
+    }
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // lock an existing webgraphdb to prevent multiple simultaneous updates
+    Path lock = new Path(webGraphDb, LOCK_NAME);
+    boolean webGraphDbExists = fs.exists(webGraphDb);
+    if (webGraphDbExists) {
+      LockUtil.createLockFile(fs, lock, false);
+    }
+    else {
+      
+      // if the webgraph doesn't exist, create it
+      fs.mkdirs(webGraphDb);
+    }
+
+    // outlink and temp outlink database paths
+    Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
+    Path tempOutlinkDb = new Path(outlinkDb + "-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    JobConf outlinkJob = new NutchJob(conf);
+    outlinkJob.setJobName("Outlinkdb: " + outlinkDb);
+
+    // get the parse data for all segments
+    if (segments != null) {
+      for (int i = 0; i < segments.length; i++) {
+        Path parseData = new Path(segments[i], ParseData.DIR_NAME);
+        if (fs.exists(parseData)) {
+          LOG.info("OutlinkDb: adding input: " + parseData);
+          FileInputFormat.addInputPath(outlinkJob, parseData);
+        }
+      }
+    }
+
+    // add the existing webgraph
+    if (webGraphDbExists) {
+      LOG.info("OutlinkDb: adding input: " + outlinkDb);
+      FileInputFormat.addInputPath(outlinkJob, outlinkDb);
+    }
+
+    outlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    outlinkJob.setMapperClass(OutlinkDb.class);
+    outlinkJob.setReducerClass(OutlinkDb.class);
+    outlinkJob.setMapOutputKeyClass(Text.class);
+    outlinkJob.setMapOutputValueClass(LinkDatum.class);
+    outlinkJob.setOutputKeyClass(Text.class);
+    outlinkJob.setOutputValueClass(LinkDatum.class);
+    FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
+    outlinkJob.setOutputFormat(MapFileOutputFormat.class);
+
+    // run the outlinkdb job and replace any old outlinkdb with the new one
+    try {
+      LOG.info("OutlinkDb: running");
+      JobClient.runJob(outlinkJob);
+      LOG.info("OutlinkDb: installing " + outlinkDb);
+      FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
+      LOG.info("OutlinkDb: finished");
+    }
+    catch (IOException e) {
+      
+      // remove lock file and temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempOutlinkDb)) {
+        fs.delete(tempOutlinkDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // inlink and temp link database paths
+    Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
+    Path tempInlinkDb = new Path(inlinkDb + "-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf inlinkJob = new NutchJob(conf);
+    inlinkJob.setJobName("Inlinkdb " + inlinkDb);
+    LOG.info("InlinkDb: adding input: " + outlinkDb);
+    FileInputFormat.addInputPath(inlinkJob, outlinkDb);
+    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setMapperClass(InlinkDb.class);
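+    // no reducer is set, so the default identity reducer writes the inverted
+    // links through unchanged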
+    inlinkJob.setMapOutputKeyClass(Text.class);
+    inlinkJob.setMapOutputValueClass(LinkDatum.class);
+    inlinkJob.setOutputKeyClass(Text.class);
+    inlinkJob.setOutputValueClass(LinkDatum.class);
+    FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
+    inlinkJob.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      
+      // run the inlink and replace any old with new
+      LOG.info("InlinkDb: running");
+      JobClient.runJob(inlinkJob);
+      LOG.info("InlinkDb: installing " + inlinkDb);
+      FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
+      LOG.info("InlinkDb: finished");
+    }
+    catch (IOException e) {
+      
+      // remove lock file and temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempInlinkDb)) {
+        fs.delete(tempInlinkDb, true);
+      }      
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // node and temp node database paths
+    Path nodeDb = new Path(webGraphDb, NODE_DIR);
+    Path tempNodeDb = new Path(nodeDb + "-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf nodeJob = new NutchJob(conf);
+    nodeJob.setJobName("NodeDb " + nodeDb);
+    LOG.info("NodeDb: adding input: " + outlinkDb);
+    LOG.info("NodeDb: adding input: " + inlinkDb);
+    FileInputFormat.addInputPath(nodeJob, outlinkDb);
+    FileInputFormat.addInputPath(nodeJob, inlinkDb);
+    nodeJob.setInputFormat(SequenceFileInputFormat.class);
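+    // no mapper is set, so the default identity mapper passes LinkDatum values
+    // from both the outlink and inlink dbs straight to the NodeDb reducer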
+    nodeJob.setReducerClass(NodeDb.class);
+    nodeJob.setMapOutputKeyClass(Text.class);
+    nodeJob.setMapOutputValueClass(LinkDatum.class);
+    nodeJob.setOutputKeyClass(Text.class);
+    nodeJob.setOutputValueClass(Node.class);
+    FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
+    nodeJob.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      
+      // run the node job and replace old nodedb with new
+      LOG.info("NodeDb: running");
+      JobClient.runJob(nodeJob);
+      LOG.info("NodeDb: installing " + nodeDb);
+      FSUtils.replace(fs, nodeDb, tempNodeDb, true);
+      LOG.info("NodeDb: finished");
+    }
+    catch (IOException e) {
+      
+      // remove lock file and temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempNodeDb)) {
+        fs.delete(tempNodeDb, true);
+      }      
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // remove the lock file for the webgraph
+    LockUtil.removeLockFile(fs, lock);
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Parses command line arguments and runs the WebGraph jobs.
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+      "the web graph database to use").create("webgraphdb");
+    Option segOpts = OptionBuilder.withArgName("segment").hasArgs().withDescription(
+      "the segment(s) to use").create("segment");
+    options.addOption(helpOpts);
+    options.addOption(webGraphDbOpts);
+    options.addOption(segOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+        || !line.hasOption("segment")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("WebGraph", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      String[] segments = line.getOptionValues("segment");
+      Path[] segPaths = new Path[segments.length];
+      for (int i = 0; i < segments.length; i++) {
+        segPaths[i] = new Path(segments[i]);
+      }
+
+      createWebGraph(new Path(webGraphDb), segPaths);
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("WebGraph: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+
+}
Index: src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java
===================================================================
--- src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java	(revision 0)
+++ src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java	(revision 0)
@@ -0,0 +1,296 @@
+package org.apache.nutch.tools.compat;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.MapWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.scoring.webgraph.Node;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * <p>
+ * Significant changes were made to representative url logic used for redirects.
+ * This tool will fix representative urls stored in current segments and crawl
+ * databases. Any new fetches will use the new representative url logic.
+ * </p>
+ * 
+ * <p>
+ * All crawl datums are assumed to be temp url redirects. While this may cause
+ * some urls to be incorrectly removed, this tool is a temporary measure to be
+ * used until fetches can be rerun. The same reduce logic is applied to a
+ * segment's fetch and parse directories as well as to existing crawl databases.
+ * </p>
+ */
+public class ReprUrlFixer
+  extends Configured
+  implements Tool, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+
+  public static final Log LOG = LogFactory.getLog(ReprUrlFixer.class);
+  private JobConf conf;
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Runs the new ReprUrl logic on all crawldatums.
+   */
+  public void reduce(Text key, Iterator<CrawlDatum> values,
+    OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+    throws IOException {
+
+    String url = key.toString();
+    Node node = null;
+    List<CrawlDatum> datums = new ArrayList<CrawlDatum>();
+
+    // get all crawl datums for a given url key, fetch for instance can have
+    // more than one under a given key if there are multiple redirects to a
+    // given url
+    while (values.hasNext()) {
+      CrawlDatum datum = values.next();
+      datums.add((CrawlDatum)WritableUtils.clone(datum, conf));
+    }
+
+    // apply redirect repr url logic for each datum
+    for (CrawlDatum datum : datums) {
+
+      MapWritable metadata = datum.getMetaData();
+      Text reprUrl = (Text)metadata.get(Nutch.WRITABLE_REPR_URL_KEY);
+      byte status = datum.getStatus();
+      boolean isCrawlDb = (CrawlDatum.hasDbStatus(datum));
+      boolean segFetched = (status == CrawlDatum.STATUS_FETCH_SUCCESS);
+
+      // only if the crawl datum is from the crawldb or is a successfully
+      // fetched page from the segments
+      if ((isCrawlDb || segFetched) && reprUrl != null) {
+
+        String src = reprUrl.toString();
+        String dest = url;
+        URL srcUrl = null;
+        URL dstUrl = null;
+
+        // both need to be well formed urls
+        try {
+          srcUrl = new URL(src);
+          dstUrl = new URL(url);
+        }
+        catch (MalformedURLException e) {
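+          // ignore; srcUrl/dstUrl stay null and the null check below skips them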
+        }
+
+        // if the src and repr urls are the same after the new logic then
+        // remove the repr url from the metadata as it is no longer needed
+        if (srcUrl != null && dstUrl != null) {
+          String reprOut = URLUtil.chooseRepr(src, dest, true);
+          if (reprOut.equals(dest)) {
+            LOG.info("Removing " + reprOut + " from " + dest);
+            metadata.remove(Nutch.WRITABLE_REPR_URL_KEY);
+          }
+        }
+      }
+
+      // collect each datum
+      output.collect(key, datum);
+    }
+
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Run the fixer on any crawl database and segments specified.
+   */
+  public void update(Path crawlDb, Path[] segments)
+    throws IOException {
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // run the crawl database through the repr fixer
+    if (crawlDb != null) {
+
+      LOG.info("Running ReprUtilFixer " + crawlDb);
+      Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+      Path newCrawlDb = new Path(crawlDb,
+        Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+      JobConf updater = new NutchJob(conf);
+      updater.setJobName("ReprUtilFixer: " + crawlDb.toString());
+      FileInputFormat.addInputPath(updater, crawlDbCurrent);
+      FileOutputFormat.setOutputPath(updater, newCrawlDb);
+      updater.setInputFormat(SequenceFileInputFormat.class);
+      updater.setReducerClass(ReprUrlFixer.class);
+      updater.setOutputKeyClass(Text.class);
+      updater.setOutputValueClass(CrawlDatum.class);
+      updater.setOutputFormat(MapFileOutputFormat.class);
+
+      try {
+        JobClient.runJob(updater);
+        LOG.info("Installing new crawldb " + crawlDb);
+        CrawlDb.install(updater, crawlDb);
+      }
+      catch (IOException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw e;
+      }
+    }
+
+    // run the segments through the repr fixer, logic will be run on both the
+    // crawl_parse and the crawl_fetch directories for every segment specified
+    if (segments != null) {
+
+      for (int i = 0; i < segments.length; i++) {
+
+        Path segment = segments[i];
+        LOG.info("Running ReprUtilFixer " + segment + " fetch");
+        Path segFetch = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
+        Path newSegFetch = new Path(segment, CrawlDatum.FETCH_DIR_NAME + "-"
+          + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+        JobConf fetch = new NutchJob(conf);
+        fetch.setJobName("ReprUrlFixer: " + segment.toString());
+        FileInputFormat.addInputPath(fetch, segFetch);
+        FileOutputFormat.setOutputPath(fetch, newSegFetch);
+        fetch.setInputFormat(SequenceFileInputFormat.class);
+        fetch.setReducerClass(ReprUrlFixer.class);
+        fetch.setOutputKeyClass(Text.class);
+        fetch.setOutputValueClass(CrawlDatum.class);
+        fetch.setOutputFormat(MapFileOutputFormat.class);
+
+        try {
+          JobClient.runJob(fetch);
+          LOG.info("Installing new segment fetch directory " + newSegFetch);
+          FSUtils.replace(fs, segFetch, newSegFetch, true);
+          LOG.info("ReprUrlFixer: finished installing segment fetch directory");
+        }
+        catch (IOException e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw e;
+        }
+
+        LOG.info("Running ReprUtilFixer " + segment + " parse");
+        Path segParse = new Path(segment, CrawlDatum.PARSE_DIR_NAME);
+        Path newSegParse = new Path(segment, CrawlDatum.PARSE_DIR_NAME + "-"
+          + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+        JobConf parse = new NutchJob(conf);
+        parse.setJobName("ReprUrlFixer: " + segment.toString());
+        FileInputFormat.addInputPath(parse, segParse);
+        FileOutputFormat.setOutputPath(parse, newSegParse);
+        parse.setInputFormat(SequenceFileInputFormat.class);
+        parse.setReducerClass(ReprUrlFixer.class);
+        parse.setOutputKeyClass(Text.class);
+        parse.setOutputValueClass(CrawlDatum.class);
+        parse.setOutputFormat(MapFileOutputFormat.class);
+
+        try {
+          JobClient.runJob(parse);
+          LOG.info("Installing new segment parse directry " + newSegParse);
+          FSUtils.replace(fs, segParse, newSegParse, true);
+          LOG.info("ReprUrlFixer: finished installing segment parse directory");
+        }
+        catch (IOException e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs the ReprUrlFixer.
+   */
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ReprUrlFixer(),
+      args);
+    System.exit(res);
+  }
+
+  /**
+   * Parse command line options and execute the main update logic.
+   */
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+      "show this help message").create("help");
+    Option crawlDbOpts = OptionBuilder.withArgName("crawldb").hasArg().withDescription(
+      "the crawldb to use").create("crawldb");
+    Option segOpts = OptionBuilder.withArgName("segment").hasArgs().withDescription(
+      "the segment(s) to use").create("segment");
+    options.addOption(helpOpts);
+    options.addOption(crawlDbOpts);
+    options.addOption(segOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      // parse out common line arguments and make sure either a crawldb or a
+      // segment are specified
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help")
+        || (!line.hasOption("crawldb") && !line.hasOption("segment"))) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("ReprUrlFixer", options);
+        return -1;
+      }
+
+      // create paths for all of the segments specified, multiple segments may
+      // be run at once
+      String crawlDb = line.getOptionValue("crawldb");
+      String[] segments = line.getOptionValues("segment");
+      Path[] segPaths = new Path[segments != null ? segments.length : 0];
+      if (segments != null) {
+        for (int i = 0; i < segments.length; i++) {
+          segPaths[i] = new Path(segments[i]);
+        }
+      }
+      update(crawlDb != null ? new Path(crawlDb) : null, segPaths);
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("ReprUrlFixer: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
\ No newline at end of file
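
A minimal sketch of driving the fixer above programmatically rather than through its command line options; the crawldb and segment paths are placeholders, and the import for ReprUrlFixer is assumed to point at whichever package this patch adds the class to:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.util.NutchConfiguration;
    // plus an import for ReprUrlFixer from the package this patch places it in

    public class ReprUrlFixerDriver {

      public static void main(String[] args) throws Exception {
        // placeholder paths; normally these come from the -crawldb and
        // -segment options parsed in ReprUrlFixer.run()
        Configuration conf = NutchConfiguration.create();
        ReprUrlFixer fixer = new ReprUrlFixer();
        fixer.setConf(conf); // inherited from Configured
        fixer.update(new Path("crawl/crawldb"),
          new Path[] { new Path("crawl/segments/20081015123456") });
      }
    }
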
Index: src/java/org/apache/nutch/util/FSUtils.java
===================================================================
--- src/java/org/apache/nutch/util/FSUtils.java	(revision 0)
+++ src/java/org/apache/nutch/util/FSUtils.java	(revision 0)
@@ -0,0 +1,83 @@
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Utility methods for common filesystem operations.
+ */
+public class FSUtils {
+
+  /**
+   * Replaces the current path with the replacement path, first renaming any
+   * existing current path to current.old. If removeOld is true the old path
+   * is then deleted; if false it is kept under the name current.old.
+   * 
+   * @param fs The FileSystem.
+   * @param current The path being replaced.
+   * @param replacement The path to replace it with.
+   * @param removeOld True if the old current path should be deleted.
+   * 
+   * @throws IOException If an error occurs during replacement.
+   */
+  public static void replace(FileSystem fs, Path current, Path replacement,
+    boolean removeOld)
+    throws IOException {
+
+    // rename any current path to old
+    Path old = new Path(current + ".old");
+    if (fs.exists(current)) {
+      fs.rename(current, old);
+    }
+
+    // rename the new path to current and remove the old path if needed
+    fs.rename(replacement, current);
+    if (fs.exists(old) && removeOld) {
+      fs.delete(old, true);
+    }
+  }
+
+  /**
+   * Closes a group of SequenceFile readers.
+   * 
+   * @param readers The SequenceFile readers to close.
+   * @throws IOException If an error occurs while closing a reader.
+   */
+  public static void closeReaders(SequenceFile.Reader[] readers)
+    throws IOException {
+    
+    // loop through the readers, closing one by one
+    if (readers != null) {
+      for (int i = 0; i < readers.length; i++) {
+        SequenceFile.Reader reader = readers[i];
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes a group of MapFile readers.
+   * 
+   * @param readers The MapFile readers to close.
+   * @throws IOException If an error occurs while closing a reader.
+   */
+  public static void closeReaders(MapFile.Reader[] readers)
+    throws IOException {
+    
+    // loop through the readers closing one by one
+    if (readers != null) {
+      for (int i = 0; i < readers.length; i++) {
+        MapFile.Reader reader = readers[i];
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+  }
+}
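
A short usage sketch for FSUtils.replace as it is used by the fixer above; the segment path is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.util.FSUtils;

    public class ReplaceExample {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // illustrative paths: a freshly written job output replaces crawl_fetch
        Path current = new Path("crawl/segments/20081015123456/crawl_fetch");
        Path updated = new Path("crawl/segments/20081015123456/crawl_fetch-12345");

        // renames crawl_fetch to crawl_fetch.old, moves the new output into
        // place, and deletes the .old copy because removeOld is true
        FSUtils.replace(fs, current, updated, true);
      }
    }
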
Index: src/java/org/apache/nutch/util/URLUtil.java
===================================================================
--- src/java/org/apache/nutch/util/URLUtil.java	(revision 702976)
+++ src/java/org/apache/nutch/util/URLUtil.java	(working copy)
@@ -29,185 +29,340 @@
 
   private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})");
 
-  /** Returns the domain name of the url. The domain name of a url is
-   *  the substring of the url's hostname, w/o subdomain names. As an
-   *  example <br><code>
+  /**
+   * Returns the domain name of the url. The domain name of a url is the
+   * substring of the url's hostname, w/o subdomain names. As an example <br>
+   * <code>
    *  getDomainName(conf, new URL(http://lucene.apache.org/))
    *  </code><br>
-   *  will return <br><code> apache.org</code>
-   *   */
+   * will return <br>
+   * <code> apache.org</code>
+   */
   public static String getDomainName(URL url) {
     DomainSuffixes tlds = DomainSuffixes.getInstance();
     String host = url.getHost();
-    //it seems that java returns hostnames ending with .
-    if(host.endsWith("."))
+    // it seems that java returns hostnames ending with .
+    if (host.endsWith("."))
       host = host.substring(0, host.length() - 1);
-    if(IP_PATTERN.matcher(host).matches())
+    if (IP_PATTERN.matcher(host).matches())
       return host;
-    
+
     int index = 0;
     String candidate = host;
-    for(;index >= 0;) {
+    for (; index >= 0;) {
       index = candidate.indexOf('.');
-      String subCandidate = candidate.substring(index+1); 
-      if(tlds.isDomainSuffix(subCandidate)) {
-        return candidate; 
+      String subCandidate = candidate.substring(index + 1);
+      if (tlds.isDomainSuffix(subCandidate)) {
+        return candidate;
       }
       candidate = subCandidate;
     }
     return candidate;
   }
 
-  /** Returns the domain name of the url. The domain name of a url is
-   *  the substring of the url's hostname, w/o subdomain names. As an
-   *  example <br><code>
+  /**
+   * Returns the domain name of the url. The domain name of a url is the
+   * substring of the url's hostname, w/o subdomain names. As an example <br>
+   * <code>
    *  getDomainName(conf, new http://lucene.apache.org/)
    *  </code><br>
-   *  will return <br><code> apache.org</code>
+   * will return <br>
+   * <code> apache.org</code>
+   * 
    * @throws MalformedURLException
    */
-  public static String getDomainName(String url) throws MalformedURLException {
+  public static String getDomainName(String url)
+    throws MalformedURLException {
     return getDomainName(new URL(url));
   }
 
-  /** Returns whether the given urls have the same domain name.
-   * As an example, <br>
+  /**
+   * Returns whether the given urls have the same domain name. As an example, <br>
    * <code> isSameDomain(new URL("http://lucene.apache.org")
    * , new URL("http://people.apache.org/"))
    * <br> will return true. </code>
-   *
+   * 
    * @return true if the domain names are equal
    */
   public static boolean isSameDomainName(URL url1, URL url2) {
     return getDomainName(url1).equalsIgnoreCase(getDomainName(url2));
   }
 
-  /**Returns whether the given urls have the same domain name.
-  * As an example, <br>
-  * <code> isSameDomain("http://lucene.apache.org"
-  * ,"http://people.apache.org/")
-  * <br> will return true. </code>
-  * @return true if the domain names are equal
-  * @throws MalformedURLException
-  */
+  /**
+   * Returns whether the given urls have the same domain name. As an example, <br>
+   * <code> isSameDomain("http://lucene.apache.org"
+   * ,"http://people.apache.org/")
+   * <br> will return true. </code>
+   * 
+   * @return true if the domain names are equal
+   * @throws MalformedURLException
+   */
   public static boolean isSameDomainName(String url1, String url2)
     throws MalformedURLException {
     return isSameDomainName(new URL(url1), new URL(url2));
   }
 
-  /** Returns the {@link DomainSuffix} corresponding to the
-   * last public part of the hostname
+  /**
+   * Returns the {@link DomainSuffix} corresponding to the last public part of
+   * the hostname
    */
   public static DomainSuffix getDomainSuffix(URL url) {
     DomainSuffixes tlds = DomainSuffixes.getInstance();
     String host = url.getHost();
-    if(IP_PATTERN.matcher(host).matches())
+    if (IP_PATTERN.matcher(host).matches())
       return null;
-    
+
     int index = 0;
     String candidate = host;
-    for(;index >= 0;) {
+    for (; index >= 0;) {
       index = candidate.indexOf('.');
-      String subCandidate = candidate.substring(index+1);
+      String subCandidate = candidate.substring(index + 1);
       DomainSuffix d = tlds.get(subCandidate);
-      if(d != null) {
-        return d; 
+      if (d != null) {
+        return d;
       }
       candidate = subCandidate;
     }
     return null;
   }
 
-  /** Returns the {@link DomainSuffix} corresponding to the
-   * last public part of the hostname
+  /**
+   * Returns the {@link DomainSuffix} corresponding to the last public part of
+   * the hostname
    */
-  public static DomainSuffix getDomainSuffix(String url) throws MalformedURLException {
+  public static DomainSuffix getDomainSuffix(String url)
+    throws MalformedURLException {
     return getDomainSuffix(new URL(url));
   }
 
-  /** Partitions of the hostname of the url by "."  */
+  /** Partitions of the hostname of the url by "." */
   public static String[] getHostSegments(URL url) {
     String host = url.getHost();
-    //return whole hostname, if it is an ipv4
-    //TODO : handle ipv6
-    if(IP_PATTERN.matcher(host).matches())
-      return new String[] {host};
+    // return whole hostname, if it is an ipv4
+    // TODO : handle ipv6
+    if (IP_PATTERN.matcher(host).matches())
+      return new String[]{host};
     return host.split("\\.");
   }
 
-  /** Partitions of the hostname of the url by "."
-   * @throws MalformedURLException */
-  public static String[] getHostSegments(String url) throws MalformedURLException {
-   return getHostSegments(new URL(url));
+  /**
+   * Partitions of the hostname of the url by "."
+   * 
+   * @throws MalformedURLException
+   */
+  public static String[] getHostSegments(String url)
+    throws MalformedURLException {
+    return getHostSegments(new URL(url));
   }
-
-  /** Given two urls (source and destination of the redirect),
-   * returns the representative one.
-   *
-   * <p>Implements the algorithm described here:
+  
+  /**
+   * <p>Given two urls, the source and destination of a redirect, returns the
+   * representative url.</p>
+   * 
+   * <p>This method implements an extended version of the algorithm used by the
+   * Yahoo! Slurp crawler described here:<br>
+   * <a href=
+   * "http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html"> How
+   * does the Yahoo! webcrawler handle redirects?</a> <br>
    * <br>
-   * <a href="http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html">
-   * How does the Yahoo! webcrawler handle redirects?</a>
-   * <br><br>
-   * The algorithm is as follows:
    * <ol>
-   *  <li>Choose target url if either url is malformed.</li>
-   *  <li>When a page in one domain redirects to a page in another domain,
-   *  choose the "target" URL.</li>
-   *  <li>When a top-level page in a domain presents a permanent redirect
-   *  to a page deep within the same domain, choose the "source" URL.</li>
-   *  <li>When a page deep within a domain presents a permanent redirect
-   *  to a page deep within the same domain, choose the "target" URL.</li>
-   *  <li>When a page in a domain presents a temporary redirect to
-   *  another page in the same domain, choose the "source" URL.<li>
-   * <ol>
-   * </p>
-   *
-   * @param src Source url of redirect
-   * @param dst Destination url of redirect
-   * @param temp Flag to indicate if redirect is temporary
-   * @return Representative url (either src or dst)
+   * <li>Choose target url if either url is malformed.</li>
+   * <li>If the domains are different, keep the destination whether the
+   * redirect is temporary or permanent.</li>
+   * <ul><li>a.com -> b.com*</li></ul>
+   * <li>If the redirect is permanent and the source is root, keep the source.</li>
+   * <ul><li>*a.com -> a.com?y=1 || *a.com -> a.com/xyz/index.html</li></ul>
+   * <li>If the redirect is permanent and the source is not root and the 
+   * destination is root, keep the destination</li>
+   * <ul><li>a.com/xyz/index.html -> a.com*</li></ul>
+   * <li>If the redirect is permanent and neither the source nor the destination
+   * is root, then keep the destination</li>
+   * <ul><li>a.com/xyz/index.html -> a.com/abc/page.html*</li></ul>
+   * <li>If the redirect is temporary and source is root and destination is not
+   * root, then keep the source</li>
+   * <ul><li>*a.com -> a.com/xyz/index.html</li></ul>
+   * <li>If the redirect is temporary and source is not root and destination is
+   * root, then keep the destination</li>
+   * <ul><li>a.com/xyz/index.html -> a.com*</li></ul>
+   * <li>If the redirect is temporary and neither the source nor the destination
+   * is root, then keep the shortest url. If the hosts are equal, compare the
+   * paths, first by the number of / path separators and then by length;
+   * otherwise keep the url with the fewer host sub-domains.</li>
+   * <ul>
+   * <li>a.com/xyz/index.html -> a.com/abc/page.html*</li>
+   * <li>*www.a.com/xyz/index.html -> www.news.a.com/xyz/index.html</li>
+   * </ul>
+   * <li>If the redirect is temporary and both the source and the destination
+   * are root, then keep the shortest sub-domain</li>
+   * <ul><li>*www.a.com -> www.news.a.com</li></ul>
+   * <br>
+   * Outside of this method there is one further piece of representative url
+   * logic, applied during indexing after scoring. When the basic indexing
+   * fields are created, if a url has a representative url stored, the url and
+   * its representative url (which should never be the same) are compared by
+   * their linkrank scores; the higher scoring one is kept as the url and the
+   * lower scoring one is held as the orig url inside of the index.
+   * 
+   * @param src The source url.
+   * @param dst The destination url.
+   * @param temp Is the redirect a temporary redirect.
+   * 
+   * @return String The representative url.
    */
   public static String chooseRepr(String src, String dst, boolean temp) {
+
+    // validate both are well formed urls
     URL srcUrl;
     URL dstUrl;
     try {
       srcUrl = new URL(src);
       dstUrl = new URL(dst);
-    } catch (MalformedURLException e) {
+    }
+    catch (MalformedURLException e) {
       return dst;
     }
 
+    // get the source and destination domain, host, and page
     String srcDomain = URLUtil.getDomainName(srcUrl);
     String dstDomain = URLUtil.getDomainName(dstUrl);
+    String srcHost = srcUrl.getHost();
+    String dstHost = dstUrl.getHost();
+    String srcFile = srcUrl.getFile();
+    String dstFile = dstUrl.getFile();
 
+    // are the source and destination the root path url.com/ or url.com
+    boolean srcRoot = (srcFile.equals("/") || srcFile.length() == 0);
+    boolean destRoot = (dstFile.equals("/") || dstFile.length() == 0);
+
+    // 1) different domain then keep dest, temp or perm
+    // a.com -> b.com*
+    //
+    // 2) permanent and root, keep src
+    // *a.com -> a.com?y=1 || *a.com -> a.com/xyz/index.html
+    //
+    // 3) permanent and not root and dest root, keep dest
+    // a.com/xyz/index.html -> a.com*
+    //
+    // 4) permanent and neither root, keep dest
+    // a.com/xyz/index.html -> a.com/abc/page.html*
+    //
+    // 5) temp and src root and dest not root, keep src
+    // *a.com -> a.com/xyz/index.html
+    //
+    // 6) temp and src not root and dest root, keep dest
+    // a.com/xyz/index.html -> a.com*
+    //
+    // 7) temp and neither root, keep shortest; if hosts are equal compare
+    // paths, first by number of / separators then by length, else compare hosts
+    // a.com/xyz/index.html -> a.com/abc/page.html*
+    // *www.a.com/xyz/index.html -> www.news.a.com/xyz/index.html
+    //
+    // 8) temp and both root, keep the shortest sub domain
+    // *www.a.com -> www.news.a.com
+
+    // if we are dealing with a redirect from one domain to another keep the
+    // destination
     if (!srcDomain.equals(dstDomain)) {
       return dst;
     }
 
-    String srcFile = srcUrl.getFile();
+    // if it is a permanent redirect
+    if (!temp) {
+      
+      // if source is root return source, otherwise destination
+      if (srcRoot) {
+        return src;
+      }
+      else {
+        return dst;
+      }
+    }
+    else { // temporary redirect
+
+      // source root and destination not root
+      if (srcRoot && !destRoot) {
+        return src;
+      }
+      else if (!srcRoot && destRoot) { // destination root and source not
+        return dst;
+      }
+      else if (!srcRoot && !destRoot && (srcHost.equals(dstHost))) {
+
+        // source and destination hosts are the same, compare paths first by
+        // number of / separators and then by length
+        int numSrcPaths = srcFile.split("/").length;
+        int numDstPaths = dstFile.split("/").length;
+        if (numSrcPaths != numDstPaths) {
+          return (numDstPaths < numSrcPaths ? dst : src);
+        }
+        else {
+          int srcPathLength = srcFile.length();
+          int dstPathLength = dstFile.length();
+          return (dstPathLength < srcPathLength ? dst : src);
+        }
+      }
+      else {
 
-    if (!temp && srcFile.equals("/")) {
-      return src;
+        // both root, or neither root with different hosts, keep the url with
+        // the fewer host sub-domains
+        int numSrcSubs = srcHost.split("\\.").length;
+        int numDstSubs = dstHost.split("\\.").length;
+        return (numDstSubs < numSrcSubs ? dst : src);
+      }
     }
-
-    return temp ? src : dst;
   }
 
   /** For testing */
-  public static void main(String[] args){
+  public static void main(String[] args) {
+
     
-    if(args.length!=1) {
-      System.err.println("Usage : URLUtil <url>");
-      return ;
+    System.out.println(chooseRepr("http://milogardner@yahoo.com/", "http://www.yahoo.com/", true));
+  }
+
+  /**
+   * Returns the lowercased hostname for the url or null if the url is not well
+   * formed.
+   * 
+   * @param url The url to check.
+   * @return String The hostname for the url.
+   */
+  public static String getHost(String url) {
+    try {
+      return new URL(url).getHost().toLowerCase();
     }
-    
-    String url = args[0];
+    catch (MalformedURLException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Returns the page for the url. The page consists of the protocol, host,
+   * and path, but does not include the query string. The entire url is
+   * lowercased.
+   * 
+   * @param url The url to check.
+   * @return String The page for the url.
+   */
+  public static String getPage(String url) {
     try {
-      System.out.println(URLUtil.getDomainName(new URL(url)));
+      // lowercase the url and replace the query string with an empty string
+      url = url.toLowerCase();
+      String queryStr = new URL(url).getQuery();
+      return (queryStr != null) ? url.replace("?" + queryStr, "") : url;
     }
-    catch (MalformedURLException ex) {
-      ex.printStackTrace();
+    catch (MalformedURLException e) {
+      return null;
     }
   }
 }
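
For reference, a few illustrative calls against the new URLUtil helpers; the urls are made up and the expected values follow from the redirect rules documented above:

    import org.apache.nutch.util.URLUtil;

    public class URLUtilExample {

      public static void main(String[] args) {
        // temporary redirect from a root page to a deeper page keeps the source
        System.out.println(URLUtil.chooseRepr(
          "http://www.a.com", "http://www.a.com/xyz/index.html", true));
        // prints http://www.a.com

        // permanent redirect between two non-root pages keeps the destination
        System.out.println(URLUtil.chooseRepr(
          "http://www.a.com/xyz/index.html", "http://www.a.com/abc/page.html", false));
        // prints http://www.a.com/abc/page.html

        // getHost lowercases the hostname; getPage lowercases the whole url
        // and strips the query string
        System.out.println(URLUtil.getHost("http://Lucene.Apache.ORG/nutch"));
        // prints lucene.apache.org
        System.out.println(URLUtil.getPage("http://www.a.com/abc/page.html?x=1"));
        // prints http://www.a.com/abc/page.html
      }
    }
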
Index: src/test/org/apache/nutch/util/TestURLUtil.java
===================================================================
--- src/test/org/apache/nutch/util/TestURLUtil.java	(revision 702976)
+++ src/test/org/apache/nutch/util/TestURLUtil.java	(working copy)
@@ -22,14 +22,17 @@
 import junit.framework.TestCase;
 
 /** Test class for URLUtil */
-public class TestURLUtil extends TestCase {
+public class TestURLUtil
+  extends TestCase {
 
   @Override
-  protected void setUp() throws Exception {
+  protected void setUp()
+    throws Exception {
     super.setUp();
   }
 
-  public void testGetDomainName() throws Exception{
+  public void testGetDomainName()
+    throws Exception {
 
     URL url = null;
 
@@ -51,7 +54,7 @@
     url = new URL("http://www.example.co.uk.com");
     assertEquals("uk.com", URLUtil.getDomainName(url));
 
-    //"nn" is not a tld
+    // "nn" is not a tld
     url = new URL("http://example.com.nn");
     assertEquals("nn", URLUtil.getDomainName(url));
 
@@ -60,25 +63,26 @@
 
     url = new URL("http://www.edu.tr.xyz");
     assertEquals("xyz", URLUtil.getDomainName(url));
-    
+
     url = new URL("http://www.example.c.se");
     assertEquals("example.c.se", URLUtil.getDomainName(url));
 
-    //plc.co.im is listed as a domain suffix
+    // plc.co.im is listed as a domain suffix
     url = new URL("http://www.example.plc.co.im");
     assertEquals("example.plc.co.im", URLUtil.getDomainName(url));
-    
-    //2000.hu is listed as a domain suffix
+
+    // 2000.hu is listed as a domain suffix
     url = new URL("http://www.example.2000.hu");
     assertEquals("example.2000.hu", URLUtil.getDomainName(url));
-    
-    //test non-ascii
+
+    // test non-ascii
     url = new URL("http://www.example.商業.tw");
     assertEquals("example.商業.tw", URLUtil.getDomainName(url));
-    
+
   }
 
-  public void testGetDomainSuffix() throws Exception{
+  public void testGetDomainSuffix()
+    throws Exception {
     URL url = null;
 
     url = new URL("http://lucene.apache.org/nutch");
@@ -96,7 +100,7 @@
     url = new URL("http://www.example.co.uk.com");
     assertEquals("com", URLUtil.getDomainSuffix(url).getDomain());
 
-    //"nn" is not a tld
+    // "nn" is not a tld
     url = new URL("http://example.com.nn");
     assertNull(URLUtil.getDomainSuffix(url));
 
@@ -105,59 +109,108 @@
 
     url = new URL("http://www.edu.tr.xyz");
     assertNull(URLUtil.getDomainSuffix(url));
-    
+
     url = new URL("http://subdomain.example.edu.tr");
     assertEquals("edu.tr", URLUtil.getDomainSuffix(url).getDomain());
-    
+
     url = new URL("http://subdomain.example.presse.fr");
     assertEquals("presse.fr", URLUtil.getDomainSuffix(url).getDomain());
-    
+
     url = new URL("http://subdomain.example.presse.tr");
     assertEquals("tr", URLUtil.getDomainSuffix(url).getDomain());
-   
-    //plc.co.im is listed as a domain suffix
+
+    // plc.co.im is listed as a domain suffix
     url = new URL("http://www.example.plc.co.im");
     assertEquals("plc.co.im", URLUtil.getDomainSuffix(url).getDomain());
-    
-    //2000.hu is listed as a domain suffix
+
+    // 2000.hu is listed as a domain suffix
     url = new URL("http://www.example.2000.hu");
     assertEquals("2000.hu", URLUtil.getDomainSuffix(url).getDomain());
-    
-    //test non-ascii
+
+    // test non-ascii
     url = new URL("http://www.example.商業.tw");
     assertEquals("商業.tw", URLUtil.getDomainSuffix(url).getDomain());
-    
+
   }
-  
-  public void testGetHostSegments() throws Exception{
+
+  public void testGetHostSegments()
+    throws Exception {
     URL url;
     String[] segments;
-    
+
     url = new URL("http://subdomain.example.edu.tr");
     segments = URLUtil.getHostSegments(url);
     assertEquals("subdomain", segments[0]);
     assertEquals("example", segments[1]);
     assertEquals("edu", segments[2]);
     assertEquals("tr", segments[3]);
-    
+
     url = new URL("http://");
     segments = URLUtil.getHostSegments(url);
     assertEquals(1, segments.length);
     assertEquals("", segments[0]);
-    
+
     url = new URL("http://140.211.11.130/foundation/contributing.html");
     segments = URLUtil.getHostSegments(url);
     assertEquals(1, segments.length);
     assertEquals("140.211.11.130", segments[0]);
-    
-    //test non-ascii
+
+    // test non-ascii
     url = new URL("http://www.example.商業.tw");
     segments = URLUtil.getHostSegments(url);
     assertEquals("www", segments[0]);
     assertEquals("example", segments[1]);
     assertEquals("商業", segments[2]);
     assertEquals("tw", segments[3]);
+
+  }
+
+  public void testChooseRepr()
+    throws Exception {
+    
+    String aDotCom = "http://www.a.com";
+    String bDotCom = "http://www.b.com";
+    String aSubDotCom = "http://www.news.a.com";
+    String aQStr = "http://www.a.com?y=1";
+    String aPath = "http://www.a.com/xyz/index.html";
+    String aPath2 = "http://www.a.com/abc/page.html";
+    String aPath3 = "http://www.news.a.com/abc/page.html";
+    
+    // 1) different domain then keep dest, temp or perm
+    // a.com -> b.com*
+    assertEquals(bDotCom, URLUtil.chooseRepr(aDotCom, bDotCom, true));
+    assertEquals(bDotCom, URLUtil.chooseRepr(aDotCom, bDotCom, false));
+    
+    // 2) permanent and root, keep src
+    // *a.com -> a.com?y=1 || *a.com -> a.com/xyz/index.html
+    assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aQStr, false));
+    assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aPath, false));
+    
+    // 3) permanent and not root and dest root, keep dest
+    // a.com/xyz/index.html -> a.com*
+    assertEquals(aDotCom, URLUtil.chooseRepr(aPath, aDotCom, false));
     
+    // 4) permanent and neither root keep dest
+    // a.com/xyz/index.html -> a.com/abc/page.html*
+    assertEquals(aPath2, URLUtil.chooseRepr(aPath, aPath2, false));
+    
+    // 5) temp and root and dest not root keep src
+    // *a.com -> a.com/xyz/index.html
+    assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aPath, true));
+    
+    // 6) temp and not root and dest root keep dest
+    // a.com/xyz/index.html -> a.com*
+    assertEquals(aDotCom, URLUtil.chooseRepr(aPath, aDotCom, true));
+
+    // 7) temp and neither root, keep shortest, if hosts equal by path else by hosts
+    // a.com/xyz/index.html -> a.com/abc/page.html*
+    // *www.a.com/xyz/index.html -> www.news.a.com/xyz/index.html
+    assertEquals(aPath2, URLUtil.chooseRepr(aPath, aPath2, true));
+    assertEquals(aPath, URLUtil.chooseRepr(aPath, aPath3, true));
+
+    // 8) temp and both root keep shortest sub domain
+    // *www.a.com -> www.news.a.com
+    assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aSubDotCom, true));
   }
 
 }

