Index: src/java/org/apache/nutch/ipc/Server.java
===================================================================
--- src/java/org/apache/nutch/ipc/Server.java	(revision 326658)
+++ src/java/org/apache/nutch/ipc/Server.java	(working copy)
@@ -95,7 +95,9 @@
       }
       try {
         socket.close();
-      } catch (IOException e) {}
+      } catch (IOException e) {
+        LOG.info(getName() + ": e=" + e);
+      }
       LOG.info(getName() + ": exiting");
     }
   }
@@ -166,9 +168,9 @@
 
   /** Handles queued calls . */
   private class Handler extends Thread {
-    public Handler() {
+    public Handler(int instanceNumber) {
       this.setDaemon(true);
-      this.setName("Server handler on " + port);
+      this.setName("Server handler "+ instanceNumber + " on " + port);
     }
 
     public void run() {
@@ -242,7 +244,7 @@
     listener.start();
     
     for (int i = 0; i < handlerCount; i++) {
-      Handler handler = new Handler();
+      Handler handler = new Handler(i);
       handler.start();
     }
   }
@@ -247,8 +249,9 @@
     }
   }
 
-  /** Stops the service.  No calls will be handled after this is called.  All
-   * threads will exit. */
+  /** Stops the service.  No new calls will be handled after this is called.
+   * Subthreads are given a brief grace period, but may still be exiting when this returns.
+   */
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
@@ -253,14 +256,19 @@
     LOG.info("Stopping server on " + port);
     running = false;
     try {
-      Thread.sleep(timeout);                        // let all threads exit
+      Thread.sleep(timeout);     // give pending requests a chance to finish (not an exact wait)
     } catch (InterruptedException e) {}
-    notify();
+    notifyAll();
   }
 
-  /** Wait for the server to be stopped. */
+  /** Wait for the server to be stopped.
+   * Does not wait for all subthreads to finish;
+   * see {@link #stop()}.
+   */
   public synchronized void join() throws InterruptedException {
-    wait();
+    while (running) {
+      wait();
+    }
   }
 
   /** Called for each call. */
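The join()/stop() rework above is the standard guarded-wait idiom: join() re-checks the running flag in a loop around wait(), and stop() uses notifyAll() so every waiting thread wakes up (and a join() that starts after stop() returns immediately). A minimal standalone sketch of the idiom, with illustrative names rather than the Server class itself:

    public class StoppableService {
      private boolean running = true;

      /** Blocks until stop() has been called. */
      public synchronized void join() throws InterruptedException {
        while (running) {        // guards against spurious wakeups and missed notifications
          wait();
        }
      }

      /** Marks the service stopped and wakes every thread blocked in join(). */
      public synchronized void stop() {
        running = false;
        notifyAll();             // notify() would wake only one of several waiters
      }
    }
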
Index: src/java/org/apache/nutch/ndfs/DataNode.java
===================================================================
--- src/java/org/apache/nutch/ndfs/DataNode.java	(revision 326658)
+++ src/java/org/apache/nutch/ndfs/DataNode.java	(working copy)
@@ -37,7 +37,7 @@
  **********************************************************/
 public class DataNode implements FSConstants, Runnable {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.ndfs.DataNode");
-    //
+  //
     // REMIND - mjc - I might bring "maxgigs" back so user can place 
     // artificial  limit on space
     //private static final long GIGABYTE = 1024 * 1024 * 1024;
@@ -66,6 +66,8 @@
     Vector receivedBlockList = new Vector();
     int xmitsInProgress = 0;
     Daemon dataXceiveServer = null;
+    long blockReportInterval;
+    private long datanodeStartupPeriod;
 
     /**
      * Create given a configuration and a dataDir.
@@ -73,7 +75,14 @@
     public DataNode(NutchConf conf, String datadir) throws IOException {
         this(InetAddress.getLocalHost().getHostName(), 
              new File(datadir),
-             createSocketAddr(conf.get("fs.default.name", "local")));
+             createSocketAddr(conf.get("fs.default.name",
+                 "fs.default.name Not Set")));
+      long blockReportIntervalBasis =
+          conf.getLong("ndfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
+      this.blockReportInterval =
+          blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
+      datanodeStartupPeriod =
+          conf.getLong("ndfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
     }
 
     /**
@@ -108,6 +117,7 @@
 
     /**
      * Shut down this instance of the datanode.
+     * Returns only after shutdown is complete.
      */
     void shutdown() {
         this.shouldRun = false;
@@ -126,8 +136,7 @@
         long lastHeartbeat = 0, lastBlockReport = 0;
         long sendStart = System.currentTimeMillis();
         int heartbeatsSent = 0;
-        long blockReportInterval =
-          BLOCKREPORT_INTERVAL - new Random().nextInt((int)(BLOCKREPORT_INTERVAL/10));
+        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
 
         //
         // Now loop for a long time....
@@ -180,7 +189,7 @@
 		// to pass from the time of connection to the first block-transfer.
 		// Otherwise we transfer a lot of blocks unnecessarily.
 		//
-		if (now - sendStart > DATANODE_STARTUP_PERIOD) {
+		if (now - sendStart > datanodeStartupPeriod) {
 		    //
 		    // Check to see if there are any block-instructions from the
 		    // namenode that this datanode should perform.
@@ -647,11 +656,13 @@
                 offerService();
             } catch (Exception ex) {
                 LOG.info("Exception: " + ex);
-                LOG.info("Lost connection to namenode.  Retrying...");
+              if (shouldRun) {
+                LOG.info("Lost connection to namenode (or other error).  Retrying...");
                 try {
-                    Thread.sleep(5000);
+                  Thread.sleep(5000);
                 } catch (InterruptedException ie) {
                 }
+              }
             }
         }
     }
@@ -656,7 +667,9 @@
         }
     }
 
-    /**
+    /** Start datanode daemons.
+     * Starts a datanode daemon for each comma-separated data directory
+     * specified in the ndfs.data.dir property.
      */
     public static void run(NutchConf conf) throws IOException {
         String[] dataDirs = conf.getStrings("ndfs.data.dir");
@@ -661,18 +674,46 @@
     public static void run(NutchConf conf) throws IOException {
         String[] dataDirs = conf.getStrings("ndfs.data.dir");
         for (int i = 0; i < dataDirs.length; i++) {
-            String dataDir = dataDirs[i];
-            File data = new File(dataDir);
-            data.mkdirs();
-            if (!data.isDirectory()) {
-                LOG.warning("Can't start DataNode in non-directory: "+dataDir);
-                continue;
-            }
-            new Thread(new DataNode(conf, dataDir), "DataNode: "+dataDir).start();
+          DataNode dn = makeInstanceForDir(dataDirs[i], conf);
+          if (dn != null) {
+            Thread t = new Thread(dn, "DataNode: "+dataDirs[i]);
+            t.setDaemon(true);
+            t.start();
+          }
         }
     }
 
-    /**
+  /**
+   * Make an instance of DataNode after ensuring that the given data directory
+   * (and any necessary parent directories) can be created.
+   * @param dataDir where the new DataNode instance should keep its files.
+   * @param conf NutchConf instance to use.
+   * @return DataNode instance for the given data dir and conf, or null if the
+   * directory cannot be created.
+   * @throws IOException
+   */
+  static DataNode makeInstanceForDir(String dataDir, NutchConf conf) throws IOException {
+    DataNode dn = null;
+    File data = new File(dataDir);
+    data.mkdirs();
+    if (!data.isDirectory()) {
+      LOG.warning("Can't start DataNode in non-directory: "+dataDir);
+      return null;
+    } else {
+      dn = new DataNode(conf, dataDir);
+    }
+    return dn;
+  }
+
+  public String toString() {
+    return "DataNode{" +
+        "data=" + data +
+        ", localName='" + localName + "'" +
+        ", xmitsInProgress=" + xmitsInProgress +
+        "}";
+  }
+
+  /** Command-line entry point for the DataNode daemon.
      */
     public static void main(String args[]) throws IOException {
         LogFormatter.setShowThreadIDs(true);
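For reference, the new makeInstanceForDir factory can be driven directly (for example from a test) in the same way run(NutchConf) now uses it per data directory. A small usage sketch under stated assumptions: the helper below is hypothetical, would have to live in the org.apache.nutch.ndfs package because makeInstanceForDir is package-private, and the path is illustrative.

    // Hypothetical test helper, not part of the patch.
    static void startOneDataNode(String dir) throws IOException {
      DataNode dn = DataNode.makeInstanceForDir(dir, NutchConf.get()); // null if dir can't be created
      if (dn != null) {
        Thread t = new Thread(dn, "DataNode: " + dir);
        t.setDaemon(true);   // same daemon flag that run(NutchConf) now sets
        t.start();
      }
    }
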
Index: src/java/org/apache/nutch/ndfs/FSDataset.java
===================================================================
--- src/java/org/apache/nutch/ndfs/FSDataset.java	(revision 326658)
+++ src/java/org/apache/nutch/ndfs/FSDataset.java	(working copy)
@@ -29,7 +29,8 @@
  ***************************************************/
 public class FSDataset implements FSConstants {
     static final double USABLE_DISK_PCT = 0.98;
-    /**
+
+  /**
      * A node type that can be built into a tree reflecting the
      * hierarchy of blocks on the local disk.
      */
@@ -166,6 +167,13 @@
             blkid = blkid >> ((15 - halfByteIndex) * 4);
             return (int) ((0x000000000000000F) & blkid);
         }
+
+        public String toString() {
+          return "FSDir{" +
+              "dir=" + dir +
+              ", children=" + (children == null ? null : Arrays.asList(children)) +
+              "}";
+        }
     }
 
     //////////////////////////////////////////////////////
@@ -411,4 +419,11 @@
         // REMIND - mjc - should cache this result for performance
         return new File(tmp, b.getBlockName());
     }
+
+    public String toString() {
+      return "FSDataset{" +
+        "dirpath='" + dirpath + "'" +
+        "}";
+    }
+
 }
Index: src/java/org/apache/nutch/ndfs/FSNamesystem.java
===================================================================
--- src/java/org/apache/nutch/ndfs/FSNamesystem.java	(revision 326658)
+++ src/java/org/apache/nutch/ndfs/FSNamesystem.java	(working copy)
@@ -53,6 +53,9 @@
     // Whether we should use disk-availability info when determining target
     final static boolean USE_AVAILABILITY = NutchConf.get().getBoolean("ndfs.availability.allocation", false);
 
+    private boolean allowSameHostTargets =
+        NutchConf.get().getBoolean("test.ndfs.same.host.targets.allowed", false);
+
     //
     // Stores the correct file name hierarchy
     //
@@ -127,8 +130,8 @@
     // Store set of Blocks that need to be replicated 1 or more times.
     // We also store pending replication-orders.
     //
-    TreeSet neededReplications = new TreeSet();
-    TreeSet pendingReplications = new TreeSet();
+    private TreeSet neededReplications = new TreeSet();
+    private TreeSet pendingReplications = new TreeSet();
 
     //
     // Used for handling lock-leases
@@ -133,8 +136,8 @@
     //
     // Used for handling lock-leases
     //
-    TreeMap leases = new TreeMap();
-    TreeSet sortedLeases = new TreeSet();
+    private TreeMap leases = new TreeMap();
+    private TreeSet sortedLeases = new TreeSet();
 
     //
     // Threaded object that checks to see if we have been
@@ -159,17 +162,23 @@
         this.systemStart = System.currentTimeMillis();
     }
 
-    /**
+    /** Close down this filesystem manager.
+     * Signals the heartbeat and lease daemons to stop and waits briefly for
+     * them to finish; control returns to the caller after a short timeout.
      */
     public void close() {
+      synchronized (this) {
         fsRunning = false;
+      }
         try {
-            hbthread.join();
+            hbthread.join(3000);
         } catch (InterruptedException ie) {
-        }
-        try {
-            lmthread.join();
-        } catch (InterruptedException ie) {
+        } finally {
+          // using finally to ensure we also wait for lease daemon
+          try {
+            lmthread.join(3000);
+          } catch (InterruptedException ie) {
+          }
         }
     }
 
@@ -218,6 +227,9 @@
      * of machines.  The first on this list should be where the client 
      * writes data.  Subsequent items in the list must be provided in
      * the connection to the first datanode.
+     * @return an array consisting of the block, plus a set
+     * of machines, or null if src is not valid for creation (based on
+     * {@link FSDirectory#isValidToCreate(UTF8)}).
      */
     public synchronized Object[] startFile(UTF8 src, UTF8 holder, boolean overwrite) {
         Object results[] = null;
@@ -234,7 +246,8 @@
                 // Get the array of replication targets 
                 DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
                 if (targets.length < MIN_REPLICATION) {
-                    LOG.info("Target-length is " + targets.length + ", below MIN_REPLICATION (" + MIN_REPLICATION + ")");
+                    LOG.warning("Target-length is " + targets.length +
+                        ", below MIN_REPLICATION (" + MIN_REPLICATION + ")");
                     return null;
                 }
 
@@ -257,9 +270,11 @@
                 // Create next block
                 results[0] = allocateBlock(src);
                 results[1] = targets;
+            } else { // ! fileValid
+              LOG.warning("Cannot start file because it is invalid. src=" + src);
             }
         } else {
-            LOG.info("Cannot start file because pendingCreates is non-null");
+            LOG.warning("Cannot start file because pendingCreates is non-null. src=" + src);
         }
         return results;
     }
@@ -1172,10 +1187,13 @@
         }
     }
 
-
     /**
-     * Get a certain number of targets, if possible.  If not,
-     * return as many as we can.
+     * Get a certain number of targets, if possible.
+     * If not, return as many as we can.
+     * @param desiredReplicates number of replicas wanted.
+     * @param forbiddenNodes set of DatanodeInfo instances that should not be
+     * considered targets.
+     * @return array of DatanodeInfo instances used as targets.
      */
     DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes) {
         TreeSet alreadyChosen = new TreeSet();
@@ -1187,7 +1205,7 @@
                 targets.add(target);
                 alreadyChosen.add(target);
             } else {
-                break;
+                break; // calling chooseTarget again won't help
             }
         }
         return (DatanodeInfo[]) targets.toArray(new DatanodeInfo[targets.size()]);
@@ -1200,6 +1218,10 @@
      * Right now it chooses randomly from available boxes.  In future could 
      * choose according to capacity and load-balancing needs (or even 
      * network-topology, to avoid inter-switch traffic).
+     * @param forbidden1 set of DatanodeInfo targets to exclude; may be null.
+     * @param forbidden2 set of DatanodeInfo targets to exclude; may be null.
+     * @return DatanodeInfo instance to use or null if something went wrong
+     * (a log message is emitted if null is returned).
      */
     DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2) {
         //
@@ -1207,10 +1229,11 @@
         //
         int totalMachines = datanodeMap.size();
         if (totalMachines == 0) {
-            LOG.info("While choosing target, totalMachines is " + totalMachines);
+            LOG.warning("While choosing target, totalMachines is " + totalMachines);
             return null;
         }
 
+        TreeSet forbiddenMachines = new TreeSet();
         //
         // In addition to already-chosen datanode/port pairs, we want to avoid
         // already-chosen machinenames.  (There can be multiple datanodes per
@@ -1217,17 +1240,29 @@
         // machine.)  We might relax this requirement in the future, though. (Maybe
         // so that at least one replicate is off the machine.)
         //
-        TreeSet forbiddenMachines = new TreeSet();
+        UTF8 hostOrHostAndPort = null;
         if (forbidden1 != null) {
+          // add the host (or host:port) of each element in forbidden1 to forbiddenMachines
             for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
                 DatanodeInfo cur = (DatanodeInfo) it.next();
-                forbiddenMachines.add(cur.getName());
+                if (allowSameHostTargets) {
+                  hostOrHostAndPort = cur.getName(); // forbid same host:port
+                } else {
+                  hostOrHostAndPort = cur.getHost(); // forbid same host
+                }
+                forbiddenMachines.add(hostOrHostAndPort);
             }
         }
         if (forbidden2 != null) {
+          // add the host (or host:port) of each element in forbidden2 to forbiddenMachines
             for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
                 DatanodeInfo cur = (DatanodeInfo) it.next();
-                forbiddenMachines.add(cur.getName());
+              if (allowSameHostTargets) {
+                hostOrHostAndPort = cur.getName(); // forbid same host:port
+              } else {
+                hostOrHostAndPort = cur.getHost(); // forbid same host
+              }
+              forbiddenMachines.add(hostOrHostAndPort);
             }
         }
 
@@ -1238,9 +1273,12 @@
         Vector targetList = new Vector();
         for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
-            if ((forbidden1 == null || ! forbidden1.contains(node)) &&
-                (forbidden2 == null || ! forbidden2.contains(node)) &&
-                (! forbiddenMachines.contains(node.getName()))) {
+            if (allowSameHostTargets) {
+                hostOrHostAndPort = node.getName(); // match host:port
+            } else {
+                hostOrHostAndPort = node.getHost(); // match host
+            }
+            if (! forbiddenMachines.contains(hostOrHostAndPort)) {
                 targetList.add(node);
                 totalRemaining += node.getRemaining();
             }
@@ -1250,6 +1288,11 @@
         // Now pick one
         //
         if (targetList.size() == 0) {
+            LOG.warning("Zero targets found, forbidden1.size=" +
+                ( forbidden1 != null ? forbidden1.size() : 0 ) +
+                " allowSameHostTargets=" + allowSameHostTargets +
+                " forbidden2.size()=" +
+                ( forbidden2 != null ? forbidden2.size() : 0 ));
             return null;
         } else if (! USE_AVAILABILITY) {
             int target = r.nextInt(targetList.size());
@@ -1266,7 +1309,7 @@
                 }
             }
 
-            LOG.info("Impossible state.  When trying to choose target node, could not find any.  This may indicate that datanode capacities are being updated during datanode selection.  Anyway, now returning an arbitrary target to recover...");
+            LOG.warning("Impossible state.  When trying to choose target node, could not find any.  This may indicate that datanode capacities are being updated during datanode selection.  Anyway, now returning an arbitrary target to recover...");
             return (DatanodeInfo) targetList.elementAt(r.nextInt(targetList.size()));
         }
     }
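The chooseTarget changes above boil down to which key goes into the forbidden set: the bare host normally, or host:port when test.ndfs.same.host.targets.allowed is true. A minimal, self-contained sketch of that key choice (illustrative class and method names, not the NDFS types):

    import java.util.HashSet;
    import java.util.Set;

    public class TargetKeyDemo {
      /** The key used to exclude candidate datanodes: the bare host normally,
       *  or host:port when same-host targets are allowed. */
      static String exclusionKey(String host, int port, boolean sameHostAllowed) {
        return sameHostAllowed ? host + ":" + port : host;
      }

      public static void main(String[] args) {
        Set forbidden = new HashSet();
        forbidden.add(exclusionKey("node1", 7000, false));  // forbids every datanode on node1

        // A second datanode on the same host is rejected unless same-host targets are allowed.
        System.out.println(forbidden.contains(exclusionKey("node1", 7001, false))); // true
        System.out.println(forbidden.contains(exclusionKey("node1", 7001, true)));  // false
      }
    }
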
Index: src/java/org/apache/nutch/ndfs/NameNode.java
===================================================================
--- src/java/org/apache/nutch/ndfs/NameNode.java	(revision 326658)
+++ src/java/org/apache/nutch/ndfs/NameNode.java	(working copy)
@@ -38,8 +38,12 @@
 public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.ndfs.NameNode");
 
-    FSNamesystem namesystem;
-    Server server;
+    private FSNamesystem namesystem;
+    private Server server;
+    private int handlerCount = 2;
+
+    /** Only used for testing purposes. */
+    private boolean stopRequested = false;
 
     /**
      * Create a NameNode at the default location
@@ -52,11 +56,13 @@
     }
 
     /**
-     * Create a NameNode at the specified location
+     * Create a NameNode at the specified location and start it.
      */
     public NameNode(File dir, int port) throws IOException {
         this.namesystem = new FSNamesystem(dir);
-        this.server = RPC.getServer(this, port, 10, false);
+        this.handlerCount =
+            NutchConf.get().getInt("ndfs.namenode.handler.count", 10);
+        this.server = RPC.getServer(this, port, handlerCount, false);
         this.server.start();
     }
 
@@ -61,9 +67,10 @@
     }
 
     /**
-     * Run forever
+     * Wait for service to finish.
+     * (Normally, it runs forever.)
      */
-    public void offerService() {
+    public void join() {
         try {
             this.server.join();
         } catch (InterruptedException ie) {
@@ -70,6 +77,19 @@
         }
     }
 
+    /**
+     * Stop all NameNode threads and wait briefly for them to finish.
+     * Package-private since this is intended only for JUnit testing.
+     */
+    void stop() {
+      if (! stopRequested) {
+        stopRequested = true;
+        namesystem.close();
+        server.stop();
+        //this.join();
+      }
+    }
+
     /////////////////////////////////////////////////////
     // ClientProtocol
     /////////////////////////////////////////////////////
@@ -78,7 +98,7 @@
     public LocatedBlock[] open(String src) throws IOException {
         Object openResults[] = namesystem.open(new UTF8(src));
         if (openResults == null) {
-            throw new IOException("Cannot find filename " + src);
+            throw new IOException("Cannot open filename " + src);
         } else {
             Block blocks[] = (Block[]) openResults[0];
             DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];
@@ -95,7 +115,7 @@
     public LocatedBlock create(String src, String clientName, boolean overwrite) throws IOException {
         Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), overwrite);
         if (results == null) {
-            throw new IOException("Cannot create file " + src);
+            throw new IOException("Cannot create file " + src + " on client " + clientName);
         } else {
             Block b = (Block) results[0];
             DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
@@ -324,6 +344,6 @@
      */
     public static void main(String argv[]) throws IOException, InterruptedException {
         NameNode namenode = new NameNode();
-        namenode.offerService();
+        namenode.join();
     }
 }
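With offerService() renamed to join() and the new package-private stop(), the NameNode lifecycle used by main() above (and by a test) looks roughly like this sketch; the directory and port are illustrative, and the enclosing method would declare throws IOException and InterruptedException:

    NameNode namenode = new NameNode(new File("/tmp/ndfs/name"), 9000); // constructor starts the RPC server
    namenode.join();          // blocks until the server is stopped

    // From a JUnit test in the same package, shutdown is explicit:
    //   namenode.stop();     // closes the namesystem and stops the RPC server
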
