backup.py

changeset 64
6cfe6a89e810
parent 63
1fd6814a29fc
child 70
3f794760d52e
--- a/backup.py	Wed Jan 24 22:23:51 2018 +0000
+++ b/backup.py	Thu Jan 25 22:34:55 2018 +0000
@@ -15,6 +15,8 @@
 
 logger=borgend.logger.getChild(__name__)
 
+JOIN_TIMEOUT=60
+
 #
 # State and operation related helper classes
 #
@@ -218,7 +220,7 @@
         self.logger.debug('Log listener thread waiting for entries')
         success=True
         for msg in iter(self.borg_instance.read_log, None):
-            self.logger.debug(str(msg))
+            self.logger.info(str(msg))
             t=msg['type']
 
             errormsg=None
@@ -287,39 +289,14 @@
 
         res=self.borg_instance.read_result()
 
-        # Finish processing remaining errors
-        self.thread_log.join()
-
-        with self._cond:
-            errors=self.errors
-
         self.logger.debug('Borg result: %s' % str(res))
 
-        if res is None:
-            self.logger.error('No result from borg despite no error in log')
-            if errors.ok():
-                errors=Errors.ERRORS
-
-        self.logger.debug('Waiting for borg subprocess to terminate in result thread')
-
-        if not self.borg_instance.wait():
-            self.logger.error('Borg subprocess did not terminate')
-            if errors.ok():
-                errors=Errors.ERRORS
+        with self._cond:
+            if res is None:
+                self.logger.error('No result from borg despite no error in log')
+                if errors.ok():
+                    self.errors=self.errors.combine(Errors.ERRORS)
 
-        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
-
-        with self._cond:
-            if self.current_operation.operation=='create':
-                self.lastrun_when=self.current_operation.when_monotonic
-            self.thread_res=None
-            self.thread_log=None
-            self.borg_instance=None
-            self.current_operation=None
-            self.state=State.INACTIVE
-            self.errors=errors
-            self.__update_status()
-            self._cond.notify()
 
     def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()
@@ -338,6 +315,14 @@
         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
+        self.current_operation=op
+        # Update scheduled time to real starting time to schedule
+        # next run relative to this
+        self.current_operation.when_monotonic=time.monotonic()
+        self.state=State.ACTIVE
+        # Reset error status when starting a new operation
+        self.errors=Errors.OK
+        self.__update_status()
 
         t_log.start()
         t_res.start()
@@ -362,7 +347,7 @@
             raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
 
     # This must be called with self._cond held.
-    def __launch_check(self):
+    def __launch_and_wait(self):
         op=self.scheduled_operation
         if not op:
             self.logger.debug("Queued operation aborted")
@@ -371,25 +356,50 @@
 
             self.__launch(op)
 
-            self.current_operation=op
-            # Update scheduled time to real starting time to schedule
-            # next run relative to this
-            self.current_operation.when_monotonic=time.monotonic()
-            self.state=State.ACTIVE
-            # Reset error status when starting a new operation
-            self.errors=Errors.OK
-            self.__update_status()
+            self.__wait_finish()
+
+    def __wait_finish(self):
+        # Must be called with self._cond held; it is released only while
+        # joining threads. Poll every JOIN_TIMEOUT seconds so that a
+        # termination request is noticed while the result thread runs.
+        while not self._terminate and self.thread_res.is_alive():
+            self._cond.release()
+            self.thread_res.join(JOIN_TIMEOUT)
+            self._cond.acquire()
+
+        # If terminate has been signalled, leave cleanup to the outer
+        # termination handler instead of raising an extra exception.
+        if self._terminate:
+            return
 
+        self.logger.debug('Waiting for borg and log subprocesses to terminate')
+
+        # Finish processing remaining log entries before checking borg
+        self._cond.release()
+        self.thread_log.join()
+        self._cond.acquire()
+
+        if not self.borg_instance.wait():
+            self.logger.error('Borg subprocess did not terminate')
+            self.errors=self.errors.combine(Errors.ERRORS)
+
+        if self.current_operation.operation=='create':
+            self.lastrun_when=self.current_operation.when_monotonic
+        self.thread_res=None
+        self.thread_log=None
+        self.borg_instance=None
+        self.current_operation=None
+        self.state=State.INACTIVE
+        self.__update_status()
 
     def __main_thread(self):
         with self._cond:
             try:
                 while not self._terminate:
-                    self.__main_thread_wait_finish()
+                    assert(not self.current_operation)
+                    self.__main_thread_wait_schedule()
                     if not self._terminate:
-                        self.__main_thread_wait_schedule()
-                        if not self._terminate:
-                            self.__main_thread_queue_and_launch()
+                        self.__main_thread_queue_and_launch()
             except Exception as err:
                 self.logger.exception("Error with backup '%s'" % self._name)
                 self.errors=Errors.ERRORS
@@ -416,13 +426,6 @@
         if thread_res:
             thread_res.join()
 
-
-    # Main thread/1. Wait while a current operation is running
-    def __main_thread_wait_finish(self):
-        while self.current_operation and not self._terminate:
-            self.logger.debug("Waiting for current operation to finish")
-            self._cond.wait()
-
     # Main thread/2. Schedule next operation if there is no manually
     # requested one
     def __main_thread_wait_schedule(self):
@@ -459,9 +462,14 @@
             self.logger.debug("Queuing")
             self.state=State.QUEUED
             self.__update_status()
-            self.repository.queue_action(self._cond,
-                                         action=self.__launch_check,
-                                         name=self._name)
+            res=self.repository.queue_action(self._cond,
+                                             action=self.__launch_and_wait,
+                                             name=self._name)
+            if not res and not self._terminate:
+                self.logger.debug("Queueing aborted")
+                self.scheduled_operation=None
+                self.state=State.INACTIVE
+                self.__update_status()
 
     def __next_operation_unlocked(self):
         # TODO: pruning as well

mercurial