Index: socorro/unittest/transmitter/testTransmitter.py
===================================================================
--- socorro/unittest/transmitter/testTransmitter.py	(revision 0)
+++ socorro/unittest/transmitter/testTransmitter.py	(revision 0)
@@ -0,0 +1,90 @@
+import socorro.transmitter.transmitter as transmitter
+  
+class TestTransmitter(object):
+    
+  def setUp(self):
+    config = Config()
+    setattr(config.transmitterLoopTime, 'seconds', 6)
+    self.transmitter = TestableTransmitter(config)
+    self.transmitCount = 0    
+        
+  def testParseAndProcessEntry(self):
+    entries = ['1265336168.676271 11111111-f005-4980-9d6a-500a52100209',              # 0
+               '1265336168.676271 22222222-f005-4980-9d6a-500a52100209 OK',           # 1
+               '1265336168.676271 33333333-f005-4980-9d6a-500a52100209 FAIL',         # 2
+               '1265336168.676271',                                                   # 3
+               '33333333-f005-4980-9d6a-500a52100209',
+               '1265336168.676271 44444444-f005-4980-9d6a-500a52100209 UNKNOWN',
+               '1265336168.676271 55555555-f005-4980-9d6a-500a52100209 UNKNOWN CODE', # 6
+               '1265336168.676271 66666666-f005-4980-9d6a-500a52100209']              # 7
+    
+    setattr(self.transmitter, 'transmit', self.countGoodTransmit)    
+    actual  = self.transmitter.parseAndProcessEntry(entries, 0)
+    assert actual, 'Journal entry is good and has not been processed, so it should be transmitted.'
+    assert '1265336168.676271 11111111-f005-4980-9d6a-500a52100209 OK' == entries[0], 'Entry updated to record the successful transmit'
+    assert self.transmitCount == 1, 'transmit should be called'
+
+    actual  = self.transmitter.parseAndProcessEntry(entries, 1)    
+    assert not actual, 'Journal entry is already processed.'
+    assert self.transmitCount == 1, 'no new calls to transmit'
+
+    actual  = self.transmitter.parseAndProcessEntry(entries, 2)    
+    assert not actual, 'Journal entry is already processed.'
+    assert self.transmitCount == 1, 'no new calls to transmit'
+
+    for x in range(4):
+      i = x + 3 # 3, 4, 5, 6
+      actual  = self.transmitter.parseAndProcessEntry(entries, i)
+      assert not actual, 'Malformed or unknown-status journal entries should be skipped.'
+      assert self.transmitCount == 1, 'no new calls to transmit'
+
+    setattr(self.transmitter, 'transmit', self.countBadTransmit)
+    actual  = self.transmitter.parseAndProcessEntry(entries, 7)
+    assert actual, 'New journal entry counts as processed even when transmit fails'
+    assert self.transmitCount == 2, 'transmit should be called again'
+    assert '1265336168.676271 66666666-f005-4980-9d6a-500a52100209 FAIL' == entries[7], 'Entry updated to record the failed transmit'
+
+  #----------------- Test Helper Methods -------------------#
+
+  def countGoodTransmit(self, uuid):
+    """ Monkey patched Transmitter.transmit implementation """
+    self.transmitCount += 1
+    return True
+
+  def countBadTransmit(self, uuid):
+    """ Monkey patched Transmitter.transmit implementation """
+    self.transmitCount += 1
+    return False  
+
+#----------------- Mock Object Support -------------------#
+class value(object):
+  """ Mock value of config property """
+  pass
+
+class Config(object):
+  def __init__(self):
+    self.hbaseHost = ''
+    self.hbasePort = ''
+    self.transmitterLoopTime = value()
+    self.configValues = [self.hbaseHost, self.hbasePort, self.transmitterLoopTime]
+
+  #----------------- There is probably an easier way to mock config (it acts like a module) -------------------#
+  def __iter__(self):
+    return self.configValues.__iter__()
+  def __len__(self):
+    return len(self.configValues)
+  def __contains__(self, v):
+    return True
+  def __getitem__(self, v):
+    return self.configValues[v]
+
+class TestableTransmitter(transmitter.Transmitter):
+  """ Mock Transmitter which is Network Neutered
+      Not to be confused with Network Neutrality
+  """
+  def connectToHBase(self):
+    pass
+
+  def transmit(self, uuid):
+    raise StandardError, 'transmit called during unit test. Provide an implementation. Programming error.'
+  # transmit should be monkey patched if called during a unit test
Index: socorro/integrationtest/README.txt
===================================================================
--- socorro/integrationtest/README.txt	(revision 1780)
+++ socorro/integrationtest/README.txt	(working copy)
@@ -1,8 +1,8 @@
 Socorro Integration tests
 
-Nothing much to say here: The old code was made obsolete by the materialized
-views code checked in 2009, August.
+To run the transmitter integration test, cd into transmitter/ and run: nosetests testTransmitter.py
 
+TODO: Clean up: move the comments below into a unittest docstring or this README?
 Small amount of testing can be done, as follows:
 
 From the unittest/cron directory, run fillDB -a [your choice of fill options] # note use -a
Index: socorro/integrationtest/transmitter/startNonHbaseTransmitter.py
===================================================================
--- socorro/integrationtest/transmitter/startNonHbaseTransmitter.py	(revision 0)
+++ socorro/integrationtest/transmitter/startNonHbaseTransmitter.py	(revision 0)
@@ -0,0 +1,39 @@
+#! /usr/bin/env python
+
+import logging
+import logging.handlers
+import sys
+
+try:
+  import config.transmitterconfig as config
+except ImportError:
+  import transmitterconfig as config
+
+#import socorro.integrationtest.noneHbaseTransmitter
+import noneHbaseTransmitter
+import socorro.lib.ConfigurationManager as configurationManager
+
+try:
+  configurationContext = configurationManager.newConfiguration(configurationModule=config, applicationName="Socorro Transmitter 2.0")
+except configurationManager.NotAnOptionError, x:
+  print >> sys.stderr, x
+  print >> sys.stderr, "for usage, try --help"
+  sys.exit()
+
+logger = logging.getLogger("transmitter")
+logger.setLevel(logging.DEBUG)
+
+stderrLog = logging.handlers.SysLogHandler(configurationContext.logSysLogAddress, configurationContext.logSysLogFacility)
+stderrLog.setLevel(configurationContext.logFileErrorLoggingLevel)
+stderrLogFormatter = logging.Formatter(configurationContext.logFileLineFormatString)
+stderrLog.setFormatter(stderrLogFormatter)
+logger.addHandler(stderrLog)
+
+logger.debug("current configuration\n%s", str(configurationContext))
+
+try:
+#  t = socorro.integrationtest.noneHbaseTransmitter.NoneHbaseTransmitter(configurationContext)
+  t = noneHbaseTransmitter.NoneHbaseTransmitter(configurationContext)
+  t.main()
+finally:
+  logger.info("done.")

Property changes on: socorro/integrationtest/transmitter/startNonHbaseTransmitter.py
___________________________________________________________________
Added: svn:executable
   + *

Index: socorro/integrationtest/transmitter/testTransmitter.py
===================================================================
--- socorro/integrationtest/transmitter/testTransmitter.py	(revision 0)
+++ socorro/integrationtest/transmitter/testTransmitter.py	(revision 0)
@@ -0,0 +1,109 @@
+""" See bashenv... Having trouble with nosetests, for now run as
+    To run this integraton test, cd into this current directory.
+    nosetests testTransmitter.py
+
+    Okay figured out nosetests and ipython trouble... our unittest directory was overwriting /usr/lib/python2.4/unittest.py
+
+    Sets up various senarios, tests, and cleans up.
+    If tests fail and you need to debug, uncomment print statements and disable the second call to cleanUpFromLasttime
+"""
+from glob import glob
+import logging
+import logging.handlers
+import os
+import time
+
+from socorro.collector.hbaseJournal import HBaseJournal
+
+class TestTransmitterIntegration(object):
+  def testNormalTransmission(self):
+    """ Pretty straight forward conditions, 4 deamons processing 4 logs """
+    journalFilename = './hbase-journal.log'
+    self.cleanUpFromLasttime(journalFilename)
+  
+    rolledJournalSuffix = ['.2010-02-04_18-16', '.2010-02-04_18-19', '.2010-02-04_18-30', '.2010-02-04_18-20']
+    uuids = [['00000000-f001-4980-9d6a-500a52100209', '00000000-f002-4980-9d6a-500a52100209', '00000000-f003-4980-9d6a-500a52100209',
+              '00000000-f004-4980-9d6a-500a52100209', '00000000-f005-4980-9d6a-500a52100209'],
+             
+             ['11111111-f001-4980-9d6a-500a52100209', '11111111-f002-4980-9d6a-500a52100209', '11111111-f003-4980-9d6a-500a52100209',
+              '11111111-f004-4980-9d6a-500a52100209', '11111111-f005-4980-9d6a-500a52100209'],
+             
+             ['22222222-f001-4980-9d6a-500a52100209', '22222222-f002-4980-9d6a-500a52100209', '22222222-f003-4980-9d6a-500a52100209',
+              '22222222-f004-4980-9d6a-500a52100209', '22222222-f005-4980-9d6a-500a52100209'],
+             
+             ['33333333-f001-4980-9d6a-500a52100209', '33333333-f002-4980-9d6a-500a52100209', '33333333-f003-4980-9d6a-500a52100209',
+              '33333333-f004-4980-9d6a-500a52100209', '33333333-f005-4980-9d6a-500a52100209']]
+    
+    # TODO self.filloutUuidsTo999
+    for x in zip(rolledJournalSuffix, uuids):
+      suffix, uuidrow = x
+      logger = self.makeLogger(journalFilename + suffix)
+      for uuid in uuidrow:
+        #print "journaling %s" % uuid
+        logger.info(uuid)
+      
+    # Simulate a collector's in-use journal log, which SHOULD NOT be processed
+    inUseJournal = self.makeLogger(journalFilename)
+    badUuids = ['99999999-f001-4980-9d6a-500a52100209', '99999999-f002-4980-9d6a-500a52100209', '99999999-f003-4980-9d6a-500a52100209',
+              '99999999-f004-4980-9d6a-500a52100209', '99999999-f005-4980-9d6a-500a52100209']
+    for uuid in badUuids:
+      inUseJournal.info(uuid)
+
+    logging.shutdown() # Make sure ALL our test data is on disk
+    
+    #print "launching code"
+    pid1 = os.spawnlp(os.P_NOWAIT, './startNonHbaseTransmitter.py', './startNonHbaseTransmitter.py', '--rolledJournalRoot', './')
+    pid2 = os.spawnlp(os.P_NOWAIT, './startNonHbaseTransmitter.py', './startNonHbaseTransmitter.py', '--rolledJournalRoot', './')
+    pid3 = os.spawnlp(os.P_NOWAIT, './startNonHbaseTransmitter.py', './startNonHbaseTransmitter.py', '--rolledJournalRoot', './')
+    pid4 = os.spawnlp(os.P_NOWAIT, './startNonHbaseTransmitter.py', './startNonHbaseTransmitter.py', '--rolledJournalRoot', './')    
+    for pid in [pid1, pid2, pid3, pid4]:
+      os.waitpid(pid, 0) # reap every transmitter, not just the first one to exit
+    #print 'done waiting'
+    time.sleep(2) # give the disk a moment to catch up before we verify the transmitted data
+    actual = []
+    for x in [pid1, pid2, pid3, pid4]:
+      actualFile = './actual%d' % x
+      if os.path.exists(actualFile):
+        f = open(actualFile, 'r')
+        wholeFile = f.read()
+        #print 'whole file = ' + wholeFile
+        parts = wholeFile.splitlines()
+        #print 'parts = ' + str(parts)
+        actual += parts
+        f.close()
+        #print 'actual = ' + str(actual)
+    expected = []
+    for row in uuids:
+      for uuid in row:
+        expected.append(uuid)
+
+    #print 'expected = ' + str(expected)
+    
+    assert len(actual) == len(expected), "We have the same number of entries in the log as was processed %d == %d" % (len(actual), len(expected))
+
+    # And every expected UUID is in the list
+    for x in expected:
+      actual.index(x) # Throws ValueError if we lost an entry
+
+    # Disable this cleanUpFromLasttime to keep actual<pid> files, etc
+    assert os.path.exists(journalFilename), 'In-use collector journal log should NOT be processed or deleted by the transmitter'
+    self.cleanUpFromLasttime(journalFilename)
+
+  #--------------- Test Helper Methods ---------------#
+      
+  def makeLogger(self, logfilename):
+    journal = logging.getLogger(logfilename)
+    journal.setLevel(logging.DEBUG)
+    fileLog = logging.FileHandler(logfilename)
+    fileLog.setLevel(logging.DEBUG)
+    fileLog.setFormatter(logging.Formatter(HBaseJournal.JOURNAL_FORMAT))
+    journal.addHandler(fileLog)
+    return journal
+
+  def cleanUpFromLasttime(self, inUseJournalFilename):
+    for leftover in glob('./actual*') + glob(inUseJournalFilename + '*'):
+      #print 'Removing %s from current directory' % leftover
+      os.remove(leftover)
+      
+#test = TestTransmitterIntegration()
+#test.testNormalTransmission()
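
For reference, the rolled journal filenames created by this test (hbase-journal.log.2010-02-04_18-16 and friends) are exactly what Transmitter.searchForNewJournalFiles picks up, while the in-use journal and any .lock files are not matched. A minimal sketch of that glob, assuming the files live in the current directory:

from glob import glob

# Matches './hbase-journal.log.2010-02-04_18-16' and the other rolled journals,
# but neither the in-use './hbase-journal.log' nor a locked
# './hbase-journal.log.2010-02-04_18-16.lock', because the pattern must match
# the whole filename.
rolled = glob('./' + 'hbase-journal.log.[0-9]*-[0-9][0-9]')
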
Index: socorro/integrationtest/transmitter/noneHbaseTransmitter.py
===================================================================
--- socorro/integrationtest/transmitter/noneHbaseTransmitter.py	(revision 0)
+++ socorro/integrationtest/transmitter/noneHbaseTransmitter.py	(revision 0)
@@ -0,0 +1,26 @@
+import logging
+import os
+import sys
+
+from transmitter.transmitter import Transmitter
+
+logger = logging.getLogger("transmitter")
+
+class NoneHbaseTransmitter(Transmitter):
+  def connectToHBase(self):
+    pass
+
+  def transmit(self, entry):
+    testResults = './actual' + str(os.getpid())
+    logger.info('Storing %s into %s', entry, testResults)
+    f = open(testResults, 'a')
+    f.write(entry + '\n')
+    # this didn't seem to help, sleeping in integration test runner
+    f.flush()
+    os.fsync(f.fileno())
+    f.close()
+
+    return True
+
+  def waitForNewFiles(self):
+    sys.exit(0)
Index: socorro/collector/hbaseJournal.py
===================================================================
--- socorro/collector/hbaseJournal.py	(revision 0)
+++ socorro/collector/hbaseJournal.py	(revision 0)
@@ -0,0 +1,45 @@
+
+import logging
+import logging.handlers
+
+class HBaseJournal(object):
+  """ Simple wrapper class to isolate how collector
+      writes to an hbase journal file.
+
+      TBD: Should this be SysLogFileHandler with rotation
+           happening outside of Python?
+
+      If you alter the format of HBaseJournal.JOURNAL_FORMAT, also update:
+      * transmitter.PROCESSED_ENTRY_LENGTH
+      * transmitter.NEW_ENTRY_LENGTH
+  """
+  
+  def __init__(self, config):
+    super(HBaseJournal, self).__init__()    
+    assert "hbaseJournalPathname"   in config, "hbaseJournalPathname is missing from the configuration"
+
+    """ TBD
+    assert "hbaseJournalWhenRolled" in config, "hbaseJournalWhenRolled is missing from the configuration"
+    assert "hbaseJournalInterval"   in config, "hbaseJournalInterval is missing from the configuration"
+    """
+
+    self.logger = logging.getLogger("hbaseJournal")
+    self.logger.setLevel(logging.DEBUG)
+    
+    # Ya, this didn't work out so much... TBD with IT's help http://code.google.com/p/socorro/wiki/SocorroTransmitter#Collector_Details
+    # fileLog = logging.handlers.TimedRotatingFileHandler(config.hbaseJournalPathname, config.hbaseJournalWhenRolled, int(config.hbaseJournalInterval))
+    fileLog = logging.FileHandler(config.hbaseJournalPathname)
+
+    fileLog.setLevel(logging.DEBUG)
+    fileLog.setFormatter(logging.Formatter(HBaseJournal.JOURNAL_FORMAT))
+    self.logger.addHandler(fileLog)
+
+
+  JOURNAL_FORMAT = '%(created)f %(message)s'    
+
+  def log(self, uuid):
+    """ Records a UUID of a crash to be transfered to HBase.
+        This will crate a journal entry which wil go on
+        to be proccessed by transmitter.Transmitter
+    """
+    self.logger.info(uuid)
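
A minimal usage sketch of the wrapper above; the FakeConfig stand-in below is hypothetical, exposing only the hbaseJournalPathname value the journal asserts on (in the patch itself, initializer.py constructs the journal and modpython-collector.py calls log() per accepted crash):

import socorro.collector.hbaseJournal as hbaseJournal

class FakeConfig(object):
  """ Hypothetical stand-in for the real ConfigurationManager context """
  hbaseJournalPathname = '/tmp/hbase-journal.log'
  def __contains__(self, key):
    return hasattr(self, key)

journal = hbaseJournal.HBaseJournal(FakeConfig())
# each accepted crash uuid becomes one '%(created)f %(message)s' line in the journal file
journal.log('ac60196f-9e1b-4a31-810a-41a562100203')
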
Index: socorro/collector/hbaseClient.py
===================================================================
--- socorro/collector/hbaseClient.py	(revision 1780)
+++ socorro/collector/hbaseClient.py	(working copy)
@@ -1,236 +0,0 @@
-#!/usr/bin/python
-import simplejson as json
-
-from thrift import Thrift
-from thrift.transport import TSocket, TTransport
-from thrift.protocol import TBinaryProtocol
-from hbase import ttypes
-from hbase.hbasethrift import Client, ColumnDescriptor, Mutation
-
-class HBaseConnection(object):
-  """
-  Base class for hbase connections.  Supplies methods for a few basic
-  queries and methods for cleanup of thrift results.
-  """
-  def __init__(self,host,port):
-    """
-    Establishes the underlying connection to hbase
-    """
-    # Make socket
-    transport = TSocket.TSocket(host, port)
-    # Buffering is critical. Raw sockets are very slow
-    self.transport = TTransport.TBufferedTransport(transport)
-    # Wrap in a protocol
-    self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
-    # Create a client to use the protocol encoder
-    self.client = Client(self.protocol)
-    # Connect!
-    self.transport.open()
-
-  def close(self):
-    """
-    Close the hbase connection
-    """
-    self.transport.close()
-
-  def _make_rows_nice(self,client_result_object):
-    """
-    Apply _make_row_nice to multiple rows
-    """
-    res = [self._make_row_nice(row) for row in client_result_object]
-    return res
-
-  def _make_row_nice(self,client_row_object):
-    """
-    Pull out the contents of the thrift column result objects into a python dict
-    """
-    columns = {}
-    for column in client_row_object.columns.keys():
-      columns[column]=client_row_object.columns[column].value
-    return columns
-
-  def describe_table(self,table_name):
-    return self.client.getColumnDescriptors(table_name)
-
-  def get_full_row(self,table_name, row_id):
-    """
-    Get back every column value for a specific row_id
-    """
-    return self._make_rows_nice(self.client.getRow(table_name, row_id))
-
-class HBaseConnectionForCrashReports(HBaseConnection):i
-  """
-  A subclass of the HBaseConnection class providing more crah report specific methods
-  """
-  def __init__(self,host,port):
-    super(HBaseConnectionForCrashReports,self).__init__(host,port)
-
-  def _make_row_nice(self,client_row_object):
-    columns = super(HBaseConnectionForCrashReports,self)._make_row_nice(client_row_object)
-    columns['ooid'] = client_row_object.row[6:]
-    return columns
-
-  def get_report(self,ooid):
-    """
-    Return the full row for a given ooid
-    """
-    return self.get_full_row('crash_reports',ooid[-6:]+ooid)[0]
-
-  def get_json(self,ooid):
-    """
-    Return the json metadata for a given ooid
-    """
-    return json.loads(self._make_rows_nice(self.client.getRowWithColumns('crash_reports',ooid[-6:]+ooid,['meta_data:json']))[0]["meta_data:json"])
-
-  def get_dump(self,ooid):
-    """
-    Return the minidump for a given ooid
-    """
-    return self.client.getRowWithColumns('crash_reports',ooid[-6:]+ooid,['raw_data:dump'])[0].columns['raw_data:dump'].value
-
-  def get_jsonz(self,ooid):
-    """
-    Return the cooked json for a given ooid
-    """
-    return json.loads(self._make_rows_nice(self.client.getRowWithColumns('crash_reports',ooid[-6:]+ooid,['processed_data:json']))[0]["processed_data:json"])
-
-  def scan_starting_with(self,prefix,limit=None):
-    """
-    Reurns a generator yield rows starting with prefix.  Remember
-    that ooids are stored internally with their 6 digit date used as a prefix!
-    """
-    scanner = self.client.scannerOpenWithPrefix('crash_reports', prefix, ['meta_data:json'])
-    i = 0
-    r = self.client.scannerGet(scanner)
-    while r and (not limit or i < int(limit)):
-      yield self._make_row_nice(r[0])
-      r = self.client.scannerGet(scanner)
-      i+=1
-    self.client.scannerClose(scanner)
-
-  def create_ooid(self,ooid,json,dump):
-    """
-    Create a crash report record in hbase from serialized json and 
-    bytes of the minidump
-    """
-    row_id = ooid[-6:]+ooid
-    self.client.mutateRow('crash_reports',row_id,[Mutation(column="meta_data:json",value=json), Mutation(column="raw_data:dump",value=dump)])
-
-  def create_ooid_from_file(self,ooid,json_path,dump_path):
-    """
-    Convenience method for creating an ooid from disk
-    """
-    json_file = open(json_path,'r')
-    #Apparently binary mode only matters in windows, but it won't hurt anything on unix systems.
-    dump_file = open(dump_path,'rb')
-    json = json_file.read()
-    dump = dump_file.read()
-    json_file.close()
-    dump_file.close()
-    self.create_ooid(ooid,json,dump)
-
-  def create_ooid_from_jsonz(self,ooid,jsonz_string):
-    """
-    Create a crash report from the cooked json output of the processor
-    """
-    row_id = ooid[-6:]+ooid
-    self.client.mutateRow('crash_reports',row_id,[Mutation(column="processed_data:json",value=jsonz_string)])
-
-
-if __name__=="__main__":
-  import pprint
-  import sys
-
-  def ppjson(data, sort_keys=False, indent=4):
-    print json.dumps(data, sort_keys, indent)
-
-  def usage():
-    print """
-  Usage: %s [-h host[:port]] command [arg1 [arg2...]]
-
-  Commands:
-    Crash Report specific:
-      get_report ooid
-      get_json ooid
-      get_dump ooid
-      scan_starting_with YYMMDD [limit]
-      create_ooid ooid json dump
-      create_ooid_from_file ooid json_path dump_path
-      test
-    HBase generic:
-      describe_table table_name
-      get_full_row table_name row_id
-  """ % sys.argv[0]
-
-  if len(sys.argv) <= 1 or sys.argv[1] == '--help':
-    usage()
-    sys.exit(0)
-
-  pp = pprint.PrettyPrinter(indent = 2)
-  host = 'localhost'
-  port = 9090
-  argi = 1
-
-  if sys.argv[argi] == '-h':
-    parts = sys.argv[argi+1].split(':')
-    host = parts[0]
-    if len(parts) == 2:
-      port = int(parts[1])
-    argi += 2
-
-
-  cmd = sys.argv[argi]
-  args = sys.argv[argi+1:]
-
-  connection = HBaseConnectionForCrashReports(host, port)
-
-  if cmd == 'get_report':
-    if len(args) != 1:
-      usage()
-      sys.exit(1)
-    pp.pprint(connection.get_report(*args))
-
-  elif cmd == 'get_json':
-    if len(args) != 1:
-      usage()
-      sys.exit(1)
-    ppjson(connection.get_json(*args))
-
-  elif cmd == 'get_dump':
-    if len(args) != 1:
-      usage()
-      sys.exit(1)
-    print(connection.get_dump(*args))
-
-  elif cmd == 'scan_starting_with':
-    if len(args) < 1:
-      usage()
-      sys.exit(1)
-    for row in connection.scan_starting_with(*args):
-      ppjson(row)
-
-  elif cmd == 'create_ooid':
-    if len(args) != 3:
-      usage()
-      sys.exit(1)
-    ppjson(connection.create_ooid(*args))
-
-  elif cmd == 'create_ooid_from_file':
-    if len(args) != 3:
-      usage()
-      sys.exit(1)
-    ppjson(connection.create_ooid_from_file(*args))
-
-  elif cmd == 'describe_table':
-    if len(args) != 1:
-      usage()
-      sys.exit(1)
-    ppjson(connection.describe_table(*args))
-
-  elif cmd == 'get_full_row':
-    if len(args) != 2:
-      usage()
-      sys.exit(1)
-    pp.pprint(connection.get_full_row(*args))
-
-  connection.close()
Index: socorro/collector/initializer.py
===================================================================
--- socorro/collector/initializer.py	(revision 1780)
+++ socorro/collector/initializer.py	(working copy)
@@ -3,16 +3,10 @@
 import logging.handlers
 
 import socorro.lib.ConfigurationManager
-import socorro.lib.util as sutil
 import socorro.lib.JsonDumpStorage as jds
-
 import socorro.collector.collect as collect
+import hbaseJournal
 
-import hbaseClient
-
-#for perf, remove me soon
-import time
-
 #-----------------------------------------------------------------------------------------------------------------
 def createPersistentInitialization(configModule):
   storage = {}
@@ -29,6 +23,8 @@
   rotatingFileLog.setFormatter(rotatingFileLogFormatter)
   logger.addHandler(rotatingFileLog)
 
+  storage["hbaseJournal"] = hbaseJournal.HBaseJournal(config)
+
   logger.info("current configuration\n%s", str(config))
 
   standardFileSystemStorage = jds.JsonDumpStorage(root = config.storageRoot,
@@ -50,9 +46,4 @@
                                                  )
   storage["deferredFileSystemStorage"] = deferredFileSystemStorage
 
-  beforeCreate = time.time()
-  hbaseConnection = hbaseClient.HBaseConnectionForCrashReports( config.hbaseHost, config.hbasePort)
-  storage["hbaseConnection"] = hbaseConnection
-  logger.info("Time to Create hbase conn %s" % (time.time() - beforeCreate))
-
   return storage
Index: socorro/collector/modpython-collector.py
===================================================================
--- socorro/collector/modpython-collector.py	(revision 1780)
+++ socorro/collector/modpython-collector.py	(working copy)
@@ -9,11 +9,6 @@
 import socorro.lib.util as sutil
 import socorro.lib.ooid as ooid
 
-#profiling, remove me soon
-import time
-def mprofile(message, before, logger):
-  logger.info("%s %s" % (time.time() - before, message))
-
 #-----------------------------------------------------------------------------------------------------------------
 if __name__ != "__main__":
   from mod_python import apache
@@ -26,7 +21,6 @@
 
 #-----------------------------------------------------------------------------------------------------------------
 def handler(req):
-  beforeHandler = time.time()
   global persistentStorage
   try:
     x = persistentStorage
@@ -79,26 +73,14 @@
 
       jsonFileHandle, dumpFileHandle = fileSystemStorage.newEntry(uuid, persistentStorage["hostname"], dt.datetime.now())
       try:
-
-        beforeDisk = time.time()
         collectObject.storeDump(dump.file, dumpFileHandle)
         collectObject.storeJson(jsonDataDictionary, jsonFileHandle)
-        mprofile("Wrote to disk", beforeDisk, logger) 
+        persistentStorage["hbaseJournal"].log(uuid)        
       finally:
         dumpFileHandle.close()
         jsonFileHandle.close()
 
-      try:
-        dumpData = dump.file.read()
-        logger.info("about to create ooid %s" % str(jsonDataDictionary))
-        beforeHbase = time.time()
-        persistentStorage["hbaseConnection"].create_ooid(uuid, str(jsonDataDictionary), dumpData)
-        mprofile("Wrote to hbase", beforeHbase, logger)
-      except:
-        sutil.reportExceptionAndContinue(logger)
-
       req.write("CrashID=%s%s\n" % (config.dumpIDPrefix, uuid))
-      mprofile("Finished Handler", beforeHandler, logger)
       return apache.OK
     except:
       logger.info("mod-python subinterpreter name: %s", req.interpreter)
Index: socorro/transmitter/transmitter.py
===================================================================
--- socorro/transmitter/transmitter.py	(revision 0)
+++ socorro/transmitter/transmitter.py	(revision 0)
@@ -0,0 +1,266 @@
+import errno
+import fcntl
+from glob import glob
+import logging
+import os
+import os.path
+import tempfile
+import time
+
+import hbase.ttypes
+
+import hbaseClient
+import socorro.lib.util
+
+logger = logging.getLogger("transmitter")
+
+class Transmitter(object):
+  """ Run via scripts/startTransmitter.py
+
+      Copies raw crash dump and metadata json files over to HBase for storage
+      on the HDFS. The collector creates a journal of crash uuids which
+      it as accepted and written to disk. This journal is rolled periodically
+      and transmitters look for journal files which are not actively in use
+      by a collector (rolled) or another transmitter (locked).
+
+      Rolled journals MUST NOT BE stored on NFS. These files are small and will
+      be removed automatically by a startTransmitter.py process.
+
+      Once an available rolled journal is found the transmitter works through
+      the file transmitting individual crashes to HBase and updating the journal
+      with an OK or FAIL code.
+
+      Journals are removed from the filesystem once 100% transmitted, and the
+      cycle begins again.
+
+      Transmitters are multi-process safe, but not multi-thread safe.
+      Processes may be killed at any time and restarted at any time.
+  """
+  def __init__(self, config):
+    super(Transmitter, self).__init__()
+    assert "hbaseHost" in config, "hbaseHost is missing from the configuration"
+    assert "hbasePort" in config, "hbasePort is missing from the configuration"
+    assert "hbaseTimeout" in config, "hbaseTimeout is missing from the configuration"    
+    
+    assert "transmitterLoopTime" in config, "transmitterLoopTime is missing from the configuration"
+
+    self.loopTime = config.transmitterLoopTime.seconds
+    self.config = config
+    self.hbaseConnection = None
+    self.lockFile = None
+
+    self.connectToHBase()
+
+  def connectToHBase(self):
+    """ Sets or refreshes the hbaseConnection """
+    logger.info("Establishing Thrift Connection to %s on port %d with %d timeout", self.config.hbaseHost, int(self.config.hbasePort), int(self.config.hbaseTimeout))
+    while True:
+      try:
+        self.hbaseConnection = hbaseClient.HBaseConnectionForCrashReports(self.config.hbaseHost, int(self.config.hbasePort), int(self.config.hbaseTimeout))
+        break
+      except Exception, e:
+        logger.info('FAILED to open connection to hbase, caught %s. Sleeping and will retry', type(e))
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        time.sleep(30)
+    # TODO - should we FAIL hard and sys.exit(1) instead of loop sleep pattern?
+    logger.info("Connection established")
+    
+  def main(self):
+    while (True):
+      newJournalFiles = self.searchForNewJournalFiles(self.config.rolledJournalRoot)
+      for filename in newJournalFiles:
+        if self.obtainLockOnJournal(filename):
+          self.transmitEachLine(filename)
+      self.waitForNewFiles()
+
+  def searchForNewJournalFiles(self, path):
+    """ hbase journal files are a log of successfully collected crashes uuids.
+        The are stored in a flat directory with a time extension. We are only
+        interested in the 'rolled' files.
+
+        Example: /tmp/hbase-journal.log                        (in use)
+                 /tmp/hbase-journal.log.2010-02-04_18-23       (rolled)
+                 /tmp/hbase-journal.log.2010-02-04_18-23.lock  (locked)
+
+        TBD: the in-use journal might not exist on this box, as the
+        transmitter could be run anywhere.
+
+        Config Values that affect this are:
+          hbaseJournalFilename
+    """
+    return glob(path + 'hbase-journal.log.[0-9]*-[0-9][0-9]')
+
+  def obtainLockOnJournal(self, filename):
+    """ Attempt to lock a journal in a multi-process safe way.
+        We will end up creating files that signals other processes
+        that this journal is being processed.
+
+        returns False if someone else beats us to the punch
+
+        To free a lock, terminate the transmitter processes.
+
+        Old stale lock files may be freely deleted. Their appearance should be
+        rare, and they don't block processing, since locking is done via a system
+        call rather than by the presence of the lock file. Do not delete a lock
+        file which is in use, as this could result in race conditions.
+
+        Not NFS safe
+    """
+    # Our list of journal files could be out of date
+    if not os.path.exists(filename):
+      logger.debug("Stale file from list, skipping missing journal %s", filename)
+      return False
+    lockFilename = self.lockFilename(filename)
+    self.lockFile = open(lockFilename, 'w')
+    try:
+      fcntl.flock(self.lockFile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+      logger.debug("LOCK SUCCESS %s", filename)
+      return True
+    except IOError, e:
+      if errno.EACCES == e.errno or errno.EAGAIN == e.errno:
+        logger.debug("Unable to obtain a lock on %s skipping", filename)
+        return False
+      else:
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        raise
+
+  def lockFilename(self, filename):
+    return filename + '.lock'
+
+  def transmitEachLine(self, filename):
+    """ We read the journal's entire contents into memory and
+        transmit crashes to hbase. As we mark uuid's as finished
+        we update the journal. Finally this rolled journal log
+        is deleted and lock file is removed.
+    """
+    try:
+      entries = self.loadJournalEntries(filename)
+      logger.debug('Processing %d entries', len(entries))
+      for i in range(len(entries)):
+        try:
+          if self.parseAndProcessEntry(entries, i):            
+            self.checkpointJournal(filename, entries)
+        except Exception, e:
+          logger.error("Unknown error %s, while processing [%s]", type(e), entries[i])
+          socorro.lib.util.reportExceptionAndContinue(logger)
+      #remove rolled journal log and our lock
+      if os.path.exists(filename):
+        logger.info("Finished processing journal, removing %s", filename)
+        os.remove(filename)
+      else:
+        logger.warn('Odd, journal %s disappeared before we could cleanup', filename)
+    finally:
+      logger.debug("Giving up lock on journal, removing %s", self.lockFile.name)
+      self.lockFile.close()
+      os.remove(self.lockFile.name)
+
+  def parseAndProcessEntry(self, entries, i):
+    """ Returns True is the entry was processed (transmitted or failure)
+                False if the entry was skipped
+    """
+    parts = entries[i].split(' ')
+    if len(parts) == PROCESSED_ENTRY_LENGTH and parts[TRANSMIT_STATUS] == 'OK':
+      logging.info("Skipping already transferred entry %s", parts[1])
+    elif len(parts) == NEW_ENTRY_LENGTH:
+      # write to hbase including retry logic
+      if self.transmit(parts[1]):
+        entries[i] = entries[i] + " OK"
+      else:
+        entries[i] = entries[i] + " FAIL"
+      # Either OK or FAIL, we still want to checkpoint the journal
+      return True
+    elif len(parts) == PROCESSED_ENTRY_LENGTH and parts[TRANSMIT_STATUS] == 'FAIL':
+      logger.debug('Skipping previously failed entry %s', parts[1])
+    else:
+      logger.error("Invalid journal entr[_%s]", entries[i])
+    return False
+      
+  def loadJournalEntries(self, filename):
+    try:
+      if os.path.exists(filename):
+        journal = open(filename, 'r')
+        data = journal.read()
+        journal.close()
+        return data.splitlines()
+    except Exception:
+      socorro.lib.util.reportExceptionAndContinue(logger)
+    return []
+  
+  def transmit(self, entry):
+    """ Attempt to transfer a crash (dump and json) over
+        into the hbase system. If there is a network error
+        we will retry and backoff in an infinite loop.
+
+        Once successfully transmitted, we return True
+
+        If there is another type of error, then we will
+        log it and return False. This is a source of
+        lost transmissions.
+    """
+    dump     = entry + '.dump'
+    metadump = entry + '.json'
+    if not os.path.exists(dump):
+      logger.error("Binary dump file %s.dump does not exist, unable to transmit from journal", entry)
+      return False
+
+    if not os.path.exists(metadump):
+      logger.error("JSON metadata file %s.json does not exist, unable to transmit from journal", entry)
+      return False
+    
+    uuid = os.path.basename(entry)
+    logger.info('Transmitting %s to %s', uuid, self.config.hbaseHost)
+    throttle = 1
+    while True:
+      try:
+        self.hbaseConnection.create_ooid_from_file(uuid, metadump, dump)
+        return True
+      except (IOError, hbase.ttypes.IOError):
+        # Get a fresh connection, perhaps letting load balancer give us a fresh host
+        self.connectToHBase()
+        # Back off calls to HBase: roughly 3 seconds at first, growing to about 30 seconds between retries, then reset
+        throttle += 10
+        if throttle > 100:
+          logger.critical('Failed 10 requests to transmit crash to hbase, restarting loop')
+          throttle = 1
+        else:
+          logger.error('Failed to transmit crash to hbase, backing off')
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        throttleSeconds = throttle * 0.3
+        time.sleep(throttleSeconds)
+      except Exception, x:
+        logger.error("Unknown error %s, Failed to transmit %s", type(x), uuid)
+        socorro.lib.util.reportExceptionAndContinue(logger)
+        return False
+
+  def checkpointJournal(self, filename, entries):
+    """ Calling this method updates the last modified time of the
+        journal log file. This is useful, when combined with the presence of
+        a PID lockfile, in knowing that a journal is being processed and detecting
+        stale PID lockfiles.
+    """
+    tmpFd, tmpfilename = tempfile.mkstemp(dir=os.path.dirname(filename) or '.')
+    entries = map(lambda x: x + "\n", entries)
+    tmpFile = os.fdopen(tmpFd, 'w')
+    try:
+      tmpFile.writelines(entries)
+    finally:
+      tmpFile.close()
+    logger.debug('Renaming %s to %s', tmpfilename, filename)
+    os.rename(tmpfilename, filename)
+    
+  def waitForNewFiles(self):
+    logger.info('Finished processing available files. Sleeping for %d seconds', self.loopTime)
+    time.sleep(self.loopTime)
+
+""" These Constants are tied to the format of the journal log being split on
+    whitespace. See hbaseJournal.JOURNAL_FORMAT
+
+    time        UUID                                 Transmit Status
+    123456.1234 ac60196f-9e1b-4a31-810a-41a562100203
+    123456.1234 ac60196f-9e1b-4a31-810a-41a562100203 FAIL
+    123456.1234 ac60196f-9e1b-4a31-810a-41a562100203 OK
+"""
+PROCESSED_ENTRY_LENGTH = 3
+NEW_ENTRY_LENGTH       = 2
+
+TRANSMIT_STATUS        = 2
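
To make the entry states above concrete, here is a small illustrative sketch (not part of the patch) of how a single journal line maps onto the length and status constants that parseAndProcessEntry uses:

PROCESSED_ENTRY_LENGTH = 3   # 'time uuid OK' or 'time uuid FAIL'
NEW_ENTRY_LENGTH       = 2   # 'time uuid', not yet transmitted
TRANSMIT_STATUS        = 2   # index of the OK/FAIL token after splitting on spaces

processed = '1265336168.676271 ac60196f-9e1b-4a31-810a-41a562100203 FAIL'
parts = processed.split(' ')
assert len(parts) == PROCESSED_ENTRY_LENGTH and parts[TRANSMIT_STATUS] == 'FAIL'  # skipped on re-runs

new = '1265336168.676271 ac60196f-9e1b-4a31-810a-41a562100203'
assert len(new.split(' ')) == NEW_ENTRY_LENGTH  # transmitted, then re-marked with OK or FAIL
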
Index: socorro/transmitter/__init__.py
===================================================================
Index: socorro/transmitter/hbaseClient.py
===================================================================
--- socorro/transmitter/hbaseClient.py	(revision 0)
+++ socorro/transmitter/hbaseClient.py	(working copy)
@@ -1,16 +1,15 @@
 #!/usr/bin/python
 import simplejson as json
 
-from thrift import Thrift
 from thrift.transport import TSocket, TTransport
 from thrift.protocol import TBinaryProtocol
-from hbase import ttypes
-from hbase.hbasethrift import Client, ColumnDescriptor, Mutation
+from hbase.hbasethrift import Client, Mutation
 
 class HBaseConnection(object):
-  def __init__(self,host,port):
+  def __init__(self, host, port, timeout):
     # Make socket
     transport = TSocket.TSocket(host, port)
+    transport.setTimeout(timeout)
     # Buffering is critical. Raw sockets are very slow
     self.transport = TTransport.TBufferedTransport(transport)
     # Wrap in a protocol
@@ -40,8 +39,8 @@
     return self._make_rows_nice(self.client.getRow(table_name, row_id))
 
 class HBaseConnectionForCrashReports(HBaseConnection):
-  def __init__(self,host,port):
-    super(HBaseConnectionForCrashReports,self).__init__(host,port)
+  def __init__(self,host,port, timeout):
+    super(HBaseConnectionForCrashReports,self).__init__(host,port, timeout)
 
   def _make_row_nice(self,client_row_object):
     columns = super(HBaseConnectionForCrashReports,self)._make_row_nice(client_row_object)
@@ -61,6 +60,7 @@
     return json.loads(self._make_rows_nice(self.client.getRowWithColumns('crash_reports',ooid[-6:]+ooid,['processed_data:json']))[0]["processed_data:json"])
 
   def scan_starting_with(self,prefix,limit=None):
+    print 'Scanning ' + prefix
     scanner = self.client.scannerOpenWithPrefix('crash_reports', prefix, ['meta_data:json'])
     i = 0
     r = self.client.scannerGet(scanner)
Index: scripts/startTransmitter.py
===================================================================
--- scripts/startTransmitter.py	(revision 0)
+++ scripts/startTransmitter.py	(revision 0)
@@ -0,0 +1,37 @@
+#! /usr/bin/env python
+
+import logging
+import logging.handlers
+
+try:
+  import config.transmitterconfig as config
+except ImportError:
+  import transmitterconfig as config
+
+import socorro.transmitter.transmitter as transmitter
+import socorro.lib.ConfigurationManager as configurationManager
+
+try:
+  configurationContext = configurationManager.newConfiguration(configurationModule=config, applicationName="Socorro Transmitter 2.0")
+except configurationManager.NotAnOptionError, x:
+  import sys
+  print >> sys.stderr, x
+  print >> sys.stderr, "for usage, try --help"
+  sys.exit()
+
+logger = logging.getLogger("transmitter")
+logger.setLevel(logging.DEBUG)
+
+stderrLog = logging.handlers.SysLogHandler(configurationContext.logSysLogAddress, configurationContext.logSysLogFacility)
+stderrLog.setLevel(configurationContext.logFileErrorLoggingLevel)
+stderrLogFormatter = logging.Formatter(configurationContext.logFileLineFormatString)
+stderrLog.setFormatter(stderrLogFormatter)
+logger.addHandler(stderrLog)
+
+logger.debug("current configuration\n%s", str(configurationContext))
+
+try:
+  t = transmitter.Transmitter(configurationContext)
+  t.main()
+finally:
+  logger.info("done.")

Property changes on: scripts/startTransmitter.py
___________________________________________________________________
Added: svn:executable
   + *


Property changes on: scripts/config
___________________________________________________________________
Added: svn:mergeinfo
   Merged /branches/socorro-ui/scripts/config:r1194-1215
   Merged /branches/dyconfig/scripts/config:r1608-1671
   Merged /branches/lars13/scripts/config:r1620-1652
   Merged /branches/normalization/scripts/config:r1025-1250
   Merged /branches/lars14-collector/scripts/config:r1703-1758
   Merged /branches/griswolf-work/scripts/config:r785-1218
   Merged /branches/db-neutral/scripts/config:r721-784
   Merged /branches/lars14-collector/webapp-php/scripts/config:r1703-1732
   Merged /branches/lars14-collector:r1703-1758

Index: scripts/config/transmitterconfig.py.dist
===================================================================
--- scripts/config/transmitterconfig.py.dist	(revision 0)
+++ scripts/config/transmitterconfig.py.dist	(revision 0)
@@ -0,0 +1,54 @@
+import logging
+import socorro.lib.ConfigurationManager as cm
+
+# Storage constants
+
+hbaseHost = cm.Option()
+hbaseHost.doc = 'Hostname for hbase hadoop cluster. May be a VIP or load balancer'
+hbaseHost.default = 'hbase-hostname.example.com'
+
+hbasePort = cm.Option()
+hbasePort.doc = 'hbase port number'
+hbasePort.default = 9090
+
+hbaseTimeout = cm.Option()
+hbaseTimeout.doc = 'Timeout for connecting to and calling HBase, in milliseconds (Thrift transport layer)'
+hbaseTimeout.default = 30000
+
+rolledJournalRoot = cm.Option()
+rolledJournalRoot.doc = 'The root of the filesystem which contains collector journal files after they have been rolled. *must not be NFS*'
+rolledJournalRoot.default = '/home/aking/breakpad/socorro-hdfs/test/'
+
+transmitterLoopTime = cm.Option()
+transmitterLoopTime.doc = 'the time to wait between scans for new rolled journal files (HHH:MM:SS)'
+transmitterLoopTime.default = '0:00:06'
+transmitterLoopTime.fromStringConverter = cm.timeDeltaConverter
+
+logSysLogAddress = cm.Option()
+logSysLogAddress.doc = "Hostname and port for SysLog messages. Example: (localhost', 514') or '/dev/log'"
+logSysLogAddress.default = '/dev/log'
+
+logSysLogFacility = cm.Option()
+logSysLogFacility.doc = 'Facility for SysLog - defaults to LOG_USER'
+logSysLogFacility.default = logging.handlers.SysLogHandler.LOG_USER
+
+logFilePathname = cm.Option()
+logFilePathname.doc = 'full pathname for the log file'
+logFilePathname.default = '/home/aking/modpython-collector-hdfs/collector.log'
+
+logFileMaximumSize = cm.Option()
+logFileMaximumSize.doc = 'maximum size in bytes of the log file'
+logFileMaximumSize.default = 1000000
+
+logFileMaximumBackupHistory = cm.Option()
+logFileMaximumBackupHistory.doc = 'maximum number of log files to keep'
+logFileMaximumBackupHistory.default = 50
+
+logFileLineFormatString = cm.Option()
+logFileLineFormatString.doc = 'python logging system format for log file entries'
+logFileLineFormatString.default = '%(asctime)s %(levelname)s -  PID:%(process)d %(message)s'
+
+logFileErrorLoggingLevel = cm.Option()
+logFileErrorLoggingLevel.doc = 'logging level for the log file (10 - DEBUG, 20 - INFO, 30 - WARNING, 40 - ERROR, 50 - CRITICAL)'
+logFileErrorLoggingLevel.default = 10
+
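
One illustrative note on transmitterLoopTime above: cm.timeDeltaConverter turns the HHH:MM:SS string into a timedelta-style value, and Transmitter.__init__ reads its .seconds attribute for the sleep loop. A tiny sketch, assuming the converted value behaves like a standard datetime.timedelta:

from datetime import timedelta

# '0:00:06' corresponds to six seconds; transmitter.Transmitter sleeps this long
# between scans for newly rolled journal files.
loopTime = timedelta(hours=0, minutes=0, seconds=6)
assert loopTime.seconds == 6
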
Index: scripts/config/monitorconfig.py.dist
===================================================================
--- scripts/config/monitorconfig.py.dist	(revision 1780)
+++ scripts/config/monitorconfig.py.dist	(working copy)
@@ -28,7 +28,7 @@
 
 saveFailedMinidumpsTo = cm.Option()
 saveFailedMinidumpsTo.doc = 'the location for saving dumps that failed processing (leave blank to delete them instead)'
-saveFailedMinidumpsTo.default = '/tmp/socorro-failed'
+saveFailedMinidumpsTo.default = '/tmp/socorro-failed'
 
 dumpPermissions = cm.Option()
 dumpPermissions.doc = 'when saving dumps, the pemission flags to be used'
Index: docs/README.txt
===================================================================
--- docs/README.txt	(revision 0)
+++ docs/README.txt	(revision 0)
@@ -0,0 +1,15 @@
+Please see http://code.google.com/p/socorro/w/list for more information.
+
+Project source layout
+
+Socorro UI
+webapp-php - Kohana PHP codebase which powers crash-stats.mozilla.com
+
+Socorro
+scripts - crons, daemons, etc
+socorro - Python codebase for various Socorro sub-systems
+thirdparty - Third-party Python modules or machine-generated code
+
+Developer Tools
+tools
+    
\ No newline at end of file
