diff -crNB apache-nutch-1.5.1/conf/nutch-default.xml apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/conf/nutch-default.xml
*** apache-nutch-1.5.1/conf/nutch-default.xml 2013-11-28 19:43:49.454932554 +0100
--- apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/conf/nutch-default.xml 2013-11-28 20:01:42.317162515 +0100
***************
*** 555,560 ****
--- 555,605 ----
+ 
+ <property>
+   <name>db.lookup.ip.during.update</name>
+   <value>true</value>
+   <description>If true, the IP addresses of extracted URLs are resolved and
+   stored in the CrawlDatum. Resolving IP addresses during update is faster
+   than during segment generation, but it is only required when
+   generate.max.per.host.by.ip is true.
+   </description>
+ </property>
+ 
+ <property>
+   <name>db.lookup.ip.threads</name>
+   <value>20</value>
+   <description>If db.lookup.ip.during.update is true, this gives the number of
+   threads used for IP lookup. The default value is 20. Note that this number
+   of threads runs in every map job and produces rapid requests to your DNS
+   server; be careful not to mount a denial-of-service attack against it.
+   This is only required when generate.max.per.host.by.ip is true (Tested: 200).
+   </description>
+ </property>
+ 
+ <property>
+   <name>db.lookup.ip.max.cache.size</name>
+   <value>30000</value>
+   <description>Maximum number of host-to-IP mappings kept in the in-memory
+   DNS cache of each map job. Only used when db.lookup.ip.during.update is
+   true (Tested: 30000).
+   </description>
+ </property>
+ 
+ <property>
+   <name>db.lookup.ip.max.parallel.dns</name>
+   <value>216</value>
+   <description>Number of allowed parallel requests to the DNS server per map
+   job. To estimate the total number of requests to the DNS server, multiply
+   this value by the number of map jobs used (Tested: 2160).
+   </description>
+ </property>
+ 
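For reference, the four keys above are read through the plain Hadoop Configuration API; the in-code defaults (used when no nutch-default.xml overrides them) are smaller than the values shipped here. A minimal sketch, assuming only hadoop-core and this patch's defaults (the class name IpLookupConfigDump is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class IpLookupConfigDump {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Defaults below mirror the Java code in this patch, not the XML values.
        boolean duringUpdate = conf.getBoolean("db.lookup.ip.during.update", false);
        int threads = conf.getInt("db.lookup.ip.threads", 1);              // NUM_OF_THREADS_DEFAULT
        int cacheSize = conf.getInt("db.lookup.ip.max.cache.size", 1000);  // MAX_CACHE_SIZE_DEFAULT
        int parallelDns = conf.getInt("db.lookup.ip.max.parallel.dns", 1); // NUM_OF_PARALLEL_DNS_THREADS
        System.out.println(duringUpdate + " " + threads + " " + cacheSize + " " + parallelDns);
      }
    }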
diff -crNB apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/CrawlDb.java apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/CrawlDb.java
*** apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/CrawlDb.java 2012-07-03 20:20:47.000000000 +0200
--- apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/CrawlDb.java 2013-03-28 14:45:57.000000000 +0100
***************
*** 70,75 ****
--- 70,87 ----
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
+ if (getConf().getBoolean("db.lookup.ip.during.update", false)){
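+ // With segments given, resolve the freshly parsed URLs from the first
+ // segment's crawl_parse output (note: only segments[0] is processed);
+ // otherwise walk the whole crawldb.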
+ Path crawlDatumPath = null;
+ if(null != segments && segments.length > 0) {
+ crawlDatumPath = new Path(segments[0], CrawlDatum.PARSE_DIR_NAME);
+ } else {
+ crawlDatumPath = new Path(crawlDb, CURRENT_NAME);
+ }
+ LOG.info("CrawlDb update: segment resolving IP addresses working on: " + crawlDatumPath);
+ new IpAddressResolver().resolve(crawlDatumPath, getConf());
+ }
+
+
JobConf job = CrawlDb.createJob(getConf(), crawlDb);
job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
diff -crNB apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/Generator.java apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/Generator.java
*** apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/Generator.java 2012-07-03 20:20:47.000000000 +0200
--- apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/Generator.java 2013-11-28 18:38:24.066539730 +0100
***************
*** 120,126 ****
private int segCounts[];
private int maxCount;
private boolean byDomain = false;
! private Partitioner<Text,Writable> partitioner = new URLPartitioner();
private URLFilters filters;
private URLNormalizers normalizers;
private ScoringFilters scfilters;
--- 120,126 ----
private int segCounts[];
private int maxCount;
private boolean byDomain = false;
! private Partitioner<Writable,SelectorEntry> partitioner = new URLPartitioner();
private URLFilters filters;
private URLNormalizers normalizers;
private ScoringFilters scfilters;
***************
*** 184,190 ****
}
}
CrawlDatum crawlDatum = value;
!
// check fetch schedule
if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
--- 184,190 ----
}
}
CrawlDatum crawlDatum = value;
!
// check fetch schedule
if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
***************
*** 228,234 ****
/** Partition by host / domain or IP. */
public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
! return partitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
}
/** Collect until limit is reached. */
--- 228,234 ----
/** Partition by host / domain or IP. */
public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
! return partitioner.getPartition(key, (SelectorEntry) value, numReduceTasks);
}
/** Collect until limit is reached. */
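The partitioner signature change above is the heart of the workaround: the partitioner now receives the whole SelectorEntry, whose CrawlDatum metadata may already carry the resolved address under the key "_ip_", so generation no longer has to query the DNS per URL. A minimal sketch of that metadata round-trip, assuming CrawlDatum.getMetaData() lazily creates its MapWritable as in Nutch 1.5.1 (the standalone class and the reducer count are illustrative only):

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;

    public class IpMetadataRoundTrip {
      private static final Text IP_KEY = new Text("_ip_");

      public static void main(String[] args) {
        CrawlDatum datum = new CrawlDatum();
        // IpAddressResolver stores the textual address in the datum's metadata ...
        datum.getMetaData().put(IP_KEY, new Text("93.184.216.34"));
        // ... and URLPartitioner later hashes that string instead of calling the DNS.
        Text ip = (Text) datum.getMetaData().get(IP_KEY);
        int partition = (ip.toString().hashCode() & Integer.MAX_VALUE) % 10; // say, 10 reducers
        System.out.println(ip + " -> partition " + partition);
      }
    }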
diff -crNB apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/IpAddressResolver.java apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/IpAddressResolver.java
*** apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/IpAddressResolver.java 1970-01-01 01:00:00.000000000 +0100
--- apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/IpAddressResolver.java 2013-11-28 16:55:06.485737476 +0100
***************
*** 0 ****
--- 1,344 ----
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nutch.crawl;
+
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.MalformedURLException;
+ import java.net.URL;
+ import java.util.Collections;
+ import java.util.LinkedHashMap;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Random;
+
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.conf.Configured;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapred.FileInputFormat;
+ import org.apache.hadoop.mapred.FileOutputFormat;
+ import org.apache.hadoop.mapred.FileSplit;
+ import org.apache.hadoop.mapred.InputSplit;
+ import org.apache.hadoop.mapred.JobClient;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.hadoop.mapred.MapFileOutputFormat;
+ import org.apache.hadoop.mapred.MapRunnable;
+ import org.apache.hadoop.mapred.OutputCollector;
+ import org.apache.hadoop.mapred.RecordReader;
+ import org.apache.hadoop.mapred.Reporter;
+ import org.apache.hadoop.mapred.RunningJob;
+ import org.apache.hadoop.mapred.SequenceFileInputFormat;
+ import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.util.Tool;
+ import org.apache.hadoop.util.ToolRunner;
+ import org.apache.nutch.util.DNSCache;
+ import org.apache.nutch.util.NutchConfiguration;
+ import org.apache.nutch.util.NutchJob;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * Multithreaded IP Resolver
+ */
+ public class IpAddressResolver extends Configured implements Tool,
+ MapRunnable<Text, CrawlDatum, Text, CrawlDatum> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(IpAddressResolver.class);
+
+
+ public static class InputFormat extends
+ SequenceFileInputFormat<Text, CrawlDatum> {
+ /** Don't split inputs, to keep things polite. */
+ public InputSplit[] getSplits(JobConf job, int nSplits)
+ throws IOException {
+ FileStatus[] files = listStatus(job);
+ FileSplit[] splits = new FileSplit[files.length];
+ for (int i = 0; i < files.length; i++) {
+ FileStatus cur = files[i];
+ splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+ (String[]) null);
+ }
+ job.setInt("generatedSplits", splits.length);
+ return splits;
+ }
+ }
+
+
+ /**
+ * Class UnresolvedHostsCache - a cache for hosts which could not be resolved by the dns.
+ */
+ public class UnresolvedHostsCache extends LinkedHashMap<String, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ public UnresolvedHostsCache() {
+ super(3000, 0.75F, true);
+ }
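+ // An access-ordered LinkedHashMap whose removeEldestEntry hook turns it into
+ // a simple bounded LRU: beyond 3000 hosts, the least recently used is dropped.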
+ @Override
+ protected boolean removeEldestEntry(Entry<String, Integer> eldest) {
+ return size() > 3000;
+ }
+ }
+
+ private JobConf job;
+
+ private RecordReader<Text, CrawlDatum> input;
+ private OutputCollector<Text, CrawlDatum> output;
+ private Reporter reporter;
+
+ private int activeThreads = 0;
+ private long start = System.currentTimeMillis();
+ private long lastRequestStart;
+ private int processedUrls;
+ private long resolvedIps;
+ private long errors;
+
+ private int numOfThreads;
+
+ private final Text ipKey = new Text("_ip_");
+
+ public static final int NUM_OF_THREADS_DEFAULT = 1;
+
+ /** Cache with unresolved hosts. */
+ private Map<String, Integer> unresolvedHosts = Collections.synchronizedMap(new UnresolvedHostsCache());
+
+ /**
+ * One cache for all threads on the current map job, which should be ok, if
+ * performing on data bundled by ip addresses. This might be a bottleneck
+ * but performs better than the bottleneck dns.
+ */
+ protected DNSCache hostIpCache = new DNSCache();
+
+ public void configure(JobConf job) {
+ this.job = job;
+ }
+
+ /** run */
+ public void run(RecordReader<Text, CrawlDatum> input,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+ this.input = input;
+ this.output = output;
+ this.reporter = reporter;
+
+ hostIpCache.configure(this.job);
+
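+ // Abort at half of mapred.task.timeout so hung DNS threads are detected and
+ // reported here before the Hadoop framework kills the whole task attempt.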
+ long timeout = this.job.getInt("mapred.task.timeout", 10 * 60 * 1000) / 2;
+
+ this.numOfThreads = this.job.getInt("db.lookup.ip.threads", NUM_OF_THREADS_DEFAULT);
+
+ LOG.info("Resolver: threads: " + numOfThreads);
+ for (int i = 0; i < numOfThreads; i++) {
+ new ResolveThread().start();
+ }
+
+ do {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Empty
+ }
+ reportStatus();
+ synchronized (this) {
+ if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
+ LOG.warn("Aborting with " + activeThreads + " hung threads.");
+ return;
+ }
+ }
+ } while (activeThreads > 0);
+ reportStatus(); // a final status report
+ }
+
+ /**
+ * Class ResolveThread
+ */
+ class ResolveThread extends Thread {
+ public void run() {
+ synchronized (IpAddressResolver.this) {
+ activeThreads++;
+ }
+ try {
+ Text key = new Text();
+ CrawlDatum datum = new CrawlDatum();
+ while (true) {
+ try {
+ if (!input.next(key, datum)) {
+ break;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ LOG.error("resolver caught:" + StringUtils.stringifyException(e));
+ break;
+ }
+
+ lastRequestStart = System.currentTimeMillis();
+
+ if (datum.getMetaData().get(ipKey) == null) {
+
+ String host;
+ try {
+ host = new URL(key.toString()).getHost();
+ } catch (MalformedURLException mue) {
+ // Don't let one malformed URL escape to the outer catch and kill the thread.
+ LOG.info("Malformed URL, passed through unresolved: " + key);
+ processedUrls++;
+ output.collect(key, datum);
+ continue;
+ }
+ Integer failures = unresolvedHosts.get(host);
+
+ // After # of failures, we believe the nameserver is responding but host is bad.
+ if (failures == null || failures < 1) {
+
+ try {
+ byte[] ip = hostIpCache.resolveHost(host);
+ datum.getMetaData().put(ipKey, new Text(InetAddress.getByAddress(ip).getHostAddress()));
+ resolvedIps++;
+ } catch (Exception e) {
+ //LOG.info(StringUtils.stringifyException(e));
+ LOG.info(e.getMessage());
+ LOG.info("Could not determine IP address for host " + host);
+ errors++;
+ unresolvedHosts.put(host, failures == null ? Integer.valueOf(0) : Integer.valueOf(failures + 1));
+ }
+ } else {
+ LOG.info("Skipping 10 times unresolved, bad IP address " + host);
+ errors++;
+ }
+ }
+ // We collect the url/datum pair in every case so the record is not lost.
+ processedUrls++;
+ output.collect(key, datum);
+ }
+ } catch (Exception e) {
+ LOG.info(StringUtils.stringifyException(e));
+ } finally {
+ synchronized (IpAddressResolver.this) {
+ activeThreads--;
+ }
+ }
+ }
+ }
+
+ private void reportStatus() throws IOException {
+ String status;
+ synchronized (this) {
+ long elapsed = Math.max(1, (System.currentTimeMillis() - start) / 1000);
+ status = processedUrls + " processed urls, " + resolvedIps
+ + " resolved IPs, " + errors + " errors, "
+ + (resolvedIps / elapsed) + " IP/sec";
+ status += " " + hostIpCache.toString();
+ }
+ reporter.setStatus(status);
+ }
+
+
+ public void resolve(Path crawlDatumPath, Configuration config) throws IOException {
+ LOG.info("IpAddressResolver: starting");
+
+ checkConfiguration(config);
+
+ if (!config.getBoolean("db.lookup.ip.during.update", false)) {
+ LOG.info("Skipping IpAddressResolver resolve because of configuration of db.lookup.ip.during.update");
+ return;
+ }
+
+ Path newCrawlDatumPath = new Path(crawlDatumPath.getParent(),
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new NutchJob(config);
+ job.setJobName("ipresolver");
+
+ // job.setInt(NUM_OF_THREADS, numOfThreads);
+ // job.setSpeculativeExecution(false);
+ FileInputFormat.addInputPath(job, crawlDatumPath);
+ job.setInputFormat(InputFormat.class);
+
+ job.setMapRunnerClass(IpAddressResolver.class);
+
+ FileOutputFormat.setOutputPath(job, newCrawlDatumPath);
+ job.setOutputFormat(MapFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(CrawlDatum.class);
+
+ JobClient jc = new JobClient(job);
+
+ RunningJob runningJob = JobClient.runJob(job);
+ LOG.info("RunningJob id was " + runningJob.getID().toString());
+
+ // replacing new crawlDb with existing crawlDb
+ FileSystem fs = jc.getFs();
+ Path old = new Path(crawlDatumPath.getParent(), "old_crawlDatum");
+ fs.delete(old, true);
+ fs.rename(crawlDatumPath, old);
+ fs.rename(newCrawlDatumPath, crawlDatumPath);
+ fs.delete(old, true);
+ LOG.info("IpAddress Resolver: done");
+ }
+
+ /**
+ * checkConfiguration
+ */
+ private void checkConfiguration(Configuration config) {
+
+ }
+
+ /** Run the IpAddressResolver. */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new IpAddressResolver(), args);
+ System.exit(res);
+ }
+
+ /**
+ * run method
+ */
+ public int run(String[] args) throws Exception {
+
+ String usage = "Usage: IpAddressResolver [-threads n] [-maxParallelDns n]";
+
+ if (args.length < 1) {
+ System.err.println(usage);
+ return -1;
+ }
+
+ LOG.info("Using crawldb:" + args[0]);
+ Path crawlDb = new Path(args[0], "current");
+ Configuration config = getConf();
+
+
+ int threads = config.getInt("db.lookup.ip.threads", NUM_OF_THREADS_DEFAULT);
+ int maxParallelDns = config.getInt("db.lookup.ip.max.parallel.dns", DNSCache.NUM_OF_PARALLEL_DNS_THREADS);
+
+ for (int i = 1; i < args.length; i++) { // parse command line
+ if (args[i].equals("-threads")) { // found -threads option
+ threads = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-maxParallelDns")) { // found -maxParallelDns option
+ maxParallelDns = Integer.parseInt(args[++i]);
+ }
+ }
+ // Overwriting values in configuration
+ config.setInt("db.lookup.ip.threads", threads);
+ config.setInt("db.lookup.ip.max.parallel.dns", maxParallelDns);
+
+ // Overwriting the configured value, because we are running via ToolRunner
+ config.setBoolean("db.lookup.ip.during.update", true);
+
+ try {
+ LOG.info("CrawlDb update: resolving IP addresses in crawldb " + crawlDb);
+ resolve(crawlDb, config);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("IpAddressResolver: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+
+ }
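Per the usage string in run(), the resolver can also be launched standalone against an existing crawldb, with the thread and DNS limits overridden on the command line. A hypothetical invocation through the usual Nutch launcher (the path is an example; the numbers match the "Tested" values above):

    bin/nutch org.apache.nutch.crawl.IpAddressResolver crawl/crawldb -threads 200 -maxParallelDns 2160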
diff -crNB apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/URLPartitioner.java apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/URLPartitioner.java
*** apache-nutch-1.5.1/src/java/org/apache/nutch/crawl/URLPartitioner.java 2012-07-03 20:20:48.000000000 +0200
--- apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/crawl/URLPartitioner.java 2013-02-13 17:21:31.000000000 +0100
***************
*** 26,31 ****
--- 26,32 ----
import org.slf4j.LoggerFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
+ import org.apache.nutch.crawl.Generator.SelectorEntry;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.util.URLUtil;
***************
*** 33,39 ****
* Partition urls by host, domain name or IP depending on the value of the
* parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
*/
! public class URLPartitioner implements Partitioner<Text,Writable> {
private static final Logger LOG = LoggerFactory.getLogger(URLPartitioner.class);
public static final String PARTITION_MODE_KEY = "partition.url.mode";
--- 34,40 ----
* Partition urls by host, domain name or IP depending on the value of the
* parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
*/
! public class URLPartitioner implements Partitioner<Writable,SelectorEntry> {
private static final Logger LOG = LoggerFactory.getLogger(URLPartitioner.class);
public static final String PARTITION_MODE_KEY = "partition.url.mode";
***************
*** 42,47 ****
--- 43,50 ----
public static final String PARTITION_MODE_DOMAIN = "byDomain";
public static final String PARTITION_MODE_IP = "byIP";
+ private final Text ipKey = new Text("_ip_");
+
private int seed;
private URLNormalizers normalizers;
private String mode = PARTITION_MODE_HOST;
***************
*** 59,68 ****
}
public void close() {}
/** Hash by domain name. */
! public int getPartition(Text key, Writable value, int numReduceTasks) {
! String urlString = key.toString();
URL url = null;
int hashCode = urlString.hashCode();
try {
--- 62,82 ----
}
public void close() {}
+
+
+ /** Partitioner interface method: partitions by the URL inside the SelectorEntry. */
+ public int getPartition(Writable key, SelectorEntry value, int numReduceTasks) {
+ return getPartition(value.url.toString(), value, numReduceTasks);
+ }
+
+ /** Convenience overload: partitions by the URL passed as key. */
+ public int getPartition(Text key, SelectorEntry value, int numReduceTasks) {
+ return getPartition(key.toString(), value, numReduceTasks);
+ }
+
/** Hash by domain name. */
! private int getPartition(String urlString, SelectorEntry entry, int numReduceTasks) {
URL url = null;
int hashCode = urlString.hashCode();
try {
***************
*** 77,84 ****
.getDomainName(url).hashCode();
else if (mode.equals(PARTITION_MODE_IP)) {
try {
! InetAddress address = InetAddress.getByName(url.getHost());
! hashCode = address.getHostAddress().hashCode();
} catch (UnknownHostException e) {
Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
}
--- 91,106 ----
.getDomainName(url).hashCode();
else if (mode.equals(PARTITION_MODE_IP)) {
try {
! String address = null;
! Writable ipText = entry.datum.getMetaData().get(ipKey);
! if(ipText != null && !ipText.toString().equals("")){
! address = ipText.toString();
! LOG.info("IP " + address);
! } else {
! address = InetAddress.getByName(url.getHost()).getHostAddress();
! entry.datum.getMetaData().put(ipKey, new Text(address));
! }
! hashCode = address.hashCode();
} catch (UnknownHostException e) {
Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
}
diff -crNB apache-nutch-1.5.1/src/java/org/apache/nutch/util/DNSCache.java apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/util/DNSCache.java
*** apache-nutch-1.5.1/src/java/org/apache/nutch/util/DNSCache.java 1970-01-01 01:00:00.000000000 +0100
--- apache-nutch-1.5.1-neo-patched_workaround_live_201311281323/src/java/org/apache/nutch/util/DNSCache.java 2013-11-28 16:55:54.182896833 +0100
***************
*** 0 ****
--- 1,194 ----
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nutch.util;
+
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.Collections;
+ import java.util.LinkedHashMap;
+ import java.util.Map;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.CancellationException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.FutureTask;
+ import java.util.concurrent.Semaphore;
+
+ import org.apache.hadoop.conf.Configured;
+ import org.apache.hadoop.mapred.JobClient;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.nutch.crawl.IpAddressResolver;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * Class DNSCache - a concurrent host-to-IP cache built on the Memoizer pattern.
+ *
+ * The Memoizer pattern is from Java Concurrency in Practice, page 108, by
+ * Brian Goetz, Joshua Bloch, Joseph Bowbeer and Doug Lea.
+ */
+ public class DNSCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DNSCache.class);
+
+ private final ConcurrentMap<String, Future<byte[]>> cache = new ConcurrentHashMap<String, Future<byte[]>>();
+
+ private Map<String, Object> deletionCursor = null;
+
+ public static final int MAX_CACHE_SIZE_DEFAULT = 1000;
+ public static final int NUM_OF_PARALLEL_DNS_THREADS = 1;
+
+ private int maxCacheSize;
+ private int maxParallelDns;
+
+ private JobConf job;
+
+ private Semaphore sem;
+
+ private int dnsRequests = 0;
+ private int resolvedByCache = 0;
+
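+ // DeletionCursor tracks access order on behalf of the ConcurrentMap-based
+ // cache above: evicting its eldest entry also removes that host from the cache.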
+ public class DeletionCursor extends LinkedHashMap<String, Object> {
+ private static final long serialVersionUID = 1L;
+ private ConcurrentMap<String, Future<byte[]>> cache = null;
+ private int maxSize;
+
+ public DeletionCursor(ConcurrentMap<String, Future<byte[]>> cache, int maxSize) {
+ super(maxSize, 0.75F, true);
+ this.cache = cache;
+ this.maxSize = maxSize;
+ }
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
+ boolean doDelete = size() > this.maxSize;
+ if(doDelete) {
+ this.cache.remove(eldest.getKey());
+ }
+ return doDelete;
+ }
+ }
+
+ /** configure. */
+ public void configure(JobConf job) {
+ this.job = job;
+
+ this.maxCacheSize = job.getInt("db.lookup.ip.max.cache.size", MAX_CACHE_SIZE_DEFAULT);
+ this.maxParallelDns = job.getInt("db.lookup.ip.max.parallel.dns", NUM_OF_PARALLEL_DNS_THREADS);
+ int generatedSplits = job.getInt("generatedSplits", 1);
+
+ int numMapTasks = job.getNumMapTasks();
+
+ int defaultMapsSize = 1;
+ try {
+ JobClient jc = new JobClient(job);
+ defaultMapsSize = jc.getDefaultMaps();
+ } catch(IOException ioe) {
+ LOG.error(ioe.getMessage());
+ }
+
+ int parallelDnsPerMap;
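+ // Split the global DNS budget across the map tasks that will actually run, so
+ // the cluster-wide request parallelism stays near db.lookup.ip.max.parallel.dns.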
+
+ if(generatedSplits >= defaultMapsSize) {
+ parallelDnsPerMap = this.maxParallelDns / defaultMapsSize;
+ parallelDnsPerMap = parallelDnsPerMap > 0 ? parallelDnsPerMap : 1;
+ }
+ else if(generatedSplits >= numMapTasks ) {
+ parallelDnsPerMap = this.maxParallelDns / numMapTasks;
+ parallelDnsPerMap = parallelDnsPerMap > 0 ? parallelDnsPerMap : 1;
+ } else {
+ parallelDnsPerMap = this.maxParallelDns / generatedSplits;
+ parallelDnsPerMap = parallelDnsPerMap > 0 ? parallelDnsPerMap : 1;
+ }
+
+ LOG.info("Configured values: maxCacheSize=" + maxCacheSize);
+ LOG.info("Configured values: maxParallelDns=" + maxParallelDns);
+ LOG.info("Hadoop defaultMapsSize=" + defaultMapsSize);
+ LOG.info("Hadoop generated splits=" + generatedSplits);
+ LOG.info("Hadoop numMapTasks=" + numMapTasks);
+ LOG.info("MaxParallelDns= " + this.maxParallelDns + ", generatedSplits=" + generatedSplits +
+ " => parallelDnsPerMap=" + parallelDnsPerMap);
+
+ this.sem = new Semaphore(parallelDnsPerMap);
+ this.maxParallelDns = parallelDnsPerMap;
+ this.deletionCursor = Collections.synchronizedMap(new DeletionCursor(cache, maxCacheSize));
+
+ }
+
+ /** resolveHost */
+ public byte[] resolveHost(final String host) throws InterruptedException, ExecutionException {
+ while (true) {
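+ // Memoizer pattern (JCiP p. 108): putIfAbsent picks a single winner per host;
+ // all other callers block on the shared Future instead of querying the DNS.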
+ Future<byte[]> future = cache.get(host);
+ if (future == null) {
+ Callable<byte[]> callable = new Callable<byte[]>() {
+ public byte[] call() throws InterruptedException, ExecutionException {
+ try {
+ InetAddress byName = InetAddress.getByName(host);
+ byte[] ip = byName.getAddress();
+ LOG.info(host + " " + InetAddress.getByAddress(ip));
+ return ip;
+ } catch (UnknownHostException uhe) {
+ throw new ExecutionException(uhe.getMessage(), uhe);
+ }
+ }
+ };
+ FutureTask<byte[]> futureTask = new FutureTask<byte[]>(callable);
+ future = cache.putIfAbsent(host, futureTask);
+ if (future == null) {
+ this.deletionCursor.put(host, (Object) null);
+ boolean acquired = false;
+ future = futureTask;
+ try {
+ sem.acquire();
+ acquired = true;
+ LOG.info((this.maxParallelDns - sem.availablePermits()) + " active dns threads");
+ futureTask.run();
+ dnsRequests++;
+ } catch(InterruptedException ie) {
+ LOG.info("Waiting for semaphore interrupted. Continuing");
+ futureTask.run();
+ dnsRequests++;
+ }
+ if(acquired) {
+ sem.release();
+ }
+ }
+ }
+ try {
+ resolvedByCache++;
+ byte[] ip = future.get();
+ this.deletionCursor.get(host); // touch: refresh this host in the LRU access order
+ return ip;
+ } catch (CancellationException e) {
+ this.cache.remove(host, future);
+ this.deletionCursor.remove(host);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DNSCache [cacheSize=" + this.cache.size() + ", resolvedByCache=" + this.resolvedByCache + ", dnsRequests=" +
+ this.dnsRequests + "]";
+ }
+
+ }
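The DNSCache javadoc credits the Memoizer pattern from Java Concurrency in Practice; with the semaphore throttling and LRU eviction stripped away, the core of resolveHost() reduces to the following generic sketch (not Nutch code, just the pattern):

    import java.util.concurrent.*;

    public class Memoizer<A, V> {
      private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

      public V compute(final A arg, final Callable<V> task)
          throws InterruptedException, ExecutionException {
        while (true) {
          Future<V> f = cache.get(arg);
          if (f == null) {
            FutureTask<V> ft = new FutureTask<V>(task);
            f = cache.putIfAbsent(arg, ft);      // only one thread wins this race
            if (f == null) { f = ft; ft.run(); } // the winner computes, others wait in get()
          }
          try {
            return f.get();
          } catch (CancellationException e) {
            cache.remove(arg, f);                // forget cancelled work so it can be retried
          }
        }
      }
    }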