Index: src/java/org/apache/nutch/parse/Outlinks.java
===================================================================
--- src/java/org/apache/nutch/parse/Outlinks.java	(revision 0)
+++ src/java/org/apache/nutch/parse/Outlinks.java	(revision 0)
@@ -0,0 +1,69 @@
+package org.apache.nutch.parse;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
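+/**
+ * A Writable holding the outlinks parsed from a single page, together with
+ * the time at which they were collected.
+ */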
+public class Outlinks
+  implements Writable {
+
+  private Outlink[] outlinks = new Outlink[0];
+  private long timestamp = System.currentTimeMillis();
+
+  public Outlinks() {
+
+  }
+
+  public Outlinks(Outlink[] outlinks) {
+    this.outlinks = outlinks;
+  }
+
+  public Outlink[] getOutlinks() {
+    return outlinks;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public void setOutlinks(Outlink[] outlinks) {
+    this.outlinks = outlinks;
+  }
+  
+  public int getNumOutlinks() {
+    return (outlinks == null ? 0 : outlinks.length);
+  }
+
+  public void write(DataOutput out)
+    throws IOException {
+    int numOutlinks = (outlinks != null ? outlinks.length : 0);
+    out.writeInt(numOutlinks);
+    out.writeLong(timestamp);
+    for (int i = 0; i < numOutlinks; i++) {
+      outlinks[i].write(out);
+    }
+  }
+
+  public void readFields(DataInput in)
+    throws IOException {
+
+    int numOutlinks = in.readInt();
+    this.timestamp = in.readLong();
+    this.outlinks = new Outlink[numOutlinks];
+    for (int i = 0; i < numOutlinks; i++) {
+      Outlink cur = new Outlink();
+      cur.readFields(in);
+      outlinks[i] = cur;
+    }
+  }
+}
Index: src/java/org/apache/nutch/crawl/InlinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/InlinkDb.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/InlinkDb.java	(revision 0)
@@ -0,0 +1,267 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Outlinks;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
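+/**
+ * Inverts an OutlinkDb into an InlinkDb.  For every outlink (from, to) in
+ * the OutlinkDb an Inlink (to, from) is emitted, with optional URL
+ * normalization and filtering, anchors truncated to db.max.anchor.length,
+ * and at most db.max.inlinks inlinks kept per URL.
+ */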
+public class InlinkDb
+  extends Configured
+  implements Tool, Mapper<Text, Outlinks, Text, Inlink>,
+  Reducer<Text, Inlink, Text, Inlinks> {
+
+  public static final Log LOG = LogFactory.getLog(InlinkDb.class);
+  public static final String CURRENT_NAME = "current";
+
+  private int maxAnchorLength;
+  private int maxInlinks;
+  private boolean ignoreInternalLinks;
+  private URLFilters urlFilters;
+  private URLNormalizers urlNormalizers;
+
+  private JobConf conf;
+
+  private String getHost(String url) {
+    try {
+      return new URL(url).getHost().toLowerCase();
+    }
+    catch (MalformedURLException e) {
+      return null;
+    }
+  }
+
+  public InlinkDb() {
+  }
+
+  public InlinkDb(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    maxAnchorLength = conf.getInt("db.max.anchor.length", 100);
+    ignoreInternalLinks = conf.getBoolean("db.ignore.internal.links", true);
+    maxInlinks = conf.getInt("db.max.inlinks", 10000);
+    if (conf.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
+      urlFilters = new URLFilters(conf);
+    }
+    if (conf.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) {
+      urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_LINKDB);
+    }
+  }
+
+  public void close() {
+  }
+
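+  /**
+   * Normalizes and filters the source URL, then emits one Inlink per
+   * outlink, optionally skipping links within the same host.
+   */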
+  public void map(Text key, Outlinks outlinks,
+    OutputCollector<Text, Inlink> output, Reporter reporter)
+    throws IOException {
+
+    String fromUrl = key.toString();
+    String fromHost = getHost(fromUrl);
+
+    if (urlNormalizers != null) {
+      try {
+        fromUrl = urlNormalizers
+          .normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url
+      }
+      catch (Exception e) {
+        LOG.warn("Skipping " + fromUrl + ":" + e);
+        fromUrl = null;
+      }
+    }
+
+    if (fromUrl != null && urlFilters != null) {
+      try {
+        fromUrl = urlFilters.filter(fromUrl); // filter the url
+      }
+      catch (Exception e) {
+        LOG.warn("Skipping " + fromUrl + ":" + e);
+        fromUrl = null;
+      }
+    }
+
+    if (fromUrl == null) {
+      return;
+    }
+
+    Outlink[] outlinkAr = outlinks.getOutlinks();
+    for (int i = 0; i < outlinkAr.length; i++) {
+
+      Outlink outlink = outlinkAr[i];
+      String toUrl = outlink.getToUrl();
+
+      if (ignoreInternalLinks) {
+        String toHost = getHost(toUrl);
+        if (toHost == null || toHost.equals(fromHost)) {
+          continue;
+        }
+      }
+      if (urlNormalizers != null) {
+        try {
+          toUrl = urlNormalizers.normalize(toUrl, URLNormalizers.SCOPE_LINKDB);
+        }
+        catch (Exception e) {
+          LOG.warn("Skipping " + toUrl + ":" + e);
+          toUrl = null;
+        }
+      }
+      if (toUrl != null && urlFilters != null) {
+        try {
+          toUrl = urlFilters.filter(toUrl);
+        }
+        catch (Exception e) {
+          LOG.warn("Skipping " + toUrl + ":" + e);
+          toUrl = null;
+        }
+      }
+      if (toUrl == null) {
+        continue;
+      }
+
+      String anchor = outlink.getAnchor();
+      if (anchor.length() > maxAnchorLength) {
+        anchor = anchor.substring(0, maxAnchorLength);
+      }
+
+      output.collect(new Text(toUrl), new Inlink(fromUrl, anchor));
+    }
+
+  }
+
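+  /**
+   * Collects the Inlink values for a URL into a single Inlinks record,
+   * ignoring anything beyond the configured maximum.
+   */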
+  public void reduce(Text key, Iterator<Inlink> values,
+    OutputCollector<Text, Inlinks> output, Reporter reporter)
+    throws IOException {
+
+    Inlinks inlinks = new Inlinks();
+    int numCollected = 0;
+    while (values.hasNext()) {
+      Inlink inlink = values.next();
+      if (maxInlinks <= 0 || numCollected < maxInlinks) {
+        numCollected++;
+        inlinks.add((Inlink)WritableUtils.clone(inlink, getConf()));
+      }
+      else {
+        LOG.debug("Ignoring " + inlink.getFromUrl() + " max of " + maxInlinks
+          + " collected");
+      }
+    }
+    output.collect(key, inlinks);
+  }
+
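+  /**
+   * Runs the inversion job, reading the current OutlinkDb and writing the
+   * inverted links to the current directory of the InlinkDb.
+   */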
+  public void invert(Path outlinkdb, Path inlinkdb)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("InlinkDb: starting");
+      LOG.info("InlinkDb: outlinkdb: " + outlinkdb);
+      LOG.info("InlinkDb: inlinkdb: " + inlinkdb);
+    }
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    boolean outlinkDbExists = fs.exists(outlinkdb);
+    if (!outlinkDbExists) {
+      fs.mkdirs(outlinkdb);
+    }
+
+    Path outlinkCurrent = new Path(outlinkdb, CURRENT_NAME);
+    Path inlinkCurrent = new Path(inlinkdb, CURRENT_NAME);
+
+    JobConf job = new NutchJob(conf);
+    job.setJobName("Inlinkdb " + inlinkdb);
+    LOG.info("InlinkDb: adding input: " + outlinkdb);
+    job.addInputPath(outlinkCurrent);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setMapperClass(InlinkDb.class);
+    job.setReducerClass(InlinkDb.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Inlink.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Inlinks.class);
+    job.setOutputPath(inlinkCurrent);
+    job.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      LOG.info("InlinkDb: running");
+      JobClient.runJob(job);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    LOG.info("InlinkDb: finished");
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new InlinkDb(),
+      args);
+    System.exit(res);
+  }
+
+  public int run(String[] args)
+    throws Exception {
+
+    if (args.length < 2) {
+      System.out.println("Usage: InlinkDb <outlinkdb> <inlinkdb>");
+      System.out.println("\toutlinkdb\tThe OutlinkDb to update");
+      System.out.println("\tinlinkdb\tThe output InlinkDb");
+      return -1;
+    }
+
+    try {
+      invert(new Path(args[0]), new Path(args[1]));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("InlinkDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/crawl/OutlinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/OutlinkDb.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/OutlinkDb.java	(revision 0)
@@ -0,0 +1,299 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Outlinks;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
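+/**
+ * Maintains a database of the most recent outlinks for every known URL.
+ * Outlinks are folded in from segment ParseData, URLs from the crawldb are
+ * added with empty outlinks, and entries from an existing OutlinkDb are
+ * carried forward.
+ */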
+public class OutlinkDb
+  extends Configured
+  implements Tool, Mapper<Text, Writable, Text, Outlinks>,
+  Reducer<Text, Outlinks, Text, Outlinks> {
+
+  public static final Log LOG = LogFactory.getLog(OutlinkDb.class);
+  public static final String LOCK_NAME = ".locked";
+  public static final String CURRENT_NAME = "current";
+
+  private boolean ignoreInternal = true;
+  private URLNormalizers urlNormalizers;
+
+  private JobConf conf;
+
+  public OutlinkDb() {
+  }
+
+  public OutlinkDb(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    ignoreInternal = conf.getBoolean("outlink.ignore.internal", true);
+    urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+  }
+
+  public void close() {
+  }
+
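+  /**
+   * Normalizes the key URL and converts each input type to an Outlinks
+   * record: ParseData contributes its normalized outlinks, a CrawlDatum an
+   * empty record, and existing Outlinks records pass through unchanged.
+   */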
+  public void map(Text key, Writable value,
+    OutputCollector<Text, Outlinks> output, Reporter reporter)
+    throws IOException {
+
+    String url = key.toString();
+    if (urlNormalizers != null) {
+      try {
+        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+      }
+      catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        url = null;
+      }
+    }
+    if (url == null) {
+      return;
+    }
+
+    if (value instanceof ParseData) {
+
+      ParseData data = (ParseData)value;
+      Outlink[] outlinkAr = data.getOutlinks();
+
+      if (outlinkAr != null && outlinkAr.length > 0) {
+
+        List<Outlink> outlinkList = new ArrayList<Outlink>();
+        for (int i = 0; i < outlinkAr.length; i++) {
+
+          Outlink outlink = outlinkAr[i];
+          String toUrl = outlink.getToUrl();
+          String anchor = outlink.getAnchor();
+          try {
+            toUrl = urlNormalizers.normalize(toUrl,
+              URLNormalizers.SCOPE_DEFAULT);
+          }
+          catch (Exception e) {
+            LOG.warn("Skipping outlink " + toUrl + ":" + e);
+            toUrl = null;
+          }
+
+          if (toUrl != null) {
+            outlinkList.add(new Outlink(toUrl, anchor));
+          }
+        }
+        outlinkAr = outlinkList.toArray(new Outlink[outlinkList.size()]);
+      }
+
+      Outlinks outlinks = new Outlinks(outlinkAr);
+      outlinks.setTimestamp(System.currentTimeMillis());
+      output.collect(new Text(url), outlinks);
+    }
+    else if (value instanceof CrawlDatum) {
+      output.collect(new Text(url), new Outlinks());
+    }
+    else if (value instanceof Outlinks) {
+      output.collect(new Text(url), (Outlinks)value);
+    }
+
+  }
+
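+  /**
+   * Keeps the most recent non-empty Outlinks record for each URL and, when
+   * configured, drops outlinks that point to the same domain.
+   */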
+  public void reduce(Text key, Iterator<Outlinks> values,
+    OutputCollector<Text, Outlinks> output, Reporter reporter)
+    throws IOException {
+
+    Outlinks mostRecent = null;
+    while (values.hasNext()) {
+      Outlinks next = values.next();
+      long timestamp = next.getTimestamp();
+      int numOutlinks = next.getNumOutlinks();
+      if (mostRecent == null
+        || (mostRecent.getNumOutlinks() == 0 && numOutlinks > 0)
+        || (mostRecent.getTimestamp() < timestamp && numOutlinks > 0)) {
+        mostRecent = (Outlinks)WritableUtils.clone(next, conf);
+      }
+    }
+
+    if (ignoreInternal) {
+
+      String domain = URLUtil.getDomainName(key.toString());
+      List<Outlink> externalList = new ArrayList<Outlink>();
+      Outlink[] outlinks = mostRecent.getOutlinks();
+      for (int i = 0; i < outlinks.length; i++) {
+        Outlink current = outlinks[i];
+        String curDomain = URLUtil.getDomainName(current.getToUrl());
+        if (curDomain != null && !curDomain.equalsIgnoreCase(domain)) {
+          externalList.add(current);
+        }
+      }
+      Outlink[] externals = externalList.toArray(new Outlink[externalList
+        .size()]);
+      mostRecent.setOutlinks(externals);
+      mostRecent.setTimestamp(System.currentTimeMillis());
+    }
+    output.collect(key, mostRecent);
+
+  }
+
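+  /**
+   * Runs the update job over the crawldb, any existing OutlinkDb, and the
+   * given segments, then installs the new OutlinkDb in place of the old.
+   */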
+  public void update(Path crawldb, Path outlinkdb, Path[] segments)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("OutlinkDb: starting");
+      LOG.info("OutlinkDb: crawldb: " + crawldb);
+      LOG.info("OutlinkDb: outlinkdb: " + outlinkdb);
+      LOG.info("OutlinkDb: segments: " + segments);
+    }
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    Path lock = new Path(outlinkdb, LOCK_NAME);
+    boolean outlinkDbExists = fs.exists(outlinkdb);
+    if (outlinkDbExists) {
+      LockUtil.createLockFile(fs, lock, false);
+    }
+
+    Path outlinkCurrent = new Path(outlinkdb, CURRENT_NAME);
+    Path newOutlinkDb = new Path(outlinkdb, Integer.toString(new Random()
+      .nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(conf);
+    job.setJobName("Outlinkdb " + outlinkdb);
+
+    if (segments != null) {
+      for (int i = 0; i < segments.length; i++) {
+        Path parseData = new Path(segments[i], ParseData.DIR_NAME);
+        if (fs.exists(parseData)) {
+          LOG.info("OutlinkDb: adding input: " + parseData);
+          job.addInputPath(parseData);
+        }
+      }
+    }
+
+    if (outlinkDbExists) {
+      LOG.info("OutlinkDb: adding input: " + outlinkdb);
+      job.addInputPath(outlinkCurrent);
+    }
+
+    Path crawlDbCurrent = new Path(crawldb, CrawlDb.CURRENT_NAME);
+    if (fs.exists(crawlDbCurrent)) {
+      LOG.info("OutlinkDb: adding input: " + crawlDbCurrent);
+      job.addInputPath(crawlDbCurrent);
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setMapperClass(OutlinkDb.class);
+    job.setReducerClass(OutlinkDb.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Outlinks.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Outlinks.class);
+    job.setOutputPath(newOutlinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      LOG.info("OutlinkDb: running");
+      JobClient.runJob(job);
+    }
+    catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(job.getOutputPath())) {
+        fs.delete(job.getOutputPath());
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    LOG.info("OutlinkDb: installing " + outlinkdb);
+    Path old = new Path(outlinkCurrent + ".old");
+    if (fs.exists(outlinkCurrent)) {
+      fs.rename(outlinkCurrent, old);
+    }
+    fs.rename(newOutlinkDb, outlinkCurrent);
+    if (fs.exists(old)) {
+      fs.delete(old);
+    }
+    LockUtil.removeLockFile(fs, lock);
+    LOG.info("OutlinkDb: finished");
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner
+      .run(NutchConfiguration.create(), new OutlinkDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args)
+    throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: OutlinkDb <crawldb> <outlinkdb> [<segment1>"
+        + " <segment2> ...]");
+      System.out.println("\tcrawldb\tThe crawldb to pull from");
+      System.out.println("\toutlinkdb\tThe OutlinkDb to update");
+      System.out.println("\tsegments\tSegments used to update the OutlinkDb");
+      return -1;
+    }
+
+    try {
+      int numSegments = args.length - 2;
+      Path[] segments = new Path[numSegments];
+      for (int i = 0; i < numSegments; i++) {
+        segments[i] = new Path(args[i + 2]);
+      }
+      update(new Path(args[0]), new Path(args[1]), segments);
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("OutlinkDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/LinkAnalysis.java
===================================================================
--- src/java/org/apache/nutch/scoring/LinkAnalysis.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/LinkAnalysis.java	(revision 0)
@@ -0,0 +1,955 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.Inlink;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.crawl.OutlinkDb;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Outlinks;
+import org.apache.nutch.util.MapFileUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * A LinkAnalysis Tool which performs a PageRank-like scoring.
+ */
+public class LinkAnalysis
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(LinkAnalysis.class);
+  public static final String CURRENT_NAME = "current";
+  private static final String LINK_SCORES = "link.scores.file";
+  private static final String NUM_LINKS = "numlinks";
+
+  private static HashPartitioner hashPartitioner = new HashPartitioner();
+  private int numIterations = 10;
+
+  /**
+   * A simple display tool which prints out the inlinks and outlinks for a given
+   * url along with the scores for those urls.
+   */
+  public static class Display {
+
+    private Configuration conf;
+    private FileSystem fs;
+    private MapFile.Reader[] linkScoreReaders;
+    private MapFile.Reader[] linkDbReaders;
+    private MapFile.Reader[] outlinkDbReaders;
+    private String linkScores = null;
+    private String url = null;
+    private String linkDb = null;
+
+    private String displayLinkScore(String url)
+      throws IOException {
+
+      LinkScore linkScore = new LinkScore();
+      MapFileOutputFormat.getEntry(linkScoreReaders, hashPartitioner, new Text(
+        url), linkScore);
+      return url + " inlink: " + linkScore.getScore() + " outlink: "
+        + linkScore.getScore();
+    }
+
+    public Display(String linkScores, String linkDb, String outlinkDb,
+      String url)
+      throws IOException {
+
+      this.linkScores = linkScores;
+      this.linkDb = linkDb;
+      this.url = url;
+      this.conf = NutchConfiguration.create();
+      this.fs = FileSystem.get(conf);
+      Path linkScoresCurrent = new Path(linkScores, CURRENT_NAME);
+      linkScoreReaders = MapFileOutputFormat.getReaders(fs, linkScoresCurrent,
+        conf);
+      linkDbReaders = MapFileOutputFormat.getReaders(fs, new Path(linkDb,
+        LinkDb.CURRENT_NAME), conf);
+      outlinkDbReaders = MapFileOutputFormat.getReaders(fs, new Path(outlinkDb,
+        OutlinkDb.CURRENT_NAME), conf);
+    }
+
+    public void display()
+      throws IOException {
+      System.out.println(displayLinkScore(url));
+
+      String domain = URLUtil.getDomainName(url);
+
+      Inlinks inlinks = new Inlinks();
+      MapFileOutputFormat.getEntry(linkDbReaders, hashPartitioner,
+        new Text(url), inlinks);
+      Iterator<Inlink> inlinkIt = inlinks.iterator();
+      System.out.println("\nInlinks:\n");
+      while (inlinkIt.hasNext()) {
+        Inlink inlink = inlinkIt.next();
+        String inlinkUrl = inlink.getFromUrl();
+        String inlinkDomain = URLUtil.getDomainName(inlinkUrl);
+        boolean sameDomain = inlinkDomain.equals(domain);
+        System.out.println("\t" + (sameDomain ? "Ignored " : "")
+          + displayLinkScore(inlinkUrl));
+      }
+
+      Outlinks outlinks = new Outlinks();
+      MapFileOutputFormat.getEntry(outlinkDbReaders, hashPartitioner, new Text(
+        url), outlinks);
+      Outlink[] outlinkAr = outlinks.getOutlinks();
+      System.out.println("\nOutlinks:\n");
+      for (int i = 0; i < outlinkAr.length; i++) {
+        Outlink outlink = outlinkAr[i];
+        String inlinkUrl = outlink.getToUrl();
+        String outlinkDomain = URLUtil.getDomainName(inlinkUrl);
+        boolean sameDomain = outlinkDomain.equals(domain);
+        System.out.println("\t" + (sameDomain ? "Ignored " : "")
+          + displayLinkScore(inlinkUrl));
+      }
+    }
+
+    public void close()
+      throws IOException {
+      MapFileUtils.closeReaders(linkScoreReaders);
+      MapFileUtils.closeReaders(linkDbReaders);
+      MapFileUtils.closeReaders(outlinkDbReaders);
+    }
+
+    public static void main(String[] args)
+      throws Exception {
+
+      if (args.length < 3) {
+        System.out.println("Usage: LinkAnalysis.Display <linkscores> <linkdb>"
+          + " <outlinkdb> <url>");
+        System.out.println("\tlinkscores\tThe ioutlinkdb to use for outlinks");
+        System.out.println("\tlinkdb\tThe linkdb to get inlinks from");
+        System.out.println("\turl\tThe url to display");
+        return;
+      }
+
+      String linkScores = args[0];
+      String linkdb = args[1];
+      String outlinkdb = args[2];
+      String url = args[3];
+      Display display = new Display(linkScores, linkdb, outlinkdb, url);
+      display.display();
+      display.close();
+    }
+  }
+
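+  /** A url and its current link analysis score. */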
+  public static class LinkScore
+    implements Writable {
+
+    private String url = null;
+    private double score = 0.0f;
+
+    public LinkScore() {
+
+    }
+
+    public LinkScore(String url, double score) {
+      this.url = url;
+      this.score = score;
+    }
+
+    public String getUrl() {
+      return url;
+    }
+
+    public void setUrl(String url) {
+      this.url = url;
+    }
+
+    public double getScore() {
+      return score;
+    }
+
+    public void setScore(double score) {
+      this.score = score;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+
+      url = Text.readString(in);
+      score = in.readDouble();
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+
+      Text.writeString(out, url);
+      out.writeDouble(score);
+    }
+  }
+
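+  /** The set of inlink scores contributing to a single url. */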
+  public static class LinkScores
+    implements Writable {
+
+    private LinkScore[] scores;
+
+    public LinkScores() {
+
+    }
+
+    public void set(LinkScore[] scores) {
+      this.scores = scores;
+    }
+
+    public LinkScore[] get() {
+      return scores;
+    }
+
+    public int getNumLinks() {
+      return (scores != null) ? scores.length : 0;
+    }
+
+    public void readFields(DataInput in)
+      throws IOException {
+      int numScores = in.readInt();
+      scores = new LinkScore[numScores];
+      for (int i = 0; i < numScores; i++) {
+        LinkScore cur = new LinkScore();
+        cur.readFields(in);
+        scores[i] = cur;
+      }
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+      int numScores = scores.length;
+      out.writeInt(numScores);
+      for (int i = 0; i < numScores; i++) {
+        LinkScore cur = scores[i];
+        cur.write(out);
+      }
+    }
+
+  }
+
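+  /**
+   * Counts the entries in the OutlinkDb, producing the single (numlinks,
+   * total) pair used to derive the rank-one and teleport scores.
+   */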
+  private static class Counter
+    implements Mapper<Text, Outlinks, Text, LongWritable>,
+    Reducer<Text, LongWritable, Text, LongWritable> {
+
+    private JobConf conf;
+    private static Text numLinks = new Text(NUM_LINKS);
+    private static LongWritable one = new LongWritable(1L);
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    public void map(Text key, Outlinks value,
+      OutputCollector<Text, LongWritable> output, Reporter reporter)
+      throws IOException {
+      output.collect(numLinks, one);
+    }
+
+    public void reduce(Text key, Iterator<LongWritable> values,
+      OutputCollector<Text, LongWritable> output, Reporter reporter)
+      throws IOException {
+
+      long total = 0;
+      while (values.hasNext()) {
+        total += values.next().get();
+      }
+      output.collect(numLinks, new LongWritable(total));
+    }
+
+    public void close() {
+    }
+  }
+
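+  /**
+   * Assigns every URL in the OutlinkDb its starting score, taken from
+   * link.analyze.initial.score (0.5 by default).
+   */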
+  private static class Initializer
+    implements Mapper<Text, Outlinks, Text, LinkScore> {
+
+    private JobConf conf;
+    private float initialScore = .50f;
+    private URLNormalizers urlNormalizers;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      initialScore = conf.getFloat("link.analyze.initial.score", .50f);
+      urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+    }
+
+    public void map(Text key, Outlinks value,
+      OutputCollector<Text, LinkScore> output, Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      if (urlNormalizers != null) {
+        try {
+          url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+        }
+        catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          url = null;
+        }
+      }
+
+      if (url != null) {
+        output.collect(new Text(url), new LinkScore(url, initialScore));
+        LOG.debug(url + ": initial score: " + initialScore);
+      }
+    }
+
+    public void close() {
+    }
+  }
+
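+  /**
+   * Joins each page's current score with its outlinks and emits that score,
+   * divided evenly among the outlinks, keyed by the target URL.
+   */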
+  private static class Inverter
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, LinkScore> {
+
+    private JobConf conf;
+    private float minScore = 0.001f;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      this.minScore = conf.getFloat("link.analyze.minimum.score", 0.001f);
+    }
+
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, LinkScore> output, Reporter reporter)
+      throws IOException {
+
+      String fromUrl = key.toString();
+      Outlinks outlinks = null;
+      LinkScore inlinkScore = null;
+
+      while (values.hasNext()) {
+        ObjectWritable write = values.next();
+        Object obj = write.get();
+        if (obj instanceof Outlinks) {
+          outlinks = (Outlinks)obj;
+        }
+        else if (obj instanceof LinkScore) {
+          inlinkScore = (LinkScore)obj;
+        }
+      }
+
+      boolean hasInlinks = (inlinkScore != null);
+      int numOutlinks = (outlinks != null ? outlinks.getNumOutlinks() : 0);
+      LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
+
+      if (numOutlinks > 0) {
+
+        double outlinkScore = (hasInlinks ? inlinkScore.getScore() : minScore);
+        double origScore = outlinkScore;
+        if (numOutlinks > 0) {
+          outlinkScore = outlinkScore / numOutlinks;
+        }
+
+        Outlink[] outlinkAr = outlinks.getOutlinks();
+        for (int i = 0; i < outlinkAr.length; i++) {
+          Outlink outlink = outlinkAr[i];
+          String toUrl = outlink.getToUrl();
+          output.collect(new Text(toUrl), new LinkScore(fromUrl, outlinkScore));
+          LOG.debug(toUrl + ": inverting inlink from " + fromUrl + " origscore: "
+            + origScore + " numOutlinks: " + numOutlinks + " inlinkscore: "
+            + outlinkScore);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
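+  /**
+   * Gathers all inbound LinkScore contributions for a URL into a single
+   * LinkScores record for the next pass.
+   */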
+  private static class Merger
+    implements Reducer<Text, LinkScore, Text, LinkScores> {
+
+    private JobConf conf;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    public void reduce(Text key, Iterator<LinkScore> values,
+      OutputCollector<Text, LinkScores> output, Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      List<LinkScore> inlinkScoresList = new ArrayList<LinkScore>();
+      while (values.hasNext()) {
+        LinkScore cur = values.next();
+        inlinkScoresList.add((LinkScore)WritableUtils.clone(cur, conf));
+        LOG.debug(url + ": inlink score:" + cur.getScore() + " from "
+          + cur.getUrl());
+      }
+
+      LinkScore[] inlinkScoresAr = new LinkScore[inlinkScoresList.size()];
+      inlinkScoresList.toArray(inlinkScoresAr);
+      LinkScores inlinkScores = new LinkScores();
+      inlinkScores.set(inlinkScoresAr);
+
+      output.collect(key, inlinkScores);
+    }
+
+    public void close() {
+    }
+  }
+
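+  /**
+   * Performs one scoring iteration: sums a page's inlink contributions,
+   * penalizing reciprocal links between domains and normalizing multiple
+   * links from a single domain, then adds the teleport score and clamps
+   * the result to the configured minimum.
+   */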
+  private static class Analyzer
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, LinkScore> {
+
+    private JobConf conf;
+    private FileSystem fs;
+    private FileSystem localFs;
+    private Path linkScores = null;
+    private Path localScores = null;
+    private float minScore = 0.001f;
+    private double reciprocalScore = 0.001f;
+    private double rankOneScore = 0.001f;
+    private double tScore = 0.001f;
+    private int itNum = 0;
+    private int minInlinks = 3;
+
+    public void configure(JobConf conf) {
+
+      try {
+
+        this.conf = conf;
+        this.fs = FileSystem.get(conf);
+        this.localFs = FileSystem.getLocal(conf);
+        this.linkScores = new Path(conf.get(LINK_SCORES));
+        this.minScore = conf.getFloat("link.analyze.minimum.score", 0.001f);
+        this.minInlinks = conf.getInt("link.analyze.minimum.inlinks", 3);
+        this.reciprocalScore = conf.getFloat("link.analyze.reciprocal.score",
+          minScore);
+        this.rankOneScore = conf.getFloat("link.analyze.rankone.score",
+          minScore);
+        this.tScore = conf.getFloat("link.analyze.teleport.score", minScore);
+        this.itNum = conf.getInt("link.analyze.iteration", 0);
+      }
+      catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new IllegalArgumentException(e);
+      }
+    }
+
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, LinkScore> output, Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      String domain = URLUtil.getDomainName(url).trim();
+      Outlinks outlinks = null;
+      LinkScores inlinks = null;
+
+      while (values.hasNext()) {
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof Outlinks) {
+          outlinks = (Outlinks)value;
+        }
+        else if (value instanceof LinkScores) {
+          inlinks = (LinkScores)value;
+        }
+      }
+
+      int numOutlinks = (outlinks == null ? 0 : outlinks.getNumOutlinks());
+      int numInlinks = (inlinks != null ? inlinks.getNumLinks() : 0);
+
+      if (numInlinks == 0) {
+
+        double inlinkScore = tScore;
+        LOG.debug(url + ": adding tscore of " + tScore);
+
+        if (numOutlinks == 0) {
+          inlinkScore += rankOneScore;
+          LOG.debug(url + ": adding rank one score of " + rankOneScore);
+        }
+
+        if (inlinkScore < minScore) {
+          inlinkScore = minScore;
+          LOG.debug(url + ": setting " + inlinkScore + " to minscore "
+            + minScore);
+        }
+
+        output.collect(key, new LinkScore(key.toString(), inlinkScore));
+        LOG.debug(url + ": score: " + inlinkScore + " num inlinks: "
+          + numInlinks + " iteration: " + itNum + "\n");
+      }
+      else {
+
+        Set<String> outlinkSet = new LinkedHashSet<String>();
+        Set<String> inlinkDomains = new LinkedHashSet<String>();
+        Map<String, Integer> inDomainCounts = new HashMap<String, Integer>();
+        Set<String> outlinkDomains = new LinkedHashSet<String>();
+
+        LinkScore[] inlinkScores = inlinks.get();
+        for (int i = 0; i < inlinkScores.length; i++) {
+
+          LinkScore inlink = inlinkScores[i];
+          String inlinkUrl = inlink.getUrl();
+          String inlinkDomain = URLUtil.getDomainName(inlinkUrl).trim();
+
+          int count = 0;
+          if (inDomainCounts.containsKey(inlinkDomain)) {
+            count = inDomainCounts.get(inlinkDomain);
+          }
+          inDomainCounts.put(inlinkDomain, ++count);
+        }
+
+        if (numOutlinks > 0) {
+          Outlink[] outlinkAr = outlinks.getOutlinks();
+          for (int i = 0; i < outlinkAr.length; i++) {
+            Outlink outlink = outlinkAr[i];
+            String outlinkUrl = outlink.getToUrl();
+            String outlinkDomain = URLUtil.getDomainName(outlinkUrl).trim();
+            outlinkDomains.add(outlinkDomain);
+          }
+        }
+
+        double totalInlinkScore = 0.0f;
+        for (int i = 0; i < inlinkScores.length; i++) {
+
+          LinkScore inlink = inlinkScores[i];
+          String inlinkUrl = inlink.getUrl();
+          String inlinkDomain = URLUtil.getDomainName(inlinkUrl).trim();
+          if (domain.equals(inlinkDomain)) {
+            LOG.debug(url + ": ignoring inlink from " + inlinkUrl
+              + " same domain");
+            continue;
+          }
+          double scoreFromInlink = inlink.getScore();
+
+          // penalize for reciprocal linking between domains
+          if (outlinkDomains.contains(inlinkDomain)) {
+            scoreFromInlink = reciprocalScore;
+            LOG.debug(url + ": reciprocal link from " + inlinkUrl);
+          }
+
+          // normalize out many links from a single domain
+          int domainCount = inDomainCounts.get(inlinkDomain);
+          scoreFromInlink = (scoreFromInlink / domainCount);
+          totalInlinkScore += scoreFromInlink;
+          LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+            + ", total: " + totalInlinkScore);
+        }
+
+        totalInlinkScore += tScore;
+        LOG.debug(url + ": adding tscore of " + tScore + ", total: "
+          + totalInlinkScore);
+
+        // total inlink score, score for the doc, must be at least min score
+        if (totalInlinkScore < minScore) {
+          totalInlinkScore = minScore;
+          LOG.debug(url + ": setting " + totalInlinkScore + " to minscore "
+            + minScore);
+        }
+
+        LOG.debug(url + ": score: " + totalInlinkScore + " num inlinks: "
+          + numInlinks + " iteration: " + itNum + "\n");
+
+        output.collect(key, new LinkScore(key.toString(), totalInlinkScore));
+      }
+    }
+
+    public void close()
+      throws IOException {
+    }
+
+  }
+
+  public LinkAnalysis() {
+    super();
+  }
+
+  public LinkAnalysis(Configuration conf) {
+    super(conf);
+  }
+
+  public void close() {
+  }
+
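+  /**
+   * Runs the full analysis: counts the links, initializes the scores, then
+   * inverts and merges them, repeating the analyze, invert, and merge jobs
+   * for the configured number of iterations and installing the new scores
+   * after each pass.
+   */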
+  public void analyze(Path outlinkdb, Path linkscores)
+    throws IOException {
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    Path outlinkDbCurrent = new Path(outlinkdb, OutlinkDb.CURRENT_NAME);
+    Path linkScoresCurrent = new Path(linkscores, CURRENT_NAME);
+    if (!fs.exists(linkscores)) {
+      fs.mkdirs(linkscores);
+    }
+
+    Path numLinksPath = new Path(outlinkdb, NUM_LINKS);
+    JobConf counter = new NutchJob(conf);
+    counter.setJobName("LinkAnalysis Counter");
+    counter.addInputPath(outlinkDbCurrent);
+    counter.setOutputPath(numLinksPath);
+    counter.setInputFormat(SequenceFileInputFormat.class);
+    counter.setMapperClass(Counter.class);
+    counter.setCombinerClass(Counter.class);
+    counter.setReducerClass(Counter.class);
+    counter.setMapOutputKeyClass(Text.class);
+    counter.setMapOutputValueClass(LongWritable.class);
+    counter.setOutputKeyClass(Text.class);
+    counter.setOutputValueClass(LongWritable.class);
+    counter.setNumReduceTasks(1);
+    counter.setOutputFormat(TextOutputFormat.class);
+
+    LOG.info("Starting link counter job");
+    try {
+      JobClient.runJob(counter);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished link counter job");
+
+    LOG.info("Reading numlinks temp file");
+    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
+    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    String numLinksLine = buffer.readLine();
+    readLinks.close();
+
+    LOG.info("Deleting numlinks temp file");
+    fs.delete(numLinksPath);
+    String numLinks = numLinksLine.split("\\s+")[1];
+
+    JobConf initializer = new NutchJob(conf);
+    initializer.setJobName("LinkAnalysis Initializer");
+    initializer.addInputPath(outlinkDbCurrent);
+    initializer.setOutputPath(linkScoresCurrent);
+    initializer.setInputFormat(SequenceFileInputFormat.class);
+    initializer.setMapperClass(Initializer.class);
+    initializer.setMapOutputKeyClass(Text.class);
+    initializer.setMapOutputValueClass(LinkScore.class);
+    initializer.setOutputKeyClass(Text.class);
+    initializer.setOutputValueClass(LinkScore.class);
+    initializer.setOutputFormat(MapFileOutputFormat.class);
+
+    LOG.info("Starting initialization job");
+    try {
+      JobClient.runJob(initializer);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished initialization job.");
+
+    Path linkScoresInverted = new Path(linkscores, "inverted");
+    Path linkScoresMerged = new Path(linkscores, "merged");
+
+    JobConf initinverter = new NutchJob(conf);
+    initinverter.setJobName("LinkAnalysis Inverter");
+    initinverter.addInputPath(linkScoresCurrent);
+    initinverter.addInputPath(outlinkDbCurrent);
+    initinverter.setOutputPath(linkScoresInverted);
+    initinverter.setInputFormat(SequenceFileInputFormat.class);
+    initinverter.setMapperClass(Inverter.class);
+    initinverter.setReducerClass(Inverter.class);
+    initinverter.setMapOutputKeyClass(Text.class);
+    initinverter.setMapOutputValueClass(ObjectWritable.class);
+    initinverter.setOutputKeyClass(Text.class);
+    initinverter.setOutputValueClass(LinkScore.class);
+    initinverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+    LOG.info("Starting inverter job");
+    try {
+      JobClient.runJob(initinverter);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished inverter job.");
+
+    JobConf initmerge = new NutchJob(conf);
+    initmerge.setJobName("LinkAnalysis Merger");
+    initmerge.addInputPath(linkScoresInverted);
+    initmerge.setOutputPath(linkScoresMerged);
+    initmerge.setInputFormat(SequenceFileInputFormat.class);
+    initmerge.setReducerClass(Merger.class);
+    initmerge.setMapOutputKeyClass(Text.class);
+    initmerge.setMapOutputValueClass(LinkScore.class);
+    initmerge.setOutputKeyClass(Text.class);
+    initmerge.setOutputValueClass(LinkScores.class);
+    initmerge.setOutputFormat(MapFileOutputFormat.class);
+
+    LOG.info("Starting merger job");
+    try {
+      JobClient.runJob(initmerge);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished merger job.");
+
+    float teleportPrct = conf.getFloat("link.analyze.teleport.prct", .50f);
+    double rankOneScore = (1d / (double)Integer.parseInt(numLinks));
+    double tScore = ((1 - teleportPrct) * rankOneScore);
+    float minScore = conf.getFloat("link.analyze.minimum.score",
+      Float.MIN_VALUE);
+
+    LOG.info("Number of links " + numLinks);
+    LOG.info("MinScore " + minScore);
+    LOG.info("RankOne " + rankOneScore);
+    LOG.info("tScore " + tScore);
+
+    numIterations = conf.getInt("link.analyze.num.iterations", 10);
+    for (int i = 0; i < numIterations; i++) {
+
+      LOG.info("Running iteration " + (i + 1) + " of " + numIterations);
+      Path tempScores = new Path(linkscores + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+      fs.mkdirs(tempScores);
+
+      Path tempCurrent = new Path(tempScores, CURRENT_NAME);
+      Path tempInverted = new Path(tempScores, "inverted");
+      Path tempMerged = new Path(tempScores, "merged");
+
+      JobConf analyzer = new NutchJob(conf);
+      analyzer.set("link.analyze.num.links", numLinks);
+      analyzer.set("link.analyze.iteration", String.valueOf(i + 1));
+      analyzer.set("link.analyze.rankone", String.valueOf(rankOneScore));
+      analyzer.set("link.analyze.tscore", String.valueOf(tScore));
+      analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (i + 1)
+        + " of " + numIterations);
+      analyzer.set(LINK_SCORES, linkscores.toString());
+      analyzer.addInputPath(outlinkDbCurrent);
+      analyzer.addInputPath(linkScoresMerged);
+      analyzer.setMapOutputKeyClass(Text.class);
+      analyzer.setMapOutputValueClass(ObjectWritable.class);
+      analyzer.setInputFormat(SequenceFileInputFormat.class);
+      analyzer.setMapperClass(Analyzer.class);
+      analyzer.setReducerClass(Analyzer.class);
+      analyzer.setOutputKeyClass(Text.class);
+      analyzer.setOutputValueClass(LinkScore.class);
+      analyzer.setOutputPath(tempCurrent);
+      analyzer.setOutputFormat(MapFileOutputFormat.class);
+
+      LOG.info("Starting link analysis job");
+      try {
+        JobClient.runJob(analyzer);
+      }
+      catch (IOException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw e;
+      }
+      LOG.info("Finished linkanalysis job.");
+
+      if (i < (numIterations - 1)) {
+
+        JobConf inverter = new NutchJob(conf);
+        inverter.setJobName("LinkAnalysis Inverter");
+        inverter.addInputPath(tempCurrent);
+        inverter.addInputPath(outlinkDbCurrent);
+        inverter.setOutputPath(tempInverted);
+        inverter.setInputFormat(SequenceFileInputFormat.class);
+        inverter.setMapperClass(Inverter.class);
+        inverter.setReducerClass(Inverter.class);
+        inverter.setMapOutputKeyClass(Text.class);
+        inverter.setMapOutputValueClass(ObjectWritable.class);
+        inverter.setOutputKeyClass(Text.class);
+        inverter.setOutputValueClass(LinkScore.class);
+        inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+        LOG.info("Starting inverter job");
+        try {
+          JobClient.runJob(inverter);
+        }
+        catch (IOException e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw e;
+        }
+        LOG.info("Finished inverter job.");
+
+        JobConf merger = new NutchJob(conf);
+        merger.setJobName("LinkAnalysis Merger");
+        merger.addInputPath(tempInverted);
+        merger.setOutputPath(tempMerged);
+        merger.setInputFormat(SequenceFileInputFormat.class);
+        merger.setReducerClass(Merger.class);
+        merger.setMapOutputKeyClass(Text.class);
+        merger.setMapOutputValueClass(LinkScore.class);
+        merger.setOutputKeyClass(Text.class);
+        merger.setOutputValueClass(LinkScores.class);
+        merger.setOutputFormat(MapFileOutputFormat.class);
+
+        LOG.info("Starting merger job");
+        try {
+          JobClient.runJob(merger);
+        }
+        catch (IOException e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw e;
+        }
+        LOG.info("Finished merger job.");
+      }
+
+      LOG.info("Installing new link scores");
+      Path old = new Path(linkscores + ".old");
+      if (fs.exists(linkscores)) {
+        fs.rename(linkscores, old);
+      }
+      fs.rename(tempScores, linkscores);
+      if (fs.exists(old)) {
+        fs.delete(old);
+      }
+      LOG.info("Finished analysis iteration " + (i + 1) + " of "
+        + numIterations);
+    }
+    LOG.info("Finished analysis");
+
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkAnalysis(),
+      args);
+    System.exit(res);
+  }
+
+  public int run(String[] args)
+    throws Exception {
+
+    Options options = new Options();
+    Option help = OptionBuilder.withArgName("help").hasArg().withDescription(
+      "show this help message").create("help");
+    Option outlinkOpts = OptionBuilder.withArgName("outlinks").hasArg()
+      .withDescription("the outlinkdb to use").create("outlinks");
+    Option linkanalysisOpts = OptionBuilder.withArgName("linkanalysis")
+      .hasArg().withDescription("the linkanalysis output to use").create(
+        "linkanalysis");
+    options.addOption(outlinkOpts);
+    options.addOption(linkanalysisOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help")
+        || !(line.hasOption("outlinks") || line.hasOption("linkanalysis"))) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("LinkAnalysis", options);
+        return -1;
+      }
+
+      String outlinkdb = line.getOptionValue("outlinks");
+      String output = line.getOptionValue("linkanalysis");
+
+      analyze(new Path(outlinkdb), new Path(output));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("LinkAnalysis: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}
Index: src/java/org/apache/nutch/scoring/ScoreUpdater.java
===================================================================
--- src/java/org/apache/nutch/scoring/ScoreUpdater.java	(revision 0)
+++ src/java/org/apache/nutch/scoring/ScoreUpdater.java	(revision 0)
@@ -0,0 +1,176 @@
+package org.apache.nutch.scoring;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.scoring.LinkAnalysis.LinkScore;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
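+/**
+ * Updates the score of each CrawlDatum in the crawldb with the score
+ * computed by LinkAnalysis, clearing the score of any URL without one.
+ */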
+public class ScoreUpdater
+  extends Configured
+  implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(ScoreUpdater.class);
+
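+  /**
+   * Joins each CrawlDatum with its LinkScore by URL and writes the datum
+   * back out with the new, or cleared, score.
+   */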
+  private static class Updater
+    implements Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, CrawlDatum> {
+
+    private JobConf conf;
+    private float clearScore = 0.0f;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f);
+    }
+
+    public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+      throws IOException {
+
+      String url = key.toString();
+      LinkScore score = null;
+      CrawlDatum datum = null;
+
+      while (values.hasNext()) {
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof LinkScore) {
+          score = (LinkScore)value;
+        }
+        else if (value instanceof CrawlDatum) {
+          datum = (CrawlDatum)value;
+        }
+      }
+
+      if (datum != null) {
+        
+        if (score != null) {
+          datum.setScore((float)score.getScore());
+          LOG.debug(url + ": setting to score " + score.getScore());
+        }
+        else {
+          datum.setScore(clearScore);
+          LOG.debug(url + ": setting to clear score of " + clearScore);
+        }
+
+        output.collect(key, datum);
+      }
+      else {
+        LOG.debug(url + ": no datum");
+      }
+    }
+
+    public void close() {
+    }
+  }
+
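+  /**
+   * Runs the update job over the crawldb and link scores, then installs
+   * the resulting crawldb.
+   */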
+  public void update(Path crawldb, Path linkscores)
+    throws IOException {
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    LOG.info("Running crawldb update " + crawldb);
+    Path linkScoresCurrent = new Path(linkscores, LinkAnalysis.CURRENT_NAME);
+    Path crawlDbCurrent = new Path(crawldb, CrawlDb.CURRENT_NAME);
+    Path newCrawlDb = new Path(crawldb, Integer.toString(new Random()
+      .nextInt(Integer.MAX_VALUE)));
+
+    JobConf updater = new NutchJob(conf);
+    updater.setJobName("LinkAnalysis Update CrawlDb");
+    updater.addInputPath(crawlDbCurrent);
+    updater.addInputPath(linkScoresCurrent);
+    updater.setOutputPath(newCrawlDb);
+    updater.setInputFormat(SequenceFileInputFormat.class);
+    updater.setMapperClass(Updater.class);
+    updater.setReducerClass(Updater.class);
+    updater.setMapOutputKeyClass(Text.class);
+    updater.setMapOutputValueClass(ObjectWritable.class);
+    updater.setOutputKeyClass(Text.class);
+    updater.setOutputValueClass(CrawlDatum.class);
+    updater.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      JobClient.runJob(updater);
+    }
+    catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    LOG.info("Installing new crawldb " + crawldb);
+    CrawlDb.install(updater, crawldb);
+  }
+
+  public static void main(String[] args)
+    throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
+      args);
+    System.exit(res);
+  }
+
+  public int run(String[] args)
+    throws Exception {
+    if (args.length < 2) {
+      System.out.println("Usage: ScoreUpdater <crawldb> <linkscores>");
+      System.out.println("\tcrawldb\tThe crawldb to update");
+      System.out.println("\tlinkscores\tThe linkscores to update the crawldb");
+      return -1;
+    }
+
+    try {
+      update(new Path(args[0]), new Path(args[1]));
+      return 0;
+    }
+    catch (Exception e) {
+      LOG.fatal("ScoreUpdater: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/util/MapFileUtils.java
===================================================================
--- src/java/org/apache/nutch/util/MapFileUtils.java	(revision 0)
+++ src/java/org/apache/nutch/util/MapFileUtils.java	(revision 0)
@@ -0,0 +1,22 @@
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.MapFile;
+
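+/** Utility methods for working with arrays of MapFile readers. */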
+public class MapFileUtils {
+
+  public static void closeReaders(MapFile.Reader[] readers)
+    throws IOException {
+    if (readers != null) {
+      for (int i = 0; i < readers.length; i++) {
+        MapFile.Reader reader = readers[i];
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+  }
+
+}
Index: src/plugin/scoring-link/build.xml
===================================================================
--- src/plugin/scoring-link/build.xml	(revision 0)
+++ src/plugin/scoring-link/build.xml	(revision 0)
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="scoring-link" default="jar-core">
+
+  <import file="../build-plugin.xml"/>
+
+  <!-- Deploy Unit test dependencies -->
+  <target name="deps-test">
+    <ant target="deploy" inheritall="false" dir="../nutch-extensionpoints"/>
+  </target>
+
+</project>
Index: src/plugin/scoring-link/plugin.xml
===================================================================
--- src/plugin/scoring-link/plugin.xml	(revision 0)
+++ src/plugin/scoring-link/plugin.xml	(revision 0)
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<plugin
+   id="scoring-link"
+   name="Link Analysis Scoring Plug-in"
+   version="1.0.0"
+   provider-name="nutch.org">
+
+   <runtime>
+      <library name="scoring-link.jar">
+         <export name="*"/>
+      </library>
+   </runtime>
+
+   <extension id="org.apache.nutch.scoring.link"
+              name="LinkAnalysisScoring"
+              point="org.apache.nutch.scoring.ScoringFilter">
+
+      <implementation id="org.apache.nutch.scoring.link.LinkAnalysisScoringFilter"
+        class="org.apache.nutch.scoring.link.LinkAnalysisScoringFilter" />
+   </extension>
+
+</plugin>
Index: src/plugin/scoring-link/src/java/org/apache/nutch/scoring/link/LinkAnalysisScoringFilter.java
===================================================================
--- src/plugin/scoring-link/src/java/org/apache/nutch/scoring/link/LinkAnalysisScoringFilter.java	(revision 0)
+++ src/plugin/scoring-link/src/java/org/apache/nutch/scoring/link/LinkAnalysisScoringFilter.java	(revision 0)
@@ -0,0 +1,98 @@
+package org.apache.nutch.scoring.link;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.document.Document;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.scoring.ScoringFilter;
+import org.apache.nutch.scoring.ScoringFilterException;
+
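+/**
+ * A {@link ScoringFilter} that relies on the link analysis score already
+ * stored in the CrawlDb by the LinkAnalysis and ScoreUpdater jobs. Unlike
+ * the OPIC filter it does not distribute score to outlinks during fetching;
+ * pages simply carry the score assigned to them by the link analysis tools.
+ */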
+public class LinkAnalysisScoringFilter
+  implements ScoringFilter {
+
+  private Configuration conf;
+  private float scoreInjected = 0.001f;
+  private float normalizedScore = 1.00f;
+
+  public LinkAnalysisScoringFilter() {
+
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    normalizedScore = conf.getFloat("link.analyze.normalize.score", 1.00f);
+    scoreInjected = conf.getFloat("link.analyze.minimum.score", 0.001f);
+  }
+
+  public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+    ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+    CrawlDatum adjust, int allCount)
+    throws ScoringFilterException {
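+    // link analysis computes scores globally, so no score is passed to outlinks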
+    return adjust;
+  }
+
+  public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+    throws ScoringFilterException {
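+    // sort for generation by the link analysis score stored in the crawldb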
+    return datum.getScore() * initSort;
+  }
+
+  public float indexerScore(Text url, Document doc, CrawlDatum dbDatum,
+    CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+    throws ScoringFilterException {
+    // scale the stored link analysis score by the normalization factor
+    return (normalizedScore * dbDatum.getScore());
+  }
+
+  public void initialScore(Text url, CrawlDatum datum)
+    throws ScoringFilterException {
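+    // newly discovered pages start at zero until the link analysis job runs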
+    datum.setScore(0.0f);
+  }
+
+  public void injectedScore(Text url, CrawlDatum datum)
+    throws ScoringFilterException {
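+    // injected urls start at the configured minimum score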
+    datum.setScore(scoreInjected);
+  }
+
+  public void passScoreAfterParsing(Text url, Content content, Parse parse)
+    throws ScoringFilterException {
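+    // carry the score from the content metadata over to the parse metadata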
+    parse.getData().getContentMeta().set(Nutch.SCORE_KEY,
+      content.getMetadata().get(Nutch.SCORE_KEY));
+  }
+
+  public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+    throws ScoringFilterException {
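+    // stash the crawldb score in the content metadata so it survives parsing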
+    content.getMetadata().set(Nutch.SCORE_KEY, "" + datum.getScore());
+  }
+
+  public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+    List<CrawlDatum> inlinked)
+    throws ScoringFilterException {
+    // nothing to do
+  }
+
+}
Index: src/plugin/build.xml
===================================================================
--- src/plugin/build.xml	(revision 666721)
+++ src/plugin/build.xml	(working copy)
@@ -66,6 +66,7 @@
      <ant dir="query-site" target="deploy"/>
      <ant dir="query-url" target="deploy"/>
      <ant dir="scoring-opic" target="deploy"/>
+    <ant dir="scoring-link" target="deploy"/>
      <ant dir="summary-basic" target="deploy"/>
      <ant dir="subcollection" target="deploy"/>
      <ant dir="summary-lucene" target="deploy"/>
@@ -160,6 +161,7 @@
     <ant dir="query-site" target="clean"/>
     <ant dir="query-url" target="clean"/>
     <ant dir="scoring-opic" target="clean"/>
+   <ant dir="scoring-link" target="clean"/>
     <ant dir="subcollection" target="clean"/>
     <ant dir="summary-basic" target="clean"/>
     <ant dir="summary-lucene" target="clean"/>

