Better recovery from errors; fixes to potential race conditions in scheduler and repository queue

Sun, 28 Jan 2018 19:27:34 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sun, 28 Jan 2018 19:27:34 +0000
changeset 87
a214d475aa28
parent 86
2fe66644c50d
child 88
dfd52898f175

Better recovery from errors; fixes to potential race conditions in scheduler and repository queue

borgend.py file | annotate | diff | comparison | revisions
borgend/backup.py file | annotate | diff | comparison | revisions
borgend/instance.py file | annotate | diff | comparison | revisions
borgend/repository.py file | annotate | diff | comparison | revisions
borgend/scheduler.py file | annotate | diff | comparison | revisions
borgend/ui.py file | annotate | diff | comparison | revisions
--- a/borgend.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend.py	Sun Jan 28 19:27:34 2018 +0000
@@ -61,15 +61,15 @@
 
     repoconfigs=config.settings['repositories']
 
+    logger.info('Initialising repositories')
     for i in range(len(repoconfigs)):
-        logger.info('Setting up repository %d' % i)
         r=Repository(i, repoconfigs[i])
         repos.append(r)
 
     backupconfigs=config.settings['backups']
 
+    logger.info('Initialising backups')
     for i in range(len(backupconfigs)):
-        logger.info('Setting up backup set %d' % i)
         b=Backup(i, backupconfigs[i], scheduler)
         backups.append(b)
 
--- a/borgend/backup.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/backup.py	Sun Jan 28 19:27:34 2018 +0000
@@ -296,7 +296,7 @@
                 self.logger.debug('Unrecognised log entry %s' % str(status))
 
             if callback:
-                callback(status, errors=errormsg)
+                callback(status, errorlog=errormsg)
 
         self.logger.debug('Waiting for borg subprocess to terminate in log thread')
 
@@ -354,9 +354,13 @@
     def __do_launch(self, op, archive_or_repository,
                     common_params, op_params, paths=[]):
 
+        self.logger.debug('Creating BorgInstance')
+
         inst=BorgInstance(op.operation, archive_or_repository,
                           common_params, op_params, paths)
 
+        self.logger.debug('Launching BorgInstance via repository')
+
         # Only the Repository object has access to the passphrase
         self.repository.launch_borg_instance(inst)
 
@@ -456,37 +460,45 @@
 
     def __main_thread(self):
         with self._cond:
-            try:
-                while not self._terminate:
+            while not self._terminate:
+                try:
                     assert(not self.current_operation)
                     self.__main_thread_wait_schedule()
                     if not self._terminate:
                         self.__main_thread_queue_and_launch()
-            except Exception as err:
-                self.logger.exception("Error with backup '%s'" % self.backup_name)
-                self.errors=Errors.ERRORS
+                except Exception as err:
+                    self.logger.exception("Error with backup '%s'" % self.backup_name)
+                    self.errors=Errors.ERRORS
+                    self.__cleanup()
 
-            self.state=State.INACTIVE
-            self.scheduled_operation=None
-
-            # Clean up to terminate: kill borg instance and communication threads
-            if self.borg_instance:
-                self.logger.debug("Terminating a borg instance")
-                self.borg_instance.terminate()
+            self.__cleanup()
 
-            # Store threads to use outside lock
-            thread_log=self.thread_log
-            thread_res=self.thread_res
-            self.thread_log=None
-            self.thread_res=None
+    def __cleanup(self):
+        self.state=State.INACTIVE
+        self.scheduled_operation=None
+        self.current_operation=None
+        thread_log=self.thread_log
+        thread_res=self.thread_res
+        borg_instance=self.borg_instance
+        self.thread_log=None
+        self.thread_res=None
+        self.borg_instance=None
 
-        self.logger.debug("Waiting for log and result threads to terminate")
+        self._cond.release()
+        try:
+            if borg_instance:
+                self.logger.debug("Terminating a borg instance")
+                borg_instance.terminate()
 
-        if thread_log:
-            thread_log.join()
+            if thread_log:
+                self.logger.debug("Waiting for log thread to terminate")
+                thread_log.join()
 
-        if thread_res:
-            thread_res.join()
+            if thread_res:
+                self.logger.debug("Waiting for result thread to terminate")
+                thread_res.join()
+        finally:
+            self._cond.acquire()
 
     # Main thread/2. Schedule next operation if there is no manually
     # requested one
--- a/borgend/instance.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/instance.py	Sun Jan 28 19:27:34 2018 +0000
@@ -35,6 +35,7 @@
         self.common_params=common_params
         self.op_params=op_params
         self.paths=paths
+        self.proc=None
 
     def construct_cmdline(self):
         cmd=([settings['borg']['executable']]+necessary_opts+
@@ -124,11 +125,16 @@
                     'message': str(errmsg)}
 
     def terminate(self):
-        self.proc.terminate()
+        if self.proc:
+            self.proc.terminate()
 
-    def wait(self):
-        return self.proc.wait() is not None
+    # Returns True if has terminated
+    def wait(self, timeout=None):
+        if self.proc:
+            return self.proc.wait(timeout=timeout) is not None
+        else:
+            return True
 
     def has_terminated(self):
-        return self.proc.poll() is not None
+        return not self.proc or (self.proc.poll() is not None)
 
--- a/borgend/repository.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/repository.py	Sun Jan 28 19:27:34 2018 +0000
@@ -19,6 +19,10 @@
     def __lt__(self, other):
         return False
 
+# This FIFO essentially a fancy semaphore: Each thread waits on its own
+# Condition, so can also be woken up from other executing threads.
+# If they are woken up by the FIFO, then a "good to go" flag is set;
+# and a specified action executed. Otherwise this is not done.
 class FIFO(QueueThread):
     def __init__(self, **kwargs):
         super().__init__(target = self._fifo_thread, **kwargs)
@@ -28,13 +32,22 @@
             while not self._terminate:
                 ev=self._list
                 if ev:
-                    # We can only remove ev from the list when ev.cond allows
+                    # We have to release lock on self._cond before obtaining
+                    # one on ev.cond to avoid race conditions with
+                    # self.queue_acion
+                    self._cond.release()
                     with ev.cond:
+                        # Just set "good to go" flag and notify the queued
+                        # thread. To keep blocking other thread, it is the
+                        # job of the queued thred to remove itself.
                         if not ev._goodtogo:
                             ev._goodtogo=True
-                            ev.cond.notifyAll()
+                            ev.cond.notify_all()
+                    self._cond.acquire()
                 self._cond.wait()
 
+            self.logger.debug('Terminating')
+
             # Termination cleanup
             ev=self._list
             while ev:
@@ -49,23 +62,16 @@
 
         with self._cond:
             self._insert(ev)
+            self._cond.notify()
 
         # This will release the lock on cond, allowing queue manager (scheduler)
-        # thread to notify us if we are already to be released
+        # thread to notify us if we are ready to be released
         logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
         ev.cond.wait()
 
         try:
             if ev._goodtogo:
                 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
-                #
-                # TODO: action() has to unlink on finish; so should maybe
-                # use weak references to event.
-                # Or we have to make action take all the time, so make the
-                # stdout thread.
-                # OR: Easiest to just move finish-waiting into __launch_check
-                # instead of at the outer level of the main loop.
-                #
                 action()
         finally:
             with self._cond:
@@ -102,7 +108,7 @@
 
         if config.settings['extract_passphrases_at_startup']:
             try:
-                self.extract_passphrase()
+                self.__extract_passphrase()
             except Exception:
                 pass
 
@@ -113,8 +119,8 @@
         repositories[self.repository_name]=self
 
     def __extract_passphrase(self):
-        acc=self.__keychain_account
         if not self.__passphrase:
+            acc=self.__keychain_account
             if acc and acc!='':
                 self.logger.debug('Requesting passphrase')
                 try:
@@ -130,9 +136,16 @@
         return self.__passphrase
 
     def launch_borg_instance(self, inst):
-        with self._cond:
-            passphrase=self.__extract_passphrase()
-        inst.launch(passphrase=passphrase)
+        try:
+            self.logger.debug('launch_borg_instance: entering _cond')
+            with self._cond:
+                self.logger.debug('launch_borg_instance: entering __extract_passphrase')
+                passphrase=self.__extract_passphrase()
+        except Exception as err:
+            self.logger.error('Aborting operation due to failure to obtain passphrase')
+            raise err
+        else:
+            inst.launch(passphrase=passphrase)
 
 def find_repository(name):
     if name in repositories:
--- a/borgend/scheduler.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/scheduler.py	Sun Jan 28 19:27:34 2018 +0000
@@ -18,6 +18,7 @@
         self.prev=None
         self.name=name
         self.cond=cond
+        self.linked=False
 
     def __lt__(self, other):
         raise NotImplementedError
@@ -77,6 +78,7 @@
         self._list = None
 
     def _insert(self, ev):
+        assert(not ev.linked)
         if not self._list:
             #logger.debug("Insert first")
             self._list=ev
@@ -87,13 +89,14 @@
         else:
             #logger.debug("Insert after")
             self._list.insert_after(ev)
-
-        self._cond.notify()
+        ev.linked=True
 
     def _unlink(self, ev):
+        assert(ev.linked)
         if ev==self._list:
             self._list=ev.next
         ev.unlink()
+        ev.linked=False
 
     def _resort(self):
         oldlist=self._list
@@ -139,28 +142,39 @@
                     ev=self._list
                     logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                     # We are only allowed to remove ev from list when ev.cond allows
+                    self._unlink(ev)
+                    # We need to release the lock on self._cond before acquire
+                    # one ev.cond to avoid race conditions with self._wait
+                    self._cond.release()
                     with ev.cond:
-                        self._list=ev.next
-                        ev.unlink()
-                        ev.cond.notifyAll()
+                        ev.cond.notify_all()
+                    self._cond.acquire()
+
 
     def _wakeup_callback(self):
         logger.debug("Rescheduling events after wakeup")
         with self._cond:
             self._resort()
 
+    # It is required to have acquired the lock on ev.cond on entry
     def _wait(self, ev):
         with self._cond:
             self._insert(ev)
+            self._cond.notify()
 
-        # This will release the lock on cond, allowing queue manager (scheduler)
-        # thread to notify us if we are already to be released
+        # This will release the lock on cond, allowing the scheduler
+        # thread to notify us if we are ready to be released
         ev.cond.wait()
 
         # If we were woken up by some other event, not the scheduler,
         # ensure the event is removed
-        with self._cond:
-            self._unlink(ev)
+        if ev.linked:
+            # Deal with race conditions wrt. the two different locks
+            # in the scheduler
+            #ev.cond.release()
+            with self._cond:
+                self._unlink(ev)
+            #ev.cond.acquire()
 
     # cond has to be acquired on entry!
     def wait_until(self, when, cond, name=None):
--- a/borgend/ui.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/ui.py	Sun Jan 28 19:27:34 2018 +0000
@@ -173,8 +173,8 @@
         for index in range(len(backups)):
             # Python closures suck dog's balls; hence the _index=index hack
             # See also http://math.andrej.com/2009/04/09/pythons-lambda-is-broken/
-            cb=(lambda status, errors=None, _index=index:
-                self.__status_callback(_index, status, errorlog=errors))
+            cb=(lambda status, errorlog=None, _index=index:
+                self.__status_callback(_index, status, errorlog=errorlog))
             backups[index].set_status_update_callback(cb)
 
         dreamtime.add_callback(self, self.refresh_ui)

mercurial