borgend/backup.py

changeset 98
9052e427ea39
parent 97
96d5adbe0205
child 100
b141bed9e718
--- a/borgend/backup.py	Wed Jan 31 00:06:54 2018 +0000
+++ b/borgend/backup.py	Wed Jan 31 22:32:11 2018 +0000
@@ -20,7 +20,7 @@
 
 _logger=logging.getLogger(__name__)
 
-JOIN_TIMEOUT=60
+JOIN_TIMEOUT=10
 
 #
 # State and operation related helper classes
@@ -29,9 +29,10 @@
 class State(IntEnum):
     # State
     INACTIVE=0
-    SCHEDULED=1
-    QUEUED=2
-    ACTIVE=3
+    PAUSED=1
+    SCHEDULED=2
+    QUEUED=3
+    ACTIVE=4
 
 
 class Errors(IntEnum):
@@ -240,6 +241,7 @@
     def __init__(self, identifier, cfg, scheduler):
         self.identifier=identifier
         self.__status_update_callback=None
+        self._pause=False
         self.scheduler=scheduler
         self.logger=None # setup up in __decode_config once backup name is known
 
@@ -333,7 +335,9 @@
                         errors=Errors.BUSY
                     with self._cond:
                         self.current_operation.add_error(errors)
-                        status, callback=self.__status_unlocked()
+                        # Don't notify of errors if we are terminating or pausing
+                        if not self._terminate_or_pause():
+                            status, callback=self.__status_unlocked()
                 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
                     # Borg gives very little progress info in easy form, so try to extrat it
                     archive_number, of_total=check_prune_status(msg['message'])
@@ -487,21 +491,24 @@
 
             self.__wait_finish()
 
+    def _terminate_or_pause(self):
+        return self._terminate or self._pause
+
     def __wait_finish(self):
         current=self.current_operation
 
         # Wait for main logger thread to terminate, or for us to be terminated
-        while not self.terminate and self.thread_res.is_alive():
+        while not self._terminate_or_pause() and self.thread_res.is_alive():
             self._cond.release()
             self.thread_res.join(JOIN_TIMEOUT)
             self._cond.acquire()
 
-        # If terminate has been signalled, let outer termination handler
+        # If terminate or pause has been signalled, let outer termination handler
         # take care of things (Within this Backup class, it would be cleanest
         # to raise an exception instead, but in most other places it's better
         # to just check self._terminate, so we don't complicate things with
         # an extra exception.)
-        if self._terminate:
+        if self.thread_res.is_alive():
             return
 
         self.logger.debug('Waiting for borg and log subprocesses to terminate')
@@ -522,8 +529,6 @@
         self.thread_res=None
         self.thread_log=None
         self.borg_instance=None
-        self.state=State.INACTIVE
-        self.__update_status()
 
     def __main_thread(self):
         with self._cond:
@@ -531,22 +536,19 @@
                 try:
                     assert(not self.current_operation)
                     self.__main_thread_wait_schedule()
-                    if not self._terminate:
+                    if (not self._terminate_or_pause() and self.scheduled_operation
+                        and self.scheduled_operation.start_time <= MonotonicTime.now()):
                         self.__main_thread_queue_and_launch()
                 except Exception as err:
-                    self.logger.exception("Error with backup '%s'" % self.backup_name)
-                    self.errors=Errors.ERRORS
+                    self.logger.exception("Exception in backup '%s'" % self.backup_name)
+                finally:
                     self.__cleanup()
 
-            self.__cleanup()
-
     def __cleanup(self):
-        self.state=State.INACTIVE
-        self.scheduled_operation=None
-        self.current_operation=None
         thread_log=self.thread_log
         thread_res=self.thread_res
         borg_instance=self.borg_instance
+        self.scheduled_operation=None
         self.thread_log=None
         self.thread_res=None
         self.borg_instance=None
@@ -566,33 +568,42 @@
                 thread_res.join()
         finally:
             self._cond.acquire()
+            self.current_operation=None
 
     # Main thread/2. Schedule next operation if there is no manually
     # requested one
     def __main_thread_wait_schedule(self):
         op=None
-        if not self.scheduled_operation:
-            op=self.__next_operation_unlocked()
-        if op:
-            self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
-                             (str(op.type), op.detail or 'none',
-                              op.start_time.isoformat(),
-                              op.start_time.__class__.__name__))
+        if self._pause:
+            self.logger.info("Waiting for resume to be signalled")
 
-            self.scheduled_operation=op
-            self.state=State.SCHEDULED
-            self.__update_status()
-
-            # Wait under scheduled wait
-            self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
-        else:
-            # Nothing scheduled - just wait
-            self.logger.info("Waiting for manual scheduling")
-
-            self.state=State.INACTIVE
+            self.state=State.PAUSED
             self.__update_status()
 
             self._cond.wait()
+        else:
+            if not self.scheduled_operation:
+                op=self.__next_operation_unlocked()
+            if op:
+                self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
+                                 (str(op.type), op.detail or 'none',
+                                  op.start_time.isoformat(),
+                                  op.start_time.__class__.__name__))
+
+                self.scheduled_operation=op
+                self.state=State.SCHEDULED
+                self.__update_status()
+
+                # Wait under scheduled wait
+                self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
+            else:
+                # Nothing scheduled - just wait
+                self.logger.info("Waiting for manual scheduling")
+
+                self.state=State.INACTIVE
+                self.__update_status()
+
+                self._cond.wait()
 
     # Main thread/3. If there is a scheduled operation (it might have been
     # changed manually from 'op' created in __main_thread_wait_schedule above),
@@ -606,11 +617,8 @@
             res=self.repository.queue_action(self._cond,
                                              action=self.__launch_and_wait,
                                              name=self.backup_name)
-            if not res and not self._terminate:
+            if not res:
                 self.logger.debug("Queueing aborted")
-                self.scheduled_operation=None
-                self.state=State.INACTIVE
-                self.__update_status()
 
     def __next_operation_unlocked(self):
         listop=self.__next_operation_list()
@@ -745,3 +753,21 @@
             if self.borg_instance:
                 self.borg_instance.terminate()
 
+    def is_paused(self):
+        with self._cond:
+            paused=self.state==State.PAUSED
+        return paused
+
+    def pause(self):
+        with self._cond:
+            self.logger.debug('Pause signalled')
+            self.scheduled_operation=None
+            self._pause=True
+            self._cond.notify()
+
+    def resume(self):
+        with self._cond:
+            self.logger.debug('Resume signalled')
+            self._pause=False
+            self._cond.notify()
+

mercurial