import sys, os, signal, time, shutil, cgi
import commands, getopt, re
import urllib2, urllib, socket
from xml.dom import minidom
from xml.dom.minidom import Document
from xml.dom.minidom import parse, parseString
try:
import datetime
except:
pass
from PilotErrors import PilotErrors
from config import config_sm
CMD_CHECKSUM = config_sm.COMMAND_MD5
########### dispatcher status codes
SC_Success = 0
SC_TimeOut = 10
SC_NoJobs = 20
SC_Failed = 30
SC_NonSecure = 40
########### exit code
EC_Failed = 255
# all files that need to be copied to the workdir
fileList = commands.getoutput('ls *.py').split()
# schedconfig DB web servers
#chttpurl = ['http://panda.cern.ch'] # add more servers later?
chttpurl = ['http://voatlas19.cern.ch', 'http://voatlas20.cern.ch', 'http://voatlas21.cern.ch']
#chttpurl = ['http://pandamon.usatlas.bnl.gov',\
# 'http://gridui05.usatlas.bnl.gov','http://gridui06.usatlas.bnl.gov','http://gridui07.usatlas.bnl.gov']
# default pilot log files
pilotlogFilename = "pilotlog.out"
pilotstderrFilename = "pilot.stderr"
def setPilotlogFilename(filename):
""" set the pilot log file name"""
global pilotlogFilename
if len(filename) > 0:
pilotlogFilename = filename
def getPilotlogFilename():
""" return the pilot log file name"""
return pilotlogFilename
def setPilotstderrFilename(filename):
""" set the pilot stderr file name"""
global pilotstderrFilename
if len(filename) > 0:
pilotstderrFilename = filename
def getPilotstderrFilename():
""" return the pilot stderr file name"""
return pilotstderrFilename
def tolog_file(msg):
""" write date+msg to pilot log only """
# t = time.strftime("%d %b %Y %H:%M:%S", time.localtime())
t = time.strftime("%d %b %Y %H:%M:%S", time.gmtime(time.time()))
appendToLog("%s| %s\n" % (t, msg))
def appendToLog(txt):
""" append txt to file """
try:
f = open(pilotlogFilename, 'a')
f.write(txt)
f.close()
except Exception, e:
if "No such file" in str(e):
pass
else:
print "WARNING: Exception caught: %s" % str(e)
def tolog(msg, tofile=True):
""" write date+msg to pilot log and to stdout """
# t = time.strftime("%d %b %Y %H:%M:%S", time.localtime())
t = time.strftime("%d %b %Y %H:%M:%S", time.gmtime(time.time()))
if tofile:
appendToLog("%s| %s\n" % (t, msg))
    # remove backquotes from the msg since they cause problems with batch submission of the pilot
# (might be present in error messages from the OS)
msg = msg.replace("`","'")
msg = msg.replace('"','\\"')
print "%s| %s" % (t, msg)
def tolog_err(msg):
""" write error string to log """
tolog("!!WARNING!!4000!! %s" % str(msg))
def tolog_warn(msg):
""" write warning string to log """
tolog("!!WARNING!!4000!! %s" % str(msg))
def makeHTTPUpdate(state, node, port, url=None):
""" make http connection to jobdispatcher """
if state == 'finished' or state == 'failed' or state == 'holding':
tolog("Preparing for final Panda server update")
trial = 1
max_trials = 10
delay = 2*60 # seconds
tolog("Max number of trials: %d, separated by delays of %d seconds" % (max_trials, delay))
else:
# standard non final update
trial = 1
max_trials = 1
delay = None
# make http connection to jobdispatcher
while trial <= max_trials:
if url:
# will be set for the dev server and IT/CERN cloud
_url = '%s:%s/server/panda' % (url, port)
else:
# draw a random URL for the prod server
_url = '%s:%s/server/panda' % (getRandomURL(), port)
ret = httpConnect(node, _url)
if ret[0] and trial == max_trials: # non-zero exit code
if delay: # final update
tolog("!!FAILED!!4000!! [Trial #%d/%d] Could not update Panda server (putting job in holding state if possible)" %\
(trial, max_trials))
# state change will take place in postJobTask
# (the internal pilot state will not be holding but lostheartbeat)
else:
tolog("!!WARNING!!4000!! [Trial #%d/%d] Could not update Panda server, EC = %d" %\
(trial, max_trials, ret[0]))
break
elif ret[0]: # non-zero exit code
tolog("!!WARNING!!4000!! [Trial #%d/%d] Could not update Panda server, EC = %d" %\
(trial, max_trials, ret[0]))
if delay: # final update
tolog("Can not miss the final update. Will take a nap for %d seconds and then try again.." % (delay))
trial += 1
time.sleep(delay)
else: # try again later
tolog("Panda server update postponed..")
break
else:
break
return ret
def httpConnect(data, url, mode="UPDATE", sendproxy=False): # default mode allows exceptions to occur w/o interrupting the program
""" function to handle the http connection """
status = 0
response = None
# check if a job should be downloaded or if it's a server update
if mode == "GETJOB":
cmd = 'getJob'
elif mode == "ISTHEREANALYSISJOB":
cmd = 'isThereAnalysisJob'
else:
cmd = 'updateJob'
# send the data dictionary to the dispatcher using command cmd
# return format: status, parsed data, response
return toDispatcher(url, cmd, data)
def httpConnectOld(node, url, mode="UPDATE", sendproxy=False): # default mode allows exceptions to occur w/o interrupting the program
""" function to handle the http connection """
# test if https_proxy is set up, if not, initiate it
try:
if os.environ["https_proxy"]:
pass
except:
os.environ["https_proxy"] = ""
os.environ["save_https_proxy"] = ""
if os.environ["https_proxy"] != "":
os.environ["save_https_proxy"] = os.environ["https_proxy"]
ecode = 0
response = None
data = None
# check if a job should be downloaded or if it's a server update
if mode == "GETJOB":
cmd = 'getJob'
elif mode == "ISTHEREANALYSISJOB":
cmd = 'isThereAnalysisJob'
else:
cmd = 'updateJob'
url = url + '/' + cmd
rdata = urllib.urlencode(node)
req = urllib2.Request(url)
# check python version
pv = sys.version_info
pyversion = "%s.%s.%s" % (pv[0], pv[1], pv[2])
if pyversion >= "2.4" and os.environ["https_proxy"] != "" : # python 2.4 and a https_proxy is needed
tolog("https_proxy = %s" % str(os.environ["https_proxy"]))
# figure out the host and port from env. variable
try:
psaddr = os.environ["https_proxy"].split("//")[1].split("/")[0]
except Exception:
psaddr = os.environ["https_proxy"]
os.environ["https_proxy"] = ""
tolog("Using psaddr: %s" % (psaddr))
# python 2.4 supports timeout parameter in socket module
timeout = 180
socket.setdefaulttimeout(timeout)
# use an external module, which builds a urllib2 opener that can make https connections over proxy
import httpsByProxy
opener = urllib2.build_opener(httpsByProxy.ConnectHTTPHandler, httpsByProxy.ConnectHTTPSHandler)
req.set_proxy(psaddr, 'https')
else:
# block https_proxy explicitly, even if it's set up in the env., because we can't handle that
# in older versions of python
os.environ["https_proxy"] = ""
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
urllib2.install_opener(opener)
# ..................................................................................
# are we sending a proxy to the dispatcher?
if mode == "GETJOB" and sendproxy:
tolog("Asking for a job belonging to a user")
match = re.search('[^:/]+://([^/]+)(/.+)',url)
host = match.group(1)
path = match.group(2)
if os.environ.has_key('X509_USER_PROXY'):
certKey = os.environ['X509_USER_PROXY']
else:
certKey = '/tmp/x509up_u%s' % os.getuid()
# catch connection errors if any
try:
import httplib
conn = httplib.HTTPSConnection(host, key_file=certKey, cert_file=certKey)
except Exception, e:
tolog("Error connecting to jobDispatcher: %s" % str(e))
ecode = 1001
return ecode, None, response
try:
conn.request('POST', path, rdata)
resp = conn.getresponse()
response = resp.read()
# create the parameter list from the dispatcher response
data = parseDispatcherResponse(response)
except Exception, e:
tolog("Error requesting job from jobDispatcher: %s" % str(e))
ecode = 1002
return ecode, None, response
else:
# return at this point for successful job requests
tolog("httpConnect ecode = %d" % (ecode))
return ecode, data, response
else:
tolog("Normal job request")
# ..................................................................................
# normal job requests/updates are performed here
# catch connection errors if any
try:
fd = urllib2.urlopen(req, rdata)
except urllib2.HTTPError, e:
tolog("Error connecting to jobDispatcher: %s" % str(e))
tolog("Server error document follows:")
tolog(str(e))
if mode == "GETJOB":
ecode = 1001
else:
# general error
ecode = 1010
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"] = os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d" % (ecode))
return ecode, None, response
except urllib2.URLError,e:
ee = str(e)
tolog("Error retrieving data: %s" % (ee))
if mode == "GETJOB":
ecode = 1002
elif ee.find('Connection refused') >= 0:
ecode = 1008
elif ee.find('urlopen error') >= 0:
ecode = 1009
else:
# general error
ecode = 1010
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"] = os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d" % (ecode))
return ecode, None, response
except Exception,e:
tolog("exception caught during http connection: %s" % str(e))
if mode == "GETJOB":
ecode = 1003
else:
# general error
ecode = 1010
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"]=os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d" % (ecode))
return ecode, None, response
# catch response errors
try:
response = fd.read()
# create the parameter list from the dispatcher response
data = parseDispatcherResponse(response)
except socket.error,e:
tolog("Error reading data: %s" % str(e))
if mode == "GETJOB":
ecode = 1006
else:
# general error
ecode = 1010
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"] = os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d" % (ecode))
return ecode, None, None
except Exception,e:
tolog("exception caught when reading data from panda dispatcher server : %s" % str(e))
if mode == "GETJOB":
ecode = 1007
else:
# general error
ecode = 1010
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"] = os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d" % (ecode))
return ecode, None, None
try:
fd.close()
except Exception, e:
if mode == "GETJOB":
ecode = 1005
else:
# general error
ecode = 1010
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"] = os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d: %s" % (ecode, str(e)))
return ecode, None, None
if os.environ["save_https_proxy"] != "":
os.environ["https_proxy"] = os.environ["save_https_proxy"]
tolog("httpConnect ecode = %d" % (ecode))
return ecode, data, response
def returnLogMsg(logf=None, linenum=20):
''' return the last N lines of log files into a string'''
thisLog = ''
if logf:
if not os.path.isfile(logf):
thisLog = "\n- No log file %s found -" % (logf)
else:
thisLog = "\n- Log from %s -" % (logf)
f = open(logf)
lines = f.readlines()
f.close()
if len(lines) <= linenum:
ln = len(lines)
else:
ln = linenum
for i in range(-ln,0):
thisLog += lines[i]
return thisLog
def PFCxml(fname, fnlist=[], fguids=[], fntag=None, alog=None, alogguid=None, fsize=[], checksum=[]):
""" fguids list will be changed in the caller as well, since list is mutable object
fguids and fnlist are for output files, alog and alogguid are for workdir tarball log
files, which are not mutable so don't expect the value changed inside this function
will change in the caller as well !!! """
# fnlist = output file list
# fguids = output file guid list
# fntag = pfn/lfn identifier
# alog = name of log file
# alogguid = guid of log file
# fsize = file size list
# checksum = checksum list
# firstly make sure every file has a guid
flist = []
glist = []
from SiteMover import SiteMover
if alog:
flist.append(alog)
if not alogguid:
alogguid = commands.getoutput('uuidgen')
glist.append(alogguid)
if fnlist:
flist = flist + fnlist
for i in range(0, len(fnlist)):
# check for guid
try:
fguids[i]
except IndexError, e:
#print "This item doesn't exist"
fguids.insert(i, commands.getoutput('uuidgen'))
else:
if not fguids[i]: # this guid doesn't exist
fguids[i] = commands.getoutput('uuidgen')
if fntag == "lfn":
# check for file size
try:
fsize[i]
except IndexError, e:
#print "This item doesn't exist"
fsize.insert(i, "")
else:
if not fsize[i]: # this fsize doesn't exist
fsize[i] = ""
# check for checksum
try:
checksum[i]
except IndexError, e:
#print "This item doesn't exist"
checksum.insert(i, "")
else:
if not checksum[i]: # this checksum doesn't exist
checksum[i] = ""
glist = glist + fguids
if fntag == "pfn":
#create the PoolFileCatalog.xml-like file in the workdir
fd = open(fname,"w")
fd.write('\n')
fd.write("\n")
fd.write('\n')
fd.write("\n")
for i in range(0,len(flist)):
fd.write(' \n'%(glist[i]))
fd.write(" \n")
fd.write(' \n'%(flist[i]))
fd.write(" \n")
fd.write(" \n")
fd.write("\n")
fd.close()
elif fntag=="lfn":
# create the metadata.xml-like file that's needed by dispatcher jobs
fd=open(fname,"w")
fd.write('\n')
fd.write("\n")
fd.write('\n')
fd.write("\n")
for i in range(0, len(flist)):
fd.write(' \n' % (glist[i]))
fd.write(" \n")
fd.write(' \n' % (flist[i]))
fd.write(" \n")
# add log file metadata later (not known yet)
if flist[i] == alog:
fd.write(' \n')
fd.write(' \n')
else:
if len(fsize) != 0:
fd.write(' \n' % (fsize[i]))
if len(checksum) != 0:
fd.write(' \n' %\
(SiteMover.getChecksumType(checksum[i]), checksum[i]))
fd.write(" \n")
fd.write("\n")
fd.close()
else:
tolog("fntag is neither lfn nor pfn, didn't create the XML file for output files")
def stageInPyModules(initdir, workdir):
""" copy pilot python modules into pilot workdir from condor initial dir """
if workdir and initdir: # both dir has valid value
for k in fileList:
if os.path.isfile("%s/%s" % (initdir, k)):
try:
shutil.copy2("%s/%s" % (initdir, k), workdir)
except Exception, e:
tolog("!!WARNING!!2999!! stageInPyModules failed to copy file %s/%s to %s: %s" % (initdir, k, workdir, str(e)))
else:
tolog("!!WARNING!!2999!! File missing during stage in: %s/%s" % (initdir, k))
def removePyModules(dir):
""" copy pilot python modules into pilot workdir
from condor initial dir """
if dir: # dir exists
for k in fileList:
try:
os.system("rm -rf %s/%s %s/*.pyc"%(dir,k,dir))
except:
pass
def setTimeConsumed(worknode, t_tuple):
"""
Use the NG formula (system+user in seconds)*CPU[MHz]*664/1400
to convert the time usage into unit of kSpecInt2000seconds
"""
t_tot = reduce(lambda x, y:x+y, t_tuple[2:3])
conversionFactor = worknode.cpu*664/1400/1000
cpuCU = "kSI2kseconds"
cpuCT = int(t_tot*conversionFactor)
return cpuCU, cpuCT, conversionFactor
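# Worked example of the conversion above (illustrative values): for a 2400 MHz
# CPU, conversionFactor = 2400*664.0/1400/1000 ~ 1.138, so a job that consumed
# 1000 s of system+user time reports cpuCT = int(1000*1.138) = 1138 kSI2kseconds.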
def timeStamp():
''' return ISO-8601 compliant date/time format '''
    tmptz = time.timezone
    if tmptz > 0:
        signstr = '-'
    else:
        signstr = '+'
    # use the absolute offset so east-of-Greenwich zones don't produce negative fields
    tmptz = abs(tmptz)
    tmptz_hours = int(tmptz/3600)
    return str("%s%s%02d%02d" % (time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), signstr, tmptz_hours, int(tmptz/60 - tmptz_hours*60)))
def timeStampUTC():
""" return UTC time stamp """
return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(time.time()))
def getJobStatus(jobId, pshttpurl, psport):
"""
Return the current status of job from the dispatcher
typical dispatcher response: 'status=finished&StatusCode=0'
StatusCode 0: succeeded
10: time-out
30: failed
In the case of time-out, the dispatcher will be asked one more time after 10s
"""
from urllib import urlencode, urlopen
status = 'unknown'
StatusCode = -1
# par = urlencode({'ids': jobId})
nod = {}
nod['ids'] = jobId
# url = "https://gridui01.usatlas.bnl.gov:25443/server/panda/getStatus"
url = "%s:%s/server/panda/getStatus" % (pshttpurl, repr(psport))
# ask dispatcher about lost job status
trial = 1
max_trials = 2
while trial <= max_trials:
try:
# open connection
ret = httpConnect(nod, url)
response = ret[1]
# decode the response
# eg. var = ['status=notfound', 'attemptNr=0', 'StatusCode=0']
var = response.split("&")
# create a dictionary of the response (protects against future updates)
# eg. dic = {'status': 'activated', 'attemptNr': '0', 'StatusCode': '0'}
dic = {}
for i in range(len(var)):
key = var[i].split('=')[0]
dic[key] = var[i].split('=')[1]
status = dic['status'] # e.g. 'holding'
attemptNr = int(dic['attemptNr']) # e.g. '0'
StatusCode = int(dic['StatusCode']) # e.g. '0'
except Exception,e:
tolog("Could not interpret job status from dispatcher: %s, %s" % (response, str(e)))
status = 'unknown'
attemptNr = -1
StatusCode = -1
break
else:
if StatusCode == 0: # success
break
elif StatusCode == 10: # time-out
trial = trial + 1
time.sleep(10)
continue
else: # error
break
return status, attemptNr, StatusCode
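# Minimal sketch of the dispatcher response decoding performed above, using a
# hard-coded example string instead of a live server connection:
def _exampleDecodeDispatcherStatus():
    """ demo only """
    response = 'status=finished&attemptNr=0&StatusCode=0'
    dic = {}
    for pair in response.split("&"):
        key, value = pair.split("=")
        dic[key] = value
    # dic -> {'status': 'finished', 'attemptNr': '0', 'StatusCode': '0'}
    return dic['status'], int(dic['attemptNr']), int(dic['StatusCode'])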
def getExitCode(path, filename):
""" Try to read the exit code from the pilot stdout log """
    ec = -1
    all_lines = []
    from re import compile, findall
    # first create a tmp file with only the last few lines of the status file to avoid
    # reading a potentially long status file
    tmp_file_name = "tmp-tail-dump-file"
    try:
        os.system("tail %s/%s >%s/%s" % (path, filename, path, tmp_file_name))
    except Exception, e:
        tolog("Job Recovery could not create tmp file %s: %s" % (tmp_file_name, str(e)))
    else:
        # open the tmp file and look for the pilot exit info
        try:
            tmp_file = open("%s/%s" % (path, tmp_file_name), 'r')
        except IOError:
            tolog("Job Recovery could not open tmp file")
        else:
            try:
                all_lines = tmp_file.readlines()
            except Exception, e:
                tolog("Job Recovery could not read tmp file %s: %s" % (tmp_file_name, str(e)))
            tmp_file.close()
            # remove the tmp file
            try:
                os.remove("%s/%s" % (path, tmp_file_name))
            except OSError:
                tolog("Job Recovery could not remove tmp file: %s" % (tmp_file_name))
    # now check the pilot exit info; if it has an exit code, extract it
    exit_info = compile(r"job ended with \(trf,pilot\) exit code of \((\d+),(\d+)\)")
    for line in all_lines:
        found = findall(exit_info, line)
        if found:
            # the second group of the last match is the pilot exit code
            ec = int(found[-1][1])
    return ec
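# Sketch of the exit code pattern matching on a fabricated log line:
def _exampleExitCodePattern():
    """ demo only """
    from re import compile, findall
    line = "job ended with (trf,pilot) exit code of (65,1008)"
    pattern = compile(r"job ended with \(trf,pilot\) exit code of \((\d+),(\d+)\)")
    found = findall(pattern, line)
    # found -> [('65', '1008')]; the second group is the pilot exit code
    return int(found[0][1])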
def getRemainingOutputFiles(outFiles):
"""
Return list of files if there are remaining output files in the lost job data directory
"""
remaining_files = []
for file_name in outFiles:
if os.path.exists(file_name):
remaining_files.append(file_name)
return remaining_files
def remove(entries):
"""
Remove files and directories
entries should be a list
"""
status = True
# protect against wrong usage
if type(entries) == list:
if len(entries) > 0:
for entry in entries:
try:
os.system("rm -rf %s" %entry)
except OSError:
tolog("Could not remove %s" % entry)
status = False
else:
tolog("Argument has wrong type, expected list: %s" % str(type(entries)))
status = False
return status
def getCPUmodel():
""" Get cpu model and cache size from /proc/cpuinfo """
# model name : Intel(R) Xeon(TM) CPU 2.40GHz
# cache size : 512 KB
# gives the return string "Intel(R) Xeon(TM) CPU 2.40GHz 512 KB"
cpumodel = ''
cpucache = ''
cache = ''
modelstring = ''
try:
f = open('/proc/cpuinfo', 'r')
except Exception, e:
tolog("Could not open /proc/cpuinfo: %s" % str(e))
else:
re_model = re.compile('^model name\s+:\s+(\w.+)')
re_cache = re.compile('^cache size\s+:\s+(\d+ KB)')
# loop over all lines in cpuinfo
for line in f.readlines():
# try to grab cpumodel from current line
model = re_model.search(line)
if model:
# found cpu model
cpumodel = model.group(1)
# try to grab cache size from current line
cache = re_cache.search(line)
if cache:
# found cache size
cpucache = cache.group(1)
# stop after 1st pair found - can be multiple cpus
if cpumodel and cpucache:
# create return string
modelstring = cpumodel + " " + cpucache
break
f.close()
# default return string if no info was found
if not modelstring:
modelstring = "UNKNOWN"
return modelstring
def getExeErrors(startdir, fileName):
""" Extract exeErrorCode and exeErrorDiag from jobInfo.xml """
exeErrorCode = 0
exeErrorDiag = ""
# first check if the xml file exists (e.g. it doesn't exist for a test job)
import commands
findFlag = False
line = ""
# try to locate the file
out = commands.getoutput("find %s -name %s" % (startdir, fileName))
if out != "":
for line in out.split('\n'):
tolog("Found trf error file at: %s" % (line))
findFlag = True
break # just in case, but there should only be one trf error file
if findFlag:
if os.path.isfile(line):
# import the xml functions
from xml.sax import ContentHandler, make_parser
from xml.sax.handler import feature_namespaces
import JobInfoXML
# Create a parser
parser = make_parser()
# Tell the parser we are not interested in XML namespaces
parser.setFeature(feature_namespaces, 0)
# Create the handler
dh = JobInfoXML.JobInfoXML()
# Tell the parser to use our handler
parser.setContentHandler(dh)
# Parse the input
parser.parse(line)
# get the error code and the error message(s)
exeErrorCode = dh.getCode()
exeErrorDiag = dh.getMessage()
else:
tolog("Could not read trf error file: %s" % (line))
else:
tolog("Could not find trf error file %s in search path %s" % (fileName, startdir))
# only return a maximum of 250 characters in the error message (as defined in PandaDB)
return exeErrorCode, exeErrorDiag[:250]
def debugInfo(_str, tofile=True):
""" print the debug string to stdout"""
tolog("DEBUG: %s" % (_str), tofile=tofile)
def isBuildJob(outFiles):
"""
Check if the job is a build job
(i.e. check if the job only has one output file that is a lib file)
"""
isABuildJob = False
# outFiles only contains a single file for build jobs, the lib file
if len(outFiles) == 1:
# e.g. outFiles[0] = user.paulnilsson.lxplus001.lib._001.log.tgz
if outFiles[0].find(".lib.") > 0:
isABuildJob = True
return isABuildJob
def OSBitsCheck():
""" determine whether the platform is a 32 or 64-bit OS """
b = -1
try:
a = commands.getoutput('uname -a')
b = a.find('x86_64')
except:
return 32 # default
else:
if b == -1 : # 32 bit OS
return 32
else: # 64 bits OS
return 64
def uniqueList(input_list):
"""
return a list of unique entries
input_list = ['1', '1', '2'] -> ['1', '2']
"""
u = {}
for x in input_list:
u[x] = 1
return u.keys()
def diffLists(list1, list2):
"""
compare the input lists (len(list1) must be > len(list2))
and return the difference
"""
d = {}
for x in list1:
d[x] = 1
for x in list2:
if d.has_key(x):
del d[x]
return d.keys()
def getFileInfo(outputFiles, checksum_cmd, skiplog=False):
"""
Return lists with file sizes and checksums for the given output files
"""
fsize = []
checksum = []
# get error handler
error = PilotErrors()
pilotErrorDiag = ""
for file in outputFiles:
# add "" for the log metadata since it has not been created yet
if file.find(".log.") != -1 and skiplog:
ec = -1
else:
from SiteMover import SiteMover
ec, pilotErrorDiag, _fsize, _checksum = SiteMover.getLocalFileInfo(file, csumtype=checksum_cmd)
tolog("Adding %s,%s for file %s using %s" % (_fsize, _checksum, file, checksum_cmd))
if ec == 0:
fsize.append(_fsize)
checksum.append(_checksum)
else:
if ec == error.ERR_FAILEDMD5LOCAL or ec == error.ERR_FAILEDADLOCAL:
fsize.append(_fsize)
checksum.append("")
else:
fsize.append("")
checksum.append("")
if ec != -1: # skip error message for log
tolog("!!WARNING!!4000!! getFileInfo received an error from getLocalFileInfo for file: %s" % (file))
tolog("!!WARNING!!4000!! ec = %d, pilotErrorDiag = %s, fsize = %s, checksum = %s" %\
(ec, pilotErrorDiag, str(_fsize), str(_checksum)))
return fsize, checksum
def addMetadataForLog(fname, fsize, checksum, format=None, date=None):
"""
Add the fsize and checksum values for the log (left empty until this point)
Return exit code and xml
If format = NG, then the NorduGrid format of the metadata will be assumed
"""
ec = 0
lines = ""
try:
f = open(fname, 'r')
except Exception, e:
tolog("Failed to open metadata file: %s" % str(e))
ec = -1
else:
datetag = ""
newdatetag = ""
if format == 'NG':
metadata1 = ''
new_metadata1 = '%s' % (fsize)
datetag = ''
if date and date != "None":
_date = date
else:
_date = "1970-01-01 0:00:00"
newdatetag = '%s' % (_date)
else:
metadata1 = ''
new_metadata1 = '' % (fsize)
# find out if checksum or adler32 should be added
from SiteMover import SiteMover
csumtype = SiteMover.getChecksumType(checksum)
if format == 'NG':
if csumtype == "adler32":
metadata2 = ''
new_metadata2 = '%s' % (checksum)
else:
metadata2 = ''
new_metadata2 = '%s' % (checksum)
else:
if csumtype == "adler32":
metadata2 = ''
new_metadata2 = '' % (checksum)
else:
metadata2 = ''
new_metadata2 = '' % (checksum)
metadata3 = ''
for line in f.readlines():
newline = ""
if line.find(metadata1) != -1:
newline = line.replace(metadata1, new_metadata1)
lines += newline
elif line.find(metadata2) != -1:
newline = line.replace(metadata2, new_metadata2)
lines += newline
elif line.find(metadata3) != -1:
newline = line.replace(metadata3, new_metadata2)
lines += newline
elif line.find('csumtypetobeset') != -1:
newline = line.replace()
else:
lines += line
f.close()
try:
f = open(fname, 'w')
f.write(lines)
f.close()
except Exception, e:
tolog("Failed to write new metadata for log: %s" % str(e))
ec = -1
return ec, lines
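# Sketch of the in-place substitution performed above, on a single fabricated
# metadata line (the att_name convention follows PFCxml):
def _exampleLogMetadataSubstitution():
    """ demo only """
    line = '    <metadata att_name="fsize" att_value=""/>\n'
    newline = line.replace('<metadata att_name="fsize" att_value=""/>',
                           '<metadata att_name="fsize" att_value="%s"/>' % ("1024"))
    # newline -> '    <metadata att_name="fsize" att_value="1024"/>\n'
    return newline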
def removeFiles(dir, fileList):
"""
Remove the input files from the work dir
"""
ec = 0
found = 0
for file in fileList:
if os.path.isfile("%s/%s" % (dir, file)):
try:
os.remove("%s/%s" % (dir, file))
except Exception, e:
tolog("Failed to remove file: %s/%s" % (dir, file))
ec = 1
else:
tolog("Removed file: %s/%s" % (dir, file))
found += 1
if found > 0:
tolog("Removed %d/%d input file(s)" % (found, len(fileList)))
else:
tolog("Input files already removed (not found)")
return ec
def createPoolFileCatalog(file_list):
"""
Create the PoolFileCatalog.xml
file_list = { guid1 : sfn1, ... }
Adapted from R. Walker's code
"""
outxml = ''
if len(file_list) == 0:
tolog('No input files so no PFC created')
else:
dom = minidom.getDOMImplementation()
doctype = dom.createDocumentType("POOLFILECATALOG","","InMemory")
doc = dom.createDocument(None, "POOLFILECATALOG", doctype)
root = doc.documentElement
doc.appendChild(root)
        # prepare plain text as we can't trust minidom on python < 2.3
        pfc_text = '<?xml version="1.0" encoding="UTF-8" standalone="no" ?>\n'
        pfc_text += '<!-- Edited By POOL -->\n'
        pfc_text += '<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n'
        pfc_text += '<POOLFILECATALOG>\n'
        # Strip .N because stagein makes soft link, and params have no .N
        for guid in file_list.keys():
            sfn = file_list[guid]
            ftype = 'ROOT_All'
            file = doc.createElement("File")
            file.setAttribute('ID', guid)
            root.appendChild(file)
            # physical element - file in local directory without .N extension
            physical = doc.createElement("physical")
            file.appendChild(physical)
            pfn = doc.createElement('pfn')
            pfn.setAttribute('filetype', ftype)
            pfn.setAttribute('name', sfn)
            physical.appendChild(pfn)
            # logical element - empty
            logical = doc.createElement('logical')
            file.appendChild(logical)
            pfc_text += '  <File ID="%s">\n    <physical>\n      <pfn filetype="%s" name="%s"/>\n    </physical>\n    <logical/>\n  </File>\n' %\
                (guid, ftype, sfn)
        pfc_text += '</POOLFILECATALOG>\n'
tolog(str(doc.toxml()))
tolog(pfc_text)
f = open('PoolFileCatalog.xml', 'w')
f.write(pfc_text)
f.close()
outxml = pfc_text
return outxml
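# Illustrative call (the guid and sfn are fabricated); this writes
# PoolFileCatalog.xml to the cwd and returns the catalog as a string:
# outxml = createPoolFileCatalog({'b2f703d6-0000-1111-2222-333344445555':
#                                 '/scratch/AOD.017771._00001.pool.root'})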
def replace(filename, stext, rtext):
""" replace string stext with rtext in file filename """
status = True
try:
input = open(filename, "r")
except Exception, e:
tolog("!!WARNING!!4000!! Open failed with %s" % str(e))
status = False
else:
try:
output = open(filename + "_tmp", "w")
except Exception, e:
tolog("!!WARNING!!4000!! Open failed with %s" % str(e))
status = False
input.close()
else:
            for s in input.xreadlines():
                output.write(s.replace(stext, rtext))
            input.close()
            # close (and thereby flush) the tmp file before renaming it
            output.close()
            # rename tmp file and overwrite original file
            try:
                os.rename(filename + "_tmp", filename)
            except Exception, e:
                tolog("!!WARNING!!4000!! Rename failed with %s" % str(e))
                status = False
return status
def dumpFile(filename, topilotlog=False):
""" dump a given file to stdout or to pilotlog """
if os.path.exists(filename):
try:
f = open(filename, "r")
except Exception, e:
tolog("!!WARNING!!4000!! Exception caught: %s" % str(e))
else:
for line in f.readlines():
line = line.rstrip()
if topilotlog:
tolog("%s" % (line))
else:
print "%s" % (line)
f.close()
else:
tolog("!!WARNING!!4000!! %s does not exist" % (filename))
def getDirectAccessDic(qdata):
""" return the directAccess dictionary in case the site supports direct access """
# task: create structure
# directAccess = {
# 'oldPrefix' : 'gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas',
# 'newPrefix' : 'root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd',
# 'useCopyTool' : False,
# 'directIn' : True
# }
# from queuedata variable copysetup
# copysetup = setup_string^oldPrefix^newPrefix^useCopyTool^directIn
# example:
# copysetup=^gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True
# (setup_string not used)
# (all cases tested)
# qdata = 'whatever^gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True'
# qdata = '^gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True'
# qdata = 'gsiftp://osgserv04.slac.stanford.edu/xrootd/atlas^root://atl-xrdr.slac.stanford.edu:1094//atlas/xrootd^False^True'
# qdata = '' or 'whatever'
directAccess = None
if qdata.find('^') > -1:
n = qdata.count('^')
i = 0
        # protect against a forgotten initial ^ in case the setup_string is empty!
if n == 3 or n == 4:
# read data
data = qdata.split('^')
if n == 3:
i = -1
else:
copysetup = data[0]
i += 1
oldPrefix = data[i]
i += 1
newPrefix = data[i]
i += 1
if data[i].lower() == 'true':
useCopyTool = True
else:
useCopyTool = False
i += 1
if data[i].lower() == 'true':
directIn = True
else:
directIn = False
# create structure
directAccess = {
'oldPrefix': oldPrefix,
'newPrefix': newPrefix,
'useCopyTool': useCopyTool,
'directIn': directIn
}
tolog("directAccess: %s" % str(directAccess))
else:
tolog("!!WARNING!!4000!! copysetup has wrong format (should contain data separated by 4 ^ signs): %s" % (qdata))
else:
# do nothing, don't care about the copysetup right now (only later in Mover)
# copysetup = qdata
pass
return directAccess
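# Sketch of a getDirectAccessDic call on a fabricated 5-field copysetup value
# (setup_string^oldPrefix^newPrefix^useCopyTool^directIn):
# qdata = 'setup.sh^gsiftp://some.se/path^root://some.se:1094//path^False^True'
# getDirectAccessDic(qdata) ->
#   {'oldPrefix': 'gsiftp://some.se/path', 'newPrefix': 'root://some.se:1094//path',
#    'useCopyTool': False, 'directIn': True}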
def getErrors(filename):
""" get all !!TEXT!!NUMBER!!... errors from file """
from re import compile, findall
ret = ""
try:
f = open(filename)
lines = f.readlines()
f.close()
except Exception, e:
tolog("!!WARNING!!4000!! could not open/read file: %s" % str(e))
else:
p = r"!!(\S+)!!\d+!!"
pattern = compile(p)
for line in lines:
if findall(pattern, line):
ret += line
return ret
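# Sketch of the error pattern on a fabricated log line:
def _exampleErrorPattern():
    """ demo only """
    from re import compile, findall
    pattern = compile(r"!!(\S+)!!\d+!!")
    line = "07 May 2009 12:00:00| !!WARNING!!4000!! something went wrong"
    # findall returns the severity tag between the first pair of !!'s, e.g. ['WARNING']
    return findall(pattern, line)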
def getLFN(pfn, lfnlist):
"""
get the local file name from the xml, and ignore any trailing __DQ2-parts
e.g. HITS.017771._00188.pool.root__DQ2-1200097946 -> HITS.017771._00188.pool.root
"""
lfn = ""
for lfn in lfnlist:
bpfn = os.path.basename(pfn)
if bpfn[:len(lfn)] == lfn:
break
return lfn
def makeTransRegReport(all_transferred, some_transferred, latereg, nr_transferred, nr_files, fail, ec, ret, fields):
""" make the transfer and registration report """
error = PilotErrors()
tolog("")
tolog("..Transfer and registration report.........................................................................")
tolog(". Mover has finished")
if all_transferred and not latereg:
if nr_files > 1:
tolog(". All (%d) files have been transferred and registered" % (nr_files))
else:
tolog(". The single file has been transferred and registered")
elif all_transferred and latereg:
if nr_files > 1:
tolog(". All (%d) files have been transferred but not registered" % (nr_files))
tolog(". The files will be registrered by a later pilot if job recovery is supported,")
else:
tolog(". The single file has been transferred but not registered")
tolog(". The file will be registrered by a later pilot if job recovery is supported,")
tolog(". otherwise this job will fail")
elif some_transferred and latereg:
tolog(". Some files (%d/%d) were transferred but no file was registered" %\
(nr_transferred, nr_files))
tolog(". The remaining files will be transferred and registrered by a later pilot if job recovery is supported,")
tolog(". otherwise this job will fail")
elif some_transferred and not latereg:
tolog(". Some files (%d/%d) were transferred and registered" %\
(nr_transferred, nr_files))
tolog(". The remaining files will be transferred and registrered by a later pilot if job recovery is supported,")
tolog(". otherwise this job will fail")
elif not some_transferred:
tolog(". No files (%d/%d) were transferred or registered" %\
(nr_transferred, nr_files))
if nr_files > 1:
tolog(". The files will be transferred and registrered by a later pilot if job recovery is supported,")
else:
tolog(". The file will be transferred and registrered by a later pilot if job recovery is supported,")
tolog(". otherwise this job will fail")
else:
tolog(". Mover has finished")
if fail != 0:
tolog(". File transfer exit code : (%d, %s)" %\
(fail, error.getErrorStrVerbose(fail)))
else:
tolog(". File transfer exit code : (%d, )" % (fail))
if some_transferred:
tolog(". File registration return values : (%d, %s, %s)" %\
(ec, error.getErrorStrVerbose(ec), str(ret)))
tolog(". Put function will return fields : %s" % str(fields))
tolog(". Transfer and registration report produced at : %s" % timeStamp())
tolog("...........................................................................................................")
tolog("")
def hasBeenTransferred(fields):
""" determine whether files were successfully transferred """
status = False
s = 0
# the fields will all be empty if no files were transferred
for field in fields:
s += len(field)
if s > 0:
status = True
return status
def removeSRMInfo(f0):
""" remove any SRM info from the f0 string """
from SiteMover import SiteMover
fields0 = ""
for pfns in f0.split("+"):
if pfns != "":
pfns = SiteMover.stripPortAndVersion(pfns)
fields0 += "%s+" % (pfns)
    # remove any trailing +-sign (guard against an empty string)
    if fields0.endswith("+"):
        fields0 = fields0[:-1]
if fields0 == "":
fields0 = f0
if f0 != fields0:
tolog("removeSRMInfo() has updated %s to %s" % (f0, fields0))
return fields0
def registerFiles(fields, ub=None):
"""
register all files in the LRC
"""
error = PilotErrors()
ret = '1'
if ub != "None" and ub != None and ub != "": # ub is 'None' outside the US
# find out if checksum or adler32 should be added
from SiteMover import SiteMover
_checksum = fields[4].split("+")[0] # first one, assume same type for the rest
if len(_checksum) > 0:
csumtype = SiteMover.getChecksumType(_checksum)
else:
csumtype = CMD_CHECKSUM # use default (md5sum)
# remove any srm info from the pfns
#fields[0] = removeSRMInfo(fields[0])
if csumtype == "adler32":
params = urllib.urlencode({'pfns': fields[0], 'lfns': fields[1], 'guids': fields[2], 'fsizes': fields[3],\
'md5sums': '', 'adler32s': fields[4], 'archivals': fields[5]})
else:
params = urllib.urlencode({'pfns': fields[0], 'lfns': fields[1], 'guids': fields[2], 'fsizes': fields[3],\
'md5sums': fields[4], 'adler32s': '', 'archivals': fields[5]})
try:
url = ub + '/lrc/files'
if url.find('//lrc') > 0:
url = url.replace('//lrc','/lrc')
tolog("Will send params: %s" % str(params))
tolog("Trying urlopen with: %s" % (url))
f = urllib.urlopen(url, params)
except Exception, e:
tolog("!!WARNING!!4000!! Unexpected exception: %s" % str(e))
return error.ERR_DDMREG, str(e)
else:
ret = f.read()
if ret != '1':
ret = ret.replace('\n', ' ')
tolog('!!WARNING!!4000!! LRC registration error: %s' % str(ret))
tolog('!!WARNING!!4000!! LRC URL requested: %s' % f.geturl())
if ret == 'LFNnonunique':
return error.ERR_DDMREGDUP, ret
elif ret.find("guid-metadata entry already exists") >= 0:
return error.ERR_GUIDSEXISTSINLRC, ret
else:
return error.ERR_DDMREG, ret
return 0, ret
def lateRegistration(ub, job, type="unknown"):
""" late registration used by the job recovery """
# function will return True if late registration has been performed, False if it failed
# and None if there is nothing to do
status = False
latereg = False
fields = None
# protect against old jobState files which may not have the new variables
try:
tolog("type: %s" % (type))
if type == "output":
if job.output_latereg == "False":
latereg = False
else:
latereg = True
fields = job.output_fields
elif type == "log":
if job.log_latereg == "False":
latereg = False
else:
latereg = True
fields = job.log_field
else:
tolog("!!WARNING!!4000!! Unknown id type for registration: %s" % (type))
tolog("!!WARNING!!4000!! Skipping late registration step")
pass
except Exception, e:
tolog("!!WARNING!!4000!! Late registration has come upon an old jobState file - can not perform this step: %s" % str(e))
pass
else:
tolog("latereg: %s" % str(latereg))
tolog("fields: %s" % str(fields))
# should late registration be performed?
if latereg:
ec, ret = registerFiles(fields, ub=ub)
if ec == 0:
tolog("registerFiles done")
status = True
else:
tolog("!!WARNING!!4000!! File registration returned: (%d, %s)" % (ec, ret))
if not latereg:
tolog("Nothing to register (%s)" % (type))
return None
else:
return status
def isAnalJob(trf, prodSourceLabel=""):
""" Determine whether the job is an analysis job or not """
if (trf.startswith('https://') or trf.startswith('http://')) and prodSourceLabel != "software":
analJob = True
else:
analJob = False
return analJob
def getPayloadName(job):
""" figure out a suitable name for the payload """
jobtrf = job.trf.split(",")[0]
if jobtrf.find("panda") > 0 and jobtrf.find("mover") > 0:
name = "pandamover"
elif jobtrf.find("Athena") > 0 or jobtrf.find("trf") > 0:
name = "athena"
else:
if isBuildJob(job.outFiles):
name = "buildjob"
else:
name = "payload"
return name
def getSiteSpecs():
""" Get available releases (code adapted from Torre) """
if os.environ.has_key("VO_ATLAS_SW_DIR"):
scappdir = readpar('appdir')
if scappdir != "":
if os.path.exists(os.path.join(scappdir, 'software/releases')):
relbase = os.path.join(scappdir, 'software/releases')
elif os.path.exists(os.path.join(scappdir, 'software')):
relbase = os.path.join(scappdir, 'software')
else:
relbase = scappdir
else:
tolog("VO_ATLAS_SW_DIR area: %s" % (os.environ["VO_ATLAS_SW_DIR"]))
relbase = "%s/software" % os.environ["VO_ATLAS_SW_DIR"]
elif os.environ.has_key("OSG_APP"):
scappdir = readpar('appdir')
if scappdir != "":
appdir = scappdir
else:
appdir = os.environ["OSG_APP"]
tolog("OSG app area: %s" % (appdir))
cmd = "/bin/ls -alL %s" % (appdir)
tolog("Executing command: %s" % (cmd))
s = commands.getoutput(cmd)
tolog("Output: %s" % (s))
if appdir[-len('atlas_app/atlas_rel'):] == "atlas_app/atlas_rel":
# appdir already contains the release string, don't add it again
relbase = appdir
else:
relbase = "%s/atlas_app/atlas_rel" % (appdir)
elif os.environ.has_key("APP"):
tolog("APP area: %s" % (os.environ["APP"]))
relbase = "%s/atlas_app/atlas_rel" % os.environ["APP"]
else:
tolog("No software directory found")
relbase = ''
if relbase != '':
tolog("Looking for releases in: %s" % (relbase))
cmd = "/bin/ls -alL " + relbase
tolog("Executing command: %s" % (cmd))
s = commands.getoutput(cmd)
tolog("Output: %s" % (s))
releases = ''
dirlist = commands.getoutput("/bin/ls -1 " + relbase)
dirlist = dirlist.split('\n')
for f in dirlist:
pat = re.compile('^[0-9]+\.[0-9\.]+[0-9a-zA-Z\.\_]*')
mat = pat.match(f)
if mat:
if os.path.exists(relbase + '/' + f + '/cmtsite/setup.sh'):
if releases != '':
releases += '|'
releases += 'Atlas-%s' % f
return releases
else:
return ''
def setUnsetVars(thisSite):
""" set pilot variables in case they have not been set by the pilot launcher """
# thisSite will be updated and returned
tolog('Setting unset pilot variables using queuedata')
if thisSite.appdir == "":
scappdir = readpar('appdir')
if os.environ.has_key("OSG_APP"):
if scappdir == "":
scappdir = os.environ["OSG_APP"]
if scappdir == "":
scappdir = "/usatlas/projects/OSG"
tolog('!!WARNING!!4000!! appdir not set in queuedata or $OSG_APP: using default %s' % (scappdir))
else:
tolog('!!WARNING!!4000!! appdir not set in queuedata - using $OSG_APP: %s' % (scappdir))
        if scappdir.find('/atlas_app/atlas_rel') != -1:
            tolog('Temporarily removing string /atlas_app/atlas_rel from appdir')
            scappdir = scappdir.replace('/atlas_app/atlas_rel','')
tolog('appdir: %s' % (scappdir))
thisSite.appdir = scappdir
if thisSite.dq2url == "":
_dq2url = readpar('dq2url')
if _dq2url == "":
tolog('Note: dq2url not set')
else:
tolog('dq2url: %s' % (_dq2url))
thisSite.dq2url = _dq2url
if thisSite.wntmpdir == "":
_wntmpdir = readpar('wntmpdir')
# if _wntmpdir.startswith("$"):
# _wntmpdir = commands.getoutput("echo %s" % (_wntmpdir))
# elif _wntmpdir == "":
if _wntmpdir == "":
_wntmpdir = thisSite.workdir
tolog('!!WARNING!!4000!! wntmpdir not set - using site workdir: %s' % (_wntmpdir))
tolog('wntmpdir: %s' % (_wntmpdir))
thisSite.wntmpdir = _wntmpdir
return thisSite
def findPythonInRelease(siteroot, atlasRelease):
"""
Set the python executable in the release dir (LCG sites only)
"""
tolog("Trying to find a python executable for release: %s" % (atlasRelease))
scappdir = readpar('appdir')
if scappdir != "":
if os.path.exists(os.path.join(scappdir, 'software/releases')):
_swbase = os.path.join(scappdir, 'software/releases')
elif os.path.exists(os.path.join(scappdir, 'software')):
_swbase = os.path.join(scappdir, 'software')
else:
_swbase = scappdir
cmd = "source %s/%s/setup.sh;" % (_swbase, atlasRelease)
else:
cmd = "source $VO_ATLAS_SW_DIR/software/%s/setup.sh;" % (atlasRelease)
cmd += "source %s/AtlasOffline/%s/AtlasOfflineRelease/cmt/setup.sh" % (siteroot, atlasRelease)
py = ""
tolog("Executing command: %s" % (cmd))
rc, rs = commands.getstatusoutput(cmd)
if rc == 0:
cmd += ";which python"
rs = commands.getoutput(cmd)
if rs.startswith('/'):
tolog("Found: %s" % (rs))
py = rs
else:
if '\n' in rs:
rs = rs.split('\n')[-1]
if rs.startswith('/'):
tolog("Found: %s" % (rs))
py = rs
else:
tolog("!!WARNING!!4000!! No python executable found in release dir: %s" % (rs))
else:
tolog("!!WARNING!!4000!! Command failed: %d, %s" % (rc, rs))
return py
def stringToFields(jobFields):
""" Convert a jobState string to a fields array """
jobFields = jobFields.replace('[','').replace(']','')
jobFields = jobFields.replace("\'","")
rf = jobFields.split(',')
fields = []
for f in rf:
fields += [f.strip()]
return fields
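# Sketch on a fabricated jobState fields string:
# stringToFields("['sfn1', 'lfn1', 'guid1']") -> ['sfn1', 'lfn1', 'guid1']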
#def getpar(par, s):
# """ extract par from s """
#
# pars = s.split('|')
# for p in pars:
# pat = re.compile(".*%s=(.*)" % (par))
# mat = pat.match(p)
# if mat:
# return mat.group(1)
# return ''
def getpar(par, s):
""" extract par from s """
matches = re.findall("(^|\|)([^\|=]+)=",s)
for tmp,tmpPar in matches:
if tmpPar == par:
patt = "%s=(.*)" % par
idx = matches.index((tmp,tmpPar))
if idx+1 == len(matches):
patt += '$'
else:
patt += "\|%s=" % matches[idx+1][1]
mat = re.search(patt,s)
if mat:
return mat.group(1)
else:
return ''
return ''
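# Sketch of getpar on a fabricated queuedata string; each value is bounded by
# the next "field=" occurrence (or end-of-string), so values may themselves
# contain characters like '=' without breaking the extraction:
def _exampleGetpar():
    """ demo only """
    s = "appdir=/osg/app|copytool=lcgcp|region=US"
    # getpar('copytool', s) -> 'lcgcp'
    return getpar('copytool', s)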
def readpar(par):
""" read par from queuedata """
fh = open("%s/queuedata.dat" % (os.environ['PilotHomeDir']))
queuedata = fh.read()
fh.close()
return getpar(par, queuedata)
def getBatchSystemJobID():
""" return the batch system job id (will be reported to the server) """
id = ""
# BQS (e.g. LYON)
if os.environ.has_key("BQSCLUSTER"):
id = os.environ["BQSCLUSTER"]
return "BQS", id
# Torque
if os.environ.has_key("PBS_JOBID"):
id = os.environ["PBS_JOBID"]
return "Torque", id
# LSF
if os.environ.has_key("LSB_JOBID"):
id = os.environ["LSB_JOBID"]
return "LSF", id
# Sun's Grid Engine
if os.environ.has_key("JOB_ID"):
id = os.environ["JOB_ID"]
return "Grid Engine", id
# Condor (variable sent through job submit file)
if os.environ.has_key("clusterid"):
id = os.environ["clusterid"]
return "Condor", id
# # Condor (id unknown)
# if os.environ.has_key("_CONDOR_SCRATCH_DIR"):
# id = "(unknown clusterid)"
# return "Condor", id
return None, id
def touch(filename):
""" touch a file """
if not os.path.isfile(filename):
try:
os.system("touch %s" % (filename))
        except Exception, e:
tolog("!!WARNING!!1000!! Failed to touch file: %s" % str(e))
else:
tolog("Lock file created: %s" % (filename))
def createLockFile(jobrec, workdir, lockfile="LOCKFILE"):
"""
Site workdir protection to prevent the work dir from being deleted by the cleanup
function if pilot fails to register the log
"""
    # only try to create a lock file if it doesn't exist already
    # do not bother to create it if the site doesn't allow for job recovery
    f = "%s/%s" % (workdir, lockfile)
    if lockfile == "LOCKFILE":
        # the default lock file is only needed when job recovery is allowed
        if jobrec:
            touch(f)
    else:
        touch(f)
def verifyTransfer(workdir, verbose=True):
""" verify that all files were transferred by checking the existance of the ALLFILESTRANSFERRED lockfile """
status = False
fname = "%s/ALLFILESTRANSFERRED" % (workdir)
if os.path.exists(fname):
if verbose:
tolog("Verified: %s" % (fname))
status = True
else:
if verbose:
tolog("Transfer verification failed: %s (file does not exist)" % (fname))
return status
def removeLEDuplicates(logMsg):
""" identify duplicated messages in the log extracts and remove them """
# first create a new log extracts list that does not have the time stamps
# (which will be different for the otherwise different messages)
# E.g.:
# 31 Mar 2008 01:32:37| !!WARNING!!1999!! Could not read modification time of ESD.020072._04023.pool.root.9
# 31 Mar 2008 02:03:08| !!WARNING!!1999!! Could not read modification time of ESD.020072._04023.pool.root.9
# should only be printed once,
# 31 Mar 2008 01:32:37| !!WARNING!!1999!! Could not read modification time of ESD.020072._04023.pool.root.9
log_extracts_list = logMsg.split('\n')
# create a temporary list with stripped timestamp fields
log_extracts_tmp = []
pattern = re.compile(r"(\d+ [A-Za-z]+ \d+ \d+:\d+:\d+\|)")
for line in log_extracts_list:
# id the time stamp
found = re.findall(pattern, line)
if len(found) > 0:
# remove any time stamp
line = line.replace(found[0], '')
log_extracts_tmp.append(line)
# remove duplicate lines and create an index list to know where the original line was
# (we want to bring back the timestamp)
# do not use dictionaries since they are not sorted
i = 0
log_extracts_index = []
log_extracts_tmp2 = []
for line in log_extracts_tmp:
if line not in log_extracts_tmp2:
log_extracts_index.append(i)
log_extracts_tmp2.append(line)
i += 1
# create the final list
log_extracts_tmp = []
for index in log_extracts_index:
log_extracts_tmp.append(log_extracts_list[index])
# return the stripped logMsg
return "\n".join(log_extracts_tmp)
def replaceQueuedataField(field, value, verbose=True):
""" replace a given queuedata field with a new value """
    # example: change the copytool field to lcgcp:
    # replaceQueuedataField("copytool", "lcgcp")
status = True
queuedata_filename = "%s/queuedata.dat" % (os.environ['PilotHomeDir'])
stext = field + "=" + readpar(field)
rtext = field + "=" + value
if replace(queuedata_filename, stext, rtext):
if verbose:
tolog("Successfully changed %s to: %s" % (field, value))
else:
tolog("!!WARNING!!1999!! Failed to change %s to: %s" % (field, value))
status = False
return status
def verifyCMTCONFIG(releaseCommand):
""" make sure that the required CMTCONFIG match that of the local system """
status = True
pilotErrorDiag = ""
cmd = commands.getoutput("%s;echo $CMTCONFIG" % (releaseCommand))
if cmd.find("\n") >= 0:
# in case the releaseCommand gives info as well, get rid of it
cmd = cmd.split("\n")[-1]
cmtconfig_local = cmd
if cmtconfig_local == "":
cmtconfig_local = "local cmtconfig not set"
cmtconfig_required = readpar('cmtconfig')
if cmtconfig_required == "":
cmtconfig_required = "required cmtconfig not set"
# does the required CMTCONFIG match that of the local system?
if cmtconfig_required != cmtconfig_local:
pilotErrorDiag = "Required CMTCONFIG (%s) incompatible with that of local system (%s)" %\
(cmtconfig_required, cmtconfig_local)
tolog("!!FAILED!!2990!! %s" % (pilotErrorDiag))
status = False
return status, pilotErrorDiag
def writeExitCode(workdir, ec):
""" write exit code ec to file EXITCODE """
fname = "%s/EXITCODE" % (workdir)
try:
f = open(fname, "w")
except Exception, e:
tolog("!!WARNING!!2990!! Could not open: %s, %s" % (fname, str(e)))
else:
f.write("%d" % (ec))
f.close()
tolog("Wrote exit code %d to file: %s" % (ec, fname))
def readExitCode(workdir):
""" read exit code from file /EXITCODE """
ec = 0
fname = "%s/EXITCODE" % (workdir)
if os.path.exists(fname):
try:
f = open(fname, "r")
except Exception, e:
tolog("Failed to open %s: %s" % (fname, str(e)))
else:
ec = int(f.read())
tolog("Found exit code: %d" % (ec))
f.close()
else:
tolog("No exit code to report to wrapper (file %s does not exist)" % (fname))
return ec
def evaluateQueuedata():
""" evaluate environmental variables if used and replace the value in the queuedata """
# the following fields are allowed to contain environmental variables
fields = ["copysetup", "copysetupin", "recoverdir", "wntmpdir", "sepath", "seprodpath", "lfcpath", "lfcprodpath"]
# process each field and evaluate the environment variables if present
for field in fields:
# grab the field value and split it since some fields can contain ^-separators
old_values = readpar(field)
new_values = []
for value in old_values.split("^"):
if value.startswith("$"):
# evaluate the environmental variable
new_value = commands.getoutput("echo %s" % (value))
if new_value == "":
tolog("!!WARNING!!2999!! Environmental variable not set: %s" % (value))
value = new_value
new_values.append(value)
# rebuild the string (^-separated if necessary)
new_values_joined = '^'.join(new_values)
# replace the field value in the queuedata with the new value
if new_values_joined != old_values:
if replaceQueuedataField(field, new_values_joined, verbose=False):
tolog("Updated field %s in queuedata (replaced \'%s\' with \'%s\')" % (field, old_values, new_values_joined))
def verifyQueuedata(queuename, filename, _i, _N, url):
""" check if the downloaded queuedata has the proper format """
hasQueuedata = False
try:
f = open(filename, "r")
except Exception, e:
tolog("!!WARNING!!1999!! Open failed with %s" % str(e))
else:
output = f.read()
f.close()
try:
output_split = output.split('=')[0]
except Exception, e:
output_split = "split failed: %s" % str(e)
if output_split != 'appdir':
if len(output_split) == 0:
tolog("!!WARNING!!1999!! curl command returned empty queuedata (wrong queuename %s?)" % (queuename))
else:
tolog("!!WARNING!!1999!! Attempt %d/%d: curl command did not return valid queuedata from config DB server %s" %\
(_i, _N, url))
output = output.replace('\n', '')
output = output.replace(' ', '')
tolog("!!WARNING!!1999!! Output begins with: %s" % (output[:64]))
try:
os.remove(filename)
except Exception, e:
tolog("!!WARNING!!1999!! Failed to remove file %s: %s" % (filename, str(e)))
else:
# found valid queuedata info, break the for-loop
tolog("schedconfigDB returned: %s" % (output))
hasQueuedata = True
return hasQueuedata
def getQueuedata(queuename, sitename):
""" Download the queuedata if not already downloaded """
    # read the queue parameters and create the queuedata.dat file (if the queuename variable is set)
# pilot home dir is not the workdir. It's the sandbox directory for the pilot.
# queuedata structure: (with example values)
# appdir=/ifs0/osg/app/atlas_app/atlas_rel
# dq2url=http://gk03.swt2.uta.edu:8000/dq2/
# copytool=gridftp
# copysetup=
# ddm=UTA_SWT2
# se=http://gk03.swt2.uta.edu:8000/dq2/
# sepath=
# envsetup=
# region=US
# copyprefix=
# lfcpath=
# lfchost=
# sein=
# wntmpdir=/scratch
os.environ['PilotHomeDir'] = commands.getoutput('pwd')
hasQueuedata = False
# try the config servers one by one in case one of them is not responding
filename = '%s/queuedata.dat' % (os.environ['PilotHomeDir'])
if os.path.exists(filename):
tolog("Queuedata has already been downloaded by pilot wrapper script (will confirm validity)")
hasQueuedata = verifyQueuedata(queuename, filename, 1, 1, "(see batch log for url)")
if hasQueuedata:
tolog("Queuedata was successfully downloaded by pilot wrapper script")
else:
tolog("Queuedata was not downloaded successfully by pilot wrapper script, will try again")
if not hasQueuedata:
global chttpurl
# shuffle list so same cache is not hit by all jobs
from random import shuffle
shuffle(chttpurl)
_i = 0
_N = len(chttpurl)
ret = -1
for url in chttpurl:
_i += 1
if 'ANALY_' in sitename:
cmd = 'curl --connect-timeout 20 --max-time 120 -sS "%s:25880/server/pandamon/query?tpmes=pilotpars&siteid=%s" > %s'%\
(url, sitename, filename)
else:
cmd = 'curl --connect-timeout 20 --max-time 120 -sS "%s:25880/server/pandamon/query?tpmes=pilotpars&queue=%s" > %s'%\
(url, queuename, filename)
tolog("Executing command: %s" % (cmd))
try:
# output will be empty since we pipe into a file
ret, output = commands.getstatusoutput(cmd)
except Exception, e:
tolog("!!WARNING!!1999!! Failed with curl command: %s" % str(e))
return -1, False
else:
if ret == 0:
# read back the queuedata to verify its validity
hasQueuedata = verifyQueuedata(queuename, filename, _i, _N, url)
if hasQueuedata:
break
else:
tolog("!!WARNING!!1999!! curl command exited with code %d" % (ret))
if hasQueuedata:
# evaluate environmental variables if used
evaluateQueuedata()
return 0, hasQueuedata
def postProcessQueuedata(queuename, thisSite, jobrec):
""" update queuedata fields if necessary """
#if thisSite.sitename == "UTA_PAUL_TEST":
# ec = replaceQueuedataField("copytool", "cp")
#if thisSite.sitename == "RAL":
# ec = replaceQueuedataField("copytoolin", "rfcpsvcclass")
# ec = replaceQueuedataField("status", "online")
#if thisSite.sitename == "TRIUMF_REPRO":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("copyprefixin", "srm://srm.triumf.ca/^dummy")
# ec = replaceQueuedataField("copytool", "lcgcp")
# ec = replaceQueuedataField("copytoolin", "lcgcp")
# ec = replaceQueuedataField("setokens", "")
# srm://dcsrm.usatlas.bnl.gov:8443/srm/managerv2?SFN=/pnfs/usatlas.bnl.gov/atlasuserdisk/
#if thisSite.sitename == "BNL_ATLAS_test" or thisSite.sitename == "ANALY_BNL_test":
#if thisSite.sitename == "BNL_ATLAS_test":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("retry", "true")
# ec = replaceQueuedataField("copysetup", "/afs/usatlas.bnl.gov/i386_redhat72/opt/dcache/dcache_client_config.sh^srm://dcsrm.usatlas.bnl.gov(:[0-9]+)*(/srm/managerv2?SFN=)*/^dcache:/^False^False")
# ec = replaceQueuedataField("copytool", "lcg-cp2")
# ec = replaceQueuedataField("se", "srm://dcsrm.usatlas.bnl.gov")
# ec = replaceQueuedataField("seprodpath", "/pnfs/usatlas.bnl.gov")
# ec = replaceQueuedataField("lfchost", "lfc.usatlas.bnl.gov")
# ec = replaceQueuedataField("lfcpath", "/grid/atlas/dq2")
# ec = replaceQueuedataField("copyprefix", "^srm://dcsrm.usatlas.bnl.gov")
#if thisSite.sitename == "NIKHEF-ELPROD":
# ec = replaceQueuedataField("seopt", "srm://srm.grid.sara.nl:8443/srm/managerv2?SFN=,srm://tbn18.nikhef.nl:8446/srm/managerv2?SFN=,srm://srm.grid.sara.nl:8443/srm/managerv2?SFN=,srm://srm.grid.sara.nl:8443/srm/managerv2?SFN=")
# ec = replaceQueuedataField("seprodpath","/pnfs/grid.sara.nl/data/atlas/atlasmcdisk/,/dpm/nikhef.nl/home/atlas/atlasdatadisk/,/pfns/grid.sara.nl/data/atlas/atlasdatatape/,/pnfs/grid.sara.nl/data/atlas/atlasmctape/")
#if thisSite.sitename == "HU_ATLAS_Tier2":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("copytool", "lsm")
# ec = replaceQueuedataField("ddm", "NET2_PRODDISK")
# ec = replaceQueuedataField("setokens", "ATLASPRODDISK")
# ec = replaceQueuedataField("se", "token:ATLASPRODDISK:srm://atlas.bu.edu:8443/srm/v2/server?SFN=")
# ec = replaceQueuedataField("seprodpath", "/gpfs1/atlasproddisk")
#if thisSite.sitename == "BU_ATLAS_Tier2o" or thisSite.sitename == "HU_ATLAS_Tier2":
# ec = replaceQueuedataField("status", "online")
## ec = replaceQueuedataField("ddm", "NET2_PRODDISK")
# ec = replaceQueuedataField("copytool", "lsm")
# ec = replaceQueuedataField("lfchost", "heroatlas.fas.harvard.edu")
## ec = replaceQueuedataField("lfcpath", "/grid/atlas/users/pathena")
# ec = replaceQueuedataField("lfcprodpath", "/grid/atlas/dq2")
# ec = replaceQueuedataField("copyprefix", "^srm://atlas.bu.edu")
#if thisSite.sitename == "ANALY_NET2" or thisSite.sitename == "ANALY_HU_ATLAS_Tier2":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("ddm", "NET2_USERDISK")
# ec = replaceQueuedataField("copytool", "lsm")
# ec = replaceQueuedataField("lfchost", "heroatlas.fas.harvard.edu")
# ec = replaceQueuedataField("lfcpath", "/grid/atlas/users/pathena")
# ec = replaceQueuedataField("lfcprodpath", "/grid/atlas/users/pathena")
# ec = replaceQueuedataField("copyprefix", "^srm://atlas.bu.edu")
# ec = replaceQueuedataField("envsetup", "source /atlasgrid/osg-wn-1.0.0/setup.sh")
# ec = replaceQueuedataField("setokens", "ATLASPRODDISK")
# ec = replaceQueuedataField("se", "token:ATLASPRODDISK:srm://atlas.bu.edu:8443/srm/v2/server?SFN=")
# ec = replaceQueuedataField("seprodpath", "/gpfs1/atlasproddisk")
#if thisSite.sitename == "ANALY_SLAC":
# ec = replaceQueuedataField("retry", "false")
# ec = replaceQueuedataField("ddm", "SLACXRD_USERDISK")
# ec = replaceQueuedataField("setokens", "ATLASUSERDISK")
# ec = replaceQueuedataField("se", "token:ATLASUSERDISK:srm://osgserv04.slac.stanford.edu:8443/srm/v2/server?SFN=")
# ec = replaceQueuedataField("sepath", "/xrootd/atlas/atlasuserdisk")
# ec = replaceQueuedataField("seprodpath", "/xrootd/atlas/atlasuserdisk")
# ec = replaceQueuedataField("lfchost", "atl-lfc.slac.stanford.edu")
# ec = replaceQueuedataField("copyprefix", "^srm://osgserv04.slac.stanford.edu")
# ec = replaceQueuedataField("dq2url", "") #if thisSite.sitename == "SLACXRD":
# ec = replaceQueuedataField("retry", "false")
#if thisSite.sitename == "GLOW-ATLAS":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("retry", "false")
# ec = replaceQueuedataField("ddm", "WISC_PRODDISK")
# ec = replaceQueuedataField("lfchost", "higgs04.cs.wisc.edu")
# ec = replaceQueuedataField("lfcpath", "/grid/atlas/users/pathena")
# ec = replaceQueuedataField("lfcprodpath", "/grid/atlas/dq2")
# ec = replaceQueuedataField("se", "token:ATLASPRODDISK:srm://atlas07.cs.wisc.edu:8443/srm/v2/server?SFN=")
# ec = replaceQueuedataField("setokens", "ATLASPRODDISK")
# ec = replaceQueuedataField("sepath", "/atlas/xrootd/atlasuserdisk")
# ec = replaceQueuedataField("seprodpath", "/atlas/xrootd/atlasproddisk")
# ec = replaceQueuedataField("copyprefix", "^srm://atlas07.cs.wisc.edu")
# ec = replaceQueuedataField("dq2url", "")
#if thisSite.sitename == "MWT2_UC" or thisSite.sitename == "UC_ATLAS_MWT2":
# ec = replaceQueuedataField("copytoolin", "dccp")
# ec = replaceQueuedataField("lfchost", "uct2-dc1.uchicago.edu")
# ec = replaceQueuedataField("copyprefix", "^srm://uct2-dc1.uchicago.edu")
# ec = replaceQueuedataField("retry", "false")
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("ddm", "MWT2_UC_PRODDISK")
# ec = replaceQueuedataField("copytool", "lcg-cp2")
# ec = replaceQueuedataField("envsetup", "source /share/wn-client/setup.sh")
# ec = replaceQueuedataField("setokens", "ATLASPRODDISK")
# ec = replaceQueuedataField("se", "token:ATLASPRODDISK:srm://uct2-dc1.uchicago.edu:8443/srm/managerv2?SFN=")
# ec = replaceQueuedataField("seprodpath", "/pnfs/uchicago.edu/atlasproddisk")
#if thisSite.sitename == "OU_OSCER_ATLAS":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("copysetup", "")
# ec = replaceQueuedataField("se", "gsiftp://tier2-02.ochep.ou.edu/ibrix/data/dq2-cache/")
#if thisSite.sitename == "OU_OCHEP_SWT2":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("lfchost", "tier2-02.ochep.ou.edu")
# ec = replaceQueuedataField("copyprefix", "^tier2-02.ochep.ou.edu")
# ec = replaceQueuedataField("dq2url", "")
#if thisSite.sitename == "AGLT2":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("lfchost", "lfc.aglt2.org")
# ec = replaceQueuedataField("copyprefix", "^srm://head01.aglt2.org")
#if thisSite.sitename == "PIC":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("copyprefixin", "dcap://dcap.pic.es:22125^dummy")
# ec = replaceQueuedataField("copyprefixin", "dcap://dcap.pic.es/pnfs/pic.es/data^dummy")
# ec = replaceQueuedataField("copytoolin", "dccplfc")
#if thisSite.sitename == "ANALY_SWT2_CPB":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("copytool", "xcp")
# ec = replaceQueuedataField("copytoolin", "")
# ec = replaceQueuedataField("copysetupin", "")
# ec = replaceQueuedataField("copysetup", "/cluster/atlas/xrootd/atlasutils/setup_xcp.sh")
# ec = replaceQueuedataField("copyprefix", "^srm://gk03.atlas-swt2.org")
# ec = replaceQueuedataField("lfchost", "gk02.atlas-swt2.org")
# ec = replaceQueuedataField("lfcpath", "/grid/atlas/users/pathena")
# ec = replaceQueuedataField("lfcprodpath", "/grid/atlas/users/pathena")
#if thisSite.sitename == "SWT2_CPB":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("copytool", "xcp")
# ec = replaceQueuedataField("copytoolin", "")
# ec = replaceQueuedataField("copysetupin", "")
# ec = replaceQueuedataField("copysetup", "/cluster/atlas/xrootd/atlasutils/setup_xcp.sh")
# ec = replaceQueuedataField("copyprefix", "^srm://gk03.atlas-swt2.org")
# ec = replaceQueuedataField("lfchost", "gk02.atlas-swt2.org")
# ec = replaceQueuedataField("lfcpath", "")
# ec = replaceQueuedataField("lfcprodpath", "/grid/atlas/dq2")
# ec = replaceQueuedataField("ddm", "SWT2_CPB_PRODDISK")
# ec = replaceQueuedataField("copysetup", "")
# ec = replaceQueuedataField("copysetupin", "/cluster/atlas/xrootd/atlasutils/setup_xcp.sh")
# ec = replaceQueuedataField("envsetup", "source /cluster/grid/wn-client/setup.sh")
# ec = replaceQueuedataField("setokens", "ATLASPRODDISK")
# ec = replaceQueuedataField("se", "token:ATLASPRODDISK:srm://gk03.atlas-swt2.org:8443/srm/v2/server?SFN=")
# ec = replaceQueuedataField("seprodpath", "/xrd/atlasproddisk")
#if thisSite.sitename == "IU_OSG":
# ec = replaceQueuedataField("status", "online")
# ec = replaceQueuedataField("ddm", "MWT2_IU_PRODDISK")
# ec = replaceQueuedataField("copytool", "lcg-cp2")
# ec = replaceQueuedataField("envsetup", "source /usr/local/osg/setup.sh")
# ec = replaceQueuedataField("setokens", "ATLASPRODDISK")
# ec = replaceQueuedataField("se", "token:ATLASPRODDISK:srm://iut2-dc1.iu.edu:8443/srm/managerv2?SFN=")
# ec = replaceQueuedataField("seprodpath", "/pnfs/iu.edu/atlasproddisk")
#if thisSite.sitename == "MWT2_IU":
# ec = replaceQueuedataField("copyprefix", "^srm://iut2-dc1.iu.edu")
#if thisSite.sitename == "ANALY_LYON_XROOTD":
# ec = replaceQueuedataField("copysetup", "")
# ec = replaceQueuedataField("copysetupin", "/usr/local/shared/bin/xrootd_env.sh^srm://ccsrm.in2p3.fr/pnfs/in2p3.fr/data/atlas/disk/dq2^root://ccsrb15:1094//pnfs/in2p3.fr/data/atlas/disk/dq2^False^True")
#if thisSite.sitename == "INFN-T1":
# ec = replaceQueuedataField("copytool", "storm")
#if thisSite.sitename == "INFN-BOLOGNA":
# ec = replaceQueuedataField("copytool", "storm")
# ec = replaceQueuedataField("lfchost", "lfc.cr.cnaf.infn.it")
# ec = replaceQueuedataField("se", "token:ATLASMCDISK:srm://storm-fe.cr.cnaf.infn.it:8444/srm/managerv2?SFN=,srm://srm-v2.cr.cnaf.infn.it")
# ec = replaceQueuedataField("sepath", "/atlas/atlasmcdisk//users/pathena")
# ec = replaceQueuedataField("seprodpath", "/atlas/atlasmcdisk/,/atlas/atlasdatadisk/,/castor/cnaf.infn.it/grid/lcg/atlas/atlasdatatape/")
# ec = replaceQueuedataField("setokens", "ATLASMCDISK,ATLASDATADISK,ATLASDATATAPE")
#if thisSite.sitename == "ANALY_CERN":
# ec = replaceQueuedataField("appdir", "/afs/cern.ch/atlas")
# _envsetup = "unset GLITE_ENV_SET;" + readpar("envsetup")
# ec = replaceQueuedataField("envsetup", _envsetup)
_status = readpar('status')
if _status != None and _status != "":
if _status.upper() == "OFFLINE":
tolog("Site %s is currently in %s mode - aborting pilot" % (thisSite.sitename, _status.lower()))
return -1, None, None
else:
tolog("Site %s is currently in %s mode" % (thisSite.sitename, _status.lower()))
# override pilot run options
_jobrec = readpar('retry')
if _jobrec.upper() == "TRUE":
tolog("Job recovery turned on")
jobrec = True
elif _jobrec.upper() == "FALSE":
tolog("Job recovery turned off")
jobrec = False
else:
tolog("Job recovery variable (retry) not set")
# set pilot variables in case they have not been set by the pilot launcher
thisSite = setUnsetVars(thisSite)
return 0, thisSite, jobrec
def isSameType(trf, userflag, prodSourceLabel):
""" is the lost job of same type as the current pilot? """
# MOVE TO JOB RECOVERY CLASS LATER
# treat userflag 'self' as 'user'
if userflag == 'self':
userflag = 'user'
if (isAnalJob(trf, prodSourceLabel=prodSourceLabel) and userflag == 'user') or \
(not isAnalJob(trf, prodSourceLabel=prodSourceLabel) and userflag != 'user'):
sametype = True
if userflag == 'user':
tolog("Lost job is of same type as current pilot (analysis pilot, lost analysis job trf: %s)" % (trf))
else:
tolog("Lost job is of same type as current pilot (production pilot, lost production job trf: %s)" % (trf))
else:
sametype = False
if userflag == 'user':
tolog("Lost job is not of same type as current pilot (analysis pilot, lost production job trf: %s)" % (trf))
else:
tolog("Lost job is not of same type as current pilot (production pilot, lost analysis job trf: %s)" % (trf))
return sametype
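# Illustrative sketch (not part of the original pilot): how isSameType() would
# typically be consulted during job recovery; the trf URL and labels are made up.
def _exampleIsSameType():
    """ decide whether a lost job may be recovered by this pilot (assumed values) """
    trf = "http://www.example.org/trf/runAthena-00-00-11"
    if isSameType(trf, 'user', 'user'):
        tolog("Lost job can be handled by this pilot")
        return True
    return False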
def verifyProxyValidity():
""" make sure that we have a long lasting proxy before asking for a job """
status = True
envsetupin = readpar('envsetupin')
if envsetupin == "":
envsetup = readpar('envsetup')
if envsetup == "":
tolog("Using envsetup since envsetupin is not set")
else:
tolog("verifyProxyValidity found no envsetup")
else:
envsetup = envsetupin
# do we have a valid proxy?
from SiteMover import SiteMover
ec, pilotErrorDiag = SiteMover.verifyProxy(envsetup)
return ec, pilotErrorDiag
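# Sketch (not in the original source): a caller would typically abort the job
# request when the proxy check fails; the handling below is illustrative only.
def _exampleVerifyProxyValidity():
    """ abort early on an invalid or short-lived proxy (assumed handling) """
    ec, pilotErrorDiag = verifyProxyValidity()
    if ec != 0:
        tolog("!!WARNING!!2999!! Proxy verification failed: %s" % (pilotErrorDiag))
    return ec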
def getGuidsFromXML(dir, id=None, filename=None):
""" extract the guid matching the filename from the xml, or all guids if filename not set """
guids = []
# are we in recovery mode? then id is set
if id:
metadata_filename = "%s/metadata-%s.xml" % (dir, repr(id))
else:
metadata_filename = "%s/metadata.xml" % (dir)
xmldoc = minidom.parse(metadata_filename)
fileList = xmldoc.getElementsByTagName("File")
for thisfile in fileList:
gpfn = str(thisfile.getElementsByTagName("lfn")[0].getAttribute("name"))
if (filename and gpfn == filename) or (not filename):
guid = str(thisfile.getAttribute("ID"))
guids.append(guid)
return guids
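# Illustrative sketch (not part of the original pilot): getGuidsFromXML() expects
# a POOL file catalog style metadata.xml as written below; the guid, lfn and
# directory are made up.
def _exampleGetGuidsFromXML(workdir):
    """ write a minimal metadata.xml and extract the guid for one lfn """
    xml = '<?xml version="1.0" encoding="UTF-8" standalone="no" ?>\n'
    xml += '<POOLFILECATALOG>\n'
    xml += ' <File ID="6AD91F48-7409-DE11-B3A1-000423D94AF4">\n'
    xml += '  <logical>\n'
    xml += '   <lfn name="sample.root"/>\n'
    xml += '  </logical>\n'
    xml += ' </File>\n'
    xml += '</POOLFILECATALOG>\n'
    f = open(os.path.join(workdir, "metadata.xml"), "w")
    f.write(xml)
    f.close()
    # returns ['6AD91F48-7409-DE11-B3A1-000423D94AF4']
    return getGuidsFromXML(workdir, filename="sample.root")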
def addToSkipped(lfn, guid):
""" add metadata for skipped file """
ec = 0
try:
# append to skipped.xml file
fd = open("skipped.xml", "a")
except Exception, e:
tolog("!!WARNING!!2999!! Exception caught: %s" % str(e))
ec = -1
else:
fd.write(' <File ID="%s">\n' % (guid))
fd.write("  <logical>\n")
fd.write('   <lfn name="%s"/>\n' % (lfn))
fd.write("  </logical>\n")
fd.write(" </File>\n")
fd.close()
return ec
def addSkippedToPFC(fname, skippedfname):
""" add skipped input file info to metadata.xml """
ec = 0
try:
fd = open(skippedfname, "r")
except Exception, e:
tolog("!!WARNING!!2999!! %s" % str(e))
ec = -1
else:
skippedXML = fd.read()
fd.close()
try:
fdPFC = open(fname, "r")
except Exception, e:
tolog("!!WARNING!!2999!! %s" % str(e))
ec = -1
else:
PFCXML = fdPFC.read()
fdPFC.close()
if ec == 0:
# add the skipped file info to the end of the PFC
PFCXML = PFCXML.replace("</POOLFILECATALOG>", skippedXML)
PFCXML += "</POOLFILECATALOG>\n"
# move the old PFC and create a new PFC
try:
os.system("mv %s %s.BAK2" % (fname, fname))
except Exception, e:
tolog("!!WARNING!!2999!! %s" % str(e))
ec = -1
else:
try:
fdNEW = open(fname, "w")
except Exception, e:
tolog("!!WARNING!!2999!! %s" % str(e))
ec = -1
else:
fdNEW.write(PFCXML)
fdNEW.close()
tolog("Wrote updated XML with skipped file info:\n%s" % (PFCXML))
return ec
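# Sketch (not in the original source): the intended flow is to record each
# skipped input with addToSkipped() and then fold skipped.xml back into the
# job's PFC; the lfn/guid pair and file names below are made up.
def _exampleSkippedFlow():
    """ record one skipped file and merge it into metadata.xml (assumed names) """
    addToSkipped("skipped_input.root", "12345678-ABCD-ABCD-ABCD-1234567890AB")
    return addSkippedToPFC("metadata.xml", "skipped.xml")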
def verifyPilotWorkdir(args, wntmpdir):
""" make sure that the args tuple contains a proper wntmpdir to be used as the pilot workdir """
# if the wrapper script fails to download the queuedata, the pilot will be started with a default workdir
# which might not be ok
try:
if args[2] != wntmpdir and wntmpdir != '':
tolog("!!WARNING!!2999!! Args tuple does not contain the same wntmpdir as schedconfig: %s ne %s" % (args[2], wntmpdir))
# update the tuple with the schedconfig value
tolog("Original args tuple: %s" % str(args))
proper_args_list = list(args)
proper_args_list[2] = wntmpdir
args = tuple(proper_args_list)
tolog("Updated args tuple: %s" % str(args))
else:
tolog("Args tuple contains a valid wntmpdir: %s" % (wntmpdir))
except Exception, e:
tolog("!!WARNING!!2999!! Caught exception: %s" % str(e))
return args
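# Sketch (not part of the original pilot): the third element of the args tuple
# is replaced when it disagrees with schedconfig; the paths below are made up.
def _exampleVerifyPilotWorkdir():
    """ show the wntmpdir correction on a mismatching args tuple """
    args = ('pilot.py', 'ANALY_EXAMPLE', '/tmp/default_workdir')
    # returns a tuple whose third element has become '/scratch/pilot'
    return verifyPilotWorkdir(args, '/scratch/pilot')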
def getSwbase(appdir, release):
""" return the swbase variable """
# appdir comes from thisSite.appdir (might not be set)
# release info is needed to figure out the correct path to use when schedconfig.appdir is set
swbase = ""
if readpar('region') == 'Nordugrid':
if os.environ.has_key('RUNTIME_CONFIG_DIR'):
_swbase = os.environ['RUNTIME_CONFIG_DIR']
if os.path.exists(_swbase):
swbase = _swbase
elif os.environ.has_key('VO_ATLAS_SW_DIR'):
# use the appdir from queuedata if available
scappdir = readpar('appdir')
if scappdir != "":
# e.g. for CERN-UNVALID, appdir contains the full path to the releases
if os.path.exists(os.path.join(scappdir, release)):
swbase = scappdir
# for other CERN sites 'software/releases' must be added
elif os.path.exists(os.path.join(scappdir, 'software/releases')):
swbase = os.path.join(scappdir, 'software/releases')
# for remaining LCG sites, only 'software' needs to be added
else:
swbase = os.path.join(scappdir, 'software')
else:
# for CERN
if os.path.exists(os.environ['VO_ATLAS_SW_DIR'] + '/software/releases'):
swbase = os.environ['VO_ATLAS_SW_DIR'] + '/software/releases'
# for remaining LCG sites
else:
swbase = os.environ['VO_ATLAS_SW_DIR'] + '/software'
else:
# for non-LCG sites
if appdir.find('atlas_app/atlas_rel') < 0:
swbase = appdir + '/atlas_app/atlas_rel'
else:
swbase = appdir
return swbase.replace('//','/')
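# Sketch (not in the original source): the lookup order implemented above,
# spelled out for an LCG site; the paths and release number are made up.
def _exampleGetSwbase():
    """ with VO_ATLAS_SW_DIR=/opt/atlas and no schedconfig appdir, getSwbase()
    returns /opt/atlas/software/releases if it exists, else /opt/atlas/software """
    return getSwbase('/opt/atlas', '16.0.2')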
class _Curl:
""" curl class """
# constructor
def __init__(self):
# path to curl
self.path = 'curl'
# verification of the host certificate
self.verifyHost = True
# request a compressed response
self.compress = True
# SSL cert/key
if os.environ.has_key('X509_USER_PROXY'):
self.sslCert = os.environ['X509_USER_PROXY']
self.sslKey = os.environ['X509_USER_PROXY']
else:
self.sslCert = '/tmp/x509up_u%s' % str(os.getuid())
self.sslKey = '/tmp/x509up_u%s' % str(os.getuid())
# CA cert dir
if os.environ.has_key('X509_CERT_DIR'):
self.sslCertDir = os.environ['X509_CERT_DIR']
else:
_dir = '/etc/grid-security/certificates'
if os.path.exists(_dir):
self.sslCertDir = _dir
else:
tolog("!!WARNING!!2999!! $X509_CERT_DIR is not set and default location %s does not exist" % (_dir))
self.sslCertDir = ''
# GET method
def get(self, url, data):
# make command
com = '%s --silent --get' % self.path
if not self.verifyHost:
com += ' --insecure'
if self.compress:
com += ' --compressed'
if self.sslCertDir != '':
com += ' --capath %s' % self.sslCertDir
if self.sslCert != '':
com += ' --cert %s' % self.sslCert
if self.sslKey != '':
com += ' --key %s' % self.sslKey
#com += ' --verbose'
# data
strData = ''
for key in data.keys():
strData += 'data="%s"\n' % urllib.urlencode({key:data[key]})
# write data to temporary config file
# tmpName = commands.getoutput('uuidgen')
tmpName = 'curl.config'
tmpFile = open(tmpName,'w')
tmpFile.write(strData)
tmpFile.close()
com += ' --config %s' % tmpName
com += ' %s' % url
# execute
tolog("Executing command: %s" % (com))
ret = commands.getstatusoutput(com)
# remove temporary file
#os.remove(tmpName)
return ret
# POST method
def post(self, url, data):
# make command
com = '%s --silent --show-error' % self.path
if not self.verifyHost:
com += ' --insecure'
if self.compress:
com += ' --compressed'
if self.sslCertDir != '':
com += ' --capath %s' % self.sslCertDir
if self.sslCert != '':
com += ' --cert %s' % self.sslCert
if self.sslKey != '':
com += ' --key %s' % self.sslKey
#com += ' --verbose'
# data
strData = ''
for key in data.keys():
strData += 'data="%s"\n' % urllib.urlencode({key:data[key]})
# write data to temporary config file
#tmpName = commands.getoutput('uuidgen')
tmpName = 'curl.config'
tmpFile = open(tmpName,'w')
tmpFile.write(strData)
tmpFile.close()
com += ' --config %s' % tmpName
com += ' %s' % url
# execute
tolog("Executing command: %s" % (com))
ret = commands.getstatusoutput(com)
# remove temporary file
#os.remove(tmpName)
return ret
# PUT method
def put(self, url, data):
# make command
com = '%s --silent' % self.path
if not self.verifyHost:
com += ' --insecure'
if self.compress:
com += ' --compressed'
if self.sslCertDir != '':
com += ' --capath %s' % self.sslCertDir
if self.sslCert != '':
com += ' --cert %s' % self.sslCert
if self.sslKey != '':
com += ' --key %s' % self.sslKey
#com += ' --verbose'
# emulate PUT
for key in data.keys():
com += ' -F "%s=@%s"' % (key,data[key])
com += ' %s' % url
# execute
tolog("Executing command: %s" % (com))
return commands.getstatusoutput(com)
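# Illustrative sketch (not part of the original pilot): a direct use of the
# proxy-based client above; the server URL and payload are made up.
def _exampleCurlPost():
    """ POST a small payload and report curl failures (assumed server) """
    curl = _Curl()
    status, output = curl.post('https://pandaserver.example.org:25443/server/panda/getStatus',
                               {'node': commands.getoutput('hostname')})
    if status != 0:
        tolog("!!WARNING!!2999!! Example POST failed: %s" % (output))
    return status, output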
# send message to dispatcher
def toDispatcher(baseURL, cmd, data):
""" sends 'data' using command 'cmd' to the dispatcher """
try:
tpre = datetime.datetime.utcnow()
except:
pass
tolog("toDispatcher: cmd = %s" % (cmd))
tolog("toDispatcher: len(data) = %d" % len(data))
# instantiate curl
curl = _Curl()
# execute
url = baseURL + '/' + cmd
curlstat, response = curl.post(url, data)
try:
tpost = datetime.datetime.utcnow()
tolog("Elapsed seconds: %d" % ((tpost-tpre).seconds))
except:
pass
try:
if curlstat == 0:
# parse response message
outtxt = response.lower()
if outtxt.find('<html>') > 0:
if outtxt.find('read timeout') > 0:
tolog("!!WARNING!!2999!! Timeout on dispatcher exchange")
else:
tolog("!!WARNING!!2999!! HTTP error on dispatcher exchange")
tolog("HTTP output: %s" % (response))
return EC_Failed, None, None
# create the parameter list from the dispatcher response
data = parseDispatcherResponse(response)
status = int(data['StatusCode'])
if status == SC_Success:
tolog("Successful dispatcher exchange")
elif status == SC_NoJobs:
tolog("!!WARNING!!0!! Dispatcher has no jobs")
elif status == SC_TimeOut:
tolog("!!WARNING!!2999!! Dispatcher reports timeout")
elif status == SC_Failed:
tolog("!!WARNING!!2999!! Dispatcher reports failure processing message")
elif status == SC_NonSecure:
tolog("!!FAILED!!2999!! Attempt to retrieve job with non-secure connection disallowed")
else:
tolog("!!WARNING!!2999!! Unknown dispatcher status code: %d" % (status))
tolog("Dumping curl.config file:")
dumpFile('curl.config', topilotlog=True)
else:
tolog("!!WARNING!!2999!! Dispatcher message curl error: %d " % (curlstat))
tolog("Response = %s" % (response))
tolog("Dumping curl.config file:")
dumpFile('curl.config', topilotlog=True)
return curlstat, None, None
if status == SC_Success:
return status, data, response
else:
return status, None, None
except:
type, value, traceBack = sys.exc_info()
tolog("ERROR %s : %s %s" % (cmd, type, value))
return EC_Failed, None, None
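# Sketch (not in the original source): requesting a job through toDispatcher();
# the site name, label and port are made up.
def _exampleToDispatcher():
    """ ask the dispatcher for a job and report the parsed PandaID on success """
    url = '%s:25443/server/panda' % (getRandomURL())
    status, data, response = toDispatcher(url, 'getJob',
                                          {'siteName': 'ANALY_EXAMPLE', 'prodSourceLabel': 'user'})
    if status == SC_Success:
        tolog("Dispatcher returned job %s" % (data.get('PandaID', 'unknown')))
    return status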
def getPilotToken(tofile=False):
""" read the pilot token from file """
pilottoken = None
filename = "pilottoken.txt"
if os.path.exists(filename):
try:
f = open(filename, "r")
except Exception, e:
tolog("!!WARNING!!2999!! Could not open pilot token file: %s" % str(e), tofile=tofile)
else:
try:
pilottoken = f.read()
except Exception, e:
tolog("!!WARNING!!2999!! Could not read pilot token: %s" % str(e), tofile=tofile)
else:
f.close()
tolog("Successfully read pilot token", tofile=tofile)
try:
os.remove(filename)
except Exception, e:
tolog("!!WARNING!!2999!! Could not remove pilot token file: %s" % str(e), tofile=tofile)
else:
tolog("Pilot token file has been removed", tofile=tofile)
return pilottoken
def getRandomURL():
""" return a random url from the urlList for getJob/updateJob """
from random import shuffle
urlList = ['https://gridui05.usatlas.bnl.gov','https://gridui06.usatlas.bnl.gov','https://gridui07.usatlas.bnl.gov']
shuffle(urlList)
return urlList[0]
def parseDispatcherResponse(response):
""" create the parameter list from the dispatcher response """
parList = cgi.parse_qsl(response, keep_blank_values=True)
tolog("Dispatcher response: %s" % str(parList))
data = {}
for p in parList:
data[p[0]] = p[1]
return data
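# Sketch (not part of the original pilot): the dispatcher replies with a
# urlencoded string which parseDispatcherResponse() turns into a dict; the
# sample payload below is made up.
def _exampleParseDispatcherResponse():
    """ parse a minimal dispatcher reply (assumed fields) """
    sample = "StatusCode=0&PandaID=1234&jobDefinitionID=5678"
    data = parseDispatcherResponse(sample)
    # data == {'StatusCode': '0', 'PandaID': '1234', 'jobDefinitionID': '5678'}
    return int(data['StatusCode']) == SC_Success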