#!/bin/bash "exec" "python" "$0" "$@" """ Run Athena Usage: $ source $SITEROOT/setup.sh $ source $T_DISTREL/AtlasRelease/*/cmt/setup.sh -tag_add=??? $ runAthena -l [libraries] -r [rundir] -j [jobOs] -i [inputs] -o [optputs] -c \ -p [pool_refs] -u [lrc_url] -l [libraries] : an archive which contains libraries -r [rundir] : relative path to the directory where Athena runs -j [jobOs] : job options passed to athena. format: 'options' -i [inputs] : list of input files. format: ['in1',...'inN'] -o [outputs] : map of output files. format: {'type':'name',..} type:'hist','ntuple','ESD','AOD','TAG','AANT','Stream1','THIST' -b : bytestream -c : event collection -p [pool_refs] : list of POOL refs -u [lrc_url] : URL of LRC -f [fragment] : jobO fragment -a [jobO files]: archive name of jobOs -m [minbias] : list of minimum bias files -n [cavern] : list of cavern files Example: runAthena \ -l libraries.tgz \ -r PhysicsAnalysis/AnalysisCommon/UserAnalysis/UserAnalysis-00-03-03/run \ -j "-c 'EvtMax=10' opt.py RecExCommon.py" \ -i ['input1.AOD.pool.root','input2.AOD.pool.root','input3.AOD.pool.root'] \ -o ['hist':'hist.root','ntuple':'ntuple.root','log':'athena.log'] Procedure: * expand libraries * make PoolFileCatalog.xml * create post-jobO which overwrites some parameters * get PDGTABLE.MeV * run athena """ # import optparse import re import os import sys import types import getopt import commands import urllib import urllib2 import xml.dom.minidom # error code EC_PoolCatalog = 20 EC_MissingArg = 30 EC_AthenaFail = 40 EC_NoInput = 141 try: release = os.environ["swRelease"] release = release.replace('Atlas-','') except: print "!!FAILED!!2999!!buildJob: Bad environment, no release defined" if os.environ.has_key("VO_ATLAS_SW_DIR"): print "VO_ATLAS_SW_DIR area $VO_ATLAS_SW_DIR:" print commands.getoutput("ls -alL $VO_ATLAS_SW_DIR") relbase = "%s/software" % os.environ["VO_ATLAS_SW_DIR"] siteroot = "%s/%s" % ( relbase, release ) setupdir = siteroot; sitesetup = "source %s/setup.sh;" % siteroot appdir = relbase elif os.environ.has_key("OSG_APP"): print "OSG_APP area $OSG_APP:" print commands.getoutput("ls -alL $OSG_APP") relbase = "%s/atlas_app/atlas_rel" % os.environ["OSG_APP"] siteroot = "%s/%s" % ( relbase, release ) sitesetup = "%s/setup.sh;" % siteroot setupdir = "%s/atlas_app/atlas_rel/%s/dist/%s/AtlasRelease/*/cmt" % (os.environ['APP'], release, release) sitesetup += 'source %s/setup.sh -tag_add=DC2;' % setupdir appdir = os.environ['APP'] elif os.environ.has_key("APP"): print "APP area $APP:" print commands.getoutput("ls -alL $APP") relbase = "%s/atlas_app/atlas_rel" % os.environ["APP"] siteroot = "%s/%s" % ( relbase, release ) sitesetup = "%s/setup.sh;" % siteroot setupdir = "%s/atlas_app/atlas_rel/%s/dist/%s/AtlasRelease/*/cmt" % (os.environ['APP'], release, release) sitesetup += 'source %s/setup.sh %s -tag_add=DC2;' % setupdir appdir = os.environ['APP'] else: print "!!FAILED!!2500!!pathena: Cannot locate ATLAS software" sys.exit(1) cmd1 = "source %s/atlas_app/atlas_rel/%s/setup.sh;source %s/atlas_app/atlas_rel/%s/cmtsite/setup.sh -tag=AtlasOffline,%s"% (appdir,release,appdir,release,release) cacheVer = re.search('AnalysisTransforms-(.+)',job.homePackage) if cacheVer != None: cmd2="export CMTPATH=%s/atlas_app/atlas_rel/%s/AtlasProduction/%s;source %s/atlas_app/atlas_rel/%s/AtlasOffline/%s/AtlasOfflineRunTime/cmt/setup.sh"% (jobSite.appdir,job.atlasRelease,cacheVer.group(1),jobSite.appdir,job.atlasRelease,job.atlasRelease) else: cmd2="source %s/atlas_app/atlas_rel/%s/AtlasOffline/%s/AtlasOfflineRunTime/cmt/setup.sh"% (jobSite.appdir,job.atlasRelease,job.atlasRelease) print "sitesetup: ",sitesetup print "setupdir listing: " print commands.getoutput("ls -alR "+setupdir); os.environ["SITEROOT"]=siteroot print "Release base directory %s:" % relbase print commands.getoutput("ls -alL %s"% relbase) print "SITEROOT directory %s:" % siteroot print commands.getoutput("ls -alL %s"% siteroot) if not os.path.exists(siteroot): print "!!FAILED!!2500!!pathena: Release not available" print "SITEROOT doesn't exist" sys.exit(1) print "APP=",os.environ["APP"] if os.environ.has_key('logFile'): logtgz = os.environ['logFile'] else: logtgz = '' # command-line parameters eventColl = False byteStream = False poolRefs = [] urlLRC = '' libraries = '' fragmentJobO = '' archiveJobO = '' minbiasFiles = [] cavernFiles = [] opts, args = getopt.getopt(sys.argv[1:], "l:r:j:i:o:bcp:u:f:a:m:n:", ["pilotpars"]) for o, a in opts: if o == "-l": libraries=a if o == "-r": runDir=a if o == "-j": jobO=urllib.unquote(a) if o == "-i": exec "inputFiles="+a if o == "-o": exec "outputFiles="+a if o == "-m": exec "minbiasFiles="+a if o == "-n": exec "cavernFiles="+a if o == "-b": byteStream = True if o == "-c": eventColl = True if o == "-p": exec "poolRefs="+a if o == "-u": urlLRC=a if o == "-f": fragmentJobO=a if o == "-a": archiveJobO=a # dump parameter try: print libraries print runDir print jobO print inputFiles print outputFiles print byteStream print eventColl print poolRefs print urlLRC print fragmentJobO print minbiasFiles print cavernFiles except: sys.exit(EC_MissingArg) # check input files curFiles = os.listdir('.') flagMinBias = False flagCavern = False if len(inputFiles) > 0: tmpFiles = tuple(inputFiles) for tmpF in tmpFiles: if not tmpF in curFiles: print "%s not exist" % tmpF inputFiles.remove(tmpF) if len(inputFiles) == 0: print "No input file is available" sys.exit(EC_NoInput) if len(minbiasFiles) > 0: flagMinBias = True tmpFiles = tuple(minbiasFiles) for tmpF in tmpFiles: if not tmpF in curFiles: print "%s not exist" % tmpF minbiasFiles.remove(tmpF) if len(minbiasFiles) == 0: print "No input file is available for Minimum-bias" sys.exit(EC_NoInput) if len(cavernFiles) > 0: flagCavern = True tmpFiles = tuple(cavernFiles) for tmpF in tmpFiles: if not tmpF in curFiles: print "%s not exist" % tmpF cavernFiles.remove(tmpF) if len(cavernFiles) == 0: print "No input file is available for Cavern" sys.exit(EC_NoInput) # correct length if flagMinBias: if len(inputFiles) > len(minbiasFiles): inputFiles = inputFiles[:len(minbiasFiles)] if flagCavern: if len(inputFiles) > len(cavernFiles): inputFiles = inputFiles[:len(cavernFiles)] if flagMinBias: if len(inputFiles) < len(minbiasFiles): minbiasFiles = minbiasFiles[:len(inputFiles)] if flagCavern: if len(inputFiles) < len(cavernFiles): cavernFiles = cavernFiles[:len(inputFiles)] print "=== New inputFiles ===" print inputFiles if flagMinBias: print "=== New minbiasFiles ===" print minbiasFiles if flagCavern: print "=== New cavernFiles ===" print cavernFiles # save current dir currentDir = os.getcwd() # crate work dir workDir = currentDir+"/workDir" commands.getoutput('rm -rf %s' % workDir) os.makedirs(workDir) os.chdir(workDir) # expand libraries if libraries == '': pass elif libraries.startswith('/'): out = commands.getoutput('tar xvfz %s' % libraries) else: out = commands.getoutput('tar xvfz %s/%s' % (currentDir,libraries)) # expand jobOs if needed if archiveJobO != "": print "--- wget for jobO ---" output = commands.getoutput('wget -h') wgetCommand = 'wget' for line in output.split('\n'): if re.search('--no-check-certificate',line) != None: wgetCommand = 'wget --no-check-certificate' break print commands.getoutput('%s https://gridui01.usatlas.bnl.gov:25443/cache/%s' % (wgetCommand,archiveJobO)) commands.getoutput('tar xvfz %s' % archiveJobO) # make rundir just in case commands.getoutput('mkdir -p %s' % runDir) # go to run dir os.chdir(runDir) # build pool catalog print "build pool catalog" commands.getoutput('rm -f PoolFileCatalog.xml') if eventColl: # ROOT ver collection or AANT if len(inputFiles)>0: # get extPoolRefs.C macroPoolRefs = 'extPoolRefs.C' # append workdir to CMTPATH env = 'CMTPATH=%s:$CMTPATH' % workDir com = 'export %s;' % env com += 'mkdir cmt; cd cmt;' com += 'echo "use AtlasPolicy AtlasPolicy-*" > requirements;' com += 'cmt config;' com += 'source setup.sh;' com += 'cd ..;' com += 'get_files -jo %s' % macroPoolRefs print commands.getoutput(com) # build ROOT command com = 'echo ' for fileName in inputFiles: com += '%s ' % fileName # form symlink to input file without attemptNr newFileName = re.sub('\.\d+$','',fileName) os.symlink('%s/%s' % (currentDir,fileName),newFileName) com += ' --- | root.exe -b %s' % macroPoolRefs print com status,output = commands.getstatusoutput(com) print output # get POOL refs for line in output.split('\n'): if line.startswith('PoolRef:') or line.startswith('ESD Ref:'): match = re.search('\[DB=([^\]]+)\]',line) if match != None: poolRefs.append(match.group(1)) # new poolRefs print "=== New poolRefs ===" print poolRefs if len(poolRefs)>0: # get PoolFileCatalog iGUID = 0 pfnMap = {} strGUIDs = '' url = urlLRC + '/lrc/PoolFileCatalog' for guid in poolRefs: iGUID += 1 # make argument strGUIDs += '%s ' % guid if iGUID % 40 == 0 or iGUID == len(poolRefs): # get PoolFileCatalog strGUIDs = strGUIDs.rstrip() data = {'guids':strGUIDs} # avoid too long argument strGUIDs = '' # GET url = '%s/lrc/PoolFileCatalog?%s' % (urlLRC,urllib.urlencode(data)) req = urllib2.Request(url) fd = urllib2.urlopen(req) out = fd.read() if out.startswith('Error'): continue if not out.startswith(' """ outFile.write(header) # write files item = \ """ """ for guid,pfn in pfnMap.iteritems(): outFile.write(item % (guid.upper(),pfn)) # write trailer trailer = \ """ """ outFile.write(trailer) outFile.close() else: # POOL or BS files for fileName in inputFiles+minbiasFiles+cavernFiles: # form symlink to input file os.symlink('%s/%s' % (currentDir,fileName),fileName) if not byteStream: # insert it to pool catalog com = 'pool_insertFileToCatalog %s' % fileName print com status,output = commands.getstatusoutput(com) print output if status: print "Failed : %d" % status sys.exit(EC_PoolCatalog) # create post-jobO file which overwrites some parameters postOpt = commands.getoutput('uuidgen') + '.py' oFile = open(postOpt,'w') if len(inputFiles) != 0: if re.search('theApp.EvtMax',fragmentJobO) == None: oFile.write('theApp.EvtMax = -1\n') if byteStream: # BS oFile.write('ByteStreamInputSvc = Service( "ByteStreamInputSvc" )\n') oFile.write('ByteStreamInputSvc.FullFileName = %s\n' % inputFiles) else: oFile.write('EventSelector = Service( "EventSelector" )\n') if eventColl: # TAG newInputs = [] for infile in inputFiles: # remove suffix for event collection newInputs.append(re.sub('\.root$','',infile)) oFile.write('EventSelector.InputCollections = %s\n' % newInputs) oFile.write('EventSelector.CollectionType = "ExplicitROOT"\n') else: # normal POOL oFile.write('EventSelector.InputCollections = %s\n' % inputFiles) if flagMinBias: oFile.write('minBiasEventSelector = Service( "minBiasEventSelector" )\n') oFile.write('minBiasEventSelector.InputCollections = %s\n' % minbiasFiles) if flagCavern: oFile.write('cavernEventSelector = Service( "cavernEventSelector" )\n') oFile.write('cavernEventSelector.InputCollections = %s\n' % cavernFiles) if outputFiles.has_key('hist'): oFile.write('HistogramPersistencySvc=Service("HistogramPersistencySvc")\n') oFile.write('HistogramPersistencySvc.OutputFile = "%s"\n' % outputFiles['hist']) if outputFiles.has_key('ntuple'): oFile.write('NTupleSvc = Service( "NTupleSvc" )\n') for sName,fName in outputFiles['ntuple']: firstFlag = True if firstFlag: firstFlag = False oFile.write('NTupleSvc.Output=["%s DATAFILE=\'%s\' OPT=\'NEW\'"]\n' % (sName,fName)) else: oFile.write('NTupleSvc.Output+=["%s DATAFILE=\'%s\' OPT=\'NEW\'"]\n' % (sName,fName)) oFile.write(""" _configs = [] try: from AthenaCommon.AlgSequence import AlgSequence tmpKeys = AlgSequence().allConfigurables.keys() for key in tmpKeys: if key.find('/') != -1: key = key.split('/')[-1] if hasattr(AlgSequence(),key): _configs.append(key) except: pass def _getConfig(key): from AthenaCommon.AlgSequence import AlgSequence return getattr(AlgSequence(),key) """) if outputFiles.has_key('ESD'): oFile.write(""" key = "StreamESD" if key in _configs: StreamESD = _getConfig( key ) else: StreamESD = Algorithm( key ) """) oFile.write('StreamESD.OutputFile = "%s"\n' % outputFiles['ESD']) if outputFiles.has_key('AOD'): oFile.write(""" key = "StreamAOD" if key in _configs: StreamAOD = _getConfig( key ) else: StreamAOD = Algorithm( key ) """) oFile.write('StreamAOD.OutputFile = "%s"\n' % outputFiles['AOD']) if outputFiles.has_key('TAG'): oFile.write(""" key = "StreamTAG" if key in _configs: StreamTAG = _getConfig( key ) else: StreamTAG = Algorithm( key ) """) oFile.write('StreamTAG.OutputCollection = "%s"\n' % re.sub('\.root$','',outputFiles['TAG'])) if outputFiles.has_key('AANT'): firstFlag = True oFile.write('THistSvc = Service ( "THistSvc" )\n') sNameList = [] for aName,sName,fName in outputFiles['AANT']: if not sName in sNameList: sNameList.append(sName) if firstFlag: firstFlag = False oFile.write('THistSvc.Output = ["%s DATAFILE=\'%s\' OPT=\'UPDATE\'"]\n' % (sName,fName)) else: oFile.write('THistSvc.Output += ["%s DATAFILE=\'%s\' OPT=\'UPDATE\'"]\n' % (sName,fName)) oFile.write(""" key = "%s" if key in _configs: AANTupleStream = _getConfig( key ) else: AANTupleStream = Algorithm( key ) """ % aName) oFile.write('AANTupleStream.StreamName = "%s"\n' % sName) oFile.write('AANTupleStream.OutputName = "%s"\n' % fName) if outputFiles.has_key('THIST'): oFile.write('THistSvc.Output += ["HIST DATAFILE=\'%s\' OPT=\'UPDATE\'"]\n' % outputFiles['THIST']) else: if outputFiles.has_key('THIST'): oFile.write('THistSvc = Service ( "THistSvc" )\n') oFile.write('THistSvc.Output = ["HIST DATAFILE=\'%s\' OPT=\'UPDATE\'"]\n' % outputFiles['THIST']) if outputFiles.has_key('Stream1'): oFile.write(""" key = "Stream1" if key in _configs: Stream1 = _getConfig( key ) else: Stream1 = Algorithm( key ) """) oFile.write('Stream1.OutputFile = "%s"\n' % outputFiles['Stream1']) if fragmentJobO != "": oFile.write('%s\n' % fragmentJobO) oFile.close() oFile = open(postOpt) lines = '' for line in oFile: lines += line print lines oFile.close() # get PDGTABLE.MeV print "Get PDGTABLE.MeV: ", commands.getoutput('get_files PDGTABLE.MeV') # append workdir to CMTPATH env = 'CMTPATH=%s:$CMTPATH' % workDir print env print "PATH: ", os.environ['PATH'] # construct command com = 'export %s;' % env com += 'mkdir cmt; cd cmt;' com += 'echo "use AtlasPolicy AtlasPolicy-*" > requirements;' com += 'cmt config;' com += 'source setup.sh;' com += 'cd ..; env;' com += 'athena.py -s %s %s' % (jobO,postOpt) print com # run athena status,out = commands.getstatusoutput(com) print out print commands.getoutput('ls -l') # copy results for file in outputFiles.values(): if type(file) != types.StringType: # for AANT for aaT in file: commands.getoutput('mv %s %s' % (aaT[-1],currentDir)) else: commands.getoutput('mv %s %s' % (file,currentDir)) # copy PoolFC.xml commands.getoutput('mv -f PoolFileCatalog.xml %s' % currentDir) # go back to current dir os.chdir(currentDir) print print commands.getoutput('pwd') print commands.getoutput('ls -l') # remove work dir commands.getoutput('rm -rf %s' % workDir) # return if status: print "execute script: Running athena failed : %d" % status sys.exit(EC_AthenaFail) else: print "execute script: Running athena was successful" sys.exit(0)