Sun, 28 Jan 2018 11:38:01 +0000
Cleaned up module organisation to simplify borgend.py and not have to import it in other modules.
# # Scheduler for Borgend # # This module simply provide a way for other threads to until a given time # import time import loggers import dreamtime from threading import Condition, Lock, Thread logger=loggers.get(__name__) class QueuedEvent: def __init__(self, cond, name=None): self.next=None self.prev=None self.name=name self.cond=cond def __lt__(self, other): raise NotImplementedError def insert_after(self, ev): if not self.next or ev<self.next: self.insert_immediately_after(ev) else: self.next.insert_after(ev) def insert_immediately_after(self, ev): assert(ev.next is None and ev.prev is None) ev.prev=self ev.next=self.next self.next=ev def insert_immediately_before(self, ev): assert(ev.next is None and ev.prev is None) ev.next=self ev.prev=self.prev self.prev=ev def unlink(self): n=self.next p=self.prev if n: n.prev=p if p: p.next=n self.next=None self.prev=None class ScheduledEvent(QueuedEvent): #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) def __init__(self, when, cond, name=None): super().__init__(cond, name=name) self.when=when def __lt__(self, other): return self.when < other.when class TerminableThread(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._terminate=False self._cond=Condition() def terminate(self): with self._cond: _terminate=True self._cond.notify() class QueueThread(TerminableThread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.daemon = True self._list = None def _insert(self, ev): if not self._list: #logger.debug("Insert first") self._list=ev elif ev<self._list: #logger.debug("Insert beginning") self._list.insert_immediately_before(ev) self._list=ev else: #logger.debug("Insert after") self._list.insert_after(ev) self._cond.notify() def _unlink(self, ev): if ev==self._list: self._list=ev.next ev.unlink() def _resort(self): oldlist=self._list self._list=None while oldlist: ev=oldlist oldlist=oldlist.next ev.unlink() self._insert(ev) class Scheduler(QueueThread): # Default to precision of 60 seconds: the scheduler thread will never # sleep longer than that, to get quickly back on track with the schedule # when the computer wakes up from sleep def __init__(self, precision=60): self.precision = precision self._next_event_time = None super().__init__(target = self._scheduler_thread, name = 'Scheduler') dreamtime.add_callback(self, self._wakeup_callback) def _scheduler_thread(self): logger.debug("Scheduler thread started") with self._cond: while not self._terminate: now = time.monotonic() if not self._list: timeout = None else: # Wait at most precision seconds, or until next event if it # comes earlier timeout=min(self.precision, self._list.when.realtime()-now) if not timeout or timeout>0: logger.debug("Scheduler waiting %d seconds" % (timeout or (-1))) self._cond.wait(timeout) now = time.monotonic() logger.debug("Scheduler timed out") while self._list and self._list.when.monotonic() <= now: ev=self._list logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) # We are only allowed to remove ev from list when ev.cond allows with ev.cond: self._list=ev.next ev.unlink() ev.cond.notifyAll() def _wakeup_callback(self): logger.debug("Rescheduling events after wakeup") with self._cond: self._resort() def _wait(self, ev): with self._cond: self._insert(ev) # This will release the lock on cond, allowing queue manager (scheduler) # thread to notify us if we are already to be released ev.cond.wait() # If we were woken up by some other event, not the scheduler, # ensure the event is removed with self._cond: self._unlink(ev) # cond has to be acquired on entry! def wait_until(self, when, cond, name=None): logger.debug("Scheduling '%s' in %s seconds [%s]" % (name, when.seconds_to(), when.__class__.__name__)) self._wait(ScheduledEvent(when, cond, name))