from MoverCatalog import MoverCatalog
from QueueCatalog import QueueCatalog
from Log import Log
import MoversLog
import site_conf
import sys
import os
import commands
import tempfile
import time

MAX_RETRIES = 2   # give up on a file after this many replication attempts
PROGRESS_LOGFILE = 'progress.log'


class Mover:
    """Mover 'agent': copies queued files to their destination storage."""

    def __init__(self, fname, log):
        self.log = log
        self.pf = Log('Mover', PROGRESS_LOGFILE)
        self.fname = fname
        self.cat = MoverCatalog(log, 'xmlcatalog_file:%s' % fname)
        self.queue = QueueCatalog(site_conf.queued_transfers_cat, self.log)

    def move(self):
        """Perform file movement."""
        files = self.cat.read()

        # delete the catalog file here so it doesn't get picked up by
        # another process
        del self.cat
        try:
            os.remove(self.fname)
        except OSError:
            pass
        try:
            os.remove(self.fname + '.BAK')
        except OSError:
            pass
        self.log.logDebug("Removed " + self.fname)

        failures = 0
        ftssuccess = []
        self.reqs = {}
        self.status = {}

        # submit / accumulate transfer requests
        for guid in files.keys():
            source_file = files[guid]['pfn']
            dest_site = files[guid]['destination']
            fsize = files[guid]['fsize']

            tool, source_pfn, dest_pfn = self.resolveProtocols(source_file,
                                                               dest_site)
            if tool is None:
                self.log.logError("Skipped moving file with GUID %s since "
                                  "no suitable tool was found" % guid)
                continue
            self.log.logInfo("Moving %s to %s using %s"
                             % (source_pfn, dest_pfn, tool))

            # initialize memory to store the final status per tool, per bulk
            if not self.status.has_key(tool):
                self.status[tool] = {}

            # do the file copy and get the bulk id
            bulkid = self.copyFile(tool, source_pfn, dest_pfn, fsize)

            # for this guid, remember the tool and bulk id that were used
            self.reqs[guid] = (tool, bulkid, source_file, dest_pfn)

        # finish all pending bulk requests per tool
        for tool in self.status.keys():
            self.finishBulk(tool)

            # report on success/failure per bulk
            for bulkid in self.status[tool].keys():
                s = self.status[tool][bulkid]['status']
                o = self.status[tool][bulkid]['output']
                tsize = self.status[tool][bulkid]['size']
                ttime = self.status[tool][bulkid]['time']
                id = self.status[tool][bulkid]['id']
                if s == 0:
                    self.pf.logInfo("SUCCESS BULKID:%s BULKSIZE:%d BULKTIME:%f"
                                    % (id, tsize, ttime))
                else:
                    self.pf.logInfo("FAILURE BULKID:%s BULKSIZE:%d "
                                    "BULKTIME:%f ERROR:%s"
                                    % (id, tsize, ttime,
                                       o.replace('\n', ',')))

        # for each file
        for guid in files.keys():
            # collect the corresponding transfer result
            if guid in self.reqs:
                tool, bulkid, source_file, dest_pfn = self.reqs[guid]
                s = self.status[tool][bulkid]['status']
                o = self.status[tool][bulkid]['output']
                tsize = self.status[tool][bulkid]['size']
                ttime = self.status[tool][bulkid]['time']
                if tool == 'fts':
                    # FTS reports success/failure per file within the bulk
                    if source_file in self.status[tool][bulkid]['failures']:
                        s = 1
                    else:
                        s = 0
            # didn't even attempt a copy (no tool found)
            else:
                r = self.queue.changeState([guid], 'failedReplication')
                if r != []:
                    self.log.logError('Failed to change state of GUIDs %s' % r)
                del files[guid]
                failures += 1
                continue

            # success!
            if s == 0:
                try:
                    self.queue.renamePFN(source_file, dest_pfn)
                    self.pf.logInfo("SUCCESS GUID:%s BULKID:%d SRC:%s DEST:%s"
                                    % (guid, bulkid, source_file, dest_pfn))
                    if tool == 'fts':
                        ftssuccess.append(guid)
                except:
                    self.log.logError('Failed to rename PFN for %s'
                                      % source_file)
                    # remove the file from the validation list and count
                    # the failure
                    del files[guid]
                    failures += 1
            # failed copy!
            else:
                if source_file is not None:
                    if tool == 'fts':
                        self.pf.logInfo("FAILED GUID:%s FTSID:%s SRC:%s DEST:%s"
                                        % (guid,
                                           self.status['fts'][bulkid]['id'],
                                           source_file, dest_pfn))
                    else:
                        self.pf.logInfo("FAILED GUID:%s BULKID:%d SRC:%s DEST:%s"
                                        % (guid, bulkid, source_file, dest_pfn))

                # remove the file from the validation list and count the failure
                del files[guid]
                failures += 1

                retries = self.queue.getRetries(guid)
                if retries == '':
                    retries = 1
                else:
                    retries = int(retries)

                #
                # TODO: Improve the replication strategy. Currently a file is
                # retried up to MAX_RETRIES times. Databuckets are never
                # retried, since a retry would put them into unknownSURL,
                # which is an invalid state for databuckets!
                #
                if retries >= MAX_RETRIES:
                    r = self.queue.changeState([guid], 'failedReplication')
                    if r != []:
                        self.log.logError('Failed to change state of GUIDs %s'
                                          % r)
                else:
                    # if it is not a databucket, set to unknown SURL to try again
                    atts = self.queue.findFilesQuery("guid='%s'" % guid)
                    if atts[guid]['srcsite'] != site_conf.site_id:
                        r = self.queue.changeState([guid], 'unknownSURL')
                        if r == []:
                            try:
                                self.queue.updateRetries(guid)
                                self.queue.renamePFN(source_file,
                                                     'unknownSURL:' + guid)
                            except:
                                self.log.logError('Failed updating queue catalog')
                        else:
                            self.log.logError('Failed to change state of GUIDs %s'
                                              % r)
                    # for databuckets we don't retry for now
                    else:
                        r = self.queue.changeState([guid], 'failedReplication')
                        if r != []:
                            self.log.logError('Failed to change state of GUIDs %s'
                                              % r)

        # if we are ignoring validation for files transferred with FTS
        if hasattr(site_conf, 'ignore_validation_with_fts') and \
           site_conf.ignore_validation_with_fts == 'yes':
            r = self.queue.changeState(ftssuccess, 'novalidation')
            if r != []:
                self.log.logError('Failed to change state of GUIDs to '
                                  'novalidation %s' % r)
            for guid in ftssuccess:
                del files[guid]

        # (bulk) change state to "ready to be validated" for all remaining guids
        r = self.queue.changeState(files.keys(), 'toValidate')
        if r != []:
            self.log.logError('Failed to change state of GUIDs to toValidate %s'
                              % r)

        # remove this mover's entry from the special '.movers' file
        MoversLog.removeEntry(os.getpid())

        return failures
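
    # A hedged sketch of the catalog entries move() consumes, inferred from
    # the key accesses above; the actual MoverCatalog.read() result may carry
    # more fields, and the GUID/PFN values here are made up:
    #
    #   files = {'4fa2...-guid': {'pfn': 'srm://se.example.org/dq/file1',
    #                             'destination': 'SITEA/some/dataset',
    #                             'fsize': '1048576'},
    #            ...}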

    def resolveProtocols(self, source, dest_site):
        """Resolve source/destination protocols and pick a transfer tool."""
        # the source will always have a protocol (srm, file, etc.)
        p_src = source.split(':')[0]
        dest_se = {}
        dest = p_dest = ''

        # For datablocks: find the local storage path and protocol
        if dest_site.find(site_conf.site_id) != -1:
            # if the source is gsiftp or sfn, try to choose srm for the
            # destination
            if (p_src == 'sfn' or p_src == 'gsiftp') and \
               site_conf.default_storage['backend'].has_key('srm'):
                dest_se = site_conf.default_storage
                dest = dest_se['backend']['srm']
                p_dest = 'srm'
            else:
                # try to find a destination protocol matching the source
                if site_conf.default_storage['backend'].has_key(p_src):
                    dest_se = site_conf.default_storage
                    dest = dest_se['backend'][p_src]
                    p_dest = p_src
                else:
                    # if not, fall back to the default protocol
                    dest_se = site_conf.default_storage
                    p_dest = site_conf.default_protocol
                    dest = dest_se['backend'][p_dest]

        # For databuckets: find the remote storage path and protocol
        else:
            dest_host = None
            for site in site_conf.remote_storages.keys():
                if dest_site.startswith(site):
                    dest_host = site
                    break
            if dest_host is None:
                self.log.logError('No configuration for destination: '
                                  + dest_site)
                return None, None, None
            # get the backend for the destination
            dest = site_conf.remote_storages[dest_host][0]
            p_dest = dest.split(':')[0]
            # TODO: Databuckets don't have the possibility to specify which
            # tool to use for a given protocol, like we do for datablocks
            # with 'cp_tool' or 'srm_tool'...

        # derive the dataset name from the destination site string
        dsname = dest_site.replace(site_conf.site_id, '')
        if dsname[-1] == '/':
            dsname = dsname[:-1]
        if dsname[0] == '/':
            dsname = dsname[1:]
        dsname = dsname.replace('/', '.')

        # build the final destination name (adding the dataset name)
        # TODO: Add destination directory based on source directory??
        dest = ((dest[-1] == '/') and dest[:-1] or dest) \
               + '/' + dsname + '/' + source.split('/')[-1]

        # posix cp
        if p_src == 'file' and p_dest == 'file':
            if dest_se.has_key('cp_tool'):
                return dest_se['cp_tool'], source, dest
            return 'cp', source, dest
        # rfio to/from castor to/from the local file system
        elif p_src == 'file' or p_src == 'rfio':
            if p_dest == 'rfio' or p_dest == 'file':
                return 'rfcp', source, dest
            else:
                self.log.logError('Incompatible protocols: source %s '
                                  'destination %s' % (p_src, p_dest))
                return None, None, None
        # http to local file (handled by wget)
        elif p_src == 'http':
            if p_dest == 'file':
                return 'wget', source, dest
            else:
                self.log.logError('Can only do wget to a local file. '
                                  'Destination is %s.' % p_dest)
                return None, None, None
        # gsiftp to gsiftp
        elif (p_src == 'gsiftp' or p_src == 'sfn') and \
             (p_dest == 'gsiftp' or p_dest == 'sfn'):
            if dest_se.has_key('gsiftp_tool'):
                return dest_se['gsiftp_tool'], source, dest
            return 'guc', source, dest
        # srm
        elif ((p_src == 'sfn' or p_src == 'gsiftp' or p_src == 'srm') and p_dest == 'srm') or \
             (p_src == 'srm' and (p_dest == 'sfn' or p_dest == 'gsiftp' or p_dest == 'srm')):
            # TODO: Fix SRM endpoints! Use fully qualified SURLs and take
            # out double slashes
            if source.find('gsiftp') == -1 and \
               source.find(':8443/srm/managerv1?SFN=') == -1:
                s = source.split('/', 3)
                source = 'srm://' + s[2].split(':')[0] \
                         + ':8443/srm/managerv1?SFN=/' + s[3].replace('//', '/')
            if dest.find('gsiftp') == -1 and \
               dest.find(':8443/srm/managerv1?SFN=') == -1:
                d = dest.split('/', 3)
                dest = 'srm://' + d[2].split(':')[0] \
                       + ':8443/srm/managerv1?SFN=/' + d[3].replace('//', '/')
            if dest_se.has_key('srm_tool'):
                return dest_se['srm_tool'], source, dest
            return 'fts', source, dest
        else:
            self.log.logError('Only http, file, rfio, sfn, gsiftp and srm '
                              'protocols are supported at the moment!')
            return None, None, None
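
    # A hedged sketch of the site_conf attributes consulted above (the field
    # names are taken from their usage in this module; the example values
    # and hostnames are made up):
    #
    #   site_id          = 'SITEA'
    #   default_protocol = 'srm'
    #   default_storage  = {'backend': {'srm': 'srm://se.example.org/dq',
    #                                   'file': '/data/dq'},
    #                       'srm_tool': 'fts'}   # optional *_tool overrides
    #   remote_storages  = {'SITEB': ['gsiftp://se-b.example.org/dq']}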

    def copyFile(self, tool, source_pfn, dest_pfn, fsize):
        """Call the method to copy the file depending on the chosen tool."""
        if tool == 'wget':
            id = self.do_wget(source_pfn, dest_pfn, fsize)
        elif tool == 'rfcp':
            id = self.do_rfcp(source_pfn, dest_pfn, fsize)
        elif tool == 'cp':
            id = self.do_cp(source_pfn, dest_pfn, fsize)
        elif tool == 'fts':
            id = self.add_to_fts(source_pfn, dest_pfn, fsize)
        elif tool == 'guc':
            id = self.do_guc(source_pfn, dest_pfn, fsize)
        else:
            self.log.logError('Unsupported tool %s' % tool)
            return -1
        return id

    def finishBulk(self, tool):
        """Finish pending bulk requests for the given tool."""
        if tool == 'fts':
            self.finish_fts()

    def newBulk(self, tool):
        """Create a new (empty) bulk record for the given tool."""
        id = len(self.status[tool]) + 1
        el = {'defs': [], 'guids': [], 'status': None, 'output': None,
              'size': 0, 'time': 0.0, 'id': id}
        self.status[tool][id] = el
        return id

    def getLatestBulk(self, tool):
        """Return the id of the latest bulk, creating the first if needed."""
        id = len(self.status[tool])
        if id == 0:
            # no bulk yet: generate the first one
            return self.newBulk(tool)
        return id

    def do_wget(self, source, dest, fsize):
        """Do a WGET copy of a file and return the new bulk ID."""
        id = self.newBulk('wget')
        st = time.time()
        s, o = commands.getstatusoutput('wget %s -O %s' % (source, dest))
        t = time.time() - st
        self.status['wget'][id]['status'] = s
        self.status['wget'][id]['output'] = o
        self.status['wget'][id]['size'] = int(fsize)
        self.status['wget'][id]['time'] = float(t)
        return id
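
    # Bookkeeping sketch: each do_* helper records one bulk per call in
    # self.status, keyed by tool and bulk id (example values made up):
    #
    #   self.status['cp'][1] = {'defs': [], 'guids': [], 'status': 0,
    #                           'output': '', 'size': 1048576,
    #                           'time': 0.42, 'id': 1}
    #
    # FTS is the exception: add_to_fts() accumulates all transfers into a
    # single bulk whose 'defs' list is submitted later by finish_fts().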

    def do_cp(self, source, dest, fsize):
        """Do a CP copy of a file and return the new bulk ID."""
        if dest[:5] == 'file:':
            dest = dest[5:]
        if source[:5] == 'file:':
            source = source[5:]
        id = self.newBulk('cp')
        st = time.time()
        s, o = commands.getstatusoutput('cp %s %s' % (source, dest))
        t = time.time() - st
        self.status['cp'][id]['status'] = s
        self.status['cp'][id]['output'] = o
        self.status['cp'][id]['size'] = int(fsize)
        self.status['cp'][id]['time'] = float(t)
        return id

    def do_rfcp(self, source, dest, fsize):
        """Do an RFCP copy of a file and return the new bulk ID."""
        if source[:5] == 'rfio:' or source[:5] == 'file:':
            source = source[5:]
        if dest[:5] == 'rfio:' or dest[:5] == 'file:':
            dest = dest[5:]
        id = self.newBulk('rfcp')
        st = time.time()
        s, o = commands.getstatusoutput('rfcp %s %s' % (source, dest))
        # if the destination directory path doesn't exist, create it and retry
        if s != 0 and o.find(dest + ': No such file or directory') != -1:
            fname = dest.split('/')[-1]
            destdir = dest.replace('/' + fname, '')
            self.log.logInfo('Trying to create directory ' + destdir)
            s2, o2 = commands.getstatusoutput('rfmkdir -p %s' % destdir)
            if s2 == 0:
                s, o = commands.getstatusoutput('rfcp %s %s' % (source, dest))
        t = time.time() - st
        self.status['rfcp'][id]['status'] = s
        self.status['rfcp'][id]['output'] = o
        self.status['rfcp'][id]['size'] = int(fsize)
        self.status['rfcp'][id]['time'] = float(t)
        return id

    def do_guc(self, source, dest, fsize):
        """Do a globus-url-copy of a file and return the new bulk ID."""
        # match LCG to OSG: rewrite sfn: URLs as gsiftp:
        if source[:4] == 'sfn:':
            source = 'gsiftp:' + source[4:]
        if dest[:4] == 'sfn:':
            dest = 'gsiftp:' + dest[4:]
        # create the destination directory if necessary
        destdir = '/'.join(dest.split('/')[:-1])
        print 'Making directory %s if necessary' % destdir
        s, o = commands.getstatusoutput('edg-gridftp-mkdir %s' % destdir)
        if s != 0:
            print 'problem making destination directory:', o
        id = self.newBulk('guc')
        st = time.time()
        s, o = commands.getstatusoutput('globus-url-copy %s %s'
                                        % (source, dest))
        t = time.time() - st
        self.status['guc'][id]['status'] = s
        self.status['guc'][id]['output'] = o
        self.status['guc'][id]['size'] = int(fsize)
        self.status['guc'][id]['time'] = float(t)
        return id

    def add_to_fts(self, source, dest, fsize):
        """Add a transfer to the pending bulk FTS request."""
        if source[:4] == 'sfn:':
            source = 'gsiftp:' + source[4:]
        if dest[:4] == 'sfn:':
            dest = 'gsiftp:' + dest[4:]
        id = self.getLatestBulk('fts')
        self.status['fts'][id]['defs'].append((source, dest))
        self.status['fts'][id]['size'] += int(fsize)
        return id
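
    # The transfer definition file handed to 'glite-transfer-submit -f'
    # below is plain text with one transfer per line, source SURL followed
    # by destination SURL (these example SURLs are made up):
    #
    #   srm://se-a.example.org:8443/srm/managerv1?SFN=/dq/ds/f1 srm://se-b.example.org:8443/srm/managerv1?SFN=/dq/ds/f1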

    def finish_fts(self):
        """Submit the bulk FTS request and poll it until completion."""
        id = self.getLatestBulk('fts')

        # create a securely named temporary file with the transfer definitions
        fd, tmpfile = tempfile.mkstemp()
        f = os.fdopen(fd, 'w')
        for req in self.status['fts'][id]['defs']:
            f.write(req[0] + " " + req[1] + "\n")
        f.close()

        # create the subdir for the dest gsiftp server by hand
        # (note: only the last destination in the bulk is handled here)
        dest = req[1]
        if dest.find('gsiftp') != -1:
            destdir = '/'.join(dest.split('/')[:-1])
            print 'Making directory %s if necessary' % destdir
            s, o = commands.getstatusoutput('edg-gridftp-mkdir %s' % destdir)
            if s != 0:
                print 'problem making destination directory:', o

        self.log.logDebug("FTS transfer definition file is %s" % tmpfile)

        s = o = None
        # submit the request
        st = time.time()
        s, o = commands.getstatusoutput('glite-transfer-submit -s %s -p %s -f %s'
                                        % (site_conf.fts_endpoint,
                                           site_conf.fts_pwd, tmpfile))
        if s != 0:
            # submission failed: mark every source in the bulk as failed
            self.status['fts'][id]['status'] = s
            self.status['fts'][id]['output'] = o
            self.status['fts'][id]['time'] = 0
            self.status['fts'][id]['failures'] = []
            for req in self.status['fts'][id]['defs']:
                self.status['fts'][id]['failures'].append(req[0])
            return s

        ftsid = o.strip()
        self.log.logInfo("FTS transfer ID is %s" % ftsid)
        self.status['fts'][id]['id'] = ftsid

        finished = failed = ''
        lst = ''
        # poll until finished
        while finished == '' and failed == '':
            s, o = commands.getstatusoutput('glite-transfer-status -s %s %s'
                                            % (site_conf.fts_endpoint, ftsid))
            o = o.strip()
            if o != lst:
                self.log.logDebug("FTS id %s transfer status changed to %s"
                                  % (ftsid, o))
                lst = o
            if o == 'Finished' or o == 'Done':
                s = 0
                finished = o
                self.status['fts'][id]['failures'] = []
            elif o == 'Failed' or o == 'Hold' or o == 'Canceled':
                s = 1
                failed = o
                out = commands.getoutput('glite-transfer-status -l -s %s %s'
                                         % (site_conf.fts_endpoint, ftsid))
                self.status['fts'][id]['failures'] = self.parseFTSFailures(out)
            else:
                time.sleep(site_conf.fts_poll_sec)
        t = time.time() - st

        # delete the temporary file with the transfer definitions
        os.remove(tmpfile)

        # update the bulk status
        self.status['fts'][id]['status'] = s
        self.status['fts'][id]['output'] = o
        self.status['fts'][id]['time'] = float(t)
        return s

    def parseFTSFailures(self, output):
        """Parse the output of glite-transfer-status -l and return the list
        of source files that failed to copy."""
        failed = []   # source files that failed to copy
        source = None
        for l in output.strip().split('\n'):
            l = l.split()
            if len(l) < 2:
                continue
            # the listing carries 'Source: <surl>' / 'State: <state>' pairs
            if l[0] == 'Source:':
                source = l[1]
            elif l[0] == 'State:':
                if l[1] == 'Failed' or l[1] == 'Hold':
                    failed.append(source)
        return failed


def main(argv):
    l = Log('Mover')
    if len(argv) != 1:
        print >>sys.stderr, "Mover catalog XML filename missing in arguments!"
        sys.exit(1)
    mc = Mover(argv[0], l)
    n = mc.move()
    if n == 0:
        l.logInfo("Done successfully")
    else:
        l.logInfo("%d replication(s) failed!" % n)


if __name__ == '__main__':
    main(sys.argv[1:])
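
# Example invocation (the catalog path below is hypothetical; the argument is
# the mover catalog XML file this agent was handed, which move() deletes once
# it has been read):
#
#   python Mover.py /var/spool/movers/transfer-1234.xml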