diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 579ff14..44fd520 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1024,4 +1024,25 @@
   <description>Default class for storing data</description>
 </property>
 
+<property>
+  <name>storage.schema</name>
+  <value>webpage</value>
+  <description>This value holds the schema name used for the Nutch web db.
+  Note that Nutch ignores the schema name given in the Gora mapping files
+  and uses this value instead.
+  </description>
+</property>
+
+<property>
+  <name>storage.schema.id</name>
+  <value></value>
+  <description>This value helps differentiate between the datasets that
+  the jobs in the crawl cycle generate and operate on. It is passed to
+  every job, which then uses it as a prefix when accessing the schemas.
+  The default configuration uses no id, so the schemas are not
+  prefixed. The value can also be supplied to each job as a command
+  line argument.
+  </description>
+</property>
+
 </configuration>
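
The two properties work together: storage.schema names the base schema and
storage.schema.id supplies an optional prefix. A minimal sketch of the
resolution, mirroring the createWebStore() change in StorageUtils.java
further down in this patch (conf is a stock Hadoop Configuration):

    String schema = conf.get("storage.schema", "webpage");
    String schemaId = conf.get("storage.schema.id", "");
    if (!schemaId.isEmpty()) {
      schema = schemaId + "_" + schema;  // e.g. id "test" yields "test_webpage"
    }
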
diff --git a/src/java/org/apache/nutch/crawl/DbUpdaterJob.java b/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
index 4a96c90..bf754db 100644
--- a/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
+++ b/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
@@ -42,16 +42,16 @@ implements Tool {
     FIELDS.add(WebPage.Field.PREV_FETCH_TIME);
     FIELDS.add(WebPage.Field.PREV_SIGNATURE);
   }
-  
+
   public DbUpdaterJob() {
-    
+
   }
-  
+
   public DbUpdaterJob(Configuration conf) {
     setConf(conf);
   }
 
-  private int updateTable() throws Exception {
+  private int updateTable(String schemaId) throws Exception {
     LOG.info("DbUpdaterJob: starting");
     Job job = new NutchJob(getConf(), "update-table");
     //job.setBoolean(ALL, updateAll);
@@ -61,9 +61,9 @@ implements Tool {
     // TODO: Figure out why this needs to be here
     job.getConfiguration().setClass("mapred.output.key.comparator.class",
         StringComparator.class, RawComparator.class);
-    StorageUtils.initMapperJob(job, fields, String.class,
+    StorageUtils.initMapperJob(job, schemaId, fields, String.class,
         NutchWritable.class, DbUpdateMapper.class);
-    StorageUtils.initReducerJob(job, DbUpdateReducer.class);
+    StorageUtils.initReducerJob(job, schemaId, DbUpdateReducer.class);
 
     boolean success = job.waitForCompletion(true);
     if (!success){
@@ -75,7 +75,11 @@ implements Tool {
   }
 
   public int run(String[] args) throws Exception {
-	return updateTable();
+    String schemaId = null;
+    if (args.length == 2 && "-schemaId".equals(args[0])) {
+      schemaId = args[1];
+    }
+    return updateTable(schemaId);
   }
 
   public static void main(String[] args) throws Exception {
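
With the extra parameter, the update job can be pointed at a prefixed
dataset. A sketch assuming the usual ToolRunner/NutchConfiguration entry
point in main(); the "test" id is illustrative:

    Configuration conf = NutchConfiguration.create();
    int res = ToolRunner.run(conf, new DbUpdaterJob(),
        new String[] { "-schemaId", "test" });  // operates on "test_webpage"
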
diff --git a/src/java/org/apache/nutch/crawl/GeneratorJob.java b/src/java/org/apache/nutch/crawl/GeneratorJob.java
index 0589f26..b78999c 100644
--- a/src/java/org/apache/nutch/crawl/GeneratorJob.java
+++ b/src/java/org/apache/nutch/crawl/GeneratorJob.java
@@ -114,19 +114,19 @@ public class GeneratorJob extends Configured implements Tool {
   }
 
   public GeneratorJob() {
-    
+
   }
-  
+
   public GeneratorJob(Configuration conf) {
     setConf(conf);
   }
-  
+
   /**
    * Mark URLs ready for fetching.
    * @throws ClassNotFoundException
    * @throws InterruptedException
    * */
-  public String generate(long topN, long curTime, boolean filter, boolean norm)
+  public String generate(long topN, long curTime, boolean filter, boolean norm, String schemaId)
       throws Exception {
 
     LOG.info("GeneratorJob: Selecting best-scoring urls due for fetch.");
@@ -158,9 +158,9 @@ public class GeneratorJob extends Configured implements Tool {
     }
 
     Job job = new NutchJob(getConf(), "generate: " + crawlId);
-    StorageUtils.initMapperJob(job, FIELDS, SelectorEntry.class, WebPage.class,
-        GeneratorMapper.class, URLPartitioner.class);
-    StorageUtils.initReducerJob(job, GeneratorReducer.class);
+    StorageUtils.initMapperJob(job, schemaId, FIELDS, SelectorEntry.class,
+        WebPage.class, GeneratorMapper.class, URLPartitioner.class, true);
+    StorageUtils.initReducerJob(job, schemaId, GeneratorReducer.class);
 
     boolean success = job.waitForCompletion(true);
     if (!success) return null;
@@ -173,6 +173,7 @@ public class GeneratorJob extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     long curTime = System.currentTimeMillis(), topN = Long.MAX_VALUE;
     boolean filter = true, norm = true;
+    String schemaId = null;
 
     for (int i = 0; i < args.length; i++) {
       if ("-topN".equals(args[i])) {
@@ -181,11 +182,13 @@ public class GeneratorJob extends Configured implements Tool {
         filter = false;
       } else if ("-noNorm".equals(args[i])) {
         norm = false;
+      } else if ("-schemaId".equals(args[i])) {
+        schemaId = args[++i];
       }
     }
 
     try {
-      return (generate(topN, curTime, filter, norm) != null) ? 0 : -1;
+      return (generate(topN, curTime, filter, norm, schemaId) != null) ? 0 : -1;
     } catch (Exception e) {
       LOG.error("GeneratorJob: " + StringUtils.stringifyException(e));
       return -1;
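
Callers now pick the dataset at generate time. A sketch of the widened
signature (passing null falls back to the storage.schema.id property; the
"test" id is illustrative):

    GeneratorJob g = new GeneratorJob(conf);
    String crawlId = g.generate(Long.MAX_VALUE, System.currentTimeMillis(),
        true, true, "test");  // selects from "test_webpage"
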
diff --git a/src/java/org/apache/nutch/crawl/InjectorJob.java b/src/java/org/apache/nutch/crawl/InjectorJob.java
index baf75dd..e3f6ed5 100644
--- a/src/java/org/apache/nutch/crawl/InjectorJob.java
+++ b/src/java/org/apache/nutch/crawl/InjectorJob.java
@@ -34,6 +34,7 @@ import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TableUtil;
 import org.gora.mapreduce.GoraMapper;
 import org.gora.mapreduce.GoraOutputFormat;
+import org.gora.store.DataStore;
 
 /** This class takes a flat file of URLs and adds them to the table of pages to be
  * crawled.  Useful for bootstrapping the system.
@@ -172,13 +173,13 @@ public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
   }
 
   public InjectorJob() {
-    
+
   }
-  
+
   public InjectorJob(Configuration conf) {
     setConf(conf);
   }
-  
+
   @Override
   public Configuration getConf() {
     return conf;
@@ -212,7 +213,7 @@ public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
     context.write(key, row);
   }
 
-  public void inject(Path urlDir) throws Exception {
+  public void inject(Path urlDir, String schemaId) throws Exception {
     LOG.info("InjectorJob: starting");
     LOG.info("InjectorJob: urlDir: " + urlDir);
 
@@ -223,15 +224,16 @@ public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
     job.setMapOutputKeyClass(String.class);
     job.setMapOutputValueClass(WebPage.class);
     job.setOutputFormatClass(GoraOutputFormat.class);
-    GoraOutputFormat.setOutput(job, String.class,
-        WebPage.class, StorageUtils.getDataStoreClass(getConf()), true);
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(job.getConfiguration(),
+        String.class, WebPage.class, schemaId);
+    GoraOutputFormat.setOutput(job, store, true);
     job.setReducerClass(Reducer.class);
     job.setNumReduceTasks(0);
     job.waitForCompletion(true);
 
     job = new NutchJob(getConf(), "inject-p2 " + urlDir);
-    StorageUtils.initMapperJob(job, FIELDS, String.class, WebPage.class,
-        InjectorJob.class);
+    StorageUtils.initMapperJob(job, schemaId, FIELDS, String.class,
+        WebPage.class, InjectorJob.class);
     job.setNumReduceTasks(0);
     job.waitForCompletion(true);
   }
@@ -239,11 +241,16 @@ public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
   @Override
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
-      System.err.println("Usage: InjectorJob <url_dir>");
+      System.err.println("Usage: InjectorJob <url_dir> [-schemaId <id>]");
       return -1;
     }
+    String schemaId = null;
+    if (args.length == 3 && "-schemaId".equals(args[1])) {
+      schemaId = args[2];
+    }
+
     try {
-      inject(new Path(args[0]));
+      inject(new Path(args[0]), schemaId);
       LOG.info("InjectorJob: finished");
       return -0;
     } catch (Exception e) {
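
Per the argument parsing above, the option follows the url directory on the
command line. An illustrative invocation, assuming the standard bin/nutch
launcher (paths and id are examples):

    bin/nutch org.apache.nutch.crawl.InjectorJob urls/ -schemaId test
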
diff --git a/src/java/org/apache/nutch/crawl/WebTableReader.java b/src/java/org/apache/nutch/crawl/WebTableReader.java
index cb0cf6d..8c4383e 100644
--- a/src/java/org/apache/nutch/crawl/WebTableReader.java
+++ b/src/java/org/apache/nutch/crawl/WebTableReader.java
@@ -59,6 +59,7 @@ public class WebTableReader extends Configured implements Tool {
     public WebTableStatMapper() {
     }
 
+    @Override
     public void setup(Context context) {
       sort = context.getConfiguration().getBoolean("db.reader.stats.sort",
           false);
@@ -92,9 +93,11 @@ public class WebTableReader extends Configured implements Tool {
       Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
 
+    @Override
     public void setup(Context context) {
     }
 
+    @Override
     public void cleanup(Context context) {
     }
 
@@ -136,6 +139,7 @@ public class WebTableReader extends Configured implements Tool {
   public static class WebTableStatReducer extends
       Reducer<Text, LongWritable, Text, LongWritable> {
 
+    @Override
     public void cleanup(Context context) {
     }
 
@@ -190,7 +194,7 @@ public class WebTableReader extends Configured implements Tool {
 
   }
 
-  public void processStatJob(boolean sort) throws IOException,
+  public void processStatJob(String schemaId, boolean sort) throws IOException,
       ClassNotFoundException, InterruptedException {
 
     if (LOG.isInfoEnabled()) {
@@ -204,8 +208,8 @@ public class WebTableReader extends Configured implements Tool {
 
     job.getConfiguration().setBoolean("db.reader.stats.sort", sort);
 
-    DataStore<String, WebPage> store = StorageUtils.createDataStore(job
-        .getConfiguration(), String.class, WebPage.class);
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(job
+        .getConfiguration(), String.class, WebPage.class, schemaId);
     Query<String, WebPage> query = store.newQuery();
     query.setFields(WebPage._ALL_FIELDS);
 
@@ -301,10 +305,10 @@ public class WebTableReader extends Configured implements Tool {
   }
 
   /** Prints out the entry to the standard out **/
-  private void read(String key, boolean dumpContent, boolean dumpHeaders,
+  private void read(String schemaId, String key, boolean dumpContent, boolean dumpHeaders,
       boolean dumpLinks, boolean dumpText) throws ClassNotFoundException, IOException {
-    DataStore<String, WebPage> datastore = StorageUtils.createDataStore(getConf(),
-        String.class, WebPage.class);
+    DataStore<String, WebPage> datastore = StorageUtils.createWebStore(getConf(),
+        String.class, WebPage.class, schemaId);
 
     Query<String, WebPage> query = datastore.newQuery();
     String reversedUrl = TableUtil.reverseUrl(key);
@@ -375,7 +379,7 @@ public class WebTableReader extends Configured implements Tool {
 
   }
 
-  public void processDumpJob(String output, Configuration config, String regex,
+  public void processDumpJob(String schemaId, String output, Configuration config, String regex,
       boolean content, boolean headers, boolean links, boolean text)
       throws IOException, ClassNotFoundException, InterruptedException {
 
@@ -391,9 +395,9 @@ public class WebTableReader extends Configured implements Tool {
     cfg.setBoolean(WebTableRegexMapper.headersParamName, headers);
     cfg.setBoolean(WebTableRegexMapper.linksParamName, links);
     cfg.setBoolean(WebTableRegexMapper.textParamName, text);
-    
-    DataStore<String, WebPage> store = StorageUtils.createDataStore(job
-        .getConfiguration(), String.class, WebPage.class);
+
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(job
+        .getConfiguration(), String.class, WebPage.class, schemaId);
     Query<String, WebPage> query = store.newQuery();
     query.setFields(WebPage._ALL_FIELDS);
 
@@ -425,9 +429,9 @@ public class WebTableReader extends Configured implements Tool {
     sb.append("prevFetchTime:\t" + page.getPrevFetchTime()).append("\n");
     sb.append("retries:\t" + page.getRetriesSinceFetch()).append("\n");
     sb.append("modifiedTime:\t" + page.getModifiedTime()).append("\n");
-    sb.append("protocolStatus:\t" + 
+    sb.append("protocolStatus:\t" +
         ProtocolStatusUtils.toString(page.getProtocolStatus())).append("\n");
-    sb.append("parseStatus:\t" + 
+    sb.append("parseStatus:\t" +
         ParseStatusUtils.toString(page.getParseStatus())).append("\n");
     sb.append("title:\t" + page.getTitle()).append("\n");
     sb.append("score:\t" + page.getScore()).append("\n");
@@ -467,7 +471,7 @@ public class WebTableReader extends Configured implements Tool {
       if (headers != null) {
         for (Entry<Utf8,Utf8> e : headers.entrySet()) {
           sb.append("header:\t" + e.getKey() + "\t" + e.getValue() + "\n");
-        }        
+        }
       }
     }
     ByteBuffer content = page.getContent();
@@ -481,9 +485,9 @@ public class WebTableReader extends Configured implements Tool {
     if (text != null && dumpText) {
       sb.append("text:start:\n");
       sb.append(text.toString());
-      sb.append("\ntext:end:\n");      
+      sb.append("\ntext:end:\n");
     }
-    
+
     return sb.toString();
   }
 
@@ -492,13 +496,14 @@ public class WebTableReader extends Configured implements Tool {
         args);
     System.exit(res);
   }
-  
+
   private static enum Op {READ, STAT, DUMP};
 
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
       System.err
-          .println("Usage: WebTableReader (-stats | -url [url] | -dump <out_dir> [-regex regex]) [-content] [-headers] [-links] [-text]");
+          .println("Usage: WebTableReader (-stats | -url [url] | -dump <out_dir> [-regex regex]) [-schemaId <id>] [-content] [-headers] [-links] [-text]");
+      System.err.println("\t-schemaId <id>\t the id to prefix the schemas to operate on, (default: storage.schema.id)");
       System.err
           .println("\t-stats [-sort] \tprint overall statistics to System.out");
       System.err.println("\t\t[-sort]\tlist status sorted by host");
@@ -521,6 +526,7 @@ public class WebTableReader extends Configured implements Tool {
     boolean headers = false;
     boolean toSort = false;
     String regex = ".+";
+    String schemaId = null;
     Op op = null;
     try {
       for (int i = 0; i < args.length; i++) {
@@ -546,6 +552,8 @@ public class WebTableReader extends Configured implements Tool {
           text = true;
         } else if (args[i].equals("-regex")) {
           regex = args[++i];
+        } else if (args[i].equals("-schemaId")) {
+          schemaId = args[++i];
         }
       }
       if (op == null) {
@@ -553,13 +561,13 @@ public class WebTableReader extends Configured implements Tool {
       }
       switch (op) {
       case READ:
-        read(param, content, headers, links, text);
+        read(schemaId, param, content, headers, links, text);
         break;
       case STAT:
-        processStatJob(toSort);
+        processStatJob(schemaId, toSort);
         break;
       case DUMP:
-        processDumpJob(param, getConf(), regex, content, headers, links, text);
+        processDumpJob(schemaId, param, getConf(), regex, content, headers, links, text);
         break;
       }
       return 0;
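
The reader accepts the same option, so a prefixed web db can be inspected
directly. Illustrative invocations, assuming the standard bin/nutch launcher
(ids and paths are examples):

    bin/nutch org.apache.nutch.crawl.WebTableReader -stats -schemaId test
    bin/nutch org.apache.nutch.crawl.WebTableReader -dump out/ -schemaId test -text
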
diff --git a/src/java/org/apache/nutch/fetcher/FetcherJob.java b/src/java/org/apache/nutch/fetcher/FetcherJob.java
index 3d6fdcd..2fb06f9 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherJob.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherJob.java
@@ -38,7 +38,7 @@ public class FetcherJob implements Tool {
   public static final int PERM_REFRESH_TIME = 5;
 
   public static final Utf8 REDIRECT_DISCOVERED = new Utf8("___rdrdsc__");
-  
+
   public static final String RESUME_KEY = "fetcher.job.resume";
   public static final String PARSE_KEY = "fetcher.parse";
   public static final String THREADS_KEY = "fetcher.threads.fetch";
@@ -107,11 +107,11 @@ public class FetcherJob implements Tool {
   public static final Logger LOG = LoggerFactory.getLogger(FetcherJob.class);
 
   private Configuration conf;
-  
+
   public FetcherJob() {
-    
+
   }
-  
+
   public FetcherJob(Configuration conf) {
     setConf(conf);
   }
@@ -150,7 +150,8 @@ public class FetcherJob implements Tool {
    * @return 0 on success
    * @throws Exception
    */
-  public int fetch(String crawlId, int threads, boolean shouldResume, boolean parse, int numTasks)
+  public int fetch(String crawlId, String schemaId, int threads,
+      boolean shouldResume, boolean parse, int numTasks)
       throws Exception {
     LOG.info("FetcherJob: starting");
 
@@ -162,7 +163,7 @@ public class FetcherJob implements Tool {
     getConf().set(GeneratorJob.CRAWL_ID, crawlId);
     getConf().setBoolean(PARSE_KEY, parse);
     getConf().setBoolean(RESUME_KEY, shouldResume);
-    
+
     // set the actual time for the timelimit relative
     // to the beginning of the whole job and not of a specific task
     // otherwise it keeps trying again if a task fails
@@ -184,9 +185,9 @@ public class FetcherJob implements Tool {
 
     Job job = new NutchJob(getConf(), "fetch");
     Collection<WebPage.Field> fields = getFields(job);
-    StorageUtils.initMapperJob(job, fields, IntWritable.class,
+    StorageUtils.initMapperJob(job, schemaId, fields, IntWritable.class,
         FetchEntry.class, FetcherMapper.class, PartitionUrlByHost.class, false);
-    StorageUtils.initReducerJob(job, FetcherReducer.class);
+    StorageUtils.initReducerJob(job, schemaId, FetcherReducer.class);
     if (numTasks < 1) {
       job.setNumReduceTasks(job.getConfiguration().getInt("mapred.map.tasks",
           job.getNumReduceTasks()));
@@ -242,10 +243,12 @@ public class FetcherJob implements Tool {
     int threads = -1;
     boolean shouldResume = false;
     boolean parse = getConf().getBoolean(PARSE_KEY, false);
-    String crawlId;
+    String crawlId, schemaId = null;
 
-    String usage = "Usage: FetcherJob (<crawl id> | -all) [-threads N] [-parse] [-resume] [-numTasks N]\n" +
+    String usage = "Usage: FetcherJob (<crawl id> | -all) [-schema <id>] " +
+      "[-threads N] [-parse] [-resume] [-numTasks N]\n" +
       "\tcrawlId\tcrawl identifier returned by Generator, or -all for all generated crawlId-s\n" +
+      "\t-schemaId <id>\t the id to prefix the schemas to operate on, (default: storage.schema.id)\n" +
       "\t-threads N\tnumber of fetching threads per task\n" +
       "\t-parse\tif specified then fetcher will immediately parse fetched content\n" +
       "\t-resume\tresume interrupted job\n" +
@@ -271,11 +274,13 @@ public class FetcherJob implements Tool {
       } else if ("-parse".equals(args[i])) {
         parse = true;
       } else if ("-numTasks".equals(args[i])) {
-      		numTasks = Integer.parseInt(args[++i]);
+        numTasks = Integer.parseInt(args[++i]);
+      } else if ("-schemaId".equals(args[i])) {
+        schemaId = args[++i];
       }
     }
 
-    int fetchcode = fetch(crawlId, threads, shouldResume, parse, numTasks); // run the Fetcher
+    int fetchcode = fetch(crawlId, schemaId, threads, shouldResume, parse, numTasks); // run the Fetcher
 
     return fetchcode;
   }
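
A sketch of the widened fetch() signature (a null schemaId falls back to
the storage.schema.id property; values are illustrative):

    FetcherJob fetcher = new FetcherJob(conf);
    int code = fetcher.fetch(crawlId, "test", 10, false, true, -1);
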
diff --git a/src/java/org/apache/nutch/indexer/IndexerJob.java b/src/java/org/apache/nutch/indexer/IndexerJob.java
index 3b791af..8e6d879 100644
--- a/src/java/org/apache/nutch/indexer/IndexerJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexerJob.java
@@ -94,7 +94,7 @@ implements Tool {
     return columns;
   }
 
-  protected Job createIndexJob(Configuration conf, String jobName, String crawlId)
+  protected Job createIndexJob(Configuration conf, String jobName, String crawlId, String schemaId)
   throws IOException, ClassNotFoundException {
     conf.set(GeneratorJob.CRAWL_ID, crawlId);
     Job job = new NutchJob(conf, jobName);
@@ -103,7 +103,7 @@ implements Tool {
         StringComparator.class, RawComparator.class);
 
     Collection<WebPage.Field> fields = getFields(job);
-    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class, this.getClass());
+    StorageUtils.initMapperJob(job, schemaId, fields, String.class, WebPage.class, this.getClass());
     job.setReducerClass(IndexerReducer.class);
     job.setOutputFormatClass(IndexerOutputFormat.class);
     return job;
diff --git a/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java b/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
index cdd4ba9..b7af690 100644
--- a/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
+++ b/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
@@ -36,13 +36,13 @@ public class SolrIndexerJob extends IndexerJob {
 
   public static Logger LOG = LoggerFactory.getLogger(SolrIndexerJob.class);
 
-  private void indexSolr(String solrUrl, String crawlId) throws Exception {
+  private void indexSolr(String solrUrl, String crawlId, String schemaId) throws Exception {
     LOG.info("SolrIndexerJob: starting");
 
     NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
     getConf().set(SolrConstants.SERVER_URL, solrUrl);
 
-    Job job = createIndexJob(getConf(), "solr-index", crawlId);
+    Job job = createIndexJob(getConf(), "solr-index", crawlId, schemaId);
     Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
                 + new Random().nextInt());
 
@@ -61,12 +61,16 @@ public class SolrIndexerJob extends IndexerJob {
 
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: SolrIndexerJob <solr url> (<crawl id> | -all | -reindex)");
+      System.err.println("Usage: SolrIndexerJob <solr url> (<crawl id> | -all | -reindex) [-schemaId <id>]");
       return -1;
     }
 
+    String schemaId = null;
+    if (args.length == 4 && "-schemaId".equals(args[2])) {
+      schemaId = args[3];
+    }
     try {
-      indexSolr(args[0], args[1]);
+      indexSolr(args[0], args[1], schemaId);
       return 0;
     } catch (final Exception e) {
       LOG.error("SolrIndexerJob: " + StringUtils.stringifyException(e));
diff --git a/src/java/org/apache/nutch/parse/ParserJob.java b/src/java/org/apache/nutch/parse/ParserJob.java
index cda297f..5531904 100644
--- a/src/java/org/apache/nutch/parse/ParserJob.java
+++ b/src/java/org/apache/nutch/parse/ParserJob.java
@@ -29,7 +29,7 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
     implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(ParserJob.class);
-  
+
   private static final String RESUME_KEY = "parse.job.resume";
   private static final String FORCE_KEY = "parse.job.force";
 
@@ -51,15 +51,15 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
   private ParseUtil parseUtil;
 
   private boolean shouldResume;
-  
+
   private boolean force;
 
   private Utf8 crawlId;
-  
+
   public ParserJob() {
-    
+
   }
-  
+
   public ParserJob(Configuration conf) {
     setConf(conf);
   }
@@ -69,7 +69,7 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
     Configuration conf = context.getConfiguration();
     parseUtil = new ParseUtil(conf);
     shouldResume = conf.getBoolean(RESUME_KEY, false);
-    force = conf.getBoolean(FORCE_KEY, false);    
+    force = conf.getBoolean(FORCE_KEY, false);
     crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, Nutch.ALL_CRAWL_ID_STR));
   }
 
@@ -87,7 +87,7 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
       if (force) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Forced parsing " + TableUtil.unreverseUrl(key) + "; already parsed");
-        }        
+        }
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already parsed");
@@ -144,7 +144,7 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
     this.conf = conf;
   }
 
-  public int parse(String crawlId, boolean shouldResume, boolean force) throws Exception {
+  public int parse(String crawlId, String schemaId, boolean shouldResume, boolean force) throws Exception {
     LOG.info("ParserJob: starting");
 
     if (crawlId != null) {
@@ -164,9 +164,9 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
     final Job job = new NutchJob(getConf(), "parse");
 
     Collection<WebPage.Field> fields = getFields(job);
-    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
+    StorageUtils.initMapperJob(job, schemaId, fields, String.class, WebPage.class,
         ParserJob.class);
-    StorageUtils.initReducerJob(job, IdentityPageReducer.class);
+    StorageUtils.initReducerJob(job, schemaId, IdentityPageReducer.class);
     job.setNumReduceTasks(0);
     boolean success = job.waitForCompletion(true);
     if (!success){
@@ -180,36 +180,39 @@ public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
   public int run(String[] args) throws Exception {
     boolean shouldResume = false;
     boolean force = false;
-    String crawlId = null;
+    String crawlId = null, schemaId = null;
 
     if (args.length < 1) {
-      System.err.println("Usage: ParserJob (<crawlId> | -all) [-resume] [-force]");
+      System.err.println("Usage: ParserJob (<crawlId> | -all) [-schemaId <id>] [-resume] [-force]");
       System.err.println("\tcrawlId\tsymbolic crawl ID created by Generator");
+      System.err.println("\t-schemaId <id>\t the id to prefix the schemas to operate on, (default: storage.schema.id)");
       System.err.println("\t-all\tconsider pages from all crawl jobs");
       System.err.println("-resume\tresume a previous incomplete job");
       System.err.println("-force\tforce re-parsing even if a page is already parsed");
       return -1;
     }
-    for (String s : args) {
-      if ("-resume".equals(s)) {
+    for (int i = 0; i < args.length; i++) {
+      if ("-resume".equals(args[i])) {
         shouldResume = true;
-      } else if ("-force".equals(s)) {
+      } else if ("-force".equals(args[i])) {
         force = true;
-      } else if ("-all".equals(s)) {
-        crawlId = s;
+      } else if ("-schemaId".equals(args[i])) {
+        schemaId = args[++i];
+      } else if ("-all".equals(args[i])) {
+        crawlId = args[i];
       } else {
         if (crawlId != null) {
           System.err.println("CrawlId already set to '" + crawlId + "'!");
           return -1;
         }
-        crawlId = s;
+        crawlId = args[i];
       }
     }
     if (crawlId == null) {
       System.err.println("CrawlId not set (or -all not specified)!");
       return -1;
     }
-    return parse(crawlId, shouldResume, force);
+    return parse(crawlId, schemaId, shouldResume, force);
   }
 
   public static void main(String[] args) throws Exception {
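
The parser takes the id alongside its existing flags. An illustrative
invocation, assuming the standard bin/nutch launcher:

    bin/nutch org.apache.nutch.parse.ParserJob -all -schemaId test -force
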
diff --git a/src/java/org/apache/nutch/storage/StorageUtils.java b/src/java/org/apache/nutch/storage/StorageUtils.java
index 6c1e67f..9197f08 100644
--- a/src/java/org/apache/nutch/storage/StorageUtils.java
+++ b/src/java/org/apache/nutch/storage/StorageUtils.java
@@ -27,6 +27,23 @@ public class StorageUtils {
   }
 
   @SuppressWarnings("unchecked")
+  public static <K, V extends Persistent> DataStore<K, V> createWebStore(Configuration conf,
+      Class<K> keyClass, Class<V> persistentClass, String schemaId) throws ClassNotFoundException {
+    String schema = conf.get("storage.schema", "webpage");
+    if (schemaId == null) {
+      schemaId = conf.get("storage.schema.id", "");
+    }
+    if (!schemaId.isEmpty()) {
+      schema = schemaId + "_" + schema;
+    }
+
+    Class<? extends DataStore<K, V>> dataStoreClass =
+      (Class<? extends DataStore<K, V>>) getDataStoreClass(conf);
+    return DataStoreFactory.createDataStore(dataStoreClass,
+            keyClass, persistentClass, schema);
+  }
+
+  @SuppressWarnings("unchecked")
   public static <K, V extends Persistent> Class<? extends DataStore<K, V>>
   getDataStoreClass(Configuration conf)  throws ClassNotFoundException {
     return (Class<? extends DataStore<K, V>>)
@@ -34,39 +51,42 @@ public class StorageUtils {
           "org.gora.hbase.store.HBaseStore"));
   }
 
-  public static <K, V> void initMapperJob(Job job,
+  public static <K, V> void initMapperJob(Job job, String schemaId,
       Collection<WebPage.Field> fields,
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass, boolean reuseObjects)
   throws ClassNotFoundException, IOException {
-    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null, reuseObjects);
+    initMapperJob(job, schemaId, fields, outKeyClass, outValueClass,
+        mapperClass, null, reuseObjects);
   }
 
-  public static <K, V> void initMapperJob(Job job,
+  public static <K, V> void initMapperJob(Job job, String schemaId,
       Collection<WebPage.Field> fields,
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass)
   throws ClassNotFoundException, IOException {
-    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null, true);
+    initMapperJob(job, schemaId, fields, outKeyClass, outValueClass,
+        mapperClass, null, true);
   }
 
-  public static <K, V> void initMapperJob(Job job,
+  public static <K, V> void initMapperJob(Job job, String schemaId,
       Collection<WebPage.Field> fields,
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
       Class<? extends Partitioner<K, V>> partitionerClass)
   throws ClassNotFoundException, IOException {
-    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, partitionerClass, true);
+    initMapperJob(job, schemaId, fields, outKeyClass, outValueClass,
+        mapperClass, partitionerClass, true);
   }
 
-  public static <K, V> void initMapperJob(Job job,
+  public static <K, V> void initMapperJob(Job job, String schemaId,
       Collection<WebPage.Field> fields,
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
       Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
   throws ClassNotFoundException, IOException {
-    DataStore<String, WebPage> store =
-      createDataStore(job.getConfiguration(), String.class, WebPage.class);
+    DataStore<String, WebPage> store = createWebStore(job.getConfiguration(),
+        String.class, WebPage.class, schemaId);
     if (store==null) throw new RuntimeException("Could not create datastore");
     Query<String, WebPage> query = store.newQuery();
     query.setFields(toStringArray(fields));
@@ -75,12 +95,12 @@ public class StorageUtils {
     GoraOutputFormat.setOutput(job, store, true);
   }
 
-  public static <K, V> void initReducerJob(Job job,
+  public static <K, V> void initReducerJob(Job job, String schemaId,
       Class<? extends GoraReducer<K, V, String, WebPage>> reducerClass)
   throws ClassNotFoundException {
     Configuration conf = job.getConfiguration();
     DataStore<String, WebPage> store =
-      StorageUtils.createDataStore(conf, String.class, WebPage.class);
+      StorageUtils.createWebStore(conf, String.class, WebPage.class, schemaId);
     GoraReducer.initReducerJob(job, store, reducerClass);
     GoraOutputFormat.setOutput(job, store, true);
   }
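
createWebStore() is now the single place where the physical schema name is
resolved, so every job sees the same dataset for a given id. A sketch of
direct use (the "test" id is illustrative):

    // Resolves to schema "test_webpage" under the defaults above.
    DataStore<String, WebPage> store = StorageUtils.createWebStore(
        conf, String.class, WebPage.class, "test");
    Query<String, WebPage> query = store.newQuery();
    query.setFields(WebPage._ALL_FIELDS);
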
diff --git a/src/java/org/apache/nutch/tools/Benchmark.java b/src/java/org/apache/nutch/tools/Benchmark.java
index 1d8f78e..ae751fb 100644
--- a/src/java/org/apache/nutch/tools/Benchmark.java
+++ b/src/java/org/apache/nutch/tools/Benchmark.java
@@ -32,7 +32,7 @@ public class Benchmark extends Configured implements Tool {
     int res = ToolRunner.run(conf, new Benchmark(), args);
     System.exit(res);
   }
-  
+
   private void createSeeds(FileSystem fs, Path seedsDir, int count) throws Exception {
     OutputStream os = fs.create(new Path(seedsDir, "seeds"));
     for (int i = 0; i < count; i++) {
@@ -42,7 +42,7 @@ public class Benchmark extends Configured implements Tool {
     os.flush();
     os.close();
   }
-  
+
   public static final class BenchmarkResults {
     Map<String,Map<String,Long>> timings = new HashMap<String,Map<String,Long>>();
     List<String> runs = new ArrayList<String>();
@@ -51,7 +51,7 @@ public class Benchmark extends Configured implements Tool {
     long topN;
     long elapsed;
     String plugins;
-    
+
     public void addTiming(String stage, String run, long timing) {
       if (!runs.contains(run)) {
         runs.add(run);
@@ -66,7 +66,8 @@ public class Benchmark extends Configured implements Tool {
       }
       t.put(run, timing);
     }
-    
+
+    @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("* Plugins:\t" + plugins + "\n");
@@ -89,7 +90,7 @@ public class Benchmark extends Configured implements Tool {
       }
       return sb.toString();
     }
-    
+
     public List<String> getStages() {
       return stages;
     }
@@ -97,7 +98,7 @@ public class Benchmark extends Configured implements Tool {
       return runs;
     }
   }
-  
+
   public int run(String[] args) throws Exception {
     String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
     int seeds = 1;
@@ -105,9 +106,11 @@ public class Benchmark extends Configured implements Tool {
     int threads = 10;
     //boolean delete = true;
     long topN = Long.MAX_VALUE;
-    
+    String schemaId = null;
+
     if (args.length == 0) {
-      System.err.println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-maxPerHost NN] [-plugins <regex>]");
+      System.err.println("Usage: Benchmark [-schema <id>] [-seeds NN] [-depth NN] [-threads NN] [-maxPerHost NN] [-plugins <regex>]");
+      System.err.println("\t-schemaId id\t the id to prefix the schemas to operate on, (default: storage.schema.id)");
       System.err.println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)");
       System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)");
       System.err.println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)");
@@ -121,7 +124,9 @@ public class Benchmark extends Configured implements Tool {
     }
     int maxPerHost = Integer.MAX_VALUE;
     for (int i = 0; i < args.length; i++) {
-      if (args[i].equals("-seeds")) {
+      if (args[i].equals("-schema")) {
+        schemaId = args[++i];
+      } else if (args[i].equals("-seeds")) {
         seeds = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-threads")) {
         threads = Integer.parseInt(args[++i]);
@@ -136,12 +141,12 @@ public class Benchmark extends Configured implements Tool {
         return -1;
       }
     }
-    BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN, plugins);
+    BenchmarkResults res = benchmark(schemaId, seeds, depth, threads, maxPerHost, topN, plugins);
     System.out.println(res);
     return 0;
   }
-  
-  public BenchmarkResults benchmark(int seeds, int depth, int threads, int maxPerHost,
+
+  public BenchmarkResults benchmark(String schemaId, int seeds, int depth, int threads, int maxPerHost,
         long topN, String plugins) throws Exception {
     Configuration conf = getConf();
     conf.set("http.proxy.host", "localhost");
@@ -153,7 +158,7 @@ public class Benchmark extends Configured implements Tool {
     }
     conf.setInt(GeneratorJob.GENERATOR_MAX_COUNT, maxPerHost);
     conf.set(GeneratorJob.GENERATOR_COUNT_MODE, GeneratorJob.GENERATOR_COUNT_VALUE_HOST);
-    Job job = new NutchJob(conf);    
+    Job job = new NutchJob(conf);
     FileSystem fs = FileSystem.get(job.getConfiguration());
     Path dir = new Path(getConf().get("hadoop.tmp.dir"),
             "bench-" + System.currentTimeMillis());
@@ -166,16 +171,16 @@ public class Benchmark extends Configured implements Tool {
       LOG.info("crawl started in: " + dir);
       LOG.info("rootUrlDir = " + rootUrlDir);
       LOG.info("threads = " + threads);
-      LOG.info("depth = " + depth);      
+      LOG.info("depth = " + depth);
     }
-    
+
     BenchmarkResults res = new BenchmarkResults();
     res.depth = depth;
     res.plugins = plugins;
     res.seeds = seeds;
     res.threads = threads;
     res.topN = topN;
-    
+
     res.elapsed = System.currentTimeMillis();
     InjectorJob injector = new InjectorJob(conf);
     GeneratorJob generator = new GeneratorJob(conf);
@@ -184,17 +189,17 @@ public class Benchmark extends Configured implements Tool {
     DbUpdaterJob crawlDbTool = new DbUpdaterJob(conf);
     // not needed in the new API
     //LinkDb linkDbTool = new LinkDb(getConf());
-    
+
     long start = System.currentTimeMillis();
     // initialize crawlDb
-    injector.inject(rootUrlDir);
+    injector.inject(rootUrlDir, schemaId);
     long delta = System.currentTimeMillis() - start;
     res.addTiming("inject", "0", delta);
     int i;
     for (i = 0; i < depth; i++) {             // generate new segment
       start = System.currentTimeMillis();
       String crawlId = generator.generate(topN, System.currentTimeMillis(),
-              false, false);
+              false, false, schemaId);
       delta = System.currentTimeMillis() - start;
       res.addTiming("generate", i + "", delta);
       if (crawlId == null) {
@@ -203,12 +208,12 @@ public class Benchmark extends Configured implements Tool {
       }
       boolean isParsing = getConf().getBoolean("fetcher.parse", true);
       start = System.currentTimeMillis();
-      fetcher.fetch(crawlId, threads, false, isParsing, -1);  // fetch it
+      fetcher.fetch(crawlId, schemaId, threads, false, isParsing, -1);  // fetch it
       delta = System.currentTimeMillis() - start;
       res.addTiming("fetch", i + "", delta);
       if (!isParsing) {
         start = System.currentTimeMillis();
-        parseSegment.parse(crawlId, false, false);    // parse it, if needed
+        parseSegment.parse(crawlId, schemaId, false, false);    // parse it, if needed
         delta = System.currentTimeMillis() - start;
         res.addTiming("parse", i + "", delta);
       }
@@ -224,7 +229,7 @@ public class Benchmark extends Configured implements Tool {
     res.elapsed = System.currentTimeMillis() - res.elapsed;
     WebTableReader dbreader = new WebTableReader();
     dbreader.setConf(conf);
-    dbreader.processStatJob(false);
+    dbreader.processStatJob(schemaId, false);
     return res;
   }
 
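
Benchmark runs can therefore be isolated from an existing web db by giving
each run its own id. An illustrative invocation, assuming the standard
bin/nutch launcher:

    bin/nutch org.apache.nutch.tools.Benchmark -schemaId bench -seeds 100 -depth 3
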
diff --git a/src/test/org/apache/nutch/crawl/TestGenerator.java b/src/test/org/apache/nutch/crawl/TestGenerator.java
index e37b18a..7234f6f 100644
--- a/src/test/org/apache/nutch/crawl/TestGenerator.java
+++ b/src/test/org/apache/nutch/crawl/TestGenerator.java
@@ -41,7 +41,7 @@ import org.apache.nutch.util.TableUtil;
 public class TestGenerator extends AbstractNutchTest {
 
   public static final Logger LOG = LoggerFactory.getLogger(TestGenerator.class);
-  
+
   private static String[] FIELDS = new String[] {
     WebPage.Field.MARKERS.getName(),
     WebPage.Field.SCORE.getName()
@@ -172,7 +172,7 @@ public class TestGenerator extends AbstractNutchTest {
       webPageStore.put(TableUtil.reverseUrl(uwp.getUrl()), uwp.getDatum());
     }
     webPageStore.flush();
-    
+
     Configuration myConfiguration = new Configuration(conf);
     myConfiguration.setInt(GeneratorJob.GENERATOR_MAX_COUNT, 1);
     myConfiguration.set(GeneratorJob.GENERATOR_COUNT_MODE, GeneratorJob.GENERATOR_COUNT_VALUE_DOMAIN);
@@ -255,7 +255,7 @@ public class TestGenerator extends AbstractNutchTest {
     // generate segment
     GeneratorJob g = new GeneratorJob();
     g.setConf(config);
-    String crawlId = g.generate(numResults, System.currentTimeMillis(), filter, false);
+    String crawlId = g.generate(numResults, System.currentTimeMillis(), filter, false, null);
     if (crawlId == null)
       throw new RuntimeException("Generator failed");
   }
diff --git a/src/test/org/apache/nutch/crawl/TestInjector.java b/src/test/org/apache/nutch/crawl/TestInjector.java
index d8c869c..928a7cf 100644
--- a/src/test/org/apache/nutch/crawl/TestInjector.java
+++ b/src/test/org/apache/nutch/crawl/TestInjector.java
@@ -44,7 +44,7 @@ import org.junit.Before;
  * Basic injector test: 1. Creates a text file with urls 2. Injects them into
  * crawldb 3. Reads crawldb entries and verifies contents 4. Injects more urls
  * into webdb 5. Reads crawldb entries and verifies contents
- * 
+ *
  * @author nutch-dev <nutch-dev at lucene.apache.org>
  */
 public class TestInjector extends AbstractNutchTest {
@@ -67,7 +67,7 @@ public class TestInjector extends AbstractNutchTest {
 
     InjectorJob injector = new InjectorJob();
     injector.setConf(conf);
-    injector.inject(urlPath);
+    injector.inject(urlPath, null);
 
     // verify results
     List<String> read = readDb();
@@ -89,7 +89,7 @@ public class TestInjector extends AbstractNutchTest {
       urlsCheck.add(u + "\tnutch.score=1");
     }
     CrawlTestUtil.generateSeedList(fs, urlPath, urls2);
-    injector.inject(urlPath);
+    injector.inject(urlPath, null);
     urls.addAll(urlsCheck);
 
     // verify results
@@ -104,13 +104,13 @@ public class TestInjector extends AbstractNutchTest {
     assertTrue(urls.containsAll(read));
 
   }
-  
+
   private static final String[] fields = new String[] {
     WebPage.Field.MARKERS.getName(),
     WebPage.Field.METADATA.getName(),
     WebPage.Field.SCORE.getName()
   };
-  
+
   private List<String> readDb() throws Exception {
     List<URLWebPage> pages = CrawlTestUtil.readContents(webPageStore, null, fields);
     ArrayList<String> read = new ArrayList<String>();
diff --git a/src/test/org/apache/nutch/fetcher/TestFetcher.java b/src/test/org/apache/nutch/fetcher/TestFetcher.java
index 1b97fc3..3a8720e 100644
--- a/src/test/org/apache/nutch/fetcher/TestFetcher.java
+++ b/src/test/org/apache/nutch/fetcher/TestFetcher.java
@@ -46,6 +46,7 @@ public class TestFetcher extends AbstractNutchTest {
   Path urlPath;
   Server server;
 
+  @Override
   public void setUp() throws Exception{
     super.setUp();
     urlPath = new Path(testdir, "urls");
@@ -53,47 +54,48 @@ public class TestFetcher extends AbstractNutchTest {
     server.start();
   }
 
+  @Override
   public void tearDown() throws Exception{
     server.stop();
     fs.delete(testdir, true);
   }
-  
+
   public void testFetch() throws Exception {
-    
+
     //generate seedlist
     ArrayList<String> urls = new ArrayList<String>();
-    
+
     addUrl(urls,"index.html");
     addUrl(urls,"pagea.html");
     addUrl(urls,"pageb.html");
     addUrl(urls,"dup_of_pagea.html");
     addUrl(urls,"nested_spider_trap.html");
     addUrl(urls,"exception.html");
-    
+
     CrawlTestUtil.generateSeedList(fs, urlPath, urls);
-    
+
     //inject
     InjectorJob injector = new InjectorJob(conf);
-    injector.inject(urlPath);
+    injector.inject(urlPath, null);
 
     //generate
     long time = System.currentTimeMillis();
     GeneratorJob g = new GeneratorJob(conf);
-    String crawlId = g.generate(Long.MAX_VALUE, time, false, false);
+    String crawlId = g.generate(Long.MAX_VALUE, time, false, false, null);
 
     //fetch
     time = System.currentTimeMillis();
     conf.setBoolean(FetcherJob.PARSE_KEY, true);
     FetcherJob fetcher = new FetcherJob(conf);
-    fetcher.fetch(crawlId, 1, false, true, -1);
+    fetcher.fetch(crawlId, null, 1, false, true, -1);
 
     time = System.currentTimeMillis() - time;
-    
+
     //verify politeness, time taken should be more than (num_of_pages +1)*delay
     int minimumTime = (int) ((urls.size() + 1) * 1000 *
         conf.getFloat("fetcher.server.delay", 5));
     assertTrue(time > minimumTime);
-    
+
     List<URLWebPage> pages = CrawlTestUtil.readContents(webPageStore, Mark.FETCH_MARK, (String[])null);
     assertEquals(urls.size(), pages.size());
     List<String> handledurls = new ArrayList<String>();
@@ -104,7 +106,7 @@ public class TestFetcher extends AbstractNutchTest {
       }
       String content = new String(bb.array());
       if (content.indexOf("Nutch fetcher test page")!=-1) {
-        handledurls.add(up.getUrl());        
+        handledurls.add(up.getUrl());
       }
     }
     Collections.sort(urls);
@@ -121,7 +123,7 @@ public class TestFetcher extends AbstractNutchTest {
   private void addUrl(ArrayList<String> urls, String page) {
     urls.add("http://127.0.0.1:" + server.getConnectors()[0].getPort() + "/" + page);
   }
-  
+
   public void testAgentNameCheck() {
 
     boolean failedNoAgentName = false;
diff --git a/src/test/org/apache/nutch/util/AbstractNutchTest.java b/src/test/org/apache/nutch/util/AbstractNutchTest.java
index 635a930..7fb6f71 100644
--- a/src/test/org/apache/nutch/util/AbstractNutchTest.java
+++ b/src/test/org/apache/nutch/util/AbstractNutchTest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.nutch.crawl.URLWebPage;
 import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.TableUtil;
 import org.gora.query.Query;
@@ -49,11 +50,12 @@ public class AbstractNutchTest extends TestCase {
   protected Path testdir = new Path("build/test/inject-test");
   protected DataStore<String, WebPage> webPageStore;
   protected boolean persistentDataStore = false;
-  
+
   @Override
   public void setUp() throws Exception {
     super.setUp();
     conf = CrawlTestUtil.createConfiguration();
+    conf.set("storage.data.store.class", "org.gora.sql.store.SqlStore");
     fs = FileSystem.get(conf);
     // using hsqldb in memory
     DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.driver","org.hsqldb.jdbcDriver");
@@ -61,13 +63,8 @@ public class AbstractNutchTest extends TestCase {
     DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.url","jdbc:hsqldb:mem:" + getClass().getName());
     DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.user","sa");
     DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.password","");
-    if (persistentDataStore) {
-      webPageStore = DataStoreFactory.getDataStore(SqlStore.class,
-          String.class, WebPage.class);      
-    } else {
-      webPageStore = DataStoreFactory.createDataStore(SqlStore.class,
-          String.class, WebPage.class);
-    }
+    webPageStore = StorageUtils.createWebStore(conf, String.class,
+        WebPage.class, null);
   }
 
   @Override
