Index: scripts/config/transmitterconfig.py.dist
===================================================================
--- scripts/config/transmitterconfig.py.dist	(revision 0)
+++ scripts/config/transmitterconfig.py.dist	(revision 0)
@@ -0,0 +1,61 @@
+import logging
+import socorro.lib.ConfigurationManager as cm
+
+# Storage constants
+
+try:
+  from config.commonconfig import storageRoot
+  from config.commonconfig import deferredStorageRoot
+  from config.commonconfig import jsonFileSuffix
+  from config.commonconfig import dumpFileSuffix
+except ImportError:
+  from commonconfig import storageRoot
+  from commonconfig import deferredStorageRoot
+  from commonconfig import jsonFileSuffix
+  from commonconfig import dumpFileSuffix
+  
+hbaseHost = cm.Option()
+hbaseHost.doc = 'Hostname for hbase hadoop cluster. May be a VIP or load balancer'
+hbaseHost.default = 'thrift-socorro-hadoop'
+
+hbasePort = cm.Option()
+hbasePort.doc = 'hbase port number'
+hbasePort.default = 9090
+
+rolledJournalRoot = cm.Option()
+rolledJournalRoot.doc = 'The root of the filesystem which contains collector journal files after they have been rolled. *must not be NFS*'
+rolledJournalRoot.default = '/home/aking/breakpad/socorro-hdfs/test/'
+
+transmitterLoopTime = cm.Option()
+transmitterLoopTime.doc = 'the time to wait between attempts to get jobs (HHH:MM:SS)'
+transmitterLoopTime.default = '0:00:06'
+transmitterLoopTime.fromStringConverter = cm.timeDeltaConverter
+
+logSysLogAddress = cm.Option()
+logSysLogAddress.doc = "Hostname and port for SysLog messages. Example: (localhost', 514') or '/dev/log'"
+logSysLogAddress.default = address='/dev/log'
+
+logSysLogFacility = cm.Option()
+logSysLogFacility.doc = 'Facility for SysLog - defaults to LOG_USER'
+logSysLogFacility.default = logging.handlers.SysLogHandler.LOG_USER
+
+logFilePathname = cm.Option()
+logFilePathname.doc = 'full pathname for the log file'
+logFilePathname.default = '/home/aking/modpython-collector-hdfs/collector.log'
+
+logFileMaximumSize = cm.Option()
+logFileMaximumSize.doc = 'maximum size in bytes of the log file'
+logFileMaximumSize.default = 1000000
+
+logFileMaximumBackupHistory = cm.Option()
+logFileMaximumBackupHistory.doc = 'maximum number of log files to keep'
+logFileMaximumBackupHistory.default = 50
+
+logFileLineFormatString = cm.Option()
+logFileLineFormatString.doc = 'python logging system format for log file entries'
+logFileLineFormatString.default = '%(asctime)s %(levelname)s -  PID:%(process)d %(message)s'
+
+logFileErrorLoggingLevel = cm.Option()
+logFileErrorLoggingLevel.doc = 'logging level for the log file (10 - DEBUG, 20 - INFO, 30 - WARNING, 40 - ERROR, 50 - CRITICAL)'
+logFileErrorLoggingLevel.default = 10
+
Index: socorro/unittest/transmitter/testTransmitter.py
===================================================================
--- socorro/unittest/transmitter/testTransmitter.py	(revision 0)
+++ socorro/unittest/transmitter/testTransmitter.py	(revision 0)
@@ -0,0 +1,70 @@
+
+import logging
+import logging.handlers
+import os
+
+import socorro.transmitter.transmitter as transmitter
+
+class TestTransmitter(object):
+  def setUp(self):
+    #TODO setup and tear down hbase-journal log files
+    self.logname  = '/tmp/unittest-journal'
+    self.journal = self.setUpALog()
+    self.rolledJournalFilenames = ['/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-16', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-19', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-30', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-20', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-31', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-18', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-28', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-24', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-26', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-33', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-27', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-21', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-35', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-29', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-34', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-17', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-25', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-22', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-23', '/home/aking/breakpad/socorro-hdfs/hbase-journal.log.2010-02-04_18-32']
+
+    
+  def tearDown(self):
+    try:
+      self.tearDownALog()
+    except OSError:
+      pass
+    
+  def testSearchForNewJournalFiles(self):
+    t = transmitter.Transmitter()
+    actual = t.searchForNewJournalFiles('/home/aking/breakpad/socorro-hdfs/')
+    expected = self.rolledJournalFilenames
+    assert actual == expected, 'We should get rolled, but not locking or locked files'
+
+  def setUpALog(self):
+    # pretend we are the collector
+
+
+    journal = logging.getLogger('UnitTestJournal')
+    journal.setLevel(logging.DEBUG)
+
+    handler = logging.handlers.RotatingFileHandler(self.logname)
+    handler.setFormatter(logging.Formatter('%(created)f %(message)s'))
+    journal.addHandler(handler)
+    return journal
+
+  def tearDownALog(self):
+    os.remove(self.logname)
+    
+  def testLoadJournalEntries(self):
+    t = transmitter.Transmitter()
+
+    for x in range(100):
+      self.journal.info(str(x) + '3732fa-7b30-4de3-aebd-5988a21002' + str(x))
+    
+    entries = t.loadJournalEntries(self.logname)
+
+    assert len(entries) == 100, "Got some entries"
+    assert '13732fa-7b30-4de3-aebd-5988a210021' == entries[1].split(' ')[1], 'And the second UUID is as we expected'
+    
+  def testCheckpointJournal(self):
+    t = transmitter.Transmitter()
+    t.refreshPidLockFile(self.logname)
+    entries = ['1265600113.848139 03732fa-7b30-4de3-aebd-5988a210020 OK',
+               '1265600113.848380 13732fa-7b30-4de3-aebd-5988a210021 OK',
+               '1265600113.848543 23732fa-7b30-4de3-aebd-5988a210022 OK']
+
+    t.checkpointJournal(self.logname, entries)
+
+    actualEntries = t.loadJournalEntries(self.logname)
+
+    assert len(entries) == len(actualEntries), 'We get back the same number of entries'
+
+    for x in zip(entries, actualEntries):
+      expected, actual = x
+      assert expected.strip() == actual, 'Checkpointing a log file should save and we can read it back in'
+    #TODO we should test that tranmitter adds whitespace
Index: socorro/transmitter/transmitter.py
===================================================================
--- socorro/transmitter/transmitter.py	(revision 0)
+++ socorro/transmitter/transmitter.py	(revision 0)
@@ -0,0 +1,237 @@
+import errno
+import fcntl
+from glob import glob
+import logging
+import os
+import os.path
+import tempfile
+import time
+
+import simplejson
+import hbase.ttypes
+
+import hbaseClient
+import socorro.lib.util
+
+logger = logging.getLogger("transmitter")
+
+class Transmitter(object):
+  """ Run via scripts/startTransmitter.py
+
+      Copies raw crash dump and metadata json files over to HBase for storage
+      on the HDFS. The collector creates a journal of crash uuids which
+      it as accepted and written to disk. This journal is rolled periodically
+      and transmitters look for journal files which are not actively in use
+      by a collector (rolled) or another transmitter (locked).
+
+      Rolled journals MUST NOT BE stored on NFS. These files are small and will
+      be removed automatically by a startTransmitter.py process.
+
+      Once an available rolled journal is found the transmitter works through
+      the file transmitting individual crashes to HBase and updating the journal
+      with an OK or FAIL code.
+
+      journals are removed from the filesystem once 100% transmitted and this
+      cycle begins again.
+
+      Transmitter's are multi-processes safe, but not multi-thread safe.
+      Processes may be killed at any time and restarted at any time.
+  """
+  def __init__(self, config):
+    super(Transmitter, self).__init__()
+    assert "hbaseHost" in config, "hbaseHost is missing from the configuration"
+    assert "hbasePort" in config, "hbasePort is missing from the configuration"
+    assert "transmitterLoopTime" in config, "transmitterLoopTime is missing from the configuration"
+
+    self.loopTime = config.transmitterLoopTime.seconds
+    logger.info("Establishing Thrift Connection to %s on port %s", config.hbaseHost, config.hbasePort)
+    while True:
+      try:
+        self.hbaseConnection = hbaseClient.HBaseConnectionForCrashReports( config.hbaseHost, config.hbasePort)
+        break
+      except:
+        logger.warn('FAILED to open connection to hbase. Sleeping and will retry')
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        time.sleep(30)
+    # TODO - should we FAIL hard and sys.exit(1) instead of loop sleep pattern?
+    logger.info("Connection established")
+    self.config = config
+    
+  def main(self):
+    while (True):
+      newJournalFiles = self.searchForNewJournalFiles(self.config.rolledJournalRoot)
+      for filename in newJournalFiles:
+        if self.obtainLockOnJournal(filename):
+          self.transmitEachLine(filename)
+      self.waitForNewFiles()
+
+  def searchForNewJournalFiles(self, path):
+    """ hbase journal files are a log of successfully collected crashes uuids.
+        The are stored in a flat directory with a time extension. We are only
+        interested in the 'rolled' files.
+
+        Example: /tmp/hbase-journal.log                        (in use)
+                 /tmp/hbase-journal.log.2010-02-04_18-23       (rolled)
+                 /tmp/hbase-journal.log.2010-02-04_18-23.lock  (locked)
+
+        Config Values that affect this are:
+          hbaseJournalFilename
+    """
+    return glob(path + 'hbase-journal.log.[0-9]*-[0-9][0-9]')
+
+  def obtainLockOnJournal(self, filename):
+    """ Attempt to lock a journal in a multi-process safe way.
+        We will end up creating files that signals other processes
+        that this journal is being processed.
+
+        returns False if someone else beats us to the punch
+
+        To free a lock, terminate the transmitter processes.
+
+        Old stale lock files maybe freely deleted. Their apperance should be
+        rare and don't block processing as locking is a system call and not the
+        presence of the lock file. Do not delete a lock file which is in use,
+        as this could result in race conditions.
+
+        Not NFS safe
+    """
+    # Our list of journal files could be out of date
+    if not os.path.exists(filename):
+      logger.info("Ha, ha, Skipping missing journal %s", filename)
+      return False
+    lockFilename = self.lockFilename(filename)
+    self.lockFile = open(lockFilename, 'w')
+    try:
+      fcntl.flock(self.lockFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+      logger.info("LOCK SUCCESS %s", filename)
+      return True
+    except IOError, e:
+      if errno.EACCES == e.errno or errno.EAGAIN == e.errno:
+        logger.info("Unable to obtain a lock on %s skipping", filename)
+        return False
+      else:
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        raise e
+
+
+  def lockFilename(self, filename):
+    return filename + '.lock'
+
+  def transmitEachLine(self, filename):
+    """ We read the journal's entire contents into memory and
+        transmit crashes to hbase. As we mark uuid's as finished
+        we update the journal. Finally this rolled journal log
+        is deleted and lock file is removed.
+    """
+    try:
+      entries = self.loadJournalEntries(filename)
+      for i in range(len(entries)):
+        try:
+          if self.parseAndProcessEntry(entries, i):            
+            self.checkpointJournal(filename, entries)
+        except Exception, e:
+          socorro.lib.util.reportExceptionAndContinue(logger)
+      #remove rolled journal log and our lock
+      if os.path.exists(filename):
+        logger.info("Finished processing journal, removing %s", filename)
+        os.remove(filename)
+    finally:
+      logger.info("Giving up lock on journal, removing %s", self.lockFile.name)
+      self.lockFile.close()
+      os.remove(self.lockFile.name)
+      
+  def parseAndProcessEntry(self, entries, i):
+    """ Returns True is the entry was processed (transmitted or failure)
+                False if the entry was skipped
+    """
+    parts = entries[i].split(' ')
+    if len(parts) == 3 and parts[2] == 'OK':
+      logging.info("Skipping already transferred entry %s", parts[1])
+    elif len(parts) == 2:
+      # write to hbase including retry logic
+      if self.transmit(parts[1]):
+        entries[i] = entries[i] + " OK"
+      else:
+        entries[i] = entries[i] + " FAIL"
+      # Either OK or FAIL, we still want to checkpoint the journal
+      return True
+    elif len(parts) == 3 and parts[2] == 'FAIL':
+      logger.info('Skipping previously failed entry %s', entries[1])
+    else:
+      logger.error("Invalid journal entr[_%s]", entries[i])
+    return False
+      
+  def loadJournalEntries(self, filename):
+    try:
+      if os.path.exists(filename):
+        journal = open(filename, 'r')
+        data = journal.read()
+        journal.close()
+        return data.splitlines()
+    except Exception:
+      socorro.lib.util.reportExceptionAndContinue(logger)
+    return[]
+  
+  def transmit(self, entry):
+    """ Attempt to transfer a crash (dump and json) over
+        into the hbase system. If there is a network error
+        we will retry and backoff in an infinite loop.
+
+        Once successfully transmitted, we return True
+
+        If there is another type of error, then we will
+        log it and return False. This is a source of
+        lost transmissions.
+    """
+    dump     = entry + '.dump'
+    metadump = entry + '.json'
+    if not os.path.exists(dump):
+      logger.error("Binary dump file %s.dump does not exist, unable to transmit from journal", entry)
+      return False
+
+    if not os.path.exists(metadump):
+      logger.error("JSON metadata file %s.json does not exist, unable to transmit from journal", entry)
+      return False
+    
+    uuid = os.path.basename(entry)
+    logger.info('Transmitting %s to %s', uuid, self.config.hbaseHost)
+    throttle = 1
+    while True:
+      try:
+        self.hbaseConnection.create_ooid_from_file(uuid, metadump, dump)
+        return True
+      except IOError, hbase.ttypes.IOError:
+        # Backoff calls to HBase from 300 milliseconds, 3.3 seconds, etc to 30 seconds
+        throttle += 10
+        if throttle > 100:
+          logger.critical('Failed 10 requests to transmit crash to hbase, restarting loop')
+          throttle = 1
+        else:
+          logger.error('Failed to transmit crash to hbase, backing off')
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        throttleSeconds = throttle * 0.3
+        time.sleep(throttleSeconds)
+      except:
+        logger.error("Unknown error, Failed to transmit %s", uuid)
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        return False
+
+  def checkpointJournal(self, filename, entries):
+    """ Calling this method updates the last modified time of the
+        journal log file. This is useful, when combined with the presence of
+        a PID lockfile, in knowing that a journal is being processed and detecting
+        stale PID lockfiles.
+    """
+    tmpfile, tmpfilename = tempfile.mkstemp()
+    entries = map(lambda x: x + "\n", entries)
+    file = open(tmpfilename, 'w')
+    try:
+      file.writelines(entries)
+    finally:
+      file.close()
+    
+    os.rename(tmpfilename, filename)
+    
+  def waitForNewFiles(self):
+    logger.info('Finished processing available files, Sleeping for %d seconds', self.loopTime)
+    time.sleep(self.loopTime)
Index: socorro/transmitter/__init__.py
===================================================================
