Index: src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java	(revision 475794)
+++ src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java	(working copy)
@@ -17,6 +17,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
+import java.util.Vector;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
 
@@ -74,4 +75,18 @@
      * jobs.
      */
     public JobStatus[] jobsToComplete() throws IOException;
+
+    /**
+     * @return a Vector of the jobs that are currently running
+     */
+    public Vector runningJobs();
+    /**
+     * @return a Vector of the jobs that completed successfully
+     */
+    public Vector completedJobs();
+    /**
+     * @return a Vector of the jobs that failed
+     */
+    public Vector failedJobs();
+
 }
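
Note: a minimal usage sketch (not part of the patch) of the three new enumeration methods. It is placed in org.apache.hadoop.mapred because the patch does not widen the visibility of the JobSubmissionProtocol interface itself, and the element type of the returned Vectors depends on the implementation (LocalJobRunner.Job here, JobInProgress on the JobTracker), so the sketch relies only on toString():

package org.apache.hadoop.mapred;

import java.util.Iterator;
import java.util.Vector;

// Hypothetical helper, shown only to illustrate the new protocol methods.
public class JobLister {
    public static void printJobs(JobSubmissionProtocol submitter) {
        Vector running = submitter.runningJobs();     // raw Vectors: pre-generics code base
        Vector completed = submitter.completedJobs();
        Vector failed = submitter.failedJobs();
        System.out.println(running.size() + " running, "
            + completed.size() + " completed, " + failed.size() + " failed");
        for (Iterator it = running.iterator(); it.hasNext();) {
            System.out.println("running: " + it.next());
        }
    }
}
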
Index: src/java/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobInProgress.java	(revision 475794)
+++ src/java/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -31,7 +31,7 @@
 // and its latest JobStatus, plus a set of tables for 
 // doing bookkeeping of its Tasks.
 ///////////////////////////////////////////////////////
-class JobInProgress {
+public class JobInProgress {
     private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress");
 
     JobProfile profile;
Index: src/java/org/apache/hadoop/mapred/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/LocalJobRunner.java	(revision 475794)
+++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java	(working copy)
@@ -26,7 +26,7 @@
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
-class LocalJobRunner implements JobSubmissionProtocol {
+public class LocalJobRunner implements JobSubmissionProtocol {
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.LocalJobRunner");
 
@@ -36,13 +36,18 @@
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
+  private static LocalJobRunner fInstance;
+  private String localMachine;
+  private long startTime;
+
+
   private JobTrackerMetrics myMetrics = null;
 
   public long getProtocolVersion(String protocol, long clientVersion) {
     return JobSubmissionProtocol.versionID;
   }
   
-  private class Job extends Thread
+  public class Job extends Thread
     implements TaskUmbilicalProtocol {
     private String file;
     private String id;
@@ -56,6 +61,17 @@
     private Path localFile;
     private FileSystem localFs;
 
+    private long startTime = 0;
+    private long finishTime = 0;
+    private int mapDone = 0;
+    private int reduceDone = 0;
+    private String mapTaskId="";
+    private String mapStateString="";
+    private String reduceTaskId="";
+    private String reduceStateString="";
+    private StringBuffer mapDiagnostic;
+    private StringBuffer reduceDiagnostic;
+
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
     }
@@ -77,6 +93,10 @@
 
       jobs.put(id, this);
 
+      this.startTime = System.currentTimeMillis();
+      this.mapDiagnostic = new StringBuffer();
+      this.reduceDiagnostic = new StringBuffer();
+
       this.start();
     }
 
@@ -102,7 +122,8 @@
           map.setConf(localConf);
           map_tasks += 1;
           myMetrics.launchMap();
           map.run(localConf, this);
+          mapDone++;
           myMetrics.completeMap();
           map_tasks -= 1;
         }
@@ -128,12 +149,14 @@
           reduce_tasks += 1;
           myMetrics.launchReduce();
           reduce.run(localConf, this);
+          this.reduceDone++;
           myMetrics.completeReduce();
           reduce_tasks -= 1;
         }
         this.mapoutputFile.removeAll(reduceId);
         
         this.status.setRunState(JobStatus.SUCCEEDED);
+        this.finishTime = System.currentTimeMillis();
 
       } catch (Throwable t) {
         this.status.setRunState(JobStatus.FAILED);
@@ -163,15 +186,27 @@
       if (taskIndex >= 0) {                       // mapping
         float numTasks = mapIds.size();
         status.setMapProgress(taskIndex/numTasks + progress/numTasks);
+        this.mapTaskId = taskId;
+        this.mapStateString = state;
+
       } else {
         status.setReduceProgress(progress);
+        this.reduceTaskId = taskId;
+        this.reduceStateString = state;
+
       }
       
       // ignore phase
     }
 
     public void reportDiagnosticInfo(String taskid, String trace) {
-      // Ignore for now
+      int taskIndex = mapIds.indexOf(taskid);
+      if (taskIndex >= 0) {
+        this.mapDiagnostic.append(trace);
+      } else {
+        this.reduceDiagnostic.append(trace);
+      }
+
     }
 
     public boolean ping(String taskid) throws IOException {
@@ -191,13 +226,55 @@
       LOG.fatal("FSError: "+ message);
     }
 
+    public String getJobId() {
+      return this.id;
+    }
+
+    public JobStatus getStatus() {
+      return this.status;
+    }
+
+    public long getStartTime() {
+      return this.startTime;
+    }
+
+    public long getFinishTime(){
+      return this.finishTime;
+    }
+
+    public int getDesiredMaps() {
+      return this.mapIds.size();
+    }
+
+    public int getDesiredReduces() {
+      return 1;
+    }
+
+    public int getFinishedMaps() {
+      return this.mapDone;
+    }
+
+    public int getFinishedReduces() {
+      return this.reduceDone;
+    }
+
+
   }
 
+  public static synchronized LocalJobRunner getInstance(Configuration configuration) throws IOException {
+    if (fInstance == null) {
+      fInstance = new LocalJobRunner(configuration);
+    }
+    return fInstance;
+  }
+
   public LocalJobRunner(Configuration conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetrics();
-  }
+    this.localMachine = "local";
+    this.startTime = System.currentTimeMillis();
+  }
 
   // JobSubmissionProtocol methods
 
@@ -207,6 +284,8 @@
 
   public void killJob(String id) {
-    ((Thread)jobs.get(id)).stop();
+    Job job = (Job) jobs.get(id);
+    job.finishTime = System.currentTimeMillis();
+    job.stop();
   }
 
   public JobProfile getJobProfile(String id) {
@@ -215,10 +296,24 @@
   }
 
   public TaskReport[] getMapTaskReports(String id) {
-    return new TaskReport[0];
+    Job job = getJob(id);
+    float progress = getJobStatus(id).mapProgress();
+    String[] diagnostics = new String[] { job.mapDiagnostic.toString() };
+    long finishTime = 0; // per-task finish times are not tracked by the local runner
+    return new TaskReport[] { new TaskReport(
+        job.mapTaskId, progress, job.mapStateString,
+        diagnostics, job.startTime, finishTime) };
+
   }
   public TaskReport[] getReduceTaskReports(String id) {
-    return new TaskReport[0];
+    Job job = getJob(id);
+    float progress = getJobStatus(id).reduceProgress();
+    String[] diagnostics = new String[] { job.reduceDiagnostic.toString() };
+    long finishTime = 0; // per-task finish times are not tracked by the local runner
+    return new TaskReport[] { new TaskReport(
+        job.reduceTaskId, progress, job.reduceStateString,
+        diagnostics, job.startTime, finishTime) };
+
   }
 
   public JobStatus getJobStatus(String id) {
@@ -235,4 +332,54 @@
   }
 
   public JobStatus[] jobsToComplete() {return null;}
+
+  public Vector runningJobs() {
+    return getJobs(JobStatus.RUNNING);
+  }
+
+  public Vector completedJobs() {
+    return getJobs(JobStatus.SUCCEEDED);
+  }
+
+  public Vector failedJobs() {
+    return getJobs(JobStatus.FAILED);
+  }
+
+  /**
+   * @param status one of the JobStatus run-state constants (e.g. JobStatus.RUNNING)
+   * @return a Vector of the jobs whose current run state matches status
+   */
+  public Vector getJobs(int status) {
+    Vector v = new Vector();
+    for (Iterator it = this.jobs.values().iterator(); it.hasNext();) {
+      Job job = (Job) it.next();
+      if (job.getStatus().getRunState() == status) {
+        v.add(job);
+      }
+    }
+    return v;
+  }
+
+  /**
+   * @param jobId
+   * @return the Job with the given id, or null if no such job is known
+   */
+  public Job getJob(String jobId) {
+    return (Job) this.jobs.get(jobId);
+  }
+
+  /**
+   * @return the name of the "machine" this runner reports, always "local"
+   */
+  public String getJobTrackerMachine() {
+    return this.localMachine;
+  }
+
+  /**
+   * @return the time at which this runner was created, in milliseconds
+   */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
 }
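
Note: a hedged sketch (not part of the patch) of how external tooling might poll the now-public LocalJobRunner. It assumes the job was submitted through the shared instance returned by getInstance(), that the job id string is known to the caller, and that TaskReport is public and exposes getState() as in this era's code base:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapred.TaskReport;

public class LocalJobMonitor {
    public static void printProgress(String jobId) throws Exception {
        // Reuse the shared in-process runner introduced by this patch.
        LocalJobRunner runner = LocalJobRunner.getInstance(new Configuration());
        LocalJobRunner.Job job = runner.getJob(jobId);
        if (job == null) {
            return;                      // unknown or already discarded job id
        }
        System.out.println("maps: " + job.getFinishedMaps()
            + "/" + job.getDesiredMaps()
            + ", reduces: " + job.getFinishedReduces()
            + "/" + job.getDesiredReduces());
        // getMapTaskReports() returns a single synthetic report for the map phase.
        TaskReport[] maps = runner.getMapTaskReports(jobId);
        System.out.println("map state: " + maps[0].getState());
    }
}
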
Index: src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java	(revision 475794)
+++ src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java	(working copy)
@@ -28,7 +28,7 @@
  *
  * @author Mike Cafarella
  **************************************************/
-class TaskTrackerStatus implements Writable {
+public class TaskTrackerStatus implements Writable {
 
     static {                                        // register a ctor
       WritableFactories.setFactory
Index: src/java/org/apache/hadoop/mapred/JobProfile.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobProfile.java	(revision 475794)
+++ src/java/org/apache/hadoop/mapred/JobProfile.java	(working copy)
@@ -26,7 +26,7 @@
  *
  * @author Mike Cafarella
  **************************************************/
-class JobProfile implements Writable {
+public class JobProfile implements Writable {
 
     static {                                      // register a ctor
       WritableFactories.setFactory
Index: src/java/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DFSClient.java	(revision 475794)
+++ src/java/org/apache/hadoop/dfs/DFSClient.java	(working copy)
@@ -39,7 +39,7 @@
  *
  * @author Mike Cafarella, Tessa MacDuff
  ********************************************************/
-class DFSClient implements FSConstants {
+public class DFSClient implements FSConstants {
     public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
     static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
     private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
Index: src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(revision 475794)
+++ src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java	(working copy)
@@ -25,7 +25,7 @@
  * @author Mike Cafarella
  * @author Konstantin Shvachko
  **************************************************/
-class DatanodeDescriptor extends DatanodeInfo {
+public class DatanodeDescriptor extends DatanodeInfo {
 
   private volatile TreeSet blocks = new TreeSet();
 
@@ -40,7 +40,7 @@
   /**
    * Create DatanodeDescriptor.
    */
-  DatanodeDescriptor( DatanodeID nodeID, 
+  public DatanodeDescriptor( DatanodeID nodeID, 
                       long capacity, 
                       long remaining,
                       int xceiverCount ) {
Index: src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DistributedFileSystem.java	(revision 475794)
+++ src/java/org/apache/hadoop/dfs/DistributedFileSystem.java	(working copy)
@@ -212,7 +212,7 @@
         return "DFS[" + dfs + "]";
     }
 
-    DFSClient getClient() {
+    public DFSClient getClient() {
         return dfs;
     }
     

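
Note: a small sketch (not part of the patch) of what the DFSClient visibility changes enable. Code outside org.apache.hadoop.dfs, e.g. the IDE tooling this patch appears to support, can now hold the low-level client; nothing here is specific to this patch beyond the two public modifiers, and the lookup goes through the standard FileSystem.get() path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.DFSClient;
import org.apache.hadoop.dfs.DistributedFileSystem;
import org.apache.hadoop.fs.FileSystem;

public class DfsClientAccess {
    // Returns the DFSClient behind the configured file system, or null when
    // the configuration points at a non-DFS file system (e.g. "local").
    public static DFSClient getDfsClient(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        if (fs instanceof DistributedFileSystem) {
            return ((DistributedFileSystem) fs).getClient(); // public after this patch
        }
        return null;
    }
}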