scheduler.py

changeset 54
cfcaa5f6ba33
parent 53
442c558bd632
child 55
407af23d16bb
--- a/scheduler.py	Mon Jan 22 21:07:34 2018 +0000
+++ b/scheduler.py	Mon Jan 22 22:23:01 2018 +0000
@@ -18,17 +18,14 @@
         self.cond=cond
 
     def __lt__(self, other):
-        return False
-
-    def __gt__(self, other):
-        return True
+        raise NotImplementedError
 
     def insert_after(self, ev):
         if not self.next:
             ev.prev=self
             self.next=ev
             ev.next=None
-        elif self.next>ev:
+        elif ev<self.next:
             self.insert_immediately_after(ev)
         else:
             self.next.insert_after(ev)
@@ -50,15 +47,12 @@
 
 class ScheduledEvent(QueuedEvent):
     def __init__(self, when, cond, name=None):
-        super().__init__(cond, name)
+        super().__init__(cond, name=name)
         self.when=when
 
     def __lt__(self, other):
         return self.when < other.when
 
-    def __gt__(self, other):
-        return self.when > other.when
-
 class TerminableThread(Thread):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -72,13 +66,14 @@
 
 class QueueThread(TerminableThread):
     def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.daemon = True
         self._list = None
-        super().__init__(*args, **kwargs)
 
     def _insert(self, ev):
         if not self._list:
             self._list=ev
-        elif self._list > ev:
+        elif ev<self._list:
             ev.insert_immediately_after(self._list)
             self._list=ev
         else:
@@ -86,28 +81,10 @@
 
         self._cond.notify()
 
-    def _wait(self, ev):
-        with self._cond:
-            self._insert(ev)
-        # This will release the lock on cond, allowing queue manager (scheduler)
-        # thread to notify us if we are already to be released
-        ev.cond.wait()
-
-        # If we were woken up by some other event, not the scheduler,
-        # ensure the event is removed
-        with self._cond:
-            if ev==self._list:
-                self._list=ev.next
-            ev.unlink()
-
-    def _pop(self):
-        ev=self._list
-        logger.info("%s: Found event %s" % self.name(), str(ev.name))
-        self._list=ev.next
+    def _unlink(self, ev):
+        if ev==self._list:
+            self._list=ev.next
         ev.unlink()
-        ev.cond.acquire()
-        ev.cond.notifyAll()
-        ev.cond.release()
 
 
 class Scheduler(QueueThread):
@@ -117,11 +94,9 @@
     def __init__(self, precision=60):
         self.precision = precision
         self._next_event_time = None
-        self._list = None
-        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
-        self.daemon=True
+        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
 
-    def __scheduler_thread(self):
+    def _scheduler_thread(self):
         with self._cond:
             while not self._terminate:
                 now = time.monotonic()
@@ -137,7 +112,26 @@
                     now = time.monotonic()
 
                 while self._list and self._list.when <= now:
-                    self._pop()
+                    ev=self._list
+                    logger.info("%s: Scheduling event %s" % self.name(), str(ev.name))
+                    # We are only allowed to remove ev from list when ev.cond allows
+                    with ev.cond:
+                        self._list=ev.next
+                        ev.unlink()
+                        ev.cond.notifyAll()
+
+    def _wait(self, ev):
+        with self._cond:
+            self._insert(ev)
+
+        # This will release the lock on cond, allowing queue manager (scheduler)
+        # thread to notify us if we are already to be released
+        ev.cond.wait()
+
+        # If we were woken up by some other event, not the scheduler,
+        # ensure the event is removed
+        with self._cond:
+            self._unlink(ev)
 
     # cond has to be acquired on entry!
     def wait_until(self, when, cond, name=None):

mercurial