Generalisation of scheduler thread to general queue threads

Mon, 22 Jan 2018 21:07:34 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 22 Jan 2018 21:07:34 +0000
changeset 53
442c558bd632
parent 52
aef860e99323
child 54
cfcaa5f6ba33

Generalisation of scheduler thread to general queue threads

repository.py file | annotate | diff | comparison | revisions
scheduler.py file | annotate | diff | comparison | revisions
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/repository.py	Mon Jan 22 21:07:34 2018 +0000
@@ -0,0 +1,14 @@
+#
+# Repository abstraction for queuing
+#
+
+from scheduler import TerminableThread, QueuedEvent
+
+class Repository(TerminableThread):
+    def __init__(self)
+        self.__next_event_time = None
+        self.__list = None
+        self._cond = Condition()
+        self._terminate = False
+        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
+        self.daemon=True
--- a/scheduler.py	Mon Jan 22 19:14:47 2018 +0000
+++ b/scheduler.py	Mon Jan 22 21:07:34 2018 +0000
@@ -10,19 +10,18 @@
 
 logger=borgend.logger.getChild(__name__)
 
-class ScheduledEvent:
-    def __init__(self, when, cond, name=None):
+class QueuedEvent:
+    def __init__(self, cond, name=None):
         self.next=None
         self.prev=None
-        self.when=when
         self.name=name
-        self.cond=Condition()
+        self.cond=cond
 
     def __lt__(self, other):
-        return self.when < other.when
+        return False
 
     def __gt__(self, other):
-        return self.when > other.when
+        return True
 
     def insert_after(self, ev):
         if not self.next:
@@ -49,6 +48,17 @@
         if p:
             p.next=n
 
+class ScheduledEvent(QueuedEvent):
+    def __init__(self, when, cond, name=None):
+        super().__init__(cond, name)
+        self.when=when
+
+    def __lt__(self, other):
+        return self.when < other.when
+
+    def __gt__(self, other):
+        return self.when > other.when
+
 class TerminableThread(Thread):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -60,17 +70,54 @@
             _terminate=True
             self._cond.notify()
 
+class QueueThread(TerminableThread):
+    def __init__(self, *args, **kwargs):
+        self._list = None
+        super().__init__(*args, **kwargs)
 
-class Scheduler(TerminableThread):
+    def _insert(self, ev):
+        if not self._list:
+            self._list=ev
+        elif self._list > ev:
+            ev.insert_immediately_after(self._list)
+            self._list=ev
+        else:
+            self._list.insert_immediately_after(ev)
+
+        self._cond.notify()
+
+    def _wait(self, ev):
+        with self._cond:
+            self._insert(ev)
+        # This will release the lock on cond, allowing queue manager (scheduler)
+        # thread to notify us if we are already to be released
+        ev.cond.wait()
+
+        # If we were woken up by some other event, not the scheduler,
+        # ensure the event is removed
+        with self._cond:
+            if ev==self._list:
+                self._list=ev.next
+            ev.unlink()
+
+    def _pop(self):
+        ev=self._list
+        logger.info("%s: Found event %s" % self.name(), str(ev.name))
+        self._list=ev.next
+        ev.unlink()
+        ev.cond.acquire()
+        ev.cond.notifyAll()
+        ev.cond.release()
+
+
+class Scheduler(QueueThread):
     # Default to precision of 60 seconds: the scheduler thread will never
     # sleep longer than that, to get quickly back on track with the schedule
     # when the computer wakes up from sleep
     def __init__(self, precision=60):
         self.precision = precision
-        self.__next_event_time = None
-        self.__list = None
-        self._cond = Condition()
-        self._terminate = False
+        self._next_event_time = None
+        self._list = None
         super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
         self.daemon=True
 
@@ -78,48 +125,21 @@
         with self._cond:
             while not self._terminate:
                 now = time.monotonic()
-                if not self.__list:
+                if not self._list:
                     timeout = None
                 else:
                     # Wait at most precision seconds, or until next event if it
                     # comes earlier
-                    timeout=min(self.precision, self.__list.when-now)
+                    timeout=min(self.precision, self._list.when-now)
 
                 if not timeout or timeout>0:
                     self._cond.wait(timeout)
                     now = time.monotonic()
 
-                while self.__list and self.__list.when <= now:
-                    ev=self.__list
-                    logger.info("Found schedulable event %s" % str(ev.name))
-                    self.__list=ev.next
-                    ev.unlink()
-                    ev.cond.acquire()
-                    ev.cond.notifyAll()
-                    ev.cond.release()
+                while self._list and self._list.when <= now:
+                    self._pop()
 
     # cond has to be acquired on entry!
     def wait_until(self, when, cond, name=None):
-        ev=ScheduledEvent(when, cond, name)
-        with self._cond:
-            if not self.__list:
-                self.__list=ev
-            elif self.__list > ev:
-                ev.insert_immediately_after(self.__list)
-                self.__list=ev
-            else:
-                self.__list.insert_immediately_after(ev)
+        self._wait(ScheduledEvent(when, cond, name))
 
-            self._cond.notify()
-        # This will release the lock on cond, allowing scheduler thread
-        # to notify us if we are already to be released
-        cond.wait()
-
-        # If we were woken up by some other event, not the scheduler,
-        # ensure the event is removed
-        with self._cond:
-            if ev==self.__list:
-                self.__list=ev.next
-            ev.unlink()
-
-

mercurial