--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/scheduler.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,170 @@ +# +# Scheduler for Borgend +# +# This module simply provide a way for other threads to until a given time +# + +import time +from threading import Condition, Thread + +from . import loggers +from . import dreamtime + +logger=loggers.get(__name__) + +class QueuedEvent: + def __init__(self, cond, name=None): + self.next=None + self.prev=None + self.name=name + self.cond=cond + + def __lt__(self, other): + raise NotImplementedError + + def insert_after(self, ev): + if not self.next or ev<self.next: + self.insert_immediately_after(ev) + else: + self.next.insert_after(ev) + + def insert_immediately_after(self, ev): + assert(ev.next is None and ev.prev is None) + ev.prev=self + ev.next=self.next + self.next=ev + + def insert_immediately_before(self, ev): + assert(ev.next is None and ev.prev is None) + ev.next=self + ev.prev=self.prev + self.prev=ev + + def unlink(self): + n=self.next + p=self.prev + if n: + n.prev=p + if p: + p.next=n + self.next=None + self.prev=None + +class ScheduledEvent(QueuedEvent): + #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) + def __init__(self, when, cond, name=None): + super().__init__(cond, name=name) + self.when=when + + def __lt__(self, other): + return self.when < other.when + +class TerminableThread(Thread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._terminate=False + self._cond=Condition() + + def terminate(self): + with self._cond: + _terminate=True + self._cond.notify() + +class QueueThread(TerminableThread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.daemon = True + self._list = None + + def _insert(self, ev): + if not self._list: + #logger.debug("Insert first") + self._list=ev + elif ev<self._list: + #logger.debug("Insert beginning") + self._list.insert_immediately_before(ev) + self._list=ev + else: + #logger.debug("Insert after") + self._list.insert_after(ev) + + self._cond.notify() + + def _unlink(self, ev): + if ev==self._list: + self._list=ev.next + ev.unlink() + + def _resort(self): + oldlist=self._list + self._list=None + while oldlist: + ev=oldlist + oldlist=oldlist.next + ev.unlink() + self._insert(ev) + + + +class Scheduler(QueueThread): + # Default to precision of 60 seconds: the scheduler thread will never + # sleep longer than that, to get quickly back on track with the schedule + # when the computer wakes up from sleep + def __init__(self, precision=60): + self.precision = precision + self._next_event_time = None + super().__init__(target = self._scheduler_thread, name = 'Scheduler') + dreamtime.add_callback(self, self._wakeup_callback) + + def _scheduler_thread(self): + logger.debug("Scheduler thread started") + with self._cond: + while not self._terminate: + now = time.monotonic() + if not self._list: + timeout = None + else: + # Wait at most precision seconds, or until next event if it + # comes earlier + timeout=min(self.precision, self._list.when.realtime()-now) + + if not timeout or timeout>0: + logger.debug("Scheduler waiting %d seconds" % (timeout or (-1))) + self._cond.wait(timeout) + now = time.monotonic() + + logger.debug("Scheduler timed out") + + while self._list and self._list.when.monotonic() <= now: + ev=self._list + logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) + # We are only allowed to remove ev from list when ev.cond allows + with ev.cond: + self._list=ev.next + ev.unlink() + ev.cond.notifyAll() + + def _wakeup_callback(self): + logger.debug("Rescheduling events after wakeup") + with self._cond: + self._resort() + + def _wait(self, ev): + with self._cond: + self._insert(ev) + + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + + # If we were woken up by some other event, not the scheduler, + # ensure the event is removed + with self._cond: + self._unlink(ev) + + # cond has to be acquired on entry! + def wait_until(self, when, cond, name=None): + logger.debug("Scheduling '%s' in %s seconds [%s]" % + (name, when.seconds_to(), when.__class__.__name__)) + self._wait(ScheduledEvent(when, cond, name)) +