import getopt
import site_conf
import os
import sys
import time
import random
import x509
import urllib
import commands

from LocationClient import LocationClient
from RepositoryClient import RepositoryClient
from ContentClient import ContentClient
from ClaimsClient import ClaimsClient
from ClaimsCatalog import ClaimsCatalog
from ContentCatalog import ContentCatalog
from QueueCatalog import QueueCatalog
from ReplicaCatalog import ReplicaCatalog
from Log import Log
import DQ2Mail

x509 = x509.getX509()
if x509 == -1:
    DQ2Mail.sendErrorMail('[Fetcher] Proxy certificate expired!')
    sys.exit(1)


class Fetcher:
    """Fetcher 'agent'"""

    def __init__(self, logfile):
        self.log = logfile
        self.childs = []
        self.initCatalogs()

    def doCallback(self, vuid=None, dsn=None):
        """Invoke the site-configured callback URL for a completed dataset."""
        if site_conf.__dict__.has_key('callback_dataset_complete'):
            cb = site_conf.callback_dataset_complete
            if vuid is not None:
                cb = cb.replace('[VUID]', vuid)
            if dsn is not None:
                cb = cb.replace('[DSN]', dsn)
            time.sleep(0.01)
            s, o = commands.getstatusoutput("curl -k %s" % cb)
            if s != 0:
                self.log.logError('Callback to %s failed!' % cb)
                return False
            self.log.logInfo('Callback to %s succeeded!' % cb)
        # a site without a configured callback counts as success
        return True

    def initCatalogs(self):
        """Initialize catalogs."""
        # drop any previously created clients and catalogs before recreating them
        for att in ('lc', 'rc', 'cc', 'clc', 'lcc', 'lrc', 'ldc', 'qt'):
            if self.__dict__.has_key(att):
                delattr(self, att)
        self.lc = LocationClient(x509)
        self.rc = RepositoryClient(x509)
        self.cc = ContentClient(x509)
        self.clc = ClaimsClient(proxy_cert=x509)
        self.lcc = ContentCatalog(site_conf.local_content_cat)
        if site_conf.local_replica_cat is not None:
            self.lrc = ReplicaCatalog(site_conf.local_replica_cat, self.log)
        else:
            self.lrc = None
        self.ldc = ContentCatalog(site_conf.local_databuckets_cat)
        self.qt = QueueCatalog(site_conf.queued_transfers_cat, self.log)

    def parseQueryString(self, query):
        """Return a dictionary of the key/value pairs given in the query string."""
        pairs = {}
        params = query.split('?')
        if len(params) != 1:
            for p in params[1].split('&'):
                pair = p.split('=')
                pair[1] = urllib.unquote(pair[1])
                pairs[pair[0]] = pair[1]
        return pairs

    def isDatablock(self, siteId):
        """Check whether this is not a databucket."""
        # TODO: Add check to see if siteId URL is valid according to some rules!
        return not self.isDatabucket(siteId)

    def isDatabucket(self, url):
        """Check whether this is a site URL plus a databucket indication."""
        # TODO: Add check to see if siteId URL is valid according to some rules!
        d = self.parseQueryString(url)
        if d.has_key('databucket') and d['databucket'][0].upper() == 'Y':
            return True
        return False

    def getStorage(self, url):
        """Return the storage given in the URL subscription (e.g. '?se=XXX'),
        or blank if there is no storage indication."""
        d = self.parseQueryString(url)
        if d.has_key('se'):
            return d['se']
        return ''

    def getDatablockSubscription(self, subs):
        """Given the set of datablock subscriptions, choose one to handle."""
        # find datasets not currently being handled
        dsns = []
        for sub in subs:
            uid = sub[0]
            destination = sub[1]
            # resolve either vuid or duid to a vuid (latest version)
            vuid = self.rc.resolveDUID(uid)
            if vuid is None or vuid.find('No such vuid or duid') != -1:
                # registered vuid doesn't exist...
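                # NOTE: resolveDUID reports failure through its return value
                # (None or an error string) rather than by raising, hence the
                # string match above; the stale entry is removed from the
                # location catalog so it is not picked up again next cycle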
                self.log.logError("Registered vuid or duid %s no longer exists in repository, removing subscription" % uid)
                # remove subscription
                self.lc.deleteDatasetReplica(uid, destination)
                continue

            # check if we have already picked up this subscription
            #
            # WHY HERE??
            #
            if self.lcc.datasetExists(vuid):
                # do callback
                # TODO: add dsn support
                # self.doCallback(vuid=vuid)
                self.log.logDebug("Datablock subscription for VUID '%s' already being processed" % vuid)
                continue

            # check we don't already have the latest version of a duid
            if vuid != uid:
                if site_conf.site_id in self.lc.queryDatasetLocations(vuid, 'yes'):
                    # do callback anyway
                    # self.doCallback(vuid=vuid)
                    self.log.logDebug("Subscription for latest version of DUID '%s' already exists" % uid)
                    continue

            name = self.rc.resolveVUID(vuid)[0]
            if name is None:
                self.log.logError('Error resolving VUID %s' % vuid)
                continue
            if name[0] != '/':
                name = '/'+name
            dsns.append((vuid, destination+name))

        if len(dsns) == 0:
            self.log.logInfo("No new datablock subscriptions available")
            return None

        # make up a dictionary of subscriptions
        subsdict = {}
        for sub in dsns:
            vuid = sub[0]
            desthost = sub[1]
            self.log.logInfo("Datablock subscription found. UID '%s', destination '%s'" % (vuid, desthost))
            subsdict[vuid] = desthost
        return subsdict

    def applyPolicySites(self, complete, incomplete):
        """Order the complete and incomplete site lists so that the close
        sites listed in site_conf come first."""
        com_ord = []
        incom_ord = []
        for csite in site_conf.close_sites:
            for site in complete:
                if site.find(csite) != -1:
                    com_ord.append(site)
            for site in incomplete:
                if site.find(csite) != -1:
                    incom_ord.append(site)
        for site in complete:
            if site not in com_ord:
                com_ord.append(site)
        for site in incomplete:
            if site not in incom_ord:
                incom_ord.append(site)
        return com_ord, incom_ord

    def fetchDatablocks(self, uids):
        """If a datablock subscription is available, add a transfer request
        with sources from close sites only."""
        r = self.getDatablockSubscription(uids)
        if r is None:
            return False

        # loop over all new subscriptions, launching a new process for each
        for vuid in r.keys():
            pid = os.fork()
            if pid == 0:
                # child
                os.setsid()
                self.initCatalogs()

                # find files in block
                files = self.cc.queryFilesInDataset(vuid)
                guids = files.keys()
                lfns = files.values()

                # skip all guids that already exist in the catalog
                #
                # TODO: This is the most expensive call in the whole system.
                # IMPROVE IT (MySQL timestamps? keep values in Fetcher memory
                # between calls, etc)
                #
                if self.lrc is not None:
                    i = 0
                    for j in self.lrc.guidsExist(guids):
                        guid = guids.pop(j-i)
                        lfns.pop(j-i)
                        self.log.logDebug("Skipped "+guid+" since already on site")
                        i += 1

                # if all guids are already on site...
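                # nothing left to transfer: the child below only notifies the
                # callback, publishes a complete replica centrally and exits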
                if guids == []:
                    # do callback
                    # TODO: add dsn support
                    if not self.doCallback(vuid=vuid):
                        sys.exit(0)
                    # publish dataset in central catalog
                    res = self.lc.addDatasetReplica(vuid, site_conf.site_id, 'yes')
                    if res.find('Error') != -1:
                        self.log.logError(res)
                        DQ2Mail.sendErrorMail(res)
                        sys.exit(0)
                    self.log.logInfo("VUID "+vuid+" published since already on site")
                    # nothing to transfer, so finish the worker child here
                    sys.exit(0)

                # add subscription to local copy of catalogs
                guidstr = ''
                for guid in guids:
                    guidstr += files[guid]+'+'+guid+'+'
                guidstr = guidstr[:-1]

                complete = self.lc.queryDatasetLocations(vuid, 'yes')
                incomplete = self.lc.queryDatasetLocations(vuid, 'no')
                try:
                    incomplete.remove(site_conf.site_id)
                except:
                    # subscription was to a duid, so add the vuid
                    res = self.lc.addDatasetReplica(vuid, site_conf.site_id, 'no')
                    if res.find('Error') != -1:
                        self.log.logError(res)
                        DQ2Mail.sendErrorMail(res)
                        sys.exit(0)

                # order complete and incomplete by close site
                complete, incomplete = self.applyPolicySites(complete, incomplete)

                self.log.logInfo('Adding subscription to VUID '+vuid)
                try:
                    res = self.lcc.addDataset(vuid, guidstr, complete, incomplete)
                    if res != []:
                        self.log.logError(str(res))
                        DQ2Mail.sendErrorMail(str(res))
                        sys.exit(0)
                except Exception, e:
                    self.log.logError(str(e)+' '+vuid)

                # add site hint for closest replicas
                if len(complete) > 0:
                    srcsite = complete[0]
                elif len(incomplete) > 0:
                    srcsite = incomplete[0]
                else:
                    srcsite = 'unknownSite'
                self.log.logInfo('hint for close site: '+srcsite)

                # add contents to replica catalog;
                # time consuming, so process in batches
                limit = site_conf.bulk_request_limit
                for i in range(0, len(guids)/limit+1):
                    upper = i*limit+limit
                    if upper > len(guids):
                        upper = len(guids)
                    self.log.logDebug('adding surls '+str(i*limit+1)+' to '+str(upper))
                    failures = self.qt.addFilesUnknownSURL(guids[i*limit:upper], lfns[i*limit:upper], srcsite, r[vuid])
                    if len(failures) > 0:
                        self.log.logError('%d failures when adding to queued transfers catalog' % len(failures))

                # finish up worker child
                sys.exit(0)
            else:
                self.log.logInfo("Launched worker %d" % pid)
                self.childs.append(pid)
        return True

    def checkDatabuckets(self, subs):
        """Given the set of databuckets, check if new content is available
        and, if so, add movement requests."""
        # for each databucket
        for sub in subs:
            vuid = sub[0]
            self.log.logDebug('Databucket VUID is %s' % vuid)
            vuid_dict = self.rc.resolveVUID(vuid)
            if len(vuid_dict) == 0 or vuid_dict[0] is None:
                self.log.logError("Registered vuid or duid %s no longer exists in repository, removing subscription" % vuid)
                # remove subscription
                self.lc.deleteDatasetReplica(vuid, sub[1])
                continue
            name = vuid_dict[0]
            if name[0] != '/':
                name = '/'+name

            # for a databucket, the storage is the destination site ID;
            # check it before appending the dataset name, otherwise a
            # missing storage indication would go unnoticed
            storage = self.getStorage(sub[1])
            if storage == '':
                self.log.logError("Missing databucket destination site! "+sub[1])
                continue
            deststorage = storage+name

            # check this bucket has not already been closed
            if site_conf.site_id in self.lc.queryDatasetLocations(vuid, 'yes'):
                #
                # TODO: Email site_conf.site_email ? This is an impossible situation..
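                # (a bucket is considered closed once it is published as a
                # complete replica; new files should never appear afterwards)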
                #
                self.log.logInfo('Warning: new files in closed bucket: '+vuid)
                self.log.logInfo('Removing bucket subscription')
                res = self.lc.deleteDatabucketSubscription(vuid, site_conf.site_id, deststorage)
                if res is not None and res.find('Error') != -1:
                    self.log.logError(res)
                    DQ2Mail.sendErrorMail(res)
                    return
                continue

            # get new files to be moved out from the local
            # databuckets content catalog
            files = self.ldc.queryFilesInDataset(vuid)
            if len(files) == 0:
                self.log.logDebug("No files found in local databucket catalog for VUID '%s'" % vuid)
                continue

            pid = os.fork()
            if pid == 0:
                # child process
                os.setsid()
                self.initCatalogs()

                surls = files.keys()
                guids = files.values()
                lfns = []
                fsizes = []
                md5s = []

                # for each file: get its attributes from the LRC, register it
                # in the global content catalog, queue it for transfer, remove
                # it from the local databucket catalog and claim it
                #
                # TODO: Make bulk methods for handling all of these!
                #
                surlscopy = surls[:]
                for surl in surlscopy:
                    guid = files[surl]
                    self.log.logDebug("Analyzing file with GUID %s" % guid)
                    reps = lfn = fsize = md5 = None
                    # if an LRC exists at the site, use it to get attributes
                    if self.lrc is not None:
                        atts = self.lrc.getFileAttributes(guid)
                        if atts is None:
                            self.log.logError('Error: File '+surl+' ('+guid+') not found in LRC '+site_conf.local_replica_cat)
                            guids.remove(guid)
                            surls.remove(surl)
                            continue
                        (reps, lfn, fsize, md5) = atts
                        lfn = lfn[0]
                    # if no LRC exists, go to the file system to get the file
                    else:
                        #
                        # TODO: Make generic method to handle place to get directories
                        # and derive file names
                        #
                        fname = surl[5:]
                        if not os.path.exists(fname):
                            self.log.logError("Error: did not find file %s" % fname)
                            guids.remove(guid)
                            surls.remove(surl)
                            continue
                        md5 = ''
                        fsize = str(os.path.getsize(fname))
                        lfn = fname.split('/')[-1]
                    lfns.append(lfn)
                    fsizes.append(fsize)
                    md5s.append(md5)

                # bulk deletion of files from local databuckets catalog
                self.ldc.deleteFilesFromDataset(surlscopy, vuid)

                # bulk addition to local queued transfers catalog
                failures = self.qt.addFilesKnownSURL(surls, lfns, guids, fsizes, md5s, deststorage)
                for guid in failures:
                    lfn = lfns[guids.index(guid)]
                    surl = surls[guids.index(guid)]
                    lfns.remove(lfn)
                    guids.remove(guid)
                    surls.remove(surl)

                # bulk addition to central content catalog
                if len(guids) > 0:
                    res = self.cc.addFilesToDataset(vuid, lfns, guids)
                    if res.find('Error') != -1:
                        self.log.logError(res)
                        DQ2Mail.sendErrorMail(res)
                        sys.exit(0)
                    if res.find('already in the dataset') != -1:
                        self.log.logError(res)

                    # bulk addition of claims
                    if isinstance(self.clc, ClaimsClient):
                        res = self.clc.addBulkClaim(surls)
                        if res.find('Error') != -1:
                            self.log.logError(res)
                            DQ2Mail.sendErrorMail(res)
                            sys.exit(0)
                    elif isinstance(self.clc, ClaimsCatalog):
                        data = ''
                        for surl in surls:
                            data += surl+'+'
                        # NOTE: self.dn is not set anywhere in this module; it
                        # is assumed to hold the user DN (e.g. from the proxy)
                        res = self.clc.addClaims(data[:-1], self.dn)
                        if res.find('Error') != -1:
                            self.log.logError(res)
                            DQ2Mail.sendErrorMail(res)
                            sys.exit(0)

                self.log.logInfo('Added %d files that recently appeared on bucket (%d failures)' % (len(guids), len(failures)))
                sys.exit(0)
            else:
                self.log.logInfo("Launched worker %d" % pid)
                self.childs.append(pid)

    def fetch(self):
        """Find all subscriptions for this site and trigger data movement,
        for datablocks and databuckets."""
        subs = self.lc.querySubscriptionsInSite(site_conf.site_id)
        dblocks = []
        dbuckets = []
        for uid in subs.keys():
            url = subs[uid]
            if self.isDatabucket(url):
                dbuckets.append((uid, url))
            elif self.isDatablock(url):
                dblocks.append((uid, url))

        # TODO: improve how these are chosen
        self.log.logDebug('Found %d subscriptions for datablocks' % len(dblocks))
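        # datablocks are handled first; each handler forks one worker child
        # per subscription and returns without waiting for it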
        self.fetchDatablocks(dblocks)

        self.log.logDebug('Found %d subscriptions for databuckets' % len(dbuckets))
        self.checkDatabuckets(dbuckets)
        return True


def usage():
    """Usage: [-i nr] [-s secs] [-d secs]
    -i: number of iterations
    -s: number of seconds
    -d: delay between attempts
    If -i or -s is not specified, stays running as a daemon.
    """
    print usage.__doc__


def main(argv):
    try:
        opts, args = getopt.getopt(argv, "i:s:d:")
    except getopt.GetoptError:
        print "Invalid arguments!"
        usage()
        sys.exit(2)

    secs = 0
    it = 0
    delay = 20
    for o, a in opts:
        if o == '-s':
            secs = int(a)
        elif o == '-i':
            it = int(a)
        elif o == '-d':
            delay = int(a)
        else:
            usage()
            sys.exit(0)
    if args != []:
        usage()
        sys.exit(0)

    l = Log('Fetcher')
    l.logInfo('Starting up..')
    start = int(time.time())
    i = 0
    f = Fetcher(l)
    try:
        while 1:
            # wait for all children to finish
            ch = f.childs[:]
            for pid in ch:
                os.waitpid(pid, 0)
                f.childs.remove(pid)
            # launch new cycle
            l.logInfo('Looking for new subscriptions')
            if f.fetch():
                l.logInfo("Fetching cycle sent")
            if it > 0:
                i += 1
                if i == it:
                    break
            if secs > 0 and int(time.time())-start > secs:
                sys.exit(0)
            time.sleep(delay)
    except KeyboardInterrupt, e:
        pass

    l.logInfo('Stopping.. waiting for all children to finish.')
    for pid in f.childs:
        os.waitpid(pid, 0)


if __name__ == '__main__':
    main(sys.argv[1:])
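
# Example invocations (illustrative; the module file name 'Fetcher.py' is
# assumed, the -i/-s/-d flags are those parsed by main() above):
#
#   python Fetcher.py -i 10 -d 30   # run 10 fetch cycles, 30 seconds apart
#   python Fetcher.py -s 3600       # stop after roughly one hour
#   python Fetcher.py               # no -i or -s: keep running as a daemon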