Index: src/test/org/apache/nutch/indexer/TestIndexingFilters.java
===================================================================
--- src/test/org/apache/nutch/indexer/TestIndexingFilters.java	(revision 558683)
+++ src/test/org/apache/nutch/indexer/TestIndexingFilters.java	(working copy)
@@ -43,7 +43,7 @@
     conf.set(IndexingFilters.INDEXINGFILTER_ORDER, class1 + " " + class2);
 
     IndexingFilters filters = new IndexingFilters(conf);
-    filters.filter(new Document(), new ParseImpl("text", new ParseData(
+    filters.filter(new NutchDocument(), new ParseImpl("text", new ParseData(
         new ParseStatus(), "title", new Outlink[0], new Metadata())), new Text(
         "http://www.example.com/"), new CrawlDatum(), new Inlinks());
   }
Index: src/java/org/apache/nutch/indexer/solr/SolrWriter.java
===================================================================
--- src/java/org/apache/nutch/indexer/solr/SolrWriter.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/solr/SolrWriter.java	(revision 0)
@@ -0,0 +1,35 @@
+package org.apache.nutch.indexer.solr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchIndexWriter;
+
+public class SolrWriter implements NutchIndexWriter {
+  
+  private SolrClient client;
+
+  @Override
+  public void close() throws IOException {
+    client.commitAndOptimize(true);
+  }
+
+  @Override
+  public void open(Configuration conf)
+  throws IOException {
+    String solrServer = conf.get(SolrConstants.SERVER_URL);
+    if (solrServer == null) {
+      throw new IOException("Solr backend is enabled but " +
+                            "Solr server's URL is not given");
+    } else {
+      client = new SolrClient(solrServer);
+    }
+  }
+
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    client.addDocument(doc);
+  }
+
+}
Index: src/java/org/apache/nutch/indexer/solr/SolrConstants.java
===================================================================
--- src/java/org/apache/nutch/indexer/solr/SolrConstants.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/solr/SolrConstants.java	(revision 0)
@@ -0,0 +1,8 @@
+package org.apache.nutch.indexer.solr;
+
+public interface SolrConstants {
+  public static final String SOLR_PREFIX = "solr.";
+  
+  public static final String SERVER_URL = SOLR_PREFIX + "server.url";
+  
+}
Index: src/java/org/apache/nutch/indexer/solr/SolrClient.java
===================================================================
--- src/java/org/apache/nutch/indexer/solr/SolrClient.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/solr/SolrClient.java	(revision 0)
@@ -0,0 +1,158 @@
+package org.apache.nutch.indexer.solr;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.nutch.indexer.NutchDocument;
+
+public class SolrClient {
+
+  public static final Log LOG = LogFactory.getLog(SolrClient.class);
+
+  /** maximum number of documents to buffer before sending them to solr. */
+  private static final int MAX_BUFFERED_DOCS = 1000;
+
+  private static final int INITIAL_FIELD_BUFFER_CAPACITY = 65536;
+
+  private static final int INITIAL_DOC_BUFFER_CAPACITY = MAX_BUFFERED_DOCS * 65536;
+
+  private StringBuilder docBuffer;
+
+  private int numBufferedDocs;
+
+  private String solrUpdateUrl;
+
+  private HttpClient httpClient;
+
+  public SolrClient(String solrServer) {
+    if (solrServer.endsWith("/")) {
+      this.solrUpdateUrl = solrServer + "update";
+    } else {
+      this.solrUpdateUrl = solrServer + "/update";
+    }
+
+    httpClient = new HttpClient();
+    initDocBuffer();
+  }
+
+  /** Adds single document to index. */
+  @SuppressWarnings("unchecked")
+  public void addDocument(NutchDocument doc) throws IOException {
+    docBuffer.append("<doc boost=\"" + doc.getScore() + "\">");
+    Iterator<Map.Entry<String, List<String>>> iterator = doc.fieldIterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, List<String>> entry = iterator.next();
+      List<String> values = entry.getValue();
+      for (String value : values) {
+        addField(entry.getKey(), value);
+      }
+    }
+    docBuffer.append("</doc>");
+
+    if (++numBufferedDocs >= MAX_BUFFERED_DOCS) {
+      sendDocuments();
+    }
+  }
+
+  /**
+   * Commit changes and optimize. Note: Solr will not serve newly added/removed
+   * documents until commit unless autoCommit=true.
+   * 
+   * @param block If true, this method will block until optimize is complete.
+   */
+  public void commitAndOptimize(boolean block) throws IOException {
+    if (numBufferedDocs > 0) {
+      sendDocuments();
+    }
+    sendToSolrServer("<commit/>");
+    sendToSolrServer("<optimize waitFlush=\"" + block + "\" " + 
+                     "waitSearcher=\"" + block + "\"/>");
+  }
+
+  private void initDocBuffer() {
+    docBuffer = new StringBuilder(INITIAL_DOC_BUFFER_CAPACITY);
+    docBuffer.append("<add>");
+    numBufferedDocs = 0;
+  }
+
+  private static boolean isLegalXml(final char c) {
+    return c == 0x9 || c == 0xa || c == 0xd || (c >= 0x20 && c <= 0xd7ff)
+        || (c >= 0xe000 && c <= 0xfffd) || (c >= 0x10000 && c <= 0x10ffff);
+  }
+
+  private void addField(String name, String value) 
+  throws UnsupportedEncodingException {
+    docBuffer.append("<field name=\"");
+    docBuffer.append(name);
+    docBuffer.append("\">");
+    StringBuilder valBuffer = new StringBuilder(INITIAL_FIELD_BUFFER_CAPACITY);
+    int valLen = value.length();
+    // escape illegal xml characters
+    for (int i = 0; i < valLen; i++) {
+      char ch = value.charAt(i);
+      switch (ch) {
+      case '"':
+        valBuffer.append("&quot;");
+      case '\'':
+        valBuffer.append("&apos;");
+        break;
+      case '&':
+        valBuffer.append("&amp;");
+        break;
+      case '<':
+        valBuffer.append("&lt;");
+        break;
+      case '>':
+        valBuffer.append("&gt;");
+        break;
+      default:
+        // skip illegal characters we can't handle
+        if (isLegalXml(ch)) {
+          valBuffer.append(ch);
+        }
+      }
+    }
+    docBuffer.append(valBuffer.toString());
+    docBuffer.append("</field>");
+  }
+
+  private void sendDocuments() throws IOException {
+    if (numBufferedDocs == 0) {
+      return;
+    }
+    docBuffer.append("</add>");
+    sendToSolrServer(docBuffer.toString());
+    initDocBuffer();
+  }
+
+  private void sendToSolrServer(String data) throws IOException {
+    PostMethod post = new PostMethod(solrUpdateUrl);
+    post.setRequestEntity(new StringRequestEntity(data, "text/xml", "UTF-8"));
+    try {
+      int code = httpClient.executeMethod(post);
+      // httpclient enforces reading the response before sending another request
+      byte[] response = post.getResponseBody();
+      if (code != 200) {
+        throw new IOException("HTTP response code is " + code + ", not 200. "
+            + "Response: " + new String(response, "UTF-8"));
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      IOException ioe = new IOException();
+      ioe.initCause(e);
+      throw ioe;
+    } finally {
+      post.releaseConnection();
+    }
+  }
+}
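
For reference, by the time sendDocuments() runs, docBuffer holds a payload
along the following lines (field names and values are illustrative, not taken
from the patch), which is POSTed to <solr server>/update; commitAndOptimize()
then POSTs <commit/> and <optimize/> separately:

  <add>
    <doc boost="1.0">
      <field name="url">http://www.example.com/</field>
      <field name="title">Example &amp; Title</field>
    </doc>
    <doc boost="1.0">
      ...
    </doc>
  </add>
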
Index: src/java/org/apache/nutch/indexer/NutchIndexWriter.java
===================================================================
--- src/java/org/apache/nutch/indexer/NutchIndexWriter.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/NutchIndexWriter.java	(revision 0)
@@ -0,0 +1,14 @@
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface NutchIndexWriter {
+  public void open(Configuration conf) throws IOException; 
+  
+  public void write(NutchDocument doc) throws IOException;
+  
+  public void close() throws IOException;
+  
+}
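
The contract above is small enough to sketch a backend in a few lines. The
hypothetical LoggingIndexWriter below is not part of the patch; it only prints
URLs, but it shows the open/write/close lifecycle that Indexer.OutputFormat
drives for every registered writer:

  package org.apache.nutch.indexer;

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;

  public class LoggingIndexWriter implements NutchIndexWriter {

    public void open(Configuration conf) throws IOException {
      // read backend-specific keys from conf here
    }

    public void write(NutchDocument doc) throws IOException {
      System.out.println(doc.getFieldValue("url"));
    }

    public void close() throws IOException {
      // flush any buffered state; called once when the task finishes
    }
  }
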
Index: src/java/org/apache/nutch/indexer/IndexingFilter.java
===================================================================
--- src/java/org/apache/nutch/indexer/IndexingFilter.java	(revision 558683)
+++ src/java/org/apache/nutch/indexer/IndexingFilter.java	(working copy)
@@ -17,11 +17,9 @@
 
 package org.apache.nutch.indexer;
 
-// Lucene imports
-import org.apache.lucene.document.Document;
-
 // Hadoop imports
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 
 // Nutch imports
@@ -52,6 +50,15 @@
    * should be discarded)
    * @throws IndexingException
    */
-  Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException;
+  
+  /** Adds index-level configuration options.
+   * Implementations can update the given configuration to pass
+   * document-independent information to indexing backends. As a rule of
+   * thumb, prefix meta keys with the name of the intended backend; for
+   * example, when passing information to the lucene backend, prefix keys with "lucene.".
+   * @param conf Configuration instance.
+   * */
+  public void addIndexBackendOptions(Configuration conf);
 }
Index: src/java/org/apache/nutch/indexer/NutchDocument.java
===================================================================
--- src/java/org/apache/nutch/indexer/NutchDocument.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/NutchDocument.java	(revision 0)
@@ -0,0 +1,126 @@
+package org.apache.nutch.indexer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.nutch.metadata.Metadata;
+
+/** A {@link NutchDocument} is the unit of indexing.*/
+public class NutchDocument implements Writable {
+  
+  public static final byte VERSION = 1;
+  
+  private Map<String, List<String>> fields;
+  
+  private Metadata documentMeta;
+  
+  private float score;
+  
+  public NutchDocument() {
+    fields = new HashMap<String, List<String>>();
+    documentMeta = new Metadata();
+    score = 0.0f;
+  }
+  
+  public void add(String name, String value) {
+    List<String> fieldValues = fields.get(name);
+    if (fieldValues == null) {
+      fieldValues = new ArrayList<String>();
+    }
+    fieldValues.add(value);
+    fields.put(name, fieldValues);
+  }
+  
+  private void addFieldUnprotected(String name, String value) {
+    fields.get(name).add(value);
+  }
+  
+  public String getFieldValue(String name) {
+    List<String> fieldValues = fields.get(name);
+    if (fieldValues == null) {
+      return null;
+    }
+    if (fieldValues.size() == 0) {
+      return null;
+    }
+    return fieldValues.get(0);
+  }
+  
+  public List<String> getFieldValues(String name) {
+    return fields.get(name);
+  }
+  
+  public List<String> removeField(String name) {
+    return fields.remove(name);
+  }
+  
+  public Collection<String> getFieldNames() {
+    return fields.keySet();
+  }
+  
+  /** Iterate over all fields. */
+  public Iterator<Map.Entry<String, List<String>>> fieldIterator() {
+    return fields.entrySet().iterator();
+  }
+  
+  public float getScore() {
+    return score;
+  }
+
+  public void setScore(float score) {
+    this.score = score;
+  }
+  
+  public Metadata getDocumentMeta() {
+    return documentMeta;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte version = in.readByte();
+    if (version != VERSION) {
+      throw new VersionMismatchException(VERSION, version);
+    }
+    fields.clear(); // Writables are reused by Hadoop, so reset previous state
+    int size = WritableUtils.readVInt(in);
+    for (int i = 0; i < size; i++) {
+      String name = Text.readString(in);
+      int numValues = WritableUtils.readVInt(in);
+      fields.put(name, new ArrayList<String>());
+      for (int j = 0; j < numValues; j++) {
+        String value = Text.readString(in);
+        addFieldUnprotected(name, value);
+      }
+    }
+    score = in.readFloat();
+    documentMeta.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    WritableUtils.writeVInt(out, fields.size());
+    for (Map.Entry<String, List<String>> entry : fields.entrySet()) {
+      Text.writeString(out, entry.getKey());
+      List<String> values = entry.getValue();
+      WritableUtils.writeVInt(out, values.size());
+      for (String value : values) {
+        Text.writeString(out, value);
+      }
+    }
+    out.writeFloat(score);
+    documentMeta.write(out);
+  }
+
+}
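
Since NutchDocument now travels between tasks as a Writable, a round trip
through the serialization above can be sketched with Hadoop's data buffers
(a test-style snippet, not part of the patch):

  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.io.DataOutputBuffer;

  NutchDocument doc = new NutchDocument();
  doc.add("url", "http://www.example.com/");
  doc.setScore(1.5f);

  DataOutputBuffer out = new DataOutputBuffer();
  doc.write(out);                 // VERSION byte, fields, score, documentMeta

  DataInputBuffer in = new DataInputBuffer();
  in.reset(out.getData(), out.getLength());

  NutchDocument copy = new NutchDocument();
  copy.readFields(in);            // throws VersionMismatchException on a bad version byte
  assert "http://www.example.com/".equals(copy.getFieldValue("url"));
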
Index: src/java/org/apache/nutch/indexer/lucene/LuceneWriter.java
===================================================================
--- src/java/org/apache/nutch/indexer/lucene/LuceneWriter.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/lucene/LuceneWriter.java	(revision 0)
@@ -0,0 +1,296 @@
+package org.apache.nutch.indexer.lucene;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.nutch.analysis.AnalyzerFactory;
+import org.apache.nutch.analysis.NutchAnalyzer;
+import org.apache.nutch.analysis.NutchDocumentAnalyzer;
+import org.apache.nutch.indexer.Indexer;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchIndexWriter;
+import org.apache.nutch.indexer.NutchSimilarity;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.util.LogUtil;
+
+public class LuceneWriter implements NutchIndexWriter {
+  
+  public static enum STORE { YES, NO, COMPRESS }
+
+  public static enum INDEX { NO, NO_NORMS, TOKENIZED, UNTOKENIZED }
+
+  public static enum VECTOR { NO, OFFSET, POS, POS_OFFSET, YES }
+
+  private IndexWriter writer;
+
+  private AnalyzerFactory analyzerFactory;
+
+  private Path perm;
+
+  private Path temp;
+
+  private FileSystem fs;
+  
+  private Map<String, Field.Store> fieldStore;
+  
+  private Map<String, Field.Index> fieldIndex;
+  
+  private Map<String, Field.TermVector> fieldVector;
+  
+  public LuceneWriter() {
+    fieldStore = new HashMap<String, Field.Store>();
+    fieldIndex = new HashMap<String, Field.Index>();
+    fieldVector = new HashMap<String, Field.TermVector>();
+  }
+
+  private Document createLuceneDoc(NutchDocument doc) {
+    Document out = new Document();
+    
+    out.setBoost(doc.getScore());
+    
+    Iterator<Map.Entry<String, List<String>>> iterator = doc.fieldIterator();
+    Metadata documentMeta = doc.getDocumentMeta();
+    while (iterator.hasNext()) {
+      Map.Entry<String, List<String>> entry = iterator.next();
+      String fieldName = entry.getKey();
+      
+      Field.Store store = fieldStore.get(fieldName);
+      Field.Index index = fieldIndex.get(fieldName);
+      Field.TermVector vector = fieldVector.get(fieldName);
+
+      // default values
+      if (store == null) {
+        store = Field.Store.NO;
+      }
+      
+      if (index == null) {
+        index = Field.Index.NO;
+      }
+      
+      if (vector == null) {
+        vector = Field.TermVector.NO;
+      }
+      
+      // read document-level field information
+      String[] fieldMetas = 
+        documentMeta.getValues(LuceneConstants.FIELD_PREFIX + fieldName);
+      if (fieldMetas.length != 0) {
+        for (String val : fieldMetas) {
+          if (LuceneConstants.STORE_YES.equals(val)) {
+            store = Field.Store.YES;
+          } else if (LuceneConstants.STORE_NO.equals(val)) {
+            store = Field.Store.NO;
+          } else if (LuceneConstants.INDEX_TOKENIZED.equals(val)) {
+            index = Field.Index.TOKENIZED;
+          } else if (LuceneConstants.INDEX_NO.equals(val)) {
+            index = Field.Index.NO;
+          } else if (LuceneConstants.INDEX_UNTOKENIZED.equals(val)) {
+            index = Field.Index.UN_TOKENIZED;
+          } else if (LuceneConstants.INDEX_NO_NORMS.equals(val)) {
+            index = Field.Index.NO_NORMS;
+          } else if (LuceneConstants.VECTOR_NO.equals(val)) {
+            vector = Field.TermVector.NO;
+          } else if (LuceneConstants.VECTOR_YES.equals(val)) {
+            vector = Field.TermVector.YES;
+          } else if (LuceneConstants.VECTOR_POS.equals(val)) {
+            vector = Field.TermVector.WITH_POSITIONS;
+          } else if (LuceneConstants.VECTOR_POS_OFFSET.equals(val)) {
+            vector = Field.TermVector.WITH_POSITIONS_OFFSETS;
+          } else if (LuceneConstants.VECTOR_OFFSET.equals(val)) {
+            vector = Field.TermVector.WITH_OFFSETS;
+          }
+        }
+      }
+
+      for (String fieldValue : entry.getValue()) {
+        out.add(new Field(fieldName, fieldValue, store, index, vector));
+      }
+    }
+    
+    return out;
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void processOptions(Configuration conf) {
+    Iterator iterator = conf.entries();
+    while (iterator.hasNext()) {
+      String key = (String) ((Map.Entry)iterator.next()).getKey();
+      if (!key.startsWith(LuceneConstants.LUCENE_PREFIX)) {
+        continue;
+      }
+      if (key.startsWith(LuceneConstants.FIELD_STORE_PREFIX)) {
+        String field = 
+          key.substring(LuceneConstants.FIELD_STORE_PREFIX.length());
+        LuceneWriter.STORE store = LuceneWriter.STORE.valueOf(conf.get(key));
+        switch (store) {
+        case YES:
+          fieldStore.put(field, Field.Store.YES);
+          break;
+        case NO:
+          fieldStore.put(field, Field.Store.NO);
+          break;
+        case COMPRESS:
+          fieldStore.put(field, Field.Store.COMPRESS);
+          break;
+        }        
+      } else if (key.startsWith(LuceneConstants.FIELD_INDEX_PREFIX)) {
+        String field = 
+          key.substring(LuceneConstants.FIELD_INDEX_PREFIX.length());
+        LuceneWriter.INDEX index = LuceneWriter.INDEX.valueOf(conf.get(key));
+        switch (index) {
+        case NO:
+          fieldIndex.put(field, Field.Index.NO);
+          break;
+        case NO_NORMS:
+          fieldIndex.put(field, Field.Index.NO_NORMS);
+          break;
+        case TOKENIZED:
+          fieldIndex.put(field, Field.Index.TOKENIZED);
+          break;
+        case UNTOKENIZED:
+          fieldIndex.put(field, Field.Index.UN_TOKENIZED);
+          break;
+        }   
+      } else if (key.startsWith(LuceneConstants.FIELD_VECTOR_PREFIX)) {
+        String field = 
+          key.substring(LuceneConstants.FIELD_VECTOR_PREFIX.length());
+        LuceneWriter.VECTOR vector = LuceneWriter.VECTOR.valueOf(conf.get(key));
+        switch (vector) {
+        case NO:
+          fieldVector.put(field, Field.TermVector.NO);
+          break;
+        case OFFSET:
+          fieldVector.put(field, Field.TermVector.WITH_OFFSETS);
+          break;
+        case POS:
+          fieldVector.put(field, Field.TermVector.WITH_POSITIONS);
+          break;
+        case POS_OFFSET:
+          fieldVector.put(field, Field.TermVector.WITH_POSITIONS_OFFSETS);
+          break;
+        case YES:
+          fieldVector.put(field, Field.TermVector.YES);
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.optimize();
+    writer.close();
+    fs.completeLocalOutput(perm, temp); // copy to dfs
+    fs.createNewFile(new Path(perm, Indexer.DONE_NAME));
+  }
+
+  @Override
+  public void open(Configuration conf)
+  throws IOException {
+    this.fs = FileSystem.get(conf);
+    perm = new Path(conf.get(LuceneConstants.OUTPUT_DIR));
+    temp = new Path(conf.get(LuceneConstants.TEMP_OUTPUT_DIR));
+
+    fs.delete(perm); // delete old, if any
+    analyzerFactory = new AnalyzerFactory(conf);
+    writer = new IndexWriter(fs.startLocalOutput(perm, temp).toString(),
+        new NutchDocumentAnalyzer(conf), true);
+
+    writer.setMergeFactor(conf.getInt("indexer.mergeFactor", 10));
+    writer.setMaxBufferedDocs(conf.getInt("indexer.minMergeDocs", 100));
+    writer.setMaxMergeDocs(conf
+        .getInt("indexer.maxMergeDocs", Integer.MAX_VALUE));
+    writer.setTermIndexInterval(conf.getInt("indexer.termIndexInterval", 128));
+    writer.setMaxFieldLength(conf.getInt("indexer.max.tokens", 10000));
+    writer.setInfoStream(LogUtil.getDebugStream(Indexer.LOG));
+    writer.setUseCompoundFile(false);
+    writer.setSimilarity(new NutchSimilarity());
+
+    processOptions(conf);
+  }
+
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    Document luceneDoc = createLuceneDoc(doc);
+    NutchAnalyzer analyzer = analyzerFactory.get(luceneDoc.get("lang"));
+    if (Indexer.LOG.isDebugEnabled()) {
+      Indexer.LOG.debug("Indexing [" + luceneDoc.get("url")
+          + "] with analyzer " + analyzer + " (" + luceneDoc.get("lang")
+          + ")");
+    }
+    writer.addDocument(luceneDoc, analyzer);
+
+  }
+
+  /** Adds a lucene field. 
+   * <p>
+   * This method is provided for backward-compatibility with
+   * older indexing filters. This should not be used by newer
+   * implementations since this is slower than 
+   * {@link NutchDocument#add(String, String)} and will be removed
+   * in a future release.
+   * </p>
+   * @param f Lucene field to be added.
+   * @deprecated Use {@link NutchDocument#add(String, String)} instead and
+   * set index-level metadata for field information.
+   * */
+  public static void add(NutchDocument doc, Field f) {
+    String fieldName = f.name();
+    String key = LuceneConstants.FIELD_PREFIX + fieldName;
+    Metadata documentMeta = doc.getDocumentMeta();
+    if (f.isStored()) {
+      documentMeta.add(key, LuceneConstants.STORE_YES);
+    } else if (f.isCompressed()) {
+      documentMeta.add(key, LuceneConstants.STORE_COMPRESS);
+    } else {
+      documentMeta.add(key, LuceneConstants.STORE_NO);
+    }
+    
+    if (f.isIndexed()) {
+      if (f.isTokenized()) {
+        documentMeta.add(key, LuceneConstants.INDEX_TOKENIZED);
+      } else if (f.getOmitNorms()) {
+        documentMeta.add(key, LuceneConstants.INDEX_NO_NORMS);
+      } else {
+        documentMeta.add(key, LuceneConstants.INDEX_UNTOKENIZED);
+      }
+    } else {
+      documentMeta.add(key, LuceneConstants.INDEX_NO);
+    }
+    
+    if (f.isStoreOffsetWithTermVector() && f.isStorePositionWithTermVector()) {
+      documentMeta.add(key, LuceneConstants.VECTOR_POS_OFFSET);
+    } else if (f.isStoreOffsetWithTermVector()) {
+      documentMeta.add(key, LuceneConstants.VECTOR_OFFSET);
+    } else if (f.isStorePositionWithTermVector()) {
+      documentMeta.add(key, LuceneConstants.VECTOR_POS);
+    } else if (f.isTermVectorStored()) {
+      documentMeta.add(key, LuceneConstants.VECTOR_YES);
+    } else {
+      documentMeta.add(key, LuceneConstants.VECTOR_NO);
+    }
+    doc.add(fieldName, f.stringValue()); // carry the field's value into the document
+  }
+
+  public static void addFieldOptions(String field, LuceneWriter.STORE store, 
+      LuceneWriter.INDEX index, LuceneWriter.VECTOR vector, Configuration conf) {
+    
+    conf.set(LuceneConstants.FIELD_STORE_PREFIX + field, store.toString());
+    conf.set(LuceneConstants.FIELD_INDEX_PREFIX + field, index.toString());
+    conf.set(LuceneConstants.FIELD_VECTOR_PREFIX + field, vector.toString());
+  }
+  
+  public static void addFieldOptions(String field, LuceneWriter.STORE store, 
+      LuceneWriter.INDEX index, Configuration conf) {
+    LuceneWriter.addFieldOptions(field, store, index, LuceneWriter.VECTOR.NO, conf);
+  }
+}
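
The keys written by addFieldOptions() are plain configuration strings, so the
round trip through processOptions() is easy to trace. For a hypothetical "tag"
field, the call

  LuceneWriter.addFieldOptions("tag", LuceneWriter.STORE.YES,
      LuceneWriter.INDEX.UNTOKENIZED, conf);

is equivalent to

  conf.set("lucene.field.store.tag", "YES");         // FIELD_STORE_PREFIX + field
  conf.set("lucene.field.index.tag", "UNTOKENIZED"); // FIELD_INDEX_PREFIX + field
  conf.set("lucene.field.vector.tag", "NO");         // FIELD_VECTOR_PREFIX + field

which processOptions() maps back to Field.Store.YES, Field.Index.UN_TOKENIZED
and Field.TermVector.NO for every "tag" field added to a Lucene document.
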
Index: src/java/org/apache/nutch/indexer/lucene/LuceneConstants.java
===================================================================
--- src/java/org/apache/nutch/indexer/lucene/LuceneConstants.java	(revision 0)
+++ src/java/org/apache/nutch/indexer/lucene/LuceneConstants.java	(revision 0)
@@ -0,0 +1,42 @@
+package org.apache.nutch.indexer.lucene;
+
+public interface LuceneConstants {
+  public static final String LUCENE_PREFIX = "lucene.";
+  
+  public static final String OUTPUT_DIR = LUCENE_PREFIX + "output.dir";
+  
+  public static final String TEMP_OUTPUT_DIR = LUCENE_PREFIX + "tmp.dir";
+  
+  static final String FIELD_PREFIX = LUCENE_PREFIX + "field.";
+  
+  static final String FIELD_STORE_PREFIX = FIELD_PREFIX + "store.";
+  
+  static final String FIELD_INDEX_PREFIX = FIELD_PREFIX + "index.";
+  
+  static final String FIELD_VECTOR_PREFIX = FIELD_PREFIX + "vector.";
+  
+  static final String STORE_YES = "store.yes";
+  
+  static final String STORE_NO = "store.no";
+  
+  static final String STORE_COMPRESS = "store.compress";
+    
+  static final String INDEX_NO = "index.no";
+    
+  static final String INDEX_NO_NORMS = "index.no_norms";
+    
+  static final String INDEX_TOKENIZED = "index.tokenized";
+    
+  static final String INDEX_UNTOKENIZED = "index.untokenized";
+  
+  static final String VECTOR_NO = "vector.no";
+  
+  static final String VECTOR_POS = "vector.pos";
+  
+  static final String VECTOR_OFFSET = "vector.offset";
+  
+  static final String VECTOR_POS_OFFSET = "vector.pos_offset";
+  
+  static final String VECTOR_YES = "vector.yes";
+  
+}
Index: src/java/org/apache/nutch/indexer/IndexingFilters.java
===================================================================
--- src/java/org/apache/nutch/indexer/IndexingFilters.java	(revision 558683)
+++ src/java/org/apache/nutch/indexer/IndexingFilters.java	(working copy)
@@ -24,8 +24,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.lucene.document.Document;
-
 import org.apache.nutch.plugin.*;
 import org.apache.nutch.parse.Parse;
 import org.apache.hadoop.conf.Configuration;
@@ -41,7 +39,7 @@
   public final static Log LOG = LogFactory.getLog(IndexingFilters.class);
 
   private IndexingFilter[] indexingFilters;
-
+  
   public IndexingFilters(Configuration conf) {
     /* Get indexingfilter.order property */
     String order = conf.get(INDEXINGFILTER_ORDER);
@@ -62,15 +60,15 @@
         if (point == null)
           throw new RuntimeException(IndexingFilter.X_POINT_ID + " not found.");
         Extension[] extensions = point.getExtensions();
-        HashMap filterMap = new HashMap();
+        HashMap<String, IndexingFilter> filterMap = 
+          new HashMap<String, IndexingFilter>();
         for (int i = 0; i < extensions.length; i++) {
           Extension extension = extensions[i];
           IndexingFilter filter = (IndexingFilter) extension
               .getExtensionInstance();
-          if (LOG.isInfoEnabled()) {
-            LOG.info("Adding " + filter.getClass().getName());
-          }
+          LOG.info("Adding " + filter.getClass().getName());
           if (!filterMap.containsKey(filter.getClass().getName())) {
+            filter.addIndexBackendOptions(conf);
             filterMap.put(filter.getClass().getName(), filter);
           }
         }
@@ -80,15 +78,16 @@
          */
         if (orderedFilters == null) {
           conf.setObject(IndexingFilter.class.getName(),
-              (IndexingFilter[]) filterMap.values().toArray(
+              filterMap.values().toArray(
                   new IndexingFilter[0]));
           /* Otherwise run the filters in the required order */
         } else {
           ArrayList<IndexingFilter> filters = new ArrayList<IndexingFilter>();
           for (int i = 0; i < orderedFilters.length; i++) {
-            IndexingFilter filter = (IndexingFilter) filterMap
+            IndexingFilter filter = filterMap
                 .get(orderedFilters[i]);
             if (filter != null) {
+              filter.addIndexBackendOptions(conf);
               filters.add(filter);
             }
           }
@@ -104,7 +103,7 @@
   }                  
 
   /** Run all defined filters. */
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum,
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum,
       Inlinks inlinks) throws IndexingException {
     for (int i = 0; i < this.indexingFilters.length; i++) {
       doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks);
@@ -114,4 +113,5 @@
 
     return doc;
   }
+
 }
Index: src/java/org/apache/nutch/indexer/Indexer.java
===================================================================
--- src/java/org/apache/nutch/indexer/Indexer.java	(revision 558683)
+++ src/java/org/apache/nutch/indexer/Indexer.java	(working copy)
@@ -25,17 +25,16 @@
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolBase;
 import org.apache.nutch.parse.*;
-import org.apache.nutch.analysis.*;
 
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.LogUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -45,116 +44,95 @@
 import org.apache.nutch.crawl.LinkDb;
 import org.apache.nutch.crawl.NutchWritable;
 
-import org.apache.lucene.index.*;
-import org.apache.lucene.document.*;
+import org.apache.nutch.indexer.lucene.LuceneConstants;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
+import org.apache.nutch.indexer.solr.SolrConstants;
+import org.apache.nutch.indexer.solr.SolrWriter;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
 
 /** Create indexes for segments. */
 public class Indexer extends ToolBase implements Reducer, Mapper {
-  
+
   public static final String DONE_NAME = "index.done";
 
   public static final Log LOG = LogFactory.getLog(Indexer.class);
   
-  /** A utility class used to pass a lucene document from Indexer.reduce 
-   * to Indexer.OutputFormat.
-   * Note: Despite its name, it can't properly wrap a lucene document - it
-   * doesn't know how to serialize/deserialize a lucene document.
-   */
-  private static class LuceneDocumentWrapper implements Writable {
-    private Document doc;
-	  
-    public LuceneDocumentWrapper(Document doc) {
-      this.doc = doc;
-    }
-    
-    public Document get() {
-      return doc;
-    }
+  private static final String LUCENE_ENABLED_KEY = 
+    "indexer.lucene.backend.enabled";
+  
+  private static final String SOLR_ENABLED_KEY = 
+    "indexer.solr.backend.enabled";
+  
+  public static class OutputFormat extends org.apache.hadoop.mapred.OutputFormatBase {
 
-    public void readFields(DataInput in) throws IOException { 
-      // intentionally left blank
-    }
-		
-    public void write(DataOutput out) throws IOException {
-      // intentionally left blank
-    }
-	  
-  }
+    public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+        String name, final Progressable progress) throws IOException {
 
-  /** Unwrap Lucene Documents created by reduce and add them to an index. */
-  public static class OutputFormat
-    extends org.apache.hadoop.mapred.OutputFormatBase {
-    public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,
-                                        String name, final Progressable progress) throws IOException {
-      final Path perm = new Path(job.getOutputPath(), name);
-      final Path temp =
-        job.getLocalPath("index/_"+Integer.toString(new Random().nextInt()));
+      final List<NutchIndexWriter> writers = new ArrayList<NutchIndexWriter>();
 
-      fs.delete(perm);                            // delete old, if any
+      if (job.getBoolean(LUCENE_ENABLED_KEY, false)) {
+        LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
+        LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
+        LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
+        
+        job.set(LuceneConstants.OUTPUT_DIR, 
+                      new Path(job.getOutputPath(), name).toString());
+        
+        Path temp = job.getLocalPath("index/_"  + 
+                                     Integer.toString(new Random().nextInt()));
+        job.set(LuceneConstants.TEMP_OUTPUT_DIR, temp.toString());
 
-      final AnalyzerFactory factory = new AnalyzerFactory(job);
-      final IndexWriter writer =                  // build locally first
-        new IndexWriter(fs.startLocalOutput(perm, temp).toString(),
-                        new NutchDocumentAnalyzer(job), true);
+        writers.add(new LuceneWriter());
+      }
 
-      writer.setMergeFactor(job.getInt("indexer.mergeFactor", 10));
-      writer.setMaxBufferedDocs(job.getInt("indexer.minMergeDocs", 100));
-      writer.setMaxMergeDocs(job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE));
-      writer.setTermIndexInterval
-        (job.getInt("indexer.termIndexInterval", 128));
-      writer.setMaxFieldLength(job.getInt("indexer.max.tokens", 10000));
-      writer.setInfoStream(LogUtil.getInfoStream(LOG));
-      writer.setUseCompoundFile(false);
-      writer.setSimilarity(new NutchSimilarity());
+      if (job.getBoolean(SOLR_ENABLED_KEY, false)) {
+        writers.add(new SolrWriter());
+      }
+      
+      for (NutchIndexWriter writer : writers) {
+        writer.open(job);
+      }
 
       return new RecordWriter() {
-          boolean closed;
+        boolean closed;
 
-          public void write(WritableComparable key, Writable value)
-            throws IOException {                  // unwrap & index doc
-            Document doc = ((LuceneDocumentWrapper) value).get();
-            NutchAnalyzer analyzer = factory.get(doc.get("lang"));
-            if (LOG.isInfoEnabled()) {
-              LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" +
-                       " with analyzer " + analyzer +
-                       " (" + doc.get("lang") + ")");
-            }
-            writer.addDocument(doc, analyzer);
-            progress.progress();
+        public void write(WritableComparable key, Writable value)
+        throws IOException {                  // unwrap & index doc
+          NutchDocument doc = (NutchDocument)value;
+          for (NutchIndexWriter writer : writers) {
+            writer.write(doc);
           }
-          
-          public void close(final Reporter reporter) throws IOException {
-            // spawn a thread to give progress heartbeats
-            Thread prog = new Thread() {
-                public void run() {
-                  while (!closed) {
-                    try {
-                      reporter.setStatus("closing");
-                      Thread.sleep(1000);
-                    } catch (InterruptedException e) { continue; }
-                      catch (Throwable e) { return; }
-                  }
-                }
-              };
+          progress.progress();
+        }
 
-            try {
-              prog.start();
-              if (LOG.isInfoEnabled()) { LOG.info("Optimizing index."); }
-              // optimize & close index
-              writer.optimize();
+        public void close(final Reporter reporter) throws IOException {
+          // spawn a thread to give progress heartbeats
+          Thread prog = new Thread() {
+            public void run() {
+              while (!closed) {
+                try {
+                  reporter.setStatus("closing");
+                  Thread.sleep(1000);
+                } catch (InterruptedException e) { continue; }
+                catch (Throwable e) { return; }
+              }
+            }
+          };
+
+          try {
+            prog.start();
+            for (NutchIndexWriter writer : writers) {
               writer.close();
-              fs.completeLocalOutput(perm, temp);   // copy to dfs
-              fs.createNewFile(new Path(perm, DONE_NAME));
-            } finally {
-              closed = true;
             }
+          } finally {
+            closed = true;
           }
-        };
+        }
+      };
     }
   }
-
+  
   private IndexingFilters filters;
   private ScoringFilters scfilters;
 
@@ -173,6 +151,11 @@
   }
 
   public void close() {}
+  
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+    output.collect(key, new NutchWritable(value));
+  }
 
   public void reduce(WritableComparable key, Iterator values,
                      OutputCollector output, Reporter reporter)
@@ -217,27 +200,16 @@
     if (!parseData.getStatus().isSuccess()) {
       return;
     }
-
-    Document doc = new Document();
+    
+    NutchDocument doc = new NutchDocument();
     Metadata metadata = parseData.getContentMeta();
 
     // add segment, used to map from merged index back to segment files
-    doc.add(new Field("segment", metadata.get(Nutch.SEGMENT_NAME_KEY),
-            Field.Store.YES, Field.Index.NO));
+    doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
 
     // add digest, used by dedup
-    doc.add(new Field("digest", metadata.get(Nutch.SIGNATURE_KEY),
-            Field.Store.YES, Field.Index.NO));
+    doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
 
-//     if (LOG.isInfoEnabled()) {
-//       LOG.info("Url: "+key.toString());
-//       LOG.info("Title: "+parseData.getTitle());
-//       LOG.info(crawlDatum.toString());
-//       if (inlinks != null) {
-//         LOG.info(inlinks.toString());
-//       }
-//     }
-
     Parse parse = new ParseImpl(parseText, parseData);
     try {
       // run indexing filters
@@ -262,33 +234,50 @@
       return;
     }
     // apply boost to all indexed fields.
-    doc.setBoost(boost);
+    doc.setScore(boost);
     // store boost for use by explain and dedup
-    doc.add(new Field("boost", Float.toString(boost),
-            Field.Store.YES, Field.Index.NO));
+    doc.add("boost", Float.toString(boost));
 
-    output.collect(key, new LuceneDocumentWrapper(doc));
+    output.collect(key, doc);
   }
+  
+  public void index(Path luceneDir, String solrUrl, Path crawlDb, 
+                    Path linkDb, Collection<Path> segments)
+  throws IOException {
 
-  public void index(Path indexDir, Path crawlDb, Path linkDb, Path[] segments)
-    throws IOException {
+    LOG.info("Indexer: starting");
+    LOG.info("Indexer: crawldbb: " + crawlDb);
+    LOG.info("Indexer: linkdb: " + linkDb);
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Indexer: starting");
-      LOG.info("Indexer: linkdb: " + linkDb);
+    JobConf job = new NutchJob(getConf());
+    String jobName = "index";
+    if (luceneDir != null) {
+      // add lucene output dir to job name
+      jobName += " lucene=" + luceneDir;
+      job.setBoolean(LUCENE_ENABLED_KEY, true);
+      LOG.info("Indexer: luceneDir: " + luceneDir);
+    } else {
+      job.setBoolean(LUCENE_ENABLED_KEY, false);
     }
+    
+    if (solrUrl != null) {
+      // add solr server url to job name
+      jobName += " solr=" + solrUrl;
+      job.setBoolean(SOLR_ENABLED_KEY, true);
+      job.set(SolrConstants.SERVER_URL, solrUrl);
+      LOG.info("Indexer: solrUrl: " + solrUrl);
+    } else {
+      job.setBoolean(SOLR_ENABLED_KEY, false);
+    }
+    
+    job.setJobName(jobName);
 
-    JobConf job = new NutchJob(getConf());
-    job.setJobName("index " + indexDir);
-
-    for (int i = 0; i < segments.length; i++) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Indexer: adding segment: " + segments[i]);
-      }
-      job.addInputPath(new Path(segments[i], CrawlDatum.FETCH_DIR_NAME));
-      job.addInputPath(new Path(segments[i], CrawlDatum.PARSE_DIR_NAME));
-      job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));
-      job.addInputPath(new Path(segments[i], ParseText.DIR_NAME));
+    for (Path segment : segments) {
+      LOG.info("Indexer: adding segment: " + segment);
+      job.addInputPath(new Path(segment, CrawlDatum.FETCH_DIR_NAME));
+      job.addInputPath(new Path(segment, CrawlDatum.PARSE_DIR_NAME));
+      job.addInputPath(new Path(segment, ParseData.DIR_NAME));
+      job.addInputPath(new Path(segment, ParseText.DIR_NAME));
     }
 
     job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
@@ -298,10 +287,15 @@
     job.setMapperClass(Indexer.class);
     job.setReducerClass(Indexer.class);
 
-    job.setOutputPath(indexDir);
+    if (luceneDir == null) {
+      job.setOutputPath(new Path("notused"));
+    } else {
+      job.setOutputPath(luceneDir);
+    }
     job.setOutputFormat(OutputFormat.class);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(NutchWritable.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+    job.setOutputValueClass(NutchDocument.class);
 
     JobClient.runJob(job);
     if (LOG.isInfoEnabled()) { LOG.info("Indexer: done"); }
@@ -314,19 +308,45 @@
   
   public int run(String[] args) throws Exception {
     
-    if (args.length < 4) {
-      System.err.println("Usage: <index> <crawldb> <linkdb> <segment> ...");
+    if (args.length < 5) {
+      System.err.println("Usage: (-lucene <index path>) (-solr <server url>)" + 
+                         " <crawldb> <linkdb> <segment> ..."); 
+                         
       return -1;
     }
     
-    Path[] segments = new Path[args.length-3];
-    for (int i = 3; i < args.length; i++) {
-      segments[i-3] = new Path(args[i]);
+    Path luceneDir = null;
+    String solrUrl = null;
+    Path crawlDb = null;
+    Path linkDb = null;
+    ArrayList<Path> segments = new ArrayList<Path>();
+    int i;
+    for (i = 0; i < args.length; i++) {
+      if (args[i].equals("-lucene")) {
+        luceneDir = new Path(args[++i]);
+      } else if (args[i].equals("-solr")) {
+        solrUrl = args[++i];
+      } else {
+        break;
+      }
     }
+    
+    if (luceneDir == null && solrUrl == null) {
+      System.err.println("Usage: (-lucene <index path>) (-solr <server url>)" + 
+      " <crawldb> <linkdb> <segment> ..."); 
+      
+      return -1;
+    }
+    
+    crawlDb = new Path(args[i++]);
+    linkDb = new Path(args[i++]);
+    
+    for (; i < args.length; i++) {
+      segments.add(new Path(args[i]));
+    }
 
     try {
-      index(new Path(args[0]), new Path(args[1]), new Path(args[2]),
-                  segments);
+      index(luceneDir, solrUrl, crawlDb, linkDb, segments);
       return 0;
     } catch (Exception e) {
       LOG.fatal("Indexer: " + StringUtils.stringifyException(e));
@@ -334,9 +354,4 @@
     }
   }
 
-  public void map(WritableComparable key, Writable value,
-      OutputCollector output, Reporter reporter) throws IOException {
-    output.collect(key, new NutchWritable(value));
-  }
-
 }
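
With the new argument parsing, one job can feed either backend or both. An
illustrative invocation (paths and the server URL are examples only, assuming
the bin/nutch wrapper is used to launch this class):

  bin/nutch org.apache.nutch.indexer.Indexer -lucene crawl/indexes \
      -solr http://localhost:8983/solr crawl/crawldb crawl/linkdb \
      crawl/segments/20070801000000
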
Index: src/java/org/apache/nutch/scoring/ScoringFilter.java
===================================================================
--- src/java/org/apache/nutch/scoring/ScoringFilter.java	(revision 558683)
+++ src/java/org/apache/nutch/scoring/ScoringFilter.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.lucene.document.Document;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.parse.Parse;
 import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.plugin.Pluggable;
@@ -156,6 +157,6 @@
    * other scoring strategies by modifying Lucene document directly.
    * @throws ScoringFilterException
    */
-  public float indexerScore(Text url, Document doc, CrawlDatum dbDatum,
+  public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
           CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException;
 }
Index: src/java/org/apache/nutch/scoring/ScoringFilters.java
===================================================================
--- src/java/org/apache/nutch/scoring/ScoringFilters.java	(revision 558683)
+++ src/java/org/apache/nutch/scoring/ScoringFilters.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.lucene.document.Document;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.parse.Parse;
 import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.plugin.Extension;
@@ -135,7 +136,7 @@
     return adjust;
   }
 
-  public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
+  public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
     for (int i = 0; i < this.filters.length; i++) {
       initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum, parse, inlinks, initScore);
     }
Index: src/java/org/apache/nutch/crawl/Inlinks.java
===================================================================
--- src/java/org/apache/nutch/crawl/Inlinks.java	(revision 558683)
+++ src/java/org/apache/nutch/crawl/Inlinks.java	(working copy)
@@ -69,7 +69,7 @@
 
   /** Return the set of anchor texts.  Only a single anchor with a given text
    * is permitted from a given domain. */
-  public String[] getAnchors() throws IOException {
+  public String[] getAnchors() {
     HashMap domainToAnchors = new HashMap();
     ArrayList results = new ArrayList();
     Iterator it = inlinks.iterator();
Index: src/java/org/apache/nutch/crawl/Crawl.java
===================================================================
--- src/java/org/apache/nutch/crawl/Crawl.java	(revision 558683)
+++ src/java/org/apache/nutch/crawl/Crawl.java	(working copy)
@@ -131,7 +131,7 @@
       linkDbTool.invert(linkDb, segments, true, true, false); // invert links
 
       // index, dedup & merge
-      indexer.index(indexes, crawlDb, linkDb, fs.listPaths(segments));
+      indexer.index(indexes, null, crawlDb, linkDb, Arrays.asList(fs.listPaths(segments)));
       dedup.dedup(new Path[] { indexes });
       merger.merge(fs.listPaths(indexes), index, tmpDir);
     } else {
Index: src/plugin/microformats-reltag/src/java/org/apache/nutch/microformats/reltag/RelTagIndexingFilter.java
===================================================================
--- src/plugin/microformats-reltag/src/java/org/apache/nutch/microformats/reltag/RelTagIndexingFilter.java	(revision 558683)
+++ src/plugin/microformats-reltag/src/java/org/apache/nutch/microformats/reltag/RelTagIndexingFilter.java	(working copy)
@@ -22,17 +22,15 @@
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 import org.apache.hadoop.io.Text;
 import org.apache.nutch.parse.Parse;
 
 // Hadoop imports
 import org.apache.hadoop.conf.Configuration;
 
-// Lucene imports
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Document;
 
-
 /**
  * An {@link org.apache.nutch.indexer.IndexingFilter} that 
  * add <code>tag</code> field(s) to the document.
@@ -48,22 +46,26 @@
 
 
   // Inherited JavaDoc
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException {
 
     // Check if some Rel-Tags found, possibly put there by RelTagParser
     String[] tags = parse.getData().getParseMeta().getValues(RelTagParser.REL_TAG);
     if (tags != null) {
       for (int i=0; i<tags.length; i++) {
-        doc.add(new Field("tag", tags[i],
-                          Field.Store.YES, Field.Index.UN_TOKENIZED));
+        doc.add("tag", tags[i]);
       }
     }
 
     return doc;
   }
-
   
+  @Override
+  public void addIndexBackendOptions(Configuration conf) {
+    LuceneWriter.addFieldOptions("tag", LuceneWriter.STORE.YES, 
+        LuceneWriter.INDEX.UNTOKENIZED, conf);    
+  }
+  
   /* ----------------------------- *
    * <implementation:Configurable> *
    * ----------------------------- */
Index: src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java
===================================================================
--- src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java	(revision 558683)
+++ src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java	(working copy)
@@ -21,20 +21,19 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.lucene.document.DateTools;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
 
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.parse.Parse;
 
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 import org.apache.hadoop.io.Text;
 
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
 
-import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +45,7 @@
   private int MAX_TITLE_LENGTH;
   private Configuration conf;
 
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException {
     
     String host = null;
@@ -58,29 +57,16 @@
     }
 
     if (host != null) {
-      // add host as un-stored, indexed and tokenized
-      doc.add(new Field("host", host, Field.Store.NO, Field.Index.TOKENIZED));
-      // add site as un-stored, indexed and un-tokenized
-      doc.add(new Field("site", host, Field.Store.NO, Field.Index.UN_TOKENIZED));
+      doc.add("host", host);
+      doc.add("site", host);
     }
 
-
-    // url is both stored and indexed, so it's both searchable and returned
-    doc.add(new Field("url", url.toString(), Field.Store.YES, Field.Index.TOKENIZED));
+    doc.add("url", url.toString());
+    doc.add("content", parse.getText());
     
-    // content is indexed, so that it's searchable, but not stored in index
-    doc.add(new Field("content", parse.getText(), Field.Store.NO, Field.Index.TOKENIZED));
-    
-    // anchors are indexed, so they're searchable, but not stored in index
-    try {
-      String[] anchors = (inlinks != null ? inlinks.getAnchors() : new String[0]);
-      for (int i = 0; i < anchors.length; i++) {
-        doc.add(new Field("anchor", anchors[i], Field.Store.NO, Field.Index.TOKENIZED));
-      }
-    } catch (IOException ioe) {
-      if (LOG.isWarnEnabled()) {
-        LOG.warn("BasicIndexingFilter: can't get anchors for " + url.toString());
-      }
+    String[] anchors = (inlinks != null ? inlinks.getAnchors() : new String[0]);
+    for (String anchor : anchors) {
+      doc.add("anchor", anchor);
     }
 
     // title
@@ -88,22 +74,56 @@
     if (title.length() > MAX_TITLE_LENGTH) {      // truncate title if needed
       title = title.substring(0, MAX_TITLE_LENGTH);
     }
-    // add title indexed and stored so that it can be displayed
-    doc.add(new Field("title", title, Field.Store.YES, Field.Index.TOKENIZED));
+    doc.add("title", title);
+    
     // add cached content/summary display policy, if available
     String caching = parse.getData().getMeta(Nutch.CACHING_FORBIDDEN_KEY);
     if (caching != null && !caching.equals(Nutch.CACHING_FORBIDDEN_NONE)) {
-      doc.add(new Field("cache", caching, Field.Store.YES, Field.Index.NO));
+      doc.add("cache", caching);
     }
     
     // add timestamp when fetched, for deduplication
-    doc.add(new Field("tstamp",
-        DateTools.timeToString(datum.getFetchTime(), DateTools.Resolution.MILLISECOND),
-        Field.Store.YES, Field.Index.NO));
+    doc.add("tstamp",
+            DateTools.timeToString(datum.getFetchTime(), 
+            DateTools.Resolution.MILLISECOND));
 
     return doc;
   }
+  
+  public void addIndexBackendOptions(Configuration conf) {
+    
+    //////////////////////////////
+    //    add lucene options    //
+    //////////////////////////////
+    
+    // host is un-stored, indexed and tokenized
+    LuceneWriter.addFieldOptions("host", LuceneWriter.STORE.NO,
+        LuceneWriter.INDEX.TOKENIZED, conf); 
 
+    // site is un-stored, indexed and un-tokenized
+    LuceneWriter.addFieldOptions("site", LuceneWriter.STORE.NO, 
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+
+    // url is both stored and indexed, so it's both searchable and returned
+    LuceneWriter.addFieldOptions("url", LuceneWriter.STORE.YES, 
+        LuceneWriter.INDEX.TOKENIZED, conf);
+
+    // content is indexed, so that it's searchable, but not stored in index
+    LuceneWriter.addFieldOptions("content", LuceneWriter.STORE.NO, 
+        LuceneWriter.INDEX.TOKENIZED, conf);
+
+    // anchors are indexed, so they're searchable, but not stored in index
+    LuceneWriter.addFieldOptions("anchor", LuceneWriter.STORE.NO, 
+        LuceneWriter.INDEX.TOKENIZED, conf);
+
+    // title is indexed and stored so that it can be displayed
+    LuceneWriter.addFieldOptions("title", LuceneWriter.STORE.YES, 
+        LuceneWriter.INDEX.TOKENIZED, conf);
+
+    LuceneWriter.addFieldOptions("cache", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, conf);
+    LuceneWriter.addFieldOptions("tstamp", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, conf);
+  }
+
   public void setConf(Configuration conf) {
     this.conf = conf;
     this.MAX_TITLE_LENGTH = conf.getInt("indexer.max.title.length", 100);
Index: src/plugin/languageidentifier/src/java/org/apache/nutch/analysis/lang/LanguageIndexingFilter.java
===================================================================
--- src/plugin/languageidentifier/src/java/org/apache/nutch/analysis/lang/LanguageIndexingFilter.java	(revision 558683)
+++ src/plugin/languageidentifier/src/java/org/apache/nutch/analysis/lang/LanguageIndexingFilter.java	(working copy)
@@ -22,6 +22,8 @@
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 import org.apache.hadoop.io.Text;
 import org.apache.nutch.parse.Parse;
 import org.apache.nutch.metadata.Metadata;
@@ -30,11 +32,7 @@
 // Hadoop imports
 import org.apache.hadoop.conf.Configuration;
 
-// Lucene imports
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Document;
 
-
 /**
  * An {@link org.apache.nutch.indexer.IndexingFilter} that 
  * add a <code>lang</code> (language) field to the document.
@@ -65,7 +63,7 @@
   }
 
   // Inherited JavaDoc
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException {
 
     // check if LANGUAGE found, possibly put there by HTMLLanguageParser
@@ -92,10 +90,16 @@
       lang = "unknown";
     }
 
-    doc.add(new Field("lang", lang, Field.Store.YES, Field.Index.UN_TOKENIZED));
+    doc.add("lang", lang);
 
     return doc;
   }
+
+  @Override
+  public void addIndexBackendOptions(Configuration conf) {
+    LuceneWriter.addFieldOptions("lang", LuceneWriter.STORE.YES, 
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+  }
   
   public void setConf(Configuration conf) {
     this.conf = conf;
@@ -105,4 +109,5 @@
   public Configuration getConf() {
     return this.conf;
   }
+  
 }
Index: src/plugin/scoring-opic/src/java/org/apache/nutch/scoring/opic/OPICScoringFilter.java
===================================================================
--- src/plugin/scoring-opic/src/java/org/apache/nutch/scoring/opic/OPICScoringFilter.java	(revision 558683)
+++ src/plugin/scoring-opic/src/java/org/apache/nutch/scoring/opic/OPICScoringFilter.java	(working copy)
@@ -29,11 +29,9 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.lucene.document.Document;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
-import org.apache.nutch.fetcher.Fetcher;
-import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.parse.Parse;
 import org.apache.nutch.parse.ParseData;
@@ -158,7 +156,7 @@
   }
 
   /** Dampen the boost value by scorePower.*/
-  public float indexerScore(Text url, Document doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
+  public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
     return (float)Math.pow(dbDatum.getScore(), scorePower) * initScore;
   }
 }
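
The damping itself is unchanged by this patch: with OPIC score s carried on the CrawlDatum and exponent scorePower, the indexing boost is s^scorePower * initScore. A worked example; the values are illustrative, and scorePower is read from configuration by the filter in a part of the file not shown in this hunk:

public class OpicBoostDemo {
  public static void main(String[] args) {
    float dbScore = 4.0f;     // score carried on the CrawlDatum
    float scorePower = 0.5f;  // read from configuration by the filter
    float initScore = 1.0f;
    // Same expression as indexerScore() above.
    float boost = (float) Math.pow(dbScore, scorePower) * initScore;
    System.out.println(boost);  // 2.0: 4x the OPIC score yields only 2x the boost
  }
}
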
Index: src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java
===================================================================
--- src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java	(revision 558683)
+++ src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java	(working copy)
@@ -17,20 +17,19 @@
 
 package org.creativecommons.nutch;
 
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
 import org.apache.nutch.metadata.CreativeCommons;
 
 import org.apache.nutch.parse.Parse;
 
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 import org.apache.hadoop.io.Text;
 
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.metadata.Metadata;
-import org.apache.nutch.metadata.CreativeCommons;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -50,7 +49,7 @@
 
   private Configuration conf;
 
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException {
     
     Metadata metadata = parse.getData().getParseMeta();
@@ -86,7 +85,7 @@
   /** Add the features represented by a license URL.  Urls are of the form
    * "http://creativecommons.org/licenses/xx-xx/xx/xx", where "xx" names a
    * license feature. */
-  public void addUrlFeatures(Document doc, String urlString) {
+  public void addUrlFeatures(NutchDocument doc, String urlString) {
     try {
       URL url = new URL(urlString);
 
@@ -108,9 +107,14 @@
     }
   }
   
-  private void addFeature(Document doc, String feature) {
-    doc.add(new Field(FIELD, feature, Field.Store.YES, Field.Index.UN_TOKENIZED));
+  private void addFeature(NutchDocument doc, String feature) {
+    doc.add(FIELD, feature);
   }
+  
+  @Override
+  public void addIndexBackendOptions(Configuration conf) {
+    LuceneWriter.addFieldOptions(FIELD, LuceneWriter.STORE.YES, LuceneWriter.INDEX.UNTOKENIZED, conf);
+  }
 
   public void setConf(Configuration conf) {
     this.conf = conf;
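
As the javadoc above notes, the license features are encoded directly in the URL path, e.g. "by" and "nc" in http://creativecommons.org/licenses/by-nc/2.0/. A standalone sketch of that decomposition, splitting the path on "/" and "-"; this paraphrases what addUrlFeatures does and is not a copy of the plugin code:

import java.net.URL;
import java.util.StringTokenizer;

public class CcFeatureDemo {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://creativecommons.org/licenses/by-nc/2.0/");
    // Splitting the path on both "/" and "-" turns the compound segment
    // "by-nc" into one token per license feature.
    StringTokenizer tokens = new StringTokenizer(url.getPath(), "/-");
    while (tokens.hasMoreTokens()) {
      // Prints: licenses, by, nc, 2.0; the feature names ("by", "nc")
      // are what addFeature() would record under FIELD.
      System.out.println(tokens.nextToken());
    }
  }
}
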
Index: src/plugin/feed/src/java/org/apache/nutch/indexer/feed/FeedIndexingFilter.java
===================================================================
--- src/plugin/feed/src/java/org/apache/nutch/indexer/feed/FeedIndexingFilter.java	(revision 558683)
+++ src/plugin/feed/src/java/org/apache/nutch/indexer/feed/FeedIndexingFilter.java	(working copy)
@@ -25,12 +25,12 @@
 //APACHE imports
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.indexer.IndexingException;
 import org.apache.nutch.indexer.IndexingFilter;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 import org.apache.nutch.metadata.Feed;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.parse.Parse;
@@ -71,7 +71,7 @@
    * index.
    *  
    */
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum,
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum,
                          Inlinks inlinks) throws IndexingException {
     ParseData parseData = parse.getData();
     Metadata parseMeta = parseData.getParseMeta();
@@ -84,35 +84,31 @@
     
     if (authors != null) {
       for (String author : authors) {
-        doc.add(new Field(Feed.FEED_AUTHOR, author, 
-            Field.Store.YES, Field.Index.TOKENIZED));
+        doc.add(Feed.FEED_AUTHOR, author);
       }
     }
     
     if (tags != null) {
       for (String tag : tags) {
-        doc.add(new Field(Feed.FEED_TAGS, tag, 
-            Field.Store.YES, Field.Index.TOKENIZED));
+        doc.add(Feed.FEED_TAGS, tag);
       }
     }
     
     if (feed != null)
-      doc.add(new Field(Feed.FEED, feed, Field.Store.YES, Field.Index.TOKENIZED));
+      doc.add(Feed.FEED, feed);
     
     SimpleDateFormat sdf = new SimpleDateFormat(dateFormatStr);
     sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
     if (published != null) {
       Date date = new Date(Long.parseLong(published));
       String dateString = sdf.format(date);
-      doc.add(new Field(PUBLISHED_DATE, dateString, 
-                        Field.Store.YES, Field.Index.NO_NORMS));
+      doc.add(PUBLISHED_DATE, dateString);
     }
     
     if (updated != null) {
       Date date = new Date(Long.parseLong(updated));
       String dateString = sdf.format(date);
-      doc.add(new Field(UPDATED_DATE, dateString, 
-                        Field.Store.YES, Field.Index.NO_NORMS));
+      doc.add(UPDATED_DATE, dateString);
     }
         
     return doc;
@@ -126,6 +122,24 @@
     return conf;
   }
 
+  @Override
+  public void addIndexBackendOptions(Configuration conf) {
+    LuceneWriter.addFieldOptions(Feed.FEED_AUTHOR,
+        LuceneWriter.STORE.YES, LuceneWriter.INDEX.TOKENIZED, conf);
+
+    LuceneWriter.addFieldOptions(Feed.FEED_TAGS,
+        LuceneWriter.STORE.YES, LuceneWriter.INDEX.TOKENIZED, conf);
+
+    LuceneWriter.addFieldOptions(Feed.FEED,
+        LuceneWriter.STORE.YES, LuceneWriter.INDEX.TOKENIZED, conf);
+
+    LuceneWriter.addFieldOptions(PUBLISHED_DATE,
+        LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO_NORMS, conf);
+
+    LuceneWriter.addFieldOptions(UPDATED_DATE,
+        LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO_NORMS, conf);
+  }
+
   /**
    * Sets the {@link Configuration} object used to configure this
    * {@link IndexingFilter}.
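
The published/updated values arrive as epoch milliseconds in the parse metadata and are normalized to a GMT date string before being added to the document. A self-contained sketch of that normalization; dateFormatStr is defined elsewhere in this filter, so the pattern below is only an illustrative stand-in:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class FeedDateDemo {
  public static void main(String[] args) {
    String published = "1185994800000";  // epoch millis, as stored in parse metadata
    // Illustrative pattern; the filter uses its configured dateFormatStr.
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
    // Pinning the zone to GMT keeps indexed dates comparable
    // regardless of where the indexing job runs.
    sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
    System.out.println(sdf.format(new Date(Long.parseLong(published))));
  }
}
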
Index: src/plugin/subcollection/src/java/org/apache/nutch/indexer/subcollection/SubcollectionIndexingFilter.java
===================================================================
--- src/plugin/subcollection/src/java/org/apache/nutch/indexer/subcollection/SubcollectionIndexingFilter.java	(revision 558683)
+++ src/plugin/subcollection/src/java/org/apache/nutch/indexer/subcollection/SubcollectionIndexingFilter.java	(working copy)
@@ -19,8 +19,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +28,8 @@
 
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 
 import org.apache.nutch.collection.CollectionManager;
 import org.apache.nutch.crawl.CrawlDatum;
@@ -62,14 +62,20 @@
    * @param doc
    * @param url
    */
-  private void addSubCollectionField(Document doc, String url) {
+  private void addSubCollectionField(NutchDocument doc, String url) {
     String collname = CollectionManager.getCollectionManager(getConf()).getSubCollections(url);
-    doc.add(new Field(FIELD_NAME, collname, Field.Store.YES, Field.Index.TOKENIZED));
+    doc.add(FIELD_NAME, collname);
   }
 
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException {
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException {
     String sUrl = url.toString();
     addSubCollectionField(doc, sUrl);
     return doc;
   }
+
+  @Override
+  public void addIndexBackendOptions(Configuration conf) {
+    LuceneWriter.addFieldOptions(FIELD_NAME, LuceneWriter.STORE.YES,
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+  }
 }
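
For readers tracking the mechanical rewrite across these plugins, the removed and added lines pair up one-for-one; the LuceneWriter enums are direct stand-ins for the old Lucene Field flags:

    old Lucene flag             new LuceneWriter option
    Field.Store.YES / NO        LuceneWriter.STORE.YES / NO
    Field.Index.TOKENIZED       LuceneWriter.INDEX.TOKENIZED
    Field.Index.UN_TOKENIZED    LuceneWriter.INDEX.UNTOKENIZED
    Field.Index.NO              LuceneWriter.INDEX.NO
    Field.Index.NO_NORMS        LuceneWriter.INDEX.NO_NORMS
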
Index: src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java
===================================================================
--- src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java	(revision 558683)
+++ src/plugin/index-more/src/java/org/apache/nutch/indexer/more/MoreIndexingFilter.java	(working copy)
@@ -27,9 +27,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-
 import org.apache.nutch.metadata.Metadata;
 
 import org.apache.nutch.net.protocols.HttpDateFormat;
@@ -39,6 +36,8 @@
 
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
@@ -82,7 +81,7 @@
   /** Get the MimeTypes resolver instance. */
   private MimeTypes MIME; 
   
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException {
 
     String url_s = url.toString();
@@ -97,7 +96,7 @@
     
   // Add time related meta info.  Add last-modified if present.  Index date as
   // last-modified, or, if that's not present, use fetch time.
-  private Document addTime(Document doc, ParseData data,
+  private NutchDocument addTime(NutchDocument doc, ParseData data,
                            String url, CrawlDatum datum) {
     long time = -1;
 
@@ -105,7 +104,7 @@
     if (lastModified != null) {                   // try parse last-modified
       time = getTime(lastModified,url);           // use as time
                                                   // store as string
-      doc.add(new Field("lastModified", new Long(time).toString(), Field.Store.YES, Field.Index.NO));
+      doc.add("lastModified", Long.toString(time));
     }
 
     if (time == -1) {                             // if no last-modified
@@ -119,7 +118,7 @@
     String dateString = sdf.format(new Date(time));
 
     // un-stored, indexed and un-tokenized
-    doc.add(new Field("date", dateString, Field.Store.NO, Field.Index.UN_TOKENIZED));
+    doc.add("date", dateString);
 
     return doc;
   }
@@ -169,17 +168,17 @@
   }
 
   // Add Content-Length
-  private Document addLength(Document doc, ParseData data, String url) {
+  private NutchDocument addLength(NutchDocument doc, ParseData data, String url) {
     String contentLength = data.getMeta(Response.CONTENT_LENGTH);
 
     if (contentLength != null)
-      doc.add(new Field("contentLength", contentLength, Field.Store.YES, Field.Index.NO));
+      doc.add("contentLength", contentLength);
 
     return doc;
   }
 
   // Add Content-Type and its primaryType and subType
-  private Document addType(Document doc, ParseData data, String url) {
+  private NutchDocument addType(NutchDocument doc, ParseData data, String url) {
     MimeType mimeType = null;
     String contentType = data.getMeta(Response.CONTENT_TYPE);
     if (contentType == null) {
@@ -199,7 +198,7 @@
         try {
             mimeType = new MimeType(contentType);
         } catch (MimeTypeException e) {
-            if (LOG.isWarnEnabled()) { LOG.warn(url + e.toString()); }
+            LOG.warn(url + ": " + e.toString());
             mimeType = null;
         }
     }
@@ -225,14 +224,13 @@
     // type:vnd.ms-powerpoint
     // all case insensitive.
     // The query filter is implemented in TypeQueryFilter.java
-    doc.add(new Field("type", contentType, Field.Store.NO, Field.Index.UN_TOKENIZED));
-    doc.add(new Field("type", primaryType, Field.Store.NO, Field.Index.UN_TOKENIZED));
-    doc.add(new Field("type", subType, Field.Store.NO, Field.Index.UN_TOKENIZED));
+    doc.add("type", contentType);
+    doc.add("type", primaryType);
+    doc.add("type", subType);
 
     // add its primaryType and subType to respective fields
-    // as stored, indexed and un-tokenized
-    doc.add(new Field("primaryType", primaryType, Field.Store.YES, Field.Index.UN_TOKENIZED));
-    doc.add(new Field("subType", subType, Field.Store.YES, Field.Index.UN_TOKENIZED));
+    doc.add("primaryType", primaryType);
+    doc.add("subType", subType);
 
     return doc;
   }
@@ -261,7 +259,7 @@
     }
   }
 
-  private Document resetTitle(Document doc, ParseData data, String url) {
+  private NutchDocument resetTitle(NutchDocument doc, ParseData data, String url) {
     String contentDisposition = data.getMeta(Metadata.CONTENT_DISPOSITION);
     if (contentDisposition == null)
       return doc;
@@ -270,14 +268,38 @@
     for (int i=0; i<patterns.length; i++) {
       if (matcher.contains(contentDisposition,patterns[i])) {
         result = matcher.getMatch();
-        doc.add(new Field("title", result.group(1), Field.Store.YES, Field.Index.NO));
+        doc.add("title", result.group(1));
         break;
       }
     }
 
     return doc;
   }
+
+  @Override
+  public void addIndexBackendOptions(Configuration conf) {
+    // type is un-stored, indexed and un-tokenized
+    LuceneWriter.addFieldOptions("type", LuceneWriter.STORE.NO,
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+
+    // primaryType and subType are stored, indexed and un-tokenized
+    LuceneWriter.addFieldOptions("primaryType", LuceneWriter.STORE.YES,
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+    LuceneWriter.addFieldOptions("subType", LuceneWriter.STORE.YES,
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+
+    // contentLength and lastModified are stored but not searchable
+    LuceneWriter.addFieldOptions("contentLength", LuceneWriter.STORE.YES,
+        LuceneWriter.INDEX.NO, conf);
 
+    LuceneWriter.addFieldOptions("lastModified", LuceneWriter.STORE.YES,
+        LuceneWriter.INDEX.NO, conf);
+
+    // date is un-stored, indexed and un-tokenized
+    LuceneWriter.addFieldOptions("date", LuceneWriter.STORE.NO,
+        LuceneWriter.INDEX.UNTOKENIZED, conf);
+  }
+
   public void setConf(Configuration conf) {
     this.conf = conf;
     MAGIC = conf.getBoolean("mime.type.magic", true);
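
The three "type" additions above all come from a single Content-Type header. With contentType "application/vnd.ms-powerpoint", primaryType is "application" and subType is "vnd.ms-powerpoint", so queries like type:application, type:vnd.ms-powerpoint and type:application/vnd.ms-powerpoint all hit the same document. A sketch of the decomposition; the filter derives these via the MimeType helper used above, but a plain string split shows the same result:

public class TypeFieldDemo {
  public static void main(String[] args) {
    String contentType = "application/vnd.ms-powerpoint";
    int slash = contentType.indexOf('/');
    String primaryType = contentType.substring(0, slash);  // "application"
    String subType = contentType.substring(slash + 1);     // "vnd.ms-powerpoint"
    // addType() indexes all three values under "type", plus the two parts
    // under "primaryType" and "subType".
    System.out.println(primaryType + " / " + subType);
  }
}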
