Index: ivy/ivy.xml
===================================================================
--- ivy/ivy.xml	(revision 1328229)
+++ ivy/ivy.xml	(working copy)
@@ -68,6 +68,8 @@
 
 		<dependency org="org.jdom" name="jdom" rev="1.1" conf="*->default"/>
 
+		<dependency org="com.google.guava" name="guava" rev="11.0.2" conf="*->default"/>
+
 		<!--Configuration: test -->
 
 		<!--artifacts needed for testing -->
Index: conf/gora-hbase-mapping.xml
===================================================================
--- conf/gora-hbase-mapping.xml	(revision 1328229)
+++ conf/gora-hbase-mapping.xml	(working copy)
@@ -58,4 +58,16 @@
         <field name="markers" family="mk"/>
     </class>
     
+    <table name="host">
+      <family name="mtdt" maxVersions="1"/>
+      <family name="il" maxVersions="1"/>
+      <family name="ol" maxVersions="1"/>
+    </table>
+    
+    <class table="host" keyClass="java.lang.String" name="org.apache.nutch.storage.Host">
+      <field name="metadata" family="mtdt"/>
+      <field name="inlinks" family="il"/>
+      <field name="outlinks" family="ol"/>
+    </class>
+    
 </gora-orm>
Index: build.xml
===================================================================
--- build.xml	(revision 1328229)
+++ build.xml	(working copy)
@@ -487,6 +487,20 @@
    It could not be loaded from ${ivy.repo.url}
   </fail>
  </target>
+	
+  <!-- Runs Avro's SchemaTask over every *.avsc under src/gora into ${build.gora}. -->
+  <target name="compile-avro-schema" depends="resolve-default">
+    <typedef name="schema"
+             classname="org.apache.avro.specific.SchemaTask"
+             classpathref="classpath"/>
+
+    <mkdir dir="${build.gora}"/>
+    <schema destdir="${build.gora}">
+      <fileset dir="./src/gora">
+        <include name="**/*.avsc"/>
+      </fileset>
+    </schema>
+  </target>
 
  <!-- ================================================================== -->
  <!-- Documentation -->
Index: src/gora/host.avsc
===================================================================
--- src/gora/host.avsc	(revision 0)
+++ src/gora/host.avsc	(revision 0)
@@ -0,0 +1,9 @@
+{"name": "Host",
+ "type": "record",
+ "namespace": "org.apache.nutch.storage",
+ "fields": [
+        {"name": "metadata", "type": {"type": "map", "values": "bytes"}},
+        {"name": "outlinks", "type": {"type": "map", "values": "string"}},
+        {"name": "inlinks", "type": {"type": "map", "values": "string"}}
+   ]
+}
Index: src/java/org/apache/nutch/util/domain/DomainStatistics.java
===================================================================
--- src/java/org/apache/nutch/util/domain/DomainStatistics.java	(revision 1328229)
+++ src/java/org/apache/nutch/util/domain/DomainStatistics.java	(working copy)
@@ -103,7 +103,7 @@
 			mode = MODE_SUFFIX;
 		job.getConfiguration().setInt("domain.statistics.mode", mode);
 
-		DataStore<String, WebPage> store = StorageUtils.createDataStore(
+		DataStore<String, WebPage> store = StorageUtils.createWebStore(
 				job.getConfiguration(), String.class, WebPage.class);
 
 		Query<String, WebPage> query = store.newQuery();
Index: src/java/org/apache/nutch/util/Histogram.java
===================================================================
--- src/java/org/apache/nutch/util/Histogram.java	(revision 0)
+++ src/java/org/apache/nutch/util/Histogram.java	(revision 0)
@@ -0,0 +1,129 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.util;
+
+import java.util.*;
+
+public class Histogram<E> {
+
+  private Map<E, HistogramEntry> map = new HashMap<E, HistogramEntry>();
+  private float totalValue = 0;
+  private int totalCount = 0;
+
+  public void add(E x) { // records one occurrence of x with a value of 1
+    add(x, 1);
+  }
+
+  /** Adds value to the entry for x (creating it on first use) and to the totals. */
+  public void add(E x, float value) {
+    HistogramEntry bucket = map.get(x);
+    if (bucket == null) {
+      bucket = new HistogramEntry();
+      bucket.value = value;
+      bucket.count = 1;
+      map.put(x, bucket);
+    } else {
+      bucket.value += value;
+      bucket.count++;
+    }
+    totalValue += value;
+    totalCount += 1;
+  }
+
+  public Set<E> getKeys() { // the backing map's own key set (a live view, not a copy)
+    return map.keySet();
+  }
+
+  public float getValue(E x) { // accumulated value stored for x
+    return map.get(x).value; // NullPointerException if x was never added
+  }
+
+  public int getCount(E x) { // number of add() calls recorded for x
+    return map.get(x).count; // NullPointerException if x was never added
+  }
+
+  public void add(Histogram<E> other) { // merges other's per-key values into this histogram
+    for (E x : other.getKeys()) {
+      add(x, other.getValue(x)); // note: raises this count by 1 per key, not by other's count
+    }
+  }
+
+  /** Returns a new histogram holding each key's value divided by totalValue. */
+  public Histogram<E> normalize() {
+    Histogram<E> scaled = new Histogram<E>();
+    for (E key : getKeys()) {
+      scaled.add(key, getValue(key) / totalValue);
+    }
+    return scaled;
+  }
+
+  /**
+   * Returns the keys ordered from largest to smallest accumulated value;
+   * ties keep the map's iteration order (the sort is stable).
+   */
+  public List<E> sortInverseByValue() {
+    List<Map.Entry<E, HistogramEntry>> entries =
+        new ArrayList<Map.Entry<E, HistogramEntry>>(map.entrySet());
+    // Float.compare gives a consistent total order; the previous identity-based
+    // equals() check returned -1 both ways for distinct entries of equal value.
+    Collections.sort(entries, new Comparator<Map.Entry<E, HistogramEntry>>() {
+      public int compare(Map.Entry<E, HistogramEntry> a, Map.Entry<E, HistogramEntry> b) {
+        return Float.compare(b.getValue().value, a.getValue().value);
+      }
+    });
+    List<E> keys = new ArrayList<E>(entries.size());
+    for (Map.Entry<E, HistogramEntry> entry : entries) {
+      keys.add(entry.getKey());
+    }
+    return keys;
+  }
+
+  /**
+   * Returns the keys ordered from smallest to largest accumulated value;
+   * ties keep the map's iteration order (the sort is stable).
+   */
+  public List<E> sortByValue() {
+    List<Map.Entry<E, HistogramEntry>> entries =
+        new ArrayList<Map.Entry<E, HistogramEntry>>(map.entrySet());
+    // Float.compare gives a consistent total order; the previous identity-based
+    // equals() check returned -1 both ways for distinct entries of equal value.
+    Collections.sort(entries, new Comparator<Map.Entry<E, HistogramEntry>>() {
+      public int compare(Map.Entry<E, HistogramEntry> a, Map.Entry<E, HistogramEntry> b) {
+        return Float.compare(a.getValue().value, b.getValue().value);
+      }
+    });
+    List<E> keys = new ArrayList<E>(entries.size());
+    for (Map.Entry<E, HistogramEntry> entry : entries) {
+      keys.add(entry.getKey());
+    }
+    return keys;
+  }
+
+  /** Renders one "value,item" line per element of items, in list order. */
+  public String toString(List<E> items) {
+    StringBuilder out = new StringBuilder();
+    for (E key : items) {
+      out.append(map.get(key).value).append(",").append(key).append("\n");
+    }
+    return out.toString();
+  }
+
+  static class HistogramEntry { // static: never touches the enclosing Histogram instance
+    float value; // running sum of the values added for one key
+    int count;   // number of add() calls recorded for that key
+  }
+}
Index: src/java/org/apache/nutch/util/TableUtil.java
===================================================================
--- src/java/org/apache/nutch/util/TableUtil.java	(revision 1328229)
+++ src/java/org/apache/nutch/util/TableUtil.java	(working copy)
@@ -126,6 +126,17 @@
     buf.append(splits[0]);
   }
 
+  /** Splits hostName on '.' and has reverseAppendSplits write the parts to a new buffer. */
+  public static String reverseHost(String hostName) {
+    StringBuilder reversed = new StringBuilder();
+    reverseAppendSplits(hostName.split("\\."), reversed);
+    return reversed.toString();
+  }
+  public static String unreverseHost(String reversedHostName) { // delegates to reverseHost
+    return reverseHost(reversedHostName); // Reversible: the same transformation is applied again
+  }
+  
+  
   /**
    * Convert given Utf8 instance to String
    *
Index: src/java/org/apache/nutch/fetcher/FetcherReducer.java
===================================================================
--- src/java/org/apache/nutch/fetcher/FetcherReducer.java	(revision 1328229)
+++ src/java/org/apache/nutch/fetcher/FetcherReducer.java	(working copy)
@@ -34,11 +34,12 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
+import org.apache.gora.mapreduce.GoraReducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.nutch.crawl.CrawlStatus;
 import org.apache.nutch.crawl.URLWebPage;
+import org.apache.nutch.host.HostDb;
 import org.apache.nutch.net.URLFilterException;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
@@ -51,12 +52,13 @@
 import org.apache.nutch.protocol.ProtocolStatusCodes;
 import org.apache.nutch.protocol.ProtocolStatusUtils;
 import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.storage.Host;
 import org.apache.nutch.storage.Mark;
 import org.apache.nutch.storage.ProtocolStatus;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.TableUtil;
 import org.apache.nutch.util.URLUtil;
-import org.apache.gora.mapreduce.GoraReducer;
+import org.slf4j.Logger;
 
 public class FetcherReducer
 extends GoraReducer<IntWritable, FetchEntry, String, WebPage> {
@@ -261,12 +263,15 @@
     long minCrawlDelay;
     Configuration conf;
     long timelimit = -1;
+    
+    boolean useHostSettings = false;
+    HostDb hostDb = null;
 
     public static final String QUEUE_MODE_HOST = "byHost";
     public static final String QUEUE_MODE_DOMAIN = "byDomain";
     public static final String QUEUE_MODE_IP = "byIP";
 
-    public FetchItemQueues(Configuration conf) {
+    public FetchItemQueues(Configuration conf) throws IOException {
       this.conf = conf;
       this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
       queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
@@ -277,6 +282,17 @@
         queueMode = QUEUE_MODE_HOST;
       }
       LOG.info("Using queue mode : "+queueMode);
+      
+      // Optionally enable host specific queue behavior 
+      if (queueMode.equals(QUEUE_MODE_HOST)) {
+        useHostSettings = conf.getBoolean("fetcher.queue.use.host.settings", false);
+        if (useHostSettings) {
+          LOG.info("Host specific queue settings enabled.");
+          // Initialize the HostDb if we need it.
+          hostDb = new HostDb(conf);
+        }
+      }
+      
       this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
       this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
       this.timelimit = conf.getLong("fetcher.timelimit", -1);
@@ -317,8 +333,27 @@
     public synchronized FetchItemQueue getFetchItemQueue(String id) {
       FetchItemQueue fiq = queues.get(id);
       if (fiq == null) {
-        // initialize queue
-        fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+        // Create a new queue
+        if (useHostSettings) {
+          // Use host specific queue settings (if defined in the host table)
+          try {
+            String hostname = id.substring(id.indexOf("://")+3);
+            Host host = hostDb.getByHostName(hostname);
+            if (host != null) {
+              fiq = new FetchItemQueue(conf,
+                                       host.getInt("q_mt", maxThreads),
+                                       host.getLong("q_cd", crawlDelay),
+                                       host.getLong("q_mcd", minCrawlDelay));
+            }
+            
+          } catch (IOException e) {
+            LOG.error("Error while trying to access host settings", e);
+          }
+        } 
+        if (fiq == null) {
+          // Use queue defaults
+          fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+        }
         queues.put(id, fiq);
       }
       return fiq;
Index: src/java/org/apache/nutch/indexer/IndexerReducer.java
===================================================================
--- src/java/org/apache/nutch/indexer/IndexerReducer.java	(revision 1328229)
+++ src/java/org/apache/nutch/indexer/IndexerReducer.java	(working copy)
@@ -48,7 +48,7 @@
     filters = new IndexingFilters(conf);
     scoringFilters = new ScoringFilters(conf);
     try {
-      store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
+      store = StorageUtils.createWebStore(conf, String.class, WebPage.class);
     } catch (ClassNotFoundException e) {
       throw new IOException(e);
     }
Index: src/java/org/apache/nutch/storage/Host.java
===================================================================
--- src/java/org/apache/nutch/storage/Host.java	(revision 0)
+++ src/java/org/apache/nutch/storage/Host.java	(revision 0)
@@ -0,0 +1,151 @@
+package org.apache.nutch.storage;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.ListGenericArray;
+
+@SuppressWarnings("all")
+public class Host extends PersistentBase {
+  public static final org.apache.avro.Schema _SCHEMA = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Host\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"bytes\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"inlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+  public java.util.Map<org.apache.avro.util.Utf8,java.nio.ByteBuffer> metadata;
+  public java.util.Map<org.apache.avro.util.Utf8,org.apache.avro.util.Utf8> outlinks;
+  public java.util.Map<org.apache.avro.util.Utf8,org.apache.avro.util.Utf8> inlinks;
+  
+  public static enum Field {
+    METADATA(0,"metadata"),
+    OUTLINKS(1,"outlinks"),
+    INLINKS(2,"inlinks"),
+    ;
+    private int index;
+    private String name;
+    Field(int index, String name) {this.index=index;this.name=name;}
+    public int getIndex() {return index;}
+    public String getName() {return name;}
+    public String toString() {return name;}
+  };
+  public static final String[] _ALL_FIELDS = {"metadata","outlinks","inlinks"};
+  static {
+    PersistentBase.registerFields(Host.class, _ALL_FIELDS);
+  }
+
+  public Host() {
+    this(new StateManagerImpl());
+  }
+  public Host(StateManager stateManager) {
+    super(stateManager);
+    metadata = new StatefulHashMap<Utf8,ByteBuffer>();
+    inlinks = new StatefulHashMap<Utf8,Utf8>();
+    outlinks = new StatefulHashMap<Utf8,Utf8>();
+  }
+  public Host newInstance(StateManager stateManager) {
+    return new Host(stateManager);
+  }
+  public Schema getSchema() { return _SCHEMA; }
+  public Object get(int _field) {
+    switch (_field) {
+    case 0: return metadata;
+    case 1: return outlinks;
+    case 2: return inlinks;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  @SuppressWarnings(value="unchecked")
+  public void put(int _field, Object _value) {
+    
+    if(isFieldEqual(_field, _value)) return;
+    getStateManager().setDirty(this, _field);
+    switch (_field) {
+    case 0: metadata = (Map<Utf8,ByteBuffer>)_value; break;
+    case 1: outlinks = (Map<Utf8,Utf8>)_value; break;
+    case 2: inlinks = (Map<Utf8,Utf8>)_value; break;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  } 
+  
+  public Map<Utf8, ByteBuffer> getMetadata() {
+    return (Map<Utf8, ByteBuffer>) get(0);
+  }
+  public ByteBuffer getFromMetadata(Utf8 key) {
+    if (metadata == null) { return null; }
+    return metadata.get(key);
+  }
+  
+  public void putToMetadata(Utf8 key, ByteBuffer value) {
+    getStateManager().setDirty(this, 0);
+    metadata.put(key, value);
+  }
+  public ByteBuffer removeFromMetadata(Utf8 key) {
+    if (metadata == null) { return null; }
+    getStateManager().setDirty(this, 0);
+    return metadata.remove(key);
+  }
+  public Map<Utf8, Utf8> getOutlinks() {
+    return (Map<Utf8, Utf8>) get(1);
+  }
+  public Utf8 getFromOutlinks(Utf8 key) {
+    if (outlinks == null) { return null; }
+    return outlinks.get(key);
+  }
+  public void putToOutlinks(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 1);
+    outlinks.put(key, value);
+  }
+  public Utf8 removeFromOutlinks(Utf8 key) {
+    if (outlinks == null) { return null; }
+    getStateManager().setDirty(this, 1);
+    return outlinks.remove(key);
+  }
+  public Map<Utf8, Utf8> getInlinks() {
+    return (Map<Utf8, Utf8>) get(2);
+  }
+  public Utf8 getFromInlinks(Utf8 key) {
+    if (inlinks == null) { return null; }
+    return inlinks.get(key);
+  }
+  public void putToInlinks(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 2);
+    inlinks.put(key, value);
+  }
+  public Utf8 removeFromInlinks(Utf8 key) {
+    if (inlinks == null) { return null; }
+    getStateManager().setDirty(this, 2);
+    return inlinks.remove(key);
+  }
+  
+  
+  
+  
+  public boolean contains(String key) { // true when the metadata map has an entry for key
+    return metadata.containsKey(new Utf8(key)); // metadata keys are stored as Utf8
+  }
+  
+  public String getValue(String key, String defaultValue) {
+    if (!contains(key)) return defaultValue;
+    return new String(metadata.get(new Utf8(key)).array());
+  }
+  
+  public int getInt(String key, int defaultValue) { // defaultValue when key is absent
+    if (!contains(key)) return defaultValue;
+    return Integer.parseInt(getValue(key,null)); // NumberFormatException on non-numeric text
+  }
+  public long getLong(String key, long defaultValue) { // defaultValue when key is absent
+    if (!contains(key)) return defaultValue;
+    return Long.parseLong(getValue(key,null)); // NumberFormatException on non-numeric text
+  }
+}
Index: src/java/org/apache/nutch/storage/StorageUtils.java
===================================================================
--- src/java/org/apache/nutch/storage/StorageUtils.java	(revision 1328229)
+++ src/java/org/apache/nutch/storage/StorageUtils.java	(working copy)
@@ -35,27 +35,40 @@
 
 public class StorageUtils {
 
-  @SuppressWarnings("unchecked")
-  public static <K, V extends Persistent> DataStore<K, V> createDataStore(Configuration conf,
-      Class<K> keyClass, Class<V> persistentClass) throws ClassNotFoundException, GoraException {
-    Class<? extends DataStore<K, V>> dataStoreClass =
-      (Class<? extends DataStore<K, V>>) getDataStoreClass(conf);
-    return DataStoreFactory.createDataStore(dataStoreClass,
-            keyClass, persistentClass);
-  }
-
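+  /** Creates a store for the given persistentClass (WebPage or Host only;
+   * any other class raises UnsupportedOperationException).
+   * 
+   * @param conf supplies the schema name, the crawl id and the data store class
+   * @param keyClass key type of the returned store
+   * @param persistentClass WebPage.class or Host.class
+   * @return a store whose schema name is prefixed with the crawl id when one is set
+   * @throws ClassNotFoundException
+   * @throws GoraException
+   */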
   @SuppressWarnings("unchecked")
   public static <K, V extends Persistent> DataStore<K, V> createWebStore(Configuration conf,
       Class<K> keyClass, Class<V> persistentClass) throws ClassNotFoundException, GoraException {
-    String schema = conf.get("storage.schema", "webpage");
+    
+    String schema = null;
+    if (WebPage.class.equals(persistentClass)) {
+      schema = conf.get("storage.schema.webpage", "webpage");
+    } else if (Host.class.equals(persistentClass)) {
+      schema = conf.get("storage.schema.host", "host");
+    } else {
+      throw new UnsupportedOperationException("Unable to create store for class " + persistentClass);
+    }
+    
     String crawlId = conf.get(Nutch.CRAWL_ID_KEY, "");
-
+    
     if (!crawlId.isEmpty()) {
       schema = crawlId + "_" + schema;
     }
 
     Class<? extends DataStore<K, V>> dataStoreClass =
       (Class<? extends DataStore<K, V>>) getDataStoreClass(conf);
+    //Following line gets a compile error during upgrade to newer Gora,
+    //Simply add 'conf' as an additional argument to fix this. 
+    //(between persistentClass and schema)
     return DataStoreFactory.createDataStore(dataStoreClass,
             keyClass, persistentClass, schema);
   }
@@ -122,7 +135,7 @@
     GoraOutputFormat.setOutput(job, store, true);
   }
 
-  private static String[] toStringArray(Collection<WebPage.Field> fields) {
+  public static String[] toStringArray(Collection<WebPage.Field> fields) {
     String[] arr = new String[fields.size()];
     Iterator<WebPage.Field> iter = fields.iterator();
     for (int i = 0; i < arr.length; i++) {
Index: src/java/org/apache/nutch/storage/WebTableCreator.java
===================================================================
--- src/java/org/apache/nutch/storage/WebTableCreator.java	(revision 1328229)
+++ src/java/org/apache/nutch/storage/WebTableCreator.java	(working copy)
@@ -22,7 +22,7 @@
 public class WebTableCreator {
   public static void main(String[] args) throws Exception {
     DataStore<String, WebPage> store =
-      StorageUtils.createDataStore(NutchConfiguration.create(), String.class,
+      StorageUtils.createWebStore(NutchConfiguration.create(), String.class,
         WebPage.class);
 
     System.out.println(store);
Index: src/java/org/apache/nutch/host/HostDbUpdateJob.java
===================================================================
--- src/java/org/apache/nutch/host/HostDbUpdateJob.java	(revision 0)
+++ src/java/org/apache/nutch/host/HostDbUpdateJob.java	(revision 0)
@@ -0,0 +1,141 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.gora.mapreduce.GoraMapper;
+import org.apache.gora.mapreduce.GoraReducer;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scans the web table and create host entries for each unique host.
+ * 
+ * 
+ **/
+
+public class HostDbUpdateJob implements Tool {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(HostDbUpdateJob.class);
+  private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  private Configuration conf;
+
+  static {
+    FIELDS.add(WebPage.Field.STATUS);
+  }
+
+  /**
+   * Maps each WebPage to a host key.
+   */
+  public static class Mapper extends GoraMapper<String, WebPage, Text, WebPage> {
+
+    @Override
+    protected void map(String key, WebPage value, Context context)
+        throws IOException, InterruptedException {
+
+      String reversedHost = TableUtil.getReversedHost(key);
+      context.write(new Text(reversedHost), value);
+    }
+  }
+
+  public HostDbUpdateJob() {
+  }
+
+  public HostDbUpdateJob(Configuration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Runs the "hostdb-update" job over the web table and writes Host rows.
+   * @param buildLinkDb when true, INLINKS and OUTLINKS are queried as well
+   */
+  public void updateHosts(boolean buildLinkDb) throws Exception {
+    // Work on a copy: adding to the static FIELDS would leak into later calls.
+    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
+    if (buildLinkDb) {
+      fields.add(WebPage.Field.INLINKS);
+      fields.add(WebPage.Field.OUTLINKS);
+    }
+
+    NutchJob job = new NutchJob(getConf(), "hostdb-update");
+    // === Map === (pages without the queried fields are skipped)
+    DataStore<String, WebPage> pageStore = StorageUtils.createWebStore(
+        job.getConfiguration(), String.class, WebPage.class);
+    Query<String, WebPage> query = pageStore.newQuery();
+    query.setFields(StorageUtils.toStringArray(fields));
+    GoraMapper.initMapperJob(job, query, pageStore, Text.class, WebPage.class,
+        HostDbUpdateJob.Mapper.class, null, true);
+    // === Reduce ===
+    DataStore<String, Host> hostStore = StorageUtils.createWebStore(
+        job.getConfiguration(), String.class, Host.class);
+    GoraReducer.initReducerJob(job, hostStore, HostDbUpdateReducer.class);
+    job.waitForCompletion(true);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    boolean linkDb=false;
+    for (int i = 0; i < args.length; i++) {
+      if ("-linkDb".equals(args[i])) {
+        linkDb = true;
+      } else if ("-crawlId".equals(args[i])) {
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
+      }
+      else {
+        throw new IllegalArgumentException("unrecognized arg " + args[i] 
+            + " usage: (-linkDb) (-crawlId <crawlId>)");
+      }
+    }
+    LOG.info("Updating HostDb. Adding links:" + linkDb);
+    updateHosts(linkDb);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new HostDbUpdateJob(), args);
+    System.exit(res);
+  }
+}
Index: src/java/org/apache/nutch/host/HostDbUpdateReducer.java
===================================================================
--- src/java/org/apache/nutch/host/HostDbUpdateReducer.java	(revision 0)
+++ src/java/org/apache/nutch/host/HostDbUpdateReducer.java	(revision 0)
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.mapreduce.GoraReducer;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlStatus;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.Histogram;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Combines all WebPages with the same host key to create a Host object, 
+ * with some statistics.
+ */
+public class HostDbUpdateReducer extends GoraReducer<Text, WebPage, String, Host> {
+  
+  /**
+   * Folds all pages of one host into a single Host row: metadata "p" holds the
+   * page count, "f" the fetched-page count (written only when positive), and
+   * the inlink/outlink maps hold per-host link counts.
+   */
+  @Override
+  protected void reduce(Text key, Iterable<WebPage> values, Context context)
+    throws IOException, InterruptedException {
+    int numPages = 0;
+    int numFetched = 0;
+    Histogram<String> inlinkCount = new Histogram<String>();
+    Histogram<String> outlinkCount = new Histogram<String>();
+
+    for (WebPage page: values) {
+      numPages++;
+      if (page.getStatus() == CrawlStatus.STATUS_FETCHED) {
+        numFetched++;
+      }
+      // build host link db
+      // TODO: limit number of links
+      if (page.getInlinks() != null) {
+        countHosts(page.getInlinks().keySet(), inlinkCount);
+      }
+      if (page.getOutlinks() != null) {
+        countHosts(page.getOutlinks().keySet(), outlinkCount);
+      }
+    }
+
+    // output host data
+    Host host = new Host();
+    host.putToMetadata(new Utf8("p"),ByteBuffer.wrap(Integer.toString(numPages).getBytes()));
+    if (numFetched > 0) {
+      host.putToMetadata(new Utf8("f"),ByteBuffer.wrap(Integer.toString(numFetched).getBytes()));
+    }
+    for (String inlink: inlinkCount.getKeys()) {
+      host.putToInlinks(new Utf8(inlink), new Utf8(Integer.toString(inlinkCount.getCount(inlink))));
+    }
+    for (String outlink: outlinkCount.getKeys()) {
+      host.putToOutlinks(new Utf8(outlink), new Utf8(Integer.toString(outlinkCount.getCount(outlink))));
+    }
+    context.write(key.toString(), host);
+  }
+
+  /** Adds the host of each URL in links to counts. */
+  private static void countHosts(Set<Utf8> links, Histogram<String> counts) {
+    for (Utf8 link : links) {
+      String linkHost = URLUtil.getHost(link.toString());
+      // NOTE(review): presumably getHost yields null for unparsable URLs; a null
+      // key would make new Utf8(String) fail when the row is built - confirm.
+      if (linkHost != null) {
+        counts.add(linkHost);
+      }
+    }
+  }
+}
Index: src/java/org/apache/nutch/host/HostInjectorJob.java
===================================================================
--- src/java/org/apache/nutch/host/HostInjectorJob.java	(revision 0)
+++ src/java/org/apache/nutch/host/HostInjectorJob.java	(revision 0)
@@ -0,0 +1,178 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates or updates an existing host table from a text file.<br>
+ * The files contain one host name per line, optionally followed by custom
+ * metadata separated by tabs, with each metadata key separated from the
+ * corresponding value by '='. <br>
+ * The URLs must contain the protocol as well as the host name <br>
+ * e.g. http://www.nutch.org \t nutch.score=10 \t nutch.fetchInterval=2592000 \t
+ * userType=open_source
+ **/
+
+public class HostInjectorJob implements Tool {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(HostInjectorJob.class);
+
+  private Configuration conf;
+
+  private static final Set<Host.Field> FIELDS = new HashSet<Host.Field>();
+  static {
+    FIELDS.add(Host.Field.METADATA);
+  }
+
+  public HostInjectorJob() {
+
+  }
+
+  public HostInjectorJob(Configuration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /** Maps each non-blank input line to a (host key, Host) pair. */
+  public static class UrlMapper extends
+      Mapper<LongWritable, Text, String, Host> {
+
+    /**
+     * Parses one line: the first tab-separated field is the host; each further
+     * field of the form name=value becomes a metadata entry on the Host.
+     */
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      String url = value.toString();
+
+      // skip empty lines
+      if (url.trim().length() == 0)
+        return;
+
+      Map<String, String> metadata = new TreeMap<String, String>();
+      if (url.indexOf("\t") != -1) {
+        String[] splits = url.split("\t");
+        url = splits[0];
+        for (int s = 1; s < splits.length; s++) {
+          // find separation between name and value
+          int indexEquals = splits[s].indexOf("=");
+          if (indexEquals == -1) {
+            // skip anything without a =
+            continue;
+          }
+          String metaname = splits[s].substring(0, indexEquals);
+          String metavalue = splits[s].substring(indexEquals + 1);
+          metadata.put(metaname, metavalue);
+        }
+      }
+
+      // now add the metadata
+      Host host = new Host();
+      for (Map.Entry<String, String> entry : metadata.entrySet()) {
+        // Encode as UTF-8 explicitly instead of the platform default charset.
+        host.putToMetadata(new Utf8(entry.getKey()),
+            ByteBuffer.wrap(entry.getValue().getBytes("UTF-8")));
+      }
+
+      String hostkey = TableUtil.reverseHost(url);
+      context.write(hostkey, host);
+    }
+  }
+
+  /** Runs a map-only job writing the hosts under hostDir to the host store; true on success. */
+  public boolean inject(Path hostDir) throws Exception {
+    LOG.info("HostInjectorJob: starting");
+    LOG.info("HostInjectorJob: hostDir: " + hostDir);
+    Job job = new NutchJob(getConf(), "inject-hosts " + hostDir);
+    FileInputFormat.addInputPath(job, hostDir);
+    job.setMapperClass(UrlMapper.class);
+    job.setMapOutputKeyClass(String.class);
+    job.setMapOutputValueClass(Host.class);
+    job.setOutputFormatClass(GoraOutputFormat.class);
+    GoraOutputFormat.setOutput(job,
+        StorageUtils.createWebStore(getConf(), String.class, Host.class), true);
+    job.setReducerClass(Reducer.class);
+    job.setNumReduceTasks(0);
+    return job.waitForCompletion(true);
+  }
+
+  /** Expects the host directory as args[0]; returns 0 on success, -1 on bad usage or failure. */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: HostInjectorJob <host_dir>");
+      return -1;
+    }
+    try {
+      boolean success = inject(new Path(args[0]));
+      if (!success) {
+        LOG.error("HostInjectorJob: failed ");
+        return -1;
+      }
+      LOG.info("HostInjectorJob: finished");
+      return 0;
+    } catch (Exception e) {
+      LOG.error("HostInjectorJob: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(),
+        new HostInjectorJob(), args);
+    System.exit(res);
+  }
+}
Index: src/java/org/apache/nutch/host/HostDbReader.java
===================================================================
--- src/java/org/apache/nutch/host/HostDbReader.java	(revision 0)
+++ src/java/org/apache/nutch/host/HostDbReader.java	(revision 0)
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.TableUtil;
+
+/**
+ * Display entries from the hostDB. Allows to verify that the storage is OK.
+ **/
+
+public class HostDbReader extends Configured implements Tool {
+  public static final Log LOG = LogFactory.getLog(HostDbReader.class);
+
+  /** Prints every host row, or only the row selected by key when key is non-null. */
+  private void read(String key) throws ClassNotFoundException, IOException {
+    DataStore<String, Host> datastore = StorageUtils.createWebStore(getConf(),
+        String.class, Host.class);
+    try {
+      Query<String, Host> query = datastore.newQuery();
+      // NOTE(review): the other host classes key rows with TableUtil.reverseHost - confirm reverseUrl.
+      if (key != null) {
+        query.setKey(TableUtil.reverseUrl(key));
+      }
+      Result<String, Host> result = datastore.execute(query);
+      try {
+        while (result.next()) {
+          String hostName = TableUtil.unreverseUrl(result.getKey());
+          Host host = result.get();
+          System.out.println(hostName);
+          System.out.println(host);
+        }
+      } finally {
+        result.close();
+      }
+    } finally {
+      datastore.close();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new HostDbReader(),
+        args);
+    System.exit(res);
+  }
+
+  /** Accepts at most one argument (the key to look up); returns 0 on success, -1 otherwise. */
+  public int run(String[] args) throws Exception {
+    if (args.length > 1) {
+      System.err.println("Usage: HostDBReader [key]");
+      return -1;
+    }
+    try {
+      read(args.length == 1 ? args[0] : null);
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("HostDBReader: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Index: src/java/org/apache/nutch/host/HostDb.java
===================================================================
--- src/java/org/apache/nutch/host/HostDb.java	(revision 0)
+++ src/java/org/apache/nutch/host/HostDb.java	(revision 0)
@@ -0,0 +1,146 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.host;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.storage.Host;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.util.TableUtil;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * A caching wrapper for the host datastore. 
+ */
+public class HostDb implements Closeable {
+  public static final Log LOG = LogFactory.getLog(HostDb.class);
+
+  /** Cache entry pairing a host row with the time it entered the cache. */
+  private static final class CacheHost {
+    private final Host host;
+    private final long timestamp;
+    public CacheHost(Host host, long timestamp) {
+      this.host = host;
+      this.timestamp = timestamp;
+    }
+  }
+  /** Sentinel cached when the store has no row for a key; get() maps it to null. */
+  private final static CacheHost NULL_HOST = new CacheHost(null,0);
+
+  private DataStore<String, Host> hostStore;
+
+  public static final String HOSTDB_LRU_SIZE = "hostdb.lru.size";
+  public static final int DEFAULT_LRU_SIZE = 100;
+  public static final String HOSTDB_CONCURRENCY_LEVEL = "hostdb.concurrency.level";
+  public static final int DEFAULT_HOSTDB_CONCURRENCY_LEVEL = 8;
+
+  private Cache<String, CacheHost> cache;
+  // Created eagerly: the constructor and the removal listener both dereference it.
+  private final AtomicLong lastFlush = new AtomicLong();
+
+  /** Opens the host store and builds the cache from the hostdb.* settings in conf. */
+  public HostDb(Configuration conf) throws IOException {
+    try {
+      hostStore = StorageUtils.createWebStore(conf, String.class, Host.class);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Create a cache.
+    // We add a removal listener to see if we need to flush the store,
+    // in order to adhere to the put-flush-get semantic
+    // ("read your own write") of DataStore.
+    // NOTE(review): flushing only when the entry predates the last flush looks inverted - confirm.
+    long lruSize = conf.getLong(HOSTDB_LRU_SIZE, DEFAULT_LRU_SIZE);
+    int concurrencyLevel = conf.getInt(HOSTDB_CONCURRENCY_LEVEL,
+        DEFAULT_HOSTDB_CONCURRENCY_LEVEL);
+    RemovalListener<String, CacheHost> listener =
+        new RemovalListener<String, CacheHost>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<String, CacheHost> notification) {
+            CacheHost removeFromCacheHost = notification.getValue();
+            if (removeFromCacheHost != NULL_HOST) {
+              if (removeFromCacheHost.timestamp < lastFlush.get()) {
+                try {
+                  hostStore.flush();
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+                lastFlush.set(System.currentTimeMillis());
+              }
+            }
+          }
+    };
+
+    cache=CacheBuilder.newBuilder().maximumSize(lruSize)
+        .removalListener(listener).concurrencyLevel(concurrencyLevel)
+        .build();
+    lastFlush.set(System.currentTimeMillis());
+  }
+
+  /** Returns the host stored under key (cached after first load), or null if absent. */
+  public Host get(final String key) throws IOException {
+    Callable<CacheHost> valueLoader = new Callable<CacheHost>() {
+      @Override
+      public CacheHost call() throws Exception {
+        Host host = hostStore.get(key);
+        if (host == null) return NULL_HOST;
+        return new CacheHost(host, System.currentTimeMillis());
+      }
+    };
+    CacheHost cachedHost;
+    try {
+      cachedHost = cache.get(key, valueLoader);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+    if (cachedHost != NULL_HOST) {
+      return cachedHost.host;
+    } else {
+      return null;
+    }
+  }
+
+  /** Looks up a host by plain host name; the key is TableUtil.reverseHost(hostName). */
+  public Host getByHostName(String hostName) throws IOException {
+   return get(TableUtil.reverseHost(hostName));
+  }
+
+  /** Caches the host under key and also writes it to the backing store. */
+  public void put(String key, Host host) throws IOException {
+    cache.put(key, new CacheHost(host, System.currentTimeMillis()));
+    hostStore.put(key, host);
+  }
+
+  @Override
+  public void close() throws IOException {
+    hostStore.flush();
+  }
+}
Index: default.properties
===================================================================
--- default.properties	(revision 1328229)
+++ default.properties	(working copy)
@@ -16,6 +16,7 @@
 build.encoding = UTF-8
 build.ivy.dir=${build.dir}/ivy
 build.lib.dir=${build.dir}/lib
+build.gora=${build.dir}/gora
 
 test.src.dir = ./src/test
 test.build.dir = ${build.dir}/test
