--- a/scheduler.py Mon Jan 22 21:07:34 2018 +0000 +++ b/scheduler.py Mon Jan 22 22:23:01 2018 +0000 @@ -18,17 +18,14 @@ self.cond=cond def __lt__(self, other): - return False - - def __gt__(self, other): - return True + raise NotImplementedError def insert_after(self, ev): if not self.next: ev.prev=self self.next=ev ev.next=None - elif self.next>ev: + elif ev<self.next: self.insert_immediately_after(ev) else: self.next.insert_after(ev) @@ -50,15 +47,12 @@ class ScheduledEvent(QueuedEvent): def __init__(self, when, cond, name=None): - super().__init__(cond, name) + super().__init__(cond, name=name) self.when=when def __lt__(self, other): return self.when < other.when - def __gt__(self, other): - return self.when > other.when - class TerminableThread(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -72,13 +66,14 @@ class QueueThread(TerminableThread): def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.daemon = True self._list = None - super().__init__(*args, **kwargs) def _insert(self, ev): if not self._list: self._list=ev - elif self._list > ev: + elif ev<self._list: ev.insert_immediately_after(self._list) self._list=ev else: @@ -86,28 +81,10 @@ self._cond.notify() - def _wait(self, ev): - with self._cond: - self._insert(ev) - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released - ev.cond.wait() - - # If we were woken up by some other event, not the scheduler, - # ensure the event is removed - with self._cond: - if ev==self._list: - self._list=ev.next - ev.unlink() - - def _pop(self): - ev=self._list - logger.info("%s: Found event %s" % self.name(), str(ev.name)) - self._list=ev.next + def _unlink(self, ev): + if ev==self._list: + self._list=ev.next ev.unlink() - ev.cond.acquire() - ev.cond.notifyAll() - ev.cond.release() class Scheduler(QueueThread): @@ -117,11 +94,9 @@ def __init__(self, precision=60): self.precision = precision self._next_event_time = None - self._list = None - super().__init__(target = self.__scheduler_thread, name = 'Scheduler') - self.daemon=True + super().__init__(target = self._scheduler_thread, name = 'Scheduler') - def __scheduler_thread(self): + def _scheduler_thread(self): with self._cond: while not self._terminate: now = time.monotonic() @@ -137,7 +112,26 @@ now = time.monotonic() while self._list and self._list.when <= now: - self._pop() + ev=self._list + logger.info("%s: Scheduling event %s" % self.name(), str(ev.name)) + # We are only allowed to remove ev from list when ev.cond allows + with ev.cond: + self._list=ev.next + ev.unlink() + ev.cond.notifyAll() + + def _wait(self, ev): + with self._cond: + self._insert(ev) + + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + + # If we were woken up by some other event, not the scheduler, + # ensure the event is removed + with self._cond: + self._unlink(ev) # cond has to be acquired on entry! def wait_until(self, when, cond, name=None):