Repository queuing fixes

Thu, 25 Jan 2018 22:34:55 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Thu, 25 Jan 2018 22:34:55 +0000
changeset 64
6cfe6a89e810
parent 63
1fd6814a29fc
child 65
6fed67863b00

Repository queuing fixes

backup.py file | annotate | diff | comparison | revisions
repository.py file | annotate | diff | comparison | revisions
--- a/backup.py	Wed Jan 24 22:23:51 2018 +0000
+++ b/backup.py	Thu Jan 25 22:34:55 2018 +0000
@@ -15,6 +15,8 @@
 
 logger=borgend.logger.getChild(__name__)
 
+JOIN_TIMEOUT=60
+
 #
 # State and operation related helper classes
 #
@@ -218,7 +220,7 @@
         self.logger.debug('Log listener thread waiting for entries')
         success=True
         for msg in iter(self.borg_instance.read_log, None):
-            self.logger.debug(str(msg))
+            self.logger.info(str(msg))
             t=msg['type']
 
             errormsg=None
@@ -287,39 +289,14 @@
 
         res=self.borg_instance.read_result()
 
-        # Finish processing remaining errors
-        self.thread_log.join()
-
-        with self._cond:
-            errors=self.errors
-
         self.logger.debug('Borg result: %s' % str(res))
 
-        if res is None:
-            self.logger.error('No result from borg despite no error in log')
-            if errors.ok():
-                errors=Errors.ERRORS
-
-        self.logger.debug('Waiting for borg subprocess to terminate in result thread')
-
-        if not self.borg_instance.wait():
-            self.logger.error('Borg subprocess did not terminate')
-            if errors.ok():
-                errors=Errors.ERRORS
+        with self._cond:
+            if res is None:
+                self.logger.error('No result from borg despite no error in log')
+                if errors.ok():
+                    self.errors=self.errors.combine(Errors.ERRORS)
 
-        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
-
-        with self._cond:
-            if self.current_operation.operation=='create':
-                self.lastrun_when=self.current_operation.when_monotonic
-            self.thread_res=None
-            self.thread_log=None
-            self.borg_instance=None
-            self.current_operation=None
-            self.state=State.INACTIVE
-            self.errors=errors
-            self.__update_status()
-            self._cond.notify()
 
     def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()
@@ -338,6 +315,14 @@
         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
+        self.current_operation=op
+        # Update scheduled time to real starting time to schedule
+        # next run relative to this
+        self.current_operation.when_monotonic=time.monotonic()
+        self.state=State.ACTIVE
+        # Reset error status when starting a new operation
+        self.errors=Errors.OK
+        self.__update_status()
 
         t_log.start()
         t_res.start()
@@ -362,7 +347,7 @@
             raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
 
     # This must be called with self._cond held.
-    def __launch_check(self):
+    def __launch_and_wait(self):
         op=self.scheduled_operation
         if not op:
             self.logger.debug("Queued operation aborted")
@@ -371,25 +356,50 @@
 
             self.__launch(op)
 
-            self.current_operation=op
-            # Update scheduled time to real starting time to schedule
-            # next run relative to this
-            self.current_operation.when_monotonic=time.monotonic()
-            self.state=State.ACTIVE
-            # Reset error status when starting a new operation
-            self.errors=Errors.OK
-            self.__update_status()
+            self.__wait_finish()
+
+    def __wait_finish(self):
+        # Wait for main logger thread to terminate, or for us to be terminated
+        while not self.terminate and self.thread_res.is_alive():
+            self._cond.release()
+            self.thread_res.join(JOIN_TIMEOUT)
+            self._cond.acquire()
+
+        # If terminate has been signalled, let outer termination handler
+        # take care of things (Within this Backup class, it would be cleanest
+        # to raise an exception instead, but in most other places it's better
+        # to just check self._terminate, so we don't complicate things with
+        # an extra exception.)
+        if self._terminate:
+            return
 
+        self.logger.debug('Waiting for borg and log subprocesses to terminate')
+
+        self._cond.release()
+        self.thread_log.join()
+        self._cond.acquire()
+
+        if not self.borg_instance.wait():
+            self.logger.error('Borg subprocess did not terminate')
+            self.errors=self.errors.combine(Errors.ERRORS)
+
+        if self.current_operation.operation=='create':
+            self.lastrun_when=self.current_operation.when_monotonic
+        self.thread_res=None
+        self.thread_log=None
+        self.borg_instance=None
+        self.current_operation=None
+        self.state=State.INACTIVE
+        self.__update_status()
 
     def __main_thread(self):
         with self._cond:
             try:
                 while not self._terminate:
-                    self.__main_thread_wait_finish()
+                    assert(not self.current_operation)
+                    self.__main_thread_wait_schedule()
                     if not self._terminate:
-                        self.__main_thread_wait_schedule()
-                        if not self._terminate:
-                            self.__main_thread_queue_and_launch()
+                        self.__main_thread_queue_and_launch()
             except Exception as err:
                 self.logger.exception("Error with backup '%s'" % self._name)
                 self.errors=Errors.ERRORS
@@ -416,13 +426,6 @@
         if thread_res:
             thread_res.join()
 
-
-    # Main thread/1. Wait while a current operation is running
-    def __main_thread_wait_finish(self):
-        while self.current_operation and not self._terminate:
-            self.logger.debug("Waiting for current operation to finish")
-            self._cond.wait()
-
     # Main thread/2. Schedule next operation if there is no manually
     # requested one
     def __main_thread_wait_schedule(self):
@@ -459,9 +462,14 @@
             self.logger.debug("Queuing")
             self.state=State.QUEUED
             self.__update_status()
-            self.repository.queue_action(self._cond,
-                                         action=self.__launch_check,
-                                         name=self._name)
+            res=self.repository.queue_action(self._cond,
+                                             action=self.__launch_and_wait,
+                                             name=self._name)
+            if not res and not self._terminate:
+                self.logger.debug("Queueing aborted")
+                self.scheduled_operation=None
+                self.state=State.INACTIVE
+                self.__update_status()
 
     def __next_operation_unlocked(self):
         # TODO: pruning as well
--- a/repository.py	Wed Jan 24 22:23:51 2018 +0000
+++ b/repository.py	Thu Jan 25 22:34:55 2018 +0000
@@ -3,15 +3,18 @@
 #
 
 import weakref
+import borgend
 from scheduler import QueueThread, QueuedEvent
 
+logger=borgend.logger.getChild(__name__)
+
 class FIFOEvent(QueuedEvent):
     def __init__(self, cond, name=None):
         self._goodtogo=False
         super().__init__(cond, name=name)
 
     def __lt__(self, other):
-        return True
+        return False
 
 class FIFO(QueueThread):
     def __init__(self, **kwargs):
@@ -24,8 +27,9 @@
                 if ev:
                     # We can only remove ev from the list when ev.cond allows
                     with ev.cond:
-                        ev._goodtogo=True
-                        ev.cond.notifyAll()
+                        if not ev._goodtogo:
+                            ev._goodtogo=True
+                            ev.cond.notifyAll()
                 self._cond.wait()
 
             # Termination cleanup
@@ -43,19 +47,22 @@
         with self._cond:
             self._insert(ev)
 
-        goodtogo=False
-        terminate_=False
-        while not goodtogo and not terminate_:
-            # This will release the lock on cond, allowing queue manager (scheduler)
-            # thread to notify us if we are already to be released
-            ev.cond.wait()
-            with ev.cond:
-                goodtogo=ev._goodtogo
-            with self._cond:
-                terminate_=self._terminate
+        # This will release the lock on cond, allowing queue manager (scheduler)
+        # thread to notify us if we are already to be released
+        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
+        ev.cond.wait()
 
         try:
-            if not terminate_:
+            if ev._goodtogo:
+                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
+                #
+                # TODO: action() has to unlink on finish; so should maybe
+                # use weak references to event.
+                # Or we have to make action take all the time, so make the
+                # stdout thread.
+                # OR: Easiest to just move finish-waiting into __launch_check
+                # instead of at the outer level of the main loop.
+                #
                 action()
         finally:
             with self._cond:
@@ -63,6 +70,8 @@
                 # Let _fifo_thread proceed to next action
                 self._cond.notify()
 
+        return ev._goodtogo
+
 class Repository(FIFO):
     def __init__(self, name):
         super().__init__(name = 'RepositoryThread %s' % name)

mercurial