borgend/repository.py

changeset 87
a214d475aa28
parent 86
2fe66644c50d
child 89
51cc2e25af38
--- a/borgend/repository.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/repository.py	Sun Jan 28 19:27:34 2018 +0000
@@ -19,6 +19,10 @@
     def __lt__(self, other):
         return False
 
+# This FIFO essentially a fancy semaphore: Each thread waits on its own
+# Condition, so can also be woken up from other executing threads.
+# If they are woken up by the FIFO, then a "good to go" flag is set;
+# and a specified action executed. Otherwise this is not done.
 class FIFO(QueueThread):
     def __init__(self, **kwargs):
         super().__init__(target = self._fifo_thread, **kwargs)
@@ -28,13 +32,22 @@
             while not self._terminate:
                 ev=self._list
                 if ev:
-                    # We can only remove ev from the list when ev.cond allows
+                    # We have to release lock on self._cond before obtaining
+                    # one on ev.cond to avoid race conditions with
+                    # self.queue_acion
+                    self._cond.release()
                     with ev.cond:
+                        # Just set "good to go" flag and notify the queued
+                        # thread. To keep blocking other thread, it is the
+                        # job of the queued thred to remove itself.
                         if not ev._goodtogo:
                             ev._goodtogo=True
-                            ev.cond.notifyAll()
+                            ev.cond.notify_all()
+                    self._cond.acquire()
                 self._cond.wait()
 
+            self.logger.debug('Terminating')
+
             # Termination cleanup
             ev=self._list
             while ev:
@@ -49,23 +62,16 @@
 
         with self._cond:
             self._insert(ev)
+            self._cond.notify()
 
         # This will release the lock on cond, allowing queue manager (scheduler)
-        # thread to notify us if we are already to be released
+        # thread to notify us if we are ready to be released
         logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
         ev.cond.wait()
 
         try:
             if ev._goodtogo:
                 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
-                #
-                # TODO: action() has to unlink on finish; so should maybe
-                # use weak references to event.
-                # Or we have to make action take all the time, so make the
-                # stdout thread.
-                # OR: Easiest to just move finish-waiting into __launch_check
-                # instead of at the outer level of the main loop.
-                #
                 action()
         finally:
             with self._cond:
@@ -102,7 +108,7 @@
 
         if config.settings['extract_passphrases_at_startup']:
             try:
-                self.extract_passphrase()
+                self.__extract_passphrase()
             except Exception:
                 pass
 
@@ -113,8 +119,8 @@
         repositories[self.repository_name]=self
 
     def __extract_passphrase(self):
-        acc=self.__keychain_account
         if not self.__passphrase:
+            acc=self.__keychain_account
             if acc and acc!='':
                 self.logger.debug('Requesting passphrase')
                 try:
@@ -130,9 +136,16 @@
         return self.__passphrase
 
     def launch_borg_instance(self, inst):
-        with self._cond:
-            passphrase=self.__extract_passphrase()
-        inst.launch(passphrase=passphrase)
+        try:
+            self.logger.debug('launch_borg_instance: entering _cond')
+            with self._cond:
+                self.logger.debug('launch_borg_instance: entering __extract_passphrase')
+                passphrase=self.__extract_passphrase()
+        except Exception as err:
+            self.logger.error('Aborting operation due to failure to obtain passphrase')
+            raise err
+        else:
+            inst.launch(passphrase=passphrase)
 
 def find_repository(name):
     if name in repositories:

mercurial