borgend/scheduler.py

changeset 101
3068b0de12ee
parent 91
f53aa2007a84
child 102
0d43cd568f3c
--- a/borgend/scheduler.py	Sun Feb 04 00:22:20 2018 +0000
+++ b/borgend/scheduler.py	Sun Feb 04 01:27:38 2018 +0000
@@ -22,14 +22,23 @@
         self.cond=cond
         self.linked=False
 
-    def __lt__(self, other):
+    @staticmethod
+    def snapshot():
+        return None
+
+    def is_before(self, other, snapshot=None):
         raise NotImplementedError
 
-    def insert_after(self, ev):
-        if not self.next or ev<self.next:
+    def __lt__(self, other):
+        return self.is_before(other, self.snapshot())
+
+    def insert_after(self, ev, snapshot=None):
+        if not snapshot:
+            snapshot=self.snapshot()
+        if not self.next or ev.is_before(self.next, snapshot):
             self.insert_immediately_after(ev)
         else:
-            self.next.insert_after(ev)
+            self.next.insert_after(ev, snapshot)
 
     def insert_immediately_after(self, ev):
         assert(ev.next is None and ev.prev is None)
@@ -59,8 +68,14 @@
         super().__init__(cond, name=name)
         self.when=when
 
-    def __lt__(self, other):
-        return self.when < other.when
+    @staticmethod
+    def snapshot():
+        return dreamtime.Snapshot()
+
+    def is_before(self, other, snapshot=None):
+        if not snapshot:
+            snapshot=self.snapshot()
+        return self.when.horizon(snapshot) < other.when.horizon(snapshot)
 
 class TerminableThread(Thread):
     def __init__(self, *args, **kwargs):
@@ -79,7 +94,7 @@
         self.daemon = True
         self._list = None
 
-    def _insert(self, ev):
+    def _insert(self, ev, snapshot=None):
         assert(not ev.linked)
         if not self._list:
             #logger.debug("Insert first")
@@ -90,7 +105,9 @@
             self._list=ev
         else:
             #logger.debug("Insert after")
-            self._list.insert_after(ev)
+            if not snapshot:
+                snapshot=ev.snapshot()
+            self._list.insert_after(ev, snapshot)
         ev.linked=True
 
     def _unlink(self, ev):
@@ -100,15 +117,17 @@
         ev.unlink()
         ev.linked=False
 
-    def _resort(self):
+    def _resort(self, snapshot=None):
         oldlist=self._list
         self._list=None
+        if oldlist and not snapshot:
+            snapshot=oldlist.snapshot()
         while oldlist:
             ev=oldlist
             oldlist=oldlist.next
             ev.unlink()
             ev.linked=False
-            self._insert(ev)
+            self._insert(ev, snapshot)
 
 
 
@@ -120,28 +139,31 @@
         self.precision = precision
         self._next_event_time = None
         super().__init__(target = self._scheduler_thread, name = 'Scheduler')
-        dreamtime.add_callback(self, self._wakeup_callback)
+        dreamtime.add_callback(self, self._sleepwake_callback)
 
     def _scheduler_thread(self):
         logger.debug("Scheduler thread started")
         with self._cond:
             while not self._terminate:
-                now = time.monotonic()
+                snapshot = dreamtime.Snapshot()
+                now = snapshot.monotonic()
                 if not self._list:
                     timeout = None
                 else:
                     # Wait at most precision seconds, or until next event if it
                     # comes earlier
-                    timeout=min(self.precision, self._list.when.realtime()-now)
+                    delta = self._list.when.monotonic(snapshot)-now
+                    timeout = min(self.precision, delta)
 
                 if not timeout or timeout>0:
                     logger.debug("Scheduler waiting %s seconds" % str(timeout))
                     self._cond.wait(timeout)
-                    now = time.monotonic()
+                    snapshot = dreamtime.Snapshot()
+                    now = snapshot.monotonic()
 
                 logger.debug("Scheduler timed out")
 
-                while self._list and self._list.when.monotonic() <= now:
+                while self._list and self._list.when.horizon(snapshot) <= now:
                     ev=self._list
                     logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                     # We are only allowed to remove ev from list when ev.cond allows
@@ -154,8 +176,8 @@
                     self._cond.acquire()
 
 
-    def _wakeup_callback(self):
-        logger.debug("Rescheduling events after wakeup")
+    def _sleepwake_callback(self, woke):
+        logger.debug("Rescheduling events after sleep/wakeup")
         with self._cond:
             self._resort()
 

mercurial