Backup: run a queued operation to completion inside the queue callback.

Summary of the change carried by this patch:
  * Add a module-level JOIN_TIMEOUT (60) used for timed thread joins.
  * Log every borg log message at info level instead of debug.
  * The thread that reads borg's result no longer joins the log thread,
    waits for the subprocess or resets the backup state; it only records
    an error when borg produced no result.
  * __do_launch() now marks the operation as current/ACTIVE and resets
    the error status before the listener threads are started.
  * __launch_check() becomes __launch_and_wait(); after launching it calls
    the new __wait_finish(), which releases the condition while joining
    the listener threads and then performs the end-of-operation cleanup.
  * __main_thread_wait_finish() is removed; the main loop asserts that no
    operation is current before scheduling the next one.
  * The result of repository.queue_action() is checked so that an aborted
    queueing returns the backup to INACTIVE.

Review fixes relative to the patch as received:
  * result thread: "errors.ok()" referred to a local that this patch
    deletes; it now reads "self.errors.ok()".
  * __wait_finish(): the wait loop tested "self.terminate"; it now tests
    the "_terminate" flag like every other check in this patch.
  * __wait_finish(): the first comment named the wrong thread; the loop
    joins self.thread_res.

NOTE(review): this text was recovered from a whitespace-collapsed copy.
Indentation and blank lines were reconstructed so that every hunk agrees
with its header counts; confirm them against the target file (or apply
with whitespace-insensitive matching) before committing.

--- a/backup.py	Wed Jan 24 22:23:51 2018 +0000
+++ b/backup.py	Thu Jan 25 22:34:55 2018 +0000
@@ -15,6 +15,8 @@
 
 logger=borgend.logger.getChild(__name__)
 
+JOIN_TIMEOUT=60
+
 #
 # State and operation related helper classes
 #
@@ -218,7 +220,7 @@
         self.logger.debug('Log listener thread waiting for entries')
         success=True
         for msg in iter(self.borg_instance.read_log, None):
-            self.logger.debug(str(msg))
+            self.logger.info(str(msg))
             t=msg['type']
 
             errormsg=None
@@ -287,39 +289,14 @@
 
         res=self.borg_instance.read_result()
 
-        # Finish processing remaining errors
-        self.thread_log.join()
-
-        with self._cond:
-            errors=self.errors
-
         self.logger.debug('Borg result: %s' % str(res))
 
-        if res is None:
-            self.logger.error('No result from borg despite no error in log')
-            if errors.ok():
-                errors=Errors.ERRORS
-
-        self.logger.debug('Waiting for borg subprocess to terminate in result thread')
-
-        if not self.borg_instance.wait():
-            self.logger.error('Borg subprocess did not terminate')
-            if errors.ok():
-                errors=Errors.ERRORS
+        with self._cond:
+            if res is None:
+                self.logger.error('No result from borg despite no error in log')
+                if self.errors.ok():
+                    self.errors=self.errors.combine(Errors.ERRORS)
 
-        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
-
-        with self._cond:
-            if self.current_operation.operation=='create':
-                self.lastrun_when=self.current_operation.when_monotonic
-            self.thread_res=None
-            self.thread_log=None
-            self.borg_instance=None
-            self.current_operation=None
-            self.state=State.INACTIVE
-            self.errors=errors
-            self.__update_status()
-            self._cond.notify()
 
     def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()
@@ -338,6 +315,14 @@
         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
+        self.current_operation=op
+        # Update scheduled time to real starting time to schedule
+        # next run relative to this
+        self.current_operation.when_monotonic=time.monotonic()
+        self.state=State.ACTIVE
+        # Reset error status when starting a new operation
+        self.errors=Errors.OK
+        self.__update_status()
 
         t_log.start()
         t_res.start()
@@ -362,7 +347,7 @@
             raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
 
     # This must be called with self._cond held.
-    def __launch_check(self):
+    def __launch_and_wait(self):
         op=self.scheduled_operation
         if not op:
             self.logger.debug("Queued operation aborted")
@@ -371,25 +356,50 @@
 
             self.__launch(op)
 
-            self.current_operation=op
-            # Update scheduled time to real starting time to schedule
-            # next run relative to this
-            self.current_operation.when_monotonic=time.monotonic()
-            self.state=State.ACTIVE
-            # Reset error status when starting a new operation
-            self.errors=Errors.OK
-            self.__update_status()
+            self.__wait_finish()
+
+    def __wait_finish(self):
+        # Wait for result listener thread to terminate, or for us to be terminated
+        while not self._terminate and self.thread_res.is_alive():
+            self._cond.release()
+            self.thread_res.join(JOIN_TIMEOUT)
+            self._cond.acquire()
+
+        # If terminate has been signalled, let outer termination handler
+        # take care of things (Within this Backup class, it would be cleanest
+        # to raise an exception instead, but in most other places it's better
+        # to just check self._terminate, so we don't complicate things with
+        # an extra exception.)
+        if self._terminate:
+            return
+        self.logger.debug('Waiting for borg and log subprocesses to terminate')
+
+        self._cond.release()
+        self.thread_log.join()
+        self._cond.acquire()
+
+        if not self.borg_instance.wait():
+            self.logger.error('Borg subprocess did not terminate')
+            self.errors=self.errors.combine(Errors.ERRORS)
+
+        if self.current_operation.operation=='create':
+            self.lastrun_when=self.current_operation.when_monotonic
+        self.thread_res=None
+        self.thread_log=None
+        self.borg_instance=None
+        self.current_operation=None
+        self.state=State.INACTIVE
+        self.__update_status()
 
 
     def __main_thread(self):
         with self._cond:
             try:
                 while not self._terminate:
-                    self.__main_thread_wait_finish()
+                    assert(not self.current_operation)
+                    self.__main_thread_wait_schedule()
                     if not self._terminate:
-                        self.__main_thread_wait_schedule()
-                        if not self._terminate:
-                            self.__main_thread_queue_and_launch()
+                        self.__main_thread_queue_and_launch()
             except Exception as err:
                 self.logger.exception("Error with backup '%s'" % self._name)
                 self.errors=Errors.ERRORS
@@ -416,13 +426,6 @@
         if thread_res:
             thread_res.join()
 
-
-    # Main thread/1. Wait while a current operation is running
-    def __main_thread_wait_finish(self):
-        while self.current_operation and not self._terminate:
-            self.logger.debug("Waiting for current operation to finish")
-            self._cond.wait()
-
     # Main thread/2. Schedule next operation if there is no manually
     # requested one
     def __main_thread_wait_schedule(self):
@@ -459,9 +462,14 @@
             self.logger.debug("Queuing")
             self.state=State.QUEUED
             self.__update_status()
-            self.repository.queue_action(self._cond,
-                                         action=self.__launch_check,
-                                         name=self._name)
+            res=self.repository.queue_action(self._cond,
+                                             action=self.__launch_and_wait,
+                                             name=self._name)
+            if not res and not self._terminate:
+                self.logger.debug("Queueing aborted")
+                self.scheduled_operation=None
+                self.state=State.INACTIVE
+                self.__update_status()
 
     def __next_operation_unlocked(self):
         # TODO: pruning as well