# HG changeset patch # User Tuomo Valkonen # Date 1516919695 0 # Node ID 6cfe6a89e8109442ef194f5165ba708946c71bb3 # Parent 1fd6814a29fc4bd636f91cf3254ecb74f095e006 Repository queuing fixes diff -r 1fd6814a29fc -r 6cfe6a89e810 backup.py --- a/backup.py Wed Jan 24 22:23:51 2018 +0000 +++ b/backup.py Thu Jan 25 22:34:55 2018 +0000 @@ -15,6 +15,8 @@ logger=borgend.logger.getChild(__name__) +JOIN_TIMEOUT=60 + # # State and operation related helper classes # @@ -218,7 +220,7 @@ self.logger.debug('Log listener thread waiting for entries') success=True for msg in iter(self.borg_instance.read_log, None): - self.logger.debug(str(msg)) + self.logger.info(str(msg)) t=msg['type'] errormsg=None @@ -287,39 +289,14 @@ res=self.borg_instance.read_result() - # Finish processing remaining errors - self.thread_log.join() - - with self._cond: - errors=self.errors - self.logger.debug('Borg result: %s' % str(res)) - if res is None: - self.logger.error('No result from borg despite no error in log') - if errors.ok(): - errors=Errors.ERRORS - - self.logger.debug('Waiting for borg subprocess to terminate in result thread') - - if not self.borg_instance.wait(): - self.logger.error('Borg subprocess did not terminate') - if errors.ok(): - errors=Errors.ERRORS + with self._cond: + if res is None: + self.logger.error('No result from borg despite no error in log') + if errors.ok(): + self.errors=self.errors.combine(Errors.ERRORS) - self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors)) - - with self._cond: - if self.current_operation.operation=='create': - self.lastrun_when=self.current_operation.when_monotonic - self.thread_res=None - self.thread_log=None - self.borg_instance=None - self.current_operation=None - self.state=State.INACTIVE - self.errors=errors - self.__update_status() - self._cond.notify() def __do_launch(self, op, archive_or_repository, *args): passphrase=self.extract_passphrase() @@ -338,6 +315,14 @@ self.thread_log=t_log self.thread_res=t_res self.borg_instance=inst + self.current_operation=op + # Update scheduled time to real starting time to schedule + # next run relative to this + self.current_operation.when_monotonic=time.monotonic() + self.state=State.ACTIVE + # Reset error status when starting a new operation + self.errors=Errors.OK + self.__update_status() t_log.start() t_res.start() @@ -362,7 +347,7 @@ raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) # This must be called with self._cond held. - def __launch_check(self): + def __launch_and_wait(self): op=self.scheduled_operation if not op: self.logger.debug("Queued operation aborted") @@ -371,25 +356,50 @@ self.__launch(op) - self.current_operation=op - # Update scheduled time to real starting time to schedule - # next run relative to this - self.current_operation.when_monotonic=time.monotonic() - self.state=State.ACTIVE - # Reset error status when starting a new operation - self.errors=Errors.OK - self.__update_status() + self.__wait_finish() + + def __wait_finish(self): + # Wait for main logger thread to terminate, or for us to be terminated + while not self.terminate and self.thread_res.is_alive(): + self._cond.release() + self.thread_res.join(JOIN_TIMEOUT) + self._cond.acquire() + + # If terminate has been signalled, let outer termination handler + # take care of things (Within this Backup class, it would be cleanest + # to raise an exception instead, but in most other places it's better + # to just check self._terminate, so we don't complicate things with + # an extra exception.) + if self._terminate: + return + self.logger.debug('Waiting for borg and log subprocesses to terminate') + + self._cond.release() + self.thread_log.join() + self._cond.acquire() + + if not self.borg_instance.wait(): + self.logger.error('Borg subprocess did not terminate') + self.errors=self.errors.combine(Errors.ERRORS) + + if self.current_operation.operation=='create': + self.lastrun_when=self.current_operation.when_monotonic + self.thread_res=None + self.thread_log=None + self.borg_instance=None + self.current_operation=None + self.state=State.INACTIVE + self.__update_status() def __main_thread(self): with self._cond: try: while not self._terminate: - self.__main_thread_wait_finish() + assert(not self.current_operation) + self.__main_thread_wait_schedule() if not self._terminate: - self.__main_thread_wait_schedule() - if not self._terminate: - self.__main_thread_queue_and_launch() + self.__main_thread_queue_and_launch() except Exception as err: self.logger.exception("Error with backup '%s'" % self._name) self.errors=Errors.ERRORS @@ -416,13 +426,6 @@ if thread_res: thread_res.join() - - # Main thread/1. Wait while a current operation is running - def __main_thread_wait_finish(self): - while self.current_operation and not self._terminate: - self.logger.debug("Waiting for current operation to finish") - self._cond.wait() - # Main thread/2. Schedule next operation if there is no manually # requested one def __main_thread_wait_schedule(self): @@ -459,9 +462,14 @@ self.logger.debug("Queuing") self.state=State.QUEUED self.__update_status() - self.repository.queue_action(self._cond, - action=self.__launch_check, - name=self._name) + res=self.repository.queue_action(self._cond, + action=self.__launch_and_wait, + name=self._name) + if not res and not self._terminate: + self.logger.debug("Queueing aborted") + self.scheduled_operation=None + self.state=State.INACTIVE + self.__update_status() def __next_operation_unlocked(self): # TODO: pruning as well diff -r 1fd6814a29fc -r 6cfe6a89e810 repository.py --- a/repository.py Wed Jan 24 22:23:51 2018 +0000 +++ b/repository.py Thu Jan 25 22:34:55 2018 +0000 @@ -3,15 +3,18 @@ # import weakref +import borgend from scheduler import QueueThread, QueuedEvent +logger=borgend.logger.getChild(__name__) + class FIFOEvent(QueuedEvent): def __init__(self, cond, name=None): self._goodtogo=False super().__init__(cond, name=name) def __lt__(self, other): - return True + return False class FIFO(QueueThread): def __init__(self, **kwargs): @@ -24,8 +27,9 @@ if ev: # We can only remove ev from the list when ev.cond allows with ev.cond: - ev._goodtogo=True - ev.cond.notifyAll() + if not ev._goodtogo: + ev._goodtogo=True + ev.cond.notifyAll() self._cond.wait() # Termination cleanup @@ -43,19 +47,22 @@ with self._cond: self._insert(ev) - goodtogo=False - terminate_=False - while not goodtogo and not terminate_: - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released - ev.cond.wait() - with ev.cond: - goodtogo=ev._goodtogo - with self._cond: - terminate_=self._terminate + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') + ev.cond.wait() try: - if not terminate_: + if ev._goodtogo: + logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') + # + # TODO: action() has to unlink on finish; so should maybe + # use weak references to event. + # Or we have to make action take all the time, so make the + # stdout thread. + # OR: Easiest to just move finish-waiting into __launch_check + # instead of at the outer level of the main loop. + # action() finally: with self._cond: @@ -63,6 +70,8 @@ # Let _fifo_thread proceed to next action self._cond.notify() + return ev._goodtogo + class Repository(FIFO): def __init__(self, name): super().__init__(name = 'RepositoryThread %s' % name)