Added pause feature

Wed, 31 Jan 2018 22:32:11 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 31 Jan 2018 22:32:11 +0000
changeset 98
9052e427ea39
parent 97
96d5adbe0205
child 99
281bab8361c8

Added pause feature

borgend/backup.py file | annotate | diff | comparison | revisions
borgend/ui.py file | annotate | diff | comparison | revisions
--- a/borgend/backup.py	Wed Jan 31 00:06:54 2018 +0000
+++ b/borgend/backup.py	Wed Jan 31 22:32:11 2018 +0000
@@ -20,7 +20,7 @@
 
 _logger=logging.getLogger(__name__)
 
-JOIN_TIMEOUT=60
+JOIN_TIMEOUT=10
 
 #
 # State and operation related helper classes
@@ -29,9 +29,10 @@
 class State(IntEnum):
     # State
     INACTIVE=0
-    SCHEDULED=1
-    QUEUED=2
-    ACTIVE=3
+    PAUSED=1
+    SCHEDULED=2
+    QUEUED=3
+    ACTIVE=4
 
 
 class Errors(IntEnum):
@@ -240,6 +241,7 @@
     def __init__(self, identifier, cfg, scheduler):
         self.identifier=identifier
         self.__status_update_callback=None
+        self._pause=False
         self.scheduler=scheduler
         self.logger=None # setup up in __decode_config once backup name is known
 
@@ -333,7 +335,9 @@
                         errors=Errors.BUSY
                     with self._cond:
                         self.current_operation.add_error(errors)
-                        status, callback=self.__status_unlocked()
+                        # Don't notify of errors if we are terminating or pausing
+                        if not self._terminate_or_pause():
+                            status, callback=self.__status_unlocked()
                 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
                     # Borg gives very little progress info in easy form, so try to extrat it
                     archive_number, of_total=check_prune_status(msg['message'])
@@ -487,21 +491,24 @@
 
             self.__wait_finish()
 
+    def _terminate_or_pause(self):
+        return self._terminate or self._pause
+
     def __wait_finish(self):
         current=self.current_operation
 
         # Wait for main logger thread to terminate, or for us to be terminated
-        while not self.terminate and self.thread_res.is_alive():
+        while not self._terminate_or_pause() and self.thread_res.is_alive():
             self._cond.release()
             self.thread_res.join(JOIN_TIMEOUT)
             self._cond.acquire()
 
-        # If terminate has been signalled, let outer termination handler
+        # If terminate or pause has been signalled, let outer termination handler
         # take care of things (Within this Backup class, it would be cleanest
         # to raise an exception instead, but in most other places it's better
         # to just check self._terminate, so we don't complicate things with
         # an extra exception.)
-        if self._terminate:
+        if self.thread_res.is_alive():
             return
 
         self.logger.debug('Waiting for borg and log subprocesses to terminate')
@@ -522,8 +529,6 @@
         self.thread_res=None
         self.thread_log=None
         self.borg_instance=None
-        self.state=State.INACTIVE
-        self.__update_status()
 
     def __main_thread(self):
         with self._cond:
@@ -531,22 +536,19 @@
                 try:
                     assert(not self.current_operation)
                     self.__main_thread_wait_schedule()
-                    if not self._terminate:
+                    if (not self._terminate_or_pause() and self.scheduled_operation
+                        and self.scheduled_operation.start_time <= MonotonicTime.now()):
                         self.__main_thread_queue_and_launch()
                 except Exception as err:
-                    self.logger.exception("Error with backup '%s'" % self.backup_name)
-                    self.errors=Errors.ERRORS
+                    self.logger.exception("Exception in backup '%s'" % self.backup_name)
+                finally:
                     self.__cleanup()
 
-            self.__cleanup()
-
     def __cleanup(self):
-        self.state=State.INACTIVE
-        self.scheduled_operation=None
-        self.current_operation=None
         thread_log=self.thread_log
         thread_res=self.thread_res
         borg_instance=self.borg_instance
+        self.scheduled_operation=None
         self.thread_log=None
         self.thread_res=None
         self.borg_instance=None
@@ -566,33 +568,42 @@
                 thread_res.join()
         finally:
             self._cond.acquire()
+            self.current_operation=None
 
     # Main thread/2. Schedule next operation if there is no manually
     # requested one
     def __main_thread_wait_schedule(self):
         op=None
-        if not self.scheduled_operation:
-            op=self.__next_operation_unlocked()
-        if op:
-            self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
-                             (str(op.type), op.detail or 'none',
-                              op.start_time.isoformat(),
-                              op.start_time.__class__.__name__))
+        if self._pause:
+            self.logger.info("Waiting for resume to be signalled")
 
-            self.scheduled_operation=op
-            self.state=State.SCHEDULED
-            self.__update_status()
-
-            # Wait under scheduled wait
-            self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
-        else:
-            # Nothing scheduled - just wait
-            self.logger.info("Waiting for manual scheduling")
-
-            self.state=State.INACTIVE
+            self.state=State.PAUSED
             self.__update_status()
 
             self._cond.wait()
+        else:
+            if not self.scheduled_operation:
+                op=self.__next_operation_unlocked()
+            if op:
+                self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
+                                 (str(op.type), op.detail or 'none',
+                                  op.start_time.isoformat(),
+                                  op.start_time.__class__.__name__))
+
+                self.scheduled_operation=op
+                self.state=State.SCHEDULED
+                self.__update_status()
+
+                # Wait under scheduled wait
+                self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
+            else:
+                # Nothing scheduled - just wait
+                self.logger.info("Waiting for manual scheduling")
+
+                self.state=State.INACTIVE
+                self.__update_status()
+
+                self._cond.wait()
 
     # Main thread/3. If there is a scheduled operation (it might have been
     # changed manually from 'op' created in __main_thread_wait_schedule above),
@@ -606,11 +617,8 @@
             res=self.repository.queue_action(self._cond,
                                              action=self.__launch_and_wait,
                                              name=self.backup_name)
-            if not res and not self._terminate:
+            if not res:
                 self.logger.debug("Queueing aborted")
-                self.scheduled_operation=None
-                self.state=State.INACTIVE
-                self.__update_status()
 
     def __next_operation_unlocked(self):
         listop=self.__next_operation_list()
@@ -745,3 +753,21 @@
             if self.borg_instance:
                 self.borg_instance.terminate()
 
+    def is_paused(self):
+        with self._cond:
+            paused=self.state==State.PAUSED
+        return paused
+
+    def pause(self):
+        with self._cond:
+            self.logger.debug('Pause signalled')
+            self.scheduled_operation=None
+            self._pause=True
+            self._cond.notify()
+
+    def resume(self):
+        with self._cond:
+            self.logger.debug('Resume signalled')
+            self._pause=False
+            self._cond.notify()
+
--- a/borgend/ui.py	Wed Jan 31 00:06:54 2018 +0000
+++ b/borgend/ui.py	Wed Jan 31 22:32:11 2018 +0000
@@ -20,6 +20,7 @@
 
 traynames_ok={
     backup.State.INACTIVE: 'B.',
+    backup.State.PAUSED: 'B‖',
     backup.State.SCHEDULED: 'B.',
     backup.State.QUEUED: 'B:',
     backup.State.ACTIVE: 'B!',
@@ -115,7 +116,9 @@
     if not status.errors.ok():
         info=add_info(info, str(status.errors))
 
-    if status.state==backup.State.SCHEDULED:
+    if status.state==backup.State.PAUSED:
+        info=add_info(info, "paused")
+    elif status.state==backup.State.SCHEDULED:
         # Operation scheduled
         when=status.when()
         now=time.time()
@@ -210,6 +213,8 @@
         menu=[]
         state=(backup.State.INACTIVE, backup.Errors.OK)
         need_reconstruct=None
+        all_paused=True
+
         for index in range(len(self.backups)):
             b=self.backups[index]
             title, this_state, this_need_reconstruct=make_title(self.statuses[index])
@@ -225,6 +230,8 @@
             menu.append(item)
             state=combine_state(state, this_state)
 
+            all_paused=(all_paused and this_state[0]==backup.State.PAUSED)
+
             # Do we have to automatically update menu display?
             if not need_reconstruct:
                 need_reconstruct=this_need_reconstruct
@@ -234,6 +241,13 @@
         menu_log=rumps.MenuItem("Show log", callback=lambda _: showlog())
         menu.append(menu_log)
 
+        if all_paused:
+            pausename='Resume all'
+        else:
+            pausename="Pause all"
+        menu_pause=rumps.MenuItem(pausename, callback=lambda _: self.pause_resume_all())
+        menu.append(menu_pause)
+
         if not settings['no_quit_menu_entry']:
             menu_quit=rumps.MenuItem("Quit...", callback=lambda _: self.quit())
             menu.append(menu_quit)
@@ -290,6 +304,23 @@
             notification_workaround(branding.appname_stylised,
                                     msgid, errorlog['message'])
 
+    def pause_resume_all(self):
+        with self.lock:
+            try:
+                all_paused=True
+                for b in self.backups:
+                    all_paused=all_paused and b.is_paused()
+                if all_paused:
+                    logger.debug('Pausing all')
+                    for b in self.backups:
+                        b.resume()
+                else:
+                    logger.debug('Resuming all')
+                    for b in self.backups:
+                        b.pause()
+            except:
+                logger.exception("Pause/resume error")
+
     def quit(self):
         rumps.quit_application()
 

mercurial