import sys, os, signal, time, shutil, cgi import commands, getopt, re import urllib2, urllib, socket from xml.dom import minidom from xml.dom.minidom import Document from xml.dom.minidom import parse, parseString try: import datetime except: pass from PilotErrors import PilotErrors from config import config_sm CMD_CHECKSUM = config_sm.COMMAND_MD5 ########### dispatcher status codes SC_Success = 0 SC_TimeOut = 10 SC_NoJobs = 20 SC_Failed = 30 SC_NonSecure = 40 ########### exit code EC_Failed = 255 # all files that need to be copied to the workdir fileList = commands.getoutput('ls *.py').split() # schedconfig DB web servers #chttpurl = ['http://panda.cern.ch'] # add more servers later? chttpurl = ['http://voatlas19.cern.ch', 'http://voatlas20.cern.ch', 'http://voatlas21.cern.ch'] #chttpurl = ['http://pandamon.usatlas.bnl.gov',\ # 'http://gridui05.usatlas.bnl.gov','http://gridui06.usatlas.bnl.gov','http://gridui07.usatlas.bnl.gov'] # default pilot log files pilotlogFilename = "pilotlog.out" pilotstderrFilename = "pilot.stderr" def setPilotlogFilename(filename): """ set the pilot log file name""" global pilotlogFilename if len(filename) > 0: pilotlogFilename = filename def getPilotlogFilename(): """ return the pilot log file name""" return pilotlogFilename def setPilotstderrFilename(filename): """ set the pilot stderr file name""" global pilotstderrFilename if len(filename) > 0: pilotstderrFilename = filename def getPilotstderrFilename(): """ return the pilot stderr file name""" return pilotstderrFilename def tolog_file(msg): """ write date+msg to pilot log only """ # t = time.strftime("%d %b %Y %H:%M:%S", time.localtime()) t = time.strftime("%d %b %Y %H:%M:%S", time.gmtime(time.time())) appendToLog("%s| %s\n" % (t, msg)) def appendToLog(txt): """ append txt to file """ try: f = open(pilotlogFilename, 'a') f.write(txt) f.close() except Exception, e: if "No such file" in str(e): pass else: print "WARNING: Exception caught: %s" % str(e) def tolog(msg, tofile=True): """ 
write date+msg to pilot log and to stdout """ # t = time.strftime("%d %b %Y %H:%M:%S", time.localtime()) t = time.strftime("%d %b %Y %H:%M:%S", time.gmtime(time.time())) if tofile: appendToLog("%s| %s\n" % (t, msg)) # remove backqoutes from the msg since they cause problems with batch submission of pilot # (might be present in error messages from the OS) msg = msg.replace("`","'") msg = msg.replace('"','\\"') print "%s| %s" % (t, msg) def tolog_err(msg): """ write error string to log """ tolog("!!WARNING!!4000!! %s" % str(msg)) def tolog_warn(msg): """ write warning string to log """ tolog("!!WARNING!!4000!! %s" % str(msg)) def makeHTTPUpdate(state, node, port, url=None): """ make http connection to jobdispatcher """ if state == 'finished' or state == 'failed' or state == 'holding': tolog("Preparing for final Panda server update") trial = 1 max_trials = 10 delay = 2*60 # seconds tolog("Max number of trials: %d, separated by delays of %d seconds" % (max_trials, delay)) else: # standard non final update trial = 1 max_trials = 1 delay = None # make http connection to jobdispatcher while trial <= max_trials: if url: # will be set for the dev server and IT/CERN cloud _url = '%s:%s/server/panda' % (url, port) else: # draw a random URL for the prod server _url = '%s:%s/server/panda' % (getRandomURL(), port) ret = httpConnect(node, _url) if ret[0] and trial == max_trials: # non-zero exit code if delay: # final update tolog("!!FAILED!!4000!! [Trial #%d/%d] Could not update Panda server (putting job in holding state if possible)" %\ (trial, max_trials)) # state change will take place in postJobTask # (the internal pilot state will not be holding but lostheartbeat) else: tolog("!!WARNING!!4000!! [Trial #%d/%d] Could not update Panda server, EC = %d" %\ (trial, max_trials, ret[0])) break elif ret[0]: # non-zero exit code tolog("!!WARNING!!4000!! 
[Trial #%d/%d] Could not update Panda server, EC = %d" %\ (trial, max_trials, ret[0])) if delay: # final update tolog("Can not miss the final update. Will take a nap for %d seconds and then try again.." % (delay)) trial += 1 time.sleep(delay) else: # try again later tolog("Panda server update postponed..") break else: break return ret def httpConnect(data, url, mode="UPDATE", sendproxy=False): # default mode allows exceptions to occur w/o interrupting the program """ function to handle the http connection """ status = 0 response = None # check if a job should be downloaded or if it's a server update if mode == "GETJOB": cmd = 'getJob' elif mode == "ISTHEREANALYSISJOB": cmd = 'isThereAnalysisJob' else: cmd = 'updateJob' # send the data dictionary to the dispatcher using command cmd # return format: status, parsed data, response return toDispatcher(url, cmd, data) def httpConnectOld(node, url, mode="UPDATE", sendproxy=False): # default mode allows exceptions to occur w/o interrupting the program """ function to handle the http connection """ # test if https_proxy is set up, if not, initiate it try: if os.environ["https_proxy"]: pass except: os.environ["https_proxy"] = "" os.environ["save_https_proxy"] = "" if os.environ["https_proxy"] != "": os.environ["save_https_proxy"] = os.environ["https_proxy"] ecode = 0 response = None data = None # check if a job should be downloaded or if it's a server update if mode == "GETJOB": cmd = 'getJob' elif mode == "ISTHEREANALYSISJOB": cmd = 'isThereAnalysisJob' else: cmd = 'updateJob' url = url + '/' + cmd rdata = urllib.urlencode(node) req = urllib2.Request(url) # check python version pv = sys.version_info pyversion = "%s.%s.%s" % (pv[0], pv[1], pv[2]) if pyversion >= "2.4" and os.environ["https_proxy"] != "" : # python 2.4 and a https_proxy is needed tolog("https_proxy = %s" % str(os.environ["https_proxy"])) # figure out the host and port from env. 
variable try: psaddr = os.environ["https_proxy"].split("//")[1].split("/")[0] except Exception: psaddr = os.environ["https_proxy"] os.environ["https_proxy"] = "" tolog("Using psaddr: %s" % (psaddr)) # python 2.4 supports timeout parameter in socket module timeout = 180 socket.setdefaulttimeout(timeout) # use an external module, which builds a urllib2 opener that can make https connections over proxy import httpsByProxy opener = urllib2.build_opener(httpsByProxy.ConnectHTTPHandler, httpsByProxy.ConnectHTTPSHandler) req.set_proxy(psaddr, 'https') else: # block https_proxy explicitly, even if it's set up in the env., because we can't handle that # in older versions of python os.environ["https_proxy"] = "" proxy_handler = urllib2.ProxyHandler({}) opener = urllib2.build_opener(proxy_handler) urllib2.install_opener(opener) # .................................................................................. # are we sending a proxy to the dispatcher? if mode == "GETJOB" and sendproxy: tolog("Asking for a job belonging to a user") match = re.search('[^:/]+://([^/]+)(/.+)',url) host = match.group(1) path = match.group(2) if os.environ.has_key('X509_USER_PROXY'): certKey = os.environ['X509_USER_PROXY'] else: certKey = '/tmp/x509up_u%s' % os.getuid() # catch connection errors if any try: import httplib conn = httplib.HTTPSConnection(host, key_file=certKey, cert_file=certKey) except Exception, e: tolog("Error connecting to jobDispatcher: %s" % str(e)) ecode = 1001 return ecode, None, response try: conn.request('POST', path, rdata) resp = conn.getresponse() response = resp.read() # create the parameter list from the dispatcher response data = parseDispatcherResponse(response) except Exception, e: tolog("Error requesting job from jobDispatcher: %s" % str(e)) ecode = 1002 return ecode, None, response else: # return at this point for successful job requests tolog("httpConnect ecode = %d" % (ecode)) return ecode, data, response else: tolog("Normal job request") # 
.................................................................................. # normal job requests/updates are performed here # catch connection errors if any try: fd = urllib2.urlopen(req, rdata) except urllib2.HTTPError, e: tolog("Error connecting to jobDispatcher: %s" % str(e)) tolog("Server error document follows:") tolog(str(e)) if mode == "GETJOB": ecode = 1001 else: # general error ecode = 1010 if os.environ["save_https_proxy"] != "": os.environ["https_proxy"] = os.environ["save_https_proxy"] tolog("httpConnect ecode = %d" % (ecode)) return ecode, None, response except urllib2.URLError,e: ee = str(e) tolog("Error retrieving data: %s" % (ee)) if mode == "GETJOB": ecode = 1002 elif ee.find('Connection refused') >= 0: ecode = 1008 elif ee.find('urlopen error') >= 0: ecode = 1009 else: # general error ecode = 1010 if os.environ["save_https_proxy"] != "": os.environ["https_proxy"] = os.environ["save_https_proxy"] tolog("httpConnect ecode = %d" % (ecode)) return ecode, None, response except Exception,e: tolog("exception caught during http connection: %s" % str(e)) if mode == "GETJOB": ecode = 1003 else: # general error ecode = 1010 if os.environ["save_https_proxy"] != "": os.environ["https_proxy"]=os.environ["save_https_proxy"] tolog("httpConnect ecode = %d" % (ecode)) return ecode, None, response # catch response errors try: response = fd.read() # create the parameter list from the dispatcher response data = parseDispatcherResponse(response) except socket.error,e: tolog("Error reading data: %s" % str(e)) if mode == "GETJOB": ecode = 1006 else: # general error ecode = 1010 if os.environ["save_https_proxy"] != "": os.environ["https_proxy"] = os.environ["save_https_proxy"] tolog("httpConnect ecode = %d" % (ecode)) return ecode, None, None except Exception,e: tolog("exception caught when reading data from panda dispatcher server : %s" % str(e)) if mode == "GETJOB": ecode = 1007 else: # general error ecode = 1010 if os.environ["save_https_proxy"] != "": 
os.environ["https_proxy"] = os.environ["save_https_proxy"] tolog("httpConnect ecode = %d" % (ecode)) return ecode, None, None try: fd.close() except Exception, e: if mode == "GETJOB": ecode = 1005 else: # general error ecode = 1010 if os.environ["save_https_proxy"] != "": os.environ["https_proxy"] = os.environ["save_https_proxy"] tolog("httpConnect ecode = %d: %s" % (ecode, str(e))) return ecode, None, None if os.environ["save_https_proxy"] != "": os.environ["https_proxy"] = os.environ["save_https_proxy"] tolog("httpConnect ecode = %d" % (ecode)) return ecode, data, response def returnLogMsg(logf=None, linenum=20): ''' return the last N lines of log files into a string''' thisLog = '' if logf: if not os.path.isfile(logf): thisLog = "\n- No log file %s found -" % (logf) else: thisLog = "\n- Log from %s -" % (logf) f = open(logf) lines = f.readlines() f.close() if len(lines) <= linenum: ln = len(lines) else: ln = linenum for i in range(-ln,0): thisLog += lines[i] return thisLog def PFCxml(fname, fnlist=[], fguids=[], fntag=None, alog=None, alogguid=None, fsize=[], checksum=[]): """ fguids list will be changed in the caller as well, since list is mutable object fguids and fnlist are for output files, alog and alogguid are for workdir tarball log files, which are not mutable so don't expect the value changed inside this function will change in the caller as well !!! 
""" # fnlist = output file list # fguids = output file guid list # fntag = pfn/lfn identifier # alog = name of log file # alogguid = guid of log file # fsize = file size list # checksum = checksum list # firstly make sure every file has a guid flist = [] glist = [] from SiteMover import SiteMover if alog: flist.append(alog) if not alogguid: alogguid = commands.getoutput('uuidgen') glist.append(alogguid) if fnlist: flist = flist + fnlist for i in range(0, len(fnlist)): # check for guid try: fguids[i] except IndexError, e: #print "This item doesn't exist" fguids.insert(i, commands.getoutput('uuidgen')) else: if not fguids[i]: # this guid doesn't exist fguids[i] = commands.getoutput('uuidgen') if fntag == "lfn": # check for file size try: fsize[i] except IndexError, e: #print "This item doesn't exist" fsize.insert(i, "") else: if not fsize[i]: # this fsize doesn't exist fsize[i] = "" # check for checksum try: checksum[i] except IndexError, e: #print "This item doesn't exist" checksum.insert(i, "") else: if not checksum[i]: # this checksum doesn't exist checksum[i] = "" glist = glist + fguids if fntag == "pfn": #create the PoolFileCatalog.xml-like file in the workdir fd = open(fname,"w") fd.write('\n') fd.write("\n") fd.write('\n') fd.write("\n") for i in range(0,len(flist)): fd.write(' \n'%(glist[i])) fd.write(" \n") fd.write(' \n'%(flist[i])) fd.write(" \n") fd.write(" \n") fd.write("\n") fd.close() elif fntag=="lfn": # create the metadata.xml-like file that's needed by dispatcher jobs fd=open(fname,"w") fd.write('\n') fd.write("\n") fd.write('\n') fd.write("\n") for i in range(0, len(flist)): fd.write(' \n' % (glist[i])) fd.write(" \n") fd.write(' \n' % (flist[i])) fd.write(" \n") # add log file metadata later (not known yet) if flist[i] == alog: fd.write(' \n') fd.write(' \n') else: if len(fsize) != 0: fd.write(' \n' % (fsize[i])) if len(checksum) != 0: fd.write(' \n' %\ (SiteMover.getChecksumType(checksum[i]), checksum[i])) fd.write(" \n") fd.write("\n") 
fd.close() else: tolog("fntag is neither lfn nor pfn, didn't create the XML file for output files") def stageInPyModules(initdir, workdir): """ copy pilot python modules into pilot workdir from condor initial dir """ if workdir and initdir: # both dir has valid value for k in fileList: if os.path.isfile("%s/%s" % (initdir, k)): try: shutil.copy2("%s/%s" % (initdir, k), workdir) except Exception, e: tolog("!!WARNING!!2999!! stageInPyModules failed to copy file %s/%s to %s: %s" % (initdir, k, workdir, str(e))) else: tolog("!!WARNING!!2999!! File missing during stage in: %s/%s" % (initdir, k)) def removePyModules(dir): """ copy pilot python modules into pilot workdir from condor initial dir """ if dir: # dir exists for k in fileList: try: os.system("rm -rf %s/%s %s/*.pyc"%(dir,k,dir)) except: pass def setTimeConsumed(worknode, t_tuple): """ Use the NG formula (system+user in seconds)*CPU[MHz]*664/1400 to convert the time usage into unit of kSpecInt2000seconds """ t_tot = reduce(lambda x, y:x+y, t_tuple[2:3]) conversionFactor = worknode.cpu*664/1400/1000 cpuCU = "kSI2kseconds" cpuCT = int(t_tot*conversionFactor) return cpuCU, cpuCT, conversionFactor def timeStamp(): ''' return ISO-8601 compliant date/time format ''' tmptz = time.timezone if tmptz > 0: signstr = '-' else: signstr = '+' tmptz_hours = int(tmptz/3600) return str("%s%s%02d%02d" % (time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), signstr, tmptz_hours, int(tmptz/60-tmptz_hours*60))) def timeStampUTC(): """ return UTC time stamp """ return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(time.time())) def getJobStatus(jobId, pshttpurl, psport): """ Return the current status of job from the dispatcher typical dispatcher response: 'status=finished&StatusCode=0' StatusCode 0: succeeded 10: time-out 30: failed In the case of time-out, the dispatcher will be asked one more time after 10s """ from urllib import urlencode, urlopen status = 'unknown' StatusCode = -1 # par = urlencode({'ids': jobId}) nod = {} 
nod['ids'] = jobId # url = "https://gridui01.usatlas.bnl.gov:25443/server/panda/getStatus" url = "%s:%s/server/panda/getStatus" % (pshttpurl, repr(psport)) # ask dispatcher about lost job status trial = 1 max_trials = 2 while trial <= max_trials: try: # open connection ret = httpConnect(nod, url) response = ret[1] # decode the response # eg. var = ['status=notfound', 'attemptNr=0', 'StatusCode=0'] var = response.split("&") # create a dictionary of the response (protects against future updates) # eg. dic = {'status': 'activated', 'attemptNr': '0', 'StatusCode': '0'} dic = {} for i in range(len(var)): key = var[i].split('=')[0] dic[key] = var[i].split('=')[1] status = dic['status'] # e.g. 'holding' attemptNr = int(dic['attemptNr']) # e.g. '0' StatusCode = int(dic['StatusCode']) # e.g. '0' except Exception,e: tolog("Could not interpret job status from dispatcher: %s, %s" % (response, str(e))) status = 'unknown' attemptNr = -1 StatusCode = -1 break else: if StatusCode == 0: # success break elif StatusCode == 10: # time-out trial = trial + 1 time.sleep(10) continue else: # error break return status, attemptNr, StatusCode def getExitCode(path, filename): """ Try to read the exit code from the pilot stdout log """ ec = -1 from re import compile, findall # first create a tmp file with only the last few lines of the status file to avoid # reading a potentially long status file tmp_file_name = "tmp-tail-dump-file" try: os.system("tail %s/%s >%s/%s" % (path, filename, path, tmp_file_name)) except Exception, e: tolog("Job Recovery could not create tmp file %s: %s" % (tmp_file_name, str(e))) else: # open the tmp file and look for the pilot exit info try: tmp_file = open(tmp_file_name, 'r') except IOError: tolog("Job Recovery could not open tmp file") else: try: all_lines = tmp_file.readlines() except Exception, e: tolog("Job Recovery could not read tmp file %s" % (tmp_file_name, str(e))) tmp_file.close() # remove the tmp file try: os.remove(tmp_file_name) except OSError: 
tolog("Job Recovery could not remove tmp file: %s" % tmp_file) # now check the pilot exit info, if is has an exit code - remove the directory exit_info = compile(r"job ended with \(trf,pilot\) exit code of \(\d+,\d+\)") exitinfo_has_exit_code = False for line in all_lines: if findall(exit_info, line): exitinfo_has_exit_code = True return ec def getRemainingOutputFiles(outFiles): """ Return list of files if there are remaining output files in the lost job data directory """ remaining_files = [] for file_name in outFiles: if os.path.exists(file_name): remaining_files.append(file_name) return remaining_files def remove(entries): """ Remove files and directories entries should be a list """ status = True # protect against wrong usage if type(entries) == list: if len(entries) > 0: for entry in entries: try: os.system("rm -rf %s" %entry) except OSError: tolog("Could not remove %s" % entry) status = False else: tolog("Argument has wrong type, expected list: %s" % str(type(entries))) status = False return status def getCPUmodel(): """ Get cpu model and cache size from /proc/cpuinfo """ # model name : Intel(R) Xeon(TM) CPU 2.40GHz # cache size : 512 KB # gives the return string "Intel(R) Xeon(TM) CPU 2.40GHz 512 KB" cpumodel = '' cpucache = '' cache = '' modelstring = '' try: f = open('/proc/cpuinfo', 'r') except Exception, e: tolog("Could not open /proc/cpuinfo: %s" % str(e)) else: re_model = re.compile('^model name\s+:\s+(\w.+)') re_cache = re.compile('^cache size\s+:\s+(\d+ KB)') # loop over all lines in cpuinfo for line in f.readlines(): # try to grab cpumodel from current line model = re_model.search(line) if model: # found cpu model cpumodel = model.group(1) # try to grab cache size from current line cache = re_cache.search(line) if cache: # found cache size cpucache = cache.group(1) # stop after 1st pair found - can be multiple cpus if cpumodel and cpucache: # create return string modelstring = cpumodel + " " + cpucache break f.close() # default return string if no 
info was found if not modelstring: modelstring = "UNKNOWN" return modelstring def getExeErrors(startdir, fileName): """ Extract exeErrorCode and exeErrorDiag from jobInfo.xml """ exeErrorCode = 0 exeErrorDiag = "" # first check if the xml file exists (e.g. it doesn't exist for a test job) import commands findFlag = False line = "" # try to locate the file out = commands.getoutput("find %s -name %s" % (startdir, fileName)) if out != "": for line in out.split('\n'): tolog("Found trf error file at: %s" % (line)) findFlag = True break # just in case, but there should only be one trf error file if findFlag: if os.path.isfile(line): # import the xml functions from xml.sax import ContentHandler, make_parser from xml.sax.handler import feature_namespaces import JobInfoXML # Create a parser parser = make_parser() # Tell the parser we are not interested in XML namespaces parser.setFeature(feature_namespaces, 0) # Create the handler dh = JobInfoXML.JobInfoXML() # Tell the parser to use our handler parser.setContentHandler(dh) # Parse the input parser.parse(line) # get the error code and the error message(s) exeErrorCode = dh.getCode() exeErrorDiag = dh.getMessage() else: tolog("Could not read trf error file: %s" % (line)) else: tolog("Could not find trf error file %s in search path %s" % (fileName, startdir)) # only return a maximum of 250 characters in the error message (as defined in PandaDB) return exeErrorCode, exeErrorDiag[:250] def debugInfo(_str, tofile=True): """ print the debug string to stdout""" tolog("DEBUG: %s" % (_str), tofile=tofile) def isBuildJob(outFiles): """ Check if the job is a build job (i.e. check if the job only has one output file that is a lib file) """ isABuildJob = False # outFiles only contains a single file for build jobs, the lib file if len(outFiles) == 1: # e.g. 
outFiles[0] = user.paulnilsson.lxplus001.lib._001.log.tgz if outFiles[0].find(".lib.") > 0: isABuildJob = True return isABuildJob def OSBitsCheck(): """ determine whether the platform is a 32 or 64-bit OS """ b = -1 try: a = commands.getoutput('uname -a') b = a.find('x86_64') except: return 32 # default else: if b == -1 : # 32 bit OS return 32 else: # 64 bits OS return 64 def uniqueList(input_list): """ return a list of unique entries input_list = ['1', '1', '2'] -> ['1', '2'] """ u = {} for x in input_list: u[x] = 1 return u.keys() def diffLists(list1, list2): """ compare the input lists (len(list1) must be > len(list2)) and return the difference """ d = {} for x in list1: d[x] = 1 for x in list2: if d.has_key(x): del d[x] return d.keys() def getFileInfo(outputFiles, checksum_cmd, skiplog=False): """ Return lists with file sizes and checksums for the given output files """ fsize = [] checksum = [] # get error handler error = PilotErrors() pilotErrorDiag = "" for file in outputFiles: # add "" for the log metadata since it has not been created yet if file.find(".log.") != -1 and skiplog: ec = -1 else: from SiteMover import SiteMover ec, pilotErrorDiag, _fsize, _checksum = SiteMover.getLocalFileInfo(file, csumtype=checksum_cmd) tolog("Adding %s,%s for file %s using %s" % (_fsize, _checksum, file, checksum_cmd)) if ec == 0: fsize.append(_fsize) checksum.append(_checksum) else: if ec == error.ERR_FAILEDMD5LOCAL or ec == error.ERR_FAILEDADLOCAL: fsize.append(_fsize) checksum.append("") else: fsize.append("") checksum.append("") if ec != -1: # skip error message for log tolog("!!WARNING!!4000!! getFileInfo received an error from getLocalFileInfo for file: %s" % (file)) tolog("!!WARNING!!4000!! 
ec = %d, pilotErrorDiag = %s, fsize = %s, checksum = %s" %\ (ec, pilotErrorDiag, str(_fsize), str(_checksum))) return fsize, checksum def addMetadataForLog(fname, fsize, checksum, format=None, date=None): """ Add the fsize and checksum values for the log (left empty until this point) Return exit code and xml If format = NG, then the NorduGrid format of the metadata will be assumed """ ec = 0 lines = "" try: f = open(fname, 'r') except Exception, e: tolog("Failed to open metadata file: %s" % str(e)) ec = -1 else: datetag = "" newdatetag = "" if format == 'NG': metadata1 = '' new_metadata1 = '%s' % (fsize) datetag = '' if date and date != "None": _date = date else: _date = "1970-01-01 0:00:00" newdatetag = '%s' % (_date) else: metadata1 = '' new_metadata1 = '' % (fsize) # find out if checksum or adler32 should be added from SiteMover import SiteMover csumtype = SiteMover.getChecksumType(checksum) if format == 'NG': if csumtype == "adler32": metadata2 = '' new_metadata2 = '%s' % (checksum) else: metadata2 = '' new_metadata2 = '%s' % (checksum) else: if csumtype == "adler32": metadata2 = '' new_metadata2 = '' % (checksum) else: metadata2 = '' new_metadata2 = '' % (checksum) metadata3 = '' for line in f.readlines(): newline = "" if line.find(metadata1) != -1: newline = line.replace(metadata1, new_metadata1) lines += newline elif line.find(metadata2) != -1: newline = line.replace(metadata2, new_metadata2) lines += newline elif line.find(metadata3) != -1: newline = line.replace(metadata3, new_metadata2) lines += newline elif line.find('csumtypetobeset') != -1: newline = line.replace() else: lines += line f.close() try: f = open(fname, 'w') f.write(lines) f.close() except Exception, e: tolog("Failed to write new metadata for log: %s" % str(e)) ec = -1 return ec, lines def removeFiles(dir, fileList): """ Remove the input files from the work dir """ ec = 0 found = 0 for file in fileList: if os.path.isfile("%s/%s" % (dir, file)): try: os.remove("%s/%s" % (dir, file)) except 
Exception, e: tolog("Failed to remove file: %s/%s" % (dir, file)) ec = 1 else: tolog("Removed file: %s/%s" % (dir, file)) found += 1 if found > 0: tolog("Removed %d/%d input file(s)" % (found, len(fileList))) else: tolog("Input files already removed (not found)") return ec def createPoolFileCatalog(file_list): """ Create the PoolFileCatalog.xml file_list = { guid1 : sfn1, ... } Adapted from R. Walker's code """ outxml = '' if len(file_list) == 0: tolog('No input files so no PFC created') else: dom = minidom.getDOMImplementation() doctype = dom.createDocumentType("POOLFILECATALOG","","InMemory") doc = dom.createDocument(None, "POOLFILECATALOG", doctype) root = doc.documentElement doc.appendChild(root) # Prepare plain text as can`t trust minidom on python <2.3 pfc_text = '\n' pfc_text += '\n' pfc_text += '\n' pfc_text += '\n' # Strip .N because stagein makes soft link, and params have no .N for guid in file_list.keys(): sfn = file_list[guid] ftype='ROOT_All' file = doc.createElement("File") file.setAttribute('ID', guid) root.appendChild(file) # physical element - file in local directory without .N extension physical = doc.createElement("physical") file.appendChild(physical) pfn = doc.createElement('pfn') pfn.setAttribute('filetype', ftype) pfn.setAttribute('name', sfn) physical.appendChild(pfn) # logical element - empty logical = doc.createElement('logical') file.appendChild(logical) pfc_text += ' \n \n \n \n \n \n' %\ (guid, ftype, sfn) pfc_text += '\n' tolog(str(doc.toxml())) tolog(pfc_text) f = open('PoolFileCatalog.xml', 'w') f.write(pfc_text) f.close() outxml = pfc_text return outxml def replace(filename, stext, rtext): """ replace string stext with rtext in file filename """ status = True try: input = open(filename, "r") except Exception, e: tolog("!!WARNING!!4000!! Open failed with %s" % str(e)) status = False else: try: output = open(filename + "_tmp", "w") except Exception, e: tolog("!!WARNING!!4000!! 
Open failed with %s" % str(e)) status = False input.close() else: for s in input.xreadlines(): output.write(s.replace(stext, rtext)) input.close() # rename tmp file and overwrite original file try: os.rename(filename + "_tmp", filename) except Exception, e: tolog("!!WARNING!!4000!! Rename failed with %s" % str(e)) status = False output.close() return status def dumpFile(filename, topilotlog=False): """ dump a given file to stdout or to pilotlog """ if os.path.exists(filename): try: f = open(filename, "r") except Exception, e: tolog("!!WARNING!!4000!! Exception caught: %s" % str(e)) else: for line in f.readlines(): line = line.rstrip() if topilotlog: tolog("%s" % (line)) else: print "%s" % (line) f.close() else: tolog("!!WARNING!!4000!! %s does not exist" % (filename)) def getDirectAccessDic(qdata): """ return the directAccess dictionary in case the site supports direct access """ # task: create structure # directAccess = { # 'oldPrefix' : 'gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas', # 'newPrefix' : 'root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd', # 'useCopyTool' : False, # 'directIn' : True # } # from queuedata variable copysetup # copysetup = setup_string^oldPrefix^newPrefix^useCopyTool^directIn # example: # copysetup=^gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True # (setup_string not used) # (all cases tested) # qdata = 'whatever^gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True' # qdata = '^gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True' # qdata = 'gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True' # qdata = '' or 'whatever' directAccess = None if qdata.find('^') > -1: n = qdata.count('^') i = 0 # protect against a forgotten inital ^ in case the setup_string is empty! 
if n == 3 or n == 4: # read data data = qdata.split('^') if n == 3: i = -1 else: copysetup = data[0] i += 1 oldPrefix = data[i] i += 1 newPrefix = data[i] i += 1 if data[i].lower() == 'true': useCopyTool = True else: useCopyTool = False i += 1 if data[i].lower() == 'true': directIn = True else: directIn = False # create structure directAccess = { 'oldPrefix': oldPrefix, 'newPrefix': newPrefix, 'useCopyTool': useCopyTool, 'directIn': directIn } tolog("directAccess: %s" % str(directAccess)) else: tolog("!!WARNING!!4000!! copysetup has wrong format (should contain data separated by 4 ^ signs): %s" % (qdata)) else: # do nothing, don't care about the copysetup right now (only later in Mover) # copysetup = qdata pass return directAccess def getErrors(filename): """ get all !!TEXT!!NUMBER!!... errors from file """ from re import compile, findall ret = "" try: f = open(filename) lines = f.readlines() f.close() except Exception, e: tolog("!!WARNING!!4000!! could not open/read file: %s" % str(e)) else: p = r"!!(\S+)!!\d+!!" pattern = compile(p) for line in lines: if findall(pattern, line): ret += line return ret def getLFN(pfn, lfnlist): """ get the local file name from the xml, and ignore any trailing __DQ2-parts e.g. HITS.017771._00188.pool.root__DQ2-1200097946 -> HITS.017771._00188.pool.root """ lfn = "" for lfn in lfnlist: bpfn = os.path.basename(pfn) if bpfn[:len(lfn)] == lfn: break return lfn def makeTransRegReport(all_transferred, some_transferred, latereg, nr_transferred, nr_files, fail, ec, ret, fields): """ make the transfer and registration report """ error = PilotErrors() tolog("") tolog("..Transfer and registration report.........................................................................") tolog(". Mover has finished") if all_transferred and not latereg: if nr_files > 1: tolog(". All (%d) files have been transferred and registered" % (nr_files)) else: tolog(". 
The single file has been transferred and registered") elif all_transferred and latereg: if nr_files > 1: tolog(". All (%d) files have been transferred but not registered" % (nr_files)) tolog(". The files will be registrered by a later pilot if job recovery is supported,") else: tolog(". The single file has been transferred but not registered") tolog(". The file will be registrered by a later pilot if job recovery is supported,") tolog(". otherwise this job will fail") elif some_transferred and latereg: tolog(". Some files (%d/%d) were transferred but no file was registered" %\ (nr_transferred, nr_files)) tolog(". The remaining files will be transferred and registrered by a later pilot if job recovery is supported,") tolog(". otherwise this job will fail") elif some_transferred and not latereg: tolog(". Some files (%d/%d) were transferred and registered" %\ (nr_transferred, nr_files)) tolog(". The remaining files will be transferred and registrered by a later pilot if job recovery is supported,") tolog(". otherwise this job will fail") elif not some_transferred: tolog(". No files (%d/%d) were transferred or registered" %\ (nr_transferred, nr_files)) if nr_files > 1: tolog(". The files will be transferred and registrered by a later pilot if job recovery is supported,") else: tolog(". The file will be transferred and registrered by a later pilot if job recovery is supported,") tolog(". otherwise this job will fail") else: tolog(". Mover has finished") if fail != 0: tolog(". File transfer exit code : (%d, %s)" %\ (fail, error.getErrorStrVerbose(fail))) else: tolog(". File transfer exit code : (%d, )" % (fail)) if some_transferred: tolog(". File registration return values : (%d, %s, %s)" %\ (ec, error.getErrorStrVerbose(ec), str(ret))) tolog(". Put function will return fields : %s" % str(fields)) tolog(". 
Transfer and registration report produced at : %s" % timeStamp()) tolog("...........................................................................................................") tolog("") def hasBeenTransferred(fields): """ determine whether files were successfully transferred """ status = False s = 0 # the fields will all be empty if no files were transferred for field in fields: s += len(field) if s > 0: status = True return status def removeSRMInfo(f0): """ remove any SRM info from the f0 string """ from SiteMover import SiteMover fields0 = "" for pfns in f0.split("+"): if pfns != "": pfns = SiteMover.stripPortAndVersion(pfns) fields0 += "%s+" % (pfns) # remove any trailing +-sign if fields0[-1] == "+": fields0 = fields0[:-1] if fields0 == "": fields0 = f0 if f0 != fields0: tolog("removeSRMInfo() has updated %s to %s" % (f0, fields0)) return fields0 def registerFiles(fields, ub=None): """ register all files in the LRC """ error = PilotErrors() ret = '1' if ub != "None" and ub != None and ub != "": # ub is 'None' outside the US # find out if checksum or adler32 should be added from SiteMover import SiteMover _checksum = fields[4].split("+")[0] # first one, assume same type for the rest if len(_checksum) > 0: csumtype = SiteMover.getChecksumType(_checksum) else: csumtype = CMD_CHECKSUM # use default (md5sum) # remove any srm info from the pfns #fields[0] = removeSRMInfo(fields[0]) if csumtype == "adler32": params = urllib.urlencode({'pfns': fields[0], 'lfns': fields[1], 'guids': fields[2], 'fsizes': fields[3],\ 'md5sums': '', 'adler32s': fields[4], 'archivals': fields[5]}) else: params = urllib.urlencode({'pfns': fields[0], 'lfns': fields[1], 'guids': fields[2], 'fsizes': fields[3],\ 'md5sums': fields[4], 'adler32s': '', 'archivals': fields[5]}) try: url = ub + '/lrc/files' if url.find('//lrc') > 0: url = url.replace('//lrc','/lrc') tolog("Will send params: %s" % str(params)) tolog("Trying urlopen with: %s" % (url)) f = urllib.urlopen(url, params) except 
Exception, e: tolog("!!WARNING!!4000!! Unexpected exception: %s" % str(e)) return error.ERR_DDMREG, str(e) else: ret = f.read() if ret != '1': ret = ret.replace('\n', ' ') tolog('!!WARNING!!4000!! LRC registration error: %s' % str(ret)) tolog('!!WARNING!!4000!! LRC URL requested: %s' % f.geturl()) if ret == 'LFNnonunique': return error.ERR_DDMREGDUP, ret elif ret.find("guid-metadata entry already exists") >= 0: return error.ERR_GUIDSEXISTSINLRC, ret else: return error.ERR_DDMREG, ret return 0, ret def lateRegistration(ub, job, type="unknown"): """ late registration used by the job recovery """ # function will return True if late registration has been performed, False if it failed # and None if there is nothing to do status = False latereg = False fields = None # protect against old jobState files which may not have the new variables try: tolog("type: %s" % (type)) if type == "output": if job.output_latereg == "False": latereg = False else: latereg = True fields = job.output_fields elif type == "log": if job.log_latereg == "False": latereg = False else: latereg = True fields = job.log_field else: tolog("!!WARNING!!4000!! Unknown id type for registration: %s" % (type)) tolog("!!WARNING!!4000!! Skipping late registration step") pass except Exception, e: tolog("!!WARNING!!4000!! Late registration has come upon an old jobState file - can not perform this step: %s" % str(e)) pass else: tolog("latereg: %s" % str(latereg)) tolog("fields: %s" % str(fields)) # should late registration be performed? if latereg: ec, ret = registerFiles(fields, ub=ub) if ec == 0: tolog("registerFiles done") status = True else: tolog("!!WARNING!!4000!! 
def isAnalJob(trf, prodSourceLabel=""):
    """ Determine whether the job is an analysis job or not

    Analysis jobs are identified by an http(s) URL as trf, unless the job
    installs software (prodSourceLabel == "software"). """

    return (trf.startswith('https://') or trf.startswith('http://')) and prodSourceLabel != "software"

def getPayloadName(job):
    """ figure out a suitable name for the payload """

    jobtrf = job.trf.split(",")[0]
    if jobtrf.find("panda") > 0 and jobtrf.find("mover") > 0:
        name = "pandamover"
    elif jobtrf.find("Athena") > 0 or jobtrf.find("trf") > 0:
        name = "athena"
    else:
        if isBuildJob(job.outFiles):
            name = "buildjob"
        else:
            name = "payload"
    return name

def getSiteSpecs():
    """ Get available releases (code adapted from Torre)

    Returns a |-separated string of 'Atlas-<release>' entries found in the
    site software area, or '' when no software directory is found. """

    if "VO_ATLAS_SW_DIR" in os.environ:
        scappdir = readpar('appdir')
        if scappdir != "":
            if os.path.exists(os.path.join(scappdir, 'software/releases')):
                relbase = os.path.join(scappdir, 'software/releases')
            elif os.path.exists(os.path.join(scappdir, 'software')):
                relbase = os.path.join(scappdir, 'software')
            else:
                relbase = scappdir
        else:
            tolog("VO_ATLAS_SW_DIR area: %s" % (os.environ["VO_ATLAS_SW_DIR"]))
            relbase = "%s/software" % os.environ["VO_ATLAS_SW_DIR"]
    elif "OSG_APP" in os.environ:
        scappdir = readpar('appdir')
        if scappdir != "":
            appdir = scappdir
        else:
            appdir = os.environ["OSG_APP"]
        tolog("OSG app area: %s" % (appdir))
        cmd = "/bin/ls -alL %s" % (appdir)
        tolog("Executing command: %s" % (cmd))
        s = commands.getoutput(cmd)
        tolog("Output: %s" % (s))
        if appdir.endswith("atlas_app/atlas_rel"):
            # appdir already contains the release string, don't add it again
            relbase = appdir
        else:
            relbase = "%s/atlas_app/atlas_rel" % (appdir)
    elif "APP" in os.environ:
        tolog("APP area: %s" % (os.environ["APP"]))
        relbase = "%s/atlas_app/atlas_rel" % os.environ["APP"]
    else:
        tolog("No software directory found")
        relbase = ''

    if relbase != '':
        tolog("Looking for releases in: %s" % (relbase))
        cmd = "/bin/ls -alL " + relbase
        tolog("Executing command: %s" % (cmd))
        s = commands.getoutput(cmd)
        tolog("Output: %s" % (s))

        releases = ''
        dirlist = commands.getoutput("/bin/ls -1 " + relbase).split('\n')
        # version-number pattern, compiled once outside the loop
        pat = re.compile(r'^[0-9]+\.[0-9\.]+[0-9a-zA-Z\.\_]*')
        for f in dirlist:
            mat = pat.match(f)
            if mat:
                # only count releases that have a usable setup script
                if os.path.exists(relbase + '/' + f + '/cmtsite/setup.sh'):
                    if releases != '':
                        releases += '|'
                    releases += 'Atlas-%s' % f
        return releases
    else:
        return ''

def setUnsetVars(thisSite):
    """ set pilot variables in case they have not been set by the pilot launcher

    thisSite will be updated and returned. """

    tolog('Setting unset pilot variables using queuedata')
    if thisSite.appdir == "":
        scappdir = readpar('appdir')
        if "OSG_APP" in os.environ:
            if scappdir == "":
                scappdir = os.environ["OSG_APP"]
                if scappdir == "":
                    scappdir = "/usatlas/projects/OSG"
                    tolog('!!WARNING!!4000!! appdir not set in queuedata or $OSG_APP: using default %s' % (scappdir))
                else:
                    tolog('!!WARNING!!4000!! appdir not set in queuedata - using $OSG_APP: %s' % (scappdir))
        # fixed: str.find() returns -1 (truthy) when the substring is missing
        # and 0 (falsy) when it starts the string, so the original bare
        # truth-test had the logic inverted
        if scappdir.find('/atlas_app/atlas_rel') >= 0:
            tolog('Temporarily removing string /atlas_app/atlas_rel from appdir')
            scappdir = scappdir.replace('/atlas_app/atlas_rel', '')
        tolog('appdir: %s' % (scappdir))
        thisSite.appdir = scappdir

    if thisSite.dq2url == "":
        _dq2url = readpar('dq2url')
        if _dq2url == "":
            tolog('Note: dq2url not set')
        else:
            tolog('dq2url: %s' % (_dq2url))
            thisSite.dq2url = _dq2url

    if thisSite.wntmpdir == "":
        _wntmpdir = readpar('wntmpdir')
        if _wntmpdir == "":
            _wntmpdir = thisSite.workdir
            tolog('!!WARNING!!4000!! wntmpdir not set - using site workdir: %s' % (_wntmpdir))
        tolog('wntmpdir: %s' % (_wntmpdir))
        thisSite.wntmpdir = _wntmpdir

    return thisSite

def findPythonInRelease(siteroot, atlasRelease):
    """ Set the python executable in the release dir (LCG sites only)

    Returns the path of the python executable found after sourcing the
    release setup, or "" when none was found. """

    tolog("Trying to find a python executable for release: %s" % (atlasRelease))
    scappdir = readpar('appdir')
    if scappdir != "":
        if os.path.exists(os.path.join(scappdir, 'software/releases')):
            _swbase = os.path.join(scappdir, 'software/releases')
        elif os.path.exists(os.path.join(scappdir, 'software')):
            _swbase = os.path.join(scappdir, 'software')
        else:
            _swbase = scappdir
        cmd = "source %s/%s/setup.sh;" % (_swbase, atlasRelease)
    else:
        cmd = "source $VO_ATLAS_SW_DIR/software/%s/setup.sh;" % (atlasRelease)
    cmd += "source %s/AtlasOffline/%s/AtlasOfflineRelease/cmt/setup.sh" % (siteroot, atlasRelease)

    py = ""
    tolog("Executing command: %s" % (cmd))
    rc, rs = commands.getstatusoutput(cmd)
    if rc == 0:
        cmd += ";which python"
        rs = commands.getoutput(cmd)
        if rs.startswith('/'):
            tolog("Found: %s" % (rs))
            py = rs
        else:
            # the setup scripts may have produced extra output; keep the last line
            if '\n' in rs:
                rs = rs.split('\n')[-1]
            if rs.startswith('/'):
                tolog("Found: %s" % (rs))
                py = rs
            else:
                tolog("!!WARNING!!4000!! No python executable found in release dir: %s" % (rs))
    else:
        tolog("!!WARNING!!4000!! Command failed: %d, %s" % (rc, rs))
    return py

def stringToFields(jobFields):
    """ Convert a jobState string (e.g. "['a', 'b']") to a fields array """

    jobFields = jobFields.replace('[', '').replace(']', '')
    jobFields = jobFields.replace("\'", "")
    return [f.strip() for f in jobFields.split(',')]

def getpar(par, s):
    """ extract par from s

    s is a |-separated string of key=value pairs; a value may itself contain
    |-characters, so the end of a value is located via the next known key.
    Returns '' when the key is not present. """

    matches = re.findall("(^|\|)([^\|=]+)=", s)
    for tmp, tmpPar in matches:
        if tmpPar == par:
            patt = "%s=(.*)" % par
            idx = matches.index((tmp, tmpPar))
            if idx + 1 == len(matches):
                patt += '$'
            else:
                patt += "\|%s=" % matches[idx + 1][1]
            mat = re.search(patt, s)
            if mat:
                return mat.group(1)
            else:
                return ''
    return ''

def readpar(par):
    """ read par from queuedata """

    fh = open("%s/queuedata.dat" % (os.environ['PilotHomeDir']))
    queuedata = fh.read()
    fh.close()
    return getpar(par, queuedata)

def getBatchSystemJobID():
    """ return the batch system name and job id (will be reported to the server)

    Returns (None, "") when no known batch system variable is set. """

    # local renamed from 'id' to avoid shadowing the builtin
    jobid = ""

    # BQS (e.g. LYON)
    if "BQSCLUSTER" in os.environ:
        return "BQS", os.environ["BQSCLUSTER"]
    # Torque
    if "PBS_JOBID" in os.environ:
        return "Torque", os.environ["PBS_JOBID"]
    # LSF
    if "LSB_JOBID" in os.environ:
        return "LSF", os.environ["LSB_JOBID"]
    # Sun's Grid Engine
    if "JOB_ID" in os.environ:
        return "Grid Engine", os.environ["JOB_ID"]
    # Condor (variable sent through job submit file)
    if "clusterid" in os.environ:
        return "Condor", os.environ["clusterid"]

    return None, jobid

def touch(filename):
    """ touch a file (only if it does not already exist) """

    if not os.path.isfile(filename):
        try:
            os.system("touch %s" % (filename))
        except Exception as e:
            # fixed: the original caught the undefined name 'exception', which
            # itself raised a NameError instead of handling the failure
            tolog("!!WARNING!!1000!! Failed to touch file: %s" % str(e))
        else:
            tolog("Lock file created: %s" % (filename))
def createLockFile(jobrec, workdir, lockfile="LOCKFILE"):
    """ Site workdir protection to prevent the work dir from being deleted by the
    cleanup function if pilot fails to register the log """

    # touch() only creates the file if it does not exist already
    f = "%s/%s" % (workdir, lockfile)
    if lockfile == "LOCKFILE":
        # the standard lock file is only needed when job recovery is enabled
        # NOTE(review): nesting reconstructed from a whitespace-damaged copy -
        # confirm that non-LOCKFILE files are meant to be created unconditionally
        if jobrec:
            touch(f)
    else:
        touch(f)

def verifyTransfer(workdir, verbose=True):
    """ verify that all files were transferred by checking the existance of the ALLFILESTRANSFERRED lockfile """

    status = False
    fname = "%s/ALLFILESTRANSFERRED" % (workdir)
    if os.path.exists(fname):
        if verbose:
            tolog("Verified: %s" % (fname))
        status = True
    else:
        if verbose:
            tolog("Transfer verification failed: %s (file does not exist)" % (fname))
    return status

def removeLEDuplicates(logMsg):
    """ identify duplicated messages in the log extracts and remove them

    Lines that differ only in their leading time stamp are considered
    duplicates; the first occurrence (with its original time stamp) is kept.
    E.g. repeated '!!WARNING!!1999!! Could not read modification time of ...'
    messages are reported only once. """

    log_extracts_list = logMsg.split('\n')

    # create a temporary list with the time stamp fields stripped
    log_extracts_tmp = []
    pattern = re.compile(r"(\d+ [A-Za-z]+ \d+ \d+:\d+:\d+\|)")
    for line in log_extracts_list:
        # id the time stamp
        found = re.findall(pattern, line)
        if len(found) > 0:
            # remove any time stamp
            line = line.replace(found[0], '')
        log_extracts_tmp.append(line)

    # remove duplicate lines and record the index of each first occurrence so
    # the original (time-stamped) line can be restored; plain dictionaries are
    # not used since they are unordered
    i = 0
    log_extracts_index = []
    log_extracts_tmp2 = []
    for line in log_extracts_tmp:
        if line not in log_extracts_tmp2:
            log_extracts_index.append(i)
            log_extracts_tmp2.append(line)
        i += 1

    # create the final list from the original lines
    log_extracts_tmp = []
    for index in log_extracts_index:
        log_extracts_tmp.append(log_extracts_list[index])

    # return the stripped logMsg
    return "\n".join(log_extracts_tmp)

def replaceQueuedataField(field, value, verbose=True):
    """ replace a given queuedata field with a new value

    Example: replaceQueuedataField("copytool", "lcgcp") changes 'copytool=...'
    to 'copytool=lcgcp' in queuedata.dat. """

    status = True
    queuedata_filename = "%s/queuedata.dat" % (os.environ['PilotHomeDir'])
    stext = field + "=" + readpar(field)
    rtext = field + "=" + value
    if replace(queuedata_filename, stext, rtext):
        if verbose:
            tolog("Successfully changed %s to: %s" % (field, value))
    else:
        tolog("!!WARNING!!1999!! Failed to change %s to: %s" % (field, value))
        status = False
    return status

def verifyCMTCONFIG(releaseCommand):
    """ make sure that the required CMTCONFIG match that of the local system

    Returns (status, pilotErrorDiag). """

    status = True
    pilotErrorDiag = ""
    cmd = commands.getoutput("%s;echo $CMTCONFIG" % (releaseCommand))
    # in case the releaseCommand gives info as well, get rid of it
    if cmd.find("\n") >= 0:
        cmd = cmd.split("\n")[-1]
    cmtconfig_local = cmd
    if cmtconfig_local == "":
        cmtconfig_local = "local cmtconfig not set"
    cmtconfig_required = readpar('cmtconfig')
    if cmtconfig_required == "":
        cmtconfig_required = "required cmtconfig not set"

    # does the required CMTCONFIG match that of the local system?
    if cmtconfig_required != cmtconfig_local:
        pilotErrorDiag = "Required CMTCONFIG (%s) incompatible with that of local system (%s)" %\
                         (cmtconfig_required, cmtconfig_local)
        tolog("!!FAILED!!2990!! %s" % (pilotErrorDiag))
        status = False
    return status, pilotErrorDiag

def writeExitCode(workdir, ec):
    """ write exit code ec to file EXITCODE """

    fname = "%s/EXITCODE" % (workdir)
    try:
        f = open(fname, "w")
    except Exception as e:
        tolog("!!WARNING!!2990!! Could not open: %s, %s" % (fname, str(e)))
    else:
        f.write("%d" % (ec))
        f.close()
        tolog("Wrote exit code %d to file: %s" % (ec, fname))

def readExitCode(workdir):
    """ read exit code from file <workdir>/EXITCODE (0 when absent/unreadable) """

    ec = 0
    fname = "%s/EXITCODE" % (workdir)
    if os.path.exists(fname):
        try:
            f = open(fname, "r")
        except Exception as e:
            tolog("Failed to open %s: %s" % (fname, str(e)))
        else:
            try:
                ec = int(f.read())
            except ValueError as e:
                # guard against a corrupt or empty EXITCODE file (previously an
                # uncaught ValueError would crash the caller)
                tolog("Failed to parse exit code from %s: %s" % (fname, str(e)))
            else:
                tolog("Found exit code: %d" % (ec))
            f.close()
    else:
        tolog("No exit code to report to wrapper (file %s does not exist)" % (fname))
    return ec

def evaluateQueuedata():
    """ evaluate environmental variables if used and replace the value in the queuedata """

    # the following fields are allowed to contain environmental variables
    fields = ["copysetup", "copysetupin", "recoverdir", "wntmpdir", "sepath", "seprodpath", "lfcpath", "lfcprodpath"]

    # process each field and evaluate the environment variables if present
    for field in fields:
        # grab the field value and split it since some fields can contain ^-separators
        old_values = readpar(field)
        new_values = []
        for value in old_values.split("^"):
            if value.startswith("$"):
                # evaluate the environmental variable
                new_value = commands.getoutput("echo %s" % (value))
                if new_value == "":
                    tolog("!!WARNING!!2999!! Environmental variable not set: %s" % (value))
                value = new_value
            new_values.append(value)

        # rebuild the string (^-separated if necessary)
        new_values_joined = '^'.join(new_values)

        # replace the field value in the queuedata with the new value
        if new_values_joined != old_values:
            if replaceQueuedataField(field, new_values_joined, verbose=False):
                tolog("Updated field %s in queuedata (replaced \'%s\' with \'%s\')" % (field, old_values, new_values_joined))

def verifyQueuedata(queuename, filename, _i, _N, url):
    """ check if the downloaded queuedata has the proper format

    Valid queuedata is expected to begin with 'appdir='. An invalid file is
    removed so a later download attempt starts clean. """

    hasQueuedata = False
    try:
        f = open(filename, "r")
    except Exception as e:
        tolog("!!WARNING!!1999!! Open failed with %s" % str(e))
    else:
        output = f.read()
        f.close()
        try:
            output_split = output.split('=')[0]
        except Exception as e:
            output_split = "split failed: %s" % str(e)
        if output_split != 'appdir':
            if len(output_split) == 0:
                tolog("!!WARNING!!1999!! curl command returned empty queuedata (wrong queuename %s?)" % (queuename))
            else:
                tolog("!!WARNING!!1999!! Attempt %d/%d: curl command did not return valid queuedata from config DB server %s" %\
                      (_i, _N, url))
                output = output.replace('\n', '')
                output = output.replace(' ', '')
                tolog("!!WARNING!!1999!! Output begins with: %s" % (output[:64]))
            try:
                os.remove(filename)
            except Exception as e:
                tolog("!!WARNING!!1999!! Failed to remove file %s: %s" % (filename, str(e)))
        else:
            # found valid queuedata info
            tolog("schedconfigDB returned: %s" % (output))
            hasQueuedata = True
    return hasQueuedata
def getQueuedata(queuename, sitename):
    """ Download the queuedata if not already downloaded

    Reads the queue parameters and creates the queuedata.dat file in the
    pilot home dir (the sandbox directory, not the workdir). Returns
    (ec, hasQueuedata); ec is -1 only when the curl command itself raised
    an exception.

    Example queuedata fields: appdir, dq2url, copytool, copysetup, ddm, se,
    sepath, envsetup, region, copyprefix, lfcpath, lfchost, sein, wntmpdir. """

    os.environ['PilotHomeDir'] = commands.getoutput('pwd')
    hasQueuedata = False

    # try the config servers one by one in case one of them is not responding
    filename = '%s/queuedata.dat' % (os.environ['PilotHomeDir'])
    if os.path.exists(filename):
        tolog("Queuedata has already been downloaded by pilot wrapper script (will confirm validity)")
        hasQueuedata = verifyQueuedata(queuename, filename, 1, 1, "(see batch log for url)")
        if hasQueuedata:
            tolog("Queuedata was successfully downloaded by pilot wrapper script")
        else:
            tolog("Queuedata was not downloaded successfully by pilot wrapper script, will try again")

    if not hasQueuedata:
        global chttpurl
        # shuffle list so same cache is not hit by all jobs
        from random import shuffle
        shuffle(chttpurl)
        _i = 0
        _N = len(chttpurl)
        ret = -1
        for url in chttpurl:
            _i += 1
            if 'ANALY_' in sitename:
                cmd = 'curl --connect-timeout 20 --max-time 120 -sS "%s:25880/server/pandamon/query?tpmes=pilotpars&siteid=%s" > %s' %\
                      (url, sitename, filename)
            else:
                cmd = 'curl --connect-timeout 20 --max-time 120 -sS "%s:25880/server/pandamon/query?tpmes=pilotpars&queue=%s" > %s' %\
                      (url, queuename, filename)
            tolog("Executing command: %s" % (cmd))
            try:
                # output will be empty since we pipe into a file
                ret, output = commands.getstatusoutput(cmd)
            except Exception as e:
                tolog("!!WARNING!!1999!! Failed with curl command: %s" % str(e))
                return -1, False
            else:
                if ret == 0:
                    # read back the queuedata to verify its validity
                    hasQueuedata = verifyQueuedata(queuename, filename, _i, _N, url)
                    if hasQueuedata:
                        break
                else:
                    tolog("!!WARNING!!1999!! curl command exited with code %d" % (ret))

    if hasQueuedata:
        # evaluate environmental variables if used
        evaluateQueuedata()

    return 0, hasQueuedata

def postProcessQueuedata(queuename, thisSite, jobrec):
    """ update queuedata fields if necessary

    Returns (ec, thisSite, jobrec); ec is -1 (with None values) when the site
    is in OFFLINE mode and the pilot should abort.

    NOTE: a long list of commented-out, site-specific replaceQueuedataField()
    overrides (BNL, SLAC, NIKHEF, MWT2, SWT2, etc.) was removed from here as
    dead code - see version control history if an override is needed again. """

    _status = readpar('status')
    if _status != None and _status != "":
        if _status.upper() == "OFFLINE":
            tolog("Site %s is currently in %s mode - aborting pilot" % (thisSite.sitename, _status.lower()))
            return -1, None, None
        else:
            tolog("Site %s is currently in %s mode" % (thisSite.sitename, _status.lower()))

    # override pilot run options
    _jobrec = readpar('retry')
    if _jobrec.upper() == "TRUE":
        tolog("Job recovery turned on")
        jobrec = True
    elif _jobrec.upper() == "FALSE":
        tolog("Job recovery turned off")
        jobrec = False
    else:
        tolog("Job recovery variable (retry) not set")

    # set pilot variables in case they have not been set by the pilot launcher
    thisSite = setUnsetVars(thisSite)

    return 0, thisSite, jobrec

def isSameType(trf, userflag, prodSourceLabel):
    """ is the lost job of same type as the current pilot? """
    # MOVE TO JOB RECOVERY CLASS LATER

    # treat userflag 'self' as 'user'
    if userflag == 'self':
        userflag = 'user'

    if (isAnalJob(trf, prodSourceLabel=prodSourceLabel) and userflag == 'user') or \
       (not isAnalJob(trf, prodSourceLabel=prodSourceLabel) and userflag != 'user'):
        sametype = True
        if userflag == 'user':
            tolog("Lost job is of same type as current pilot (analysis pilot, lost analysis job trf: %s)" % (trf))
        else:
            tolog("Lost job is of same type as current pilot (production pilot, lost production job trf: %s)" % (trf))
    else:
        sametype = False
        if userflag == 'user':
            tolog("Lost job is not of same type as current pilot (analysis pilot, lost production job trf: %s)" % (trf))
        else:
            tolog("Lost job is not of same type as current pilot (production pilot, lost analysis job trf: %s)" % (trf))
    return sametype
""" # MOVE TO JOB RECOVERY CLASS LATER # treat userflag 'self' as 'user' if userflag == 'self': userflag = 'user' if (isAnalJob(trf, prodSourceLabel=prodSourceLabel) and userflag == 'user') or \ (not isAnalJob(trf, prodSourceLabel=prodSourceLabel) and userflag != 'user'): sametype = True if userflag == 'user': tolog("Lost job is of same type as current pilot (analysis pilot, lost analysis job trf: %s)" % (trf)) else: tolog("Lost job is of same type as current pilot (production pilot, lost production job trf: %s)" % (trf)) else: sametype = False if userflag == 'user': tolog("Lost job is not of same type as current pilot (analysis pilot, lost production job trf: %s)" % (trf)) else: tolog("Lost job is not of same type as current pilot (production pilot, lost analysis job trf: %s)" % (trf)) return sametype def verifyProxyValidity(): """ make sure that we have a long lasting proxy before asking for a job """ status = True envsetupin = readpar('envsetupin') if envsetupin == "": envsetup = readpar('envsetup') if envsetup == "": tolog("Using envsetup since envsetupin is not set") else: tolog("verifyProxyValidity found no envsetup") else: envsetup = envsetupin # do we have a valid proxy? from SiteMover import SiteMover ec, pilotErrorDiag = SiteMover.verifyProxy(envsetup) return ec, pilotErrorDiag def getGuidsFromXML(dir, id=None, filename=None): """ extract the guid matching the filename from the xml, or all guids if filename not set """ guids = [] # are we in recovery mode? 
then id is set if id: metadata_filename = "%s/metadata-%s.xml" % (dir, repr(id)) else: metadata_filename = "%s/metadata.xml" % (dir) xmldoc = minidom.parse(metadata_filename) fileList = xmldoc.getElementsByTagName("File") for thisfile in fileList: gpfn = str(thisfile.getElementsByTagName("lfn")[0].getAttribute("name")) if (filename and gpfn == filename) or (not filename): guid = str(thisfile.getAttribute("ID")) guids.append(guid) return guids def addToSkipped(lfn, guid): """ add metadata for skipped file """ ec = 0 try: # append to skipped.xml file fd = open("skipped.xml", "a") except Exception, e: tolog("!!WARNING!!2999!! Exception caught: %s" % str(e)) ec = -1 else: fd.write(' \n' % (guid)) fd.write(" \n") fd.write(' \n' % (lfn)) fd.write(" \n") fd.write(" \n") fd.close() return ec def addSkippedToPFC(fname, skippedfname): """ add skipped input file info to metadata.xml """ ec = 0 try: fd = open(skippedfname, "r") except Exception, e: tolog("!!WARNING!!2999!! %s" % str(e)) ec = -1 else: skippedXML = fd.read() fd.close() try: fdPFC = open(fname, "r") except Exception, e: tolog("!!WARNING!!2999!! %s" % str(e)) ec = -1 else: PFCXML = fdPFC.read() fdPFC.close() if ec == 0: # add the skipped file info to the end of the PFC PFCXML = PFCXML.replace("", skippedXML) PFCXML += "\n" # move the old PFC and create a new PFC try: os.system("mv %s %s.BAK2" % (fname, fname)) except Exception, e: tolog("!!WARNING!!2999!! %s" % str(e)) ec = -1 else: try: fdNEW = open(fname, "w") except Exception, e: tolog("!!WARNING!!2999!! 
%s" % str(e)) ec = -1 else: fdNEW.write(PFCXML) fdNEW.close() tolog("Wrote updated XML with skipped file info:\n%s" % (PFCXML)) return ec def verifyPilotWorkir(args, wntmpdir): """ make sure that the args tuple contains a proper wntmpdir to be used as the pilot workdir """ # if the wrapper script fails to download the queuedata, the pilot will be started with a default workdir # which might not be ok try: if args[2] != wntmpdir and wntmpdir != '': tolog("!!WARNING!!2999!! Args tuple does not contain the same wntmpdir as schedconfig: %s ne %s" % (args[2], wntmpdir)) # update the tuple with the schedconfig value tolog("Original args tuple: %s" % str(args)) proper_args_list = list(args) proper_args_list[2] = wntmpdir args = tuple(proper_args_list) tolog("Updated args tuple: %s" % str(args)) else: tolog("Args tuple contains a valid wntmpdir: %s" % (wntmpdir)) except Exception, e: tolog("!!WARNING!!2999!! Caught exception: %s" % str(e)) return args def getSwbase(appdir, release): """ return the swbase variable """ # appdir comes from thisSite.appdir (might not be set) # release info is needed to figure out the correct path to use when schedconfig.appdir is set swbase = "" if readpar('region') == 'Nordugrid': if os.environ.has_key('RUNTIME_CONFIG_DIR'): _swbase = os.environ['RUNTIME_CONFIG_DIR'] if os.path.exists(_swbase): swbase = _swbase elif os.environ.has_key('VO_ATLAS_SW_DIR'): # use the appdir from queuedata if available scappdir = readpar('appdir') if scappdir != "": # e.g. 
for CERN-UNVALID, appdir contains the full path to the releases if os.path.exists(os.path.join(scappdir, release)): swbase = scappdir # for other CERN sites 'software/release' must be added elif os.path.exists(os.path.join(scappdir, 'software/releases')): swbase = os.path.join(scappdir, 'software/releases') # for remaining LCG sites, only 'software' needs to be added else: swbase = os.path.join(scappdir, 'software') else: # for CERN if os.path.exists(os.environ['VO_ATLAS_SW_DIR'] + '/software/releases'): swbase = os.environ['VO_ATLAS_SW_DIR'] + '/software/releases' # for remaining LCG sites else: swbase = os.environ['VO_ATLAS_SW_DIR'] + '/software' else: # for non-LCG sites if appdir.find('atlas_app/atlas_rel') < 0: swbase = appdir + '/atlas_app/atlas_rel' else: swbase = appdir return swbase.replace('//','/') class _Curl: """ curl class """ # constructor def __init__(self): # path to curl self.path = 'curl' # verification of the host certificate self.verifyHost = True # request a compressed response self.compress = True # SSL cert/key if os.environ.has_key('X509_USER_PROXY'): self.sslCert = os.environ['X509_USER_PROXY'] self.sslKey = os.environ['X509_USER_PROXY'] else: self.sslCert = '/tmp/x509up_u%s' % str(os.getuid()) self.sslKey = '/tmp/x509up_u%s' % str(os.getuid()) # CA cert dir if os.environ.has_key('X509_CERT_DIR'): self.sslCertDir = os.environ['X509_CERT_DIR'] else: _dir = '/etc/grid-security/certificates' if os.path.exists(_dir): self.sslCertDir = _dir else: tolog("!!WARNING!!2999!! 
$X509_CERT_DIR is not set and default location %s does not exist" % (_dir)) self.sslCertDir = '' # GET method def get(self, url, data): # make command com = '%s --silent --get' % self.path if not self.verifyHost: com += ' --insecure' if self.compress: com += ' --compressed' if self.sslCertDir != '': com += ' --capath %s' % self.sslCertDir if self.sslCert != '': com += ' --cert %s' % self.sslCert if self.sslKey != '': com += ' --key %s' % self.sslKey #com += ' --verbose' # data strData = '' for key in data.keys(): strData += 'data="%s"\n' % urllib.urlencode({key:data[key]}) # write data to temporary config file # tmpName = commands.getoutput('uuidgen') tmpName = 'curl.config' tmpFile = open(tmpName,'w') tmpFile.write(strData) tmpFile.close() com += ' --config %s' % tmpName com += ' %s' % url # execute tolog("Executing command: %s" % (com)) ret = commands.getstatusoutput(com) # remove temporary file #os.remove(tmpName) return ret # POST method def post(self, url, data): # make command com = '%s --silent --show-error' % self.path if not self.verifyHost: com += ' --insecure' if self.compress: com += ' --compressed' if self.sslCertDir != '': com += ' --capath %s' % self.sslCertDir if self.sslCert != '': com += ' --cert %s' % self.sslCert if self.sslKey != '': com += ' --key %s' % self.sslKey #com += ' --verbose' # data strData = '' for key in data.keys(): strData += 'data="%s"\n' % urllib.urlencode({key:data[key]}) # write data to temporary config file #tmpName = commands.getoutput('uuidgen') tmpName = 'curl.config' tmpFile = open(tmpName,'w') tmpFile.write(strData) tmpFile.close() com += ' --config %s' % tmpName com += ' %s' % url # execute tolog("Executing command: %s" % (com)) ret = commands.getstatusoutput(com) # remove temporary file #os.remove(tmpName) return ret # PUT method def put(self, url, data): # make command com = '%s --silent' % self.path if not self.verifyHost: com += ' --insecure' if self.compress: com += ' --compressed' if self.sslCertDir != '': com += 
' --capath %s' % self.sslCertDir if self.sslCert != '': com += ' --cert %s' % self.sslCert if self.sslKey != '': com += ' --key %s' % self.sslKey #com += ' --verbose' # emulate PUT for key in data.keys(): com += ' -F "%s=@%s"' % (key,data[key]) com += ' %s' % url # execute tolog("Executing command: %s" % (com)) return commands.getstatusoutput(com) # send message to dispatcher def toDispatcher(baseURL, cmd, data): """ sends 'data' using command 'cmd' to the dispatcher """ try: tpre = datetime.datetime.utcnow() except: pass tolog("toDispatcher: cmd = %s" % (cmd)) tolog("toDispatcher: len(data) = %d" % len(data)) # instantiate curl curl = _Curl() # execute url = baseURL + '/' + cmd curlstat, response = curl.post(url, data) try: tpost = datetime.datetime.utcnow() tolog("Elapsed seconds: %d" % ((tpost-tpre).seconds)) except: pass try: if curlstat == 0: # parse response message outtxt = response.lower() if outtxt.find('') > 0: if outtxt.find('read timeout') > 0: tolog("!!WARNING!!2999!! Timeout on dispatcher exchange") else: tolog("!!WARNING!!2999!! HTTP error on dispatcher exchange") tolog("HTTP output: %s" % (response)) return EC_Failed, None, None # create the parameter list from the dispatcher response data = parseDispatcherResponse(response) status = int(data['StatusCode']) if status == SC_Success: tolog("Successful dispatcher exchange") elif status == SC_NoJobs: tolog("!!WARNING!!0!! Dispatcher has no jobs") elif status == SC_TimeOut: tolog("!!WARNING!!2999!! Dispatcher reports timeout") elif status == SC_Failed: tolog("!!WARNING!!2999!! Dispatcher reports failure processing message") elif status == SC_NonSecure: tolog("!!FAILED!!2999!! Attempt to retrieve job with non-secure connection disallowed") else: tolog("!!WARNING!!2999!! Unknown dispatcher status code: %d" % (status)) tolog("Dumping curl.config file:") dumpFile('curl.config', topilotlog=True) else: tolog("!!WARNING!!2999!! 
Dispatcher message curl error: %d " % (curlstat)) tolog("Response = %s" % (response)) tolog("Dumping curl.config file:") dumpFile('curl.config', topilotlog=True) return curlstat, None, None if status == SC_Success: return status, data, response else: return status, None, None except: type, value, traceBack = sys.exc_info() tolog("ERROR %s : %s %s" % (cmd, type, value)) return EC_Failed, None, None def getPilotToken(tofile=False): """ read the pilot token from file """ pilottoken = None filename = "pilottoken.txt" if os.path.exists(filename): try: f = open(filename, "r") except Exception, e: tolog("!!WARNING!!2999!! Could not open pilot token file: %s" % str(e), tofile=tofile) else: try: pilottoken = f.read() except Exception, e: tolog("!!WARNING!!2999!! Could not read pilot token: %s" % str(e), tofile=tofile) else: f.close() tolog("Successfully read pilot token", tofile=tofile) try: os.remove(filename) except Exception, e: tolog("!!WARNING!!2999!! Could not remove pilot token file: %s" % str(e), tofile=tofile) else: tolog("Pilot token file has been removed", tofile=tofile) return pilottoken def getRandomURL(): """ return a random url from the urlList for getJob/updateJob """ from random import shuffle urlList = ['https://gridui05.usatlas.bnl.gov','https://gridui06.usatlas.bnl.gov','https://gridui07.usatlas.bnl.gov'] shuffle(urlList) return urlList[0] def parseDispatcherResponse(response): """ create the parameter list from the dispatcher response """ parList = cgi.parse_qsl(response, keep_blank_values=True) tolog("Dispatcher response: %s" % str(parList)) data = {} for p in parList: data[p[0]] = p[1] return data