Mon, 22 Jan 2018 18:16:51 +0000
Improved scheduler
#
# Scheduler for Borgend
#
# This module simply provides a way for other threads to wait until a given
# time.
#

import time
import borgend
from threading import Condition, Lock, Thread

logger = borgend.logger.getChild(__name__)


class ScheduledEvent:
    """Node of the doubly linked, time-ordered list of pending wake-ups.

    `when` is compared against `time.monotonic()` by the scheduler thread,
    `cond` is the caller-supplied condition that gets notified once `when`
    has passed, and `name` is only used for logging.
    """

    def __init__(self, when, cond, name=None):
        self.next = None
        self.prev = None
        self.when = when
        self.name = name
        # Keep the caller's condition: notifying a private Condition() here
        # would never wake the thread blocked in Scheduler.wait_until().
        self.cond = cond

    def __lt__(self, other):
        return self.when < other.when

    def __gt__(self, other):
        return self.when > other.when

    def insert_after(self, ev):
        """Insert `ev` somewhere after this node, keeping the list sorted.

        `ev` is placed behind every following node whose time is not later
        than its own, so events with equal times keep their insertion order.
        """
        node = self
        # Iterative walk: avoids hitting the recursion limit on long queues.
        while node.next and not node.next > ev:
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link `ev` directly behind this node."""
        ev.prev = self
        ev.next = self.next
        if ev.next:
            ev.next.prev = ev
        self.next = ev

    def unlink(self):
        """Remove this node from whatever list it is linked into.

        The node's own pointers are cleared so that unlinking an already
        removed node is a harmless no-op instead of rewriting the links of
        its former neighbours.
        """
        following = self.next
        preceding = self.prev
        if following:
            following.prev = preceding
        if preceding:
            preceding.next = following
        self.next = None
        self.prev = None


class TerminableThread(Thread):
    """Thread with a `_terminate` flag guarded by the condition `_cond`."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Request termination and wake the thread if it waits on `_cond`."""
        with self._cond:
            # Must be the instance attribute; a bare local assignment would
            # leave the flag untouched and the thread running.
            self._terminate = True
            self._cond.notify()


class Scheduler(TerminableThread):
    """Daemon thread that wakes up other threads at requested times."""

    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        # TerminableThread.__init__ creates self._cond and self._terminate.
        super().__init__(target=self.__scheduler_thread, name='Scheduler')
        self.daemon = True
        self.precision = precision
        self.__next_event_time = None
        # Head of the linked list of ScheduledEvents, earliest first.
        self.__list = None

    def __scheduler_thread(self):
        """Thread body: sleep until the next event is due, then notify it."""
        # NOTE(review): ev.cond is acquired below while self._cond is held,
        # whereas wait_until() takes self._cond while its caller holds cond.
        # If several threads share one cond this lock-order inversion looks
        # like it could deadlock -- confirm how callers use their conditions.
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if not self.__list:
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if
                    # it comes earlier
                    timeout = min(self.precision, self.__list.when - now)

                # None means "nothing queued": block until notified.  A zero
                # or negative timeout means the head is already due.
                if timeout is None or timeout > 0:
                    self._cond.wait(timeout)
                    now = time.monotonic()

                while self.__list and self.__list.when <= now:
                    ev = self.__list
                    logger.info("Found schedulable event %s", ev.name)
                    self.__list = ev.next
                    ev.unlink()
                    with ev.cond:
                        ev.cond.notify_all()

    def __insert(self, ev):
        """Insert `ev` into the sorted queue; self._cond must be held."""
        head = self.__list
        if not head:
            self.__list = ev
        elif head > ev:
            # ev becomes the new head and the old head follows it.
            ev.next = head
            head.prev = ev
            self.__list = ev
        else:
            head.insert_after(ev)

    def __remove(self, ev):
        """Drop `ev` from the queue if still linked; self._cond must be held."""
        if ev is self.__list:
            self.__list = ev.next
        ev.unlink()

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        """Block the calling thread on `cond` until the scheduler notifies it.

        `when` is a `time.monotonic()` timestamp.  The call returns as soon
        as `cond` is notified, whether by the scheduler or by anybody else,
        so callers that need the full delay must re-check the time.
        """
        ev = ScheduledEvent(when, cond, name)
        with self._cond:
            self.__insert(ev)
            # The new event may be earlier than what the thread sleeps for.
            self._cond.notify()
        try:
            # This will release the lock on cond, allowing scheduler thread
            # to notify us if we are already to be released
            cond.wait()
        finally:
            # If we were woken up by some other event, not the scheduler,
            # ensure the event is removed
            with self._cond:
                self.__remove(ev)