From e7eeb570caa18e5f8c09b20e47b5a9cb0c2083cf Mon Sep 17 00:00:00 2001
From: kaveh <kaveh@2locos.com>
Date: Thu, 11 Sep 2014 15:00:19 -0700
Subject: [PATCH] adding support for sharding indexer for solr

---
 conf/nutch-default.xml                             |  22 ++
 src/plugin/build.xml                               |   2 +
 src/plugin/indexer-solrshard/build.xml             |  22 ++
 src/plugin/indexer-solrshard/ivy.xml               |  43 ++++
 src/plugin/indexer-solrshard/plugin.xml            |  54 ++++
 .../indexwriter/solrshard/SolrMappingReader.java   | 143 +++++++++++
 .../indexwriter/solrshard/SolrShardConstants.java  |  42 ++++
 .../solrshard/SolrShardIndexWriter.java            | 271 +++++++++++++++++++++
 .../indexwriter/solrshard/SolrShardUtils.java      |  90 +++++++
 .../nutch/indexwriter/solrshard/package-info.java  |  21 ++
 10 files changed, 710 insertions(+)
 create mode 100644 src/plugin/indexer-solrshard/build.xml
 create mode 100644 src/plugin/indexer-solrshard/ivy.xml
 create mode 100644 src/plugin/indexer-solrshard/plugin.xml
 create mode 100644 src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrMappingReader.java
 create mode 100644 src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardConstants.java
 create mode 100644 src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardIndexWriter.java
 create mode 100644 src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardUtils.java
 create mode 100644 src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/package-info.java

diff --git conf/nutch-default.xml conf/nutch-default.xml
index 908354d..223c18a 100644
--- conf/nutch-default.xml
+++ conf/nutch-default.xml
@@ -1476,6 +1476,28 @@
   </description>
 </property>
 
+<!-- solrshard index properties -->
+
+<property>
+  <name>solr.shardkey</name>
+  <value>id</value>
+  <description>
+  The field whose value is hashed to choose the shard for a document.
+  This should be the same field as uniqueKey in the schema file, otherwise
+  the delete operation will not work correctly.
+  </description>
+</property>
+
+<!--
+<property>
+  <name>solr.server.urls</name>
+  <value></value>
+  <description>
+  A comma separated list of the core URLs that you want to use.
+  </description>
+</property>
+-->
+
 <!-- Elasticsearch properties -->
 
 <property>
diff --git src/plugin/build.xml src/plugin/build.xml
index 46b834b..71f27c7 100755
--- src/plugin/build.xml
+++ src/plugin/build.xml
@@ -37,6 +37,7 @@
      <ant dir="indexer-dummy" target="deploy"/>
      <ant dir="indexer-elastic" target="deploy"/>
      <ant dir="indexer-solr" target="deploy"/>
+     <ant dir="indexer-solrshard" target="deploy"/>
      <ant dir="language-identifier" target="deploy"/>
      <ant dir="lib-http" target="deploy"/>
      <ant dir="lib-nekohtml" target="deploy"/>
@@ -127,6 +128,7 @@
     <ant dir="indexer-dummy" target="clean"/>
     <ant dir="indexer-elastic" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>
+    <ant dir="indexer-solrshard" target="clean"/>
     <ant dir="language-identifier" target="clean"/>
     <!-- <ant dir="lib-commons-httpclient" target="clean"/> -->
     <ant dir="lib-http" target="clean"/>
diff --git src/plugin/indexer-solrshard/build.xml src/plugin/indexer-solrshard/build.xml
new file mode 100644
index 0000000..7f71950
--- /dev/null
+++ src/plugin/indexer-solrshard/build.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-solrshard" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+</project>
diff --git src/plugin/indexer-solrshard/ivy.xml src/plugin/indexer-solrshard/ivy.xml
new file mode 100644
index 0000000..259e7c3
--- /dev/null
+++ src/plugin/indexer-solrshard/ivy.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <configurations>
+    <include file="../../..//ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+
+  <dependencies>
+   <dependency org="org.apache.solr" name="solr-solrj" rev="3.4.0"
+		conf="*->default"/>
+  </dependencies>
+  
+</ivy-module>
diff --git src/plugin/indexer-solrshard/plugin.xml src/plugin/indexer-solrshard/plugin.xml
new file mode 100644
index 0000000..ffaed29
--- /dev/null
+++ src/plugin/indexer-solrshard/plugin.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="indexer-solrshard" name="SOLRShardIndexWriter" version="1.0.0"
+  provider-name="nutch.apache.org">
+
+  <runtime>
+    <library name="indexer-solrshard.jar">
+      <export name="*" />
+    </library>
+
+     <library name="activation-1.1.jar"/>
+     <library name="commons-codec-1.4.jar"/>
+     <library name="commons-httpclient-3.1.jar"/>
+     <library name="commons-io-1.4.jar"/>
+     <library name="commons-logging-1.1.1.jar"/>
+     <library name="geronimo-stax-api_1.0_spec-1.0.1.jar"/>
+     <library name="jline-0.9.1.jar"/>
+     <library name="log4j-1.2.15.jar"/>
+     <library name="lucene-core-3.4.0.jar"/>
+     <library name="mail-1.4.1.jar"/>
+     <!-- library name="slf4j-api-1.6.1.jar"/ -->
+     <library name="solr-solrj-3.4.0.jar"/>
+     <library name="stax-api-1.0.1.jar"/>
+     <library name="wstx-asl-3.2.7.jar"/>
+     <library name="zookeeper-3.3.1.jar"/>
+  </runtime>
+
+  <requires>
+    <import plugin="nutch-extensionpoints" />
+  </requires>
+
+  <extension id="org.apache.nutch.indexer.solrshard"
+    name="SOLR Shard Index Writer"
+    point="org.apache.nutch.indexer.IndexWriter">
+    <implementation id="SOLRShardIndexWriter"
+      class="org.apache.nutch.indexwriter.solrshard.SolrShardIndexWriter" />
+  </extension>
+
+</plugin>
diff --git src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrMappingReader.java src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrMappingReader.java
new file mode 100644
index 0000000..1ea91ab
--- /dev/null
+++ src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrMappingReader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.solrshard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.ObjectCache;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+/**
+ * Reads the Solr field mapping file and exposes its field renames, copy-field
+ * rules and unique key. One instance is cached per Configuration via ObjectCache.
+ */
+public class SolrMappingReader {
+  public static Logger LOG = LoggerFactory.getLogger(SolrMappingReader.class);
+  private Configuration conf;
+  private Map<String, String> keyMap = new HashMap<String, String>();
+  private Map<String, String> copyMap = new HashMap<String, String>();
+  private String uniqueKey = "id";
+
+  /** Returns the reader cached for conf, creating and caching it on first use. */
+  public static synchronized SolrMappingReader getInstance(Configuration conf) {
+    ObjectCache cache = ObjectCache.get(conf);
+    SolrMappingReader instance = (SolrMappingReader)cache.getObject(SolrMappingReader.class.getName());
+    if (instance == null) {
+      instance = new SolrMappingReader(conf);
+      cache.setObject(SolrMappingReader.class.getName(), instance);
+    }
+    return instance;
+  }
+
+  /** Parses the mapping file named in conf; parse problems are logged, not thrown. */
+  protected SolrMappingReader(Configuration conf) {
+    this.conf = conf;
+    parseMapping();
+  }
+
+  /** Loads field, copyField and uniqueKey entries; defaults stay in place on error. */
+  private void parseMapping() {
+    String mappingFile = conf.get(SolrShardConstants.MAPPING_FILE, "solrindex-mapping.xml");
+    InputStream ssInputStream = conf.getConfResourceAsInputStream(mappingFile);
+    if (ssInputStream == null) {
+      LOG.warn("Solr mapping file " + mappingFile + " not found, using default mapping");
+      return;
+    }
+    try {
+      DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+      Element rootElement = builder.parse(new InputSource(ssInputStream)).getDocumentElement();
+      readPairs(rootElement.getElementsByTagName("field"), keyMap);
+      readPairs(rootElement.getElementsByTagName("copyField"), copyMap);
+      NodeList uniqueKeyItem = rootElement.getElementsByTagName("uniqueKey");
+      if (uniqueKeyItem.getLength() > 1) {
+        LOG.warn("More than one unique key definitions found in solr index mapping, using default 'id'");
+      } else if (uniqueKeyItem.getLength() == 0) {
+        LOG.warn("No unique key definition found in solr index mapping using, default 'id'");
+      } else if (uniqueKeyItem.item(0).getFirstChild() == null) {
+        LOG.warn("Empty unique key definition found in solr index mapping, using default 'id'");
+      } else {
+        uniqueKey = uniqueKeyItem.item(0).getFirstChild().getNodeValue();
+      }
+    } catch (SAXException e) {
+      LOG.warn("Could not parse " + mappingFile, e);
+    } catch (IOException e) {
+      LOG.warn("Could not read " + mappingFile, e);
+    } catch (ParserConfigurationException e) {
+      LOG.warn("Could not create XML parser for " + mappingFile, e);
+    } finally {
+      try {
+        ssInputStream.close();
+      } catch (IOException e) {
+        LOG.warn("Could not close " + mappingFile, e);
+      }
+    }
+  }
+
+  /** Copies the source/dest attributes of every element in nodes into target. */
+  private static void readPairs(NodeList nodes, Map<String, String> target) {
+    for (int i = 0; i < nodes.getLength(); i++) {
+      Element element = (Element) nodes.item(i);
+      LOG.info("source: " + element.getAttribute("source") + " dest: " + element.getAttribute("dest"));
+      target.put(element.getAttribute("source"), element.getAttribute("dest"));
+    }
+  }
+
+  /** @return live map of source field name to Solr field name */
+  public Map<String, String> getKeyMap() {
+    return keyMap;
+  }
+
+  /** @return live map of source field name to copy-field destination */
+  public Map<String, String> getCopyMap() {
+    return copyMap;
+  }
+
+  /** @return unique key field name from the mapping file, "id" if none was read */
+  public String getUniqueKey() {
+    return uniqueKey;
+  }
+
+  /** @return the copy-field destination for key, or key itself when none exists */
+  public String hasCopy(String key) {
+    return copyMap.containsKey(key) ? copyMap.get(key) : key;
+  }
+
+  /** @return the mapped Solr field name for key, or key itself when unmapped */
+  public String mapKey(String key) throws IOException {
+    return keyMap.containsKey(key) ? keyMap.get(key) : key;
+  }
+
+  /** @return same result as hasCopy(key); the key itself when no copy rule exists */
+  public String mapCopyKey(String key) throws IOException {
+    return hasCopy(key);
+  }
+}
diff --git src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardConstants.java src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardConstants.java
new file mode 100644
index 0000000..2201aba
--- /dev/null
+++ src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.solrshard;
+
+/** Names of the configuration properties read by the sharding Solr index writer. */
+public interface SolrShardConstants {
+  /** Prefix shared by every property name below. */
+  String SOLR_PREFIX = "solr.";
+  /** Comma separated list of Solr core URLs, one per shard. */
+  String SERVER_URLS = SOLR_PREFIX + "server.urls";
+  /** Buffered documents plus deletes that trigger a flush. */
+  String COMMIT_SIZE = SOLR_PREFIX + "commit.size";
+  /** Name of the field mapping resource. */
+  String MAPPING_FILE = SOLR_PREFIX + "mapping.file";
+  /** Boolean switch that turns HTTP authentication on. */
+  String USE_AUTH = SOLR_PREFIX + "auth";
+  // Credentials used when USE_AUTH is enabled.
+  String USERNAME = SOLR_PREFIX + "auth.username";
+  String PASSWORD = SOLR_PREFIX + "auth.password";
+  /** Document field whose value is hashed to choose the shard. */
+  String SHARD_KEY = SOLR_PREFIX + "shardkey";
+  /** Not read by the other classes of this plugin. */
+  @Deprecated
+  String COMMIT_INDEX = SOLR_PREFIX + "commit.index";
+  /** Not read by the other classes of this plugin. */
+  @Deprecated
+  String PARAMS = SOLR_PREFIX + "params";
+}
diff --git src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardIndexWriter.java src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardIndexWriter.java
new file mode 100644
index 0000000..0b8131d
--- /dev/null
+++ src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardIndexWriter.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.solrshard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexerMapReduce;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchField;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Index writer that spreads documents over several Solr cores. The core is
+ * chosen from the hash of the document's shard key field; documents are
+ * buffered per core and sent in batches, and deletes are routed the same way.
+ * Nothing here is synchronized, so an instance must not be shared by threads.
+ */
+public class SolrShardIndexWriter implements IndexWriter {
+
+    public static final Logger LOG = LoggerFactory
+            .getLogger(SolrShardIndexWriter.class);
+
+    /** One client per shard; the list position is the shard index. */
+    private List<SolrServer> solrs;
+    private SolrMappingReader solrMapping;
+    /** Extra request parameters parsed from the indexer params property. */
+    private ModifiableSolrParams params;
+
+    private Configuration config;
+
+    /** Pending documents, one buffer per shard, parallel to {@link #solrs}. */
+    private final List<List<SolrInputDocument>> inputDocs = new ArrayList<List<SolrInputDocument>>();
+
+    private String shardKeyName;
+    private int batchSize;
+    /** Number of documents currently held in all buffers of inputDocs. */
+    private int numBuffered = 0;
+    /** Deletes sent since the last flush; counted towards batchSize. */
+    private int numDeletes = 0;
+    private boolean delete = false;
+
+    /**
+     * Creates the Solr clients from the job configuration and initializes
+     * the writer. The name argument is not used.
+     */
+    public void open(JobConf job, String name) throws IOException {
+        init(SolrShardUtils.getCommonsHttpSolrServers(job), job);
+    }
+
+    /**
+     * Binds the writer to the given shard clients (list order is shard order)
+     * and reads its settings. Package protected for tests.
+     * @throws IOException if servers is null or empty
+     */
+    void init(List<SolrServer> servers, JobConf job) throws IOException {
+        if (servers == null || servers.isEmpty()) {
+            // an empty list would otherwise surface later as "/ by zero" when routing
+            throw new IOException("At least one Solr server is required");
+        }
+        solrs = servers;
+        inputDocs.clear();
+        numBuffered = 0;
+        for (int i = 0; i < solrs.size(); ++i) {
+            inputDocs.add(new ArrayList<SolrInputDocument>());
+        }
+
+        shardKeyName = job.get(SolrShardConstants.SHARD_KEY, "id");
+        batchSize = job.getInt(SolrShardConstants.COMMIT_SIZE, 1000);
+        solrMapping = SolrMappingReader.getInstance(job);
+        delete = job.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
+        // parse optional params of the form k1=v1&k2=v2
+        params = new ModifiableSolrParams();
+        String paramString = job.get(IndexerMapReduce.INDEXER_PARAMS);
+        if (paramString != null) {
+            for (String v : paramString.split("&")) {
+                // limit 2 keeps '=' characters that are part of the value
+                String[] kv = v.split("=", 2);
+                if (kv.length == 2 && kv[1].length() > 0) {
+                    params.add(kv[0], kv[1]);
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes the document with the given id from the shard its id hashes to;
+     * a no-op unless deletion is enabled. The request is sent immediately.
+     */
+    public void delete(String key) throws IOException {
+        if (!delete) {
+            return;
+        }
+        try {
+            solrs.get(shardFor(key)).deleteById(key);
+            numDeletes++;
+        } catch (final SolrServerException e) {
+            throw makeIOException(e);
+        }
+    }
+
+    @Override
+    public void update(NutchDocument doc) throws IOException {
+        write(doc);
+    }
+
+    /**
+     * Converts the document to a SolrInputDocument, buffers it for its shard and
+     * flushes all buffers once buffered documents plus deletes reach the batch size.
+     *
+     * @throws IOException if the shard key is missing or Solr reports an error
+     */
+    public void write(NutchDocument doc) throws IOException {
+        final Object shardValue = doc.getFieldValue(shardKeyName);
+        if (shardValue == null) {
+            throw new IOException("Document has no value for shard key field '"
+                    + shardKeyName + "'");
+        }
+        final int index = shardFor(shardValue.toString());
+
+        final SolrInputDocument inputDoc = new SolrInputDocument();
+        for (final Entry<String, NutchField> e : doc) {
+            final String copyKey = solrMapping.mapCopyKey(e.getKey());
+            for (final Object val : e.getValue().getValues()) {
+                // normalise the string representation for a Date
+                Object val2 = val;
+                if (val instanceof Date) {
+                    val2 = DateUtil.getThreadLocalDateFormat().format(val);
+                }
+                if (e.getKey().equals("content") || e.getKey().equals("title")) {
+                    val2 = SolrShardUtils.stripNonCharCodepoints((String) val);
+                }
+                inputDoc.addField(solrMapping.mapKey(e.getKey()), val2, e
+                        .getValue().getWeight());
+                // compare names by value; copy the cleaned value so the copy
+                // cannot reintroduce characters stripped above
+                if (!copyKey.equals(e.getKey())) {
+                    inputDoc.addField(copyKey, val2);
+                }
+            }
+        }
+        inputDoc.setDocumentBoost(doc.getWeight());
+
+        inputDocs.get(index).add(inputDoc);
+        numBuffered++;
+        if (numBuffered + numDeletes >= batchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Sends every non-empty shard buffer to its Solr core and empties it.
+     * Buffers are cleared, never removed, so the buffer list stays aligned
+     * with the client list for the writes that follow.
+     */
+    private void flush() throws IOException {
+        LOG.info("Indexing " + numBuffered + " documents");
+        if (numDeletes > 0) {
+            LOG.info("Deleting " + numDeletes + " documents");
+        }
+        try {
+            for (int i = 0; i < inputDocs.size(); i++) {
+                final List<SolrInputDocument> shardDocs = inputDocs.get(i);
+                if (shardDocs.isEmpty()) {
+                    continue;
+                }
+                final UpdateRequest req = new UpdateRequest();
+                req.add(shardDocs);
+                req.setParams(params);
+                req.process(solrs.get(i));
+                numBuffered -= shardDocs.size();
+                shardDocs.clear();
+            }
+        } catch (final SolrServerException e) {
+            throw makeIOException(e);
+        }
+        numDeletes = 0;
+    }
+
+    /** Maps a key to a shard index; masking the sign bit keeps negative hashes in range. */
+    private int shardFor(String key) {
+        return (key.hashCode() & Integer.MAX_VALUE) % solrs.size();
+    }
+
+    /** Sends whatever is still buffered; no commit is issued here. */
+    public void close() throws IOException {
+        if (numBuffered > 0 || numDeletes > 0) {
+            flush();
+        }
+    }
+
+    /** Commits on every shard. */
+    @Override
+    public void commit() throws IOException {
+        try {
+            for (SolrServer solr : solrs) {
+                solr.commit();
+            }
+        } catch (SolrServerException e) {
+            throw makeIOException(e);
+        }
+    }
+
+    /** Wraps a SolrServerException in an IOException, keeping it as the cause. */
+    public static IOException makeIOException(SolrServerException e) {
+        final IOException ioe = new IOException();
+        ioe.initCause(e);
+        return ioe;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return config;
+    }
+
+    /** Stores the configuration; throws RuntimeException if solr.server.urls is not set. */
+    @Override
+    public void setConf(Configuration conf) {
+        config = conf;
+        String serverURL = conf.get(SolrShardConstants.SERVER_URLS);
+        if (serverURL == null) {
+            String message = "Missing SOLR URL. Should be set via -D "
+                    + SolrShardConstants.SERVER_URLS;
+            message+="\n"+describe();
+            LOG.error(message);
+            throw new RuntimeException(message);
+        }
+    }
+
+    /** Returns a human readable list of the properties this writer reads. */
+    public String describe(){
+        StringBuilder sb = new StringBuilder("SOLRShardIndexWriter\n");
+        sb.append("\t").append(SolrShardConstants.SERVER_URLS).append(" : URLs of the SOLR instances separated by comma (mandatory)\n");
+        sb.append("\t").append(SolrShardConstants.SHARD_KEY).append(" : name of the field that should be used for sharding (default 'id')\n");
+        sb.append("\t").append(SolrShardConstants.COMMIT_SIZE).append(" : buffer size when sending to SOLR (default 1000)\n");
+        sb.append("\t").append(SolrShardConstants.MAPPING_FILE).append(" : name of the mapping file for fields (default solrindex-mapping.xml)\n");
+        sb.append("\t").append(SolrShardConstants.USE_AUTH).append(" : use authentication (default false)\n");
+        sb.append("\t").append(SolrShardConstants.USERNAME).append(" : username for authentication\n");
+        sb.append("\t").append(SolrShardConstants.PASSWORD).append(" : password for authentication\n");
+        return sb.toString();
+    }
+
+}
diff --git src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardUtils.java src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardUtils.java
new file mode 100644
index 0000000..0ee2b29
--- /dev/null
+++ src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/SolrShardUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.solrshard;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.auth.AuthScope;
+import org.apache.commons.httpclient.UsernamePasswordCredentials;
+import org.apache.commons.httpclient.params.HttpClientParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.SolrServer;
+
+
+import java.util.*;
+import java.net.MalformedURLException;
+
+public class SolrShardUtils {
+
+  public static Logger LOG = LoggerFactory.getLogger(SolrShardUtils.class);
+
+  /**
+   * Creates one SolrJ client per core URL listed in solr.server.urls. All
+   * clients share one HttpClient; when solr.auth is true it is set up for
+   * preemptive authentication with the configured username and password.
+   *
+   * @param job job configuration holding the solr.* properties
+   * @return the clients, in the order the URLs are listed
+   * @throws MalformedURLException if no URL is configured or a URL is invalid
+   */
+  public static List<SolrServer> getCommonsHttpSolrServers(JobConf job) throws MalformedURLException {
+    HttpClient client = new HttpClient();
+    if (job.getBoolean(SolrShardConstants.USE_AUTH, false)) {
+      String username = job.get(SolrShardConstants.USERNAME);
+      LOG.info("Authenticating as: " + username);
+      AuthScope scope = new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, AuthScope.ANY_SCHEME);
+      client.getState().setCredentials(scope, new UsernamePasswordCredentials(username, job.get(SolrShardConstants.PASSWORD)));
+      client.getParams().setAuthenticationPreemptive(true);
+    }
+    String serverURLs = job.get(SolrShardConstants.SERVER_URLS, "");
+    List<SolrServer> results = new ArrayList<SolrServer>();
+    for (String url : serverURLs.split(",")) {
+      // tolerate "a, b" and stray commas instead of failing on a blank URL
+      if (url.trim().length() > 0) {
+        results.add(new CommonsHttpSolrServer(url.trim(), client));
+      }
+    }
+    if (results.isEmpty()) {
+      throw new MalformedURLException("No Solr URL configured in " + SolrShardConstants.SERVER_URLS);
+    }
+    return results;
+  }
+  
+  /**
+   * Removes Unicode noncharacters (U+FDD0..U+FDEF and every U+xxFFFE/U+xxFFFF)
+   * and control characters below U+0020 other than tab, line feed and carriage
+   * return. Iterates by code point so supplementary planes are covered too.
+   * @param input text to clean; must not be null
+   * @return input without the stripped code points
+   */
+  public static String stripNonCharCodepoints(String input) {
+    StringBuilder retval = new StringBuilder(input.length());
+    for (int i = 0; i < input.length();) {
+      final int cp = input.codePointAt(i);
+      i += Character.charCount(cp);
+      boolean nonChar = (cp & 0xfffe) == 0xfffe || (cp >= 0xfdd0 && cp <= 0xfdef);
+      boolean control = cp <= 0x1f && cp != 0x9 && cp != 0xa && cp != 0xd;
+      if (!nonChar && !control) {
+        retval.appendCodePoint(cp);
+      }
+    }
+    return retval.toString();
+  }
+  
+}
diff --git src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/package-info.java src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/package-info.java
new file mode 100644
index 0000000..442066f
--- /dev/null
+++ src/plugin/indexer-solrshard/src/java/org/apache/nutch/indexwriter/solrshard/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Index writer plugin that shards documents across several <a href="http://lucene.apache.org/solr/">Apache Solr</a> cores.
+ */
+package org.apache.nutch.indexwriter.solrshard;
-- 
1.9.3

