borgend/scheduler.py

changeset 80
a409242121d5
parent 79
b075b3db3044
child 86
2fe66644c50d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/borgend/scheduler.py	Sun Jan 28 11:54:46 2018 +0000
@@ -0,0 +1,170 @@
+#
+# Scheduler for Borgend
+#
+# This module simply provide a way for other threads to until a given time
+#
+
+import time
+from threading import Condition, Thread
+
+from . import loggers
+from . import dreamtime
+
+logger=loggers.get(__name__)
+
+class QueuedEvent:
+    def __init__(self, cond, name=None):
+        self.next=None
+        self.prev=None
+        self.name=name
+        self.cond=cond
+
+    def __lt__(self, other):
+        raise NotImplementedError
+
+    def insert_after(self, ev):
+        if not self.next or ev<self.next:
+            self.insert_immediately_after(ev)
+        else:
+            self.next.insert_after(ev)
+
+    def insert_immediately_after(self, ev):
+        assert(ev.next is None and ev.prev is None)
+        ev.prev=self
+        ev.next=self.next
+        self.next=ev
+
+    def insert_immediately_before(self, ev):
+        assert(ev.next is None and ev.prev is None)
+        ev.next=self
+        ev.prev=self.prev
+        self.prev=ev
+
+    def unlink(self):
+        n=self.next
+        p=self.prev
+        if n:
+            n.prev=p
+        if p:
+            p.next=n
+        self.next=None
+        self.prev=None
+
+class ScheduledEvent(QueuedEvent):
+    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
+    def __init__(self, when, cond, name=None):
+        super().__init__(cond, name=name)
+        self.when=when
+
+    def __lt__(self, other):
+        return self.when < other.when
+
+class TerminableThread(Thread):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._terminate=False
+        self._cond=Condition()
+
+    def terminate(self):
+        with self._cond:
+            _terminate=True
+            self._cond.notify()
+
+class QueueThread(TerminableThread):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.daemon = True
+        self._list = None
+
+    def _insert(self, ev):
+        if not self._list:
+            #logger.debug("Insert first")
+            self._list=ev
+        elif ev<self._list:
+            #logger.debug("Insert beginning")
+            self._list.insert_immediately_before(ev)
+            self._list=ev
+        else:
+            #logger.debug("Insert after")
+            self._list.insert_after(ev)
+
+        self._cond.notify()
+
+    def _unlink(self, ev):
+        if ev==self._list:
+            self._list=ev.next
+        ev.unlink()
+
+    def _resort(self):
+        oldlist=self._list
+        self._list=None
+        while oldlist:
+            ev=oldlist
+            oldlist=oldlist.next
+            ev.unlink()
+            self._insert(ev)
+
+
+
+class Scheduler(QueueThread):
+    # Default to precision of 60 seconds: the scheduler thread will never
+    # sleep longer than that, to get quickly back on track with the schedule
+    # when the computer wakes up from sleep
+    def __init__(self, precision=60):
+        self.precision = precision
+        self._next_event_time = None
+        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
+        dreamtime.add_callback(self, self._wakeup_callback)
+
+    def _scheduler_thread(self):
+        logger.debug("Scheduler thread started")
+        with self._cond:
+            while not self._terminate:
+                now = time.monotonic()
+                if not self._list:
+                    timeout = None
+                else:
+                    # Wait at most precision seconds, or until next event if it
+                    # comes earlier
+                    timeout=min(self.precision, self._list.when.realtime()-now)
+
+                if not timeout or timeout>0:
+                    logger.debug("Scheduler waiting %d seconds" % (timeout or (-1)))
+                    self._cond.wait(timeout)
+                    now = time.monotonic()
+
+                logger.debug("Scheduler timed out")
+
+                while self._list and self._list.when.monotonic() <= now:
+                    ev=self._list
+                    logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
+                    # We are only allowed to remove ev from list when ev.cond allows
+                    with ev.cond:
+                        self._list=ev.next
+                        ev.unlink()
+                        ev.cond.notifyAll()
+
+    def _wakeup_callback(self):
+        logger.debug("Rescheduling events after wakeup")
+        with self._cond:
+            self._resort()
+
+    def _wait(self, ev):
+        with self._cond:
+            self._insert(ev)
+
+        # This will release the lock on cond, allowing queue manager (scheduler)
+        # thread to notify us if we are already to be released
+        ev.cond.wait()
+
+        # If we were woken up by some other event, not the scheduler,
+        # ensure the event is removed
+        with self._cond:
+            self._unlink(ev)
+
+    # cond has to be acquired on entry!
+    def wait_until(self, when, cond, name=None):
+        logger.debug("Scheduling '%s' in %s seconds [%s]" %
+                     (name, when.seconds_to(), when.__class__.__name__))
+        self._wait(ScheduledEvent(when, cond, name))
+

mercurial