Index: src/java/org/apache/nutch/crawl/metadata/HostType.java
===================================================================
--- src/java/org/apache/nutch/crawl/metadata/HostType.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/metadata/HostType.java	(revision 0)
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+public class HostType implements WritableComparable<HostType> {
+
+  public static final int METADATA_CONTAINER = 1;
+
+  public static final int URL_PARSEDATA_CONTAINER = 2;
+
+  private Text _host = new Text();
+
+  private IntWritable _type = new IntWritable();
+
+  public HostType() {
+  }
+
+  public HostType(Text host, int type) {
+    _host.set(host);
+    _type.set(type);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _host.readFields(in);
+    _type.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    _host.write(out);
+    _type.write(out);
+  }
+
+  @Override
+  public int compareTo(HostType that) {
+    int i = _host.compareTo(that._host);
+    if (i == 0) {
+      // in case host is identically make sure that METADATA_CONTAINER is
+      // passing a
+      // reducer first
+      i = ((_type.get() < that._type.get() ? -1 : (_type.get() == that._type
+          .get()) ? 0 : 1));
+    }
+    return i;
+  }
+
+  public Text getHost() {
+    return _host;
+  }
+
+  @Override
+  public int hashCode() {
+    return _host.hashCode() + _type.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    HostType other = (HostType) obj;
+    return _host.equals(other._host) && _type.equals(other._type);
+  }
+
+  @Override
+  public String toString() {
+    return _host.toString() + " (" + _type + ")";
+  }
+
+}
Index: src/java/org/apache/nutch/crawl/metadata/MetadataMerger.java
===================================================================
--- src/java/org/apache/nutch/crawl/metadata/MetadataMerger.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/metadata/MetadataMerger.java	(revision 0)
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl.metadata;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.nutch.crawl.metadata.MetadataInjector.MetadataContainer;
+import org.apache.nutch.crawl.metadata.ParseDataWrapper.UrlParseDataContainer;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.NutchJob;
+
+public class MetadataMerger extends Configured {
+
+  public static final Log LOG = LogFactory.getLog(MetadataMerger.class);
+
+  public static class ObjectWritableMapper implements
+      Mapper<HostType, Writable, HostType, ObjectWritable> {
+
+    @Override
+    public void map(HostType key, Writable value,
+        OutputCollector<HostType, ObjectWritable> collector, Reporter reporter)
+        throws IOException {
+      ObjectWritable objectWritable = new ObjectWritable(value);
+      collector.collect(key, objectWritable);
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+  }
+
+  public static class MetadataReducer implements
+      Reducer<HostType, ObjectWritable, HostType, ObjectWritable> {
+
+    private MetadataContainer _metadataContainer;
+
+    public void reduce(HostType key, Iterator<ObjectWritable> values,
+        OutputCollector<HostType, ObjectWritable> out, Reporter report)
+        throws IOException {
+
+      while (values.hasNext()) {
+        ObjectWritable obj = (ObjectWritable) values.next();
+        Object value = obj.get(); // unwrap
+        if (value instanceof MetadataContainer) {
+          _metadataContainer = (MetadataContainer) value;
+          return;
+        }
+
+        UrlParseDataContainer urlParseDataContainer = (UrlParseDataContainer) value;
+        Text url = urlParseDataContainer.getUrl();
+        ParseData parseData = urlParseDataContainer.getParseData();
+        Metadata metadataFromSegment = parseData.getParseMeta();
+        //
+
+        for (Metadata metadata : _metadataContainer.getMetadatas()) {
+          String pattern = metadata.get("pattern");
+          if (url.toString().startsWith(pattern)) {
+            String[] names = metadata.names();
+            for (String name : names) {
+              String[] metadataValues = metadata.getValues(name);
+              for (String metadataValue : metadataValues) {
+                metadataFromSegment.add(name, metadataValue);
+              }
+            }
+          }
+        }
+
+        out.collect(key, obj);
+
+      }
+    }
+
+    public void configure(JobConf arg0) {
+    }
+
+    public void close() throws IOException {
+    }
+
+  }
+
+  public MetadataMerger(Configuration conf) {
+    super(conf);
+  }
+
+  public void merge(Path metadataDb, Path wrappedParseData, Path out)
+      throws IOException {
+    LOG.info("metadata update: merge started.");
+    JobConf mergeJob = new NutchJob(getConf());
+    mergeJob.setJobName("merging: " + metadataDb + " and " + wrappedParseData);
+    mergeJob.setInputFormat(SequenceFileInputFormat.class);
+
+    FileInputFormat.addInputPath(mergeJob, wrappedParseData);
+    FileInputFormat.addInputPath(mergeJob, new Path(metadataDb, "current"));
+    FileOutputFormat.setOutputPath(mergeJob, out);
+    mergeJob.setMapperClass(ObjectWritableMapper.class);
+    mergeJob.setReducerClass(MetadataReducer.class);
+    mergeJob.setOutputFormat(MapFileOutputFormat.class);
+    mergeJob.setOutputKeyClass(HostType.class);
+    mergeJob.setOutputValueClass(ObjectWritable.class);
+    JobClient.runJob(mergeJob);
+  }
+
+}
Index: src/java/org/apache/nutch/crawl/metadata/ParseDataWrapper.java
===================================================================
--- src/java/org/apache/nutch/crawl/metadata/ParseDataWrapper.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/metadata/ParseDataWrapper.java	(revision 0)
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+public class ParseDataWrapper extends Configured {
+
+  public static final Log LOG = LogFactory.getLog(ParseDataWrapper.class);
+
+  public static class UrlParseDataContainer implements Writable {
+
+    private Text _url = new Text();
+    private ParseData _parseData = new ParseData();
+
+    public UrlParseDataContainer() {
+    }
+
+    public UrlParseDataContainer(Text url, ParseData parseData) {
+      _url = url;
+      _parseData = parseData;
+    }
+
+    public Text getUrl() {
+      return _url;
+    }
+
+    public void setUrl(Text url) {
+      _url = url;
+    }
+
+    public ParseData getParseData() {
+      return _parseData;
+    }
+
+    public void setParseData(ParseData parseData) {
+      _parseData = parseData;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      _url.readFields(in);
+      _parseData = ParseData.read(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      _url.write(out);
+      _parseData.write(out);
+    }
+
+  }
+
+  public static class ParseDataWrapperMapper implements
+      Mapper<Text, ParseData, HostType, UrlParseDataContainer> {
+
+    @Override
+    public void map(Text key, ParseData value,
+        OutputCollector<HostType, UrlParseDataContainer> collector,
+        Reporter reporter) throws IOException {
+      String url = key.toString();
+      String host = new URL(url).getHost();
+      UrlParseDataContainer container = new UrlParseDataContainer(
+          new Text(url), value);
+      HostType hostType = new HostType(new Text(host),
+          HostType.URL_PARSEDATA_CONTAINER);
+      collector.collect(hostType, container);
+    }
+
+    @Override
+    public void configure(JobConf arg0) {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+  }
+
+  public ParseDataWrapper(Configuration conf) {
+    super(conf);
+  }
+
+  public void wrap(Path segment, Path out) throws IOException {
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("wrap parse data from segment: " + segment);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+
+    job.setMapperClass(ParseDataWrapperMapper.class);
+
+    FileOutputFormat.setOutputPath(job, out);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setOutputKeyClass(HostType.class);
+    job.setOutputValueClass(UrlParseDataContainer.class);
+    JobClient.runJob(job);
+  }
+
+  public static void main(String[] args) throws IOException {
+    Configuration configuration = NutchConfiguration.create();
+    ParseDataWrapper wrapper = new ParseDataWrapper(configuration);
+    wrapper.wrap(new Path(args[0]), new Path(args[1]));
+  }
+}
Index: src/java/org/apache/nutch/crawl/metadata/ParseDataUnwrapper.java
===================================================================
--- src/java/org/apache/nutch/crawl/metadata/ParseDataUnwrapper.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/metadata/ParseDataUnwrapper.java	(revision 0)
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.nutch.crawl.metadata.ParseDataWrapper.UrlParseDataContainer;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.NutchJob;
+
+public class ParseDataUnwrapper extends Configured {
+
+  public static class ParseDataUnwrapperMapper implements
+      Mapper<HostType, ObjectWritable, Text, ParseData> {
+
+    @Override
+    public void map(HostType key, ObjectWritable value,
+        OutputCollector<Text, ParseData> out, Reporter reporter)
+        throws IOException {
+      UrlParseDataContainer container = (UrlParseDataContainer) value.get();
+      Text url = container.getUrl();
+      ParseData parseData = container.getParseData();
+      out.collect(url, parseData);
+    }
+
+    @Override
+    public void configure(JobConf arg0) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+  }
+
+  public ParseDataUnwrapper(Configuration configuration) {
+    super(configuration);
+  }
+
+  public void unwrap(Path wrappedParseData, Path out) throws IOException {
+    JobConf convertJob = new NutchJob(getConf());
+    convertJob.setJobName("format converting: " + wrappedParseData);
+    FileInputFormat.addInputPath(convertJob, wrappedParseData);
+    convertJob.setInputFormat(SequenceFileInputFormat.class);
+    convertJob.setMapperClass(ParseDataUnwrapperMapper.class);
+    FileOutputFormat.setOutputPath(convertJob, out);
+    FileOutputFormat.setCompressOutput(convertJob, true);
+    convertJob.setOutputFormat(MapFileOutputFormat.class);
+    convertJob.setOutputKeyClass(Text.class);
+    convertJob.setOutputValueClass(ParseData.class);
+    JobClient.runJob(convertJob);
+  }
+}
Index: src/java/org/apache/nutch/crawl/metadata/ParseDataUpdater.java
===================================================================
--- src/java/org/apache/nutch/crawl/metadata/ParseDataUpdater.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/metadata/ParseDataUpdater.java	(revision 0)
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl.metadata;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.NutchConfiguration;
+
+public class ParseDataUpdater extends Configured {
+
+  public static final Log LOG = LogFactory.getLog(ParseDataUpdater.class);
+
+  public ParseDataUpdater(Configuration configuration) {
+    super(configuration);
+  }
+
+  public void update(Path metadataDb, Path segment) throws IOException {
+    LOG.info("metadata update: starting");
+    LOG.info("metadata update: db: " + metadataDb);
+    LOG.info("metadata update: segment: " + segment);
+
+    // tmp dir for all jobs
+    Path tempDir = new Path(getConf().get("mapred.temp.dir",
+        System.getProperty("java.io.tmpdir")));
+    String id = Integer.toString(new Random().nextInt(Integer.MAX_VALUE));
+    LOG.info("write tmp files into: " + tempDir);
+    LOG.info("metadata update: wrap parsedata: " + segment);
+
+    String name = "metadata-wrap-parsedata-temp-" + id;
+    Path wrappedParseData = new Path(tempDir, name);
+    ParseDataWrapper parseDataWrapper = new ParseDataWrapper(getConf());
+    parseDataWrapper.wrap(segment, wrappedParseData);
+
+    LOG.info("metadata update: merge metadatadb and wrapped parse_data: "
+        + segment);
+    name = "metadata-merge-parsedata-temp-" + id;
+    Path mergeMetadataParseData = new Path(tempDir, name);
+    MetadataMerger metadataMerger = new MetadataMerger(getConf());
+    metadataMerger.merge(metadataDb, wrappedParseData, mergeMetadataParseData);
+    FileSystem.get(getConf()).delete(wrappedParseData, true);
+
+    // convert formats
+    name = "metadata-merge-unwrap-temp-" + id;
+    Path unwrapParseData = new Path(tempDir, name);
+    ParseDataUnwrapper unwrapper = new ParseDataUnwrapper(getConf());
+    unwrapper.unwrap(mergeMetadataParseData, unwrapParseData);
+    FileSystem.get(getConf()).delete(mergeMetadataParseData, true);
+
+    // install new parse_data
+    FileSystem fs = FileSystem.get(getConf());
+    Path old = new Path(segment, "old_parse_data");
+    Path current = new Path(segment, ParseData.DIR_NAME);
+    if (fs.exists(current)) {
+      if (fs.exists(old)) {
+        fs.delete(old, true);
+      }
+      fs.rename(current, old);
+    }
+    fs.rename(unwrapParseData, current);
+    if (fs.exists(old)) {
+      fs.delete(old, true);
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    Path metadataDb = new Path(args[0]);
+    Path segment = new Path(args[1]);
+    Configuration conf = NutchConfiguration.create();
+    ParseDataUpdater parseDataUpdater = new ParseDataUpdater(conf);
+    parseDataUpdater.update(metadataDb, segment);
+
+  }
+}
Index: src/java/org/apache/nutch/crawl/metadata/MetadataInjector.java
===================================================================
--- src/java/org/apache/nutch/crawl/metadata/MetadataInjector.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/metadata/MetadataInjector.java	(revision 0)
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+public class MetadataInjector extends Configured {
+
+  private static final Log LOG = LogFactory.getLog(MetadataInjector.class);
+
+  public static class MetadataContainer implements Writable {
+
+    private Set<Metadata> _metadatas = new HashSet<Metadata>();
+
+    public MetadataContainer() {
+    }
+
+    public MetadataContainer(Metadata... metadatas) {
+      for (Metadata metadata : metadatas) {
+        _metadatas.add(metadata);
+      }
+    }
+
+    public void addMetadata(Metadata metadata) {
+      _metadatas.add(metadata);
+    }
+
+    public Set<Metadata> getMetadatas() {
+      return _metadatas;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      _metadatas.clear();
+      int size = in.readInt();
+      for (int i = 0; i < size; i++) {
+        Metadata metadata = new Metadata();
+        metadata.readFields(in);
+        _metadatas.add(metadata);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(_metadatas.size());
+      for (Metadata metadata : _metadatas) {
+        metadata.write(out);
+      }
+    }
+
+    @Override
+    public String toString() {
+      String s = "";
+      for (Metadata metadata : _metadatas) {
+        s += "\r\n";
+        s += metadata;
+      }
+      return s;
+    }
+
+    @Override
+    public int hashCode() {
+      return _metadatas.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      MetadataContainer container = (MetadataContainer) obj;
+      return _metadatas.equals(container._metadatas);
+    }
+
+  }
+
+  public MetadataInjector(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Creates {@link HostType} - {@link MetadataContainer} tuples from each text
+   * line.
+   */
+  public static class MetadataInjectMapper implements
+      Mapper<WritableComparable<Object>, Text, HostType, MetadataContainer> {
+
+    private URLNormalizers _urlNormalizers;
+
+    public void configure(JobConf job) {
+      _urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
+    }
+
+    public void close() {
+    }
+
+    @Override
+    public void map(WritableComparable<Object> key, Text val,
+        OutputCollector<HostType, MetadataContainer> output, Reporter reporter)
+        throws IOException {
+
+      String line = val.toString();
+      String[] splits = line.split("\t");
+      String url = splits[0];
+      String meteKey = null;
+      Metadata metadata = new Metadata();
+      metadata.add("pattern", url);
+      for (int i = 1; i < splits.length; i++) {
+        String split = splits[i];
+        if (split.endsWith(":")) {
+          meteKey = split.substring(0, split.length() - 1);
+          continue;
+        }
+        metadata.add(meteKey, split);
+      }
+
+      try {
+        url = _urlNormalizers.normalize(url, "metadata"); // normalize
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e.toString());
+        url = null;
+      }
+      if (url != null) {
+        String host;
+        try {
+          host = new URL(url).getHost();
+        } catch (Exception e) {
+          LOG.warn("unable to get host from: " + url + " : " + e.toString());
+          return;
+        }
+
+        HostType hostType = new HostType(new Text(host),
+            HostType.METADATA_CONTAINER);
+        MetadataContainer metadataContainer = new MetadataContainer(metadata);
+        output.collect(hostType, metadataContainer);
+      }
+    }
+
+  }
+
+  /**
+   * Reduces {@link HostType} - {@link MetadataContainer} tuples
+   */
+  public static class MetadataInjectReducer implements
+      Reducer<HostType, MetadataContainer, HostType, MetadataContainer> {
+    public void configure(JobConf job) {
+    }
+
+    public void close() {
+    }
+
+    @Override
+    public void reduce(HostType key, Iterator<MetadataContainer> values,
+        OutputCollector<HostType, MetadataContainer> output, Reporter reporter)
+        throws IOException {
+      MetadataContainer metadataContainer = new MetadataContainer();
+      while (values.hasNext()) {
+        MetadataContainer next = values.next();
+        Set<Metadata> nextMetadatas = next.getMetadatas();
+        for (Metadata nextMetadata : nextMetadatas) {
+          metadataContainer.addMetadata(nextMetadata);
+        }
+
+      }
+      output.collect(key, metadataContainer);
+    }
+  }
+
+  public void inject(Path metadataDb, Path urlDir) throws IOException {
+
+    LOG.info("MetadataInjector: starting");
+    LOG.info("MetadataInjector: metadataDb: " + metadataDb);
+    LOG.info("MetadataInjector: urlDir: " + urlDir);
+
+    String name = Integer.toString(new Random().nextInt(Integer.MAX_VALUE));
+    name = "/metadata-inject-temp-" + name;
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + name);
+
+    JobConf sortJob = new NutchJob(getConf());
+    sortJob.setJobName("metadata-inject " + urlDir);
+    FileInputFormat.addInputPath(sortJob, urlDir);
+    sortJob.setMapperClass(MetadataInjectMapper.class);
+
+    FileOutputFormat.setOutputPath(sortJob, tempDir);
+    sortJob.setOutputFormat(SequenceFileOutputFormat.class);
+    sortJob.setOutputKeyClass(HostType.class);
+    sortJob.setOutputValueClass(MetadataContainer.class);
+    JobClient.runJob(sortJob);
+
+    LOG.info("MetadataInjector: Merging injected urls into metadataDb.");
+    String newDbName = Integer
+        .toString(new Random().nextInt(Integer.MAX_VALUE));
+    Path newDb = new Path(metadataDb, newDbName);
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("merge metadata " + metadataDb);
+
+    Path current = new Path(metadataDb, "current");
+    if (FileSystem.get(job).exists(current)) {
+      FileInputFormat.addInputPath(job, current);
+    }
+    FileInputFormat.addInputPath(job, tempDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setReducerClass(MetadataInjectReducer.class);
+    FileOutputFormat.setOutputPath(job, newDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setOutputKeyClass(HostType.class);
+    job.setOutputValueClass(MetadataContainer.class);
+    JobClient.runJob(job);
+
+    LOG.info("rename metadataDb");
+    FileSystem fs = new JobClient(job).getFs();
+    Path old = new Path(metadataDb, "old");
+    fs.delete(old, true);
+    if (fs.exists(current)) {
+      fs.rename(current, old);
+    }
+    fs.rename(newDb, current);
+    fs.delete(old, true);
+
+    // clean up
+    fs.delete(tempDir, true);
+    LOG.info("MetadataInjector: done");
+  }
+
+  public static void main(String[] args) throws Exception {
+    MetadataInjector injector = new MetadataInjector(NutchConfiguration
+        .create());
+    if (args.length < 2) {
+      System.err.println("Usage: MetadataInjector <metadataDb> <urls>");
+      return;
+    }
+    injector.inject(new Path(args[0]), new Path(args[1]));
+  }
+}
