borgend/scheduler.py

changeset 87
a214d475aa28
parent 86
2fe66644c50d
child 89
51cc2e25af38
--- a/borgend/scheduler.py	Sun Jan 28 17:54:14 2018 +0000
+++ b/borgend/scheduler.py	Sun Jan 28 19:27:34 2018 +0000
@@ -18,6 +18,7 @@
         self.prev=None
         self.name=name
         self.cond=cond
+        self.linked=False
 
     def __lt__(self, other):
         raise NotImplementedError
@@ -77,6 +78,7 @@
         self._list = None
 
     def _insert(self, ev):
+        assert(not ev.linked)
         if not self._list:
             #logger.debug("Insert first")
             self._list=ev
@@ -87,13 +89,14 @@
         else:
             #logger.debug("Insert after")
             self._list.insert_after(ev)
-
-        self._cond.notify()
+        ev.linked=True
 
     def _unlink(self, ev):
+        assert(ev.linked)
         if ev==self._list:
             self._list=ev.next
         ev.unlink()
+        ev.linked=False
 
     def _resort(self):
         oldlist=self._list
@@ -139,28 +142,39 @@
                     ev=self._list
                     logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                     # We are only allowed to remove ev from list when ev.cond allows
+                    self._unlink(ev)
+                    # We need to release the lock on self._cond before acquire
+                    # one ev.cond to avoid race conditions with self._wait
+                    self._cond.release()
                     with ev.cond:
-                        self._list=ev.next
-                        ev.unlink()
-                        ev.cond.notifyAll()
+                        ev.cond.notify_all()
+                    self._cond.acquire()
+
 
     def _wakeup_callback(self):
         logger.debug("Rescheduling events after wakeup")
         with self._cond:
             self._resort()
 
+    # It is required to have acquired the lock on ev.cond on entry
     def _wait(self, ev):
         with self._cond:
             self._insert(ev)
+            self._cond.notify()
 
-        # This will release the lock on cond, allowing queue manager (scheduler)
-        # thread to notify us if we are already to be released
+        # This will release the lock on cond, allowing the scheduler
+        # thread to notify us if we are ready to be released
         ev.cond.wait()
 
         # If we were woken up by some other event, not the scheduler,
         # ensure the event is removed
-        with self._cond:
-            self._unlink(ev)
+        if ev.linked:
+            # Deal with race conditions wrt. the two different locks
+            # in the scheduler
+            #ev.cond.release()
+            with self._cond:
+                self._unlink(ev)
+            #ev.cond.acquire()
 
     # cond has to be acquired on entry!
     def wait_until(self, when, cond, name=None):

mercurial