Index: ivy/ivy.xml
===================================================================
--- ivy/ivy.xml (revision 1328229)
+++ ivy/ivy.xml (working copy)
@@ -68,6 +68,8 @@
+
+
Index: conf/gora-hbase-mapping.xml
===================================================================
--- conf/gora-hbase-mapping.xml (revision 1328229)
+++ conf/gora-hbase-mapping.xml (working copy)
@@ -58,4 +58,16 @@
+
+
+
+
+
+
+
+
Index: build.xml
===================================================================
--- build.xml (revision 1328229)
+++ build.xml (working copy)
@@ -487,6 +487,20 @@
It could not be loaded from ${ivy.repo.url}
+
+
+
+
+
+
+
+
+
+
+
+
Index: src/gora/host.avsc
===================================================================
--- src/gora/host.avsc (revision 0)
+++ src/gora/host.avsc (revision 0)
@@ -0,0 +1,9 @@
+{"name": "Host",
+ "type": "record",
+ "namespace": "org.apache.nutch.storage",
+ "fields": [
+ {"name": "metadata", "type": {"type": "map", "values": "bytes"}},
+ {"name": "outlinks", "type": {"type": "map", "values": "string"}},
+ {"name": "inlinks", "type": {"type": "map", "values": "string"}}
+ ]
+}
Index: src/java/org/apache/nutch/util/domain/DomainStatistics.java
===================================================================
--- src/java/org/apache/nutch/util/domain/DomainStatistics.java (revision 1328229)
+++ src/java/org/apache/nutch/util/domain/DomainStatistics.java (working copy)
@@ -103,7 +103,7 @@
mode = MODE_SUFFIX;
job.getConfiguration().setInt("domain.statistics.mode", mode);
- DataStore store = StorageUtils.createDataStore(
+ DataStore store = StorageUtils.createWebStore(
job.getConfiguration(), String.class, WebPage.class);
Query query = store.newQuery();
Index: src/java/org/apache/nutch/util/Histogram.java
===================================================================
--- src/java/org/apache/nutch/util/Histogram.java (revision 0)
+++ src/java/org/apache/nutch/util/Histogram.java (revision 0)
@@ -0,0 +1,129 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.util;
+
+import java.util.*;
+
+/**
+ * Accumulates, per key, a running float value and the number of times the key
+ * was added, and offers key listings ordered by accumulated value.
+ * Not synchronized.
+ *
+ * @param <E> type of the histogram keys; used as {@link HashMap} keys
+ */
+public class Histogram<E> {
+
+  private final Map<E, HistogramEntry> map = new HashMap<E, HistogramEntry>();
+  private float totalValue = 0;
+  private int totalCount = 0;
+
+  /**
+   * Records one observation of {@code x} with a value of 1.
+   *
+   * @param x key to count
+   */
+  public void add(E x) {
+    add(x, 1);
+  }
+
+  /**
+   * Records one observation of {@code x}, adding {@code value} to its
+   * accumulated value and incrementing its count by one.
+   *
+   * @param x key to update
+   * @param value amount added to the key's accumulated value
+   */
+  public void add(E x, float value) {
+    HistogramEntry entry = map.get(x);
+    if (entry == null) {
+      entry = new HistogramEntry();
+      map.put(x, entry);
+    }
+    entry.value += value;
+    entry.count++;
+    totalValue += value;
+    totalCount += 1;
+  }
+
+  /**
+   * @return a live view of the keys currently held by this histogram
+   */
+  public Set<E> getKeys() {
+    return map.keySet();
+  }
+
+  /**
+   * @param x key to look up
+   * @return the accumulated value of {@code x}, or 0 if the key was never added
+   */
+  public float getValue(E x) {
+    HistogramEntry entry = map.get(x);
+    return entry == null ? 0f : entry.value;
+  }
+
+  /**
+   * @param x key to look up
+   * @return how many times {@code x} was added, or 0 if it was never added
+   */
+  public int getCount(E x) {
+    HistogramEntry entry = map.get(x);
+    return entry == null ? 0 : entry.count;
+  }
+
+  /**
+   * Adds every key of {@code other} to this histogram with its accumulated
+   * value.
+   * NOTE(review): each key of {@code other} contributes exactly one
+   * observation here, so counts are not merged -- confirm this is intended.
+   *
+   * @param other histogram whose values are added to this one
+   */
+  public void add(Histogram<E> other) {
+    for (E x : other.getKeys()) {
+      add(x, other.getValue(x));
+    }
+  }
+
+  /**
+   * Builds a new histogram in which every value is divided by the sum of all
+   * values added to this histogram. Each key has a count of 1 in the result.
+   * If that sum is 0 the divisions yield NaN or infinite values.
+   *
+   * @return the normalized copy; this histogram is left unchanged
+   */
+  public Histogram<E> normalize() {
+    Histogram<E> normalized = new Histogram<E>();
+    for (E x : getKeys()) {
+      normalized.add(x, getValue(x) / totalValue);
+    }
+    return normalized;
+  }
+
+  /**
+   * @return the keys ordered from the largest accumulated value to the smallest
+   */
+  public List<E> sortInverseByValue() {
+    return sortKeys(true);
+  }
+
+  /**
+   * @return the keys ordered from the smallest accumulated value to the largest
+   */
+  public List<E> sortByValue() {
+    return sortKeys(false);
+  }
+
+  /**
+   * Renders one {@code value,key} line per item, in the order given.
+   * Items unknown to this histogram are rendered with a value of 0.
+   *
+   * @param items keys to render
+   * @return the rendered lines, each terminated by a newline
+   */
+  public String toString(List<E> items) {
+    StringBuilder strBuilder = new StringBuilder();
+    for (E item : items) {
+      strBuilder.append(getValue(item)).append(",").append(item)
+          .append("\n");
+    }
+    return strBuilder.toString();
+  }
+
+  /** Returns the keys sorted by accumulated value in the requested direction. */
+  private List<E> sortKeys(final boolean descending) {
+    List<Map.Entry<E, HistogramEntry>> entries =
+        new ArrayList<Map.Entry<E, HistogramEntry>>(map.entrySet());
+
+    // Float.compare gives a total order, so entries with equal values compare
+    // as 0 in both directions as the Comparator contract requires.
+    Collections.sort(entries, new Comparator<Map.Entry<E, HistogramEntry>>() {
+      public int compare(Map.Entry<E, HistogramEntry> left,
+          Map.Entry<E, HistogramEntry> right) {
+        int order = Float.compare(left.getValue().value, right.getValue().value);
+        return descending ? -order : order;
+      }
+    });
+
+    List<E> keys = new ArrayList<E>(entries.size());
+    for (Map.Entry<E, HistogramEntry> entry : entries) {
+      keys.add(entry.getKey());
+    }
+    return keys;
+  }
+
+  /** Accumulated value and observation count of a single key. */
+  static class HistogramEntry {
+    float value;
+    int count;
+  }
+}
Index: src/java/org/apache/nutch/util/TableUtil.java
===================================================================
--- src/java/org/apache/nutch/util/TableUtil.java (revision 1328229)
+++ src/java/org/apache/nutch/util/TableUtil.java (working copy)
@@ -126,6 +126,17 @@
buf.append(splits[0]);
}
+  /**
+   * Splits a host name on '.' and rebuilds it through
+   * {@code reverseAppendSplits}, which presumably emits the labels in reverse
+   * order (e.g. www.example.org becoming org.example.www) -- verify against
+   * that helper.
+   *
+   * @param hostName dot-separated host name; must not be null
+   * @return the host name rebuilt from its labels by {@code reverseAppendSplits}
+   */
+  public static String reverseHost(String hostName) {
+    String[] labels = hostName.split("\\.");
+    StringBuilder reversed = new StringBuilder();
+    reverseAppendSplits(labels, reversed);
+    return reversed.toString();
+  }
+  /**
+   * Restores a host name produced by {@link #reverseHost(String)}.
+   *
+   * @param reversedHostName host name with its labels in reversed order
+   * @return the result of applying {@code reverseHost} to the argument
+   */
+  public static String unreverseHost(String reversedHostName) {
+    // The label reversal is treated as its own inverse, so the same routine
+    // is reused.
+    final String hostName = reverseHost(reversedHostName);
+    return hostName;
+  }
+
+
/**
* Convert given Utf8 instance to String
*
Index: src/java/org/apache/nutch/fetcher/FetcherReducer.java
===================================================================
--- src/java/org/apache/nutch/fetcher/FetcherReducer.java (revision 1328229)
+++ src/java/org/apache/nutch/fetcher/FetcherReducer.java (working copy)
@@ -34,11 +34,12 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
+import org.apache.gora.mapreduce.GoraReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.crawl.URLWebPage;
+import org.apache.nutch.host.HostDb;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
@@ -51,12 +52,13 @@
import org.apache.nutch.protocol.ProtocolStatusCodes;
import org.apache.nutch.protocol.ProtocolStatusUtils;
import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.storage.Host;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.ProtocolStatus;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.URLUtil;
-import org.apache.gora.mapreduce.GoraReducer;
+import org.slf4j.Logger;
public class FetcherReducer
extends GoraReducer {
@@ -261,12 +263,15 @@
long minCrawlDelay;
Configuration conf;
long timelimit = -1;
+
+ boolean useHostSettings = false;
+ HostDb hostDb = null;
public static final String QUEUE_MODE_HOST = "byHost";
public static final String QUEUE_MODE_DOMAIN = "byDomain";
public static final String QUEUE_MODE_IP = "byIP";
- public FetchItemQueues(Configuration conf) {
+ public FetchItemQueues(Configuration conf) throws IOException {
this.conf = conf;
this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
@@ -277,6 +282,17 @@
queueMode = QUEUE_MODE_HOST;
}
LOG.info("Using queue mode : "+queueMode);
+
+ // Optionally enable host specific queue behavior
+ if (queueMode.equals(QUEUE_MODE_HOST)) {
+ useHostSettings = conf.getBoolean("fetcher.queue.use.host.settings", false);
+ if (useHostSettings) {
+ LOG.info("Host specific queue settings enabled.");
+ // Initialize the HostDb if we need it.
+ hostDb = new HostDb(conf);
+ }
+ }
+
this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
this.timelimit = conf.getLong("fetcher.timelimit", -1);
@@ -317,8 +333,27 @@
public synchronized FetchItemQueue getFetchItemQueue(String id) {
FetchItemQueue fiq = queues.get(id);
if (fiq == null) {
- // initialize queue
- fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+ // Create a new queue
+ if (useHostSettings) {
+ // Use host specific queue settings (if defined in the host table)
+ try {
+ String hostname = id.substring(id.indexOf("://")+3);
+ Host host = hostDb.getByHostName(hostname);
+ if (host != null) {
+ fiq = new FetchItemQueue(conf,
+ host.getInt("q_mt", maxThreads),
+ host.getLong("q_cd", crawlDelay),
+ host.getLong("q_mcd", minCrawlDelay));
+ }
+
+ } catch (IOException e) {
+ LOG.error("Error while trying to access host settings", e);
+ }
+ }
+ if (fiq == null) {
+ // Use queue defaults
+ fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+ }
queues.put(id, fiq);
}
return fiq;
Index: src/java/org/apache/nutch/indexer/IndexerReducer.java
===================================================================
--- src/java/org/apache/nutch/indexer/IndexerReducer.java (revision 1328229)
+++ src/java/org/apache/nutch/indexer/IndexerReducer.java (working copy)
@@ -48,7 +48,7 @@
filters = new IndexingFilters(conf);
scoringFilters = new ScoringFilters(conf);
try {
- store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
+ store = StorageUtils.createWebStore(conf, String.class, WebPage.class);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
Index: src/java/org/apache/nutch/storage/Host.java
===================================================================
--- src/java/org/apache/nutch/storage/Host.java (revision 0)
+++ src/java/org/apache/nutch/storage/Host.java (revision 0)
@@ -0,0 +1,151 @@
+package org.apache.nutch.storage;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.ListGenericArray;
+
+/**
+ * Gora persistent record describing one host. It holds three maps: free-form
+ * metadata (byte values), outlinks and inlinks (string values), matching the
+ * embedded Avro schema. The helper methods at the end of the class read
+ * metadata entries as text or numbers.
+ */
+@SuppressWarnings("all")
+public class Host extends PersistentBase {
+  public static final org.apache.avro.Schema _SCHEMA = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Host\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"bytes\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"inlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+  public java.util.Map<Utf8, ByteBuffer> metadata;
+  public java.util.Map<Utf8, Utf8> outlinks;
+  public java.util.Map<Utf8, Utf8> inlinks;
+
+  /** Field names and their positional indexes within the record. */
+  public static enum Field {
+    METADATA(0,"metadata"),
+    OUTLINKS(1,"outlinks"),
+    INLINKS(2,"inlinks"),
+    ;
+    private int index;
+    private String name;
+    Field(int index, String name) {this.index=index;this.name=name;}
+    public int getIndex() {return index;}
+    public String getName() {return name;}
+    public String toString() {return name;}
+  }
+
+  public static final String[] _ALL_FIELDS = {"metadata","outlinks","inlinks"};
+  static {
+    PersistentBase.registerFields(Host.class, _ALL_FIELDS);
+  }
+
+  /** Creates an empty host backed by a fresh {@link StateManagerImpl}. */
+  public Host() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Creates an empty host whose three maps are new stateful maps.
+   *
+   * @param stateManager state manager passed to the persistent base class
+   */
+  public Host(StateManager stateManager) {
+    super(stateManager);
+    metadata = new StatefulHashMap<Utf8, ByteBuffer>();
+    inlinks = new StatefulHashMap<Utf8, Utf8>();
+    outlinks = new StatefulHashMap<Utf8, Utf8>();
+  }
+
+  public Host newInstance(StateManager stateManager) {
+    return new Host(stateManager);
+  }
+
+  public Schema getSchema() { return _SCHEMA; }
+
+  /**
+   * @param _field positional index of the field (0 metadata, 1 outlinks, 2 inlinks)
+   * @return the map stored in that field
+   * @throws AvroRuntimeException if the index is not 0, 1 or 2
+   */
+  public Object get(int _field) {
+    switch (_field) {
+      case 0: return metadata;
+      case 1: return outlinks;
+      case 2: return inlinks;
+      default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Replaces the map stored in a field and marks the field dirty, unless the
+   * new value is reported equal by {@code isFieldEqual}.
+   *
+   * @param _field positional index of the field (0 metadata, 1 outlinks, 2 inlinks)
+   * @param _value new map for the field
+   * @throws AvroRuntimeException if the index is not 0, 1 or 2
+   */
+  @SuppressWarnings(value="unchecked")
+  public void put(int _field, Object _value) {
+    if (isFieldEqual(_field, _value)) return;
+    getStateManager().setDirty(this, _field);
+    switch (_field) {
+      case 0: metadata = (Map<Utf8, ByteBuffer>) _value; break;
+      case 1: outlinks = (Map<Utf8, Utf8>) _value; break;
+      case 2: inlinks = (Map<Utf8, Utf8>) _value; break;
+      default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  public Map<Utf8, ByteBuffer> getMetadata() {
+    return (Map<Utf8, ByteBuffer>) get(0);
+  }
+  public ByteBuffer getFromMetadata(Utf8 key) {
+    if (metadata == null) { return null; }
+    return metadata.get(key);
+  }
+  public void putToMetadata(Utf8 key, ByteBuffer value) {
+    getStateManager().setDirty(this, 0);
+    metadata.put(key, value);
+  }
+  public ByteBuffer removeFromMetadata(Utf8 key) {
+    if (metadata == null) { return null; }
+    getStateManager().setDirty(this, 0);
+    return metadata.remove(key);
+  }
+
+  public Map<Utf8, Utf8> getOutlinks() {
+    return (Map<Utf8, Utf8>) get(1);
+  }
+  public Utf8 getFromOutlinks(Utf8 key) {
+    if (outlinks == null) { return null; }
+    return outlinks.get(key);
+  }
+  public void putToOutlinks(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 1);
+    outlinks.put(key, value);
+  }
+  public Utf8 removeFromOutlinks(Utf8 key) {
+    if (outlinks == null) { return null; }
+    getStateManager().setDirty(this, 1);
+    return outlinks.remove(key);
+  }
+
+  public Map<Utf8, Utf8> getInlinks() {
+    return (Map<Utf8, Utf8>) get(2);
+  }
+  public Utf8 getFromInlinks(Utf8 key) {
+    if (inlinks == null) { return null; }
+    return inlinks.get(key);
+  }
+  public void putToInlinks(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 2);
+    inlinks.put(key, value);
+  }
+  public Utf8 removeFromInlinks(Utf8 key) {
+    if (inlinks == null) { return null; }
+    getStateManager().setDirty(this, 2);
+    return inlinks.remove(key);
+  }
+
+  /**
+   * @param key metadata key
+   * @return true if the metadata map exists and holds an entry for the key
+   */
+  public boolean contains(String key) {
+    return metadata != null && metadata.containsKey(new Utf8(key));
+  }
+
+  /**
+   * Reads a metadata entry as text.
+   *
+   * @param key metadata key
+   * @param defaultValue value returned when the key has no entry
+   * @return the entry's bytes decoded as UTF-8, or {@code defaultValue}
+   */
+  public String getValue(String key, String defaultValue) {
+    ByteBuffer raw = (metadata == null) ? null : metadata.get(new Utf8(key));
+    if (raw == null) return defaultValue;
+    // Copy only the readable window through a duplicate: the stored buffer's
+    // position is left untouched, and this also works for buffers that are
+    // slices or expose no backing array.
+    ByteBuffer view = raw.duplicate();
+    byte[] bytes = new byte[view.remaining()];
+    view.get(bytes);
+    return new String(bytes, java.nio.charset.Charset.forName("UTF-8"));
+  }
+
+  /**
+   * Reads a metadata entry as an int.
+   *
+   * @param key metadata key
+   * @param defaultValue value returned when the key has no entry
+   * @return the parsed entry, or {@code defaultValue}
+   * @throws NumberFormatException if the entry is present but not an int
+   */
+  public int getInt(String key, int defaultValue) {
+    String text = getValue(key, null);
+    return text == null ? defaultValue : Integer.parseInt(text);
+  }
+
+  /**
+   * Reads a metadata entry as a long.
+   *
+   * @param key metadata key
+   * @param defaultValue value returned when the key has no entry
+   * @return the parsed entry, or {@code defaultValue}
+   * @throws NumberFormatException if the entry is present but not a long
+   */
+  public long getLong(String key, long defaultValue) {
+    String text = getValue(key, null);
+    return text == null ? defaultValue : Long.parseLong(text);
+  }
+}
Index: src/java/org/apache/nutch/storage/StorageUtils.java
===================================================================
--- src/java/org/apache/nutch/storage/StorageUtils.java (revision 1328229)
+++ src/java/org/apache/nutch/storage/StorageUtils.java (working copy)
@@ -35,27 +35,40 @@
public class StorageUtils {
- @SuppressWarnings("unchecked")
- public static DataStore createDataStore(Configuration conf,
- Class keyClass, Class persistentClass) throws ClassNotFoundException, GoraException {
- Class extends DataStore> dataStoreClass =
- (Class extends DataStore>) getDataStoreClass(conf);
- return DataStoreFactory.createDataStore(dataStoreClass,
- keyClass, persistentClass);
- }
-
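+ /** Creates a data store for the given persistentClass.
+ * Currently supports WebPage and Host stores.
+ *
+ * @param conf configuration supplying the schema name and the optional crawl id prefix
+ * @param keyClass class of the store keys
+ * @param persistentClass class of the stored records (WebPage or Host)
+ * @return the data store created for the resolved schema name
+ * @throws ClassNotFoundException presumably if the configured data store class cannot be loaded
+ * @throws GoraException presumably if Gora fails to create the store
+ */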
@SuppressWarnings("unchecked")
public static DataStore createWebStore(Configuration conf,
Class keyClass, Class persistentClass) throws ClassNotFoundException, GoraException {
- String schema = conf.get("storage.schema", "webpage");
+
+ String schema = null;
+ if (WebPage.class.equals(persistentClass)) {
+ schema = conf.get("storage.schema.webpage", "webpage");
+ } else if (Host.class.equals(persistentClass)) {
+ schema = conf.get("storage.schema.host", "host");
+ } else {
+ throw new UnsupportedOperationException("Unable to create store for class " + persistentClass);
+ }
+
String crawlId = conf.get(Nutch.CRAWL_ID_KEY, "");
-
+
if (!crawlId.isEmpty()) {
schema = crawlId + "_" + schema;
}
Class extends DataStore> dataStoreClass =
(Class extends DataStore>) getDataStoreClass(conf);
+ //Following line gets a compile error during upgrade to newer Gora,
+ //Simply add 'conf' as an additional argument to fix this.
+ //(between persistentClass and schema)
return DataStoreFactory.createDataStore(dataStoreClass,
keyClass, persistentClass, schema);
}
@@ -122,7 +135,7 @@
GoraOutputFormat.setOutput(job, store, true);
}
- private static String[] toStringArray(Collection fields) {
+ public static String[] toStringArray(Collection fields) {
String[] arr = new String[fields.size()];
Iterator iter = fields.iterator();
for (int i = 0; i < arr.length; i++) {
Index: src/java/org/apache/nutch/storage/WebTableCreator.java
===================================================================
--- src/java/org/apache/nutch/storage/WebTableCreator.java (revision 1328229)
+++ src/java/org/apache/nutch/storage/WebTableCreator.java (working copy)
@@ -22,7 +22,7 @@
public class WebTableCreator {
public static void main(String[] args) throws Exception {
DataStore store =
- StorageUtils.createDataStore(NutchConfiguration.create(), String.class,
+ StorageUtils.createWebStore(NutchConfiguration.create(), String.class,
WebPage.class);
System.out.println(store);
Index: src/java/org/apache/nutch/host/HostDbUpdateJob.java
===================================================================
--- src/java/org/apache/nutch/host/HostDbUpdateJob.java (revision 0)
+++ src/java/org/apache/nutch/host/HostDbUpdateJob.java (revision 0)
@@ -0,0 +1,141 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.gora.mapreduce.GoraMapper;
+import org.apache.gora.mapreduce.GoraReducer;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scans the web table and creates a host entry for each unique host.
+ *
+ *
+ **/
+
+public class HostDbUpdateJob implements Tool {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(HostDbUpdateJob.class);
+
+  /** Web page fields that are always requested from the page store. */
+  private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  private Configuration conf;
+
+  static {
+    FIELDS.add(WebPage.Field.STATUS);
+  }
+
+  /**
+   * Maps each WebPage to a host key.
+   */
+  public static class Mapper extends GoraMapper<String, WebPage, Text, WebPage> {
+
+    @Override
+    protected void map(String key, WebPage value, Context context)
+        throws IOException, InterruptedException {
+      // The page key is handed to TableUtil.getReversedHost to derive the
+      // host key under which all pages of one host are grouped.
+      String reversedHost = TableUtil.getReversedHost(key);
+      context.write(new Text(reversedHost), value);
+    }
+  }
+
+  public HostDbUpdateJob() {
+  }
+
+  public HostDbUpdateJob(Configuration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Runs the host update job and waits for it to finish.
+   *
+   * @param buildLinkDb if true, inlinks and outlinks are also read from the
+   *        web table so that they are available to the reducer
+   * @throws Exception if the job cannot be set up or executed
+   */
+  public void updateHosts(boolean buildLinkDb) throws Exception {
+    if (!runUpdate(buildLinkDb)) {
+      LOG.error("HostDbUpdateJob: job did not complete successfully");
+    }
+  }
+
+  /** Configures and runs the map/reduce job; returns whether it succeeded. */
+  private boolean runUpdate(boolean buildLinkDb) throws Exception {
+    // Work on a per-run copy so that requesting the link fields does not leak
+    // into the shared static set and affect later runs in the same JVM.
+    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
+    if (buildLinkDb) {
+      fields.add(WebPage.Field.INLINKS);
+      fields.add(WebPage.Field.OUTLINKS);
+    }
+
+    NutchJob job = new NutchJob(getConf(), "hostdb-update");
+
+    // === Map ===
+    DataStore<String, WebPage> pageStore = StorageUtils.createWebStore(
+        job.getConfiguration(), String.class, WebPage.class);
+    Query<String, WebPage> query = pageStore.newQuery();
+    // Note: pages without these fields are skipped
+    query.setFields(StorageUtils.toStringArray(fields));
+    GoraMapper.initMapperJob(job, query, pageStore, Text.class, WebPage.class,
+        HostDbUpdateJob.Mapper.class, null, true);
+
+    // === Reduce ===
+    DataStore<String, Host> hostStore = StorageUtils.createWebStore(
+        job.getConfiguration(), String.class, Host.class);
+    GoraReducer.initReducerJob(job, hostStore, HostDbUpdateReducer.class);
+
+    return job.waitForCompletion(true);
+  }
+
+  /**
+   * Parses the command line and runs the update.
+   *
+   * @param args optional {@code -linkDb} and {@code -crawlId <id>}
+   * @return 0 if the job succeeded, -1 otherwise
+   * @throws IllegalArgumentException on an unknown argument or when
+   *         {@code -crawlId} is given without a value
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    final String usage = " usage: (-linkDb) (-crawlId <id>)";
+    boolean linkDb = false;
+    for (int i = 0; i < args.length; i++) {
+      if ("-linkDb".equals(args[i])) {
+        linkDb = true;
+      } else if ("-crawlId".equals(args[i])) {
+        if (i + 1 >= args.length) {
+          throw new IllegalArgumentException("missing value for -crawlId" + usage);
+        }
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
+      } else {
+        throw new IllegalArgumentException("unrecognized arg " + args[i]
+            + usage);
+      }
+    }
+    LOG.info("Updating HostDb. Adding links:" + linkDb);
+    if (!runUpdate(linkDb)) {
+      LOG.error("HostDbUpdateJob: failed");
+      return -1;
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new HostDbUpdateJob(), args);
+    System.exit(res);
+  }
+}
Index: src/java/org/apache/nutch/host/HostDbUpdateReducer.java
===================================================================
--- src/java/org/apache/nutch/host/HostDbUpdateReducer.java (revision 0)
+++ src/java/org/apache/nutch/host/HostDbUpdateReducer.java (revision 0)
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.mapreduce.GoraReducer;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlStatus;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.Histogram;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Combines all WebPages with the same host key to create a Host object,
+ * with some statistics.
+ */
+public class HostDbUpdateReducer extends GoraReducer<Text, WebPage, String, Host> {
+
+  /**
+   * Builds one Host record from all pages grouped under the same host key.
+   * The record's metadata gets "p" (number of pages) and, when at least one
+   * page has the fetched status, "f" (number of fetched pages). Its inlinks
+   * and outlinks map each linked host to the number of links seen, as text.
+   *
+   * @param key host key shared by all given pages
+   * @param values pages belonging to that host
+   * @param context reducer context the Host record is written to
+   */
+  @Override
+  protected void reduce(Text key, Iterable<WebPage> values, Context context)
+      throws IOException, InterruptedException {
+
+    int numPages = 0;
+    int numFetched = 0;
+
+    Histogram<String> inlinkCount = new Histogram<String>();
+    Histogram<String> outlinkCount = new Histogram<String>();
+
+    for (WebPage page : values) {
+      // count number of pages
+      numPages++;
+      // count number of fetched pages
+      if (page.getStatus() == CrawlStatus.STATUS_FETCHED) {
+        numFetched++;
+      }
+
+      // build host link db
+      // TODO: limit number of links
+      countLinkedHosts(page.getInlinks(), inlinkCount);
+      countLinkedHosts(page.getOutlinks(), outlinkCount);
+    }
+
+    // output host data
+    Host host = new Host();
+    host.putToMetadata(new Utf8("p"), encode(numPages));
+    if (numFetched > 0) {
+      host.putToMetadata(new Utf8("f"), encode(numFetched));
+    }
+    for (String inlink : inlinkCount.getKeys()) {
+      host.putToInlinks(new Utf8(inlink),
+          new Utf8(Integer.toString(inlinkCount.getCount(inlink))));
+    }
+    for (String outlink : outlinkCount.getKeys()) {
+      host.putToOutlinks(new Utf8(outlink),
+          new Utf8(Integer.toString(outlinkCount.getCount(outlink))));
+    }
+
+    context.write(key.toString(), host);
+  }
+
+  /** Adds the host of every link URL (the map keys) to the given histogram. */
+  private static void countLinkedHosts(java.util.Map<Utf8, ?> links,
+      Histogram<String> counts) {
+    if (links == null) {
+      return;
+    }
+    Set<Utf8> urls = links.keySet();
+    for (Utf8 url : urls) {
+      String linkedHost = URLUtil.getHost(url.toString());
+      // Presumably getHost yields null for URLs it cannot parse (TODO
+      // confirm); a null key would later fail when wrapped in a Utf8.
+      if (linkedHost != null) {
+        counts.add(linkedHost);
+      }
+    }
+  }
+
+  /** Encodes a counter as the UTF-8 bytes of its decimal text. */
+  private static ByteBuffer encode(int number) {
+    return ByteBuffer.wrap(Integer.toString(number).getBytes(
+        java.nio.charset.Charset.forName("UTF-8")));
+  }
+}
Index: src/java/org/apache/nutch/host/HostInjectorJob.java
===================================================================
--- src/java/org/apache/nutch/host/HostInjectorJob.java (revision 0)
+++ src/java/org/apache/nutch/host/HostInjectorJob.java (revision 0)
@@ -0,0 +1,178 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates or updates an existing host table from a text file.
+ * The files contain one host name per line, optionally followed by custom
+ * metadata separated by tabs, where each metadata key is separated from the
+ * corresponding value by '='.
+ * The URLs must contain the protocol as well as the host name
+ * e.g. http://www.nutch.org \t nutch.score=10 \t nutch.fetchInterval=2592000 \t
+ * userType=open_source
+ **/
+
+public class HostInjectorJob implements Tool {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(HostInjectorJob.class);
+
+  private Configuration conf;
+
+  private static final Set<Host.Field> FIELDS = new HashSet<Host.Field>();
+  static {
+    FIELDS.add(Host.Field.METADATA);
+  }
+
+  public HostInjectorJob() {
+
+  }
+
+  public HostInjectorJob(Configuration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Turns one input line into a Host record keyed by the reversed host name.
+   * A line is a host (with or without a protocol prefix) optionally followed
+   * by tab-separated {@code key=value} metadata entries.
+   */
+  public static class UrlMapper extends
+      Mapper<LongWritable, Text, String, Host> {
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      String line = value.toString();
+
+      // skip empty lines
+      if (line.trim().length() == 0)
+        return;
+
+      String[] splits = line.split("\t");
+      String hostName = extractHostName(splits[0]);
+      if (hostName == null) {
+        LOG.warn("HostInjectorJob: skipping line without a usable host: " + line);
+        return;
+      }
+
+      Map<String, String> metadata = new TreeMap<String, String>();
+      for (int s = 1; s < splits.length; s++) {
+        // find separation between name and value
+        int indexEquals = splits[s].indexOf("=");
+        if (indexEquals == -1) {
+          // skip anything without a =
+          continue;
+        }
+        String metaname = splits[s].substring(0, indexEquals);
+        String metavalue = splits[s].substring(indexEquals + 1);
+        metadata.put(metaname, metavalue);
+      }
+
+      // now add the metadata
+      Host host = new Host();
+      for (Map.Entry<String, String> entry : metadata.entrySet()) {
+        host.putToMetadata(new Utf8(entry.getKey()), ByteBuffer.wrap(
+            entry.getValue().getBytes(java.nio.charset.Charset.forName("UTF-8"))));
+      }
+
+      String hostkey = TableUtil.reverseHost(hostName);
+      context.write(hostkey, host);
+    }
+
+    /**
+     * Extracts the bare host name from the first column of a line. The
+     * protocol must not end up in the key: reversing "http://www.nutch.org"
+     * label by label would otherwise give "org.nutch.http://www".
+     * Returns null when no host can be determined.
+     */
+    private static String extractHostName(String token) {
+      String candidate = token.trim();
+      if (candidate.length() == 0) {
+        return null;
+      }
+      String url = candidate.indexOf("://") > -1 ? candidate : "http://" + candidate;
+      try {
+        String hostName = new java.net.URL(url).getHost();
+        return hostName.length() == 0 ? null : hostName;
+      } catch (java.net.MalformedURLException e) {
+        // Not parseable as a URL; the caller logs the line and skips it.
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Runs the map-only injection job over the given input directory and writes
+   * the resulting Host records to the host store.
+   *
+   * @param hostDir directory containing the host list files
+   * @return true if the job completed successfully
+   * @throws Exception if the job cannot be set up or executed
+   */
+  public boolean inject(Path hostDir) throws Exception {
+    LOG.info("HostInjectorJob: starting");
+    LOG.info("HostInjectorJob: hostDir: " + hostDir);
+    Job job = new NutchJob(getConf(), "inject-hosts " + hostDir);
+    FileInputFormat.addInputPath(job, hostDir);
+    job.setMapperClass(UrlMapper.class);
+    job.setMapOutputKeyClass(String.class);
+    job.setMapOutputValueClass(Host.class);
+    job.setOutputFormatClass(GoraOutputFormat.class);
+    GoraOutputFormat.setOutput(job,
+        StorageUtils.createWebStore(getConf(), String.class, Host.class), true);
+    job.setReducerClass(Reducer.class);
+    job.setNumReduceTasks(0);
+    return job.waitForCompletion(true);
+  }
+
+  /**
+   * @param args a single argument: the directory holding the host list files
+   * @return 0 on success, -1 on a usage error or job failure
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: HostInjectorJob <host_dir>");
+      return -1;
+    }
+    try {
+      boolean success = inject(new Path(args[0]));
+      if (!success) {
+        LOG.error("HostInjectorJob: failed ");
+        return -1;
+      }
+      LOG.info("HostInjectorJob: finished");
+      return 0;
+    } catch (Exception e) {
+      LOG.error("HostInjectorJob: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(),
+        new HostInjectorJob(), args);
+    System.exit(res);
+  }
+}
Index: src/java/org/apache/nutch/host/HostDbReader.java
===================================================================
--- src/java/org/apache/nutch/host/HostDbReader.java (revision 0)
+++ src/java/org/apache/nutch/host/HostDbReader.java (revision 0)
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.TableUtil;
+
+/**
+ * Command-line tool that prints entries from the hostDB, either all of them or
+ * only the one selected by an optional key. Allows to verify that the storage is OK.
+ */
+public class HostDbReader extends Configured implements Tool {
+  public static final Log LOG = LogFactory.getLog(HostDbReader.class);
+
+  /** Prints the key and value of every entry the query returns; a null key adds no constraint. */
+  private void read(String key) throws ClassNotFoundException, IOException {
+    DataStore<String, Host> datastore = StorageUtils.createWebStore(getConf(),
+        String.class, Host.class);
+    try {
+      Query<String, Host> query = datastore.newQuery();
+      // NOTE(review): HostDb builds its keys with TableUtil.reverseHost, not reverseUrl -- confirm.
+      if (key != null) {
+        query.setKey(TableUtil.reverseUrl(key));
+      }
+      Result<String, Host> result = datastore.execute(query);
+      try {
+        while (result.next()) {
+          String hostName = TableUtil.unreverseUrl(result.getKey());
+          Host host = result.get();
+          System.out.println(hostName);
+          System.out.println(host);
+        }
+      } finally {
+        result.close();
+      }
+    } finally {
+      datastore.close(); // runs even when the query or the printing fails
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new HostDbReader(), args);
+    System.exit(res);
+  }
+
+  /** Accepts at most one argument, the key to look up; returns 0 on success, -1 otherwise. */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length > 1) {
+      System.err.println("Usage: HostDBReader [key]");
+      return -1;
+    }
+    try {
+      read(args.length == 1 ? args[0] : null);
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("HostDBReader: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/host/HostDb.java
===================================================================
--- src/java/org/apache/nutch/host/HostDb.java (revision 0)
+++ src/java/org/apache/nutch/host/HostDb.java (revision 0)
@@ -0,0 +1,146 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.util.TableUtil;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * A caching wrapper for the host datastore. Reads are served through a
+ * size-bounded cache, writes go to both the cache and the store, and keys the
+ * store has no entry for are cached as NULL_HOST.
+ */
+public class HostDb implements Closeable {
+  public static final Log LOG = LogFactory.getLog(HostDb.class);
+
+  /** A host plus the time at which it was placed in the cache. */
+  private static final class CacheHost {
+    private final Host host;
+    private final long timestamp;
+
+    public CacheHost(Host host, long timestamp) {
+      this.host = host;
+      this.timestamp = timestamp;
+    }
+  }
+
+  /** Sentinel cached for keys that have no entry in the store. */
+  private static final CacheHost NULL_HOST = new CacheHost(null, 0);
+
+  public static final String HOSTDB_LRU_SIZE = "hostdb.lru.size";
+  public static final int DEFAULT_LRU_SIZE = 100;
+  public static final String HOSTDB_CONCURRENCY_LEVEL = "hostdb.concurrency.level";
+  public static final int DEFAULT_HOSTDB_CONCURRENCY_LEVEL = 8;
+
+  private DataStore<String, Host> hostStore;
+  private Cache<String, CacheHost> cache;
+
+  /** Time of the last store flush. Created eagerly: the constructor calls set() on it. */
+  private final AtomicLong lastFlush = new AtomicLong();
+
+  /**
+   * Opens the host store and builds the cache in front of it. The cache size and
+   * concurrency level come from hostdb.lru.size and hostdb.concurrency.level.
+   */
+  public HostDb(Configuration conf) throws IOException {
+    try {
+      hostStore = StorageUtils.createWebStore(conf, String.class, Host.class);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Create a cache.
+    // We add a removal listener to see if we need to flush the store,
+    // in order to adhere to the put-flush-get semantic
+    // ("read your own write") of DataStore.
+    long lruSize = conf.getLong(HOSTDB_LRU_SIZE, DEFAULT_LRU_SIZE);
+    int concurrencyLevel = conf.getInt(HOSTDB_CONCURRENCY_LEVEL,
+        DEFAULT_HOSTDB_CONCURRENCY_LEVEL);
+    RemovalListener<String, CacheHost> listener = new RemovalListener<String, CacheHost>() {
+      @Override
+      public void onRemoval(RemovalNotification<String, CacheHost> notification) {
+        CacheHost removed = notification.getValue();
+        // Only an entry cached since the last flush can carry a write the store
+        // has not persisted yet; flush so a later get() can read it back.
+        if (removed != NULL_HOST && removed.timestamp >= lastFlush.get()) {
+          try {
+            hostStore.flush();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          lastFlush.set(System.currentTimeMillis());
+        }
+      }
+    };
+
+    cache = CacheBuilder.newBuilder().maximumSize(lruSize)
+        .removalListener(listener).concurrencyLevel(concurrencyLevel).build();
+    lastFlush.set(System.currentTimeMillis());
+  }
+
+  /** Returns the host cached or stored under the key, or null when the store has none. */
+  public Host get(final String key) throws IOException {
+    Callable<CacheHost> valueLoader = new Callable<CacheHost>() {
+      @Override
+      public CacheHost call() throws Exception {
+        Host host = hostStore.get(key);
+        if (host == null) return NULL_HOST;
+        return new CacheHost(host, System.currentTimeMillis());
+      }
+    };
+    CacheHost cachedHost;
+    try {
+      cachedHost = cache.get(key, valueLoader);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+    return cachedHost != NULL_HOST ? cachedHost.host : null;
+  }
+
+  /** Looks up a host by name, using TableUtil.reverseHost(hostName) as the key. */
+  public Host getByHostName(String hostName) throws IOException {
+    return get(TableUtil.reverseHost(hostName));
+  }
+
+  /** Records the host under the key in the cache first, then in the store. */
+  public void put(String key, Host host) throws IOException {
+    cache.put(key, new CacheHost(host, System.currentTimeMillis()));
+    hostStore.put(key, host);
+  }
+
+  /** Flushes the host store. */
+  @Override
+  public void close() throws IOException {
+    hostStore.flush();
+  }
+}
Index: default.properties
===================================================================
--- default.properties (revision 1328229)
+++ default.properties (working copy)
@@ -16,6 +16,7 @@
build.encoding = UTF-8
build.ivy.dir=${build.dir}/ivy
build.lib.dir=${build.dir}/lib
+build.gora=${build.dir}/gora
test.src.dir = ./src/test
test.build.dir = ${build.dir}/test