Sun, 04 Feb 2018 01:36:59 +0000
Changed scheduler busylooping method, and decreased precision to 5 minutes:
if there are finite-horizon events in the queue, the scheduler thread will never
sleep longer than that. This is to quickly get back on track with the schedule
when the computer wakes up from sleep, if the sleep monitor is not working or
is not implemented for the particular operating system. However, if there are
only infinite-horizon events in the queue (meaning, DreamTime-scheduled events,
and the system is sleeping or "sleeping"), the scheduler will also sleep.
Hopefully this will also help the system stay properly asleep.
#
# Borgend by Tuomo Valkonen, 2018
#
# This file is the scheduler: it provides a way for other threads to
# wait until a given time; which may be "dreamtime" that discounts system
# sleep periods
#

import time
import logging
import math
from threading import Condition, Thread
from . import dreamtime

logger = logging.getLogger(__name__)


class QueuedEvent:
    """Node of a doubly-linked, ordered event queue.

    Subclasses define the ordering by implementing `is_before` and may
    override `snapshot` to supply a consistent point-in-time object that is
    passed to every comparison of a single insertion/sort.
    """

    def __init__(self, cond, name=None):
        self.next = None
        self.prev = None
        self.name = name
        self.cond = cond
        # Maintained by the owning queue (see QueueThread._insert/_unlink)
        self.linked = False

    @staticmethod
    def snapshot():
        """Return the comparison context; the base class has none."""
        return None

    def is_before(self, other, snapshot=None):
        """Return True if this event sorts strictly before `other`."""
        raise NotImplementedError

    def __lt__(self, other):
        return self.is_before(other, self.snapshot())

    def insert_after(self, ev, snapshot=None):
        """Insert `ev` at its sorted position somewhere after this node.

        The list is walked iteratively (rather than recursively) so that a
        long queue cannot exhaust the interpreter's recursion limit.
        """
        if snapshot is None:
            snapshot = self.snapshot()
        node = self
        while node.next is not None and not ev.is_before(node.next, snapshot):
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link the detached event `ev` directly after this node."""
        assert ev.next is None and ev.prev is None
        ev.prev = self
        ev.next = self.next
        # Keep the back-pointer of the old successor consistent; otherwise
        # unlinking that successor later would drop `ev` from the list.
        if ev.next is not None:
            ev.next.prev = ev
        self.next = ev

    def insert_immediately_before(self, ev):
        """Link the detached event `ev` directly before this node."""
        assert ev.next is None and ev.prev is None
        ev.next = self
        ev.prev = self.prev
        # Keep the forward-pointer of the old predecessor consistent
        if ev.prev is not None:
            ev.prev.next = ev
        self.prev = ev

    def unlink(self):
        """Detach this node, joining its neighbours to each other."""
        n = self.next
        p = self.prev
        if n:
            n.prev = p
        if p:
            p.next = n
        self.next = None
        self.prev = None


class ScheduledEvent(QueuedEvent):
    """Queued event ordered by the horizon of its `when` time."""

    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        self.when = when

    @staticmethod
    def snapshot():
        return dreamtime.Snapshot()

    def is_before(self, other, snapshot=None):
        if snapshot is None:
            snapshot = self.snapshot()
        return self.when.horizon(snapshot) < other.when.horizon(snapshot)


class TerminableThread(Thread):
    """Thread with a `_terminate` flag guarded by the condition `_cond`."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Set the termination flag and wake a waiter on `_cond`."""
        with self._cond:
            # Must be set on the instance: a bare `_terminate=True` would
            # only create a local variable and the flag would never change.
            self._terminate = True
            self._cond.notify()


class QueueThread(TerminableThread):
    """Daemon thread owning a sorted linked list of `QueuedEvent`s.

    The list-manipulating methods do no locking themselves; the callers in
    this file invoke them while holding `self._cond`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev, snapshot=None):
        """Insert the unlinked event `ev` at its sorted position."""
        assert not ev.linked
        if self._list is None:
            #logger.debug("Insert first")
            self._list = ev
        else:
            # Use one snapshot for every comparison of this insertion
            if snapshot is None:
                snapshot = ev.snapshot()
            if ev.is_before(self._list, snapshot):
                #logger.debug("Insert beginning")
                self._list.insert_immediately_before(ev)
                self._list = ev
            else:
                #logger.debug("Insert after")
                self._list.insert_after(ev, snapshot)
        ev.linked = True

    def _unlink(self, ev):
        """Remove the linked event `ev` from the queue."""
        assert ev.linked
        if ev is self._list:
            self._list = ev.next
        ev.unlink()
        ev.linked = False

    def _resort(self, snapshot=None):
        """Rebuild the queue by re-inserting every event in sorted order."""
        oldlist = self._list
        self._list = None
        if oldlist is not None and snapshot is None:
            snapshot = oldlist.snapshot()
        while oldlist:
            ev = oldlist
            oldlist = oldlist.next
            ev.unlink()
            ev.linked = False
            self._insert(ev, snapshot)


class Scheduler(QueueThread):
    """Thread that wakes up waiters when their scheduled time arrives."""

    # Default to precision of 5 minutes: if there are finite-horizon events in
    # the queue, the scheduler thread will never sleep longer than that.
    # This is to quickly get back on track with the schedule when the computer
    # wakes up from sleep, if the sleep monitor is not working or is not
    # implemented for the particular operating system. However, if there are
    # only infinite-horizon events in the queue (meaning, DreamTime-scheduled
    # events, and the system is sleeping or "sleeping"), the scheduler will
    # also sleep. Hopefully this will also help the system stay properly asleep.
    def __init__(self, precision=60*5):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target=self._scheduler_thread, name='Scheduler')
        dreamtime.add_callback(self, self._sleepwake_callback)

    def _scheduler_thread(self):
        """Main loop: wait for the head event's horizon, then notify it."""
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                snapshot = dreamtime.Snapshot()
                now = snapshot.monotonic()
                if not self._list:
                    timeout = None
                else:
                    delta = self._list.when.horizon(snapshot) - now
                    if delta == math.inf:
                        timeout = None
                    else:
                        timeout = min(self.precision, delta)

                # None means "wait until notified"; a non-positive timeout
                # means the head event is already due, so do not wait.
                if timeout is None or timeout > 0:
                    logger.debug("Scheduler waiting %s seconds", timeout)
                    self._cond.wait(timeout)
                    snapshot = dreamtime.Snapshot()
                    now = snapshot.monotonic()

                logger.debug("Scheduler timed out")

                while self._list and self._list.when.horizon(snapshot) <= now:
                    ev = self._list
                    logger.debug("Scheduler activating %s",
                                 ev.name or "(unknown)")
                    # We are only allowed to remove ev from list when ev.cond
                    # allows
                    self._unlink(ev)
                    # We need to release the lock on self._cond before
                    # acquiring the one on ev.cond to avoid race conditions
                    # with self._wait
                    self._cond.release()
                    try:
                        with ev.cond:
                            ev.cond.notify_all()
                    finally:
                        # Always hold the lock again before continuing, so
                        # the enclosing `with` can release it on any exit
                        self._cond.acquire()

    def _sleepwake_callback(self, woke):
        """Re-sort the queue after the system went to sleep or woke up."""
        logger.debug("Rescheduling events after sleep/wakeup")
        with self._cond:
            self._resort()
            # The scheduler thread may be waiting without a timeout (only
            # infinite-horizon events were queued); wake it up so that it
            # recomputes its timeout from the re-sorted queue.
            self._cond.notify()

    # It is required to have acquired the lock on ev.cond on entry
    def _wait(self, ev):
        """Queue `ev` and block on `ev.cond` until it is notified."""
        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing the scheduler
        # thread to notify us if we are ready to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed. The check is done while holding the
        # scheduler lock: the scheduler thread may be unlinking the event
        # concurrently, and it never holds self._cond while acquiring
        # ev.cond, so taking self._cond here cannot deadlock with it.
        with self._cond:
            if ev.linked:
                self._unlink(ev)

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        """Block the calling thread on `cond` until the time `when`.

        Returns when `cond` is notified, whether by the scheduler thread or
        by something else.
        """
        logger.debug("Scheduling '%s' in %s seconds [%s]",
                     name, when.seconds_to(), when.__class__.__name__)
        self._wait(ScheduledEvent(when, cond, name))