Index: conf/nutch-default.xml
===================================================================
--- conf/nutch-default.xml	(revision 1550730)
+++ conf/nutch-default.xml	(working copy)
@@ -571,6 +571,51 @@
   </description>
 </property>
 
+
+<!-- Patch Nutch-289 storing IP addresses -->
+
+<property>
+  <name>db.lookup.ip.during.update</name>
+  <value>true</value>
+  <description>If true, the IP addresses of extracted URLs will be resolved
+  and stored in the CrawlDatum.
+  Resolving IP addresses during the update is faster than during segment
+  generation, but it is only required when generate.max.per.host.by.ip is true.
+  </description>
+</property>
+
+<property>
+  <name>db.lookup.ip.threads</name>
+  <value>20</value>
+  <description>If db.lookup.ip.during.update is true, this gives the number of
+  threads used for IP lookup. The default value is 20. Note that this number of
+  threads is started for each map task and can generate a high request rate
+  against your DNS server; be careful not to create a denial-of-service situation
+  for your DNS. Only relevant when generate.max.per.host.by.ip is true (Tested: 200).
+  </description>
+</property>
+
+<property>
+  <name>db.lookup.ip.max.cache.size</name>
+  <value>30000</value>
+  <description>Maximum number of host-to-IP mappings kept in the in-memory
+  DNS cache of each map task when db.lookup.ip.during.update is true. When
+  the cache is full, the least recently used entries are evicted. Only
+  relevant when generate.max.per.host.by.ip is true (Tested: 30000).
+  </description>
+</property>
+
+<property>
+  <name>db.lookup.ip.max.parallel.dns</name>
+  <value>216</value>
+  <description>Upper bound on the number of parallel DNS requests for the whole
+  update job. Each map task gets a share of this budget (the value divided by the
+  number of map tasks), so the cluster-wide request rate against your DNS server
+  stays close to this limit (Tested: 2160).
+  </description>
+</property>
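+
+<!-- Usage sketch (assuming bin/nutch passes unknown commands through as class names):
+     the resolver can also be run standalone against an existing crawldb, e.g.
+       bin/nutch org.apache.nutch.crawl.IpAddressResolver crawl/crawldb -threads 200 -maxParallelDns 2160
+     where -threads and -maxParallelDns override db.lookup.ip.threads and
+     db.lookup.ip.max.parallel.dns for that run. -->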
+
+<!-- End of Patch Nutch-289 -->
+
 <!-- generate properties -->
 
 <property>
Index: src/plugin/parse-tika/src/test/org/apache/nutch/tika/TestMSWordParser.java
===================================================================
--- src/plugin/parse-tika/src/test/org/apache/nutch/tika/TestMSWordParser.java	(revision 1550730)
+++ src/plugin/parse-tika/src/test/org/apache/nutch/tika/TestMSWordParser.java	(working copy)
@@ -23,7 +23,6 @@
 import org.apache.nutch.protocol.ProtocolException;
 
 import org.apache.nutch.parse.Parse;
-import org.apache.nutch.parse.ParseImpl;
 import org.apache.nutch.parse.ParseUtil;
 import org.apache.nutch.parse.ParseException;
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +32,6 @@
 import org.apache.nutch.crawl.CrawlDatum;
 
 import java.io.File;
-import java.io.FilenameFilter;
-
 import junit.framework.TestCase;
 
 /** 
Index: src/plugin/parse-tika/src/test/org/apache/nutch/tika/TestRTFParser.java
===================================================================
--- src/plugin/parse-tika/src/test/org/apache/nutch/tika/TestRTFParser.java	(revision 1550730)
+++ src/plugin/parse-tika/src/test/org/apache/nutch/tika/TestRTFParser.java	(working copy)
@@ -20,23 +20,9 @@
 // JUnit imports
 import junit.framework.TestCase;
 
-// Nutch imports
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.metadata.DublinCore;
-import org.apache.nutch.metadata.Metadata;
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.parse.ParseUtil;
 import org.apache.nutch.parse.ParseException;
-import org.apache.nutch.protocol.Content;
-import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolException;
-import org.apache.nutch.protocol.ProtocolFactory;
-import org.apache.nutch.util.NutchConfiguration;
 
-// Hadoop imports
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-
 /**
  * Unit tests for TestRTFParser. (Adapted from John Xing msword unit tests).
  * 
@@ -44,26 +30,29 @@
  */
 public class TestRTFParser extends TestCase {
 
-	private String fileSeparator = System.getProperty("file.separator");
-	// This system property is defined in ./src/plugin/build-plugin.xml
-	private String sampleDir = System.getProperty("test.data", ".");
-	// Make sure sample files are copied to "test.data" as specified in
-	// ./src/plugin/parse-rtf/build.xml during plugin compilation.
-	// Check ./src/plugin/parse-rtf/sample/README.txt for what they are.
-	private String rtfFile = "test.rtf";
+  @SuppressWarnings("unused")
+  private String fileSeparator = System.getProperty("file.separator");
+  // This system property is defined in ./src/plugin/build-plugin.xml
+  @SuppressWarnings("unused")
+  private String sampleDir = System.getProperty("test.data", ".");
+  // Make sure sample files are copied to "test.data" as specified in
+  // ./src/plugin/parse-rtf/build.xml during plugin compilation.
+  // Check ./src/plugin/parse-rtf/sample/README.txt for what they are.
+  @SuppressWarnings("unused")
+  private String rtfFile = "test.rtf";
 
-	public TestRTFParser(String name) {
-		super(name);
-	}
+  public TestRTFParser(String name) {
+    super(name);
+  }
 
-	protected void setUp() {
-	}
+  protected void setUp() {
+  }
 
-	protected void tearDown() {
-	}
+  protected void tearDown() {
+  }
 
-	public void testIt() throws ProtocolException, ParseException {
-	  /* Temporarily disabled - see Tika-748
+  public void testIt() throws ProtocolException, ParseException {
+    /* Temporarily disabled - see Tika-748
 
 		String urlString;
 		Protocol protocol;
@@ -86,6 +75,6 @@
 		// METADATA extraction is not yet supported in Tika
 		// assertEquals("test rft document", title);
 		// assertEquals("tests", meta.get(DublinCore.SUBJECT));
-  */
-	}
+     */
+  }
 }
Index: src/java/org/apache/nutch/crawl/CrawlDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDb.java	(revision 1550730)
+++ src/java/org/apache/nutch/crawl/CrawlDb.java	(working copy)
@@ -49,11 +49,11 @@
   public static final String CRAWLDB_PURGE_404 = "db.update.purge.404";
 
   public static final String CURRENT_NAME = "current";
-  
+
   public static final String LOCK_NAME = ".locked";
-  
+
   public CrawlDb() {}
-  
+
   public CrawlDb(Configuration conf) {
     setConf(conf);
   }
@@ -62,7 +62,7 @@
     boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
     update(crawlDb, segments, normalize, filter, additionsAllowed, false);
   }
-  
+
   public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter, boolean additionsAllowed, boolean force) throws IOException {
     FileSystem fs = FileSystem.get(getConf());
     Path lock = new Path(crawlDb, LOCK_NAME);
@@ -70,6 +70,18 @@
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
 
+    if (getConf().getBoolean("db.lookup.ip.during.update", false)){
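+      // Resolve IP addresses up front so that later jobs (generate/partition) can
+      // reuse the "_ip_" metadata stored in each CrawlDatum instead of hitting DNS again.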
+      Path crawlDatumPath = null;
+      if(null != segments && segments.length > 0) {
+        crawlDatumPath = new Path(segments[0], CrawlDatum.PARSE_DIR_NAME);
+      } else {
+        crawlDatumPath = new Path(crawlDb, CURRENT_NAME);
+      }
+      LOG.info("CrawlDb update: segment resolving IP addresses working on: " + crawlDatumPath);
+      new IpAddressResolver().resolve(crawlDatumPath, getConf());
+    }
+
+
     JobConf job = CrawlDb.createJob(getConf(), crawlDb);
     job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
     job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
@@ -116,10 +128,10 @@
   }
 
   public static JobConf createJob(Configuration config, Path crawlDb)
-    throws IOException {
+      throws IOException {
     Path newCrawlDb =
-      new Path(crawlDb,
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+        new Path(crawlDb,
+            Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     JobConf job = new NutchJob(config);
     job.setJobName("crawldb " + crawlDb);
@@ -183,7 +195,6 @@
     boolean normalize = false;
     boolean filter = false;
     boolean force = false;
-    boolean url404Purging = false;
     final FileSystem fs = FileSystem.get(getConf());
     boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
     HashSet<Path> dirs = new HashSet<Path>();
Index: src/java/org/apache/nutch/crawl/LinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/LinkDb.java	(revision 1550730)
+++ src/java/org/apache/nutch/crawl/LinkDb.java	(working copy)
@@ -272,7 +272,6 @@
       System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs");
       return -1;
     }
-    Path segDir = null;
     final FileSystem fs = FileSystem.get(getConf());
     Path db = new Path(args[0]);
     ArrayList<Path> segs = new ArrayList<Path>();
Index: src/java/org/apache/nutch/crawl/URLPartitioner.java
===================================================================
--- src/java/org/apache/nutch/crawl/URLPartitioner.java	(revision 1550730)
+++ src/java/org/apache/nutch/crawl/URLPartitioner.java	(working copy)
@@ -26,6 +26,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.util.URLUtil;
 
@@ -33,7 +34,7 @@
  * Partition urls by host, domain name or IP depending on the value of the
  * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
  */
-public class URLPartitioner implements Partitioner<Text,Writable> {
+public class URLPartitioner implements Partitioner<Writable, SelectorEntry> {
   private static final Logger LOG = LoggerFactory.getLogger(URLPartitioner.class);
 
   public static final String PARTITION_MODE_KEY = "partition.url.mode";
@@ -42,6 +43,8 @@
   public static final String PARTITION_MODE_DOMAIN = "byDomain";
   public static final String PARTITION_MODE_IP = "byIP";
 
+  private final Text ipKey = new Text("_ip_");
+
   private int seed;
   private URLNormalizers normalizers;
   private String mode = PARTITION_MODE_HOST;
@@ -59,10 +62,21 @@
   }
 
   public void close() {}
+
+
+  /** Hash by host, domain name or IP, using the URL carried in the SelectorEntry value. */
+  public int getPartition(Writable key, SelectorEntry value, int numReduceTasks) {
+    return getPartition(value.url.toString(), value, numReduceTasks);
+  }
 
   /** Hash by domain name. */
-  public int getPartition(Text key, Writable value, int numReduceTasks) {
-    String urlString = key.toString();
+  public int getPartition(Text key, SelectorEntry value, int numReduceTasks) {
+    return getPartition(key.toString(), value, numReduceTasks);
+  }
+
+
+  /** Hash the URL by host, domain name or IP, depending on partition.url.mode. */
+  private int getPartition(String urlString, SelectorEntry entry, int numReduceTasks) {
     URL url = null;
     int hashCode = urlString.hashCode();
     try {
@@ -77,8 +91,16 @@
         .getDomainName(url).hashCode();
     else if (mode.equals(PARTITION_MODE_IP)) {
       try {
-        InetAddress address = InetAddress.getByName(url.getHost());
-        hashCode = address.getHostAddress().hashCode();
+        String address = null;
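+        // Prefer the IP cached in the datum's "_ip_" metadata (written by IpAddressResolver
+        // or a previous partition call); otherwise fall back to a live DNS lookup and cache it.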
+        Writable ipText = entry.datum.getMetaData().get(ipKey);
+        if (ipText != null && !ipText.toString().equals("")) {
+          address = ipText.toString();
+          LOG.info("IP " + address);
+        } else {
+          address = InetAddress.getByName(url.getHost()).getHostAddress();
+          entry.datum.getMetaData().put(ipKey, new Text(address));
+        }
+        hashCode = address.hashCode();
       } catch (UnknownHostException e) {
         Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
       }
Index: src/java/org/apache/nutch/crawl/Generator.java
===================================================================
--- src/java/org/apache/nutch/crawl/Generator.java	(revision 1550730)
+++ src/java/org/apache/nutch/crawl/Generator.java	(working copy)
@@ -119,7 +119,7 @@
     private int segCounts[];
     private int maxCount;
     private boolean byDomain = false;
-    private Partitioner<Text,Writable> partitioner = new URLPartitioner();
+    private Partitioner<Writable,SelectorEntry> partitioner = new URLPartitioner();
     private URLFilters filters;
     private URLNormalizers normalizers;
     private ScoringFilters scfilters;
@@ -180,7 +180,7 @@
         }
       }
       CrawlDatum crawlDatum = value;
-
+      
       // check fetch schedule
       if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
         LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
@@ -224,7 +224,7 @@
 
     /** Partition by host / domain or IP. */
     public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
-      return partitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
+      return partitioner.getPartition(key, (SelectorEntry) value, numReduceTasks);
     }
 
     /** Collect until limit is reached. */
Index: src/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
===================================================================
--- src/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java	(revision 1550730)
+++ src/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java	(working copy)
@@ -18,7 +18,6 @@
 package org.apache.nutch.crawl;
 
 import java.io.BufferedReader;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
 import java.util.HashMap;
@@ -26,7 +25,6 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.*;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.metadata.HttpHeaders;
 import org.apache.nutch.util.NutchConfiguration;
Index: src/java/org/apache/nutch/crawl/IpAddressResolver.java
===================================================================
--- src/java/org/apache/nutch/crawl/IpAddressResolver.java	(revision 0)
+++ src/java/org/apache/nutch/crawl/IpAddressResolver.java	(revision 0)
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.DNSCache;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Multithreaded IP Resolver
+ */
+public class IpAddressResolver extends Configured implements Tool,
+MapRunnable<Text, CrawlDatum, Text, CrawlDatum> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(IpAddressResolver.class);
+
+
+  public static class InputFormat extends
+  SequenceFileInputFormat<Text, CrawlDatum> {
+    /** Don't split inputs, to keep things polite. */
+    public InputSplit[] getSplits(JobConf job, int nSplits)
+        throws IOException {
+      FileStatus[] files = listStatus(job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        FileStatus cur = files[i];
+        splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+            (String[]) null);
+      }	
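+      // Record the split count so that DNSCache.configure() can derive how many
+      // parallel DNS requests each map task may issue.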
+      job.setInt("generatedSplits", splits.length);
+      return splits;
+    }
+  }
+
+
+  /**
+   * UnresolvedHostsCache - an LRU cache of hosts that could not be resolved by DNS.
+   */
+  public class UnresolvedHostsCache extends LinkedHashMap<String, Integer> {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnresolvedHostsCache() {
+      super(3000, 0.75F, true);
+    }
+    @Override
+    protected boolean removeEldestEntry(Entry<String, Integer> eldest) {
+      return size() > 3000;
+    }
+  }
+
+  private JobConf job;
+
+  private RecordReader<Text, CrawlDatum> input;
+  private OutputCollector<Text, CrawlDatum> output;
+  private Reporter reporter;
+
+  private int activeThreads = 0;
+  private long start = System.currentTimeMillis();
+  private long lastRequestStart;
+  private int processedUrls;
+  private long resolvedIps;
+  private long errors;
+
+  private int numOfThreads;
+
+  private final Text ipKey = new Text("_ip_");
+
+  public static final int NUM_OF_THREADS_DEFAULT = 1;	
+
+  /** Cache with unresolved hosts. */
+  private Map<String, Integer> unresolvedHosts = Collections.synchronizedMap(new UnresolvedHostsCache());
+
+  /**
+   * One cache for all threads on the current map job, which should be ok, if
+   * performing on data bundled by ip addresses. This might be a bottleneck
+   * but performs better than the bottleneck dns.
+   */
+  protected DNSCache hostIpCache = new DNSCache();
+
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  /** Start the resolver threads and wait until every record has been processed. */
+  public void run(RecordReader<Text, CrawlDatum> input,
+      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+          throws IOException {
+    this.input = input;
+    this.output = output;
+    this.reporter = reporter;
+
+    hostIpCache.configure(this.job);
+
+    long timeout = this.job.getInt("mapred.task.timeout", 10 * 60 * 1000) / 2;
+
+    this.numOfThreads = this.job.getInt("db.lookup.ip.threads", NUM_OF_THREADS_DEFAULT);
+
+    LOG.info("Resolver: threads: " + numOfThreads);
+    for (int i = 0; i < numOfThreads; i++) {
+      new ResolveThread().start();
+    }
+
+    do {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Empty
+      }
+      reportStatus();
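+      // Give up if no thread has read a new record within half of mapred.task.timeout.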
+      synchronized (this) {
+        if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
+          LOG.warn("Aborting with " + activeThreads + " hung threads.");
+          return;
+        }
+      }
+    } while (activeThreads > 0);
+    reportStatus(); // a final status report
+  }
+
+  /**
+   * Class ResolveThread
+   */
+  class ResolveThread extends Thread {
+    public void run() {
+      synchronized (IpAddressResolver.this) {
+        activeThreads++;
+      }
+      try {
+        Text key = new Text();
+        CrawlDatum datum = new CrawlDatum();
+        while (true) {
+          try {
+            if (!input.next(key, datum)) {
+              break;
+            }
+          } catch (IOException e) {
+            e.printStackTrace();
+            LOG.error("resolver caught:" + StringUtils.stringifyException(e));
+            break;
+          }
+
+          lastRequestStart = System.currentTimeMillis();
+
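+          // Only resolve hosts whose datum does not yet carry "_ip_" metadata.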
+          if (datum.getMetaData().get(ipKey) == null) {
+
+            String host = new URL(key.toString()).getHost();						
+            Integer failures = unresolvedHosts.get(host);
+
+            // After # of failures, we believe the nameserver is responding but host is bad.
+            if (failures == null || failures < 1) {
+
+              try {
+                byte[] ip = hostIpCache.resolveHost(host);
+                datum.getMetaData().put(ipKey, new Text(InetAddress.getByAddress(ip).getHostAddress()));
+                resolvedIps++;
+              } catch (Exception e) {
+                //LOG.info(StringUtils.stringifyException(e));
+                LOG.info(e.getMessage());
+                LOG.info("Could not determine IP address for host " + host);
+                errors++;
+                unresolvedHosts.put(host, null == failures ? new Integer(0) : new Integer(failures +1) );
+              }							
+            } else {
+              LOG.info("Skipping 10 times unresolved, bad IP address " + host);
+              errors++;
+            }
+          }
+          // Always collect the url/datum pair so the record is not lost, even if resolution failed.
+          processedUrls++;
+          output.collect(key, datum);
+        }
+      } catch (Exception e) {
+        LOG.info(StringUtils.stringifyException(e));
+      } finally {
+        synchronized (IpAddressResolver.this) {
+          activeThreads--;
+        }
+      }
+    }
+  }
+
+  private void reportStatus() throws IOException {
+    String status;
+    synchronized (this) {
+      long elapsed = (System.currentTimeMillis() - start) / 1000;
+      status = processedUrls + " processed urls, " + resolvedIps
+          + " resolved Ips, " + errors + " errors, "
+          + (resolvedIps / elapsed) + " IP/sec";
+      status += " " + hostIpCache.toString();
+    }
+    reporter.setStatus(status);
+  }
+
+
+  public void resolve(Path crawlDatumPath, Configuration config) throws IOException {
+    LOG.info("IpAddressResolver: starting");
+
+    checkConfiguration(config);
+
+    if (!config.getBoolean("db.lookup.ip.during.update", false)) {
+      LOG.info("Skipping IpAddressResolver resolve due of configuration of db.lookup.ip.during.update");
+      return;
+    }
+
+    Path newCrawlDatumPath = new Path(crawlDatumPath.getParent(), 
+        Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("ipresolver");
+
+    //		job.setInt(NUM_OF_THREADS, numOfThreads);
+    //		job.setSpeculativeExecution(false);
+    FileInputFormat.addInputPath(job, crawlDatumPath);
+    job.setInputFormat(InputFormat.class);
+
+    job.setMapRunnerClass(IpAddressResolver.class);
+
+    FileOutputFormat.setOutputPath(job, newCrawlDatumPath);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    JobClient jc = new JobClient(job);
+
+    RunningJob runningJob = JobClient.runJob(job);
+    LOG.info("RunningJob id was " + runningJob.getID().toString());
+
+    // Swap the resolved output in place of the original crawlDatum path.
+    FileSystem fs = jc.getFs();
+    Path old = new Path(crawlDatumPath.getParent(), "old_crawlDatum");
+    fs.delete(old, true);
+    fs.rename(crawlDatumPath, old);
+    fs.rename(newCrawlDatumPath, crawlDatumPath);
+    fs.delete(old, true);
+    LOG.info("IpAddress Resolver: done");
+  }
+
+  /**
+   * checkConfiguration
+   */
+  private void checkConfiguration(Configuration config) {
+
+  }
+
+  /** Run the IpAddressResolver. */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new IpAddressResolver(), args);
+    System.exit(res);
+  }
+
+  /**
+   * ToolRunner entry point: parse command line options and run the resolver.
+   */
+  public int run(String[] args) throws Exception {
+
+    String usage = "Usage: IpAddressResolver <crawldb> [-threads n] [-maxParallelDns n]";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      return -1;
+    }
+
+    LOG.info("Using crawldb:" + args[0]);
+    Path crawlDb = new Path(args[0], "current");
+    Configuration config = getConf();
+
+
+    int threads = config.getInt("db.lookup.ip.threads", NUM_OF_THREADS_DEFAULT);
+    int maxParallelDns = config.getInt("db.lookup.ip.max.parallel.dns", DNSCache.NUM_OF_PARALLEL_DNS_THREADS);
+
+    for (int i = 1; i < args.length; i++) { // parse command line
+      if (args[i].equals("-threads")) {              // found -threads option
+        threads = Integer.parseInt(args[++i]);
+      } else if(args[i].equals("-maxParallelDns")) { // found -maxParallelDns option
+        maxParallelDns = Integer.parseInt(args[++i]);
+      }
+    }
+    // Overwriting values in the configuration
+    config.setInt("db.lookup.ip.threads", threads);
+    config.setInt("db.lookup.ip.max.parallel.dns", maxParallelDns);
+
+    // Overwriting the configured value because we were invoked explicitly via ToolRunner
+    config.setBoolean("db.lookup.ip.during.update", true);
+
+    try {
+      LOG.info("CrawlDb update: resolving IP addresses in crawldb " + crawlDb);
+      resolve(crawlDb, config);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("IpAddressResolver: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+}
Index: src/java/org/apache/nutch/util/DNSCache.java
===================================================================
--- src/java/org/apache/nutch/util/DNSCache.java	(revision 0)
+++ src/java/org/apache/nutch/util/DNSCache.java	(revision 0)
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DNSCache - a thread-safe, memoizing DNS cache (Memoizer pattern).
+ *
+ * Based on "Java Concurrency in Practice", page 108, by Brian Goetz,
+ * Joshua Bloch, Joseph Bowbeer and Doug Lea.
+ */
+public class DNSCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DNSCache.class);
+
+  private final ConcurrentMap<String, Future<byte[]>> cache = new ConcurrentHashMap<String, Future<byte[]>>();
+
+  private Map<String, Object> deletionCursor = null;
+
+  public static final int MAX_CACHE_SIZE_DEFAULT = 1000;
+  public static final int NUM_OF_PARALLEL_DNS_THREADS = 1;
+
+  private int maxCacheSize;
+  private int maxParallelDns;
+
+  private JobConf job;
+
+  private Semaphore sem;
+
+  private int dnsRequests = 0;
+  private int resolvedByCache = 0;
+
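+  /**
+   * LRU bookkeeping map: when it evicts its eldest entry, the corresponding host
+   * is also removed from the backing future cache, bounding the cache size.
+   */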
+  public class DeletionCursor extends LinkedHashMap<String, Object> {
+    private static final long serialVersionUID = 1L;
+    private ConcurrentMap<String, Future<byte[]>> cache = null;
+    private int maxSize;
+
+    public DeletionCursor(ConcurrentMap<String, Future<byte[]>> cache, int maxSize) {
+      super(maxSize, 0.75F, true);
+      this.cache = cache;
+      this.maxSize = maxSize;
+    }
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
+      boolean doDelete = size() > this.maxSize;
+      if(doDelete) {
+        this.cache.remove(eldest.getKey());
+      }
+      return doDelete;
+    }
+  }
+
+  /** Configure the cache size and the per-map DNS permit budget from the job configuration. */
+  public void configure(JobConf job) {
+    this.job = job;
+
+    this.maxCacheSize = job.getInt("db.lookup.ip.max.cache.size", MAX_CACHE_SIZE_DEFAULT);
+    this.maxParallelDns = job.getInt("db.lookup.ip.max.parallel.dns", NUM_OF_PARALLEL_DNS_THREADS);
+    int generatedSplits = job.getInt("generatedSplits", 1);
+
+    int numMapTasks = job.getNumMapTasks();
+
+    int defaultMapsSize = 1;
+    try {
+      JobClient jc = new JobClient(job);
+      defaultMapsSize = jc.getDefaultMaps();
+    } catch(IOException ioe) {
+      LOG.error(ioe.getMessage());
+    }
+
+    int parallelDnsPerMap;
+
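+    // Split the cluster-wide DNS budget across the map tasks that will actually run,
+    // so the total number of parallel lookups stays close to db.lookup.ip.max.parallel.dns.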
+    if(generatedSplits >= defaultMapsSize) {
+      parallelDnsPerMap = this.maxParallelDns / defaultMapsSize;
+      parallelDnsPerMap = parallelDnsPerMap > 0 ? parallelDnsPerMap : defaultMapsSize;
+    }
+    else if(generatedSplits >= numMapTasks ) {
+      parallelDnsPerMap = this.maxParallelDns / numMapTasks;
+      parallelDnsPerMap = parallelDnsPerMap > 0 ? parallelDnsPerMap : numMapTasks;
+    } else {
+      parallelDnsPerMap = this.maxParallelDns / generatedSplits;
+      parallelDnsPerMap = parallelDnsPerMap > 0 ? parallelDnsPerMap : 1;		
+    }
+
+    LOG.info("Configured values: maxCacheSize=" + maxCacheSize);
+    LOG.info("Configured values: maxParallelDns=" + maxParallelDns);
+    LOG.info("Hadoop defaultMapsSize=" + defaultMapsSize);
+    LOG.info("Hadoop generated splits=" + generatedSplits);
+    LOG.info("Hadoop numMapTasks=" + numMapTasks);
+    LOG.info("MaxParallelDns= " + this.maxParallelDns + ", generatedSplits=" + generatedSplits + 
+        " => parallelDnsPerMap=" + parallelDnsPerMap);
+
+    this.sem = new Semaphore(parallelDnsPerMap);
+    this.maxParallelDns = parallelDnsPerMap;
+    this.deletionCursor = Collections.synchronizedMap(new DeletionCursor(cache, maxCacheSize));
+
+  }
+
+  /** Resolve the given host name to an IP address, using the cache where possible. */
+  public byte[] resolveHost(final String host) throws InterruptedException, ExecutionException  {
+    while (true) {	
+      Future<byte[]> future = cache.get(host);
+      if (future == null) {
+        Callable<byte[]> callable = new Callable<byte[]>() {
+          public byte[] call() throws InterruptedException, ExecutionException {
+            try {
+              InetAddress byName = InetAddress.getByName(host);
+              byte[] ip = byName.getAddress();
+              LOG.info(host + " " + InetAddress.getByAddress(ip));
+              return ip;
+            } catch (UnknownHostException uhe) {
+              throw new ExecutionException(uhe.getMessage(), uhe);
+            }
+          }
+        };
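+        // putIfAbsent guarantees a single FutureTask per host, so concurrent callers
+        // share one DNS lookup instead of issuing duplicates (Memoizer pattern).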
+        FutureTask<byte[]> futureTask = new FutureTask<byte[]>(callable);
+        future = cache.putIfAbsent(host, futureTask);
+        if (future == null) {
+          this.deletionCursor.put(host, (Object) null);
+          boolean acquired = false;
+          future = futureTask;
+          try {
+            sem.acquire();
+            acquired = true;
+            LOG.info((this.maxParallelDns - sem.availablePermits()) + " active dns threads");
+            futureTask.run();
+            dnsRequests++;						
+          } catch(InterruptedException ie) {
+            LOG.info("Waiting for semaphore interrupted. Continuing");
+            futureTask.run();
+            dnsRequests++;
+          }
+          if(acquired) {
+            sem.release();
+          }
+        }
+      }
+      try {
+        resolvedByCache++;
+        byte[] ip = future.get();
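+        // Touch the access-ordered LRU map so this host is marked as recently used.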
+        this.deletionCursor.get(host);
+        return ip;
+      } catch (CancellationException e) {
+        this.cache.remove(host, future);
+        this.deletionCursor.remove(host);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "DNSCache [cacheSize=" + this.cache.size() + ", resolvedByCache=" + this.resolvedByCache + ", dnsRequests=" + 
+        this.dnsRequests + "]";
+  }
+
+}
