Index: conf/nutch-default.xml
===================================================================
--- conf/nutch-default.xml (revision 375657)
+++ conf/nutch-default.xml (working copy)
@@ -288,7 +288,8 @@
10
The number of FetcherThreads the fetcher should use.
This is also determines the maximum number of requests that are
- made at once (each FetcherThread handles one connection).
+ made at once (each FetcherThread handles one connection). When
+ using bandwidth limiting this is the starting thread count.
@@ -299,6 +300,23 @@
+ fetcher.target.bandwidth
+ -1
+ The desired overall fetcher bandwidth usage in kilobits per second (kb/s), or -1 to disable.
+ Each fetcher task rate-limits itself to fetcher.target.bandwidth / mapred.reduce.tasks by
+ increasing or decreasing its thread count, up to fetcher.threads.maximum.
+
+
+
+
+ fetcher.threads.maximum
+ 80
+ Maximum number of threads used by one task. Additional threads above fetcher.threads.fetch
+ may be created in order to reach the desired bandwidth, but the total number of threads should
+ not exceed this maximum.
+
+
+
fetcher.verbose
false
If true, fetcher will log more verbosely.
Index: src/java/org/apache/nutch/fetcher/Fetcher.java
===================================================================
--- src/java/org/apache/nutch/fetcher/Fetcher.java (revision 375657)
+++ src/java/org/apache/nutch/fetcher/Fetcher.java (working copy)
@@ -33,6 +33,8 @@
import org.apache.nutch.util.*;
import java.util.logging.*;
+import java.util.ArrayList;
+import java.util.List;
/** The fetcher. Most of the work is done by plugins. */
public class Fetcher extends Configured implements MapRunnable {
@@ -80,6 +82,7 @@
private URLFilters urlFilters;
private ParseUtil parseUtil;
private ProtocolFactory protocolFactory;
+ private boolean halted;
public FetcherThread(Configuration conf) {
this.setDaemon(true); // don't hang JVM on exit
@@ -102,7 +105,7 @@
break; // exit
try { // get next entry from input
- if (!input.next(key, datum)) {
+ if (isHalted() || !input.next(key, datum)) { // check the halt flag first so a halted thread does not read and then drop an entry
break; // at eof, exit
}
} catch (IOException e) {
@@ -250,7 +253,14 @@
LOG.severe("fetcher caught:"+e.toString());
}
}
-
+
+ public synchronized void setHalted(boolean halted) { // sets the flag that this thread's fetch loop checks to decide whether to exit
+ this.halted = halted;
+ }
+
+ public synchronized boolean isHalted() { // read by the fetch loop; a true value makes the loop break out
+ return halted;
+ }
}
public Fetcher() { super(null); }
@@ -266,14 +276,57 @@
String status;
synchronized (this) {
long elapsed = (System.currentTimeMillis() - start)/1000;
- status =
- pages+" pages, "+errors+" errors, "
+ status =
+ pages+" pages, " + errors + " errors, " + activeThreads + " threads, "
+ Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
+ Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
}
reporter.setStatus(status);
}
+  /**
+   * Adds or halts fetcher threads so that the measured bandwidth (kb/s averaged since
+   * start) moves toward targetBandwidth. New threads are capped at maxNumberOfThreads;
+   * halting stops while one entry is still left in the tracked thread list.
+   */
+  private void adjustNumberOfThreads(int targetBandwidth, int maxNumberOfThreads, List threads) throws IOException {
+    int bandwidth;
+    int active;
+    synchronized (this) { // snapshot both shared counters under one lock
+      long elapsed = Math.max(1, (System.currentTimeMillis() - start)/1000); // at least 1s, avoids dividing by zero
+      bandwidth = Math.round(((((float)bytes)*8)/1024)/elapsed);
+      active = activeThreads;
+    }
+    if (active <= 0) {
+      return;
+    }
+    int averageBdwPerThread = bandwidth/active;
+    if (averageBdwPerThread <= 0) {
+      return; // no usable per-thread estimate yet
+    }
+    if (targetBandwidth > bandwidth) {
+      int remainingBdw = targetBandwidth - bandwidth;
+      // assume a new thread delivers only half the current average; max(1, ..) keeps the divisor non-zero
+      int additionalThreads = remainingBdw/Math.max(1, averageBdwPerThread/2);
+      // never start more threads than the configured maximum leaves room for
+      additionalThreads = Math.min(additionalThreads, maxNumberOfThreads - active);
+      for (int i = 0; i < additionalThreads; i++) { // spawn threads
+        FetcherThread thread = new FetcherThread(getConf());
+        thread.start();
+        threads.add(thread);
+      }
+    } else {
+      // at or above the target: halt as many threads as the excess bandwidth amounts to
+      int excessThreads = (bandwidth - targetBandwidth)/averageBdwPerThread;
+      // keep at least one tracked thread running
+      for (int i = 0; i < excessThreads && threads.size() > 1; i++) {
+        FetcherThread thread = (FetcherThread)threads.get(0);
+        thread.setHalted(true);
+        threads.remove(0);
+      }
+    }
+  }
+
public void configure(JobConf job) {
setConf(job);
@@ -301,25 +354,41 @@
this.output = output;
this.reporter = reporter;
+ // array of active threads
+ List threads = new ArrayList();
+
this.maxRedirect = getConf().getInt("http.redirect.max", 3);
int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
- LOG.info("Fetcher: threads: " + threadCount);
+ LOG.info("Fetcher: threads start: " + threadCount);
+ int maxNumberOfThreads = getConf().getInt("fetcher.threads.maximum", threadCount);
+ LOG.info("Fetcher: threads maximum: " + maxNumberOfThreads); // report the configured cap, not the starting thread count
+ int desiredBandwidthPerTask = getConf().getInt("fetcher.task.bandwidth", -1);
+
for (int i = 0; i < threadCount; i++) { // spawn threads
- new FetcherThread(getConf()).start();
+ FetcherThread thread = new FetcherThread(getConf());
+ thread.start();
+ threads.add(thread);
}
// select a timeout that avoids a task timeout
long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+ int threadRefreshCounter = 0;
do { // wait for threads to exit
try {
Thread.sleep(1000);
+ threadRefreshCounter++;
} catch (InterruptedException e) {}
reportStatus();
+ // adjust the number of threads after every 10 seconds
+ if (desiredBandwidthPerTask > -1 && threadRefreshCounter%10 == 0 && activeThreads > 0) {
+ adjustNumberOfThreads(desiredBandwidthPerTask, maxNumberOfThreads, threads);
+ }
+
// some requests seem to hang, despite all intentions
synchronized (this) {
if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
@@ -341,6 +410,14 @@
JobConf job = new JobConf(getConf());
job.setInt("fetcher.threads.fetch", threads);
+
+ int desiredBandwidthPerTask = -1;
+ int desiredBandwidth = getConf().getInt("fetcher.target.bandwidth", -1);
+ if (desiredBandwidth > -1) {
+ desiredBandwidthPerTask = desiredBandwidth / Math.max(1, job.getNumReduceTasks()); // max(1, ..) avoids dividing by zero when the job has no reduce tasks
+ LOG.info("Fetcher: task bandwidth target: " + desiredBandwidthPerTask);
+ }
+ job.setInt("fetcher.task.bandwidth", desiredBandwidthPerTask);
job.set(SEGMENT_NAME_KEY, segment.getName());
job.setBoolean("fetcher.parse", parsing);
@@ -360,7 +437,6 @@
LOG.info("Fetcher: done");
}
-
/** Run the fetcher. */
public static void main(String[] args) throws Exception {
@@ -383,7 +459,12 @@
threads = Integer.parseInt(args[++i]);
} else if (args[i].equals("-noParsing")) parsing = false;
}
-
+ int maxNumberOfThreads = conf.getInt("fetcher.threads.maximum", threads);
+ if (maxNumberOfThreads < threads) {
+ System.err.println("Initial number of threads greater then maximum number of threads");
+ System.exit(-1);
+ }
+
conf.setInt("fetcher.threads.fetch", threads);
if (!parsing) {
conf.setBoolean("fetcher.parse", parsing);
@@ -393,4 +474,5 @@
fetcher.fetch(segment, threads, parsing); // run the Fetcher
}
+
}