Sun, 28 Jan 2018 19:27:34 +0000
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
borgend.py | file | annotate | diff | comparison | revisions | |
borgend/backup.py | file | annotate | diff | comparison | revisions | |
borgend/instance.py | file | annotate | diff | comparison | revisions | |
borgend/repository.py | file | annotate | diff | comparison | revisions | |
borgend/scheduler.py | file | annotate | diff | comparison | revisions | |
borgend/ui.py | file | annotate | diff | comparison | revisions |
--- a/borgend.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend.py Sun Jan 28 19:27:34 2018 +0000 @@ -61,15 +61,15 @@ repoconfigs=config.settings['repositories'] + logger.info('Initialising repositories') for i in range(len(repoconfigs)): - logger.info('Setting up repository %d' % i) r=Repository(i, repoconfigs[i]) repos.append(r) backupconfigs=config.settings['backups'] + logger.info('Initialising backups') for i in range(len(backupconfigs)): - logger.info('Setting up backup set %d' % i) b=Backup(i, backupconfigs[i], scheduler) backups.append(b)
--- a/borgend/backup.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/backup.py Sun Jan 28 19:27:34 2018 +0000 @@ -296,7 +296,7 @@ self.logger.debug('Unrecognised log entry %s' % str(status)) if callback: - callback(status, errors=errormsg) + callback(status, errorlog=errormsg) self.logger.debug('Waiting for borg subprocess to terminate in log thread') @@ -354,9 +354,13 @@ def __do_launch(self, op, archive_or_repository, common_params, op_params, paths=[]): + self.logger.debug('Creating BorgInstance') + inst=BorgInstance(op.operation, archive_or_repository, common_params, op_params, paths) + self.logger.debug('Launching BorgInstance via repository') + # Only the Repository object has access to the passphrase self.repository.launch_borg_instance(inst) @@ -456,37 +460,45 @@ def __main_thread(self): with self._cond: - try: - while not self._terminate: + while not self._terminate: + try: assert(not self.current_operation) self.__main_thread_wait_schedule() if not self._terminate: self.__main_thread_queue_and_launch() - except Exception as err: - self.logger.exception("Error with backup '%s'" % self.backup_name) - self.errors=Errors.ERRORS + except Exception as err: + self.logger.exception("Error with backup '%s'" % self.backup_name) + self.errors=Errors.ERRORS + self.__cleanup() - self.state=State.INACTIVE - self.scheduled_operation=None - - # Clean up to terminate: kill borg instance and communication threads - if self.borg_instance: - self.logger.debug("Terminating a borg instance") - self.borg_instance.terminate() + self.__cleanup() - # Store threads to use outside lock - thread_log=self.thread_log - thread_res=self.thread_res - self.thread_log=None - self.thread_res=None + def __cleanup(self): + self.state=State.INACTIVE + self.scheduled_operation=None + self.current_operation=None + thread_log=self.thread_log + thread_res=self.thread_res + borg_instance=self.borg_instance + self.thread_log=None + self.thread_res=None + self.borg_instance=None - 
self.logger.debug("Waiting for log and result threads to terminate") + self._cond.release() + try: + if borg_instance: + self.logger.debug("Terminating a borg instance") + borg_instance.terminate() - if thread_log: - thread_log.join() + if thread_log: + self.logger.debug("Waiting for log thread to terminate") + thread_log.join() - if thread_res: - thread_res.join() + if thread_res: + self.logger.debug("Waiting for result thread to terminate") + thread_res.join() + finally: + self._cond.acquire() # Main thread/2. Schedule next operation if there is no manually # requested one
--- a/borgend/instance.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/instance.py Sun Jan 28 19:27:34 2018 +0000 @@ -35,6 +35,7 @@ self.common_params=common_params self.op_params=op_params self.paths=paths + self.proc=None def construct_cmdline(self): cmd=([settings['borg']['executable']]+necessary_opts+ @@ -124,11 +125,16 @@ 'message': str(errmsg)} def terminate(self): - self.proc.terminate() + if self.proc: + self.proc.terminate() - def wait(self): - return self.proc.wait() is not None + # Returns True if has terminated + def wait(self, timeout=None): + if self.proc: + return self.proc.wait(timeout=timeout) is not None + else: + return True def has_terminated(self): - return self.proc.poll() is not None + return not self.proc or (self.proc.poll() is not None)
--- a/borgend/repository.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/repository.py Sun Jan 28 19:27:34 2018 +0000 @@ -19,6 +19,10 @@ def __lt__(self, other): return False +# This FIFO is essentially a fancy semaphore: Each thread waits on its own +# Condition, so can also be woken up from other executing threads. +# If they are woken up by the FIFO, then a "good to go" flag is set; +# and a specified action executed. Otherwise this is not done. class FIFO(QueueThread): def __init__(self, **kwargs): super().__init__(target = self._fifo_thread, **kwargs) @@ -28,13 +32,22 @@ while not self._terminate: ev=self._list if ev: - # We can only remove ev from the list when ev.cond allows + # We have to release lock on self._cond before obtaining + # one on ev.cond to avoid race conditions with + # self.queue_action + self._cond.release() with ev.cond: + # Just set "good to go" flag and notify the queued + # thread. To keep blocking other threads, it is the + # job of the queued thread to remove itself. if not ev._goodtogo: ev._goodtogo=True - ev.cond.notifyAll() + ev.cond.notify_all() + self._cond.acquire() self._cond.wait() + self.logger.debug('Terminating') + # Termination cleanup ev=self._list while ev: @@ -49,23 +62,16 @@ with self._cond: self._insert(ev) + self._cond.notify() # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released + # thread to notify us if we are ready to be released logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') ev.cond.wait() try: if ev._goodtogo: logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') - # - # TODO: action() has to unlink on finish; so should maybe - # use weak references to event. - # Or we have to make action take all the time, so make the - # stdout thread. - # OR: Easiest to just move finish-waiting into __launch_check - # instead of at the outer level of the main loop. 
- # action() finally: with self._cond: @@ -102,7 +108,7 @@ if config.settings['extract_passphrases_at_startup']: try: - self.extract_passphrase() + self.__extract_passphrase() except Exception: pass @@ -113,8 +119,8 @@ repositories[self.repository_name]=self def __extract_passphrase(self): - acc=self.__keychain_account if not self.__passphrase: + acc=self.__keychain_account if acc and acc!='': self.logger.debug('Requesting passphrase') try: @@ -130,9 +136,16 @@ return self.__passphrase def launch_borg_instance(self, inst): - with self._cond: - passphrase=self.__extract_passphrase() - inst.launch(passphrase=passphrase) + try: + self.logger.debug('launch_borg_instance: entering _cond') + with self._cond: + self.logger.debug('launch_borg_instance: entering __extract_passphrase') + passphrase=self.__extract_passphrase() + except Exception as err: + self.logger.error('Aborting operation due to failure to obtain passphrase') + raise err + else: + inst.launch(passphrase=passphrase) def find_repository(name): if name in repositories:
--- a/borgend/scheduler.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/scheduler.py Sun Jan 28 19:27:34 2018 +0000 @@ -18,6 +18,7 @@ self.prev=None self.name=name self.cond=cond + self.linked=False def __lt__(self, other): raise NotImplementedError @@ -77,6 +78,7 @@ self._list = None def _insert(self, ev): + assert(not ev.linked) if not self._list: #logger.debug("Insert first") self._list=ev @@ -87,13 +89,14 @@ else: #logger.debug("Insert after") self._list.insert_after(ev) - - self._cond.notify() + ev.linked=True def _unlink(self, ev): + assert(ev.linked) if ev==self._list: self._list=ev.next ev.unlink() + ev.linked=False def _resort(self): oldlist=self._list @@ -139,28 +142,39 @@ ev=self._list logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) # We are only allowed to remove ev from list when ev.cond allows + self._unlink(ev) + # We need to release the lock on self._cond before acquiring + # one on ev.cond to avoid race conditions with self._wait + self._cond.release() with ev.cond: - self._list=ev.next - ev.unlink() - ev.cond.notifyAll() + ev.cond.notify_all() + self._cond.acquire() + def _wakeup_callback(self): logger.debug("Rescheduling events after wakeup") with self._cond: self._resort() + # It is required to have acquired the lock on ev.cond on entry def _wait(self, ev): with self._cond: self._insert(ev) + self._cond.notify() - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released + # This will release the lock on cond, allowing the scheduler + # thread to notify us if we are ready to be released ev.cond.wait() # If we were woken up by some other event, not the scheduler, # ensure the event is removed - with self._cond: - self._unlink(ev) + if ev.linked: + # Deal with race conditions wrt. the two different locks + # in the scheduler + #ev.cond.release() + with self._cond: + self._unlink(ev) + #ev.cond.acquire() # cond has to be acquired on entry! 
def wait_until(self, when, cond, name=None):
--- a/borgend/ui.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/ui.py Sun Jan 28 19:27:34 2018 +0000 @@ -173,8 +173,8 @@ for index in range(len(backups)): # Python closures suck dog's balls; hence the _index=index hack # See also http://math.andrej.com/2009/04/09/pythons-lambda-is-broken/ - cb=(lambda status, errors=None, _index=index: - self.__status_callback(_index, status, errorlog=errors)) + cb=(lambda status, errorlog=None, _index=index: + self.__status_callback(_index, status, errorlog=errorlog)) backups[index].set_status_update_callback(cb) dreamtime.add_callback(self, self.refresh_ui)