--- a/borgend/repository.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/repository.py Sun Jan 28 19:27:34 2018 +0000 @@ -19,6 +19,10 @@ def __lt__(self, other): return False +# This FIFO essentially a fancy semaphore: Each thread waits on its own +# Condition, so can also be woken up from other executing threads. +# If they are woken up by the FIFO, then a "good to go" flag is set; +# and a specified action executed. Otherwise this is not done. class FIFO(QueueThread): def __init__(self, **kwargs): super().__init__(target = self._fifo_thread, **kwargs) @@ -28,13 +32,22 @@ while not self._terminate: ev=self._list if ev: - # We can only remove ev from the list when ev.cond allows + # We have to release lock on self._cond before obtaining + # one on ev.cond to avoid race conditions with + # self.queue_acion + self._cond.release() with ev.cond: + # Just set "good to go" flag and notify the queued + # thread. To keep blocking other thread, it is the + # job of the queued thred to remove itself. if not ev._goodtogo: ev._goodtogo=True - ev.cond.notifyAll() + ev.cond.notify_all() + self._cond.acquire() self._cond.wait() + self.logger.debug('Terminating') + # Termination cleanup ev=self._list while ev: @@ -49,23 +62,16 @@ with self._cond: self._insert(ev) + self._cond.notify() # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released + # thread to notify us if we are ready to be released logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') ev.cond.wait() try: if ev._goodtogo: logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') - # - # TODO: action() has to unlink on finish; so should maybe - # use weak references to event. - # Or we have to make action take all the time, so make the - # stdout thread. - # OR: Easiest to just move finish-waiting into __launch_check - # instead of at the outer level of the main loop. - # action() finally: with self._cond: @@ -102,7 +108,7 @@ if config.settings['extract_passphrases_at_startup']: try: - self.extract_passphrase() + self.__extract_passphrase() except Exception: pass @@ -113,8 +119,8 @@ repositories[self.repository_name]=self def __extract_passphrase(self): - acc=self.__keychain_account if not self.__passphrase: + acc=self.__keychain_account if acc and acc!='': self.logger.debug('Requesting passphrase') try: @@ -130,9 +136,16 @@ return self.__passphrase def launch_borg_instance(self, inst): - with self._cond: - passphrase=self.__extract_passphrase() - inst.launch(passphrase=passphrase) + try: + self.logger.debug('launch_borg_instance: entering _cond') + with self._cond: + self.logger.debug('launch_borg_instance: entering __extract_passphrase') + passphrase=self.__extract_passphrase() + except Exception as err: + self.logger.error('Aborting operation due to failure to obtain passphrase') + raise err + else: + inst.launch(passphrase=passphrase) def find_repository(name): if name in repositories: