Index: src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/java/org/apache/nutch/scoring/webgraph/WebGraph.java (revision 1431804)
+++ src/java/org/apache/nutch/scoring/webgraph/WebGraph.java (revision )
@@ -34,6 +34,11 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -45,19 +50,11 @@
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Nutch;
@@ -69,7 +66,6 @@
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
@@ -77,14 +73,14 @@
* Creates three databases, one for inlinks, one for outlinks, and a node
* database that holds the number of in and outlinks to a url and the current
* score for the url.
- *
+ *
* The score is set by an analysis program such as LinkRank. The WebGraph is an
* update-able database. Outlinks are stored by their fetch time or by the
* current system time if no fetch time is available. Only the most recent
* version of outlinks for a given url is stored. As more crawls are executed
* and the WebGraph updated, newer Outlinks will replace older Outlinks. This
* allows the WebGraph to adapt to changes in the link structure of the web.
- *
+ *
* The Inlink database is created from the Outlink database and is regenerated
* when the WebGraph is updated. The Node database is created from both the
* Inlink and Outlink databases. Because the Node database is overwritten when
@@ -96,7 +92,6 @@
public class WebGraph
extends Configured
implements Tool {
-
public static final Logger LOG = LoggerFactory.getLogger(WebGraph.class);
public static final String LOCK_NAME = ".locked";
public static final String INLINK_DIR = "inlinks";
@@ -109,22 +104,11 @@
* by domain and host can be ignored. The number of Outlinks out to a given
* page or domain can also be limited.
*/
- public static class OutlinkDb
- extends Configured
- implements Mapper<Text, Writable, Text, NutchWritable>,
- Reducer<Text, NutchWritable, Text, LinkDatum> {
-
+ public static class OutlinkDbMapper
+ extends Mapper<Text, Writable, Text, NutchWritable> {
public static final String URL_NORMALIZING = "webgraph.url.normalizers";
public static final String URL_FILTERING = "webgraph.url.filters";
- // ignoring internal domains, internal hosts
- private boolean ignoreDomain = true;
- private boolean ignoreHost = true;
-
- // limiting urls out to a page or to a domain
- private boolean limitPages = true;
- private boolean limitDomains = true;
-
// using normalizers and/or filters
private boolean normalize = false;
private boolean filter = false;
@@ -132,13 +116,11 @@
// url normalizers, filters and job configuration
private URLNormalizers urlNormalizers;
private URLFilters filters;
- private JobConf conf;
/**
* Normalizes and trims extra whitespace from the given url.
- *
+ *
* @param url The url to normalize.
- *
* @return The normalized url.
*/
private String normalizeUrl(String url) {
@@ -153,11 +135,11 @@
// normalize and trim the url
normalized = urlNormalizers.normalize(url,
- URLNormalizers.SCOPE_DEFAULT);
+ URLNormalizers.SCOPE_DEFAULT);
normalized = normalized.trim();
}
catch (Exception e) {
- LOG.warn("Skipping " + url + ":" + e);
+ LOG.warn("Skipping " + url + ":" + e);
normalized = null;
}
}
@@ -168,30 +150,28 @@
* Filters the given url.
*
* @param url The url to filter.
- *
* @return The filtered url or null.
*/
private String filterUrl(String url) {
if (!filter) {
- return url;
+ return url;
}
try {
- url = filters.filter(url);
+ url = filters.filter(url);
} catch (Exception e) {
- url = null;
+ url = null;
}
- return url;
- }
+ return url;
+ }
/**
* Returns the fetch time from the parse data or the current system time if
* the fetch time doesn't exist.
- *
+ *
* @param data The parse data.
- *
* @return The fetch time as a long.
*/
private long getFetchTime(ParseData data) {
@@ -213,36 +193,25 @@
/**
* Default constructor.
*/
- public OutlinkDb() {
+ public OutlinkDbMapper() {
}
/**
- * Configurable constructor.
- */
- public OutlinkDb(Configuration conf) {
- setConf(conf);
- }
-
- /**
* Configures the OutlinkDb job. Sets up internal links and link limiting.
*/
- public void configure(JobConf conf) {
- this.conf = conf;
- ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
- ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
- limitPages = conf.getBoolean("link.ignore.limit.page", true);
- limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+ protected void setup(Context context
+ ) throws IOException, InterruptedException {
- normalize = conf.getBoolean(URL_NORMALIZING, false);
- filter = conf.getBoolean(URL_FILTERING, false);
+ normalize = context.getConfiguration().getBoolean(URL_NORMALIZING, false);
+ filter = context.getConfiguration().getBoolean(URL_FILTERING, false);
if (normalize) {
- urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+ urlNormalizers = new URLNormalizers(context.getConfiguration(), URLNormalizers.SCOPE_DEFAULT);
}
if (filter) {
- filters = new URLFilters(conf);
+ filters = new URLFilters(context.getConfiguration());
- }
+ }
}
/**
@@ -250,8 +219,8 @@
* maps out new LinkDatum objects from new crawls ParseData.
*/
public void map(Text key, Writable value,
- OutputCollector<Text, NutchWritable> output, Reporter reporter)
- throws IOException {
+ Context context)
+ throws IOException, InterruptedException {
// normalize url, stop processing if null
String url = normalizeUrl(key.toString());
@@ -275,7 +244,7 @@
datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
// Tell the reducer to get rid of all instances of this key
- output.collect(key, new NutchWritable(new BooleanWritable(true)));
+ context.write(key, new NutchWritable(new BooleanWritable(true)));
}
}
else if (value instanceof ParseData) {
@@ -311,7 +280,7 @@
for (String outlinkUrl : outlinkMap.keySet()) {
String anchor = outlinkMap.get(outlinkUrl);
LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
- output.collect(key, new NutchWritable(datum));
+ context.write(key, new NutchWritable(datum));
}
}
else if (value instanceof LinkDatum) {
@@ -322,22 +291,64 @@
datum.setUrl(linkDatumUrl);
// collect existing outlinks from existing OutlinkDb
- output.collect(key, new NutchWritable(datum));
+ context.write(key, new NutchWritable(datum));
}
}
}
- public void reduce(Text key, Iterator<NutchWritable> values,
- OutputCollector<Text, LinkDatum> output, Reporter reporter)
- throws IOException {
+ /**
+ * Called once at the end of the task.
+ */
+ protected void cleanup(Context context
+ ) throws IOException, InterruptedException {
+ // NOTHING
+ }
+ }
+ /**
+ * The OutlinkDb creates a database of all outlinks. Outlinks to internal urls
+ * by domain and host can be ignored. The number of Outlinks out to a given
+ * page or domain can also be limited.
+ */
+ public static class OutlinkDbReducer
+ extends Reducer<Text, NutchWritable, Text, LinkDatum> {
+
+ // ignoring internal domains, internal hosts
+ private boolean ignoreDomain = true;
+ private boolean ignoreHost = true;
+
+ // limiting urls out to a page or to a domain
+ private boolean limitPages = true;
+ private boolean limitDomains = true;
+
+ /**
+ * Default constructor.
+ */
+ public OutlinkDbReducer() {
+ }
+
+ /**
+ * Configures the OutlinkDb job. Sets up internal links and link limiting.
+ */
+ @Override
+ protected void setup(Context context
+ ) throws IOException, InterruptedException {
+
+ ignoreHost = context.getConfiguration().getBoolean("link.ignore.internal.host", true);
+ ignoreDomain = context.getConfiguration().getBoolean("link.ignore.internal.domain", true);
+ limitPages = context.getConfiguration().getBoolean("link.ignore.limit.page", true);
+ limitDomains = context.getConfiguration().getBoolean("link.ignore.limit.domain", true);
+ }
+
+ @Override
+ protected void reduce(Text key, Iterable<NutchWritable> values, Context context)
+ throws IOException, InterruptedException {
+
// aggregate all outlinks, get the most recent timestamp for a fetch
// which should be the timestamp for all of the most recent outlinks
long mostRecent = 0L;
List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
- while (values.hasNext()) {
- Writable value = values.next().get();
-
+ for (Writable value : values) {
if (value instanceof LinkDatum) {
// loop through, change out most recent timestamp if needed
LinkDatum next = (LinkDatum)value;
@@ -345,15 +356,15 @@
if (mostRecent == 0L || mostRecent < timestamp) {
mostRecent = timestamp;
}
- outlinkList.add((LinkDatum)WritableUtils.clone(next, conf));
- reporter.incrCounter("WebGraph.outlinks", "added links", 1);
+ outlinkList.add((LinkDatum)WritableUtils.clone(next, context.getConfiguration()));
+ context.getCounter("WebGraph.outlinks", "added links").increment(1);
- }
- else if (value instanceof BooleanWritable) {
+ }
+ else if (value instanceof BooleanWritable) {
BooleanWritable delete = (BooleanWritable)value;
// Actually, delete is always true, otherwise we don't emit it in the mapper in the first place
if (delete.get() == true) {
// This page is gone, do not emit it's outlinks
- reporter.incrCounter("WebGraph.outlinks", "removed links", 1);
+ context.getCounter("WebGraph.outlinks", "removed links").increment(1);
return;
}
}
@@ -385,16 +396,13 @@
&& (!limitDomains || (limitDomains && !domains.contains(toDomain)))
&& (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
&& (!ignoreDomain || (ignoreDomain && !toDomain.equalsIgnoreCase(domain)))) {
- output.collect(key, datum);
+ context.write(key, datum);
pages.add(toPage);
domains.add(toDomain);
}
}
- }
+ }
-
- public void close() {
}
- }
/**
* The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
@@ -402,44 +410,32 @@
* updated.
*/
private static class InlinkDb
- extends Configured
- implements Mapper<Text, LinkDatum, Text, LinkDatum> {
+ extends Mapper<Text, LinkDatum, Text, LinkDatum> {
- private JobConf conf;
private long timestamp;
/**
* Default constructor.
*/
public InlinkDb() {
+ timestamp = System.currentTimeMillis();
}
/**
- * Configurable constructor.
+ * Called once at the end of the task.
*/
- public InlinkDb(Configuration conf) {
- setConf(conf);
+ protected void cleanup(Context context
+ ) throws IOException, InterruptedException {
+ // NOTHING
}
/**
- * Configures job. Sets timestamp for all Inlink LinkDatum objects to the
- * current system time.
- */
- public void configure(JobConf conf) {
- this.conf = conf;
- timestamp = System.currentTimeMillis();
- }
-
- public void close() {
- }
-
- /**
* Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
* new system timestamp, type and to and from url switched.
*/
public void map(Text key, LinkDatum datum,
- OutputCollector<Text, LinkDatum> output, Reporter reporter)
- throws IOException {
+ Context context)
+ throws IOException, InterruptedException {
// get the to and from url and the anchor
String fromUrl = key.toString();
@@ -449,7 +445,7 @@
// flip the from and to url and set the new link type
LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
inlink.setLinkType(LinkDatum.INLINK);
- output.collect(new Text(toUrl), inlink);
+ context.write(new Text(toUrl), inlink);
}
}
@@ -458,11 +454,7 @@
* for each url and a score slot for analysis programs such as LinkRank.
*/
private static class NodeDb
- extends Configured
- implements Reducer<Text, LinkDatum, Text, Node> {
-
- private JobConf conf;
-
+ extends Reducer<Text, LinkDatum, Text, Node> {
/**
* Default constructor.
*/
@@ -470,29 +462,20 @@
}
/**
- * Configurable constructor.
+ * Called once at the end of the task.
*/
- public NodeDb(Configuration conf) {
- setConf(conf);
+ protected void cleanup(Context context
+ ) throws IOException, InterruptedException {
+ // NOTHING
}
/**
- * Configures job.
- */
- public void configure(JobConf conf) {
- this.conf = conf;
- }
-
- public void close() {
- }
-
- /**
* Counts the number of inlinks and outlinks for each url and sets a default
* score of 0.0 for each url (node) in the webgraph.
*/
public void reduce(Text key, Iterator<LinkDatum> values,
- OutputCollector<Text, Node> output, Reporter reporter)
- throws IOException {
+ Context context)
+ throws IOException, InterruptedException {
Node node = new Node();
int numInlinks = 0;
@@ -503,7 +486,7 @@
LinkDatum next = values.next();
if (next.getLinkType() == LinkDatum.INLINK) {
numInlinks++;
- }
+ }
else if (next.getLinkType() == LinkDatum.OUTLINK) {
numOutlinks++;
}
@@ -513,25 +496,25 @@
node.setNumInlinks(numInlinks);
node.setNumOutlinks(numOutlinks);
node.setInlinkScore(0.0f);
- output.collect(key, node);
+ context.write(key, node);
- }
+ }
}
/**
* Creates the three different WebGraph databases, Outlinks, Inlinks, and
* Node. If a current WebGraph exists then it is updated, if it doesn't exist
* then a new WebGraph database is created.
- *
+ *
* @param webGraphDb The WebGraph to create or update.
* @param segments The array of segments used to update the WebGraph. Newer
* segments and fetch times will overwrite older segments.
* @param normalize whether to use URLNormalizers on URL's in the segment
* @param filter whether to use URLFilters on URL's in the segment
- *
+ *
* @throws IOException If an error occurs while processing the WebGraph.
*/
public void createWebGraph(Path webGraphDb, Path[] segments, boolean normalize, boolean filter)
- throws IOException {
+ throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
@@ -563,7 +546,7 @@
Path tempOutlinkDb = new Path(outlinkDb + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
- JobConf outlinkJob = new NutchJob(conf);
+ Job outlinkJob = new Job(conf);
outlinkJob.setJobName("Outlinkdb: " + outlinkDb);
boolean deleteGone = conf.getBoolean("link.delete.gone", false);
@@ -592,121 +575,124 @@
}
}
- // add the existing webgraph
- LOG.info("OutlinkDb: adding input: " + outlinkDb);
- FileInputFormat.addInputPath(outlinkJob, outlinkDb);
+ // add the existing webgraph
+ LOG.info("OutlinkDb: adding input: " + outlinkDb);
+ FileInputFormat.addInputPath(outlinkJob, outlinkDb);
- outlinkJob.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
- outlinkJob.setBoolean(OutlinkDb.URL_FILTERING, filter);
+ outlinkJob.getConfiguration().setBoolean(OutlinkDbMapper.URL_NORMALIZING, normalize);
+ outlinkJob.getConfiguration().setBoolean(OutlinkDbMapper.URL_FILTERING, filter);
- outlinkJob.setInputFormat(SequenceFileInputFormat.class);
- outlinkJob.setMapperClass(OutlinkDb.class);
- outlinkJob.setReducerClass(OutlinkDb.class);
+ outlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
+ outlinkJob.setMapperClass(OutlinkDbMapper.class);
+ outlinkJob.setReducerClass(OutlinkDbReducer.class);
- outlinkJob.setMapOutputKeyClass(Text.class);
- outlinkJob.setMapOutputValueClass(NutchWritable.class);
- outlinkJob.setOutputKeyClass(Text.class);
- outlinkJob.setOutputValueClass(LinkDatum.class);
- FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
+ outlinkJob.setMapOutputKeyClass(Text.class);
+ outlinkJob.setMapOutputValueClass(NutchWritable.class);
+ outlinkJob.setOutputKeyClass(Text.class);
+ outlinkJob.setOutputValueClass(LinkDatum.class);
+ FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
- outlinkJob.setOutputFormat(MapFileOutputFormat.class);
- outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ outlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
+ outlinkJob.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
- // run the outlinkdb job and replace any old outlinkdb with the new one
- try {
+ // run the outlinkdb job and replace any old outlinkdb with the new one
+ try {
LOG.info("OutlinkDb: running");
- JobClient.runJob(outlinkJob);
+ try {
+ outlinkJob.waitForCompletion(true);
+ } catch (Exception e) {
+ throw e;
+ }
LOG.info("OutlinkDb: installing " + outlinkDb);
FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
if (!preserveBackup && fs.exists(oldOutlinkDb)) fs.delete(oldOutlinkDb, true);
LOG.info("OutlinkDb: finished");
- }
- catch (IOException e) {
+ }
+ catch (IOException e) {
-
// remove lock file and and temporary directory if an error occurs
LockUtil.removeLockFile(fs, lock);
if (fs.exists(tempOutlinkDb)) {
- fs.delete(tempOutlinkDb, true);
+ fs.delete(tempOutlinkDb, true);
}
LOG.error(StringUtils.stringifyException(e));
throw e;
- }
+ }
-
- // inlink and temp link database paths
- Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
- Path tempInlinkDb = new Path(inlinkDb + "-"
+ // inlink and temp link database paths
+ Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
+ Path tempInlinkDb = new Path(inlinkDb + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
- JobConf inlinkJob = new NutchJob(conf);
+ Job inlinkJob = new Job(conf);
- inlinkJob.setJobName("Inlinkdb " + inlinkDb);
- LOG.info("InlinkDb: adding input: " + outlinkDb);
- FileInputFormat.addInputPath(inlinkJob, outlinkDb);
+ inlinkJob.setJobName("Inlinkdb " + inlinkDb);
+ LOG.info("InlinkDb: adding input: " + outlinkDb);
+ FileInputFormat.addInputPath(inlinkJob, outlinkDb);
- inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+ inlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
- inlinkJob.setMapperClass(InlinkDb.class);
- inlinkJob.setMapOutputKeyClass(Text.class);
- inlinkJob.setMapOutputValueClass(LinkDatum.class);
- inlinkJob.setOutputKeyClass(Text.class);
- inlinkJob.setOutputValueClass(LinkDatum.class);
- FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
+ inlinkJob.setMapperClass(InlinkDb.class);
+ inlinkJob.setMapOutputKeyClass(Text.class);
+ inlinkJob.setMapOutputValueClass(LinkDatum.class);
+ inlinkJob.setOutputKeyClass(Text.class);
+ inlinkJob.setOutputValueClass(LinkDatum.class);
+ FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
- inlinkJob.setOutputFormat(MapFileOutputFormat.class);
- inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
-
+ inlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
+ inlinkJob.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
- try {
+ try {
-
- // run the inlink and replace any old with new
- LOG.info("InlinkDb: running");
+ // run the inlink and replace any old with new
+ LOG.info("InlinkDb: running");
- JobClient.runJob(inlinkJob);
+ try {
+ inlinkJob.waitForCompletion(true);
+ } catch (Exception e) {
+ throw e;
+ }
- LOG.info("InlinkDb: installing " + inlinkDb);
- FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
- LOG.info("InlinkDb: finished");
+ LOG.info("InlinkDb: installing " + inlinkDb);
+ FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
+ LOG.info("InlinkDb: finished");
}
- catch (IOException e) {
+ catch (IOException e) {
-
// remove lock file and and temporary directory if an error occurs
LockUtil.removeLockFile(fs, lock);
if (fs.exists(tempInlinkDb)) {
fs.delete(tempInlinkDb, true);
- }
+ }
LOG.error(StringUtils.stringifyException(e));
throw e;
- }
+ }
-
- // node and temp node database paths
- Path nodeDb = new Path(webGraphDb, NODE_DIR);
- Path tempNodeDb = new Path(nodeDb + "-"
+ // node and temp node database paths
+ Path nodeDb = new Path(webGraphDb, NODE_DIR);
+ Path tempNodeDb = new Path(nodeDb + "-"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
- JobConf nodeJob = new NutchJob(conf);
+ Job nodeJob = new Job(conf);
- nodeJob.setJobName("NodeDb " + nodeDb);
- LOG.info("NodeDb: adding input: " + outlinkDb);
- LOG.info("NodeDb: adding input: " + inlinkDb);
- FileInputFormat.addInputPath(nodeJob, outlinkDb);
- FileInputFormat.addInputPath(nodeJob, inlinkDb);
+ nodeJob.setJobName("NodeDb " + nodeDb);
+ LOG.info("NodeDb: adding input: " + outlinkDb);
+ LOG.info("NodeDb: adding input: " + inlinkDb);
+ FileInputFormat.addInputPath(nodeJob, outlinkDb);
+ FileInputFormat.addInputPath(nodeJob, inlinkDb);
- nodeJob.setInputFormat(SequenceFileInputFormat.class);
+ nodeJob.setInputFormatClass(SequenceFileInputFormat.class);
nodeJob.setReducerClass(NodeDb.class);
nodeJob.setMapOutputKeyClass(Text.class);
nodeJob.setMapOutputValueClass(LinkDatum.class);
nodeJob.setOutputKeyClass(Text.class);
nodeJob.setOutputValueClass(Node.class);
FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
- nodeJob.setOutputFormat(MapFileOutputFormat.class);
- nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ nodeJob.setOutputFormatClass(MapFileOutputFormat.class);
+ nodeJob.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
try {
-
// run the node job and replace old nodedb with new
LOG.info("NodeDb: running");
- JobClient.runJob(nodeJob);
+ try {
+ nodeJob.waitForCompletion(true);
+ } catch (Exception e) {
+ throw e;
+ }
LOG.info("NodeDb: installing " + nodeDb);
FSUtils.replace(fs, nodeDb, tempNodeDb, true);
LOG.info("NodeDb: finished");
}
catch (IOException e) {
-
+
// remove lock file and and temporary directory if an error occurs
LockUtil.removeLockFile(fs, lock);
if (fs.exists(tempNodeDb)) {
fs.delete(tempNodeDb, true);
- }
+ }
LOG.error(StringUtils.stringifyException(e));
throw e;
}
@@ -774,8 +760,8 @@
}
}
- // Handle segmentDir option
- if (line.hasOption("segmentDir")) {
+ // Handle segmentDir option
+ if (line.hasOption("segmentDir")) {
Path dir = new Path(line.getOptionValue("segmentDir"));
FileSystem fs = dir.getFileSystem(getConf());
FileStatus[] fstats = fs.listStatus(dir, HadoopFSUtil.getPassDirectoriesFilter(fs));
@@ -784,18 +770,18 @@
boolean normalize = false;
- if (line.hasOption("normalize")) {
+ if (line.hasOption("normalize")) {
normalize = true;
}
boolean filter = false;
- if (line.hasOption("filter")) {
+ if (line.hasOption("filter")) {
filter = true;
}
- createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
- return 0;
+ createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
+ return 0;
}
catch (Exception e) {
LOG.error("WebGraph: " + StringUtils.stringifyException(e));