Index: src/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LinkDatum.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LinkDatum.java (revision 0)
@@ -0,0 +1,124 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class for holding link information, including the url, anchor text,
+ * score, timestamp, and link type.
+ */
+public class LinkDatum
+ implements Writable {
+
+ public final static byte INLINK = 1;
+ public final static byte OUTLINK = 2;
+
+ private String url = null;
+ private String anchor = "";
+ private float score = 0.0f;
+ private long timestamp = 0L;
+ private byte linkType = 0;
+
+ /**
+ * Default constructor, no url, timestamp, score, or link type.
+ */
+ public LinkDatum() {
+
+ }
+
+ /**
+ * Creates a LinkDatum with a given url. Timestamp is set to current time.
+ *
+ * @param url The link url.
+ */
+ public LinkDatum(String url) {
+ this(url, "", System.currentTimeMillis());
+ }
+
+ /**
+ * Creates a LinkDatum with a url and an anchor text. Timestamp is set to
+ * current time.
+ *
+ * @param url The link url.
+ * @param anchor The link anchor text.
+ */
+ public LinkDatum(String url, String anchor) {
+ this(url, anchor, System.currentTimeMillis());
+ }
+
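+  /**
+   * Creates a LinkDatum with a url, anchor text, and timestamp.
+   *
+   * @param url The link url.
+   * @param anchor The link anchor text.
+   * @param timestamp The timestamp of the link.
+   */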
+ public LinkDatum(String url, String anchor, long timestamp) {
+ this.url = url;
+ this.anchor = anchor;
+ this.timestamp = timestamp;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getAnchor() {
+ return anchor;
+ }
+
+ public void setAnchor(String anchor) {
+ this.anchor = anchor;
+ }
+
+ public float getScore() {
+ return score;
+ }
+
+ public void setScore(float score) {
+ this.score = score;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public byte getLinkType() {
+ return linkType;
+ }
+
+ public void setLinkType(byte linkType) {
+ this.linkType = linkType;
+ }
+
+ public void readFields(DataInput in)
+ throws IOException {
+ url = Text.readString(in);
+ anchor = Text.readString(in);
+ score = in.readFloat();
+ timestamp = in.readLong();
+ linkType = in.readByte();
+ }
+
+ public void write(DataOutput out)
+ throws IOException {
+ Text.writeString(out, url);
+ Text.writeString(out, anchor != null ? anchor : "");
+ out.writeFloat(score);
+ out.writeLong(timestamp);
+ out.writeByte(linkType);
+ }
+
+  public String toString() {
+    String type = (linkType == INLINK) ? "inlink"
+      : (linkType == OUTLINK) ? "outlink" : "unknown";
+    return "url: " + url + ", anchor: " + anchor + ", score: " + score
+      + ", timestamp: " + timestamp + ", link type: " + type;
+  }
+}
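A minimal sketch (not part of the patch) of how LinkDatum's Writable contract above round-trips through a byte stream; the url, anchor, and class name are illustrative, and only LinkDatum's own API is assumed:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.nutch.scoring.webgraph.LinkDatum;

public class LinkDatumRoundTrip {
  public static void main(String[] args) throws Exception {
    LinkDatum datum = new LinkDatum("http://example.com/page", "example anchor");
    datum.setScore(0.25f);
    datum.setLinkType(LinkDatum.OUTLINK);

    // serialize the datum as Hadoop would inside a SequenceFile record
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    datum.write(new DataOutputStream(bytes));

    // deserialize into a fresh instance; all five fields should survive
    LinkDatum copy = new LinkDatum();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);
  }
}
```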
Index: src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java (revision 0)
@@ -0,0 +1,434 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.scoring.webgraph.Loops.LoopSet;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * The LinkDumper tool creates a database of node to inlink information that can
+ * be read using the nested Reader class. This allows the inlink and scoring
+ * state of a single url to be reviewed quickly to determine why a given url is
+ * ranking a certain way. This tool is to be used with the LinkRank analysis.
+ */
+public class LinkDumper
+ extends Configured
+ implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(LinkDumper.class);
+ public static final String DUMP_DIR = "linkdump";
+
+ /**
+   * Reader class which will print out the url and all of its inlinks to system
+   * out. Each inlink will be displayed with its node information, including
+   * its score and its number of inlinks and outlinks.
+ */
+ public static class Reader {
+
+ public static void main(String[] args)
+ throws Exception {
+
+ // open the readers for the linkdump directory
+ Configuration conf = NutchConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+ Path webGraphDb = new Path(args[0]);
+ String url = args[1];
+ MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
+ webGraphDb, DUMP_DIR), conf);
+
+ // get the link nodes for the url
+ Text key = new Text(url);
+ LinkNodes nodes = new LinkNodes();
+ MapFileOutputFormat.getEntry(readers,
+ new HashPartitioner(), key, nodes);
+
+ // print out the link nodes
+ LinkNode[] linkNodesAr = nodes.getLinks();
+ System.out.println(url + ":");
+ for (LinkNode node : linkNodesAr) {
+ System.out.println(" " + node.getUrl() + " - "
+ + node.getNode().toString());
+ }
+
+ // close the readers
+ FSUtils.closeReaders(readers);
+ }
+ }
+
+ /**
+ * Bean class which holds url to node information.
+ */
+ public static class LinkNode
+ implements Writable {
+
+ private String url = null;
+ private Node node = null;
+
+ public LinkNode() {
+
+ }
+
+ public LinkNode(String url, Node node) {
+ this.url = url;
+ this.node = node;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ public void setNode(Node node) {
+ this.node = node;
+ }
+
+ public void readFields(DataInput in)
+ throws IOException {
+ url = in.readUTF();
+ node = new Node();
+ node.readFields(in);
+ }
+
+ public void write(DataOutput out)
+ throws IOException {
+ out.writeUTF(url);
+ node.write(out);
+ }
+
+ }
+
+ /**
+ * Writable class which holds an array of LinkNode objects.
+ */
+ public static class LinkNodes
+ implements Writable {
+
+ private LinkNode[] links;
+
+ public LinkNodes() {
+
+ }
+
+ public LinkNodes(LinkNode[] links) {
+ this.links = links;
+ }
+
+ public LinkNode[] getLinks() {
+ return links;
+ }
+
+ public void setLinks(LinkNode[] links) {
+ this.links = links;
+ }
+
+    public void readFields(DataInput in)
+      throws IOException {
+      int numLinks = in.readInt();
+      // always reallocate so a reused instance doesn't keep stale links
+      links = new LinkNode[numLinks];
+      for (int i = 0; i < numLinks; i++) {
+        LinkNode node = new LinkNode();
+        node.readFields(in);
+        links[i] = node;
+      }
+    }
+
+    public void write(DataOutput out)
+      throws IOException {
+      // always write the count, even when zero, so readFields stays in sync
+      int numLinks = (links != null) ? links.length : 0;
+      out.writeInt(numLinks);
+      for (int i = 0; i < numLinks; i++) {
+        links[i].write(out);
+      }
+    }
+ }
+
+ /**
+ * Inverts outlinks from the WebGraph to inlinks and attaches node
+ * information.
+ */
+ public static class Inverter
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Wraps all values in ObjectWritables.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Inverts outlinks to inlinks while attaching node information to the
+ * outlink.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ String fromUrl = key.toString();
+ List outlinks = new ArrayList();
+ Node node = null;
+ LoopSet loops = null;
+
+ // loop through all values aggregating outlinks, saving node and loopset
+ while (values.hasNext()) {
+ ObjectWritable write = values.next();
+ Object obj = write.get();
+ if (obj instanceof Node) {
+ node = (Node)obj;
+ }
+ else if (obj instanceof LinkDatum) {
+ outlinks.add((LinkDatum)WritableUtils.clone((LinkDatum)obj, conf));
+ }
+ else if (obj instanceof LoopSet) {
+ loops = (LoopSet)obj;
+ }
+ }
+
+      // only collect if we have a node and it has outlinks
+      if (node != null && node.getNumOutlinks() > 0) {
+
+ Set loopSet = (loops != null) ? loops.getLoopSet() : null;
+ for (int i = 0; i < outlinks.size(); i++) {
+ LinkDatum outlink = outlinks.get(i);
+ String toUrl = outlink.getUrl();
+
+ // remove any url that is in the loopset, same as LinkRank
+ if (loopSet != null && loopSet.contains(toUrl)) {
+ continue;
+ }
+
+ // collect the outlink as an inlink with the node
+ output.collect(new Text(toUrl), new LinkNode(fromUrl, node));
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Merges LinkNode objects into a single array value per url. This allows
+ * all values to be quickly retrieved and printed via the Reader tool.
+ */
+ public static class Merger
+ implements Reducer {
+
+ private JobConf conf;
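+    // cap on the number of LinkNode values aggregated for any single url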
+ private int maxInlinks = 50000;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Aggregate all LinkNode objects for a given url.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ List nodeList = new ArrayList();
+ int numNodes = 0;
+
+ while (values.hasNext()) {
+ LinkNode cur = values.next();
+ if (numNodes < maxInlinks) {
+ nodeList.add((LinkNode)WritableUtils.clone(cur, conf));
+ numNodes++;
+ }
+ else {
+ break;
+ }
+ }
+
+ LinkNode[] linkNodesAr = nodeList.toArray(new LinkNode[nodeList.size()]);
+ LinkNodes linkNodes = new LinkNodes(linkNodesAr);
+ output.collect(key, linkNodes);
+ }
+
+ public void close() {
+
+ }
+ }
+
+ /**
+ * Runs the inverter and merger jobs of the LinkDumper tool to create the
+ * url to inlink node database.
+ */
+ public void dumpLinks(Path webGraphDb)
+ throws IOException {
+
+ LOG.info("NodeDumper: starting");
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ Path linkdump = new Path(webGraphDb, DUMP_DIR);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path loopSetDb = new Path(webGraphDb, Loops.LOOPS_DIR);
+ boolean loopsExists = fs.exists(loopSetDb);
+ Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+
+ // run the inverter job
+ Path tempInverted = new Path(webGraphDb, "inverted-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ JobConf inverter = new NutchJob(conf);
+ inverter.setJobName("LinkDumper: inverter");
+ FileInputFormat.addInputPath(inverter, nodeDb);
+ if (loopsExists) {
+ FileInputFormat.addInputPath(inverter, loopSetDb);
+ }
+ FileInputFormat.addInputPath(inverter, outlinkDb);
+ inverter.setInputFormat(SequenceFileInputFormat.class);
+ inverter.setMapperClass(Inverter.class);
+ inverter.setReducerClass(Inverter.class);
+ inverter.setMapOutputKeyClass(Text.class);
+ inverter.setMapOutputValueClass(ObjectWritable.class);
+ inverter.setOutputKeyClass(Text.class);
+ inverter.setOutputValueClass(LinkNode.class);
+ FileOutputFormat.setOutputPath(inverter, tempInverted);
+ inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+ try {
+ LOG.info("LinkDumper: running inverter");
+ JobClient.runJob(inverter);
+ LOG.info("LinkDumper: finished inverter");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // run the merger job
+ JobConf merger = new NutchJob(conf);
+ merger.setJobName("LinkDumper: merger");
+ FileInputFormat.addInputPath(merger, tempInverted);
+ merger.setInputFormat(SequenceFileInputFormat.class);
+ merger.setReducerClass(Merger.class);
+ merger.setMapOutputKeyClass(Text.class);
+ merger.setMapOutputValueClass(LinkNode.class);
+ merger.setOutputKeyClass(Text.class);
+ merger.setOutputValueClass(LinkNodes.class);
+    FileOutputFormat.setOutputPath(merger, linkdump);
+ merger.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ LOG.info("LinkDumper: running merger");
+ JobClient.runJob(merger);
+ LOG.info("LinkDumper: finished merger");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ fs.delete(tempInverted, true);
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+   * Runs the LinkDumper tool. This simply creates the database; to read the
+   * values, the nested Reader tool must be used.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg()
+ .withDescription("the web graph database to use").create("webgraphdb");
+ options.addOption(helpOpts);
+ options.addOption(webGraphDbOpts);
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("LinkDumper", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ dumpLinks(new Path(webGraphDb));
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("LinkDumper: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}
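For reference, a hedged sketch of driving the tool programmatically; the webgraphdb path and url are placeholders, and only the main and Reader entry points defined above are assumed:

```java
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.scoring.webgraph.LinkDumper;
import org.apache.nutch.util.NutchConfiguration;

public class LinkDumperExample {
  public static void main(String[] args) throws Exception {
    // build the linkdump database under an existing webgraphdb
    // ("crawl/webgraphdb" is a placeholder for a real crawl directory)
    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(),
        new String[] { "-webgraphdb", "crawl/webgraphdb" });

    // then review the inlinks of a single url with the nested Reader
    LinkDumper.Reader.main(new String[] { "crawl/webgraphdb",
        "http://example.com/" });
    System.exit(res);
  }
}
```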
Index: src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LinkRank.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LinkRank.java (revision 0)
@@ -0,0 +1,665 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.scoring.webgraph.Loops.LoopSet;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+public class LinkRank
+ extends Configured
+ implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(LinkRank.class);
+ private static final String NUM_NODES = "_num_nodes_";
+
+ /**
+ * Runs the counter job. The counter job determines the total number of nodes
+ * in the webgraph. This is used to compute the rank one score during analysis.
+ *
+ * @param fs The job file system.
+ * @param webGraphDb The web graph database to use.
+ *
+ * @return The number of nodes in the web graph.
+ * @throws IOException If an error occurs while running the counter job.
+ */
+ private int runCounter(FileSystem fs, Path webGraphDb)
+ throws IOException {
+
+ // configure the counter job
+ Path numLinksPath = new Path(webGraphDb, NUM_NODES);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ JobConf counter = new NutchJob(getConf());
+ counter.setJobName("LinkRank Counter");
+ FileInputFormat.addInputPath(counter, nodeDb);
+ FileOutputFormat.setOutputPath(counter, numLinksPath);
+ counter.setInputFormat(SequenceFileInputFormat.class);
+ counter.setMapperClass(Counter.class);
+ counter.setCombinerClass(Counter.class);
+ counter.setReducerClass(Counter.class);
+ counter.setMapOutputKeyClass(Text.class);
+ counter.setMapOutputValueClass(LongWritable.class);
+ counter.setOutputKeyClass(Text.class);
+ counter.setOutputValueClass(LongWritable.class);
+ counter.setNumReduceTasks(1);
+ counter.setOutputFormat(TextOutputFormat.class);
+
+ // run the counter job, outputs to a single reduce task and file
+ LOG.info("Starting link counter job");
+ try {
+ JobClient.runJob(counter);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished link counter job");
+
+ // read the first (and only) line from the file which should be the
+ // number of links in the web graph
+ LOG.info("Reading numlinks temp file");
+ FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
+ BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+ String numLinksLine = buffer.readLine();
+ readLinks.close();
+
+    // delete the temp file, then parse and return the number of links as an int
+ LOG.info("Deleting numlinks temp file");
+ fs.delete(numLinksPath, true);
+ String numLinks = numLinksLine.split("\\s+")[1];
+ return Integer.parseInt(numLinks);
+ }
+
+ /**
+ * Runs the initializer job. The initializer job sets up the nodes with a
+ * default starting score for link analysis.
+ *
+ * @param nodeDb The node database to use.
+ * @param output The job output directory.
+ *
+ * @throws IOException If an error occurs while running the initializer job.
+ */
+ private void runInitializer(Path nodeDb, Path output)
+ throws IOException {
+
+ // configure the initializer
+ JobConf initializer = new NutchJob(getConf());
+ initializer.setJobName("LinkAnalysis Initializer");
+ FileInputFormat.addInputPath(initializer, nodeDb);
+ FileOutputFormat.setOutputPath(initializer, output);
+ initializer.setInputFormat(SequenceFileInputFormat.class);
+ initializer.setMapperClass(Initializer.class);
+ initializer.setMapOutputKeyClass(Text.class);
+ initializer.setMapOutputValueClass(Node.class);
+ initializer.setOutputKeyClass(Text.class);
+ initializer.setOutputValueClass(Node.class);
+ initializer.setOutputFormat(MapFileOutputFormat.class);
+
+ // run the initializer
+ LOG.info("Starting initialization job");
+ try {
+ JobClient.runJob(initializer);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished initialization job.");
+ }
+
+ /**
+ * Runs the inverter job. The inverter job flips outlinks to inlinks to be
+ * passed into the analysis job.
+ *
+ * The inverter job takes a link loops database if one exists. It is an
+ * optional component of link analysis due to its extreme computational and
+ * space requirements, but it can be very useful in weeding out and
+ * eliminating link farms and other spam pages.
+ *
+ * @param nodeDb The node database to use.
+ * @param outlinkDb The outlink database to use.
+ * @param loopDb The loop database to use if it exists.
+ * @param output The output directory.
+ *
+ * @throws IOException If an error occurs while running the inverter job.
+ */
+ private void runInverter(Path nodeDb, Path outlinkDb, Path loopDb, Path output)
+ throws IOException {
+
+ // configure the inverter
+ JobConf inverter = new NutchJob(getConf());
+ inverter.setJobName("LinkAnalysis Inverter");
+ FileInputFormat.addInputPath(inverter, nodeDb);
+ FileInputFormat.addInputPath(inverter, outlinkDb);
+
+    // add the loop database if it exists and isn't null
+ if (loopDb != null) {
+ FileInputFormat.addInputPath(inverter, loopDb);
+ }
+ FileOutputFormat.setOutputPath(inverter, output);
+ inverter.setInputFormat(SequenceFileInputFormat.class);
+ inverter.setMapperClass(Inverter.class);
+ inverter.setReducerClass(Inverter.class);
+ inverter.setMapOutputKeyClass(Text.class);
+ inverter.setMapOutputValueClass(ObjectWritable.class);
+ inverter.setOutputKeyClass(Text.class);
+ inverter.setOutputValueClass(LinkDatum.class);
+ inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+ // run the inverter job
+ LOG.info("Starting inverter job");
+ try {
+ JobClient.runJob(inverter);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished inverter job.");
+ }
+
+ /**
+ * Runs the link analysis job. The link analysis job applies the link rank
+ * formula to create a score per url and stores that score in the NodeDb.
+ *
+ * Typically the link analysis job is run a number of times to allow the link
+ * rank scores to converge.
+ *
+ * @param nodeDb The node database from which we are getting previous link
+ * rank scores.
+ * @param inverted The inverted inlinks
+ * @param output The link analysis output.
+ * @param iteration The current iteration number.
+ * @param numIterations The total number of link analysis iterations
+ *
+ * @throws IOException If an error occurs during link analysis.
+ */
+ private void runAnalysis(Path nodeDb, Path inverted, Path output,
+ int iteration, int numIterations, float rankOne)
+ throws IOException {
+
+ JobConf analyzer = new NutchJob(getConf());
+ analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
+ analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+ + " of " + numIterations);
+ FileInputFormat.addInputPath(analyzer, nodeDb);
+ FileInputFormat.addInputPath(analyzer, inverted);
+ FileOutputFormat.setOutputPath(analyzer, output);
+ analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
+ analyzer.setMapOutputKeyClass(Text.class);
+ analyzer.setMapOutputValueClass(ObjectWritable.class);
+ analyzer.setInputFormat(SequenceFileInputFormat.class);
+ analyzer.setMapperClass(Analyzer.class);
+ analyzer.setReducerClass(Analyzer.class);
+ analyzer.setOutputKeyClass(Text.class);
+ analyzer.setOutputValueClass(Node.class);
+ analyzer.setOutputFormat(MapFileOutputFormat.class);
+
+ LOG.info("Starting analysis job");
+ try {
+ JobClient.runJob(analyzer);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished analysis job.");
+ }
+
+ /**
+ * The Counter job that determines the total number of nodes in the WebGraph.
+ * This is used to determine a rank one score for pages with zero inlinks but
+ * that contain outlinks.
+ */
+ private static class Counter
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+ private static Text numNodes = new Text(NUM_NODES);
+ private static LongWritable one = new LongWritable(1L);
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Outputs one for every node.
+ */
+ public void map(Text key, Node value,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+ output.collect(numNodes, one);
+ }
+
+ /**
+     * Totals the node counts and outputs a single total value.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ long total = 0;
+ while (values.hasNext()) {
+ total += values.next().get();
+ }
+ output.collect(numNodes, new LongWritable(total));
+ }
+
+ public void close() {
+ }
+ }
+
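+  /**
+   * Initializes every node in the NodeDb with a starting score, configurable
+   * through link.analyze.initial.score and 1.0 by default.
+   */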
+ private static class Initializer
+ implements Mapper {
+
+ private JobConf conf;
+ private float initialScore = 1.0f;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ initialScore = conf.getFloat("link.analyze.initial.score", 1.0f);
+ }
+
+ public void map(Text key, Node node, OutputCollector output,
+ Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Node outNode = (Node)WritableUtils.clone(node, conf);
+ outNode.setInlinkScore(initialScore);
+
+ output.collect(new Text(url), outNode);
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Inverts outlinks and attaches current score from the NodeDb of the
+ * WebGraph. The link analysis process consists of inverting, analyzing and
+ * scoring, in a loop for a given number of iterations.
+ */
+ private static class Inverter
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Convert values to ObjectWritable
+ */
+ public void map(Text key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Inverts outlinks to inlinks, attaches current score for the outlink from
+ * the NodeDb of the WebGraph and removes any outlink that is contained
+ * within the loopset.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ String fromUrl = key.toString();
+ List outlinks = new ArrayList();
+ Node node = null;
+ LoopSet loops = null;
+
+ // aggregate outlinks, assign other values
+ while (values.hasNext()) {
+ ObjectWritable write = values.next();
+ Object obj = write.get();
+ if (obj instanceof Node) {
+ node = (Node)obj;
+ }
+ else if (obj instanceof LinkDatum) {
+ outlinks.add((LinkDatum)WritableUtils.clone((LinkDatum)obj, conf));
+ }
+ else if (obj instanceof LoopSet) {
+ loops = (LoopSet)obj;
+ }
+ }
+
+      // nothing to invert without a node entry for the url
+      if (node == null) {
+        return;
+      }
+
+      // get the number of outlinks and the current inlink and outlink scores
+      // from the node of the url
+      int numOutlinks = node.getNumOutlinks();
+ float inlinkScore = node.getInlinkScore();
+ float outlinkScore = node.getOutlinkScore();
+ LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
+
+ // can't invert if no outlinks
+ if (numOutlinks > 0) {
+
+ Set loopSet = (loops != null) ? loops.getLoopSet() : null;
+ for (int i = 0; i < outlinks.size(); i++) {
+ LinkDatum outlink = outlinks.get(i);
+ String toUrl = outlink.getUrl();
+
+ // remove any url that is contained in the loopset
+ if (loopSet != null && loopSet.contains(toUrl)) {
+ LOG.debug(fromUrl + ": Skipping inverting inlink from loop "
+ + toUrl);
+ continue;
+ }
+ outlink.setUrl(fromUrl);
+ outlink.setScore(outlinkScore);
+
+ // collect the inverted outlink
+ output.collect(new Text(toUrl), outlink);
+ LOG.debug(toUrl + ": inverting inlink from " + fromUrl
+ + " origscore: " + inlinkScore + " numOutlinks: " + numOutlinks
+ + " inlinkscore: " + outlinkScore);
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Runs a single link analysis iteration.
+ */
+ private static class Analyzer
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+ private float dampingFactor = 0.85f;
+ private float rankOne = 0.0f;
+ private int itNum = 0;
+ private boolean limitPages = true;
+ private boolean limitDomains = true;
+
+ /**
+ * Configures the job, sets the damping factor, rank one score, and other
+ * needed values for analysis.
+ */
+ public void configure(JobConf conf) {
+
+ try {
+ this.conf = conf;
+ this.dampingFactor = conf.getFloat("link.analyze.damping.factor", 0.85f);
+ this.rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
+ this.itNum = conf.getInt("link.analyze.iteration", 0);
+ limitPages = conf.getBoolean("link.ignore.limit.page", true);
+ limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+ }
+ catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Convert values to ObjectWritable
+ */
+ public void map(Text key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(WritableUtils.clone(value, conf));
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Performs a single iteration of link analysis. The resulting scores are
+ * stored in a temporary NodeDb which replaces the NodeDb of the WebGraph.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Set domains = new HashSet();
+ Set pages = new HashSet();
+ Node node = null;
+
+ // a page with zero inlinks has a score of rankOne
+ int numInlinks = 0;
+ float totalInlinkScore = rankOne;
+
+ while (values.hasNext()) {
+
+ ObjectWritable next = values.next();
+ Object value = next.get();
+ if (value instanceof Node) {
+ node = (Node)value;
+ }
+ else if (value instanceof LinkDatum) {
+
+ LinkDatum linkDatum = (LinkDatum)value;
+ float scoreFromInlink = linkDatum.getScore();
+ String inlinkUrl = linkDatum.getUrl();
+ String inLinkDomain = URLUtil.getDomainName(inlinkUrl);
+ String inLinkPage = URLUtil.getPage(inlinkUrl);
+
+ // limit counting duplicate inlinks by pages or domains
+ if ((limitPages && pages.contains(inLinkPage))
+ || (limitDomains && domains.contains(inLinkDomain))) {
+ LOG.debug(url + ": ignoring " + scoreFromInlink + " from "
+ + inlinkUrl + ", duplicate page or domain");
+ continue;
+ }
+
+ // aggregate total inlink score
+ numInlinks++;
+ totalInlinkScore += scoreFromInlink;
+ domains.add(inLinkDomain);
+ pages.add(inLinkPage);
+ LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+ + ", total: " + totalInlinkScore);
+ }
+ }
+
+ // calculate linkRank score formula
+ float linkRankScore = (1 - this.dampingFactor)
+ + (this.dampingFactor * totalInlinkScore);
+
+ LOG.info(url + ": score: " + linkRankScore + " num inlinks: "
+ + numInlinks + " iteration: " + itNum + "\n");
+
+      // skip urls that have no matching node entry
+      if (node == null) {
+        return;
+      }
+
+      // store the score in a temporary NodeDb
+      Node outNode = (Node)WritableUtils.clone(node, conf);
+ outNode.setInlinkScore(linkRankScore);
+ output.collect(key, outNode);
+ }
+
+ public void close()
+ throws IOException {
+ }
+ }
+
+ /**
+ * Default constructor.
+ */
+ public LinkRank() {
+ super();
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public LinkRank(Configuration conf) {
+ super(conf);
+ }
+
+ public void close() {
+ }
+
+ /**
+   * Runs the complete link analysis job. The complete job determines the rank
+   * one score, runs through a given number of invert and analyze iterations
+   * (10 by default), and finally replaces the NodeDb in the WebGraph with the
+   * link rank output.
+ *
+ * @param webGraphDb The WebGraph to run link analysis on.
+ *
+ * @throws IOException If an error occurs during link analysis.
+ */
+ public void analyze(Path webGraphDb)
+ throws IOException {
+
+    // store the link rank under the webgraphdb temporarily; final scores get
+    // updated into the nodedb
+ Path linkRank = new Path(webGraphDb, "linkrank");
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // create the linkrank directory if needed
+ if (!fs.exists(linkRank)) {
+ fs.mkdirs(linkRank);
+ }
+
+ // the webgraph outlink and node database paths
+ Path wgOutlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+ Path wgNodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path nodeDb = new Path(linkRank, WebGraph.NODE_DIR);
+ Path loopDb = new Path(webGraphDb, Loops.LOOPS_DIR);
+ if (!fs.exists(loopDb)) {
+ loopDb = null;
+ }
+
+    // get the total number of nodes in the webgraph, used for rank one, then
+    // initialize all urls with a default score
+ int numLinks = runCounter(fs, webGraphDb);
+ runInitializer(wgNodeDb, nodeDb);
+ float rankOneScore = (1f / (float)numLinks);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Number of links " + numLinks);
+ LOG.info("Rank One " + rankOneScore);
+ }
+
+ // run invert and analysis for a given number of iterations to allow the
+ // link rank scores to converge
+ int numIterations = conf.getInt("link.analyze.num.iterations", 10);
+ for (int i = 0; i < numIterations; i++) {
+
+ // the input to inverting is always the previous output from analysis
+ LOG.info("Running iteration " + (i + 1) + " of " + numIterations);
+ Path tempRank = new Path(linkRank + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ fs.mkdirs(tempRank);
+ Path tempInverted = new Path(tempRank, "inverted");
+ Path tempNodeDb = new Path(tempRank, WebGraph.NODE_DIR);
+
+ // run invert and analysis
+ runInverter(nodeDb, wgOutlinkDb, loopDb, tempInverted);
+ runAnalysis(nodeDb, tempInverted, tempNodeDb, i, numIterations,
+ rankOneScore);
+
+ // replace the temporary NodeDb with the output from analysis
+ LOG.info("Installing new link scores");
+ FSUtils.replace(fs, linkRank, tempRank, true);
+ LOG.info("Finished analysis iteration " + (i + 1) + " of "
+ + numIterations);
+ }
+
+ // replace the NodeDb in the WebGraph with the final output of analysis
+ LOG.info("Installing web graph nodes");
+ FSUtils.replace(fs, wgNodeDb, nodeDb, true);
+
+ // remove the temporary link rank folder
+ fs.delete(linkRank, true);
+ LOG.info("Finished analysis");
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the LinkRank tool.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webgraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+ "the web graph db to use").create("webgraphdb");
+ options.addOption(helpOpts);
+ options.addOption(webgraphOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("LinkRank", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+
+ analyze(new Path(webGraphDb));
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("LinkAnalysis: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}
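To make the iteration above concrete, here is a small self-contained toy (not part of the patch) that mimics what Inverter and Analyzer compute per pass: each node contributes inlinkScore / numOutlinks to its targets, and a node's new score is (1 - damping) * 1 + damping * (rankOne + inlink sum), as in Analyzer.reduce. The three-node graph and iteration count are illustrative only:

```java
import java.util.Arrays;

public class LinkRankToy {
  public static void main(String[] args) {
    // adjacency: node i links to each node in graph[i]
    int[][] graph = { { 1, 2 }, { 2 }, { 0 } };
    float damping = 0.85f;               // link.analyze.damping.factor default
    float rankOne = 1.0f / graph.length; // 1 / number of nodes, as in analyze()
    float[] scores = new float[graph.length];
    Arrays.fill(scores, 1.0f);           // link.analyze.initial.score default

    for (int it = 0; it < 10; it++) {
      // "invert": sum each node's incoming contributions of score / outDegree
      float[] inlinkSums = new float[graph.length];
      for (int from = 0; from < graph.length; from++) {
        for (int to : graph[from]) {
          inlinkSums[to] += scores[from] / graph[from].length;
        }
      }
      // "analyze": totalInlinkScore starts at rankOne, as in Analyzer.reduce
      for (int n = 0; n < graph.length; n++) {
        scores[n] = (1 - damping) + damping * (rankOne + inlinkSums[n]);
      }
      System.out.println("iteration " + (it + 1) + ": "
          + Arrays.toString(scores));
    }
  }
}
```

After a few passes the scores settle, which is why analyze() loops link.analyze.num.iterations times before installing the final NodeDb.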
Index: src/java/org/apache/nutch/scoring/webgraph/LoopReader.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/LoopReader.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/LoopReader.java (revision 0)
@@ -0,0 +1,107 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.nutch.scoring.webgraph.Loops.LoopSet;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * The LoopReader tool prints the loopset information for a single url.
+ */
+public class LoopReader {
+
+ private Configuration conf;
+ private FileSystem fs;
+ private MapFile.Reader[] loopReaders;
+
+ /**
+   * Prints the loopset for a single url. The loopset information will show any
+   * outlink url that eventually forms a link cycle.
+ *
+ * @param webGraphDb The WebGraph to check for loops
+ * @param url The url to check.
+ *
+ * @throws IOException If an error occurs while printing loopset information.
+ */
+ public void dumpUrl(Path webGraphDb, String url)
+ throws IOException {
+
+ // open the readers
+ conf = NutchConfiguration.create();
+ fs = FileSystem.get(conf);
+ loopReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+ Loops.LOOPS_DIR), conf);
+
+ // get the loopset for a given url, if any
+ Text key = new Text(url);
+ LoopSet loop = new LoopSet();
+ MapFileOutputFormat.getEntry(loopReaders,
+ new HashPartitioner(), key, loop);
+
+ // print out each loop url in the set
+ System.out.println(url + ":");
+ for (String loopUrl : loop.getLoopSet()) {
+ System.out.println(" " + loopUrl);
+ }
+
+ // close the readers
+ FSUtils.closeReaders(loopReaders);
+ }
+
+ /**
+ * Runs the LoopReader tool. For this tool to work the loops job must have
+ * already been run on the corresponding WebGraph.
+ */
+ public static void main(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webGraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg()
+ .withDescription("the webgraphdb to use").create("webgraphdb");
+ Option urlOpts = OptionBuilder.withArgName("url").hasOptionalArg()
+ .withDescription("the url to dump").create("url");
+ options.addOption(helpOpts);
+ options.addOption(webGraphOpts);
+ options.addOption(urlOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || !line.hasOption("url")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("WebGraphReader", options);
+ return;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ String url = line.getOptionValue("url");
+ LoopReader reader = new LoopReader();
+ reader.dumpUrl(new Path(webGraphDb), url);
+ return;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+}
\ No newline at end of file
Index: src/java/org/apache/nutch/scoring/webgraph/Loops.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/Loops.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/Loops.java (revision 0)
@@ -0,0 +1,590 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * The Loops job identifies link cycles inside of the web graph. This is
+ * then used in the LinkRank program to remove those links from consideration
+ * during link analysis.
+ *
+ * This job will identify both reciprocal links and cycles of 2+ links up to a
+ * set depth to check. The Loops job is expensive in both computational and
+ * space terms. Because it checks outlinks of outlinks of outlinks for cycles,
+ * its intermediate output can be extremely large even if the end output is
+ * rather small. Because of this the Loops job is optional, and if its output
+ * doesn't exist then it won't be factored into the LinkRank program.
+ */
+public class Loops
+ extends Configured
+ implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(Loops.class);
+ public static final String LOOPS_DIR = "loops";
+ public static final String ROUTES_DIR = "routes";
+
+ /**
+ * A link path or route looking to identify a link cycle.
+ */
+ public static class Route
+ implements Writable {
+
+ private String outlinkUrl = null;
+ private String lookingFor = null;
+ private boolean found = false;
+
+ public Route() {
+
+ }
+
+ public String getOutlinkUrl() {
+ return outlinkUrl;
+ }
+
+ public void setOutlinkUrl(String outlinkUrl) {
+ this.outlinkUrl = outlinkUrl;
+ }
+
+ public String getLookingFor() {
+ return lookingFor;
+ }
+
+ public void setLookingFor(String lookingFor) {
+ this.lookingFor = lookingFor;
+ }
+
+ public boolean isFound() {
+ return found;
+ }
+
+ public void setFound(boolean found) {
+ this.found = found;
+ }
+
+ public void readFields(DataInput in)
+ throws IOException {
+
+ outlinkUrl = Text.readString(in);
+ lookingFor = Text.readString(in);
+ found = in.readBoolean();
+ }
+
+ public void write(DataOutput out)
+ throws IOException {
+ Text.writeString(out, outlinkUrl);
+ Text.writeString(out, lookingFor);
+ out.writeBoolean(found);
+ }
+ }
+
+ /**
+ * A set of loops.
+ */
+ public static class LoopSet
+ implements Writable {
+
+ private Set loopSet = new HashSet();
+
+ public LoopSet() {
+
+ }
+
+ public Set getLoopSet() {
+ return loopSet;
+ }
+
+ public void setLoopSet(Set loopSet) {
+ this.loopSet = loopSet;
+ }
+
+ public void readFields(DataInput in)
+ throws IOException {
+
+ int numNodes = in.readInt();
+ loopSet = new HashSet();
+ for (int i = 0; i < numNodes; i++) {
+ String url = Text.readString(in);
+ loopSet.add(url);
+ }
+ }
+
+ public void write(DataOutput out)
+ throws IOException {
+
+ int numNodes = (loopSet != null ? loopSet.size() : 0);
+ out.writeInt(numNodes);
+ for (String loop : loopSet) {
+ Text.writeString(out, loop);
+ }
+ }
+
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      for (String loop : loopSet) {
+        builder.append(loop).append(",");
+      }
+      // guard against an empty set before stripping the trailing comma
+      return (builder.length() > 0)
+        ? builder.substring(0, builder.length() - 1) : "";
+    }
+ }
+
+ /**
+ * Initializes the Loop routes.
+ */
+ public static class Initializer
+ extends Configured
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+
+ /**
+ * Default constructor.
+ */
+ public Initializer() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public Initializer(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configure the job.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Wraps values in ObjectWritable.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Takes any node that has inlinks and sets up a route for all of its
+ * outlinks. These routes will then be followed to a maximum depth inside of
+ * the Looper job.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Node node = null;
+ List outlinkList = new ArrayList();
+
+ // collect all outlinks and assign node
+ while (values.hasNext()) {
+ ObjectWritable objWrite = values.next();
+ Object obj = objWrite.get();
+ if (obj instanceof LinkDatum) {
+ outlinkList.add((LinkDatum)obj);
+ }
+ else if (obj instanceof Node) {
+ node = (Node)obj;
+ }
+ }
+
+      // must have inlinks, otherwise a cycle is not possible
+ if (node != null) {
+
+ int numInlinks = node.getNumInlinks();
+ if (numInlinks > 0) {
+
+ // initialize and collect a route for every outlink
+ for (LinkDatum datum : outlinkList) {
+ String outlinkUrl = datum.getUrl();
+ Route route = new Route();
+ route.setFound(false);
+ route.setLookingFor(url);
+ route.setOutlinkUrl(outlinkUrl);
+ output.collect(new Text(outlinkUrl), route);
+ }
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Follows a route path looking for the start url of the route. If the start
+ * url is found then the route is a cyclical path.
+ */
+ public static class Looper
+ extends Configured
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+ private boolean last = false;
+
+ /**
+ * Default constructor.
+ */
+ public Looper() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public Looper(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configure the job.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ this.last = conf.getBoolean("last", false);
+ }
+
+ /**
+ * Wrap values in ObjectWritable.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ Writable cloned = null;
+ if (value instanceof LinkDatum) {
+ cloned = new Text(((LinkDatum)value).getUrl());
+ }
+ else {
+ cloned = WritableUtils.clone(value, conf);
+ }
+ objWrite.set(cloned);
+ output.collect(key, objWrite);
+ }
+
+ /**
+     * Performs a single loop pass looking for loop cycles within routes. If
+     * this is not the last pass, then the url will be mapped out for further
+     * passes.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ List routeList = new ArrayList();
+ Set outlinkUrls = new LinkedHashSet();
+ int numValues = 0;
+
+ // aggregate all routes and outlinks for a given url
+ while (values.hasNext()) {
+ ObjectWritable next = values.next();
+ Object value = next.get();
+ if (value instanceof Route) {
+ routeList.add((Route)WritableUtils.clone((Route)value, conf));
+ }
+ else if (value instanceof Text) {
+ String outlinkUrl = ((Text)value).toString();
+ if (!outlinkUrls.contains(outlinkUrl)) {
+ outlinkUrls.add(outlinkUrl);
+ }
+ }
+
+      // report progress, there could be a lot of routes
+ numValues++;
+ if (numValues % 100 == 0) {
+ reporter.progress();
+ }
+ }
+
+ // loop through the route list
+ Iterator routeIt = routeList.listIterator();
+ while (routeIt.hasNext()) {
+
+        // remove the route to save space, as there could be a lot of routes;
+        // if the route is already found, meaning it is a loop, just collect
+        // it; urls with no outlinks that are not found will fall off
+ Route route = routeIt.next();
+ routeIt.remove();
+ if (route.isFound()) {
+ output.collect(key, route);
+ }
+ else {
+
+ // if the route start url is found, set route to found and collect
+ String lookingFor = route.getLookingFor();
+ if (outlinkUrls.contains(lookingFor)) {
+ route.setFound(true);
+ output.collect(key, route);
+ }
+ else if (!last) {
+
+ // setup for next pass through the loop
+ for (String outlink : outlinkUrls) {
+ output.collect(new Text(outlink), route);
+ }
+ }
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+   * Finishes the Loops job by aggregating and collecting all found routes.
+ */
+ public static class Finalizer
+ extends Configured
+ implements Mapper,
+ Reducer {
+
+ private JobConf conf;
+
+ /**
+ * Default constructor.
+ */
+ public Finalizer() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public Finalizer(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configures the job.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+     * Maps out any found routes; these will be the link cycles.
+ */
+ public void map(Text key, Route value, OutputCollector output,
+ Reporter reporter)
+ throws IOException {
+
+ if (value.isFound()) {
+ String lookingFor = value.getLookingFor();
+ output.collect(new Text(lookingFor), value);
+ }
+ }
+
+ /**
+ * Aggregates all found routes for a given start url into a loopset and
+ * collects the loopset.
+ */
+ public void reduce(Text key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ LoopSet loops = new LoopSet();
+ while (values.hasNext()) {
+ Route route = values.next();
+ loops.getLoopSet().add(route.getOutlinkUrl());
+ }
+ output.collect(key, loops);
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Runs the various loop jobs.
+ */
+ public void findLoops(Path webGraphDb)
+ throws IOException {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Loops: starting");
+ LOG.info("Loops: webgraphdb: " + webGraphDb);
+ }
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+ Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path routes = new Path(webGraphDb, ROUTES_DIR);
+ Path tempRoute = new Path(webGraphDb, ROUTES_DIR + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ // run the initializer
+ JobConf init = new NutchJob(conf);
+ init.setJobName("Initializer: " + webGraphDb);
+ FileInputFormat.addInputPath(init, outlinkDb);
+ FileInputFormat.addInputPath(init, nodeDb);
+ init.setInputFormat(SequenceFileInputFormat.class);
+ init.setMapperClass(Initializer.class);
+ init.setReducerClass(Initializer.class);
+ init.setMapOutputKeyClass(Text.class);
+ init.setMapOutputValueClass(ObjectWritable.class);
+ init.setOutputKeyClass(Text.class);
+ init.setOutputValueClass(Route.class);
+ FileOutputFormat.setOutputPath(init, tempRoute);
+ init.setOutputFormat(SequenceFileOutputFormat.class);
+
+ try {
+ LOG.info("Initializer: running");
+ JobClient.runJob(init);
+ LOG.info("Initializer: installing " + routes);
+ FSUtils.replace(fs, routes, tempRoute, true);
+ LOG.info("Initializer: finished");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+    // run the looper job up to a max depth, default 2, which will find a
+    // 3 link loop cycle
+ int depth = conf.getInt("link.loops.depth", 2);
+ for (int i = 0; i < depth; i++) {
+
+ JobConf looper = new NutchJob(conf);
+ looper.setJobName("Looper: " + (i + 1) + " of " + depth);
+ FileInputFormat.addInputPath(looper, outlinkDb);
+ FileInputFormat.addInputPath(looper, routes);
+ looper.setInputFormat(SequenceFileInputFormat.class);
+ looper.setMapperClass(Looper.class);
+ looper.setReducerClass(Looper.class);
+ looper.setMapOutputKeyClass(Text.class);
+ looper.setMapOutputValueClass(ObjectWritable.class);
+ looper.setOutputKeyClass(Text.class);
+ looper.setOutputValueClass(Route.class);
+ FileOutputFormat.setOutputPath(looper, tempRoute);
+ looper.setOutputFormat(SequenceFileOutputFormat.class);
+ looper.setBoolean("last", i == (depth - 1));
+
+ try {
+ LOG.info("Looper: running");
+ JobClient.runJob(looper);
+ LOG.info("Looper: installing " + routes);
+ FSUtils.replace(fs, routes, tempRoute, true);
+ LOG.info("Looper: finished");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+
+ // run the finalizer
+ JobConf finalizer = new NutchJob(conf);
+ finalizer.setJobName("Finalizer: " + webGraphDb);
+ FileInputFormat.addInputPath(finalizer, routes);
+ finalizer.setInputFormat(SequenceFileInputFormat.class);
+ finalizer.setMapperClass(Finalizer.class);
+ finalizer.setReducerClass(Finalizer.class);
+ finalizer.setMapOutputKeyClass(Text.class);
+ finalizer.setMapOutputValueClass(Route.class);
+ finalizer.setOutputKeyClass(Text.class);
+ finalizer.setOutputValueClass(LoopSet.class);
+ FileOutputFormat.setOutputPath(finalizer, new Path(webGraphDb, LOOPS_DIR));
+ finalizer.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ LOG.info("Finalizer: running");
+ JobClient.runJob(finalizer);
+ LOG.info("Finalizer: finished");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new Loops(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the Loops tool.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+ "the web graph database to use").create("webgraphdb");
+ options.addOption(helpOpts);
+ options.addOption(webGraphDbOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("Loops", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ findLoops(new Path(webGraphDb));
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("Loops: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}
\ No newline at end of file
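A compact illustration (not part of the patch) of the cycle detection the Initializer, Looper, and Finalizer passes perform together, reduced to a breadth-first walk over an in-memory adjacency map; the toy graph and helper names are hypothetical. With depth 2 (the link.loops.depth default) a route can be followed three hops, enough to close the 3-link cycle a -> b -> c -> a:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LoopsToy {

  // does 'start' reach itself within maxHops hops of outlink following?
  static boolean inCycle(Map<String, List<String>> outlinks, String start,
      int maxHops) {
    Set<String> frontier =
        new HashSet<>(outlinks.getOrDefault(start, List.of()));
    for (int hop = 1; hop <= maxHops; hop++) {
      if (frontier.contains(start)) {
        return true; // a route found its start url: a link cycle
      }
      Set<String> next = new HashSet<>();
      for (String url : frontier) {
        next.addAll(outlinks.getOrDefault(url, List.of()));
      }
      frontier = next;
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, List<String>> outlinks = Map.of(
        "a", List.of("b"),
        "b", List.of("c"),
        "c", List.of("a"));
    int depth = 2; // link.loops.depth default
    for (String url : outlinks.keySet()) {
      // depth looper passes after the initial route gives depth + 1 hops
      System.out.println(url + " in cycle: "
          + inCycle(outlinks, url, depth + 1));
    }
  }
}
```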
Index: src/java/org/apache/nutch/scoring/webgraph/Node.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/Node.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/Node.java (revision 0)
@@ -0,0 +1,89 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.metadata.Metadata;
+
+/**
+ * A class which holds the number of inlinks and outlinks for a given url along
+ * with an inlink score from a link analysis program and any metadata.
+ *
+ * The Node is the core unit of the NodeDb in the WebGraph.
+ */
+public class Node
+ implements Writable {
+
+ private int numInlinks = 0;
+ private int numOutlinks = 0;
+ private float inlinkScore = 1.0f;
+ private Metadata metadata = new Metadata();
+
+ public Node() {
+
+ }
+
+ public int getNumInlinks() {
+ return numInlinks;
+ }
+
+ public void setNumInlinks(int numInlinks) {
+ this.numInlinks = numInlinks;
+ }
+
+ public int getNumOutlinks() {
+ return numOutlinks;
+ }
+
+ public void setNumOutlinks(int numOutlinks) {
+ this.numOutlinks = numOutlinks;
+ }
+
+ public float getInlinkScore() {
+ return inlinkScore;
+ }
+
+ public void setInlinkScore(float inlinkScore) {
+ this.inlinkScore = inlinkScore;
+ }
+
+ public float getOutlinkScore() {
+ return (numOutlinks > 0) ? inlinkScore / numOutlinks : inlinkScore;
+ }
+
+ public Metadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(Metadata metadata) {
+ this.metadata = metadata;
+ }
+
+ public void readFields(DataInput in)
+ throws IOException {
+
+ numInlinks = in.readInt();
+ numOutlinks = in.readInt();
+ inlinkScore = in.readFloat();
+ metadata.clear();
+ metadata.readFields(in);
+ }
+
+ public void write(DataOutput out)
+ throws IOException {
+
+ out.writeInt(numInlinks);
+ out.writeInt(numOutlinks);
+ out.writeFloat(inlinkScore);
+ metadata.write(out);
+ }
+
+ public String toString() {
+ return "num inlinks: " + numInlinks + ", num outlinks: " + numOutlinks
+ + ", inlink score: " + inlinkScore + ", outlink score: "
+ + getOutlinkScore() + ", metadata: " + metadata.toString();
+ }
+
+}
Index: src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java (revision 0)
@@ -0,0 +1,248 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * A tool that dumps out the top urls by number of inlinks, number of
+ * outlinks, or by score, to a text file. One of the major uses of this tool
+ * is to check the top scoring urls of a link analysis program such as
+ * LinkRank.
+ *
+ * Dumping by number of inlinks or outlinks requires only that the WebGraph
+ * program has been run. Dumping by link analysis score also requires that a
+ * program such as LinkRank has been run to update the NodeDb of the WebGraph.
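+ *
+ * Usage, as defined by the options parsed below:
+ * NodeDumper -webgraphdb <webgraphdb> [-inlinks|-outlinks|-scores]
+ * [-topn <n>] -output <output>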
+ */
+public class NodeDumper
+ extends Configured
+ implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(NodeDumper.class);
+
+ private static enum DumpType {
+ INLINKS,
+ OUTLINKS,
+ SCORES
+ }
+
+ /**
+ * Outputs the top urls sorted in descending order. Depending on the flag set
+ * on the command line, the top urls could be for number of inlinks, for
+ * number of outlinks, or for link analysis score.
+ */
+ public static class Sorter
+ extends Configured
+ implements Mapper<Text, Node, FloatWritable, Text>,
+ Reducer<FloatWritable, Text, Text, FloatWritable> {
+
+ private JobConf conf;
+ private boolean inlinks = false;
+ private boolean outlinks = false;
+ private boolean scores = false;
+ private long topn = Long.MAX_VALUE;
+
+ /**
+ * Configures the job, setting the flag for the type of content to dump and
+ * the topN number, if any.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ this.inlinks = conf.getBoolean("inlinks", false);
+ this.outlinks = conf.getBoolean("outlinks", false);
+ this.scores = conf.getBoolean("scores", true);
+ this.topn = conf.getLong("topn", Long.MAX_VALUE);
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Maps out each url keyed by its negated number of inlinks, number of
+ * outlinks, or link analysis score, so that the sort order is descending.
+ */
+ public void map(Text key, Node node,
+ OutputCollector<FloatWritable, Text> output, Reporter reporter)
+ throws IOException {
+
+ float number = 0;
+ if (inlinks) {
+ number = node.getNumInlinks();
+ }
+ else if (outlinks) {
+ number = node.getNumOutlinks();
+ }
+ else {
+ number = node.getInlinkScore();
+ }
+
+ // collect the negated number so that the sort order is descending
+ output.collect(new FloatWritable(-number), key);
+ }
+
+ /**
+ * Flips and collects the url and numeric sort value.
+ */
+ public void reduce(FloatWritable key, Iterator<Text> values,
+ OutputCollector<Text, FloatWritable> output, Reporter reporter)
+ throws IOException {
+
+ // negate the key again to recover the original value, taking care not to
+ // produce a negative zero for 0 values
+ float val = key.get();
+ FloatWritable number = new FloatWritable(val == 0 ? 0 : -val);
+ long numCollected = 0;
+
+ // collect all values, this time with the url as key
+ while (values.hasNext() && (numCollected < topn)) {
+ Text url = (Text)WritableUtils.clone(values.next(), conf);
+ output.collect(url, number);
+ numCollected++;
+ }
+ }
+ }
+
+ /**
+ * Runs the process to dump the top urls out to a text file.
+ *
+ * @param webGraphDb The WebGraph from which to pull values.
+ * @param type The type of dump: by number of inlinks, by number of
+ * outlinks, or by link analysis score.
+ * @param topN The maximum number of urls to dump.
+ * @param output The directory to write the text output to.
+ *
+ * @throws IOException If an error occurs while dumping the top values.
+ */
+ public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output)
+ throws IOException {
+
+ LOG.info("NodeDumper: starting");
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Configuration conf = getConf();
+
+ JobConf dumper = new NutchJob(conf);
+ dumper.setJobName("NodeDumper: " + webGraphDb);
+ FileInputFormat.addInputPath(dumper, nodeDb);
+ dumper.setInputFormat(SequenceFileInputFormat.class);
+ dumper.setMapperClass(Sorter.class);
+ dumper.setReducerClass(Sorter.class);
+ dumper.setMapOutputKeyClass(FloatWritable.class);
+ dumper.setMapOutputValueClass(Text.class);
+ dumper.setOutputKeyClass(Text.class);
+ dumper.setOutputValueClass(FloatWritable.class);
+ FileOutputFormat.setOutputPath(dumper, output);
+ dumper.setOutputFormat(TextOutputFormat.class);
+ dumper.setNumReduceTasks(1);
+ dumper.setBoolean("inlinks", type == DumpType.INLINKS);
+ dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
+ dumper.setBoolean("scores", type == DumpType.SCORES);
+ dumper.setLong("topn", topN);
+
+ try {
+ LOG.info("NodeDumper: running");
+ JobClient.runJob(dumper);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new NodeDumper(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the node dumper tool.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+ "the web graph database to use").create("webgraphdb");
+ Option inlinkOpts = OptionBuilder.withArgName("inlinks").withDescription(
+ "show highest inlinks").create("inlinks");
+ Option outlinkOpts = OptionBuilder.withArgName("outlinks").withDescription(
+ "show highest outlinks").create("outlinks");
+ Option scoreOpts = OptionBuilder.withArgName("scores").hasOptionalArg().withDescription(
+ "show highest scores").create("scores");
+ Option topNOpts = OptionBuilder.withArgName("topn").hasOptionalArg().withDescription(
+ "show topN scores").create("topn");
+ Option outputOpts = OptionBuilder.withArgName("output").hasArg().withDescription(
+ "the output directory to use").create("output");
+ options.addOption(helpOpts);
+ options.addOption(webGraphDbOpts);
+ options.addOption(inlinkOpts);
+ options.addOption(outlinkOpts);
+ options.addOption(scoreOpts);
+ options.addOption(topNOpts);
+ options.addOption(outputOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("NodeDumper", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ boolean inlinks = line.hasOption("inlinks");
+ boolean outlinks = line.hasOption("outlinks");
+ boolean scores = line.hasOption("scores");
+ long topN = (line.hasOption("topn")
+ ? Long.parseLong(line.getOptionValue("topn")) : Long.MAX_VALUE);
+
+ // get the correct dump type
+ String output = line.getOptionValue("output");
+ DumpType type = (inlinks ? DumpType.INLINKS : outlinks
+ ? DumpType.OUTLINKS : DumpType.SCORES);
+
+ dumpNodes(new Path(webGraphDb), type, topN, new Path(output));
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("NodeDumper: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/NodeReader.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/NodeReader.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/NodeReader.java (revision 0)
@@ -0,0 +1,106 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * Reads and prints to system out information for a single node from the NodeDb
+ * in the WebGraph.
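+ *
+ * Usage, as defined by the options parsed below:
+ * NodeReader -webgraphdb <webgraphdb> -url <url>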
+ */
+public class NodeReader {
+
+ private Configuration conf;
+ private FileSystem fs;
+ private MapFile.Reader[] nodeReaders;
+
+ /**
+ * Prints the content of the Node represented by the url to system out.
+ *
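+ * The output, shown here with hypothetical values, takes the form:
+ *
+ * http://example.com/:
+ *  inlink score: 1.0
+ *  outlink score: 0.25
+ *  num inlinks: 10
+ *  num outlinks: 4
+ *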
+ * @param webGraphDb The webgraph from which to get the node.
+ * @param url The url of the node.
+ *
+ * @throws IOException If an error occurs while getting the node.
+ */
+ public void dumpUrl(Path webGraphDb, String url)
+ throws IOException {
+
+ conf = NutchConfiguration.create();
+ fs = FileSystem.get(conf);
+ nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+ WebGraph.NODE_DIR), conf);
+
+ // open the readers, get the node, print out the info, and close the readers
+ Text key = new Text(url);
+ Node node = new Node();
+ MapFileOutputFormat.getEntry(nodeReaders,
+ new HashPartitioner<Text, Node>(), key, node);
+ System.out.println(url + ":");
+ System.out.println(" inlink score: " + node.getInlinkScore());
+ System.out.println(" outlink score: " + node.getOutlinkScore());
+ System.out.println(" num inlinks: " + node.getNumInlinks());
+ System.out.println(" num outlinks: " + node.getNumOutlinks());
+ FSUtils.closeReaders(nodeReaders);
+ }
+
+ /**
+ * Runs the NodeReader tool. The command line arguments must contain a
+ * webgraphdb path and a url. The url must match the normalized url that is
+ * contained in the NodeDb of the WebGraph.
+ */
+ public static void main(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webGraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg()
+ .withDescription("the webgraphdb to use").create("webgraphdb");
+ Option urlOpts = OptionBuilder.withArgName("url").hasOptionalArg()
+ .withDescription("the url to dump").create("url");
+ options.addOption(helpOpts);
+ options.addOption(webGraphOpts);
+ options.addOption(urlOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ // command line must take a webgraphdb and a url
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || !line.hasOption("url")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("WebGraphReader", options);
+ return;
+ }
+
+ // dump the values to system out and return
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ String url = line.getOptionValue("url");
+ NodeReader reader = new NodeReader();
+ reader.dumpUrl(new Path(webGraphDb), url);
+
+ return;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+}
\ No newline at end of file
Index: src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java (revision 0)
@@ -0,0 +1,226 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * Updates the score from the WebGraph node database into the crawl database.
+ * Any url in the crawl database that has no score in the node database is
+ * set to the clear score in the crawl database.
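+ *
+ * The clear score is read from the link.score.updater.clear.score property,
+ * defaulting to 0.0. Usage, as defined by the options parsed below:
+ * ScoreUpdater -crawldb <crawldb> -webgraphdb <webgraphdb>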
+ */
+public class ScoreUpdater
+ extends Configured
+ implements Tool, Mapper<Text, Writable, Text, ObjectWritable>,
+ Reducer<Text, ObjectWritable, Text, CrawlDatum> {
+
+ public static final Log LOG = LogFactory.getLog(ScoreUpdater.class);
+
+ private JobConf conf;
+ private float clearScore = 0.0f;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f);
+ }
+
+ /**
+ * Changes input into ObjectWritables.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Creates new CrawlDatum objects with the updated score from the NodeDb or
+ * with a cleared score.
+ */
+ public void reduce(Text key, Iterator<ObjectWritable> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Node node = null;
+ CrawlDatum datum = null;
+
+ // set the node and the crawl datum; there should be one of each unless
+ // the url has no node in the webgraph
+ while (values.hasNext()) {
+ ObjectWritable next = values.next();
+ Object value = next.get();
+ if (value instanceof Node) {
+ node = (Node)value;
+ }
+ else if (value instanceof CrawlDatum) {
+ datum = (CrawlDatum)value;
+ }
+ }
+
+ // the datum should never be null, but it could be if the url was somehow
+ // normalized or changed after being pulled from the crawldb
+ if (datum != null) {
+
+ if (node != null) {
+
+ // set the inlink score in the nodedb
+ float inlinkScore = node.getInlinkScore();
+ datum.setScore(inlinkScore);
+ LOG.debug(url + ": setting to score " + inlinkScore);
+ }
+ else {
+
+ // clear out the score in the crawldb
+ datum.setScore(clearScore);
+ LOG.debug(url + ": setting to clear score of " + clearScore);
+ }
+
+ output.collect(key, datum);
+ }
+ else {
+ LOG.debug(url + ": no datum");
+ }
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Updates the inlink score in the web graph node database into the crawl
+ * database.
+ *
+ * @param crawlDb The crawl database to update
+ * @param webGraphDb The webgraph database to use.
+ *
+ * @throws IOException If an error occurs while updating the scores.
+ */
+ public void update(Path crawlDb, Path webGraphDb)
+ throws IOException {
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // create a temporary crawldb with the new scores
+ LOG.info("Running crawldb update " + crawlDb);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+ Path newCrawlDb = new Path(crawlDb,
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ // run the updater job outputting to the temp crawl database
+ JobConf updater = new NutchJob(conf);
+ updater.setJobName("Update CrawlDb from WebGraph");
+ FileInputFormat.addInputPath(updater, crawlDbCurrent);
+ FileInputFormat.addInputPath(updater, nodeDb);
+ FileOutputFormat.setOutputPath(updater, newCrawlDb);
+ updater.setInputFormat(SequenceFileInputFormat.class);
+ updater.setMapperClass(ScoreUpdater.class);
+ updater.setReducerClass(ScoreUpdater.class);
+ updater.setMapOutputKeyClass(Text.class);
+ updater.setMapOutputValueClass(ObjectWritable.class);
+ updater.setOutputKeyClass(Text.class);
+ updater.setOutputValueClass(CrawlDatum.class);
+ updater.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ JobClient.runJob(updater);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+
+ // remove the temp crawldb on error
+ if (fs.exists(newCrawlDb)) {
+ fs.delete(newCrawlDb, true);
+ }
+ throw e;
+ }
+
+ // install the temp crawl database
+ LOG.info("Installing new crawldb " + crawlDb);
+ CrawlDb.install(updater, crawlDb);
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the ScoreUpdater tool.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option crawlDbOpts = OptionBuilder.withArgName("crawldb").hasArg().withDescription(
+ "the crawldb to use").create("crawldb");
+ Option webGraphOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+ "the webgraphdb to use").create("webgraphdb");
+ options.addOption(helpOpts);
+ options.addOption(crawlDbOpts);
+ options.addOption(webGraphOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || !line.hasOption("crawldb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("ScoreUpdater", options);
+ return -1;
+ }
+
+ String crawlDb = line.getOptionValue("crawldb");
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ update(new Path(crawlDb), new Path(webGraphDb));
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("ScoreUpdater: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
Index: src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/WebGraph.java (revision 0)
+++ src/java/org/apache/nutch/scoring/webgraph/WebGraph.java (revision 0)
@@ -0,0 +1,629 @@
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Creates three databases, one for inlinks, one for outlinks, and a node
+ * database that holds the number of in and outlinks to a url and the current
+ * score for the url.
+ *
+ * The score is set by an analysis program such as LinkRank. The WebGraph is an
+ * updatable database. Outlinks are stored by their fetch time or by the
+ * current system time if no fetch time is available. Only the most recent
+ * version of outlinks for a given url is stored. As more crawls are executed
+ * and the WebGraph updated, newer Outlinks will replace older Outlinks. This
+ * allows the WebGraph to adapt to changes in the link structure of the web.
+ *
+ * The Inlink database is created from the Outlink database and is regenerated
+ * when the WebGraph is updated. The Node database is created from both the
+ * Inlink and Outlink databases. Because the Node database is overwritten when
+ * the WebGraph is updated, and because it holds the current scores for urls,
+ * it is recommended that a crawl cycle (one or more full crawls) fully
+ * complete before the WebGraph is updated, and that some type of analysis,
+ * such as LinkRank, then be run to update the scores in the Node database in
+ * a stable fashion.
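+ *
+ * Usage, as defined by the options parsed below:
+ * WebGraph -webgraphdb <webgraphdb> -segment <segment> ...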
+ */
+public class WebGraph
+ extends Configured
+ implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(WebGraph.class);
+ public static final String LOCK_NAME = ".locked";
+ public static final String INLINK_DIR = "inlinks";
+ public static final String OUTLINK_DIR = "outlinks";
+ public static final String NODE_DIR = "nodes";
+
+ /**
+ * The OutlinkDb creates a database of all outlinks. Outlinks to internal
+ * urls, by host or by domain, can be ignored. The number of outlinks to a
+ * given page or domain can also be limited.
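+ *
+ * Behavior is controlled by the configuration properties read in configure():
+ * link.ignore.internal.host, link.ignore.internal.domain,
+ * link.ignore.limit.page, and link.ignore.limit.domain, each true by default.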
+ */
+ public static class OutlinkDb
+ extends Configured
+ implements Mapper<Text, Writable, Text, LinkDatum>,
+ Reducer<Text, LinkDatum, Text, LinkDatum> {
+
+ // ignoring internal domains, internal hosts
+ private boolean ignoreDomain = true;
+ private boolean ignoreHost = true;
+
+ // limiting urls out to a page or to a domain
+ private boolean limitPages = true;
+ private boolean limitDomains = true;
+
+ // url normalizers and job configuration
+ private URLNormalizers urlNormalizers;
+ private JobConf conf;
+
+ /**
+ * Normalizes and trims extra whitespace from the given url.
+ *
+ * @param url The url to normalize.
+ *
+ * @return The normalized url.
+ */
+ private String normalizeUrl(String url) {
+
+ String normalized = null;
+ if (urlNormalizers != null) {
+ try {
+
+ // normalize and trim the url
+ normalized = urlNormalizers.normalize(url,
+ URLNormalizers.SCOPE_DEFAULT);
+ normalized = normalized.trim();
+ }
+ catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e);
+ normalized = null;
+ }
+ }
+ return normalized;
+ }
+
+ /**
+ * Returns the fetch time from the parse data or the current system time if
+ * the fetch time doesn't exist.
+ *
+ * @param data The parse data.
+ *
+ * @return The fetch time as a long.
+ */
+ private long getFetchTime(ParseData data) {
+
+ // default to current system time
+ long fetchTime = System.currentTimeMillis();
+ String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY);
+ try {
+
+ // get the fetch time from the parse data
+ fetchTime = Long.parseLong(fetchTimeStr);
+ }
+ catch (Exception e) {
+ fetchTime = System.currentTimeMillis();
+ }
+ return fetchTime;
+ }
+
+ /**
+ * Default constructor.
+ */
+ public OutlinkDb() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public OutlinkDb(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configures the OutlinkDb job. Sets up internal links and link limiting.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
+ ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
+ limitPages = conf.getBoolean("link.ignore.limit.page", true);
+ limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+ urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+ }
+
+ /**
+ * Passes through existing LinkDatum objects from an existing OutlinkDb and
+ * maps out new LinkDatum objects from a new crawl's ParseData.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, LinkDatum> output, Reporter reporter)
+ throws IOException {
+
+ // normalize url, stop processing if null
+ String url = normalizeUrl(key.toString());
+ if (url == null) {
+ return;
+ }
+
+ if (value instanceof ParseData) {
+
+ // get the parse data and the outlinks from the parse data, along with
+ // the fetch time for those links
+ ParseData data = (ParseData)value;
+ long fetchTime = getFetchTime(data);
+ Outlink[] outlinkAr = data.getOutlinks();
+ Map<String, String> outlinkMap = new LinkedHashMap<String, String>();
+
+ // normalize urls and put into map
+ if (outlinkAr != null && outlinkAr.length > 0) {
+ for (int i = 0; i < outlinkAr.length; i++) {
+ Outlink outlink = outlinkAr[i];
+ String toUrl = normalizeUrl(outlink.getToUrl());
+
+ // add the url to the map if it is not already there, or overwrite the
+ // existing entry when its anchor is null
+ boolean existingUrl = outlinkMap.containsKey(toUrl);
+ if (toUrl != null
+ && (!existingUrl || (existingUrl && outlinkMap.get(toUrl) == null))) {
+ outlinkMap.put(toUrl, outlink.getAnchor());
+ }
+ }
+ }
+
+ // collect the outlinks under the fetch time
+ for (String outlinkUrl : outlinkMap.keySet()) {
+ String anchor = outlinkMap.get(outlinkUrl);
+ LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
+ output.collect(key, datum);
+ }
+ }
+ else if (value instanceof LinkDatum) {
+
+ // collect existing outlinks from existing OutlinkDb
+ output.collect(key, (LinkDatum)value);
+ }
+ }
+
+ public void reduce(Text key, Iterator<LinkDatum> values,
+ OutputCollector<Text, LinkDatum> output, Reporter reporter)
+ throws IOException {
+
+ // aggregate all outlinks, get the most recent timestamp for a fetch
+ // which should be the timestamp for all of the most recent outlinks
+ long mostRecent = 0L;
+ List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
+ while (values.hasNext()) {
+
+ // loop through, change out most recent timestamp if needed
+ LinkDatum next = values.next();
+ long timestamp = next.getTimestamp();
+ if (mostRecent == 0L || mostRecent < timestamp) {
+ mostRecent = timestamp;
+ }
+ outlinkList.add((LinkDatum)WritableUtils.clone(next, conf));
+ }
+
+ // get the url, domain, and host for the url
+ String url = key.toString();
+ String domain = URLUtil.getDomainName(url);
+ String host = URLUtil.getHost(url);
+
+ // setup checking sets for domains and pages
+ Set<String> domains = new HashSet<String>();
+ Set<String> pages = new HashSet<String>();
+
+ // loop through the link datums
+ for (LinkDatum datum : outlinkList) {
+
+ // get the url, host, domain, and page for each outlink
+ String toUrl = datum.getUrl();
+ String toDomain = URLUtil.getDomainName(toUrl);
+ String toHost = URLUtil.getHost(toUrl);
+ String toPage = URLUtil.getPage(toUrl);
+ datum.setLinkType(LinkDatum.OUTLINK);
+
+ // collect an outlink only if it is from the most recent fetch and
+ // conforms to the internal url and limiting rules
+ if (datum.getTimestamp() == mostRecent
+ && (!limitPages || (limitPages && !pages.contains(toPage)))
+ && (!limitDomains || (limitDomains && !domains.contains(toDomain)))
+ && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
+ && (!ignoreDomain || (ignoreDomain && !toDomain.equalsIgnoreCase(domain)))) {
+ output.collect(key, datum);
+ pages.add(toPage);
+ domains.add(toDomain);
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
+ * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
+ * updated.
+ */
+ private static class InlinkDb
+ extends Configured
+ implements Mapper<Text, LinkDatum, Text, LinkDatum> {
+
+ private JobConf conf;
+ private long timestamp;
+
+ /**
+ * Default constructor.
+ */
+ public InlinkDb() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public InlinkDb(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configures job. Sets timestamp for all Inlink LinkDatum objects to the
+ * current system time.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ timestamp = System.currentTimeMillis();
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
+ * new system timestamp, the INLINK type, and the to and from urls switched.
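+ * For example, an outlink collected under key A pointing to B becomes an
+ * inlink collected under key B pointing back to A.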
+ */
+ public void map(Text key, LinkDatum datum,
+ OutputCollector<Text, LinkDatum> output, Reporter reporter)
+ throws IOException {
+
+ // get the to and from url and the anchor
+ String fromUrl = key.toString();
+ String toUrl = datum.getUrl();
+ String anchor = datum.getAnchor();
+
+ // flip the from and to url and set the new link type
+ LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
+ inlink.setLinkType(LinkDatum.INLINK);
+ output.collect(new Text(toUrl), inlink);
+ }
+ }
+
+ /**
+ * Creates the Node database which consists of the number of in and outlinks
+ * for each url and a score slot for analysis programs such as LinkRank.
+ */
+ private static class NodeDb
+ extends Configured
+ implements Reducer<Text, LinkDatum, Text, Node> {
+
+ private JobConf conf;
+
+ /**
+ * Default constructor.
+ */
+ public NodeDb() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public NodeDb(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configures job.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Counts the number of inlinks and outlinks for each url and sets a default
+ * score of 0.0 for each url (node) in the webgraph.
+ */
+ public void reduce(Text key, Iterator<LinkDatum> values,
+ OutputCollector<Text, Node> output, Reporter reporter)
+ throws IOException {
+
+ Node node = new Node();
+ int numInlinks = 0;
+ int numOutlinks = 0;
+
+ // loop through counting number of in and out links
+ while (values.hasNext()) {
+ LinkDatum next = values.next();
+ if (next.getLinkType() == LinkDatum.INLINK) {
+ numInlinks++;
+ }
+ else if (next.getLinkType() == LinkDatum.OUTLINK) {
+ numOutlinks++;
+ }
+ }
+
+ // set the in and outlinks and a default score of 0
+ node.setNumInlinks(numInlinks);
+ node.setNumOutlinks(numOutlinks);
+ node.setInlinkScore(0.0f);
+ output.collect(key, node);
+ }
+ }
+
+ /**
+ * Creates the three different WebGraph databases, Outlinks, Inlinks, and
+ * Node. If a current WebGraph exists then it is updated; if it doesn't exist
+ * then a new WebGraph database is created.
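+ *
+ * The update runs three jobs in sequence: an OutlinkDb job over the segment
+ * ParseData and any existing outlink database, an InlinkDb job that inverts
+ * those outlinks, and a NodeDb job that counts the in and outlinks for each
+ * url. Each job writes to a temporary directory that is then installed over
+ * the old database.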
+ *
+ * @param webGraphDb The WebGraph to create or update.
+ * @param segments The array of segments used to update the WebGraph. Newer
+ * segments and fetch times will overwrite older segments.
+ *
+ * @throws IOException If an error occurs while processing the WebGraph.
+ */
+ public void createWebGraph(Path webGraphDb, Path[] segments)
+ throws IOException {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("WebGraphDb: starting");
+ LOG.info("WebGraphDb: webgraphdb: " + webGraphDb);
+ }
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // lock an existing webgraphdb to prevent multiple simultaneous updates
+ Path lock = new Path(webGraphDb, LOCK_NAME);
+ boolean webGraphDbExists = fs.exists(webGraphDb);
+ if (webGraphDbExists) {
+ LockUtil.createLockFile(fs, lock, false);
+ }
+ else {
+
+ // if the webgraph doesn't exist, create it
+ fs.mkdirs(webGraphDb);
+ }
+
+ // outlink and temp outlink database paths
+ Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
+ Path tempOutlinkDb = new Path(outlinkDb + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ JobConf outlinkJob = new NutchJob(conf);
+ outlinkJob.setJobName("Outlinkdb: " + outlinkDb);
+
+ // get the parse data for all segments
+ if (segments != null) {
+ for (int i = 0; i < segments.length; i++) {
+ Path parseData = new Path(segments[i], ParseData.DIR_NAME);
+ if (fs.exists(parseData)) {
+ LOG.info("OutlinkDb: adding input: " + parseData);
+ FileInputFormat.addInputPath(outlinkJob, parseData);
+ }
+ }
+ }
+
+ // add the existing webgraph
+ if (webGraphDbExists) {
+ LOG.info("OutlinkDb: adding input: " + outlinkDb);
+ FileInputFormat.addInputPath(outlinkJob, outlinkDb);
+ }
+
+ outlinkJob.setInputFormat(SequenceFileInputFormat.class);
+ outlinkJob.setMapperClass(OutlinkDb.class);
+ outlinkJob.setReducerClass(OutlinkDb.class);
+ outlinkJob.setMapOutputKeyClass(Text.class);
+ outlinkJob.setMapOutputValueClass(LinkDatum.class);
+ outlinkJob.setOutputKeyClass(Text.class);
+ outlinkJob.setOutputValueClass(LinkDatum.class);
+ FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
+ outlinkJob.setOutputFormat(MapFileOutputFormat.class);
+
+ // run the outlinkdb job and replace any old outlinkdb with the new one
+ try {
+ LOG.info("OutlinkDb: running");
+ JobClient.runJob(outlinkJob);
+ LOG.info("OutlinkDb: installing " + outlinkDb);
+ FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
+ LOG.info("OutlinkDb: finished");
+ }
+ catch (IOException e) {
+
+ // remove lock file and temporary directory if an error occurs
+ LockUtil.removeLockFile(fs, lock);
+ if (fs.exists(tempOutlinkDb)) {
+ fs.delete(tempOutlinkDb, true);
+ }
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // inlink and temp link database paths
+ Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
+ Path tempInlinkDb = new Path(inlinkDb + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf inlinkJob = new NutchJob(conf);
+ inlinkJob.setJobName("Inlinkdb " + inlinkDb);
+ LOG.info("InlinkDb: adding input: " + outlinkDb);
+ FileInputFormat.addInputPath(inlinkJob, outlinkDb);
+ inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+ inlinkJob.setMapperClass(InlinkDb.class);
+ inlinkJob.setMapOutputKeyClass(Text.class);
+ inlinkJob.setMapOutputValueClass(LinkDatum.class);
+ inlinkJob.setOutputKeyClass(Text.class);
+ inlinkJob.setOutputValueClass(LinkDatum.class);
+ FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
+ inlinkJob.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+
+ // run the inlink and replace any old with new
+ LOG.info("InlinkDb: running");
+ JobClient.runJob(inlinkJob);
+ LOG.info("InlinkDb: installing " + inlinkDb);
+ FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
+ LOG.info("InlinkDb: finished");
+ }
+ catch (IOException e) {
+
+ // remove lock file and temporary directory if an error occurs
+ LockUtil.removeLockFile(fs, lock);
+ if (fs.exists(tempInlinkDb)) {
+ fs.delete(tempInlinkDb, true);
+ }
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // node and temp node database paths
+ Path nodeDb = new Path(webGraphDb, NODE_DIR);
+ Path tempNodeDb = new Path(nodeDb + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf nodeJob = new NutchJob(conf);
+ nodeJob.setJobName("NodeDb " + nodeDb);
+ LOG.info("NodeDb: adding input: " + outlinkDb);
+ LOG.info("NodeDb: adding input: " + inlinkDb);
+ FileInputFormat.addInputPath(nodeJob, outlinkDb);
+ FileInputFormat.addInputPath(nodeJob, inlinkDb);
+ nodeJob.setInputFormat(SequenceFileInputFormat.class);
+ nodeJob.setReducerClass(NodeDb.class);
+ nodeJob.setMapOutputKeyClass(Text.class);
+ nodeJob.setMapOutputValueClass(LinkDatum.class);
+ nodeJob.setOutputKeyClass(Text.class);
+ nodeJob.setOutputValueClass(Node.class);
+ FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
+ nodeJob.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+
+ // run the node job and replace old nodedb with new
+ LOG.info("NodeDb: running");
+ JobClient.runJob(nodeJob);
+ LOG.info("NodeDb: installing " + nodeDb);
+ FSUtils.replace(fs, nodeDb, tempNodeDb, true);
+ LOG.info("NodeDb: finished");
+ }
+ catch (IOException e) {
+
+ // remove lock file and temporary directory if an error occurs
+ LockUtil.removeLockFile(fs, lock);
+ if (fs.exists(tempNodeDb)) {
+ fs.delete(tempNodeDb, true);
+ }
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // remove the lock file for the webgraph
+ LockUtil.removeLockFile(fs, lock);
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Parses command line arguments and runs the WebGraph jobs.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option webGraphDbOpts = OptionBuilder.withArgName("webgraphdb").hasArg().withDescription(
+ "the web graph database to use").create("webgraphdb");
+ Option segOpts = OptionBuilder.withArgName("segment").hasArgs().withDescription(
+ "the segment(s) to use").create("segment");
+ options.addOption(helpOpts);
+ options.addOption(webGraphDbOpts);
+ options.addOption(segOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || !line.hasOption("segment")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("WebGraph", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ String[] segments = line.getOptionValues("segment");
+ Path[] segPaths = new Path[segments.length];
+ for (int i = 0; i < segments.length; i++) {
+ segPaths[i] = new Path(segments[i]);
+ }
+
+ createWebGraph(new Path(webGraphDb), segPaths);
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("WebGraph: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+
+}
Index: src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java
===================================================================
--- src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java (revision 0)
+++ src/java/org/apache/nutch/tools/compat/ReprUrlFixer.java (revision 0)
@@ -0,0 +1,296 @@
+package org.apache.nutch.tools.compat;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.MapWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.scoring.webgraph.Node;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Significant changes were made to the representative url logic used for
+ * redirects. This tool will fix representative urls stored in current
+ * segments and crawl databases. Any new fetches will use the new
+ * representative url logic.
+ *
+ * All crawl datums are assumed to be temporary url redirects. While this may
+ * cause some urls to be incorrectly removed, this tool is a temporary measure
+ * to be used until fetches can be rerun. The same reduce logic is applied to
+ * the crawl_fetch and crawl_parse directories of segments as well as to
+ * existing crawl databases.
+ *
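+ * Usage, as defined by the options parsed below:
+ * ReprUrlFixer [-crawldb <crawldb>] [-segment <segment> ...]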
+ */
+public class ReprUrlFixer
+ extends Configured
+ implements Tool, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+
+ public static final Log LOG = LogFactory.getLog(ReprUrlFixer.class);
+ private JobConf conf;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Runs the new ReprUrl logic on all crawldatums.
+ */
+ public void reduce(Text key, Iterator<CrawlDatum> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Node node = null;
+ List<CrawlDatum> datums = new ArrayList<CrawlDatum>();
+
+ // get all crawl datums for a given url key, fetch for instance can have
+ // more than one under a given key if there are multiple redirects to a
+ // given url
+ while (values.hasNext()) {
+ CrawlDatum datum = values.next();
+ datums.add((CrawlDatum)WritableUtils.clone(datum, conf));
+ }
+
+ // apply redirect repr url logic for each datum
+ for (CrawlDatum datum : datums) {
+
+ MapWritable metadata = datum.getMetaData();
+ Text reprUrl = (Text)metadata.get(Nutch.WRITABLE_REPR_URL_KEY);
+ byte status = datum.getStatus();
+ boolean isCrawlDb = (CrawlDatum.hasDbStatus(datum));
+ boolean segFetched = (status == CrawlDatum.STATUS_FETCH_SUCCESS);
+
+ // only if the crawl datum is from the crawldb or is a successfully
+ // fetched page from the segments
+ if ((isCrawlDb || segFetched) && reprUrl != null) {
+
+ String src = reprUrl.toString();
+ String dest = url;
+ URL srcUrl = null;
+ URL dstUrl = null;
+
+ // both need to be well formed urls
+ try {
+ srcUrl = new URL(src);
+ dstUrl = new URL(url);
+ }
+ catch (MalformedURLException e) {
+ }
+
+ // if the src and repr urls are the same after the new logic then
+ // remove the repr url from the metadata as it is no longer needed
+ if (srcUrl != null && dstUrl != null) {
+ String reprOut = URLUtil.chooseRepr(src, dest, true);
+ if (reprOut.equals(dest)) {
+ LOG.info("Removing " + reprOut + " from " + dest);
+ metadata.remove(Nutch.WRITABLE_REPR_URL_KEY);
+ }
+ }
+ }
+
+ // collect each datum
+ output.collect(key, datum);
+ }
+
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Run the fixer on any crawl database and segments specified.
+ */
+ public void update(Path crawlDb, Path[] segments)
+ throws IOException {
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // run the crawl database through the repr fixer
+ if (crawlDb != null) {
+
+ LOG.info("Running ReprUtilFixer " + crawlDb);
+ Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+ Path newCrawlDb = new Path(crawlDb,
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf updater = new NutchJob(conf);
+ updater.setJobName("ReprUtilFixer: " + crawlDb.toString());
+ FileInputFormat.addInputPath(updater, crawlDbCurrent);
+ FileOutputFormat.setOutputPath(updater, newCrawlDb);
+ updater.setInputFormat(SequenceFileInputFormat.class);
+ updater.setReducerClass(ReprUrlFixer.class);
+ updater.setOutputKeyClass(Text.class);
+ updater.setOutputValueClass(CrawlDatum.class);
+ updater.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ JobClient.runJob(updater);
+ LOG.info("Installing new crawldb " + crawlDb);
+ CrawlDb.install(updater, crawlDb);
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+
+ // run the segments through the repr fixer, logic will be run on both the
+ // crawl_parse and the crawl_fetch directories for every segment specified
+ if (segments != null) {
+
+ for (int i = 0; i < segments.length; i++) {
+
+ Path segment = segments[i];
+ LOG.info("Running ReprUtilFixer " + segment + " fetch");
+ Path segFetch = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
+ Path newSegFetch = new Path(segment, CrawlDatum.FETCH_DIR_NAME + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf fetch = new NutchJob(conf);
+ fetch.setJobName("ReprUtilFixer: " + segment.toString());
+ FileInputFormat.addInputPath(fetch, segFetch);
+ FileOutputFormat.setOutputPath(fetch, newSegFetch);
+ fetch.setInputFormat(SequenceFileInputFormat.class);
+ fetch.setReducerClass(ReprUrlFixer.class);
+ fetch.setOutputKeyClass(Text.class);
+ fetch.setOutputValueClass(CrawlDatum.class);
+ fetch.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ JobClient.runJob(fetch);
+ LOG.info("Installing new segment fetch directory " + newSegFetch);
+ FSUtils.replace(fs, segFetch, newSegFetch, true);
+ LOG.info("ReprUrlFixer: finished installing segment fetch directory");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ LOG.info("Running ReprUtilFixer " + segment + " parse");
+ Path segParse = new Path(segment, CrawlDatum.PARSE_DIR_NAME);
+ Path newSegParse = new Path(segment, CrawlDatum.PARSE_DIR_NAME + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf parse = new NutchJob(conf);
+ parse.setJobName("ReprUtilFixer: " + segment.toString());
+ FileInputFormat.addInputPath(parse, segParse);
+ FileOutputFormat.setOutputPath(parse, newSegParse);
+ parse.setInputFormat(SequenceFileInputFormat.class);
+ parse.setReducerClass(ReprUrlFixer.class);
+ parse.setOutputKeyClass(Text.class);
+ parse.setOutputValueClass(CrawlDatum.class);
+ parse.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ JobClient.runJob(parse);
+ LOG.info("Installing new segment parse directry " + newSegParse);
+ FSUtils.replace(fs, segParse, newSegParse, true);
+ LOG.info("ReprUrlFixer: finished installing segment parse directory");
+ }
+ catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs The ReprUrlFixer.
+ */
+ public static void main(String[] args)
+ throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new ReprUrlFixer(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+ * Parse command line options and execute the main update logic.
+ */
+ public int run(String[] args)
+ throws Exception {
+
+ Options options = new Options();
+ Option helpOpts = OptionBuilder.withArgName("help").withDescription(
+ "show this help message").create("help");
+ Option crawlDbOpts = OptionBuilder.withArgName("crawldb").hasArg().withDescription(
+ "the crawldb to use").create("crawldb");
+ Option segOpts = OptionBuilder.withArgName("segment").hasArgs().withDescription(
+ "the segment(s) to use").create("segment");
+ options.addOption(helpOpts);
+ options.addOption(crawlDbOpts);
+ options.addOption(segOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ // parse out common line arguments and make sure either a crawldb or a
+ // segment are specified
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help")
+ || (!line.hasOption("crawldb") && !line.hasOption("segment"))) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("ReprUtilFixer", options);
+ return -1;
+ }
+
+ // create paths for all of the segments specified, multiple segments may
+ // be run at once
+ String crawlDb = line.getOptionValue("crawldb");
+ String[] segments = line.getOptionValues("segment");
+ Path[] segPaths = new Path[segments != null ? segments.length : 0];
+ if (segments != null) {
+ for (int i = 0; i < segments.length; i++) {
+ segPaths[i] = new Path(segments[i]);
+ }
+ }
+ update(crawlDb != null ? new Path(crawlDb) : null, segPaths);
+ return 0;
+ }
+ catch (Exception e) {
+ LOG.fatal("ReprUtilFixer: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
\ No newline at end of file
Index: src/java/org/apache/nutch/util/FSUtils.java
===================================================================
--- src/java/org/apache/nutch/util/FSUtils.java (revision 0)
+++ src/java/org/apache/nutch/util/FSUtils.java (revision 0)
@@ -0,0 +1,83 @@
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Utility methods for common filesystem operations.
+ */
+public class FSUtils {
+
+ /**
+ * Replaces the current path with the replacement path and, if requested,
+ * removes the old path. If removeOld is set to false then the old path is
+ * kept under the name current.old.
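+ *
+ * This is how the WebGraph tools install each newly built database, for
+ * example FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true) in WebGraph.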
+ *
+ * @param fs The FileSystem.
+ * @param current The path being replaced.
+ * @param replacement The path to replace it with.
+ * @param removeOld If true, remove the old path after the replacement.
+ *
+ * @throws IOException If an error occurs during replacement.
+ */
+ public static void replace(FileSystem fs, Path current, Path replacement,
+ boolean removeOld)
+ throws IOException {
+
+ // rename any current path to old
+ Path old = new Path(current + ".old");
+ if (fs.exists(current)) {
+ fs.rename(current, old);
+ }
+
+ // rename the new path to current and remove the old path if needed
+ fs.rename(replacement, current);
+ if (fs.exists(old) && removeOld) {
+ fs.delete(old, true);
+ }
+ }
+
+ /**
+ * Closes a group of SequenceFile readers.
+ *
+ * @param readers The SequenceFile readers to close.
+ * @throws IOException If an error occurs while closing a reader.
+ */
+ public static void closeReaders(SequenceFile.Reader[] readers)
+ throws IOException {
+
+ // loop through the readers, closing one by one
+ if (readers != null) {
+ for (int i = 0; i < readers.length; i++) {
+ SequenceFile.Reader reader = readers[i];
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Closes a group of MapFile readers.
+ *
+ * @param readers The MapFile readers to close.
+ * @throws IOException If an error occurs while closing a reader.
+ */
+ public static void closeReaders(MapFile.Reader[] readers)
+ throws IOException {
+
+ // loop through the readers closing one by one
+ if (readers != null) {
+ for (int i = 0; i < readers.length; i++) {
+ MapFile.Reader reader = readers[i];
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+}
Index: src/java/org/apache/nutch/util/URLUtil.java
===================================================================
--- src/java/org/apache/nutch/util/URLUtil.java (revision 702976)
+++ src/java/org/apache/nutch/util/URLUtil.java (working copy)
@@ -29,185 +29,340 @@
private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})");
- /** Returns the domain name of the url. The domain name of a url is
- * the substring of the url's hostname, w/o subdomain names. As an
- * example
+ /**
+ * Returns the domain name of the url. The domain name of a url is the
+ * substring of the url's hostname, w/o subdomain names. As an example
+ *
* getDomainName(conf, new URL(http://lucene.apache.org/))
*
- * will return apache.org
- * */
+ * will return
+ * apache.org
+ */
public static String getDomainName(URL url) {
DomainSuffixes tlds = DomainSuffixes.getInstance();
String host = url.getHost();
- //it seems that java returns hostnames ending with .
- if(host.endsWith("."))
+ // it seems that java returns hostnames ending with .
+ if (host.endsWith("."))
host = host.substring(0, host.length() - 1);
- if(IP_PATTERN.matcher(host).matches())
+ if (IP_PATTERN.matcher(host).matches())
return host;
-
+
int index = 0;
String candidate = host;
- for(;index >= 0;) {
+ for (; index >= 0;) {
index = candidate.indexOf('.');
- String subCandidate = candidate.substring(index+1);
- if(tlds.isDomainSuffix(subCandidate)) {
- return candidate;
+ String subCandidate = candidate.substring(index + 1);
+ if (tlds.isDomainSuffix(subCandidate)) {
+ return candidate;
}
candidate = subCandidate;
}
return candidate;
}
- /** Returns the domain name of the url. The domain name of a url is
- * the substring of the url's hostname, w/o subdomain names. As an
- * example
+ /**
+ * Returns the domain name of the url. The domain name of a url is the
+ * substring of the url's hostname, w/o subdomain names. As an example
+ *
* getDomainName(conf, new http://lucene.apache.org/)
*
- * will return apache.org
+ * will return
+ * apache.org
+ *
* @throws MalformedURLException
*/
- public static String getDomainName(String url) throws MalformedURLException {
+ public static String getDomainName(String url)
+ throws MalformedURLException {
return getDomainName(new URL(url));
}
- /** Returns whether the given urls have the same domain name.
- * As an example,
+ /**
+ * Returns whether the given urls have the same domain name. As an example,
* isSameDomain(new URL("http://lucene.apache.org")
* , new URL("http://people.apache.org/"))
*
* will return true.
- *
+ *
* @return true if the domain names are equal
*/
public static boolean isSameDomainName(URL url1, URL url2) {
return getDomainName(url1).equalsIgnoreCase(getDomainName(url2));
}
- /**Returns whether the given urls have the same domain name.
- * As an example,
- * isSameDomain("http://lucene.apache.org"
- * ,"http://people.apache.org/")
- *
- * will return true.
- * @return true if the domain names are equal
- * @throws MalformedURLException
- */
+ /**
+ * Returns whether the given urls have the same domain name. As an example,
+ * isSameDomain("http://lucene.apache.org"
+ * ,"http://people.apache.org/")
+ *
+ * will return true.
+ *
+ * @return true if the domain names are equal
+ * @throws MalformedURLException
+ */
public static boolean isSameDomainName(String url1, String url2)
throws MalformedURLException {
return isSameDomainName(new URL(url1), new URL(url2));
}
- /** Returns the {@link DomainSuffix} corresponding to the
- * last public part of the hostname
+ /**
+ * Returns the {@link DomainSuffix} corresponding to the last public part of
+ * the hostname
*/
public static DomainSuffix getDomainSuffix(URL url) {
DomainSuffixes tlds = DomainSuffixes.getInstance();
String host = url.getHost();
- if(IP_PATTERN.matcher(host).matches())
+ if (IP_PATTERN.matcher(host).matches())
return null;
-
+
int index = 0;
String candidate = host;
- for(;index >= 0;) {
+ for (; index >= 0;) {
index = candidate.indexOf('.');
- String subCandidate = candidate.substring(index+1);
+ String subCandidate = candidate.substring(index + 1);
DomainSuffix d = tlds.get(subCandidate);
- if(d != null) {
- return d;
+ if (d != null) {
+ return d;
}
candidate = subCandidate;
}
return null;
}
- /** Returns the {@link DomainSuffix} corresponding to the
- * last public part of the hostname
+ /**
+ * Returns the {@link DomainSuffix} corresponding to the last public part of
+ * the hostname
*/
- public static DomainSuffix getDomainSuffix(String url) throws MalformedURLException {
+ public static DomainSuffix getDomainSuffix(String url)
+ throws MalformedURLException {
return getDomainSuffix(new URL(url));
}
-  /** Partitions of the hostname of the url by "." */
+  /** Returns the partitions of the url's hostname, split on "." */
public static String[] getHostSegments(URL url) {
String host = url.getHost();
- //return whole hostname, if it is an ipv4
- //TODO : handle ipv6
- if(IP_PATTERN.matcher(host).matches())
- return new String[] {host};
+ // return whole hostname, if it is an ipv4
+ // TODO : handle ipv6
+ if (IP_PATTERN.matcher(host).matches())
+ return new String[]{host};
return host.split("\\.");
}
- /** Partitions of the hostname of the url by "."
- * @throws MalformedURLException */
- public static String[] getHostSegments(String url) throws MalformedURLException {
- return getHostSegments(new URL(url));
+ /**
+   * Returns the partitions of the url's hostname, split on "."
+ *
+ * @throws MalformedURLException
+ */
+ public static String[] getHostSegments(String url)
+ throws MalformedURLException {
+ return getHostSegments(new URL(url));
}
-
-  /** Given two urls (source and destination of the redirect),
-   * returns the representative one.
-   *
-   * Implements the algorithm described here:
-   *
-   * How does the Yahoo! webcrawler handle redirects?
-   *
-   * The algorithm is as follows:
+
+  /**
+   * Given two urls, a src and a destination of a redirect, returns the
+   * representative url.
+   *
+   * This method implements an extended version of the algorithm used by the
+   * Yahoo! Slurp crawler described here:
+   * "How does the Yahoo! webcrawler handle redirects?"
    *
*
- * - Choose target url if either url is malformed.
- * - When a page in one domain redirects to a page in another domain,
- * choose the "target" URL.
- * - When a top-level page in a domain presents a permanent redirect
- * to a page deep within the same domain, choose the "source" URL.
- * - When a page deep within a domain presents a permanent redirect
- * to a page deep within the same domain, choose the "target" URL.
- * - When a page in a domain presents a temporary redirect to
- * another page in the same domain, choose the "source" URL.
-
- *
- *
- *
- * @param src Source url of redirect
- * @param dst Destination url of redirect
- * @param temp Flag to indicate if redirect is temporary
- * @return Representative url (either src or dst)
+ * Choose target url if either url is malformed.
+   * If the domains are different, keep the destination whether the
+   * redirect is temporary or permanent.
+ *
+ * If the redirect is permanent and the source is root, keep the source.
+ * - *a.com -> a.com?y=1 || *a.com -> a.com/xyz/index.html
+ * If the redirect is permanent and the source is not root and the
+ * destination is root, keep the destination
+ * - a.com/xyz/index.html -> a.com*
+ * If the redirect is permanent and neither the source nor the destination
+ * is root, then keep the destination
+ * - a.com/xyz/index.html -> a.com/abc/page.html*
+ * If the redirect is temporary and source is root and destination is not
+ * root, then keep the source
+ * - *a.com -> a.com/xyz/index.html
+ * If the redirect is temporary and source is not root and destination is
+ * root, then keep the destination
+ * - a.com/xyz/index.html -> a.com*
+   * If the redirect is temporary and neither the source nor the destination
+   * is root, then keep the shortest url. If the hosts are equal, compare the
+   * paths, first by the number of '/' path separators and then by length;
+   * otherwise keep the url whose host has the fewer subdomains.
+ *
+ * - a.com/xyz/index.html -> a.com/abc/page.html*
+ * - *www.a.com/xyz/index.html -> www.news.a.com/xyz/index.html
+ *
+   * If the redirect is temporary and both the source and the destination
+   * are root, then keep the url whose host has the fewer subdomains
+ * - *www.a.com -> www.news.a.com
+ *
+   * While not part of this method, a further piece of representative url
+   * logic occurs during indexing, after scoring. When the basic fields are
+   * created before indexing, if a url has a representative url stored, both
+   * the url and its representative url (which should never be the same) are
+   * checked against their linkrank scores; the higher scoring one is kept as
+   * the url and the lower scoring one is held as the orig url inside of the
+   * index.
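+   *
+   * A few illustrative outcomes (see TestURLUtil.testChooseRepr for the full
+   * set of cases):
+   * chooseRepr("http://www.a.com", "http://www.b.com", false) keeps the dest
+   * chooseRepr("http://www.a.com", "http://www.a.com/xyz/index.html", false)
+   * keeps the src (permanent redirect from root)
+   * chooseRepr("http://www.a.com", "http://www.news.a.com", true) keeps the
+   * src (temporary, both root, fewer subdomains)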
+ *
+ * @param src The source url.
+ * @param dst The destination url.
+   * @param temp True if the redirect is temporary, false if permanent.
+ *
+ * @return String The representative url.
*/
public static String chooseRepr(String src, String dst, boolean temp) {
+
+ // validate both are well formed urls
URL srcUrl;
URL dstUrl;
try {
srcUrl = new URL(src);
dstUrl = new URL(dst);
- } catch (MalformedURLException e) {
+ }
+ catch (MalformedURLException e) {
return dst;
}
+ // get the source and destination domain, host, and page
String srcDomain = URLUtil.getDomainName(srcUrl);
String dstDomain = URLUtil.getDomainName(dstUrl);
+ String srcHost = srcUrl.getHost();
+ String dstHost = dstUrl.getHost();
+ String srcFile = srcUrl.getFile();
+ String dstFile = dstUrl.getFile();
+ // are the source and destination the root path url.com/ or url.com
+ boolean srcRoot = (srcFile.equals("/") || srcFile.length() == 0);
+ boolean destRoot = (dstFile.equals("/") || dstFile.length() == 0);
+
+    // 1) different domain then keep dest, temp or perm
+ // a.com -> b.com*
+ //
+ // 2) permanent and root, keep src
+ // *a.com -> a.com?y=1 || *a.com -> a.com/xyz/index.html
+ //
+ // 3) permanent and not root and dest root, keep dest
+ // a.com/xyz/index.html -> a.com*
+ //
+ // 4) permanent and neither root keep dest
+ // a.com/xyz/index.html -> a.com/abc/page.html*
+ //
+ // 5) temp and root and dest not root keep src
+ // *a.com -> a.com/xyz/index.html
+ //
+    // 6) temp and not root and dest root keep dest
+ // a.com/xyz/index.html -> a.com*
+ //
+    // 7) temp and neither root, keep shortest: if hosts equal compare by
+    //    path, else by hosts. paths are compared first by number of /
+    //    separators, then by length
+ // a.com/xyz/index.html -> a.com/abc/page.html*
+ // *www.a.com/xyz/index.html -> www.news.a.com/xyz/index.html
+ //
+    // 8) temp and both root keep shortest sub domain
+ // *www.a.com -> www.news.a.com
+
+ // if we are dealing with a redirect from one domain to another keep the
+ // destination
if (!srcDomain.equals(dstDomain)) {
return dst;
}
- String srcFile = srcUrl.getFile();
+ // if it is a permanent redirect
+ if (!temp) {
+
+ // if source is root return source, otherwise destination
+ if (srcRoot) {
+ return src;
+ }
+ else {
+ return dst;
+ }
+ }
+ else { // temporary redirect
+
+ // source root and destination not root
+ if (srcRoot && !destRoot) {
+ return src;
+ }
+ else if (!srcRoot && destRoot) { // destination root and source not
+ return dst;
+ }
+ else if (!srcRoot && !destRoot && (srcHost.equals(dstHost))) {
+
+ // source and destination hosts are the same, check paths, host length
+ int numSrcPaths = srcFile.split("/").length;
+ int numDstPaths = dstFile.split("/").length;
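+        // e.g. "/xyz/index.html" and "/abc/page.html" both split into three
+        // segments, so the tie falls through to the raw path length below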
+ if (numSrcPaths != numDstPaths) {
+ return (numDstPaths < numSrcPaths ? dst : src);
+ }
+ else {
+ int srcPathLength = srcFile.length();
+ int dstPathLength = dstFile.length();
+ return (dstPathLength < srcPathLength ? dst : src);
+ }
+ }
+ else {
- if (!temp && srcFile.equals("/")) {
- return src;
+        // remaining cases (both root, or different hosts): keep the url
+        // whose host has the fewer subdomains
+ int numSrcSubs = srcHost.split("\\.").length;
+ int numDstSubs = dstHost.split("\\.").length;
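+        // e.g. "www.a.com" (3 segments) wins over "www.news.a.com" (4)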
+ return (numDstSubs < numSrcSubs ? dst : src);
+ }
}
-
- return temp ? src : dst;
}
/** For testing */
- public static void main(String[] args){
+ public static void main(String[] args) {
-    if(args.length!=1) {
-      System.err.println("Usage : URLUtil <url>");
-      return ;
+
+    // quick manual check of the chooseRepr logic
+    System.out.println(chooseRepr("http://milogardner@yahoo.com/",
+        "http://www.yahoo.com/", true));
+ }
+
+ /**
+ * Returns the lowercased hostname for the url or null if the url is not well
+ * formed.
+ *
+ * @param url The url to check.
+ * @return String The hostname for the url.
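+   *
+   * e.g. getHost("http://LUCENE.Apache.org/nutch") returns
+   * "lucene.apache.org" (illustrative)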
+ */
+ public static String getHost(String url) {
+ try {
+ return new URL(url).getHost().toLowerCase();
}
-
- String url = args[0];
+ catch (MalformedURLException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Returns the page for the url. The page consists of the protocol, host,
+ * and path, but does not include the query string. The host is lowercased
+ * but the path is not.
+ *
+ * @param url The url to check.
+ * @return String The page for the url.
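+   *
+   * e.g. getPage("http://WWW.Example.org/Docs/index.html?x=1") returns
+   * "http://www.example.org/Docs/index.html" (illustrative)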
+ */
+ public static String getPage(String url) {
try {
- System.out.println(URLUtil.getDomainName(new URL(url)));
+      // strip the query string and lowercase only the host, so the path
+      // keeps its case as documented above
+      URL u = new URL(url);
+      String port = (u.getPort() != -1) ? ":" + u.getPort() : "";
+      return u.getProtocol() + "://" + u.getHost().toLowerCase() + port
+        + u.getPath();
}
- catch (MalformedURLException ex) {
- ex.printStackTrace();
+ catch (MalformedURLException e) {
+ return null;
}
}
}
Index: src/test/org/apache/nutch/util/TestURLUtil.java
===================================================================
--- src/test/org/apache/nutch/util/TestURLUtil.java (revision 702976)
+++ src/test/org/apache/nutch/util/TestURLUtil.java (working copy)
@@ -22,14 +22,17 @@
import junit.framework.TestCase;
/** Test class for URLUtil */
-public class TestURLUtil extends TestCase {
+public class TestURLUtil
+ extends TestCase {
@Override
- protected void setUp() throws Exception {
+ protected void setUp()
+ throws Exception {
super.setUp();
}
- public void testGetDomainName() throws Exception{
+ public void testGetDomainName()
+ throws Exception {
URL url = null;
@@ -51,7 +54,7 @@
url = new URL("http://www.example.co.uk.com");
assertEquals("uk.com", URLUtil.getDomainName(url));
- //"nn" is not a tld
+ // "nn" is not a tld
url = new URL("http://example.com.nn");
assertEquals("nn", URLUtil.getDomainName(url));
@@ -60,25 +63,26 @@
url = new URL("http://www.edu.tr.xyz");
assertEquals("xyz", URLUtil.getDomainName(url));
-
+
url = new URL("http://www.example.c.se");
assertEquals("example.c.se", URLUtil.getDomainName(url));
- //plc.co.im is listed as a domain suffix
+ // plc.co.im is listed as a domain suffix
url = new URL("http://www.example.plc.co.im");
assertEquals("example.plc.co.im", URLUtil.getDomainName(url));
-
- //2000.hu is listed as a domain suffix
+
+ // 2000.hu is listed as a domain suffix
url = new URL("http://www.example.2000.hu");
assertEquals("example.2000.hu", URLUtil.getDomainName(url));
-
- //test non-ascii
+
+ // test non-ascii
url = new URL("http://www.example.商業.tw");
assertEquals("example.商業.tw", URLUtil.getDomainName(url));
-
+
}
- public void testGetDomainSuffix() throws Exception{
+ public void testGetDomainSuffix()
+ throws Exception {
URL url = null;
url = new URL("http://lucene.apache.org/nutch");
@@ -96,7 +100,7 @@
url = new URL("http://www.example.co.uk.com");
assertEquals("com", URLUtil.getDomainSuffix(url).getDomain());
- //"nn" is not a tld
+ // "nn" is not a tld
url = new URL("http://example.com.nn");
assertNull(URLUtil.getDomainSuffix(url));
@@ -105,59 +109,108 @@
url = new URL("http://www.edu.tr.xyz");
assertNull(URLUtil.getDomainSuffix(url));
-
+
url = new URL("http://subdomain.example.edu.tr");
assertEquals("edu.tr", URLUtil.getDomainSuffix(url).getDomain());
-
+
url = new URL("http://subdomain.example.presse.fr");
assertEquals("presse.fr", URLUtil.getDomainSuffix(url).getDomain());
-
+
url = new URL("http://subdomain.example.presse.tr");
assertEquals("tr", URLUtil.getDomainSuffix(url).getDomain());
-
- //plc.co.im is listed as a domain suffix
+
+ // plc.co.im is listed as a domain suffix
url = new URL("http://www.example.plc.co.im");
assertEquals("plc.co.im", URLUtil.getDomainSuffix(url).getDomain());
-
- //2000.hu is listed as a domain suffix
+
+ // 2000.hu is listed as a domain suffix
url = new URL("http://www.example.2000.hu");
assertEquals("2000.hu", URLUtil.getDomainSuffix(url).getDomain());
-
- //test non-ascii
+
+ // test non-ascii
url = new URL("http://www.example.商業.tw");
assertEquals("商業.tw", URLUtil.getDomainSuffix(url).getDomain());
-
+
}
-
- public void testGetHostSegments() throws Exception{
+
+ public void testGetHostSegments()
+ throws Exception {
URL url;
String[] segments;
-
+
url = new URL("http://subdomain.example.edu.tr");
segments = URLUtil.getHostSegments(url);
assertEquals("subdomain", segments[0]);
assertEquals("example", segments[1]);
assertEquals("edu", segments[2]);
assertEquals("tr", segments[3]);
-
+
url = new URL("http://");
segments = URLUtil.getHostSegments(url);
assertEquals(1, segments.length);
assertEquals("", segments[0]);
-
+
url = new URL("http://140.211.11.130/foundation/contributing.html");
segments = URLUtil.getHostSegments(url);
assertEquals(1, segments.length);
assertEquals("140.211.11.130", segments[0]);
-
- //test non-ascii
+
+ // test non-ascii
url = new URL("http://www.example.商業.tw");
segments = URLUtil.getHostSegments(url);
assertEquals("www", segments[0]);
assertEquals("example", segments[1]);
assertEquals("商業", segments[2]);
assertEquals("tw", segments[3]);
+
+ }
+
+ public void testChooseRepr()
+ throws Exception {
+
+ String aDotCom = "http://www.a.com";
+ String bDotCom = "http://www.b.com";
+ String aSubDotCom = "http://www.news.a.com";
+ String aQStr = "http://www.a.com?y=1";
+ String aPath = "http://www.a.com/xyz/index.html";
+ String aPath2 = "http://www.a.com/abc/page.html";
+ String aPath3 = "http://www.news.a.com/abc/page.html";
+
+    // 1) different domain then keep dest, temp or perm
+ // a.com -> b.com*
+ assertEquals(bDotCom, URLUtil.chooseRepr(aDotCom, bDotCom, true));
+ assertEquals(bDotCom, URLUtil.chooseRepr(aDotCom, bDotCom, false));
+
+ // 2) permanent and root, keep src
+ // *a.com -> a.com?y=1 || *a.com -> a.com/xyz/index.html
+ assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aQStr, false));
+ assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aPath, false));
+
+    // 3) permanent and not root and dest root, keep dest
+    // a.com/xyz/index.html -> a.com*
+ assertEquals(aDotCom, URLUtil.chooseRepr(aPath, aDotCom, false));
+    // 4) permanent and neither root keep dest
+ // a.com/xyz/index.html -> a.com/abc/page.html*
+ assertEquals(aPath2, URLUtil.chooseRepr(aPath, aPath2, false));
+
+    // 5) temp and root and dest not root keep src
+    // *a.com -> a.com/xyz/index.html
+ assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aPath, true));
+
+    // 6) temp and not root and dest root keep dest
+ // a.com/xyz/index.html -> a.com*
+ assertEquals(aDotCom, URLUtil.chooseRepr(aPath, aDotCom, true));
+
+    // 7) temp and neither root, keep shortest, if hosts equal by path else by hosts
+ // a.com/xyz/index.html -> a.com/abc/page.html*
+ // *www.a.com/xyz/index.html -> www.news.a.com/xyz/index.html
+ assertEquals(aPath2, URLUtil.chooseRepr(aPath, aPath2, true));
+ assertEquals(aPath, URLUtil.chooseRepr(aPath, aPath3, true));
+
+    // 8) temp and both root keep shortest sub domain
+ // *www.a.com -> www.news.a.com
+ assertEquals(aDotCom, URLUtil.chooseRepr(aDotCom, aSubDotCom, true));
}
}