# HG changeset patch # User Tuomo Valkonen # Date 1517437931 0 # Node ID 9052e427ea399466c3e2eb6f0a06355b2afbedc4 # Parent 96d5adbe0205ca56c65c8fceb3306e35d5534a07 Added pause feature diff -r 96d5adbe0205 -r 9052e427ea39 borgend/backup.py --- a/borgend/backup.py Wed Jan 31 00:06:54 2018 +0000 +++ b/borgend/backup.py Wed Jan 31 22:32:11 2018 +0000 @@ -20,7 +20,7 @@ _logger=logging.getLogger(__name__) -JOIN_TIMEOUT=60 +JOIN_TIMEOUT=10 # # State and operation related helper classes @@ -29,9 +29,10 @@ class State(IntEnum): # State INACTIVE=0 - SCHEDULED=1 - QUEUED=2 - ACTIVE=3 + PAUSED=1 + SCHEDULED=2 + QUEUED=3 + ACTIVE=4 class Errors(IntEnum): @@ -240,6 +241,7 @@ def __init__(self, identifier, cfg, scheduler): self.identifier=identifier self.__status_update_callback=None + self._pause=False self.scheduler=scheduler self.logger=None # setup up in __decode_config once backup name is known @@ -333,7 +335,9 @@ errors=Errors.BUSY with self._cond: self.current_operation.add_error(errors) - status, callback=self.__status_unlocked() + # Don't notify of errors if we are terminating or pausing + if not self._terminate_or_pause(): + status, callback=self.__status_unlocked() elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE: # Borg gives very little progress info in easy form, so try to extrat it archive_number, of_total=check_prune_status(msg['message']) @@ -487,21 +491,24 @@ self.__wait_finish() + def _terminate_or_pause(self): + return self._terminate or self._pause + def __wait_finish(self): current=self.current_operation # Wait for main logger thread to terminate, or for us to be terminated - while not self.terminate and self.thread_res.is_alive(): + while not self._terminate_or_pause() and self.thread_res.is_alive(): self._cond.release() self.thread_res.join(JOIN_TIMEOUT) self._cond.acquire() - # If terminate has been signalled, let outer termination handler + # If terminate or pause has been signalled, let outer termination handler # take care of things (Within this Backup class, it would be cleanest # to raise an exception instead, but in most other places it's better # to just check self._terminate, so we don't complicate things with # an extra exception.) - if self._terminate: + if self.thread_res.is_alive(): return self.logger.debug('Waiting for borg and log subprocesses to terminate') @@ -522,8 +529,6 @@ self.thread_res=None self.thread_log=None self.borg_instance=None - self.state=State.INACTIVE - self.__update_status() def __main_thread(self): with self._cond: @@ -531,22 +536,19 @@ try: assert(not self.current_operation) self.__main_thread_wait_schedule() - if not self._terminate: + if (not self._terminate_or_pause() and self.scheduled_operation + and self.scheduled_operation.start_time <= MonotonicTime.now()): self.__main_thread_queue_and_launch() except Exception as err: - self.logger.exception("Error with backup '%s'" % self.backup_name) - self.errors=Errors.ERRORS + self.logger.exception("Exception in backup '%s'" % self.backup_name) + finally: self.__cleanup() - self.__cleanup() - def __cleanup(self): - self.state=State.INACTIVE - self.scheduled_operation=None - self.current_operation=None thread_log=self.thread_log thread_res=self.thread_res borg_instance=self.borg_instance + self.scheduled_operation=None self.thread_log=None self.thread_res=None self.borg_instance=None @@ -566,33 +568,42 @@ thread_res.join() finally: self._cond.acquire() + self.current_operation=None # Main thread/2. Schedule next operation if there is no manually # requested one def __main_thread_wait_schedule(self): op=None - if not self.scheduled_operation: - op=self.__next_operation_unlocked() - if op: - self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % - (str(op.type), op.detail or 'none', - op.start_time.isoformat(), - op.start_time.__class__.__name__)) + if self._pause: + self.logger.info("Waiting for resume to be signalled") - self.scheduled_operation=op - self.state=State.SCHEDULED - self.__update_status() - - # Wait under scheduled wait - self.scheduler.wait_until(op.start_time, self._cond, self.backup_name) - else: - # Nothing scheduled - just wait - self.logger.info("Waiting for manual scheduling") - - self.state=State.INACTIVE + self.state=State.PAUSED self.__update_status() self._cond.wait() + else: + if not self.scheduled_operation: + op=self.__next_operation_unlocked() + if op: + self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % + (str(op.type), op.detail or 'none', + op.start_time.isoformat(), + op.start_time.__class__.__name__)) + + self.scheduled_operation=op + self.state=State.SCHEDULED + self.__update_status() + + # Wait under scheduled wait + self.scheduler.wait_until(op.start_time, self._cond, self.backup_name) + else: + # Nothing scheduled - just wait + self.logger.info("Waiting for manual scheduling") + + self.state=State.INACTIVE + self.__update_status() + + self._cond.wait() # Main thread/3. If there is a scheduled operation (it might have been # changed manually from 'op' created in __main_thread_wait_schedule above), @@ -606,11 +617,8 @@ res=self.repository.queue_action(self._cond, action=self.__launch_and_wait, name=self.backup_name) - if not res and not self._terminate: + if not res: self.logger.debug("Queueing aborted") - self.scheduled_operation=None - self.state=State.INACTIVE - self.__update_status() def __next_operation_unlocked(self): listop=self.__next_operation_list() @@ -745,3 +753,21 @@ if self.borg_instance: self.borg_instance.terminate() + def is_paused(self): + with self._cond: + paused=self.state==State.PAUSED + return paused + + def pause(self): + with self._cond: + self.logger.debug('Pause signalled') + self.scheduled_operation=None + self._pause=True + self._cond.notify() + + def resume(self): + with self._cond: + self.logger.debug('Resume signalled') + self._pause=False + self._cond.notify() + diff -r 96d5adbe0205 -r 9052e427ea39 borgend/ui.py --- a/borgend/ui.py Wed Jan 31 00:06:54 2018 +0000 +++ b/borgend/ui.py Wed Jan 31 22:32:11 2018 +0000 @@ -20,6 +20,7 @@ traynames_ok={ backup.State.INACTIVE: 'B.', + backup.State.PAUSED: 'B‖', backup.State.SCHEDULED: 'B.', backup.State.QUEUED: 'B:', backup.State.ACTIVE: 'B!', @@ -115,7 +116,9 @@ if not status.errors.ok(): info=add_info(info, str(status.errors)) - if status.state==backup.State.SCHEDULED: + if status.state==backup.State.PAUSED: + info=add_info(info, "paused") + elif status.state==backup.State.SCHEDULED: # Operation scheduled when=status.when() now=time.time() @@ -210,6 +213,8 @@ menu=[] state=(backup.State.INACTIVE, backup.Errors.OK) need_reconstruct=None + all_paused=True + for index in range(len(self.backups)): b=self.backups[index] title, this_state, this_need_reconstruct=make_title(self.statuses[index]) @@ -225,6 +230,8 @@ menu.append(item) state=combine_state(state, this_state) + all_paused=(all_paused and this_state[0]==backup.State.PAUSED) + # Do we have to automatically update menu display? if not need_reconstruct: need_reconstruct=this_need_reconstruct @@ -234,6 +241,13 @@ menu_log=rumps.MenuItem("Show log", callback=lambda _: showlog()) menu.append(menu_log) + if all_paused: + pausename='Resume all' + else: + pausename="Pause all" + menu_pause=rumps.MenuItem(pausename, callback=lambda _: self.pause_resume_all()) + menu.append(menu_pause) + if not settings['no_quit_menu_entry']: menu_quit=rumps.MenuItem("Quit...", callback=lambda _: self.quit()) menu.append(menu_quit) @@ -290,6 +304,23 @@ notification_workaround(branding.appname_stylised, msgid, errorlog['message']) + def pause_resume_all(self): + with self.lock: + try: + all_paused=True + for b in self.backups: + all_paused=all_paused and b.is_paused() + if all_paused: + logger.debug('Pausing all') + for b in self.backups: + b.resume() + else: + logger.debug('Resuming all') + for b in self.backups: + b.pause() + except: + logger.exception("Pause/resume error") + def quit(self): rumps.quit_application()