--- a/borgend/scheduler.py Sun Feb 04 00:22:20 2018 +0000 +++ b/borgend/scheduler.py Sun Feb 04 01:27:38 2018 +0000 @@ -22,14 +22,23 @@ self.cond=cond self.linked=False - def __lt__(self, other): + @staticmethod + def snapshot(): + return None + + def is_before(self, other, snapshot=None): raise NotImplementedError - def insert_after(self, ev): - if not self.next or ev<self.next: + def __lt__(self, other): + return self.is_before(other, self.snapshot()) + + def insert_after(self, ev, snapshot=None): + if not snapshot: + snapshot=self.snapshot() + if not self.next or ev.is_before(self.next, snapshot): self.insert_immediately_after(ev) else: - self.next.insert_after(ev) + self.next.insert_after(ev, snapshot) def insert_immediately_after(self, ev): assert(ev.next is None and ev.prev is None) @@ -59,8 +68,14 @@ super().__init__(cond, name=name) self.when=when - def __lt__(self, other): - return self.when < other.when + @staticmethod + def snapshot(): + return dreamtime.Snapshot() + + def is_before(self, other, snapshot=None): + if not snapshot: + snapshot=self.snapshot() + return self.when.horizon(snapshot) < other.when.horizon(snapshot) class TerminableThread(Thread): def __init__(self, *args, **kwargs): @@ -79,7 +94,7 @@ self.daemon = True self._list = None - def _insert(self, ev): + def _insert(self, ev, snapshot=None): assert(not ev.linked) if not self._list: #logger.debug("Insert first") @@ -90,7 +105,9 @@ self._list=ev else: #logger.debug("Insert after") - self._list.insert_after(ev) + if not snapshot: + snapshot=ev.snapshot() + self._list.insert_after(ev, snapshot) ev.linked=True def _unlink(self, ev): @@ -100,15 +117,17 @@ ev.unlink() ev.linked=False - def _resort(self): + def _resort(self, snapshot=None): oldlist=self._list self._list=None + if oldlist and not snapshot: + snapshot=oldlist.snapshot() while oldlist: ev=oldlist oldlist=oldlist.next ev.unlink() ev.linked=False - self._insert(ev) + self._insert(ev, snapshot) @@ -120,28 +139,31 @@ self.precision = precision self._next_event_time = None super().__init__(target = self._scheduler_thread, name = 'Scheduler') - dreamtime.add_callback(self, self._wakeup_callback) + dreamtime.add_callback(self, self._sleepwake_callback) def _scheduler_thread(self): logger.debug("Scheduler thread started") with self._cond: while not self._terminate: - now = time.monotonic() + snapshot = dreamtime.Snapshot() + now = snapshot.monotonic() if not self._list: timeout = None else: # Wait at most precision seconds, or until next event if it # comes earlier - timeout=min(self.precision, self._list.when.realtime()-now) + delta = self._list.when.monotonic(snapshot)-now + timeout = min(self.precision, delta) if not timeout or timeout>0: logger.debug("Scheduler waiting %s seconds" % str(timeout)) self._cond.wait(timeout) - now = time.monotonic() + snapshot = dreamtime.Snapshot() + now = snapshot.monotonic() logger.debug("Scheduler timed out") - while self._list and self._list.when.monotonic() <= now: + while self._list and self._list.when.horizon(snapshot) <= now: ev=self._list logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) # We are only allowed to remove ev from list when ev.cond allows @@ -154,8 +176,8 @@ self._cond.acquire() - def _wakeup_callback(self): - logger.debug("Rescheduling events after wakeup") + def _sleepwake_callback(self, woke): + logger.debug("Rescheduling events after sleep/wakeup") with self._cond: self._resort()