Index: src/java/org/apache/nutch/crawl/OutlinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/OutlinkDb.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/OutlinkDb.java	(revision 0)
@@ -0,0 +1,216 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Outlinks;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+public class OutlinkDb
+  extends Configured
+  implements Tool, Mapper<Text, Writable, Text, Outlinks>,
+  Reducer<Text, Outlinks, Text, Outlinks> {
+
+  public static final Log LOG = LogFactory.getLog(OutlinkDb.class);
+  public static final String LOCK_NAME = ".locked";
+  private boolean ignoreInternal = true;
+
+  private JobConf conf;
+
+  public OutlinkDb() {
+  }
+
+  public OutlinkDb(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    ignoreInternal = conf.getBoolean("outlink.ignore.internal", true);
+  }
+
+  public void close() {
+  }
+
+  public void map(Text key, Writable value,
+    OutputCollector<Text, Outlinks> output, Reporter reporter)
+    throws IOException {
+
+    if (value instanceof ParseData) {
+      ParseData data = (ParseData)value;
+      Outlink[] outlinkAr = data.getOutlinks();
+      Outlinks outlinks = new Outlinks(outlinkAr);
+      outlinks.setTimestamp(System.currentTimeMillis());
+      output.collect(key, outlinks);
+    }
+    else if (value instanceof CrawlDatum) {
+      output.collect(key, new Outlinks());
+    }
+    else if (value instanceof Outlinks) {
+      output.collect(key, (Outlinks)value);
+    }
+  }
+
+  public void reduce(Text key, Iterator<Outlinks> values,
+    OutputCollector<Text, Outlinks> output, Reporter reporter)
+    throws IOException {
+
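+    // Keep the best Outlinks record for this url: prefer a record that
+    // actually has outlinks and, among those, the most recent one.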
+    Outlinks mostRecent = null;
+    while (values.hasNext()) {
+      Outlinks next = values.next();
+      long timestamp = next.getTimestamp();
+      if (mostRecent == null
+        || (mostRecent.getNumOutlinks() == 0 && next.getNumOutlinks() > 0)
+        || (next.getNumOutlinks() > 0 && mostRecent.getTimestamp() < timestamp)) {
+        mostRecent = next;
+      }
+    }
+    
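+    // Optionally strip outlinks that stay within this url's own domain,
+    // keeping only external links.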
+    if (ignoreInternal) {
+      String domain = URLUtil.getDomainName(key.toString());
+      List<Outlink> externalList = new ArrayList<Outlink>();
+      Outlink[] outlinks = mostRecent.getOutlinks();
+      for (int i = 0; i < outlinks.length; i++) {
+        Outlink current = outlinks[i];
+        String curDomain = URLUtil.getDomainName(current.getToUrl());
+        if (curDomain != null && !curDomain.equalsIgnoreCase(domain)) {
+          externalList.add(current);
+        }
+      }
+      Outlink[] externals = externalList.toArray(new Outlink[externalList.size()]);
+      mostRecent.setOutlinks(externals);
+      mostRecent.setTimestamp(System.currentTimeMillis());
+    }
+    output.collect(key, mostRecent);
+  }
+
+  public void update(Path crawldb, Path outlinkdb, Path segments)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("OutlinkDb: starting");
+      LOG.info("OutlinkDb: crawldb: " + crawldb);
+      LOG.info("OutlinkDb: outlinkdb: " + outlinkdb);
+      LOG.info("OutlinkDb: segments: " + segments);
+    }
+
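+    // Merge outlinks from the segment's parse_data with any existing
+    // outlinkdb; the crawldb is also added so every known url gets at least an
+    // empty entry. Output goes to a temporary db that replaces the old one
+    // once the job succeeds.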
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    Path lock = new Path(outlinkdb, LOCK_NAME);
+    boolean outlinkDbExists = fs.exists(outlinkdb);
+    if (outlinkDbExists) {
+      LockUtil.createLockFile(fs, lock, false);
+    }
+
+    Path newOutlinkDb = new Path(outlinkdb + "-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(conf);
+    job.setJobName("Outlinkdb " + outlinkdb);
+
+    Path parseData = new Path(segments, ParseData.DIR_NAME);
+    if (fs.exists(parseData)) {
+      LOG.info("OutlinkDb: adding input: " + parseData);
+      job.addInputPath(parseData);
+    }
+    if (outlinkDbExists) {
+      LOG.info("OutlinkDb: adding input: " + outlinkdb);
+      job.addInputPath(outlinkdb);
+    }
+    
+    Path crawlDbCurrent = new Path(crawldb, CrawlDb.CURRENT_NAME);
+    if (fs.exists(crawlDbCurrent)) {
+      LOG.info("OutlinkDb: adding input: " + crawlDbCurrent);
+      job.addInputPath(crawlDbCurrent);
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setMapperClass(OutlinkDb.class);
+    job.setReducerClass(OutlinkDb.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Outlinks.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Outlinks.class);
+    job.setOutputPath(newOutlinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      LOG.info("OutlinkDb: running");
+      JobClient.runJob(job);
+    }
+    catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(job.getOutputPath())) {
+        fs.delete(job.getOutputPath());
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    LOG.info("OutlinkDb: installing " + outlinkdb);
+    Path old = new Path(outlinkdb + ".old");
+    if (fs.exists(outlinkdb)) {
+      fs.rename(outlinkdb, old);
+    }
+    fs.rename(newOutlinkDb, outlinkdb);
+    if (fs.exists(old)) {
+      fs.delete(old);
+    }
+    LockUtil.removeLockFile(fs, lock);
+    LOG.info("OutlinkDb: finished");
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner
+      .run(NutchConfiguration.create(), new OutlinkDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args)
+    throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: OutlinkDb <crawldb> <outlinkdb> <segment>");
+      System.out.println("\tcrawldb\tThe crawldb to pull from");
+      System.out.println("\toutlinkdb\tThe OutlinkDb to update");
+      System.out.println("\tsegment\tThe segment used to update the OutlinkDb");
+      return -1;
+    }
+
+    try {
+      update(new Path(args[0]), new Path(args[1]), new Path(args[2]));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("OutlinkDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/metadata/Metadata.java
===================================================================
--- src/java/org/apache/nutch/metadata/Metadata.java	(revision 666721)
+++ src/java/org/apache/nutch/metadata/Metadata.java	(working copy)
@@ -124,6 +124,29 @@
   }
 
   /**
+   * Add multiple values for a metadata name.
+   * Appends the specified values to the list of values already associated
+   * with the specified metadata name.
+   *
+   * @param name the metadata name.
+   * @param newValues the metadata values to add.
+   */
+  public void addAll(final String name, final String[] newValues) {
+    String[] values = metadata.get(name);
+    if (values == null) {
+      String[] allValues = new String[newValues.length];
+      System.arraycopy(newValues, 0, allValues, 0, newValues.length);
+      metadata.put(name, allValues);
+    } 
+    else {
+      String[] allValues = new String[values.length + newValues.length];
+      System.arraycopy(values, 0, allValues, 0, values.length);
+      System.arraycopy(newValues, 0, allValues, values.length, newValues.length);
+      metadata.put(name, allValues);
+    }
+  }
+  
+  /**
    * Copy All key-value pairs from properties.
    * @param properties properties to copy from
    */
Index: src/java/org/apache/nutch/parse/Outlink.java
===================================================================
--- src/java/org/apache/nutch/parse/Outlink.java	(revision 666721)
+++ src/java/org/apache/nutch/parse/Outlink.java	(working copy)
@@ -17,15 +17,17 @@
 
 package org.apache.nutch.parse;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.net.MalformedURLException;
 
-import org.apache.hadoop.io.*;
-import org.apache.nutch.net.URLNormalizers;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 /* An outgoing link from a page. */
-public class Outlink implements Writable {
+public class Outlink
+  implements Writable {
 
   private String toUrl;
   private String anchor;
@@ -30,15 +32,19 @@
   private String toUrl;
   private String anchor;
 
-  public Outlink() {}
+  public Outlink() {
+  }
 
-  public Outlink(String toUrl, String anchor) throws MalformedURLException {
+  public Outlink(String toUrl, String anchor)
+    throws MalformedURLException {
     this.toUrl = toUrl;
-    if (anchor == null) anchor = "";
+    if (anchor == null)
+      anchor = "";
     this.anchor = anchor;
   }
 
-  public void readFields(DataInput in) throws IOException {
+  public void readFields(DataInput in)
+    throws IOException {
     toUrl = Text.readString(in);
     anchor = Text.readString(in);
   }
@@ -44,12 +50,14 @@
   }
 
   /** Skips over one Outlink in the input. */
-  public static void skip(DataInput in) throws IOException {
-    Text.skip(in);                                // skip toUrl
-    Text.skip(in);                                // skip anchor
+  public static void skip(DataInput in)
+    throws IOException {
+    Text.skip(in); // skip toUrl
+    Text.skip(in); // skip anchor
   }
 
-  public void write(DataOutput out) throws IOException {
+  public void write(DataOutput out)
+    throws IOException {
     Text.writeString(out, toUrl);
     Text.writeString(out, anchor);
   }
@@ -54,7 +62,8 @@
     Text.writeString(out, anchor);
   }
 
-  public static Outlink read(DataInput in) throws IOException {
+  public static Outlink read(DataInput in)
+    throws IOException {
     Outlink outlink = new Outlink();
     outlink.readFields(in);
     return outlink;
@@ -60,9 +69,13 @@
     return outlink;
   }
 
-  public String getToUrl() { return toUrl; }
-  public String getAnchor() { return anchor; }
+  public String getToUrl() {
+    return toUrl;
+  }
 
+  public String getAnchor() {
+    return anchor;
+  }
 
   public boolean equals(Object o) {
     if (!(o instanceof Outlink))
@@ -68,13 +81,12 @@
     if (!(o instanceof Outlink))
       return false;
     Outlink other = (Outlink)o;
-    return
-      this.toUrl.equals(other.toUrl) &&
-      this.anchor.equals(other.anchor);
+    return this.toUrl.equals(other.toUrl) && this.anchor.equals(other.anchor);
   }
 
   public String toString() {
-    return "toUrl: " + toUrl + " anchor: " + anchor;  // removed "\n". toString, not printLine... WD.
+    return "toUrl: " + toUrl + " anchor: " + anchor; // removed "\n". toString,
+                                                      // not printLine... WD.
   }
 
 }
Index: src/java/org/apache/nutch/parse/Outlinks.java
===================================================================
--- src/java/org/apache/nutch/parse/Outlinks.java	(revision 0)
+++ src/java/org/apache/nutch/parse/Outlinks.java	(revision 0)
@@ -0,0 +1,70 @@
+package org.apache.nutch.parse;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class Outlinks
+  implements Writable {
+
+  private Outlink[] outlinks = new Outlink[0];
+  private long timestamp = System.currentTimeMillis();
+
+  public Outlinks() {
+
+  }
+
+  public Outlinks(Outlink[] outlinks) {
+    this.outlinks = outlinks;
+  }
+
+  public Outlink[] getOutlinks() {
+    return outlinks;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public void setOutlinks(Outlink[] outlinks) {
+    this.outlinks = outlinks;
+  }
+  
+  public int getNumOutlinks() {
+    return (outlinks == null ? 0 : outlinks.length);
+  }
+
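+  // Serialized form: the outlink count (int), the timestamp (long), then each
+  // Outlink in order.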
+  public void write(DataOutput out)
+    throws IOException {
+    int numOutlinks = (outlinks != null ? outlinks.length : 0);
+    out.writeInt(numOutlinks);
+    out.writeLong(timestamp);
+    if (numOutlinks > 0) {
+      for (int i = 0; i < numOutlinks; i++) {
+        Outlink cur = outlinks[i];
+        cur.write(out);
+      }
+    }
+  }
+
+  public void readFields(DataInput in)
+    throws IOException {
+
+    int numOutlinks = in.readInt();
+    this.timestamp = in.readLong();
+    this.outlinks = new Outlink[numOutlinks];    
+    if (numOutlinks > 0) {
+      for (int i = 0; i < numOutlinks; i++) {
+        Outlink cur = new Outlink();
+        cur.readFields(in);
+        outlinks[i] = cur;
+      }
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/LinkAnalysis.java
===================================================================
--- src/java/org/apache/nutch/scoring/LinkAnalysis.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/LinkAnalysis.java	(revision 0)
@@ -0,0 +1,332 @@
+package org.apache.nutch.scoring;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.Inlink;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.parse.Outlinks;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+public class LinkAnalysis
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(LinkAnalysis.class);
+  private static final String LINK_SCORES = "link.scores.file";
+  private int numIterations = 10;
+
+  private static class LinkScore
+    implements Writable {
+
+    private float inlinkScore = 0.0f;
+    private float outlinkScore = 0.0f;
+
+    public LinkScore() {
+
+    }
+
+    public LinkScore(float inlinkScore, float outlinkScore) {
+      this.inlinkScore = inlinkScore;
+      this.outlinkScore = outlinkScore;
+    }
+
+    public float getInlinkScore() {
+      return inlinkScore;
+    }
+
+    public void setInlinkScore(float inlinkScore) {
+      this.inlinkScore = inlinkScore;
+    }
+
+    public float getOutlinkScore() {
+      return outlinkScore;
+    }
+
+    public void setOutlinkScore(float outlinkScore) {
+      this.outlinkScore = outlinkScore;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+      inlinkScore = in.readFloat();
+      outlinkScore = in.readFloat();
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+      out.writeFloat(inlinkScore);
+      out.writeFloat(outlinkScore);
+    }
+
+  }
+
+  private static class Initializer
+    implements Mapper<Text, Outlinks, Text, LinkScore> {
+
+    private JobConf conf;
+    private float initialScore = .50f;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      initialScore = conf.getFloat("link.analyze.initial.score", .50f);
+    }
+
+    public void map(Text key, Outlinks value,
+      OutputCollector<Text, LinkScore> output, Reporter reporter)
+      throws IOException {
+
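+      // Every url starts with the same inlink score; the score passed to each
+      // of its outlinks is that value divided evenly among them.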
+      int numOutlinks = value.getNumOutlinks();
+      float outlinkScore = initialScore;
+      if (numOutlinks > 0) {
+        outlinkScore = outlinkScore / numOutlinks;
+      }
+      output.collect(key, new LinkScore(initialScore, outlinkScore));
+    }
+
+    public void close() {
+    }
+  }
+
+  private static class Analyzer
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, LinkScore> {
+
+    private JobConf conf;
+    private Path linkScores = null;
+    private MapFile.Reader[] readers = null;
+    private float minScore = 0.001f;
+
+    public void configure(JobConf conf) {
+      try {
+        this.conf = conf;
+        FileSystem fs = FileSystem.get(conf);
+        linkScores = new Path(conf.get(LINK_SCORES));
+        readers = MapFileOutputFormat.getReaders(fs, linkScores, conf);
+        minScore = conf.getFloat("link.analyze.minimum.score", .001f);
+      }
+      catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new IllegalArgumentException(e);
+      }
+    }
+
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, LinkScore> output, Reporter reporter)
+      throws IOException {
+
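+      // A url's new score is the sum of the per-outlink scores of its unique
+      // inlinking urls from the previous iteration; that sum is then divided
+      // evenly among its own outlinks for the next pass. Urls with no inlinks
+      // fall back to a configurable minimum score.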
+      Outlinks outlinks = null;
+      Inlinks inlinks = null;
+
+      while (values.hasNext()) {
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof Outlinks) {
+          outlinks = (Outlinks)value;
+        }
+        else if (value instanceof Inlinks) {
+          inlinks = (Inlinks)value;
+        }
+      }
+
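+      // Previous-iteration scores are read directly from the MapFile output of
+      // the last pass, using the same hash partitioner that wrote it to pick
+      // the right part file.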
+      HashPartitioner<Text, LinkScore> part = new HashPartitioner<Text, LinkScore>();
+      int numOutlinks = (outlinks == null ? 0 : outlinks.getNumOutlinks());
+      if (inlinks == null || inlinks.size() == 0) {
+        float outlinkScore = (numOutlinks > 0 ? minScore / numOutlinks
+          : minScore);
+        output.collect(key, new LinkScore(minScore, outlinkScore));
+        LOG.info(key.toString() + " score is " + minScore
+          + " for iteration with outlink score " + outlinkScore + " and "
+          + numOutlinks + " outlinks\n");
+      }
+      else {
+
+        Set<String> inlinkSet = new LinkedHashSet<String>();
+        Iterator<Inlink> inlinkit = inlinks.iterator();
+        while (inlinkit.hasNext()) {
+          Inlink inlink = inlinkit.next();
+          String url = inlink.getFromUrl();
+          inlinkSet.add(url);
+        }
+
+        float inlinkScore = 0.0f;
+        Iterator<String> inlinkSetIt = inlinkSet.iterator();
+        while (inlinkSetIt.hasNext()) {
+          String url = inlinkSetIt.next();
+          LinkScore score = new LinkScore(0.0f, 0.0f);
+          MapFileOutputFormat.getEntry(readers, part, new Text(url), score);
+          inlinkScore += score.getOutlinkScore();
+          LOG.info("Adding " + score.getOutlinkScore() + " to "
+            + key.toString() + " from " + url + " for total " + inlinkScore);
+        }
+
+        float outlinkScore = (numOutlinks > 0 ? inlinkScore / numOutlinks
+          : inlinkScore);
+        LOG.info(key.toString() + " score is " + inlinkScore
+          + " for iteration with outlink score " + outlinkScore + " and "
+          + numOutlinks + " outlinks\n");
+        output.collect(key, new LinkScore(inlinkScore, outlinkScore));
+      }
+    }
+
+    public void close()
+      throws IOException {
+
+      if (readers != null) {
+        for (int i = 0; i < readers.length; i++) {
+          MapFile.Reader reader = readers[i];
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      }
+    }
+
+  }
+
+  public LinkAnalysis() {
+    super();
+  }
+
+  public LinkAnalysis(Configuration conf) {
+    super(conf);
+  }
+
+  public void close() {
+  }
+
+  public void analyze(Path outlinkdb, Path inlinkdb, Path linkscores)
+    throws IOException {
+
+    Path linkDbCurrent = new Path(inlinkdb, LinkDb.CURRENT_NAME);
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    JobConf initializer = new NutchJob(conf);
+    initializer.setJobName("LinkAnalysis Initializer");
+    initializer.addInputPath(outlinkdb);
+    initializer.setOutputPath(linkscores);
+    initializer.setInputFormat(SequenceFileInputFormat.class);
+    initializer.setMapperClass(Initializer.class);
+    initializer.setMapOutputKeyClass(Text.class);
+    initializer.setMapOutputValueClass(LinkScore.class);
+    initializer.setOutputKeyClass(Text.class);
+    initializer.setOutputValueClass(LinkScore.class);
+    initializer.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      JobClient.runJob(initializer);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    numIterations = conf.getInt("link.analyze.num.iterations", 10);
+    for (int i = 0; i < numIterations; i++) {
+
+      LOG.info("Running analysis iteration " + i + " of " + numIterations);
+      Path newLinkScores = new Path(linkscores + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+      JobConf analyzer = new NutchJob(conf);
+      analyzer.setJobName("LinkAnalysis Analyzer");
+      analyzer.set(LINK_SCORES, linkscores.toString());
+      analyzer.addInputPath(outlinkdb);
+      analyzer.addInputPath(linkDbCurrent);
+      analyzer.setMapOutputKeyClass(Text.class);
+      analyzer.setMapOutputValueClass(ObjectWritable.class);
+      analyzer.setInputFormat(SequenceFileInputFormat.class);
+      analyzer.setMapperClass(Analyzer.class);
+      analyzer.setReducerClass(Analyzer.class);
+      analyzer.setOutputKeyClass(Text.class);
+      analyzer.setOutputValueClass(LinkScore.class);
+      analyzer.setOutputPath(newLinkScores);
+      analyzer.setOutputFormat(MapFileOutputFormat.class);
+
+      try {
+        JobClient.runJob(analyzer);
+      }
+      catch (IOException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw e;
+      }
+
+      LOG.info("Installing new link scores");
+      Path old = new Path(linkscores + ".old");
+      if (fs.exists(linkscores)) {
+        fs.rename(linkscores, old);
+      }
+      fs.rename(newLinkScores, linkscores);
+      if (fs.exists(old)) {
+        fs.delete(old);
+      }
+      LOG.info("Finished analysis iteration " + i + " of " + numIterations);
+    }
+    LOG.info("Finished analysis");
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkAnalysis(),
+      args);
+    System.exit(res);
+  }
+
+  public int run(String[] args)
+    throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: LinkAnalysis <outlinkdb> <inlinkdb> <output>");
+      System.out.println("\toutlinkdb\tThe OutlinkDb to use for outlinks");
+      System.out.println("\tinlinkdb\tThe linkdb to use for inlinks");
+      System.out.println("\toutput\tThe linkscores output");
+      return -1;
+    }
+
+    try {
+      analyze(new Path(args[0]), new Path(args[1]), new Path(args[2]));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("LinkAnalysis: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/tools/arc/ArcInputFormat.java
===================================================================
--- src/java/org/apache/nutch/tools/arc/ArcInputFormat.java	(revision 666721)
+++ src/java/org/apache/nutch/tools/arc/ArcInputFormat.java	(working copy)
@@ -42,7 +42,7 @@
     Reporter reporter)
     throws IOException {
     reporter.setStatus(split.toString());
-    return new ArcRecordReader(job, (FileSplit)split);
+    return new ArcGzipRecordReader(job, (FileSplit)split);
   }
 
 }
Index: src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
===================================================================
--- src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java	(revision 666721)
+++ src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java	(working copy)
@@ -16,9 +16,15 @@
  */
 package org.apache.nutch.tools.arc;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.StringReader;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -59,12 +65,14 @@
 import org.apache.nutch.util.StringUtil;
 
 /**
- * <p>The <code>ArcSegmentCreator</code> is a replacement for fetcher that will
- * take arc files as input and produce a nutch segment as output.</p>
- * 
- * <p>Arc files are tars of compressed gzips which are produced by both the
- * internet archive project and the grub distributed crawler project.</p>
- * 
+ * <p>
+ * The <code>ArcSegmentCreator</code> is a replacement for fetcher that will
+ * take arc files as input and produce a nutch segment as output.
+ * </p>
+ * <p>
+ * Arc files are tars of compressed gzips which are produced by both the
+ * internet archive project and the grub distributed crawler project.
+ * </p>
  */
 public class ArcSegmentCreator
   extends Configured
@@ -86,7 +94,9 @@
   }
 
   /**
-   * <p>Constructor that sets the job configuration.</p>
+   * <p>
+   * Constructor that sets the job configuration.
+   * </p>
    * 
    * @param conf
    */
@@ -109,8 +119,10 @@
   }
 
   /**
-   * <p>Configures the job.  Sets the url filters, scoring filters, url normalizers
-   * and other relevant data.</p>
+   * <p>
+   * Configures the job. Sets the url filters, scoring filters, url normalizers
+   * and other relevant data.
+   * </p>
    * 
    * @param job The job configuration.
    */
@@ -130,11 +142,12 @@
   }
 
   /**
-   * <p>Parses the raw content of a single record to create output.  This method
-   * is almost the same as the {@link org.apache.nutch.Fetcher#output} method in
-   * terms of processing and output.  
+   * <p>
+   * Parses the raw content of a single record to create output. This method is
+   * almost the same as the {@link org.apache.nutch.Fetcher#output} method in
+   * terms of processing and output.
    * 
-   * @param output  The job output collector.
+   * @param output The job output collector.
    * @param segmentName The name of the segment to create.
    * @param key The url of the record.
    * @param datum The CrawlDatum of the record.
@@ -141,12 +154,11 @@
    * @param content The raw content of the record
    * @param pstatus The protocol status
    * @param status The fetch status.
-   * 
    * @return The result of the parse in a ParseStatus object.
    */
-  private ParseStatus output(OutputCollector<Text, NutchWritable> output, String segmentName,
-    Text key, CrawlDatum datum, Content content, ProtocolStatus pstatus,
-    int status) {
+  private ParseStatus output(OutputCollector<Text, NutchWritable> output,
+    String segmentName, Text key, CrawlDatum datum, Content content,
+    ProtocolStatus pstatus, int status) {
 
     // set the fetch status and the fetch time
     datum.setStatus(status);
@@ -192,7 +204,7 @@
         output.collect(key, new NutchWritable(content));
 
         if (parseResult != null) {
-          for (Entry <Text, Parse> entry : parseResult) {
+          for (Entry<Text, Parse> entry : parseResult) {
             Text url = entry.getKey();
             Parse parse = entry.getValue();
             ParseStatus parseStatus = parse.getData().getStatus();
@@ -202,9 +214,9 @@
               parse = parseStatus.getEmptyParse(getConf());
             }
 
-            // Calculate page signature. 
-            byte[] signature = SignatureFactory.getSignature(getConf()).calculate(
-              content, parse);
+            // Calculate page signature.
+            byte[] signature = SignatureFactory.getSignature(getConf())
+              .calculate(content, parse);
             // Ensure segment name and score are in parseData metadata
             parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
               segmentName);
@@ -231,7 +243,8 @@
       }
       catch (IOException e) {
         if (LOG.isFatalEnabled()) {
-          LOG.fatal("ArcSegmentCreator caught:" + StringUtils.stringifyException(e));
+          LOG.fatal("ArcSegmentCreator caught:"
+            + StringUtils.stringifyException(e));
         }
       }
 
@@ -243,7 +256,7 @@
         }
       }
     }
-    
+
     return null;
   }
 
@@ -248,7 +261,9 @@
   }
 
   /**
-   * <p>Logs any error that occurs during conversion.</p>
+   * <p>
+   * Logs any error that occurs during conversion.
+   * </p>
    * 
    * @param url The url we are parsing.
    * @param t The error that occured.
@@ -255,8 +270,8 @@
    */
   private void logError(Text url, Throwable t) {
     if (LOG.isInfoEnabled()) {
-      LOG.info("Conversion of " + url + " failed with: " + 
-          StringUtils.stringifyException(t));
+      LOG.info("Conversion of " + url + " failed with: "
+        + StringUtils.stringifyException(t));
     }
   }
 
@@ -261,8 +276,9 @@
   }
 
   /**
-   * <p>Runs the Map job to translate an arc record into output for Nutch 
-   * segments.</p>
+   * <p>
+   * Runs the Map job to translate an arc record into output for Nutch segments.
+   * </p>
    * 
    * @param key The arc record header.
    * @param bytes The arc record raw content bytes.
@@ -273,12 +289,49 @@
     OutputCollector<Text, NutchWritable> output, Reporter reporter)
     throws IOException {
 
-    String[] headers = key.toString().split("\\s+");
-    String urlStr = headers[0];
-    String version = headers[2];
-    String contentType = headers[3];
-    
-    // arcs start with a file description.  for now we ignore this as it is not
+    String page = new String(bytes.get());
+    int headsep = page.indexOf("\n\n");
+    String header = page.substring(0, headsep);
+    String body = page.substring(headsep + 2); // skip the blank separator line
+
+    if (header.startsWith("filedesc")) {
+      return;
+    }
+
+    BufferedReader reader = new BufferedReader(new StringReader(header));
+    String line = null;
+    List<String> headers = new ArrayList<String>();
+    while ((line = reader.readLine()) != null) {
+      headers.add(line);
+    }
+
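+    // headers.get(0) is the arc descriptor line (url, ip, date/version,
+    // content type, record length); headers.get(1) is the HTTP status line,
+    // and the remaining entries are HTTP response headers.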
+    String[] desc = headers.get(0).split("\\s+");
+    String urlStr = desc[0];
+    String ip = desc[1];
+    String version = desc[2];
+    String contentType = desc[3];
+    int requestLength = Integer.parseInt(desc[4]);
+
+    String[] proto = headers.get(1).split("\\s+");
+    String httpType = proto[0];
+    if (!httpType.startsWith("HTTP")) {
+      return;
+    }
+    int httpCode = Integer.parseInt(proto[1]);
+
+    Map<String, String> headerMap = new HashMap<String, String>();
+    for (int m = 2; m < headers.size(); m++) {
+      String cur = headers.get(m);
+      int sep = cur.indexOf(":");
+      if (sep == -1) {
+        continue;
+      }
+      String name = cur.substring(0, sep);
+      String value = cur.substring(sep + 1);
+      headerMap.put(name.trim(), value.trim());
+    }
+
+    // arcs start with a file description. for now we ignore this as it is not
     // a content record
     if (urlStr.startsWith("filedesc://")) {
       LOG.info("Ignoring file header: " + urlStr);
@@ -286,7 +339,7 @@
     }
     LOG.info("Processing: " + urlStr);
 
-    // get the raw  bytes from the arc file, create a new crawldatum
+    // get the raw bytes from the arc file, create a new crawldatum
     Text url = new Text();
     CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, interval,
       1.0f);
@@ -310,20 +363,24 @@
       url.set(urlStr);
       try {
 
-        // set the protocol status to success and the crawl status to success
-        // create the content from the normalized url and the raw bytes from
-        // the arc file,  TODO: currently this doesn't handle text of errors
-        // pages (i.e. 404, etc.). We assume we won't get those.
-        ProtocolStatus status = ProtocolStatus.STATUS_SUCCESS;
-        Content content = new Content(urlStr, urlStr, bytes.get(), contentType,
-          new Metadata(), getConf());
-        
-        // set the url version into the metadata
-        content.getMetadata().set(URL_VERSION, version);
-        ParseStatus pstatus = null;
-        pstatus = output(output, segmentName, url, datum, content, status,
-          CrawlDatum.STATUS_FETCH_SUCCESS);
-        reporter.progress();
+        // only handle successful fetches for now; 404s and other errors are
+        // ignored, and redirects are emitted as retry output
+        if (httpCode == 200) {
+          ProtocolStatus status = ProtocolStatus.STATUS_SUCCESS;
+          Content content = new Content(urlStr, urlStr, body.getBytes(),
+            contentType, new Metadata(), getConf());
+
+          // set the url version into the metadata
+          content.getMetadata().set(URL_VERSION, version);
+          ParseStatus pstatus = null;
+          pstatus = output(output, segmentName, url, datum, content, status,
+            CrawlDatum.STATUS_FETCH_SUCCESS);
+        }
+        else if (httpCode == 301 || httpCode == 302) {
+          String redirectUrl = headerMap.get("Location");
+          output(output, segmentName, new Text(redirectUrl), datum, null, null,
+            CrawlDatum.STATUS_FETCH_RETRY);
+        }
       }
       catch (Throwable t) { // unexpected exception
         logError(url, t);
@@ -331,14 +388,17 @@
           CrawlDatum.STATUS_FETCH_RETRY);
       }
     }
+    
+    reporter.progress();
   }
 
   /**
-   * <p>Creates the arc files to segments job.</p>
+   * <p>
+   * Creates the arc files to segments job.
+   * </p>
    * 
    * @param arcFiles The path to the directory holding the arc files
    * @param segmentsOutDir The output directory for writing the segments
-   * 
    * @throws IOException If an IO error occurs while running the job.
    */
   public void createSegments(Path arcFiles, Path segmentsOutDir)
@@ -369,7 +429,8 @@
 
   public static void main(String args[])
     throws Exception {
-    int res = ToolRunner.run(NutchConfiguration.create(), new ArcSegmentCreator(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(),
+      new ArcSegmentCreator(), args);
     System.exit(res);
   }
 

