diff --git conf/nutch-default.xml conf/nutch-default.xml
index bf1189a..066c3e3 100644
--- conf/nutch-default.xml
+++ conf/nutch-default.xml
@@ -1896,4 +1896,45 @@ CAUTION: Set the parser.timeout to -1 or a bigger value than 30, when using this
+
+
+ publisher.queue.type
+
+
+ Choose the type of Queue being used (ex - RabbitMQ, ActiveMq, Kafka, etc).
+ Currently there exists an implemtation for RabbitMQ producer.
+
+
+
+
+
+ rabbitmq.exchange.server
+
+
+ Name for the exchange server to use. Default - "fetcher_log"
+
+
+
+ rabbitmq.exchange.type
+
+
+ There are a few exchange types available: direct, topic, headers and fanout. Default "fanout".
+
+
+
+ rabbitmq.host
+
+
+ Host on which the RabbitMQ server is running. Default "localhost".
+
+
+
+ rabbitmq.queue.routingkey
+
+
+ The routingKey used by publisher to publish messages to specific queues. If the exchange type is "fanout", then this property is ignored.
+
+
+
+
diff --git ivy/ivy.xml ivy/ivy.xml
index 9428fe0..f44c4d4 100644
--- ivy/ivy.xml
+++ ivy/ivy.xml
@@ -114,6 +114,9 @@
+
+
+
diff --git src/java/org/apache/nutch/fetcher/FetcherThread.java src/java/org/apache/nutch/fetcher/FetcherThread.java
index 9a482b9..38aa3c1 100644
--- src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLFilterException;
@@ -128,6 +129,9 @@ public class FetcherThread extends Thread {
//Used by the REST service
private FetchNode fetchNode;
private boolean reportToNutchServer;
+
+ //Used for publishing events
+ private FetcherThreadPublisher publisher;
public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQueues fetchQueues,
QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong lastRequestStart, Reporter reporter,
@@ -156,6 +160,7 @@ public class FetcherThread extends Thread {
this.storingContent = storingContent;
this.pages = pages;
this.bytes = bytes;
+ this.publisher = new FetcherThreadPublisher(conf);
queueMode = conf.get("fetcher.queue.mode",
FetchItemQueues.QUEUE_MODE_HOST);
// check that the mode is known
@@ -239,6 +244,15 @@ public class FetcherThread extends Thread {
// fetch the page
redirecting = false;
redirectCount = 0;
+
+ //Publisher event
+ FetcherThreadEvent startEvent = new FetcherThreadEvent();
+ startEvent.setEventType(PublishEventType.START);
+ startEvent.setUrl(fit.getUrl().toString());
+ startEvent.setTimestamp(System.currentTimeMillis());
+ publisher.publish(startEvent);
+
+
do {
if (LOG.isInfoEnabled()) {
LOG.info("fetching " + fit.url + " (queue crawl delay="
@@ -303,7 +317,15 @@ public class FetcherThread extends Thread {
fetchNode.setFetchTime(System.currentTimeMillis());
fetchNode.setUrl(fit.url);
}
-
+
+ //Publish fetch finish event
+ FetcherThreadEvent endEvent = new FetcherThreadEvent();
+ endEvent.setEventType(PublishEventType.END);
+ endEvent.setUrl(fit.getUrl().toString());
+ endEvent.setTimestamp(System.currentTimeMillis());
+ endEvent.addEventData("status", status.getName());
+ publisher.publish(endEvent);
+
reporter.incrCounter("FetcherStatus", status.getName(), 1);
switch (status.getCode()) {
@@ -655,7 +677,21 @@ public class FetcherThread extends Thread {
outlinkList.add(links[i]);
outlinks.add(toUrl);
}
-
+
+ //Publish fetch report event
+ FetcherThreadEvent reportEvent = new FetcherThreadEvent();
+ reportEvent.setEventType(PublishEventType.REPORT);
+ reportEvent.setUrl(url.toString());
+ reportEvent.setTimestamp(System.currentTimeMillis());
+ reportEvent.addOutlinksToEventData(outlinkList);
+ reportEvent.addEventData("title", parseData.getTitle());
+ reportEvent.addEventData("content-type", parseData.getContentMeta().get("content-type"));
+ reportEvent.addEventData("score", datum.getScore());
+ reportEvent.addEventData("fetchTime", datum.getFetchTime());
+ reportEvent.addEventData("content-language", parseData.getContentMeta().get("content-language"));
+ System.out.println(parseData.getContentMeta().names());
+ publisher.publish(reportEvent);
+
// Only process depth N outlinks
if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
diff --git src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java
new file mode 100644
index 0000000..646f6f8
--- /dev/null
+++ src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nutch.parse.Outlink;
+
+/**
+ * This class is used to capture the various events occurring at fetch time. These events are sent to a queue
+ * implementing the producer
+ * @author Sujen Shah
+ *
+ */
+@SuppressWarnings("serial")
+public class FetcherThreadEvent implements Serializable{
+
+ public static enum PublishEventType {START, END, REPORT}
+
+ private PublishEventType eventType;
+ private Map eventData;
+ private String url;
+ private Long timestamp;
+
+ public PublishEventType getEventType() {
+ return eventType;
+ }
+ public void setEventType(PublishEventType eventType) {
+ this.eventType = eventType;
+ }
+ public Map getEventData() {
+ return eventData;
+ }
+ public void setEventData(Map eventData) {
+ this.eventData = eventData;
+ }
+ public String getUrl() {
+ return url;
+ }
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void addEventData(String key, Object value) {
+ if(eventData == null) {
+ eventData = new HashMap();
+ }
+ eventData.put(key, value);
+ }
+
+ public void addOutlinksToEventData(Collection links) {
+ ArrayList