import os, sys, commands, getopt, time, random, string
import atexit, signal
import Site, pUtil, Job, Node, RunJobUtilities
import Mover as mover
from pUtil import debugInfo, tolog, isAnalJob, readpar, createLockFile
from PilotErrors import PilotErrors
from ProxyGuard import ProxyGuard
from shutil import copy2
from config import config_sm

sys.path.append('..')  # For Torre's code

CMD_CHECKSUM = config_sm.COMMAND_MD5

# global variables
pilotserver = "localhost"          # default server
pilotport = 88888                  # default port
failureCode = None                 # set by the signal handler when the user/batch system kills the job
pworkdir = "/tmp"                  # site work dir used by the parent
multiTaskFlag = False              # default is not to run in multi-task mode (parallel analysis and production)
logguid = None                     # guid for the log file
debugLevel = 0                     # 0: debug info off, 1: display function name when called, 2: full debug info
pilotlogfilename = "pilotlog.txt"  # default pilot log filename
stageinTries = 1                   # number of stage-in tries
testLevel = 0                      # test suite control variable (0: no test, 1: put error, 2: ...)
pilot_initdir = ""                 # location where the pilot is untarred and started
proxycheckFlag = True              # True (default): perform proxy validity checks, False: no check
globalPilotErrorDiag = ""          # global pilotErrorDiag used with the signal handler (only)
globalErrorCode = 0                # global error code used with the signal handler (only)

# python 2.2.3 gives a KeyError if the http_proxy env. variable is not set beforehand
try:
    if os.environ["http_proxy"]:
        pass
except KeyError, e:
    os.environ["http_proxy"] = ""
try:
    if os.environ["https_proxy"]:
        pass
except KeyError, e:
    os.environ["https_proxy"] = ""

def usage():
    """
    usage: python runJob.py -s <sitename> -d <workdir> -a <appdir> -l <pilotinitdir> -w <pilotserver> -p <pilotport>
                            -q <dq2url> -g <wnclientdir> -o <pworkdir> -m <multitaskflag> -i <logguid>
                            -b <debuglevel> -k <pilotlogfilename> -x <stageintries> -v <testlevel> -t <proxycheckflag>
    where:
        <sitename> is the name of the site that this job landed on, e.g. BNL_ATLAS_1
        <workdir> is the pathname to the work directory of this job on the site
        <appdir> is the pathname to the directory of the executables
        <pilotinitdir> is the path to where the pilot code is untarred and started
        <pilotserver> is the URL of the pilot TCP server that the job should send updates to
        <pilotport> is the port on which the pilot server listens
        <dq2url> is the URL of the https web server for the local site's DQ2 site service
        <wnclientdir> is the location of the grid client software
        <pworkdir> is the pathname to the work directory of the parent
        <multitaskflag> is set to true for primary (production) jobs in multi-task mode, used to renice the production process
        <logguid> is the guid for the log file
        <debuglevel> 0: debug info off, 1: display function name when called, 2: full debug info
        <pilotlogfilename> is the name of the log file
        <stageintries> is the number of tries for stage-ins (default is 1)
        <testlevel> 0: no test, 1: simulate put error, 2: ...
        <proxycheckflag> True (default): perform proxy validity checks, False: no check
    """
    print usage.__doc__

def argParser(argv):
    """ parse command line arguments for the main script """

    global pilotserver, pilotport, uflag, pworkdir, multiTaskFlag, logguid,\
           debugLevel, pilotlogfilename, stageinTries, testLevel, pilot_initdir, proxycheckFlag

    # some default values
    sitename = "testsite"
    workdir = "/tmp"
    pworkdir = "/tmp"
    appdir = "/usatlas/projects/OSG"
    dq2url = ""
    wnclientdir = ""

    try:
        opts, args = getopt.getopt(argv, 'a:b:d:g:i:k:l:m:o:p:q:s:t:v:w:x:')
    except getopt.GetoptError:
        tolog("!!FAILED!!3000!! Invalid arguments and options!")
        usage()
        os._exit(5)

    for o, a in opts:
        if o == "-s":
            sitename = str(a)
        elif o == "-d":
            workdir = str(a)
        elif o == "-a":
            appdir = str(a)
        elif o == "-l":
            pilot_initdir = a
        elif o == "-v":
            testLevel = a
        elif o == "-w":
            pilotserver = str(a)
        elif o == "-p":
            pilotport = int(a)
        elif o == "-q":
            dq2url = str(a)
        elif o == "-g":
            wnclientdir = str(a)
        elif o == "-o":
            pworkdir = str(a)
        elif o == "-i":
            logguid = str(a)
        elif o == "-k":
            pilotlogfilename = str(a)
        elif o == "-m":
            mtFlag = str(a)
            if mtFlag.upper() == "TRUE":
                multiTaskFlag = True
            else:
                multiTaskFlag = False
        elif o == "-t":
            pcFlag = str(a)
            if pcFlag.upper() == "TRUE":
                proxycheckFlag = True
            else:
                proxycheckFlag = False
        elif o == "-b":
            debugLevel = int(a)
        elif o == "-x":
            stageinTries = int(a)
        else:
            tolog("!!FAILED!!3000!! Unknown option: %s (ignoring)" % (o))
            usage()

    if debugLevel > 0:
        debugInfo("Debug level set to %d in argParser()" % debugLevel)
    if debugLevel == 2:
        debugInfo("sitename: %s" % sitename)
        debugInfo("workdir: %s" % workdir)
        debugInfo("pworkdir: %s" % pworkdir)
        debugInfo("appdir: %s" % appdir)
        debugInfo("pilot_initdir: %s" % pilot_initdir)
        debugInfo("pilotserver: %s" % pilotserver)
        debugInfo("pilotport: %d" % pilotport)
        debugInfo("dq2url: %s" % dq2url)
        debugInfo("multiTaskFlag: %s" % str(multiTaskFlag))
        debugInfo("wnclientdir: %s" % wnclientdir)
        debugInfo("logguid: %s" % logguid)

    return sitename, appdir, workdir, dq2url, wnclientdir, pworkdir

def cleanup(job, rf=None):
    """
    cleanup function
    'rf' is a list that will contain the names of the files that could be transferred.
    In case of transfer problems, all remaining files will be found and moved
    to the data directory for later recovery.
    """

    if debugLevel > 0:
        debugInfo("cleanup() called")

    tolog("********************************************************")
    tolog(" This job ended with (trf,pilot) exit code of (%d,%d)" % (job.result[1], job.result[2]))
    tolog("********************************************************")

    # clean up the pilot wrapper modules
    pUtil.removePyModules(job.workdir)

    if os.path.isdir(job.workdir):
        os.chdir(job.workdir)

        # remove input files from the job workdir
        remFiles = job.inFiles
        for inf in remFiles:
            if inf and inf != 'NULL' and os.path.isfile("%s/%s" % (job.workdir, inf)):  # non-empty string and not NULL
                try:
                    os.remove("%s/%s" % (job.workdir, inf))
                except Exception, e:
                    tolog("!!WARNING!!3000!! Ignore this Exception when deleting file %s: %s" % (inf, str(e)))
                    pass

        # only remove output files if the status is not 'holding',
        # in which case the files should be saved for the job recovery.
        # the job itself must also have finished with a zero trf error code
        # (data will be moved to another directory to keep it out of the log file)

        # always copy the metadata-<jobId>.xml to the site work dir
        # WARNING: this metadata file might contain info about files that were not successfully moved to the SE
        # it will be regenerated by the job recovery for the cases where there are output files in the datadir
        try:
            copy2("%s/metadata-%d.xml" % (job.workdir, job.jobId), "%s/metadata-%s.xml" % (pworkdir, repr(job.jobId)))
        except Exception, e:
            tolog("Warning: Could not copy metadata-%s.xml to site work dir - ddm Adder problems will occur in case of job recovery" %\
                  (repr(job.jobId)))

        # only copy athena metadata for non-build jobs
        if not pUtil.isBuildJob(job.outFiles):
            try:
                copy2("%s/metadata.xml.ATHENA" % (job.workdir), "%s/metadata-%s.xml.ATHENA" % (pworkdir, repr(job.jobId)))
            except Exception, e:
                if job.result[1] != 0:
                    tolog("metadata was not created since the payload failed")
                else:
                    tolog("Warning: Could not copy metadata-%s.xml.ATHENA to site work dir: %s" % (repr(job.jobId), str(e)))

        if job.result[0] == 'holding' and job.result[1] == 0:
            try:
                # create the data directory
                os.makedirs(job.datadir)
            except OSError, e:
                tolog("!!WARNING!!3000!! Could not create data directory: %s, %s" % (job.datadir, str(e)))
            else:
                # find all remaining files in case 'rf' is not empty
                remaining_files = []
                moved_files_list = []
                try:
                    if rf != None:
                        moved_files_list = RunJobUtilities.getFileNamesFromString(rf[1])
                    remaining_files = RunJobUtilities.getRemainingFiles(moved_files_list, job.outFiles)
                except Exception, e:
                    tolog("!!WARNING!!3000!! Illegal return value from Mover: %s, %s" % (str(rf), str(e)))
                    remaining_files = job.outFiles

                # move all remaining output files to the data directory
                nr_moved = 0
                for file in remaining_files:
                    try:
                        os.system("mv %s %s" % (file, job.datadir))
                    except OSError, e:
                        tolog("!!WARNING!!3000!! Failed to move file %s (abort all)" % file)
                        break
                    else:
                        nr_moved += 1
                tolog("Moved %d/%d output file(s) to: %s" % (nr_moved, len(remaining_files), job.datadir))

                # remove all successfully copied files from the local directory
                nr_removed = 0
                for file in moved_files_list:
                    try:
                        os.system("rm %s" % (file))
                    except OSError, e:
                        tolog("!!WARNING!!3000!! Failed to remove output file: %s, %s" % (file, str(e)))
                    else:
                        nr_removed += 1
                tolog("Removed %d output file(s) from local dir" % (nr_removed))

                # copy the PoolFileCatalog.xml for non-build jobs
                if not pUtil.isBuildJob(remaining_files):
                    tolog("Copying PoolFileCatalog.xml to: %s" % (job.datadir))
                    try:
                        copy2("%s/PoolFileCatalog.xml" % (job.workdir), job.datadir)
                    except Exception, e:
                        tolog("!!WARNING!!3000!! Could not copy PoolFileCatalog.xml to data dir - expect ddm Adder problems during job recovery")

        # remove all remaining output files from the work directory
        # (a successfully copied file should already have been removed by the Mover)
        rem = False
        for inf in job.outFiles:
            if inf and inf != 'NULL' and os.path.isfile("%s/%s" % (job.workdir, inf)):  # non-empty string and not NULL
                try:
                    os.remove("%s/%s" % (job.workdir, inf))
                except Exception, e:
                    tolog("!!WARNING!!3000!! Ignore this Exception when deleting file %s: %s" % (inf, str(e)))
                    pass
                else:
                    tolog("Lingering output file removed: %s" % (inf))
                    rem = True
        if not rem:
            tolog("All output files already removed from local dir")

    tolog("Payload cleanup has finished")

def sysExit(job, rf=None):
    '''
    wrapper around sys.exit
    'rf' is the return string from Mover::put containing a list of files that were not transferred
    '''

    if debugLevel > 0:
        debugInfo("sysExit() called")

    cleanup(job, rf=rf)
    sys.stderr.close()
    tolog("runJob (payload wrapper) has finished")
    # change to sys.exit?
    os._exit(job.result[2])  # pilotExitCode; don't confuse this with the overall pilot exit code,
                             # which doesn't get reported back to the panda server anyway

def failJob(transExitCode, pilotExitCode, job, pilotserver, pilotport, ins=None, pilotErrorDiag=None, docleanup=True):
    """ set the fail code and exit """

    job.setState(["failed", transExitCode, pilotExitCode])
    if pilotErrorDiag:
        job.pilotErrorDiag = pilotErrorDiag
    tolog("Will now update pilot server")
    rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, final=True)
    if ins:
        ec = pUtil.removeFiles(job.workdir, ins)
    if docleanup:
        sysExit(job)

# main process starts here
if __name__ == "__main__":

    # protect the runJob code with exception handling
    hP_ret = False
    try:
        # always use this filename as the new jobDef module name
        import newJobDef

        jobSite = Site.Site()
        jobSite.setSiteInfo(argParser(sys.argv[1:]))

        # reassign workdir for this job
        jobSite.workdir = jobSite.wntmpdir

        if pilotlogfilename != "":
            pUtil.setPilotlogFilename(pilotlogfilename)

        # set node info
        node = Node.Node()
        node.setNodeName(os.uname()[1])
        node.collectWNInfo(jobSite.workdir)

        # redirect stderr
        sys.stderr = open("%s/runjob.stderr" % (jobSite.workdir), "w")

        tolog("Current job workdir is: %s" % os.getcwd())
        tolog("Site workdir is: %s" % jobSite.workdir)

        region = readpar('region')

        try:
            job = Job.Job()
            job.setJobDef(newJobDef.job)
            job.workdir = jobSite.workdir
            # figure out and set payload file names
            job.setPayloadName(pUtil.getPayloadName(job))
        except Exception, e:
            pilotErrorDiag = "Failed to process job info: %s" % str(e)
            tolog("!!WARNING!!3000!! %s" % (pilotErrorDiag))
            failJob(0, 1220, job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)

        # prepare the output file data directory
        # (will only be created for jobs that end up in a 'holding' state)
        job.datadir = pworkdir + "/PandaJob_%d_data" % (job.jobId)

        #job.displayJob()

        atexit.register(cleanup, job)

        # register a signal handler that raises an exception so that a SIGTERM can trigger the cleanup
        # function, since by default the signal terminates the process without cleanup
        def sig2exc(sig, frm):
            global failureCode, globalPilotErrorDiag, globalErrorCode
            globalPilotErrorDiag = "!!FAILED!!3000!! SIGTERM Signal %s is caught in child pid=%d!\n" % (sig, os.getpid())
            tolog(globalPilotErrorDiag)
            if sig == signal.SIGTERM:
                globalErrorCode = 1201
            elif sig == signal.SIGQUIT:
                globalErrorCode = 1202
            elif sig == signal.SIGSEGV:
                globalErrorCode = 1203
            elif sig == signal.SIGXCPU:
                globalErrorCode = 1204
            else:
                globalErrorCode = 1200
            failureCode = globalErrorCode
            # print to stderr
            print >> sys.stderr, globalPilotErrorDiag
            raise SystemError(sig)

        signal.signal(signal.SIGTERM, sig2exc)
        signal.signal(signal.SIGQUIT, sig2exc)
        signal.signal(signal.SIGSEGV, sig2exc)
        signal.signal(signal.SIGXCPU, sig2exc)

        # see if it's an analysis job or not
        analJob = isAnalJob(job.trf.split(",")[0], prodSourceLabel=job.prodSourceLabel)
        if analJob:
            tolog("User analysis job")
        else:
            tolog("Production job")

        # renice the production process in case of multitasking
        if multiTaskFlag and not analJob:
            os.nice(10)

        # split up the job parameters to be able to loop over the tasks
        jobParameterList = job.jobPars.split("\n")
        jobHomePackageList = job.homePackage.split("\n")
        jobTrfList = job.trf.split("\n")
        if region == 'Nordugrid':
            jobAtlasRelease = os.environ['ATLAS_RELEASE'].split(",")
        else:
            jobAtlasRelease = job.atlasRelease.split("\n")

        # verify that the multi-trf job is set up properly
        ec, pilotErrorDiag, jobAtlasRelease = RunJobUtilities.verifyMultiTrf(jobParameterList, jobHomePackageList, jobTrfList, jobAtlasRelease)
        if ec > 0:
            job.pilotErrorDiag = pilotErrorDiag
            failJob(0, ec, job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)

        tolog("Number of transformations to process: %s" % len(jobParameterList))
        if len(jobParameterList) > 1:
            multi_trf = True
        else:
            multi_trf = False

        # setup starts here ................................................................................
        os.chdir(jobSite.workdir)
        tolog("Current job workdir is %s" % os.getcwd())

        # setup the trf(s)
        runCommandList = []
        _i = 0
        _n = len(jobParameterList)
        _stdout = job.stdout
        _stderr = job.stderr
        for (_jobPars, _homepackage, _trf, _swRelease) in map(None, jobParameterList, jobHomePackageList, jobTrfList, jobAtlasRelease):
            tolog("Preparing setup #%d/%d" % (_i + 1, _n))

            # reset variables
            job.jobPars = _jobPars
            job.homePackage = _homepackage
            job.trf = _trf
            job.atlasRelease = _swRelease
            if multi_trf:
                job.stdout = _stdout.replace(".txt", "_%d.txt" % (_i + 1))
                job.stderr = _stderr.replace(".txt", "_%d.txt" % (_i + 1))

            # setup the trf
            ec, pilotErrorDiag, cmd, special_setup_cmd = RunJobUtilities.setupTrf(job, jobSite)
            if ec > 0:
                tolog("!!FAILED!!3000!! Setup failed")
                job.pilotErrorDiag = pilotErrorDiag
                failJob(0, ec, job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)

            # add the setup command to the command list
            runCommandList.append(cmd)
            _i += 1

        job.stdout = _stdout
        job.stderr = _stderr
        tolog("Setup has finished successfully")
        # (setup ends here) ................................................................................

        tolog("Setting stage-in state until all input files have been copied")
        job.setState(["stagein", 0, 0])
        # send the special setup string back to the pilot (needed for the log transfer on xrdcp systems)
        rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, spsetup=special_setup_cmd)

        # stage-in ................................................................................
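        # Illustrative note (not part of the original source): os.times() returns a 5-tuple whose
        # fifth element is the elapsed real time, so the stage-in (and later stage-out) durations
        # measured below are wall-clock seconds, e.g.
        #
        #   tin_0 = os.times()
        #   ...                                  # transfer runs here
        #   tin_1 = os.times()
        #   int(round(tin_1[4] - tin_0[4]))      # -> elapsed seconds, e.g. 132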
        # prepare the input files (remove non-valid names) if there are any
        ins = RunJobUtilities.prepareInFiles(job.inFiles)

        # get error handler
        error = PilotErrors()

        # move input files from local DDM area to workdir if needed
        useCT = True
        if ins:
            tolog("Preparing for get command")
            tolog("ins = %s" % str(ins))

            # remove all input root files for analysis jobs at xrootd sites
            # (they will be read by pAthena directly from xrootd)
            if analJob:
                # create the direct access dictionary
                dInfo = pUtil.getDirectAccessDic(readpar('copysetupin'))
                # if copysetupin did not contain direct access info, try the copysetup instead
                if not dInfo:
                    dInfo = pUtil.getDirectAccessDic(readpar('copysetup'))
                # check if we should use the copytool
                if dInfo:
                    if not dInfo['useCopyTool']:
                        useCT = False
            if useCT:
                tolog("Copy tool will be used for stage-in")
            else:
                tolog("Direct access mode: Copy tool will not be used for stage-in of root files except to distinguish root files from non-root files")

            # transfer input files
            pilotErrorDiag = ""
            ec = 0
            tin_0 = os.times()
            ec, pilotErrorDiag = RunJobUtilities.get_data(job, jobSite, ins, stageinTries, analJob=analJob, usect=useCT,\
                                                          pinitdir=pilot_initdir, proxycheck=proxycheckFlag, spsetup=special_setup_cmd)
            tin_1 = os.times()
            job.timeStageIn = int(round(tin_1[4] - tin_0[4]))
            if ec:
                tolog("Failing job with ec: %d" % (ec))
                failJob(0, ec, job, pilotserver, pilotport, ins=ins, pilotErrorDiag=pilotErrorDiag)

        # (stage-in ends here) ....................................................................

        # change to running state since all input files have been staged
        tolog("Changing to running state since all input files have been staged")
        job.setState(["running", 0, 0])
        rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport)

        tolog("http_proxy: %s" % (os.environ["http_proxy"]))
        tolog("https_proxy: %s" % (os.environ["https_proxy"]))

        # unset https_proxy for the BNL site: the wget command without the noproxy option does not
        # work there, but it does work without an https_proxy (connecting to an https server
        # listening on a non-standard port does not require a proxy)
        if jobSite.sitename.upper().find("BNL") != -1 and os.environ["https_proxy"] != "":
            os.environ["save_https_proxy"] = os.environ["https_proxy"]
            os.environ["https_proxy"] = ""

        # do not hide the proxy for PandaMover since it needs it, or for sites that have sc.proxy = donothide
        if 'DDM' not in jobSite.sitename and readpar('proxy') != 'donothide':
            # create the proxy guard object (must be created here, before the sig2exc())
            proxyguard = ProxyGuard()

            # hide the proxy
            hP_ret = proxyguard.hideProxy()
            if not hP_ret:
                tolog("Warning: Proxy exposed to payload")

        # run the athena process, which could take days to finish
        t0 = os.times()
        res = (0, 'Undefined')

        # loop over all run commands (only >1 for multi-trfs)
        _i = 0
        getstatusoutput_was_interrupted = False
        for cmd in runCommandList:
            _i += 1

            # this should not be necessary since athena should skip these files:
            # correct jobPars for analysis jobs if input files have been skipped
            #fname = "%s/skipped.xml" % (job.workdir)
            #if os.path.exists(fname):
            #    tolog("Setup found a skipped.xml file, will now correct jobPars")
            #    cmd = RunJobUtilities.removeSkippedFromJobPars(fname, cmd)

            try:
                tolog("Executing job command #%d/%d: %s (length = %d)" % (_i, _n, cmd, len(cmd)))
                res = commands.getstatusoutput(cmd)
            except Exception, e:
                tolog("!!FAILED!!3000!! Failed to run command %s" % str(e))
                getstatusoutput_was_interrupted = True
                if failureCode:
                    job.result[2] = failureCode
                    tolog("!!FAILED!!3000!! Failure code: %d" % (failureCode))
                    break
            else:
                if res[0] == 0:
                    tolog("Job command #%d/%d finished" % (_i, _n))

                    # post processing
                    # if _i == 1 and _n > 1:
                    #     tolog("Post-processing of job #1")
                    #     # were any output files created? if not, fail the job
                    #     _found = False
                    #     _size = 0
                    #     for file in os.listdir(jobSite.workdir):
                    #         if file.find("RDO") >= 0:
                    #             _fname = os.path.join(jobSite.workdir, file)
                    #             # is this actually a root file?
                    #             if mover.isRootFile(_fname):
                    #                 # verify that the file has a size larger than zero
                    #                 _size = os.path.getsize(_fname)
                    #                 if _size > 0:
                    #                     _found = True
                    #                     break
                    #     if not _found:
                    #         pilotErrorDiag = "Multi-trf error: Trf digi script did not produce the RDO file, can not proceed with the reco trf"
                    #         tolog("!!FAILED!!2999!! %s" % (pilotErrorDiag))
                    #         failJob(transExitCode, 1165, job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)
                    #     else:
                    #         tolog("Confirmed that the trf digi script produced the RDO file: %s, size: %d" % (file, _size))
                    #         tolog("Will now proceed with the reco trf")
                else:
                    tolog("Job command #%d/%d failed: res = %s" % (_i, _n, str(res)))
                    break

        t1 = os.times()
        t = map(lambda x, y: x - y, t1, t0)  # get the time consumed
        job.cpuConsumptionUnit, job.cpuConsumptionTime, job.cpuConversionFactor = pUtil.setTimeConsumed(node, t)
        tolog("Job CPU usage: %s %s" % (job.cpuConsumptionTime, job.cpuConsumptionUnit))
        tolog("Job CPU conversion factor: %1.10f" % job.cpuConversionFactor)
        job.timeExe = int(round(t1[4] - t0[4]))

        # restore the proxy
        if hP_ret:
            rP_ret = proxyguard.restoreProxy()
            if not rP_ret:
                tolog("Warning: Problems with storage can occur since proxy could not be restored")
            else:
                hP_ret = False
                tolog("ProxyGuard has finished successfully")

        if jobSite.sitename.upper().find("BNL") != -1 and os.environ.get("save_https_proxy", "") != "":
            os.environ["https_proxy"] = os.environ["save_https_proxy"]

        rc = res[0] % 255
        transExitCode = rc
        tolog("Original exit code: %d" % (res[0]))
        tolog("Exit code: %d" % (rc))

        # dump an extract of the payload output
        if multi_trf:
            _stdout = job.stdout
            _stderr = job.stderr
            _stdout = _stdout.replace(".txt", "_N.txt")
            _stderr = _stderr.replace(".txt", "_N.txt")
            tolog("NOTE: For %s output, see files %s, %s (N = [1, %d])" % (job.payload, _stdout, _stderr, _n))
        else:
            tolog("NOTE: For %s output, see files %s, %s" % (job.payload, job.stdout, job.stderr))
        # filename = "%s/%s" % (job.workdir, athena_stdout)
        # RunJobUtilities.dumpOutput(filename)

        failed = False
        out_of_memory = False
        for i in range(_n):
            _stderr = job.stderr
            if multi_trf:
                _stderr = _stderr.replace(".txt", "_%d.txt" % (i + 1))
            filename = os.path.join(job.workdir, _stderr)
            tolog("Processing stderr file: %s" % (filename))
            if os.path.exists(filename):
                if os.path.getsize(filename) > 0:
                    tolog("!!WARNING!!3000!! %s produced stderr, will dump to log" % (job.payload))
                    stderr_output = RunJobUtilities.dumpOutput(filename)
                    if stderr_output.find("MemoryRescueSvc") >= 0 and \
                       stderr_output.find("FATAL out of memory: taking the application down") > 0:
                        out_of_memory = True
                        failed = True
            else:
                tolog("Warning: File %s does not exist" % (filename))

        # if the payload left the input files behind, delete them explicitly
        if ins:
            ec = pUtil.removeFiles(job.workdir, ins)

        # a killed job can have empty output but still rc == 0
        no_athena_output = False
        installation_error = False
        if getstatusoutput_was_interrupted:
            _stdout = job.stdout
            if multi_trf:
                _stdout = _stdout.replace(".txt", "_%d.txt" % (_i))
            filename = os.path.join(job.workdir, _stdout)
            if os.path.exists(filename):
                if os.path.getsize(filename) > 0:
                    tolog("Athena produced stdout but was interrupted (getstatusoutput threw an exception)")
                else:
                    no_athena_output = True
                    failed = True
            else:
                failed = True
                no_athena_output = True
        elif len(res[1]) < 20:  # protect the following comparison against massive outputs
            if res[1] == 'Undefined':
                failed = True
                no_athena_output = True
        elif failureCode:
            failed = True
        else:
            # check for installation error
            res_tmp = res[1][:1024]
            if res_tmp[0:3] == "sh:" and res_tmp.find('setup.sh') >= 0 and res_tmp.find('No such file or directory') >= 0:
                failed = True
                installation_error = True

        # handle a non-zero return code from a failed job, but do not set pilot error codes for all payload errors
        if rc or failed:
            if failureCode:
                pilotErrorDiag = "Payload failed: Interrupt failure code: %d" % (failureCode)
                # (do not set a pilot error code)
            elif getstatusoutput_was_interrupted:
                raise Exception, "Job execution was interrupted (see stderr)"
            elif out_of_memory:
                pilotErrorDiag = "Athena ran out of memory"
                job.result[2] = 1212
            elif no_athena_output:
                pilotErrorDiag = "Payload failed: No athena output"
                job.result[2] = 1210
            elif installation_error:
                pilotErrorDiag = "Payload failed: Missing installation"
                job.result[2] = 1211
            elif rc:
                # handle PandaMover errors
                if rc == 176:
                    pilotErrorDiag = "PandaMover staging error: File is not cached"
                    job.result[2] = 1173
                elif rc == 86:
                    pilotErrorDiag = "PandaMover transfer failure"
                    job.result[2] = 1174
                else:
                    # check for specific errors in athena stdout
                    filename = "%s/%s" % (job.workdir, job.stdout)
                    if os.path.exists(filename):
                        e1 = "prepare 5 database is locked"
                        e2 = "Error SQLiteStatement"
                        _out = commands.getoutput('grep "%s" %s | grep "%s"' % (e1, filename, e2))
                        if 'sqlite' in _out:
                            pilotErrorDiag = "NFS/SQLite locking problems: %s" % (_out)
                            job.result[2] = 1115
                        else:
                            pilotErrorDiag = "Job failed: Non-zero failed job return code: %d" % (rc)
                            # (do not set a pilot error code)
                    else:
                        pilotErrorDiag = "Job failed: Non-zero failed job return code: %d (%s does not exist)" % (rc, job.stdout)
                        # (do not set a pilot error code)
            else:
                pilotErrorDiag = "Payload failed due to unknown reason (check payload stdout)"
                job.result[2] = 1220
            tolog("!!FAILED!!3000!! %s" % (pilotErrorDiag))
            failJob(rc, job.result[2], job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)

        # stage-out ...............................................................................
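        # Illustrative example (hypothetical values, not from the original source): prepareOutFiles()
        # below returns the verified output file names and their matched modification times, which
        # are zipped into outsDICT; the log file, which does not exist yet, is added with an empty value:
        #
        #   outs = ['AOD.pool.root', 'NTUP.root']
        #   modt = ['1225782436', '1225782440']
        #   outsDICT = {'AOD.pool.root': '1225782436', 'NTUP.root': '1225782440',
        #               'PandaJob.log.tgz': ''}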
        # verify and prepare the output files for transfer
        ec, pilotErrorDiag, outs, modt = RunJobUtilities.prepareOutFiles(job.outFiles, job.workdir)
        if ec:
            # missing output file (only error code from prepareOutFiles)
            failJob(transExitCode, 1165, job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)

        # create a dictionary of the output files with matched modification times (needed to create the NG OutputFiles.xml)
        outsDICT = dict(zip(outs, modt))
        # add the log file with a fictitious date since it has not been created yet
        outsDICT[job.logFile] = ''
        tolog("outsDICT: %s" % str(outsDICT))

        # now is a good time to get/assign guids to the output files, because some jobs
        # create an XML file that contains the guids of the output files
        if outs:  # non-empty list of output files
            if not pUtil.isBuildJob(outs):
                ec, pilotErrorDiag, job.outFilesGuids = RunJobUtilities.getOutFilesGuids(job.outFiles, job.workdir)
                if ec:
                    # missing PoolFileCatalog (only error code from getOutFilesGuids)
                    failJob(transExitCode, 1184, job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag)
            else:
                tolog("Build job - do not use PoolFileCatalog to get guid (generated)")
            if debugLevel == 2:
                debugInfo("outs = %s" % str(outs))
                debugInfo("outFiles = %s" % str(job.outFiles))
                debugInfo("outFilesGuids = %s" % str(job.outFilesGuids))
        else:
            tolog("This job has no output files")

        if job.destinationDblock and job.destinationDblock[0] != 'NULL' and job.destinationDblock[0] != ' ':
            dsname = job.destinationDblock[0]
        else:
            dsname = "%s-%s-%s" % (time.localtime()[0:3])  # pass it a random name

        # create the xml string to pass to the dispatcher for atlas jobs
        if outs or (job.logFile and job.logFile != ''):
            # re-create the metadata.xml file, putting the guids of ALL output files into it.
            # output files that are missing guids from the job itself will get guids in the PFCxml function

            # only copy athena metadata for non-build jobs
            if not pUtil.isBuildJob(outs):
                oldMDName = "%s/metadata.xml" % (job.workdir)
                newMDName = "%s/metadata.xml.ATHENA" % (job.workdir)
                try:
                    os.rename(oldMDName, newMDName)
                except:
                    tolog("Warning: Could not open the original %s file, but harmless, pass it" % (oldMDName))
                    pass
                else:
                    tolog("Renamed %s to %s" % (oldMDName, newMDName))

            if logguid:
                guid = logguid
            else:
                guid = job.tarFileGuid

            # get the file sizes and checksums for the local output files
            # WARNING: temporary redundancy. fsize and checksum are checked again in the mover code, merge later
            _outs = []
            _outs.append(job.logFile)
            for f in outs:
                _outs.append(f)

            # which checksum command should be used? query the site mover
            from SiteMoverFarm import getSiteMover
            sitemover = getSiteMover(readpar('copytool'), "")
            checksum_cmd = sitemover.getChecksumCommand()
            tolog("Got checksum command: %s" % (checksum_cmd))

            # WARNING: any errors occurring in getFileInfo() are lost
            fsize, checksum = pUtil.getFileInfo(_outs, checksum_cmd, skiplog=True)

            # create preliminary metadata (no metadata yet about the log file - added later in pilot.py)
            _fname = "%s/metadata-%d.xml" % (job.workdir, job.jobId)
            pUtil.PFCxml(_fname, outs, fguids=job.outFilesGuids, fntag="lfn",\
                         alog=job.logFile, alogguid=guid, fsize=fsize, checksum=checksum)
            tolog("..............................................................................................................")
            tolog("Created %s with:" % (_fname))
            tolog(".. log : %s" % (job.logFile))
            tolog(".. log guid : %s" % str(guid))
            tolog(".. out files : %s" % str(outs))
            tolog(".. out file guids : %s" % str(job.outFilesGuids))
            tolog(".. fsize : %s" % str(fsize))
            tolog(".. checksum : %s" % checksum)
            tolog("..............................................................................................................")

            # convert the preliminary metadata.xml file to OutputFiles.xml for NG
            if region == 'Nordugrid':
                if RunJobUtilities.convertMetadata4NG("%s/OutputFiles.xml" % (job.workdir), _fname, outsDICT, dsname):
                    tolog("Metadata has been converted to NG format")
                else:
                    tolog("!!WARNING!!1999!! Could not convert metadata to NG format")

            # move output files from workdir to local DDM area
            if outs:
                tolog("Calling put command")

                # create the .xml file for the mover to use
                pfnFile = "OutPutFileCatalog.xml"
                pUtil.PFCxml(pfnFile, outs, fguids=job.outFilesGuids, fntag="pfn")  # generate the xml for output files
                tolog("Using the newly-generated %s/%s for put operation" % (job.workdir, pfnFile))

                tin_0 = os.times()
                status = True  # will be set to False if the put fails
                rs = ''        # return string from put_data with the filename in case of transfer error

                tolog("Setting stage-out state until all output files have been copied")
                job.setState(["stageout", 0, 0])
                rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport)

                latereg = False
                rf = None
                try:
                    rc, job.pilotErrorDiag, rf, rs = mover.mover_put_data("xmlcatalog_file:%s" % (pfnFile),\
                                                                          dsname, jobSite.sitename, ub=jobSite.dq2url,\
                                                                          analJob=analJob, testLevel=testLevel, pinitdir=pilot_initdir,\
                                                                          proxycheck=proxycheckFlag, spsetup=special_setup_cmd,\
                                                                          token=job.destinationDBlockToken)
                    tin_1 = os.times()
                    job.timeStageOut = int(round(tin_1[4] - tin_0[4]))
                except SystemError, e:
                    tin_1 = os.times()
                    job.timeStageOut = int(round(tin_1[4] - tin_0[4]))
                    job.pilotErrorDiag = "Put function is interrupted by SystemError: %s" % str(e)
                    tolog("!!WARNING!!3000!! %s" % (job.pilotErrorDiag))
                    status = False
                    job.setState(["holding", transExitCode, 1200])
                    # update job status
                    rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, final=True)
                    sysExit(job, rf)
                except Exception, e:
                    tin_1 = os.times()
                    job.timeStageOut = int(round(tin_1[4] - tin_0[4]))
                    import traceback
                    if 'format_exc' in traceback.__all__:
                        trace = traceback.format_exc()
                        pilotErrorDiag = "Put function can not be called for staging out: %s, %s" % (str(e), trace)
                    else:
                        tolog("traceback.format_exc() not available in this python version")
                        pilotErrorDiag = "Put function can not be called for staging out: %s" % (str(e))
                    tolog("!!WARNING!!3000!! %s" % (pilotErrorDiag))
                    status = False
                    job.setState(["holding", transExitCode, 1131])
                    # update job status
                    rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, final=True)
                    sysExit(job, rf)
                else:
                    if job.pilotErrorDiag != "":
                        job.pilotErrorDiag = "Put error: " + job.pilotErrorDiag

                    tolog("Put function returned code: %d" % (rc))
                    if rc == error.ERR_MOVEFAILED or rc == error.ERR_FAILEDCP:
                        # remove any trailing "\r" or "\n" (there can be two of them)
                        if rs != None:
                            rs = rs.rstrip()
                            tolog("Error string: %s" % (rs))
                        tolog("!!WARNING!!3000!! Put error: Error in copying the file from job workdir to local SE")
                        job.setState(["holding", transExitCode, 1137])
                    elif rc == error.ERR_FAILEDRM:
                        tolog("!!FAILED!!3000!! Put error: Failed to remove readOnly file in dCache")
                        job.setState(["failed", transExitCode, 1148])
                    elif rc == error.ERR_LFCADDCSUMFAILED:
                        tolog("!!FAILED!!3000!! Put error: Failed to add file size and checksum to LFC")
                        job.setState(["failed", transExitCode, 1105])
                    elif rc == error.ERR_FAILEDLFCREG:
                        tolog("!!FAILED!!3000!! Put error: LFC registration failed")
                        job.setState(["failed", transExitCode, 1169])
                    elif rc == error.ERR_FAILEDLCGREG:
                        tolog("!!FAILED!!3000!! Put error: LCG registration failed")
                        job.setState(["failed", transExitCode, 1108])
                    elif rc == error.ERR_MKDIR:
                        tolog("!!WARNING!!3000!! Put error: Error in mkdir on localSE, not allowed or no available space")
                        job.setState(["holding", transExitCode, 1134])
                    elif rc == error.ERR_NOSTMATCHDEST:
                        tolog("!!WARNING!!3000!! Put error: Space token descriptor does not match destination path")
                        job.setState(["holding", transExitCode, 1120])
                    elif rc == error.ERR_TIMEOUT:
                        tolog("!!WARNING!!3000!! Put error: Command timed out")
                        job.setState(["holding", transExitCode, 1152])
                    elif rc == error.ERR_NOSTORAGE:
                        tolog("!!WARNING!!3000!! Put error: Fetching default storage URL failed")
                        job.setState(["holding", transExitCode, 1133])
                    elif rc == error.ERR_FAILEDSIZELOCAL:
                        tolog("!!WARNING!!3000!! Put error: Could not get file size in job workdir")
                        job.setState(["holding", transExitCode, 1135])
                    elif rc == error.ERR_FAILEDMD5LOCAL:
                        tolog("!!WARNING!!3000!! Put error: Error running md5sum on the file in job workdir")
                        job.setState(["holding", transExitCode, 1136])
                    elif rc == error.ERR_FAILEDSIZE:
                        tolog("!!WARNING!!3000!! Put error: Could not get the file size on local SE")
                        job.setState(["holding", transExitCode, 1138])
                    elif rc == error.ERR_WRONGSIZE:
                        tolog("!!FAILED!!3000!! Put error: Problem with copying from job workdir to local SE: size mismatch")
                        job.setState(["failed", transExitCode, 1139])
                    elif rc == error.ERR_FAILEDMD5:
                        tolog("!!WARNING!!3000!! Put error: Error running md5sum on the file on local SE")
                        job.setState(["holding", transExitCode, 1140])
                    elif rc == error.ERR_WRONGMD5:
                        tolog("!!WARNING!!3000!! Put error: Problem with copying from job workdir to local SE: md5sum mismatch")
                        job.setState(["holding", transExitCode, 1141])
                    elif rc == error.ERR_WRONGAD:
                        tolog("!!WARNING!!3000!! Put error: Problem with copying from job workdir to local SE: adler32 mismatch")
                        job.setState(["holding", transExitCode, 1172])
                    elif rc == error.ERR_NOPROXY:
                        tolog("!!WARNING!!3000!! Put error: No valid grid proxy")
                        job.setState(["holding", transExitCode, 1163])
                    elif rc == error.ERR_NOVOMSPROXY:
                        tolog("!!WARNING!!3000!! Put error: No valid voms proxy")
                        job.setState(["holding", transExitCode, 1177])
                    elif rc == error.ERR_MISSFILE:
                        tolog("!!FAILED!!3000!! Put error: Local output file missing")
                        job.setState(["failed", transExitCode, 1165])
                    elif rc == error.ERR_SIGPIPE:
                        tolog("!!FAILED!!3000!! Put error: File copy broken by SIGPIPE")
                        job.setState(["failed", transExitCode, 1166])
                    elif rc == error.ERR_FAILEDADLOCAL:
                        tolog("!!WARNING!!3000!! Put error: Error running adler32 on the file in job workdir")
                        job.setState(["holding", transExitCode, 1185])
                    elif rc == error.ERR_PUTGLOBUSSYSERR:
                        tolog("!!WARNING!!3000!! Put error: Globus system error")
                        job.setState(["holding", transExitCode, 1181])
                    elif rc == error.ERR_LFCIMPORT:
                        tolog("!!WARNING!!3000!! Put error: Could not import LFC python module")
                        job.setState(["holding", transExitCode, 1114])
                    elif rc != 0:
                        # remove any trailing "\r" or "\n" (there can be two of them)
                        if rs != None:
                            rs = rs.rstrip()
                            tolog("Error string: %s" % (rs))
                        if rs == "Error: string Limit exceeded 250":
                            tolog("!!FAILED!!3000!! Put error: file name string limit exceeded 250")
                            job.setState(["failed", transExitCode, 1160])
                        else:
                            tolog("!!WARNING!!3000!! Put error: Error in copying the file from job workdir to local SE, rc = %d, rs = %s" % (rc, rs))
                            job.setState(["holding", transExitCode, 1137])
                    else:
                        # set preliminary finished (may be overwritten below in the LRC registration)
                        job.setState(["finished", transExitCode, 0])

                        # create a weak lockfile meaning that the file transfer worked
                        # (useful for job recovery if activated)
                        # in the job workdir
                        createLockFile(True, jobSite.workdir, lockfile="ALLFILESTRANSFERRED")

                        # the file transfer worked, now register the output files in the LRC
                        ub = jobSite.dq2url
                        lfchost = readpar('lfchost')
                        if ub != "None" and ub != None and ub != "" and lfchost == "":  # ub is 'None' outside the US
                            rc, ret = pUtil.registerFiles(rf, ub=ub)
                            if ret == None:
                                ret = "(None)"
                            if rc == 0:
                                tolog("LRC registration succeeded")
                                # everything has been done, finish the job
                                job.setState(["finished", transExitCode, 0])
                            elif rc == error.ERR_DDMREG:
                                # remove any trailing "\r" or "\n" (there can be two of them)
                                ret = ret.rstrip()
                                if ret.find("string Limit exceeded 250") >= 0:
                                    job.pilotErrorDiag = "LRC registration error: file name string limit exceeded 250: %s" % (ret)
                                    tolog("!!FAILED!!3000!! %s" % (job.pilotErrorDiag))
                                    job.setState(["failed", transExitCode, 1160])
                                elif ret.find("Connection refused") >= 0:
                                    job.pilotErrorDiag = "LRC registration error: %s" % (ret)
                                    tolog("!!WARNING!!3000!! %s" % (job.pilotErrorDiag))
                                    # postpone LRC registration if the site supports job recovery
                                    job.setState(["holding", transExitCode, 1101])
                                    latereg = True
                                else:
                                    job.pilotErrorDiag = "LRC registration error: %s" % (ret)
                                    tolog("!!WARNING!!3000!! %s" % (job.pilotErrorDiag))
                                    # postpone LRC registration if the site supports job recovery
                                    job.setState(["holding", transExitCode, 1132])
                                    latereg = True
                            elif rc == error.ERR_DDMREGDUP:
                                job.pilotErrorDiag = "LRC registration error: Non-unique LFN: %s" % (ret)
                                tolog("!!FAILED!!3000!! %s" % (job.pilotErrorDiag))
                                job.setState(["failed", transExitCode, 1162])
                            elif rc == error.ERR_GUIDSEXISTSINLRC:
                                job.pilotErrorDiag = "LRC registration error: Guid-metadata entry already exists: %s" % (ret)
                                tolog("!!FAILED!!3000!! %s" % (job.pilotErrorDiag))
                                job.setState(["failed", transExitCode, 1183])
                            else:
                                job.pilotErrorDiag = "LRC registration failed: %d, %s" % (rc, ret)
                                tolog("!!WARNING!!3000!! %s" % (job.pilotErrorDiag))
                                # postpone LRC registration if the site supports job recovery
                                job.setState(["holding", transExitCode, 1132])
                                latereg = True
                        else:
                            if lfchost != "":
                                tolog("No LRC file registration since lfchost is set")
                            else:
                                tolog("No LRC file registration since dq2url is not set")

                if job.result[0] == "holding" and '(unrecoverable)' in job.pilotErrorDiag:
                    job.result[0] = "failed"
                    tolog("!!WARNING!!2999!! HOLDING state changed to FAILED since error is unrecoverable")

                if job.result[0] == "finished":
                    rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, final=True)
                else:
                    rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, final=True, latereg=latereg, rf=rf)
                sysExit(job, rf)

        job.setState(["finished", 0, 0])
        rt = RunJobUtilities.updatePilotServer(job, pilotserver, pilotport, final=True)
        sysExit(job)

    except Exception, errorMsg:

        if globalPilotErrorDiag != "":
            pilotErrorDiag = "Exception caught in runJob: %s" % (globalPilotErrorDiag)
        else:
            pilotErrorDiag = "Exception caught in runJob: %s" % str(errorMsg)

        import traceback
        if 'format_exc' in traceback.__all__:
            pilotErrorDiag += ", " + traceback.format_exc()

        try:
            tolog("!!FAILED!!3001!! %s" % (pilotErrorDiag))
        except Exception, e:
            if len(pilotErrorDiag) > 10000:
                pilotErrorDiag = pilotErrorDiag[:10000]
                tolog("!!FAILED!!3001!! Truncated (%s): %s" % (str(e), pilotErrorDiag))
            else:
                pilotErrorDiag = "Exception caught in runJob: %s" % str(e)
                tolog("!!FAILED!!3001!! %s" % (pilotErrorDiag))

        # restore the proxy if necessary
        if hP_ret:
            rP_ret = proxyguard.restoreProxy()
            if not rP_ret:
                tolog("Warning: Problems with storage can occur since proxy could not be restored")
            else:
                hP_ret = False
                tolog("ProxyGuard has finished successfully")

        import newJobDef
        job = Job.Job()
        job.setJobDef(newJobDef.job)
        job.pilotErrorDiag = pilotErrorDiag
        job.result[0] = "failed"
        if globalErrorCode != 0:
            job.result[2] = globalErrorCode
        else:
            job.result[2] = 1111
        tolog("Failing job with error code: %d" % (job.result[2]))
        # fail the job without calling sysExit/cleanup (will be called anyway)
        failJob(0, job.result[2], job, pilotserver, pilotport, pilotErrorDiag=pilotErrorDiag, docleanup=False)

# end of runJob
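
# Example invocation (hypothetical values; runJob.py is normally launched by the pilot wrapper,
# which also provides the newJobDef module imported above):
#
#   python runJob.py -s BNL_ATLAS_1 -d /tmp/PandaJob_1234567 -a /usatlas/projects/OSG \
#                    -o /tmp -w localhost -p 88888 -b 1 -x 2
#
# The command line above only illustrates how argParser() maps options onto the globals
# defined at the top of this file; any option that is omitted keeps its default value.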