--- a/borgend/backup.py Wed Jan 31 00:06:54 2018 +0000 +++ b/borgend/backup.py Wed Jan 31 22:32:11 2018 +0000 @@ -20,7 +20,7 @@ _logger=logging.getLogger(__name__) -JOIN_TIMEOUT=60 +JOIN_TIMEOUT=10 # # State and operation related helper classes @@ -29,9 +29,10 @@ class State(IntEnum): # State INACTIVE=0 - SCHEDULED=1 - QUEUED=2 - ACTIVE=3 + PAUSED=1 + SCHEDULED=2 + QUEUED=3 + ACTIVE=4 class Errors(IntEnum): @@ -240,6 +241,7 @@ def __init__(self, identifier, cfg, scheduler): self.identifier=identifier self.__status_update_callback=None + self._pause=False self.scheduler=scheduler self.logger=None # setup up in __decode_config once backup name is known @@ -333,7 +335,9 @@ errors=Errors.BUSY with self._cond: self.current_operation.add_error(errors) - status, callback=self.__status_unlocked() + # Don't notify of errors if we are terminating or pausing + if not self._terminate_or_pause(): + status, callback=self.__status_unlocked() elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE: # Borg gives very little progress info in easy form, so try to extrat it archive_number, of_total=check_prune_status(msg['message']) @@ -487,21 +491,24 @@ self.__wait_finish() + def _terminate_or_pause(self): + return self._terminate or self._pause + def __wait_finish(self): current=self.current_operation # Wait for main logger thread to terminate, or for us to be terminated - while not self.terminate and self.thread_res.is_alive(): + while not self._terminate_or_pause() and self.thread_res.is_alive(): self._cond.release() self.thread_res.join(JOIN_TIMEOUT) self._cond.acquire() - # If terminate has been signalled, let outer termination handler + # If terminate or pause has been signalled, let outer termination handler # take care of things (Within this Backup class, it would be cleanest # to raise an exception instead, but in most other places it's better # to just check self._terminate, so we don't complicate things with # an extra exception.) - if self._terminate: + if self.thread_res.is_alive(): return self.logger.debug('Waiting for borg and log subprocesses to terminate') @@ -522,8 +529,6 @@ self.thread_res=None self.thread_log=None self.borg_instance=None - self.state=State.INACTIVE - self.__update_status() def __main_thread(self): with self._cond: @@ -531,22 +536,19 @@ try: assert(not self.current_operation) self.__main_thread_wait_schedule() - if not self._terminate: + if (not self._terminate_or_pause() and self.scheduled_operation + and self.scheduled_operation.start_time <= MonotonicTime.now()): self.__main_thread_queue_and_launch() except Exception as err: - self.logger.exception("Error with backup '%s'" % self.backup_name) - self.errors=Errors.ERRORS + self.logger.exception("Exception in backup '%s'" % self.backup_name) + finally: self.__cleanup() - self.__cleanup() - def __cleanup(self): - self.state=State.INACTIVE - self.scheduled_operation=None - self.current_operation=None thread_log=self.thread_log thread_res=self.thread_res borg_instance=self.borg_instance + self.scheduled_operation=None self.thread_log=None self.thread_res=None self.borg_instance=None @@ -566,33 +568,42 @@ thread_res.join() finally: self._cond.acquire() + self.current_operation=None # Main thread/2. Schedule next operation if there is no manually # requested one def __main_thread_wait_schedule(self): op=None - if not self.scheduled_operation: - op=self.__next_operation_unlocked() - if op: - self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % - (str(op.type), op.detail or 'none', - op.start_time.isoformat(), - op.start_time.__class__.__name__)) + if self._pause: + self.logger.info("Waiting for resume to be signalled") - self.scheduled_operation=op - self.state=State.SCHEDULED - self.__update_status() - - # Wait under scheduled wait - self.scheduler.wait_until(op.start_time, self._cond, self.backup_name) - else: - # Nothing scheduled - just wait - self.logger.info("Waiting for manual scheduling") - - self.state=State.INACTIVE + self.state=State.PAUSED self.__update_status() self._cond.wait() + else: + if not self.scheduled_operation: + op=self.__next_operation_unlocked() + if op: + self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % + (str(op.type), op.detail or 'none', + op.start_time.isoformat(), + op.start_time.__class__.__name__)) + + self.scheduled_operation=op + self.state=State.SCHEDULED + self.__update_status() + + # Wait under scheduled wait + self.scheduler.wait_until(op.start_time, self._cond, self.backup_name) + else: + # Nothing scheduled - just wait + self.logger.info("Waiting for manual scheduling") + + self.state=State.INACTIVE + self.__update_status() + + self._cond.wait() # Main thread/3. If there is a scheduled operation (it might have been # changed manually from 'op' created in __main_thread_wait_schedule above), @@ -606,11 +617,8 @@ res=self.repository.queue_action(self._cond, action=self.__launch_and_wait, name=self.backup_name) - if not res and not self._terminate: + if not res: self.logger.debug("Queueing aborted") - self.scheduled_operation=None - self.state=State.INACTIVE - self.__update_status() def __next_operation_unlocked(self): listop=self.__next_operation_list() @@ -745,3 +753,21 @@ if self.borg_instance: self.borg_instance.terminate() + def is_paused(self): + with self._cond: + paused=self.state==State.PAUSED + return paused + + def pause(self): + with self._cond: + self.logger.debug('Pause signalled') + self.scheduled_operation=None + self._pause=True + self._cond.notify() + + def resume(self): + with self._cond: + self.logger.debug('Resume signalled') + self._pause=False + self._cond.notify() +