--- a/borgend/scheduler.py Sun Jan 28 17:54:14 2018 +0000 +++ b/borgend/scheduler.py Sun Jan 28 19:27:34 2018 +0000 @@ -18,6 +18,7 @@ self.prev=None self.name=name self.cond=cond + self.linked=False def __lt__(self, other): raise NotImplementedError @@ -77,6 +78,7 @@ self._list = None def _insert(self, ev): + assert(not ev.linked) if not self._list: #logger.debug("Insert first") self._list=ev @@ -87,13 +89,14 @@ else: #logger.debug("Insert after") self._list.insert_after(ev) - - self._cond.notify() + ev.linked=True def _unlink(self, ev): + assert(ev.linked) if ev==self._list: self._list=ev.next ev.unlink() + ev.linked=False def _resort(self): oldlist=self._list @@ -139,28 +142,39 @@ ev=self._list logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) # We are only allowed to remove ev from list when ev.cond allows + self._unlink(ev) + # We need to release the lock on self._cond before acquire + # one ev.cond to avoid race conditions with self._wait + self._cond.release() with ev.cond: - self._list=ev.next - ev.unlink() - ev.cond.notifyAll() + ev.cond.notify_all() + self._cond.acquire() + def _wakeup_callback(self): logger.debug("Rescheduling events after wakeup") with self._cond: self._resort() + # It is required to have acquired the lock on ev.cond on entry def _wait(self, ev): with self._cond: self._insert(ev) + self._cond.notify() - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released + # This will release the lock on cond, allowing the scheduler + # thread to notify us if we are ready to be released ev.cond.wait() # If we were woken up by some other event, not the scheduler, # ensure the event is removed - with self._cond: - self._unlink(ev) + if ev.linked: + # Deal with race conditions wrt. the two different locks + # in the scheduler + #ev.cond.release() + with self._cond: + self._unlink(ev) + #ev.cond.acquire() # cond has to be acquired on entry! def wait_until(self, when, cond, name=None):