scheduler.py

changeset 49
db33dfa64ad6
child 53
442c558bd632
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scheduler.py	Mon Jan 22 18:16:51 2018 +0000
@@ -0,0 +1,125 @@
+#
+# Scheduler for Borgend
+#
+# This module simply provide a way for other threads to until a given time
+#
+
+import time
+import borgend
+from threading import Condition, Lock, Thread
+
+logger=borgend.logger.getChild(__name__)
+
+class ScheduledEvent:
+    def __init__(self, when, cond, name=None):
+        self.next=None
+        self.prev=None
+        self.when=when
+        self.name=name
+        self.cond=Condition()
+
+    def __lt__(self, other):
+        return self.when < other.when
+
+    def __gt__(self, other):
+        return self.when > other.when
+
+    def insert_after(self, ev):
+        if not self.next:
+            ev.prev=self
+            self.next=ev
+            ev.next=None
+        elif self.next>ev:
+            self.insert_immediately_after(ev)
+        else:
+            self.next.insert_after(ev)
+
+    def insert_immediately_after(self, ev):
+        ev.prev=self
+        ev.next=self.next
+        if ev.next:
+            ev.next.prev=ev
+        self.next=ev
+
+    def unlink(self):
+        n=self.next
+        p=self.prev
+        if n:
+            n.prev=p
+        if p:
+            p.next=n
+
+class TerminableThread(Thread):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._terminate=False
+        self._cond=Condition()
+
+    def terminate(self):
+        with self._cond:
+            _terminate=True
+            self._cond.notify()
+
+
+class Scheduler(TerminableThread):
+    # Default to precision of 60 seconds: the scheduler thread will never
+    # sleep longer than that, to get quickly back on track with the schedule
+    # when the computer wakes up from sleep
+    def __init__(self, precision=60):
+        self.precision = precision
+        self.__next_event_time = None
+        self.__list = None
+        self._cond = Condition()
+        self._terminate = False
+        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
+        self.daemon=True
+
+    def __scheduler_thread(self):
+        with self._cond:
+            while not self._terminate:
+                now = time.monotonic()
+                if not self.__list:
+                    timeout = None
+                else:
+                    # Wait at most precision seconds, or until next event if it
+                    # comes earlier
+                    timeout=min(self.precision, self.__list.when-now)
+
+                if not timeout or timeout>0:
+                    self._cond.wait(timeout)
+                    now = time.monotonic()
+
+                while self.__list and self.__list.when <= now:
+                    ev=self.__list
+                    logger.info("Found schedulable event %s" % str(ev.name))
+                    self.__list=ev.next
+                    ev.unlink()
+                    ev.cond.acquire()
+                    ev.cond.notifyAll()
+                    ev.cond.release()
+
+    # cond has to be acquired on entry!
+    def wait_until(self, when, cond, name=None):
+        ev=ScheduledEvent(when, cond, name)
+        with self._cond:
+            if not self.__list:
+                self.__list=ev
+            elif self.__list > ev:
+                ev.insert_immediately_after(self.__list)
+                self.__list=ev
+            else:
+                self.__list.insert_immediately_after(ev)
+
+            self._cond.notify()
+        # This will release the lock on cond, allowing scheduler thread
+        # to notify us if we are already to be released
+        cond.wait()
+
+        # If we were woken up by some other event, not the scheduler,
+        # ensure the event is removed
+        with self._cond:
+            if ev==self.__list:
+                self.__list=ev.next
+            ev.unlink()
+
+

mercurial