--- a/scheduler.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,169 +0,0 @@ -# -# Scheduler for Borgend -# -# This module simply provide a way for other threads to until a given time -# - -import time -import loggers -import dreamtime -from threading import Condition, Lock, Thread - -logger=loggers.get(__name__) - -class QueuedEvent: - def __init__(self, cond, name=None): - self.next=None - self.prev=None - self.name=name - self.cond=cond - - def __lt__(self, other): - raise NotImplementedError - - def insert_after(self, ev): - if not self.next or ev<self.next: - self.insert_immediately_after(ev) - else: - self.next.insert_after(ev) - - def insert_immediately_after(self, ev): - assert(ev.next is None and ev.prev is None) - ev.prev=self - ev.next=self.next - self.next=ev - - def insert_immediately_before(self, ev): - assert(ev.next is None and ev.prev is None) - ev.next=self - ev.prev=self.prev - self.prev=ev - - def unlink(self): - n=self.next - p=self.prev - if n: - n.prev=p - if p: - p.next=n - self.next=None - self.prev=None - -class ScheduledEvent(QueuedEvent): - #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) - def __init__(self, when, cond, name=None): - super().__init__(cond, name=name) - self.when=when - - def __lt__(self, other): - return self.when < other.when - -class TerminableThread(Thread): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._terminate=False - self._cond=Condition() - - def terminate(self): - with self._cond: - _terminate=True - self._cond.notify() - -class QueueThread(TerminableThread): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.daemon = True - self._list = None - - def _insert(self, ev): - if not self._list: - #logger.debug("Insert first") - self._list=ev - elif ev<self._list: - #logger.debug("Insert beginning") - self._list.insert_immediately_before(ev) - self._list=ev - else: - #logger.debug("Insert after") - self._list.insert_after(ev) - - self._cond.notify() - - def _unlink(self, ev): - if ev==self._list: - self._list=ev.next - ev.unlink() - - def _resort(self): - oldlist=self._list - self._list=None - while oldlist: - ev=oldlist - oldlist=oldlist.next - ev.unlink() - self._insert(ev) - - - -class Scheduler(QueueThread): - # Default to precision of 60 seconds: the scheduler thread will never - # sleep longer than that, to get quickly back on track with the schedule - # when the computer wakes up from sleep - def __init__(self, precision=60): - self.precision = precision - self._next_event_time = None - super().__init__(target = self._scheduler_thread, name = 'Scheduler') - dreamtime.add_callback(self, self._wakeup_callback) - - def _scheduler_thread(self): - logger.debug("Scheduler thread started") - with self._cond: - while not self._terminate: - now = time.monotonic() - if not self._list: - timeout = None - else: - # Wait at most precision seconds, or until next event if it - # comes earlier - timeout=min(self.precision, self._list.when.realtime()-now) - - if not timeout or timeout>0: - logger.debug("Scheduler waiting %d seconds" % (timeout or (-1))) - self._cond.wait(timeout) - now = time.monotonic() - - logger.debug("Scheduler timed out") - - while self._list and self._list.when.monotonic() <= now: - ev=self._list - logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) - # We are only allowed to remove ev from list when ev.cond allows - with ev.cond: - self._list=ev.next - ev.unlink() - ev.cond.notifyAll() - - def _wakeup_callback(self): - logger.debug("Rescheduling events after wakeup") - with self._cond: - self._resort() - - def _wait(self, ev): - with self._cond: - self._insert(ev) - - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released - ev.cond.wait() - - # If we were woken up by some other event, not the scheduler, - # ensure the event is removed - with self._cond: - self._unlink(ev) - - # cond has to be acquired on entry! - def wait_until(self, when, cond, name=None): - logger.debug("Scheduling '%s' in %s seconds [%s]" % - (name, when.seconds_to(), when.__class__.__name__)) - self._wait(ScheduledEvent(when, cond, name)) -