Wed, 07 Feb 2018 20:39:01 +0000
Time snapshot fixes.
Python's default arguments are purely idiotic (aka. pythonic): generated
only once. This makes sense in a purely functional language, which Python is
light-years away from, but severely limits their usefulness in an imperative
language. Decorators also seem clumsy for this, as one would have to tell
the number of positional arguments for things to work nicely, i.e. to be able to
pass the snapshot both positionally and as a keyword. No luck.
So we have to do things the old-fashioned, hard way.
#
# Borgend by Tuomo Valkonen, 2018
#
# Repository abstraction for queuing.
#

import weakref
import keyring
import logging

from . import config
from .scheduler import QueueThread, QueuedEvent
from .exprotect import protect_noreturn

logger = logging.getLogger(__name__)


class FIFOEvent(QueuedEvent):
    """Queue entry for `FIFO`, carrying a "good to go" flag.

    The flag starts out False and is set by `FIFO._fifo_thread` once the
    event has reached the head of the queue.
    """

    def __init__(self, cond, name=None):
        self._goodtogo = False
        super().__init__(cond, name=name)

    def is_before(self, other, snapshot=None):
        """Always return False; `other` and `snapshot` are ignored.

        NOTE(review): presumably this makes the queue keep arrival order --
        confirm against QueueThread._insert.
        """
        return False


# This FIFO is essentially a fancy semaphore: each thread waits on its own
# Condition, so it can also be woken up from other executing threads.
# If a thread is woken up by the FIFO, then a "good to go" flag is set
# and the specified action is executed. Otherwise this is not done.
class FIFO(QueueThread):
    """Queue thread that lets queued callers run their actions one at a time."""

    def __init__(self, **kwargs):
        super().__init__(target=self._fifo_thread, **kwargs)

    @protect_noreturn
    def _fifo_thread(self):
        """Thread body: flag the head-of-queue event as good to go, then wait.

        Runs until `self._terminate` is set, after which every event still
        in the queue is notified so that its waiting thread can return.
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We have to release the lock on self._cond before
                    # obtaining the one on ev.cond to avoid race conditions
                    # with self.queue_action.
                    self._cond.release()
                    try:
                        with ev.cond:
                            # Just set the "good to go" flag and notify the
                            # queued thread. To keep blocking other threads,
                            # it is the job of the queued thread to remove
                            # itself from the queue.
                            if not ev._goodtogo:
                                ev._goodtogo = True
                                ev.cond.notify_all()
                    finally:
                        # Re-acquire even on error, so that the enclosing
                        # `with self._cond` exits with the lock held.
                        self._cond.acquire()
                # NOTE(review): a notify() issued while the lock was released
                # above is not seen by this wait() -- confirm that wake-ups
                # from queue_action cannot be lost here.
                self._cond.wait()

            self.logger.debug('Terminating')

            # Termination cleanup: wake every thread still waiting in the
            # queue. Events are not unlinked here; queue_action unlinks its
            # own event once its wait returns.
            ev = self._list
            while ev:
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue the caller and run `action` once the FIFO lets it through.

        `cond` has to be acquired on entry! The caller is blocked in
        `cond.wait()` until it is notified, either by the FIFO thread or by
        some other thread holding `cond`.

        Arguments:
            cond: the caller's condition variable, already acquired.
            action: callable invoked without arguments if the wake-up came
                from the FIFO (i.e. the "good to go" flag is set).
            name: optional event name, used in log messages.

        Returns:
            True if the event was flagged good to go (and `action` was
            therefore called), False otherwise.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing the queue manager
        # (scheduler) thread to notify us when we are ready to be released.
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                action()
        finally:
            # Always dequeue ourselves, even if the action raised.
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to the next action
                self._cond.notify()

        return ev._goodtogo


# Registry of live repositories by name. Values are weakly referenced, so an
# entry disappears once its Repository is garbage collected.
repositories = weakref.WeakValueDictionary()


class Repository(FIFO):
    """A configured borg repository with its own FIFO queue thread."""

    def __decode_config(self, cfg):
        """Read the repository settings from `cfg` into attributes."""
        loc0 = 'Repository %d' % self.identifier

        self.repository_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Fill in the name; the placeholder was previously passed on
        # unformatted to the config helpers.
        loc = 'Repository "%s"' % self.repository_name

        self.logger = logger.getChild(self.repository_name)

        self.location = config.check_string(cfg, 'location',
                                            'Target repository location', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.__extract_passphrase()
            except Exception:
                # Best effort only: launch_borg_instance asks for the
                # passphrase again when it is actually needed.
                self.logger.warning(
                    'Could not extract passphrase at startup',
                    exc_info=True)

    def __init__(self, identifier, cfg):
        """Configure the repository from `cfg` and register it by name.

        Arguments:
            identifier: integer used to label the repository in messages
                before its name has been read.
            cfg: configuration object passed to the `config` helpers.
        """
        self.identifier = identifier
        # The configuration must be decoded first: the thread name below
        # depends on repository_name.
        self.__decode_config(cfg)
        super().__init__(name='RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name] = self

    def __extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if needed.

        Returns None when no keychain account is configured. A previously
        obtained (truthy) passphrase is reused without asking the keyring
        again. Exceptions from the keyring lookup are logged and re-raised.
        """
        if not self.__passphrase:
            acc = self.__keychain_account
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Obtain the passphrase and launch `inst` with it.

        The passphrase is extracted while holding `self._cond`. If that
        fails, the error is logged and re-raised, and `inst` is not launched.
        """
        try:
            self.logger.debug('launch_borg_instance: entering _cond')
            with self._cond:
                self.logger.debug('launch_borg_instance: entering __extract_passphrase')
                passphrase = self.__extract_passphrase()
        except Exception:
            self.logger.error('Aborting operation due to failure to obtain passphrase')
            raise
        else:
            inst.launch(passphrase=passphrase)


def find_repository(name):
    """Return the registered Repository called `name`, or None if absent."""
    # A single .get() avoids the check-then-index race: entries of a
    # WeakValueDictionary can vanish between `in` and `[]`.
    return repositories.get(name)