Mon, 05 Feb 2018 10:25:17 +0000
Added exception protection decorators to callbacks.
If callbacks crash, there's rarely anything in the logs otherwise.
#
# Borgend by Tuomo Valkonen, 2018
#
# Repository abstraction for queuing.
#

import weakref
import keyring
import logging
from . import config
from .scheduler import QueueThread, QueuedEvent
from .exprotect import protect_noreturn

logger = logging.getLogger(__name__)


class FIFOEvent(QueuedEvent):
    """Queue entry for `FIFO`, carrying a "good to go" flag.

    The flag starts out False and is set by `FIFO._fifo_thread` when the
    event reaches the head of the queue.
    """

    def __init__(self, cond, name=None):
        self._goodtogo = False
        super().__init__(cond, name=name)

    def is_before(self, other, snapshot=None):
        # No event ever claims precedence over another one.
        # NOTE(review): presumably this makes QueueThread._insert keep
        # arrival order -- confirm against the scheduler module.
        return False


# This FIFO is essentially a fancy semaphore: Each thread waits on its own
# Condition, so can also be woken up from other executing threads.
# If they are woken up by the FIFO, then a "good to go" flag is set;
# and a specified action executed. Otherwise this is not done.
class FIFO(QueueThread):
    """Queue thread that gives queued callers the go-ahead one at a time."""

    def __init__(self, **kwargs):
        super().__init__(target = self._fifo_thread, **kwargs)

    @protect_noreturn
    def _fifo_thread(self):
        """Thread body: flag and notify the head event until terminated.

        On termination every event still in the list is notified (without
        its "good to go" flag being set).
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We have to release lock on self._cond before obtaining
                    # one on ev.cond to avoid race conditions with
                    # self.queue_action
                    self._cond.release()
                    try:
                        with ev.cond:
                            # Just set "good to go" flag and notify the
                            # queued thread. To keep blocking other threads,
                            # it is the job of the queued thread to remove
                            # itself.
                            if not ev._goodtogo:
                                ev._goodtogo = True
                                ev.cond.notify_all()
                    finally:
                        # Always re-acquire: the enclosing `with` releases
                        # self._cond on exit and would fail if an exception
                        # above left it unlocked.
                        self._cond.acquire()
                    # While self._cond was released, the queued thread may
                    # already have unlinked itself and sent its notify(), or
                    # termination may have been requested. Waiting now would
                    # miss that wake-up, so re-examine the state instead.
                    if self._terminate or self._list is not ev:
                        continue
                self._cond.wait()

            self.logger.debug('Terminating')

            # Termination cleanup
            ev = self._list
            while ev:
                # We can only remove ev from the list when ev.cond allows
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue the calling thread and run `action` once it gets the go-ahead.

        `cond` must already be held by the caller; it is waited on here.
        If the wait ends without the "good to go" flag having been set,
        `action` is not executed. The event is always unlinked from the
        queue afterwards, also when `action` raises.

        Returns the "good to go" flag, i.e. whether `action` was started.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing queue manager (scheduler)
        # thread to notify us if we are ready to be released
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo


# Registry of live repositories by name. Values are held weakly, so an entry
# disappears once nothing else references the Repository.
repositories = weakref.WeakValueDictionary()


class Repository(FIFO):
    """A configured borg repository together with its FIFO action queue."""

    def __decode_config(self, cfg):
        """Read this repository's settings from the configuration `cfg`."""
        loc0 = 'Repository %d' % self.identifier

        self.repository_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Location label handed to the config checks below; it has to be
        # formatted with the name, just like loc0 is with the identifier.
        loc = 'Repository "%s"' % self.repository_name

        self.logger = logger.getChild(self.repository_name)

        self.location = config.check_string(cfg, 'location',
                                            'Target repository location', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.__extract_passphrase()
            except Exception:
                # Best effort only: launch_borg_instance attempts the
                # extraction again. Leave a trace instead of swallowing
                # the failure silently.
                self.logger.debug('Passphrase extraction at startup failed',
                                  exc_info=True)

    def __init__(self, identifier, cfg):
        """Configure the repository from `cfg` and register it by name."""
        self.identifier = identifier
        self.__decode_config(cfg)
        super().__init__(name = 'RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name] = self

    def __extract_passphrase(self):
        """Return the passphrase, querying the keyring if none is cached.

        Without a configured keychain account the passphrase is None.
        Exceptions raised by the keyring lookup are logged and re-raised.
        """
        if not self.__passphrase:
            acc = self.__keychain_account
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Obtain the passphrase while holding self._cond, then launch `inst`.

        If obtaining the passphrase raises, the failure is logged, the
        exception is re-raised, and `inst` is not launched.
        """
        try:
            self.logger.debug('launch_borg_instance: entering _cond')
            with self._cond:
                self.logger.debug('launch_borg_instance: entering __extract_passphrase')
                passphrase = self.__extract_passphrase()
        except Exception:
            self.logger.error('Aborting operation due to failure to obtain passphrase')
            raise
        else:
            inst.launch(passphrase=passphrase)


def find_repository(name):
    """Return the registered repository called `name`, or None if absent."""
    # A single .get() instead of `in` followed by indexing: an entry of a
    # WeakValueDictionary may vanish between the two operations.
    return repositories.get(name)