Index: src/test/org/apache/nutch/storage/TestGoraStorage.java
===================================================================
--- src/test/org/apache/nutch/storage/TestGoraStorage.java	(revision 1180939)
+++ src/test/org/apache/nutch/storage/TestGoraStorage.java	(working copy)
@@ -16,228 +16,195 @@
  ******************************************************************************/
 package org.apache.nutch.storage;
 
-import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.List;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.util.NutchConfiguration;
+import org.apache.commons.io.IOUtils;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.AbstractNutchTest;
+import org.apache.nutch.util.CrawlTestUtil;
+import org.hsqldb.Server;
 
-import junit.framework.TestCase;
+/**
+ * Tests basic Gora functionality by writing and reading webpages.
+ */
+public class TestGoraStorage extends AbstractNutchTest {
 
-public class TestGoraStorage extends TestCase {
-  Configuration conf;
-  
-  public void init() throws Exception {
-    conf = NutchConfiguration.create();
-  }
-  
-  public void setUp() throws Exception {
-    conf = NutchConfiguration.create();
-    DataStore<String,WebPage> store;
-    
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    store.deleteByQuery(store.newQuery());
-    store.close();
+  /**
+   * Sequentially writes and reads back pages on {@code webPageStore}.
+   *
+   * @throws Exception if {@code readWrite} fails to access the store
+   */
+  public void testSinglethreaded() throws Exception {
+    String id = "singlethread"; // becomes part of every key that is written
+    readWrite(id, webPageStore);
   }
-  
-  private class Worker extends Thread {
-    DataStore<String,WebPage> store;
+
+  private static void readWrite(String id, DataStore<String, WebPage> store)
+      throws IOException {
     WebPage page = new WebPage();
-    int start, count, id;
-    int reopenMark, reopenCount = 0;
-    boolean reopens = false;
-    
-    public Worker(int id, int start, int count, boolean reopens) {
-      this.id = id;
-      this.start = start;
-      this.count = count;
-      reopenMark = new Random().nextInt(count / 4) + count / 4;
-      this.reopens = reopens;
+    int max = 1000; // pages written here and expected back by the scan below
+    for (int i = 0; i < max; i++) {
+      // store a page with title, keys have the form key-<id>-<i>
+      String key = "key-" + id + "-" + i;
+      String title = "title" + i;
+      page.setTitle(new Utf8(title));
+      store.put(key, page);
+      store.flush();
+
+      // retrieve page and check title
+      page = store.get(key);
+      assertNotNull(page);
+      assertEquals(title, page.getTitle().toString());
     }
-    
-    public void run() {
-      threadCount.incrementAndGet();
-      try {
-        store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-        for (int i = 0; i < count; i++) {
-          if (i > 0 && ((count % 10) == 0)) {
-            System.out.println(" -W" + id + "(" + i + "/" + count + ")");
-          }
-          if (reopens && (i > 0) && (i % reopenMark) == 0) {
-            System.out.println(" -W" + id + " reopen " + (++reopenCount));
-            store.flush();
-            store.close();
-            store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-          }
-          page.setTitle(new Utf8(String.valueOf(start + i)));
-          store.put(String.valueOf(start + i), page);
-          try {
-            sleep(10);
-          } catch (Exception e) {};
-        }
-        store.flush();
-        store.close();
-      } catch (Exception e) {
-        fail(e.getMessage());
-      }
-      threadCount.decrementAndGet();
+
+    // scan all rows, count only this id's keys (exact prefix, not substring)
+    String prefix = "key-" + id + "-";
+    Result<String, WebPage> result = store.execute(store.newQuery());
+    int count = 0;
+    while (result.next()) {
+      if (result.getKey().startsWith(prefix))
+        count++;
     }
+    result.close(); // release the scan before checking the amount
+    assertEquals(max, count);
   }
-  
-  private AtomicInteger threadCount = new AtomicInteger(0);
-  
-  public void testMultithread() throws Exception {
-    int COUNT = 1000;
-    int NUM = 100;
-    DataStore<String,WebPage> store;
-    
-    for (int i = 0; i < NUM; i++) {
-      Worker w = new Worker(i, i * COUNT, COUNT, true);
-      w.start();
-    }
-    while (threadCount.get() > 0) {
-      try {
-        Thread.sleep(5000);
-        System.out.println("-threads " + threadCount.get() + "/" + NUM);
-      } catch (Exception e) {};
+
+  /**
+   * Tests multiple threads reading and writing to the same store, this should
+   * be no problem because {@link DataStore} implementations claim to be thread
+   * safe.
+   *
+   * @throws Exception if the tasks are interrupted or fail unexpectedly
+   */
+  public void testMultithreaded() throws Exception {
+    // create a fixed thread pool
+    int numThreads = 8;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+
+    // define a list of tasks
+    Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(new Callable<Integer>() {
+        @Override
+        public Integer call() {
+          try {
+            // run a sequence, the worker thread's name is used as the key id
+            readWrite(Thread.currentThread().getName(), webPageStore);
+            // everything ok, return 0
+            return 0;
+          } catch (Exception e) {
+            e.printStackTrace();
+            // this will fail the test
+            return 1;
+          }
+        }
+      });
     }
-    System.out.println("Verifying...");
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    Result<String,WebPage> res = store.execute(store.newQuery());
-    int size = COUNT * NUM;
-    BitSet keys = new BitSet(size);
-    while (res.next()) {
-      String key = res.getKey();
-      WebPage p = res.get();
-      assertEquals(key, p.getTitle().toString());
-      int pos = Integer.parseInt(key);
-      assertTrue(pos < size && pos >= 0);
-      if (keys.get(pos)) {
-        fail("key " + key + " already set!");
-      }
-      keys.set(pos);
+
+    // submit them at once, invokeAll() blocks until every task has finished
+    List<Future<Integer>> results = pool.invokeAll(tasks);
+    pool.shutdown(); // release the worker threads before asserting
+    // check results, every task must have returned 0
+    for (Future<Integer> result : results) {
+      assertEquals(0, (int) result.get());
     }
-    assertEquals(size, keys.cardinality());
   }
   
+  /**
+   * Tests multiple processes reading and writing to the same store backend,
+   * this is to simulate a multi process Nutch environment (i.e. MapReduce).
+   *
+   * @throws Exception if the tasks are interrupted or fail unexpectedly
+   */
   public void testMultiProcess() throws Exception {
-    int COUNT = 1000;
-    int NUM = 100;
-    DataStore<String,WebPage> store;
-    List<Process> procs = new ArrayList<Process>();
+    // create and start a hsql server, a stand-alone memory backed db
+    // (important: a stand-alone server should be used because simple
+    //  file based access i.e. jdbc:hsqldb:file is NOT thread-safe.)
+    Server server = new Server();
+    server.setDaemon(true);
+    server.setSilent(true); // disables LOTS of trace
+    final String className = getClass().getName();
+    server.setDatabasePath(0, "mem:" + className);
+    server.setDatabaseName(0, className);
+    server.start();
     
-    for (int i = 0; i < NUM; i++) {
-      Process p = launch(i, i * COUNT, COUNT);
-      procs.add(p);
-    }
+    // create a fixed thread pool
+    int numThreads = 4;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
     
-    while (procs.size() > 0) {
-      try {
-        Thread.sleep(5000);
-      } catch (Exception e) {};
-      Iterator<Process> it = procs.iterator();
-      while (it.hasNext()) {
-        Process p = it.next();
-        int code = 1;
-        try {
-          code = p.exitValue();
-          assertEquals(0, code);
-          it.remove();
-          p.destroy();
-        } catch (IllegalThreadStateException e) {
-          // not ready yet
+    // spawn multiple processes, each thread spawns own process
+    Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(new Callable<Integer>() {
+        @Override
+        public Integer call() {
+          try {
+            String separator = System.getProperty("file.separator");
+            String classpath = System.getProperty("java.class.path");
+            String path = System.getProperty("java.home") + separator + "bin"
+                + separator + "java";
+            ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp",
+                classpath, className);
+            processBuilder.redirectErrorStream(true);
+            Process process = processBuilder.start();
+            InputStream in = process.getInputStream();
+            // drain output before waitFor(): a full pipe would block the child
+            String output = IOUtils.toString(in);
+            in.close();
+            int exit = process.waitFor();
+            System.out.println("===Process stream for " + Thread.currentThread()
+                + "\n" + output + "===End of process stream.");
+            return exit; // the test expects a zero exit code
+          } catch (Exception e) {
+            e.printStackTrace();
+            // this will fail the test
+            return 1;
+          }
         }
-      }
-      System.out.println("* running " + procs.size() + "/" + NUM);
-    }
-    System.out.println("Verifying...");
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    Result<String,WebPage> res = store.execute(store.newQuery());
-    int size = COUNT * NUM;
-    BitSet keys = new BitSet(size);
-    while (res.next()) {
-      String key = res.getKey();
-      WebPage p = res.get();
-      assertEquals(key, p.getTitle().toString());
-      int pos = Integer.parseInt(key);
-      assertTrue(pos < size && pos >= 0);
-      if (keys.get(pos)) {
-        fail("key " + key + " already set!");
-      }
-      keys.set(pos);
+      });
     }
-    if (size != keys.cardinality()) {
-      System.out.println("ERROR Missing keys:");
-      for (int i = 0; i < size; i++) {
-        if (keys.get(i)) continue;
-        System.out.println(" " + i);
-      }
-      fail("key count should be " + size + " but is " + keys.cardinality());
-    }
-  }
-  
-  private Process launch(int id, int start, int count) throws Exception {
-    //  Build exec child jmv args.
-    Vector<String> vargs = new Vector<String>(8);
-    File jvm =                                  // use same jvm as parent
-      new File(new File(System.getProperty("java.home"), "bin"), "java");
 
-    vargs.add(jvm.toString());
+    List<Future<Integer>> results = pool.invokeAll(tasks); // blocks till done
+    pool.shutdown(); // every child has exited once invokeAll() has returned
 
-    // Add child (task) java-vm options.
-    // tmp dir
-    String prop = System.getProperty("java.io.tmpdir");
-    vargs.add("-Djava.io.tmpdir=" + prop);
-    // library path
-    prop = System.getProperty("java.library.path");
-    if (prop != null) {
-      vargs.add("-Djava.library.path=" + prop);      
+    int failed = 0; // assert only after the db has been stopped
+    for (Future<Integer> result : results) {
+      if (result.get() != 0) failed++;
     }
-    // working dir
-    prop = System.getProperty("user.dir");
-    vargs.add("-Duser.dir=" + prop);    
-    // combat the stupid Xerces issue
-    vargs.add("-Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
-    // prepare classpath
-    String sep = System.getProperty("path.separator");
-    StringBuffer classPath = new StringBuffer();
-    // start with same classpath as parent process
-    classPath.append(System.getProperty("java.class.path"));
-    //classPath.append(sep);
-    // Add classpath.
-    vargs.add("-classpath");
-    vargs.add(classPath.toString());
     
-    // append class name and args
-    vargs.add(TestGoraStorage.class.getName());
-    vargs.add(String.valueOf(id));
-    vargs.add(String.valueOf(start));
-    vargs.add(String.valueOf(count));
-    ProcessBuilder builder = new ProcessBuilder(vargs);
-    return builder.start();
+    server.stop(); // stop db
+    assertEquals("child processes with non-zero exit code", 0, failed);
   }
-  
+
   public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      System.err.println("Usage: TestGoraStore <id> <startKey> <numRecords>");
-      System.exit(-1);
-    }
-    TestGoraStorage test = new TestGoraStorage();
-    test.init();
-    int id = Integer.parseInt(args[0]);
-    int start = Integer.parseInt(args[1]);
-    int count = Integer.parseInt(args[2]);
-    Worker w = test.new Worker(id, start, count, true);
-    w.run();
-    System.exit(0);
+    // entry point of each child JVM spawned by testMultiProcess
+    System.out.println("Starting!");
+
+    Configuration localConf = CrawlTestUtil.createConfiguration();
+    localConf.set("storage.data.store.class", "org.apache.gora.sql.store.SqlStore");
+
+    // connect to the stand-alone hsql server (hsql:// url), not a file based db
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.driver","org.hsqldb.jdbcDriver");
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.url",
+        "jdbc:hsqldb:hsql://localhost/"+TestGoraStorage.class.getName());
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.user","sa");
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.password","");
+
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(localConf,
+        String.class, WebPage.class);
+    // every child passes the same id, so all children write the same keys
+    readWrite("single_id", store);
+    System.out.println("Done.");
   }
 }
