Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/fetcher/Fetcher.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/fetcher/Fetcher.java (working copy)
@@ -29,8 +29,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
@@ -45,7 +44,7 @@
/** The fetcher. Most of the work is done by plugins. */
-public class Fetcher extends ToolBase implements MapRunnable {
+public class Fetcher extends Configured implements Tool, MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
public static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -55,7 +54,7 @@
public static final String PROTOCOL_REDIR = "protocol";
- public static class InputFormat extends SequenceFileInputFormat {
+ public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
/** Don't split inputs, to keep things polite. */
public InputSplit[] getSplits(JobConf job, int nSplits)
throws IOException {
@@ -63,14 +62,14 @@
FileSystem fs = FileSystem.get(job);
InputSplit[] splits = new InputSplit[files.length];
for (int i = 0; i < files.length; i++) {
- splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
+ splits[i] = new FileSplit(files[i], 0, fs.getFileStatus(files[i]).getLen(), job);
}
return splits;
}
}
- private RecordReader input;
- private OutputCollector output;
+ private RecordReader<Text, CrawlDatum> input;
+ private OutputCollector<Text, NutchWritable> output;
private Reporter reporter;
private String segmentName;
@@ -456,7 +455,7 @@
return conf.getBoolean("fetcher.store.content", true);
}
- public void run(RecordReader input, OutputCollector output,
+ public void run(RecordReader<Text, CrawlDatum> input, OutputCollector<Text, NutchWritable> output,
Reporter reporter) throws IOException {
this.input = input;
@@ -530,7 +529,7 @@
/** Run the fetcher. */
public static void main(String[] args) throws Exception {
- int res = new Fetcher().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
System.exit(res);
}
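Note: the pattern applied to Fetcher above recurs in every file in this patch. As a rough sketch only (MyTool is a placeholder name, not a Nutch class), the deprecated ToolBase idiom is replaced like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // Placeholder tool illustrating the migration; not part of the patch itself.
    public class MyTool extends Configured implements Tool {

      // Job logic lives in run(); ToolRunner calls setConf() first, so the
      // configuration is reached through getConf() rather than a ToolBase field.
      public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // ... build and submit the JobConf here ...
        return 0;
      }

      // main() now delegates to ToolRunner, replacing ToolBase.doMain().
      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyTool(), args);
        System.exit(res);
      }
    }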
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/DeleteDuplicates.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/DeleteDuplicates.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/DeleteDuplicates.java (working copy)
@@ -28,9 +28,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
@@ -63,8 +61,8 @@
*
* @author Andrzej Bialecki
*/
-public class DeleteDuplicates extends ToolBase
- implements Mapper, Reducer, OutputFormat {
+public class DeleteDuplicates extends Configured
+ implements Tool, Mapper<WritableComparable, Writable, Text, IntWritable>, Reducer<Text, IntWritable, WritableComparable, Writable>, OutputFormat<WritableComparable, Writable> {
private static final Log LOG = LogFactory.getLog(DeleteDuplicates.class);
// Algorithm:
@@ -141,7 +139,7 @@
}
- public static class InputFormat extends InputFormatBase {
+ public static class InputFormat extends FileInputFormat<Text, IndexDoc> {
private static final long INDEX_LENGTH = Integer.MAX_VALUE;
/** Return each index as a split. */
@@ -155,7 +153,7 @@
return splits;
}
- public class DDRecordReader implements RecordReader {
+ public class DDRecordReader implements RecordReader<Text, IndexDoc> {
private IndexReader indexReader;
private int maxDoc = 0;
@@ -174,7 +172,7 @@
this.index = index;
}
- public boolean next(WritableComparable key, Writable value)
+ public boolean next(Text key, IndexDoc indexDoc)
throws IOException {
// skip empty indexes
@@ -189,9 +187,8 @@
Document document = indexReader.document(doc);
// fill in key
- ((Text)key).set(document.get("url"));
+ key.set(document.get("url"));
// fill in value
- IndexDoc indexDoc = (IndexDoc)value;
indexDoc.keep = true;
indexDoc.url.set(document.get("url"));
indexDoc.hash.setDigest(document.get("digest"));
@@ -226,11 +223,11 @@
indexReader.close();
}
- public WritableComparable createKey() {
+ public Text createKey() {
return new Text();
}
- public Writable createValue() {
+ public IndexDoc createValue() {
return new IndexDoc();
}
@@ -240,7 +237,7 @@
}
/** Return each index as a split. */
- public RecordReader getRecordReader(InputSplit split,
+ public RecordReader<Text, IndexDoc> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException {
FileSplit fsplit = (FileSplit)split;
@@ -250,27 +247,27 @@
}
}
- public static class HashPartitioner implements Partitioner {
+ public static class HashPartitioner implements Partitioner<MD5Hash, Writable> {
public void configure(JobConf job) {}
public void close() {}
- public int getPartition(WritableComparable key, Writable value,
+ public int getPartition(MD5Hash key, Writable value,
int numReduceTasks) {
- int hashCode = ((MD5Hash)key).hashCode();
+ int hashCode = key.hashCode();
return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
}
}
- public static class UrlsReducer implements Reducer {
+ public static class UrlsReducer implements Reducer<Text, IndexDoc, MD5Hash, IndexDoc> {
public void configure(JobConf job) {}
public void close() {}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<IndexDoc> values,
+ OutputCollector<MD5Hash, IndexDoc> output, Reporter reporter) throws IOException {
IndexDoc latest = null;
while (values.hasNext()) {
- IndexDoc value = (IndexDoc)values.next();
+ IndexDoc value = values.next();
if (latest == null) {
latest = value;
continue;
@@ -296,7 +293,7 @@
}
}
- public static class HashReducer implements Reducer {
+ public static class HashReducer implements Reducer<MD5Hash, IndexDoc, Text, IndexDoc> {
boolean byScore;
public void configure(JobConf job) {
@@ -304,12 +301,12 @@
}
public void close() {}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
+ public void reduce(MD5Hash key, Iterator<IndexDoc> values,
+ OutputCollector<Text, IndexDoc> output, Reporter reporter)
throws IOException {
IndexDoc highest = null;
while (values.hasNext()) {
- IndexDoc value = (IndexDoc)values.next();
+ IndexDoc value = values.next();
// skip already deleted
if (!value.keep) {
LOG.debug("-discard " + value + " (already marked)");
@@ -355,7 +352,7 @@
public void setConf(Configuration conf) {
super.setConf(conf);
try {
- fs = FileSystem.get(conf);
+ if(conf != null) fs = FileSystem.get(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -365,7 +362,7 @@
/** Map [*,IndexDoc] pairs to [index,doc] pairs. */
public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
IndexDoc indexDoc = (IndexDoc)value;
// don't delete these
@@ -375,14 +372,14 @@
}
/** Delete docs named in values from index named in key. */
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<IntWritable> values,
+ OutputCollector<WritableComparable, Writable> output, Reporter reporter)
throws IOException {
Path index = new Path(key.toString());
IndexReader reader = IndexReader.open(new FsDirectory(fs, index, false, getConf()));
try {
while (values.hasNext()) {
- IntWritable value = (IntWritable)values.next();
+ IntWritable value = values.next();
LOG.debug("-delete " + index + " doc=" + value);
reader.deleteDocument(value.get());
}
@@ -392,11 +389,11 @@
}
/** Write nothing. */
- public RecordWriter getRecordWriter(final FileSystem fs,
+ public RecordWriter<WritableComparable, Writable> getRecordWriter(final FileSystem fs,
final JobConf job,
final String name,
final Progressable progress) throws IOException {
- return new RecordWriter() {
+ return new RecordWriter<WritableComparable, Writable>() {
public void write(WritableComparable key, Writable value)
throws IOException {
throw new UnsupportedOperationException();
@@ -496,7 +493,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new DeleteDuplicates().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new DeleteDuplicates(), args);
System.exit(res);
}
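The other recurring change, visible throughout DeleteDuplicates above, is the move from the raw Mapper/Reducer interfaces to the generified org.apache.hadoop.mapred ones, which removes the casts inside map() and reduce(). A minimal sketch, with illustrative types only (SumReducer is not a Nutch class):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Key/value types are declared on the interface, so values.next() needs no cast.
    public class SumReducer extends MapReduceBase
        implements Reducer<Text, LongWritable, Text, LongWritable> {

      public void reduce(Text key, Iterator<LongWritable> values,
          OutputCollector<Text, LongWritable> output, Reporter reporter)
          throws IOException {
        long sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();   // previously: ((LongWritable) values.next()).get()
        }
        output.collect(key, new LongWritable(sum));
      }
    }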
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexSorter.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexSorter.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexSorter.java (working copy)
@@ -32,12 +32,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.*;
/** Sort a Nutch index by page score. Higher scoring documents are assigned
* smaller document numbers. */
-public class IndexSorter extends ToolBase {
+public class IndexSorter extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(IndexSorter.class);
private static class PostingMap implements Comparable {
@@ -296,7 +296,7 @@
/** */
public static void main(String[] args) throws Exception {
- int res = new IndexSorter().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new IndexSorter(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexMerger.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexMerger.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/IndexMerger.java (working copy)
@@ -25,8 +25,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.nutch.util.LogUtil;
@@ -42,7 +41,7 @@
* @author Doug Cutting
* @author Mike Cafarella
*************************************************************************/
-public class IndexMerger extends ToolBase {
+public class IndexMerger extends Configured implements Tool {
public static final Log LOG = LogFactory.getLog(IndexMerger.class);
public static final String DONE_NAME = "merge.done";
@@ -80,17 +79,17 @@
Directory[] dirs = new Directory[indexes.length];
for (int i = 0; i < indexes.length; i++) {
if (LOG.isInfoEnabled()) { LOG.info("Adding " + indexes[i]); }
- dirs[i] = new FsDirectory(fs, indexes[i], false, this.conf);
+ dirs[i] = new FsDirectory(fs, indexes[i], false, getConf());
}
//
// Merge indices
//
IndexWriter writer = new IndexWriter(localOutput.toString(), null, true);
- writer.setMergeFactor(conf.getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
- writer.setMaxBufferedDocs(conf.getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
- writer.setMaxMergeDocs(conf.getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
- writer.setTermIndexInterval(conf.getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
+ writer.setMergeFactor(getConf().getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
+ writer.setMaxBufferedDocs(getConf().getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
+ writer.setMaxMergeDocs(getConf().getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
+ writer.setTermIndexInterval(getConf().getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
writer.setInfoStream(LogUtil.getDebugStream(LOG));
writer.setUseCompoundFile(false);
writer.setSimilarity(new NutchSimilarity());
@@ -108,7 +107,7 @@
* Create an index for the input files in the named directory.
*/
public static void main(String[] args) throws Exception {
- int res = new IndexMerger().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new IndexMerger(), args);
System.exit(res);
}
@@ -122,7 +121,7 @@
//
// Parse args, read all index directories to be processed
//
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(getConf());
List indexDirs = new ArrayList();
Path workDir = new Path("indexmerger-" + System.currentTimeMillis());
@@ -151,7 +150,7 @@
LOG.fatal("IndexMerger: " + StringUtils.stringifyException(e));
return -1;
} finally {
- FileSystem.getLocal(conf).delete(workDir);
+ FileSystem.getLocal(getConf()).delete(workDir);
}
}
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/Indexer.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/Indexer.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/indexer/Indexer.java (working copy)
@@ -27,9 +27,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.nutch.parse.*;
import org.apache.nutch.analysis.*;
@@ -51,7 +49,7 @@
import org.apache.nutch.metadata.Nutch;
/** Create indexes for segments. */
-public class Indexer extends ToolBase implements Reducer, Mapper {
+public class Indexer extends Configured implements Tool, Reducer<Text, NutchWritable, Text, LuceneDocumentWrapper>, Mapper<Text, Writable, Text, NutchWritable> {
public static final String DONE_NAME = "index.done";
@@ -85,8 +83,8 @@
/** Unwrap Lucene Documents created by reduce and add them to an index. */
public static class OutputFormat
- extends org.apache.hadoop.mapred.OutputFormatBase {
- public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,
+ extends org.apache.hadoop.mapred.OutputFormatBase<WritableComparable, LuceneDocumentWrapper> {
+ public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job,
String name, final Progressable progress) throws IOException {
final Path perm = new Path(job.getOutputPath(), name);
final Path temp =
@@ -109,12 +107,12 @@
writer.setUseCompoundFile(false);
writer.setSimilarity(new NutchSimilarity());
- return new RecordWriter() {
+ return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
boolean closed;
- public void write(WritableComparable key, Writable value)
+ public void write(WritableComparable key, LuceneDocumentWrapper value)
throws IOException { // unwrap & index doc
- Document doc = ((LuceneDocumentWrapper) value).get();
+ Document doc = value.get();
NutchAnalyzer analyzer = factory.get(doc.get("lang"));
if (LOG.isInfoEnabled()) {
LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" +
@@ -174,8 +172,8 @@
public void close() {}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<Text, LuceneDocumentWrapper> output, Reporter reporter)
throws IOException {
Inlinks inlinks = null;
CrawlDatum dbDatum = null;
@@ -183,7 +181,7 @@
ParseData parseData = null;
ParseText parseText = null;
while (values.hasNext()) {
- Writable value = ((NutchWritable)values.next()).get(); // unwrap
+ Writable value = values.next().get(); // unwrap
if (value instanceof Inlinks) {
inlinks = (Inlinks)value;
} else if (value instanceof CrawlDatum) {
@@ -248,7 +246,7 @@
fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
}
// run indexing filters
- doc = this.filters.filter(doc, parse, (Text)key, fetchDatum, inlinks);
+ doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
} catch (IndexingException e) {
if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
return;
@@ -315,7 +313,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new Indexer().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
System.exit(res);
}
@@ -341,8 +339,8 @@
}
}
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void map(Text key, Writable value,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
output.collect(key, new NutchWritable(value));
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java (working copy)
@@ -24,18 +24,18 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.SignatureFactory;
@@ -65,11 +65,10 @@
* Arc files are tars of compressed gzips which are produced by both the
* internet archive project and the grub distributed crawler project.
*
- * TODO: This class needs to be changed to use ToolRunner instead of ToolBase.
*/
public class ArcSegmentCreator
- extends ToolBase
- implements Mapper {
+ extends Configured
+ implements Tool, Mapper<Text, BytesWritable, Text, NutchWritable> {
public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);
public static final String URL_VERSION = "arc.url.version";
@@ -145,7 +144,7 @@
*
* @return The result of the parse in a ParseStatus object.
*/
- private ParseStatus output(OutputCollector output, String segmentName,
+ private ParseStatus output(OutputCollector<Text, NutchWritable> output, String segmentName,
Text key, CrawlDatum datum, Content content, ProtocolStatus pstatus,
int status) {
@@ -184,7 +183,7 @@
// set the content signature
if (parseResult == null) {
byte[] signature = SignatureFactory.getSignature(getConf()).calculate(
- content, new ParseStatus().getEmptyParse(conf));
+ content, new ParseStatus().getEmptyParse(getConf()));
datum.setSignature(signature);
}
@@ -266,12 +265,12 @@
* segments.
*
* @param key The arc record header.
- * @param value The arc record raw content bytes.
+ * @param bytes The arc record raw content bytes.
* @param output The output collecter.
* @param reporter The progress reporter.
*/
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(Text key, BytesWritable bytes,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
throws IOException {
String[] headers = key.toString().split("\\s+");
@@ -289,7 +288,6 @@
// get the raw bytes from the arc file, create a new crawldatum
Text url = new Text();
- BytesWritable bytes = (BytesWritable)value;
CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, interval,
1.0f);
String segmentName = getConf().get(Nutch.SEGMENT_NAME_KEY);
@@ -371,7 +369,7 @@
public static void main(String args[])
throws Exception {
- int res = new ArcSegmentCreator().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new ArcSegmentCreator(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/FreeGenerator.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/FreeGenerator.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/tools/FreeGenerator.java (working copy)
@@ -23,8 +23,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -36,7 +36,8 @@
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.crawl.PartitionUrlByHost;
@@ -53,13 +54,13 @@
*
* @author Andrzej Bialecki
*/
-public class FreeGenerator extends ToolBase {
+public class FreeGenerator extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(FreeGenerator.class);
private static final String FILTER_KEY = "free.generator.filter";
private static final String NORMALIZE_KEY = "free.generator.normalize";
- public static class FG extends MapReduceBase implements Mapper, Reducer {
+ public static class FG extends MapReduceBase implements Mapper<WritableComparable, Text, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
private URLNormalizers normalizers = null;
private URLFilters filters = null;
private ScoringFilters scfilters;
@@ -78,7 +79,7 @@
}
}
- public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+ public void map(WritableComparable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
// value is a line of text
String urlString = value.toString();
try {
@@ -105,9 +106,9 @@
output.collect(url, datum);
}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
// pick just one (discard duplicates)
- output.collect(key, (Writable)values.next());
+ output.collect(key, values.next());
}
}
@@ -161,7 +162,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new FreeGenerator().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new FreeGenerator(), args);
System.exit(res);
}
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbReader.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbReader.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbReader.java (working copy)
@@ -36,8 +36,6 @@
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
@@ -83,13 +81,12 @@
}
}
- public static class CrawlDbStatMapper implements Mapper {
+ public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
LongWritable COUNT_1 = new LongWritable(1);
public void configure(JobConf job) {}
public void close() {}
- public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+ public void map(Text key, CrawlDatum cd, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
- CrawlDatum cd = (CrawlDatum) value;
output.collect(new Text("T"), COUNT_1);
output.collect(new Text("status " + cd.getStatus()), COUNT_1);
output.collect(new Text("retry " + cd.getRetriesSinceFetch()), COUNT_1);
@@ -97,19 +94,19 @@
}
}
- public static class CrawlDbStatCombiner implements Reducer {
+ public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable val = new LongWritable();
public CrawlDbStatCombiner() { }
public void configure(JobConf job) { }
public void close() {}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
val.set(0L);
- String k = ((Text)key).toString();
+ String k = key.toString();
if (!k.equals("s")) {
while (values.hasNext()) {
- LongWritable cnt = (LongWritable)values.next();
+ LongWritable cnt = values.next();
val.set(val.get() + cnt.get());
}
output.collect(key, val);
@@ -130,66 +127,54 @@
}
}
- public static class CrawlDbStatReducer implements Reducer {
+ public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
public void configure(JobConf job) {}
public void close() {}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
- String k = ((Text) key).toString();
+ String k = key.toString();
if (k.equals("T")) {
// sum all values for this key
long sum = 0;
while (values.hasNext()) {
- sum += ((LongWritable) values.next()).get();
+ sum += values.next().get();
}
// output sum
output.collect(key, new LongWritable(sum));
} else if (k.startsWith("status") || k.startsWith("retry")) {
LongWritable cnt = new LongWritable();
while (values.hasNext()) {
- LongWritable val = (LongWritable)values.next();
+ LongWritable val = values.next();
cnt.set(cnt.get() + val.get());
}
output.collect(key, cnt);
} else if (k.equals("scx")) {
LongWritable cnt = new LongWritable(Long.MIN_VALUE);
while (values.hasNext()) {
- LongWritable val = (LongWritable)values.next();
+ LongWritable val = values.next();
if (cnt.get() < val.get()) cnt.set(val.get());
}
output.collect(key, cnt);
} else if (k.equals("scn")) {
LongWritable cnt = new LongWritable(Long.MAX_VALUE);
while (values.hasNext()) {
- LongWritable val = (LongWritable)values.next();
+ LongWritable val = values.next();
if (cnt.get() > val.get()) cnt.set(val.get());
}
output.collect(key, cnt);
} else if (k.equals("sct")) {
LongWritable cnt = new LongWritable();
while (values.hasNext()) {
- LongWritable val = (LongWritable)values.next();
+ LongWritable val = values.next();
cnt.set(cnt.get() + val.get());
}
output.collect(key, cnt);
}
}
}
-
- public static class CrawlDbDumpReducer implements Reducer {
-
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
- while (values.hasNext()) {
- output.collect(key, (Writable)values.next());
- }
- }
-
- public void configure(JobConf job) {}
- public void close() {}
- }
-
- public static class CrawlDbTopNMapper implements Mapper {
+
+ public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
private static final FloatWritable fw = new FloatWritable();
private float min = 0.0f;
@@ -200,24 +185,23 @@
}
}
public void close() {}
- public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+ public void map(Text key, CrawlDatum datum, OutputCollector<FloatWritable, Text> output, Reporter reporter)
throws IOException {
- CrawlDatum datum = (CrawlDatum)value;
if (datum.getScore() < min) return; // don't collect low-scoring records
fw.set(-datum.getScore()); // reverse sorting order
output.collect(fw, key); // invert mapping: score -> url
}
}
- public static class CrawlDbTopNReducer implements Reducer {
+ public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
private long topN;
private long count = 0L;
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
while (values.hasNext() && count < topN) {
FloatWritable fw = (FloatWritable)key;
fw.set(-fw.get());
- output.collect(fw, (Writable)values.next());
+ output.collect(fw, values.next());
count++;
}
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDb.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDb.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDb.java (working copy)
@@ -30,8 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
@@ -41,7 +40,7 @@
import org.apache.nutch.util.NutchJob;
/** Maintains an inverted link map, listing incoming links for each url. */
-public class LinkDb extends ToolBase implements Mapper {
+public class LinkDb extends Configured implements Tool, Mapper<Text, ParseData, Text, Inlinks> {
public static final Log LOG = LogFactory.getLog(LinkDb.class);
@@ -53,9 +52,7 @@
private URLFilters urlFilters;
private URLNormalizers urlNormalizers;
- public LinkDb() {
-
- }
+ public LinkDb() {}
public LinkDb(Configuration conf) {
setConf(conf);
@@ -74,8 +71,8 @@
public void close() {}
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(Text key, ParseData parseData,
+ OutputCollector<Text, Inlinks> output, Reporter reporter)
throws IOException {
String fromUrl = key.toString();
String fromHost = getHost(fromUrl);
@@ -96,7 +93,6 @@
}
}
if (fromUrl == null) return; // discard all outlinks
- ParseData parseData = (ParseData)value;
Outlink[] outlinks = parseData.getOutlinks();
Inlinks inlinks = new Inlinks();
for (int i = 0; i < outlinks.length; i++) {
@@ -149,7 +145,7 @@
Path[] files = fs.listPaths(segmentsDir, new PathFilter() {
public boolean accept(Path f) {
try {
- if (fs.isDirectory(f)) return true;
+ if (fs.getFileStatus(f).isDir()) return true;
} catch (IOException ioe) {};
return false;
}
@@ -255,7 +251,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new LinkDb().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDb(), args);
System.exit(res);
}
@@ -271,7 +267,7 @@
return -1;
}
Path segDir = null;
- final FileSystem fs = FileSystem.get(conf);
+ final FileSystem fs = FileSystem.get(getConf());
Path db = new Path(args[0]);
ArrayList segs = new ArrayList();
boolean filter = true;
@@ -283,7 +279,7 @@
Path[] files = fs.listPaths(segDir, new PathFilter() {
public boolean accept(Path f) {
try {
- if (fs.isDirectory(f)) return true;
+ if (fs.getFileStatus(f).isDir()) return true;
} catch (IOException ioe) {};
return false;
}
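A third, smaller substitution shows up in LinkDb above and again in CrawlDb below: per-path FileSystem queries such as isDirectory() and getLength() are replaced with a single getFileStatus() lookup. A sketch of the replacement, assuming the 0.16-era FileStatus accessors:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileStatusSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path(args[0]);

        FileStatus status = fs.getFileStatus(p); // single status lookup
        boolean isDir = status.isDir();          // replaces fs.isDirectory(p)
        long len = status.getLen();              // replaces fs.getLength(p)

        System.out.println(p + " dir=" + isDir + " len=" + len);
      }
    }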
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbMerger.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbMerger.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbMerger.java (working copy)
@@ -24,10 +24,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
@@ -36,7 +36,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
@@ -58,7 +59,7 @@
*
* @author Andrzej Bialecki
*/
-public class LinkDbMerger extends ToolBase implements Reducer {
+public class LinkDbMerger extends Configured implements Tool, Reducer<Text, Inlinks, Text, Inlinks> {
private static final Log LOG = LogFactory.getLog(LinkDbMerger.class);
private int maxInlinks;
@@ -71,12 +72,12 @@
setConf(conf);
}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<Inlinks> values, OutputCollector<Text, Inlinks> output, Reporter reporter) throws IOException {
Inlinks result = new Inlinks();
while (values.hasNext()) {
- Inlinks inlinks = (Inlinks)values.next();
+ Inlinks inlinks = values.next();
int end = Math.min(maxInlinks - result.size(), inlinks.size());
Iterator it = inlinks.iterator();
@@ -135,7 +136,7 @@
* @param args
*/
public static void main(String[] args) throws Exception {
- int res = new LinkDbMerger().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbMerger(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Injector.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Injector.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Injector.java (working copy)
@@ -28,8 +28,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.nutch.net.*;
import org.apache.nutch.scoring.ScoringFilterException;
@@ -39,12 +38,12 @@
/** This class takes a flat file of URLs and adds them to the of pages to be
* crawled. Useful for bootstrapping the system. */
-public class Injector extends ToolBase {
+public class Injector extends Configured implements Tool {
public static final Log LOG = LogFactory.getLog(Injector.class);
/** Normalize and filter injected urls. */
- public static class InjectMapper implements Mapper {
+ public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
private URLNormalizers urlNormalizers;
private int interval;
private float scoreInjected;
@@ -65,12 +64,10 @@
public void close() {}
- public void map(WritableComparable key, Writable val,
- OutputCollector output, Reporter reporter)
+ public void map(WritableComparable key, Text value,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
- Text value = (Text)val;
String url = value.toString(); // value is line of text
- // System.out.println("url: " +url);
try {
url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
url = filters.filter(url); // filter the url
@@ -98,17 +95,17 @@
}
/** Combine multiple new entries for a url. */
- public static class InjectReducer implements Reducer {
+ public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
public void configure(JobConf job) {}
public void close() {}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<CrawlDatum> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
CrawlDatum old = null;
CrawlDatum injected = null;
while (values.hasNext()) {
- CrawlDatum val = (CrawlDatum)values.next();
+ CrawlDatum val = values.next();
if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
injected = val;
injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
@@ -124,9 +121,7 @@
}
}
- public Injector() {
-
- }
+ public Injector() {}
public Injector(Configuration conf) {
setConf(conf);
@@ -179,7 +174,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new Injector().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDb.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDb.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDb.java (working copy)
@@ -28,8 +28,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
@@ -39,7 +38,7 @@
* This class takes the output of the fetcher and updates the
* crawldb accordingly.
*/
-public class CrawlDb extends ToolBase {
+public class CrawlDb extends Configured implements Tool {
public static final Log LOG = LogFactory.getLog(CrawlDb.class);
public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed";
@@ -47,11 +46,8 @@
public static final String CURRENT_NAME = "current";
public static final String LOCK_NAME = ".locked";
-
- public CrawlDb() {
-
- }
+ public CrawlDb() {}
public CrawlDb(Configuration conf) {
setConf(conf);
@@ -149,7 +145,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new CrawlDb().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
System.exit(res);
}
@@ -184,7 +180,7 @@
Path[] paths = fs.listPaths(new Path(args[++i]), new PathFilter() {
public boolean accept(Path dir) {
try {
- return fs.isDirectory(dir);
+ return fs.getFileStatus(dir).isDir();
} catch (IOException ioe) {
return false;
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbMerger.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/CrawlDbMerger.java (working copy)
@@ -28,10 +28,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.conf.*;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
@@ -50,10 +49,10 @@
*
* @author Andrzej Bialecki
*/
-public class CrawlDbMerger extends ToolBase {
+public class CrawlDbMerger extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CrawlDbMerger.class);
- public static class Merger extends MapReduceBase implements Reducer {
+ public static class Merger extends MapReduceBase implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
MapWritable meta = new MapWritable();
private FetchSchedule schedule;
@@ -63,13 +62,13 @@
schedule = FetchScheduleFactory.getFetchSchedule(conf);
}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
CrawlDatum res = null;
long resTime = 0L;
meta.clear();
while (values.hasNext()) {
- CrawlDatum val = (CrawlDatum) values.next();
+ CrawlDatum val = values.next();
if (res == null) {
res = val;
resTime = schedule.calculateLastFetchTime(res);
@@ -138,7 +137,7 @@
* @param args
*/
public static void main(String[] args) throws Exception {
- int res = new CrawlDbMerger().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDbMerger(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java (working copy)
@@ -27,7 +27,7 @@
import org.apache.nutch.net.URLNormalizers;
/** Partition urls by hostname. */
-public class PartitionUrlByHost implements Partitioner {
+public class PartitionUrlByHost implements Partitioner<Text, Writable> {
private static final Log LOG = LogFactory.getLog(PartitionUrlByHost.class);
private int seed;
@@ -41,9 +41,9 @@
public void close() {}
/** Hash by hostname. */
- public int getPartition(WritableComparable key, Writable value,
+ public int getPartition(Text key, Writable value,
int numReduceTasks) {
- String urlString = ((Text)key).toString();
+ String urlString = key.toString();
try {
urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
} catch (Exception e) {
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Generator.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Generator.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/Generator.java (working copy)
@@ -29,8 +29,7 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,7 +44,7 @@
import org.apache.nutch.util.NutchJob;
/** Generates a subset of a crawl db to fetch. */
-public class Generator extends ToolBase {
+public class Generator extends Configured implements Tool {
public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
@@ -81,7 +80,7 @@
}
/** Selects entries due for fetch. */
- public static class Selector implements Mapper, Partitioner, Reducer {
+ public static class Selector implements Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
private LongWritable genTime = new LongWritable(System.currentTimeMillis());
private long curTime;
private long limit;
@@ -89,7 +88,7 @@
private HashMap hostCounts =
new HashMap();
private int maxPerHost;
- private Partitioner hostPartitioner = new PartitionUrlByHost();
+ private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
private URLFilters filters;
private URLNormalizers normalizers;
private ScoringFilters scfilters;
@@ -120,10 +119,10 @@
public void close() {}
/** Select & invert subset due for fetch. */
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(Text key, CrawlDatum value,
+ OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
throws IOException {
- Text url = (Text)key;
+ Text url = key;
if (filter) {
// If filtering is on don't generate URLs that don't pass URLFilters
try {
@@ -136,7 +135,7 @@
}
}
}
- CrawlDatum crawlDatum = (CrawlDatum)value;
+ CrawlDatum crawlDatum = value;
// check fetch schedule
if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
@@ -167,20 +166,20 @@
}
/** Partition by host. */
- public int getPartition(WritableComparable key, Writable value,
+ public int getPartition(FloatWritable key, Writable value,
int numReduceTasks) {
return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
numReduceTasks);
}
/** Collect until limit is reached. */
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
+ public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
+ OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
throws IOException {
while (values.hasNext() && count < limit) {
- SelectorEntry entry = (SelectorEntry)values.next();
+ SelectorEntry entry = values.next();
Text url = entry.url;
if (maxPerHost > 0) { // are we counting hosts?
@@ -263,9 +262,9 @@
}
}
- public static class SelectorInverseMapper extends MapReduceBase implements Mapper {
+ public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, CrawlDatum> {
- public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+ public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
SelectorEntry entry = (SelectorEntry)value;
output.collect(entry.url, entry.datum);
}
@@ -304,27 +303,27 @@
/**
* Update the CrawlDB so that the next generate won't include the same URLs.
*/
- public static class CrawlDbUpdater extends MapReduceBase implements Mapper, Reducer {
+ public static class CrawlDbUpdater extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
long generateTime;
public void configure(JobConf job) {
generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
}
- public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+ public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
if (key instanceof FloatWritable) { // tempDir source
SelectorEntry se = (SelectorEntry)value;
output.collect(se.url, se.datum);
} else {
- output.collect(key, value);
+ output.collect((Text)key, (CrawlDatum)value);
}
}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
CrawlDatum orig = null;
LongWritable genTime = null;
while (values.hasNext()) {
- CrawlDatum val = (CrawlDatum)values.next();
+ CrawlDatum val = values.next();
if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
genTime = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
if (genTime.get() != generateTime) {
@@ -340,13 +339,10 @@
orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
}
output.collect(key, orig);
- }
-
+ }
}
- public Generator() {
-
- }
+ public Generator() {}
public Generator(Configuration conf) {
setConf(conf);
@@ -523,7 +519,7 @@
* Generate a fetchlist from the crawldb.
*/
public static void main(String args[]) throws Exception {
- int res = new Generator().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbReader.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbReader.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/crawl/LinkDbReader.java (working copy)
@@ -23,12 +23,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.util.NutchConfiguration;
@@ -37,10 +37,10 @@
import java.util.Iterator;
/** . */
-public class LinkDbReader extends ToolBase implements Closeable {
+public class LinkDbReader extends Configured implements Tool, Closeable {
public static final Log LOG = LogFactory.getLog(LinkDbReader.class);
- private static final Partitioner PARTITIONER = new HashPartitioner();
+ private static final Partitioner<WritableComparable, Writable> PARTITIONER = new HashPartitioner<WritableComparable, Writable>();
private FileSystem fs;
private Path directory;
@@ -111,7 +111,7 @@
}
public static void main(String[] args) throws Exception {
- int res = new LinkDbReader().doMain(NutchConfiguration.create(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbReader(), args);
System.exit(res);
}
Index: C:/Workarea/workspace/nut/src/java/org/apache/nutch/parse/ParseSegment.java
===================================================================
--- C:/Workarea/workspace/nut/src/java/org/apache/nutch/parse/ParseSegment.java (revision 609508)
+++ C:/Workarea/workspace/nut/src/java/org/apache/nutch/parse/ParseSegment.java (working copy)
@@ -23,7 +23,7 @@
import org.apache.nutch.crawl.SignatureFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.protocol.*;
@@ -37,7 +37,7 @@
import java.util.Map.Entry;
/* Parse content in a segment. */
-public class ParseSegment extends Configured implements Mapper, Reducer {
+public class ParseSegment extends Configured implements Tool, Mapper<WritableComparable, Content, Text, ParseImpl>, Reducer<Text, Writable, Text, Writable> {
public static final Log LOG = LogFactory.getLog(Parser.class);
@@ -60,15 +60,14 @@
private Text newKey = new Text();
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
+ public void map(WritableComparable key, Content content,
+ OutputCollector<Text, ParseImpl> output, Reporter reporter)
throws IOException {
// convert on the fly from old UTF8 keys
if (key instanceof UTF8) {
newKey.set(key.toString());
key = newKey;
}
- Content content = (Content) value;
ParseResult parseResult = null;
try {
@@ -111,8 +110,8 @@
}
}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
+ public void reduce(Text key, Iterator<Writable> values,
+ OutputCollector<Text, Writable> output, Reporter reporter)
throws IOException {
output.collect(key, (Writable)values.next()); // collect first value
}
@@ -144,6 +143,11 @@
public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new ParseSegment(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
Path segment;
String usage = "Usage: ParseSegment segment";
@@ -151,11 +155,9 @@
if (args.length == 0) {
System.err.println(usage);
System.exit(-1);
- }
-
+ }
segment = new Path(args[0]);
-
- ParseSegment parseSegment = new ParseSegment(NutchConfiguration.create());
- parseSegment.parse(segment);
+ parse(segment);
+ return 0;
}
}