Mon, 22 Jan 2018 18:16:51 +0000
Improved scheduler
| 49 | 1 | # |
| 2 | # Scheduler for Borgend | |
| 3 | # | |
| 4 | # This module simply provides a way for other threads to wait until a given time | |
| 5 | # | |
| 6 | ||
| 7 | import time | |
| 8 | import borgend | |
| 9 | from threading import Condition, Lock, Thread | |
| 10 | ||
# Module-level logger: a child of borgend's logger, named after this module.
logger = borgend.logger.getChild(__name__)
| 12 | ||
class ScheduledEvent:
    """Node of a doubly linked list of events kept sorted by ``when``.

    Attributes:
        when: Value the list is ordered by; events compare by it.
        cond: Condition object supplied by the creator of the event.
        name: Optional label for the event (may be None).
        next: Following event in the list, or None at the tail.
        prev: Preceding event in the list, or None at the head.
    """

    def __init__(self, when, cond, name=None):
        self.next = None
        self.prev = None
        self.when = when
        self.name = name
        # Store the condition passed in by the caller.  Creating a fresh
        # Condition here would mean that notifying ``self.cond`` never
        # reaches whoever is waiting on the condition they handed to us.
        self.cond = cond

    def __lt__(self, other):
        """Return True if this event is due strictly before ``other``."""
        return self.when < other.when

    def __gt__(self, other):
        """Return True if this event is due strictly after ``other``."""
        return self.when > other.when

    def insert_after(self, ev):
        """Insert ``ev`` somewhere after this node, keeping the list sorted.

        ``ev`` is placed in front of the first later node that is due
        strictly after it, so events with equal ``when`` keep their
        insertion order.  If there is no such node, ``ev`` becomes the tail.
        """
        node = self
        # Iterative walk (instead of recursion) so long lists cannot hit
        # the interpreter's recursion limit.
        while node.next is not None and not node.next > ev:
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link ``ev`` directly after this node, without checking order."""
        ev.prev = self
        ev.next = self.next
        if ev.next is not None:
            ev.next.prev = ev
        self.next = ev

    def unlink(self):
        """Remove this node from its list, joining its neighbours.

        The node's own links are cleared afterwards, so calling ``unlink``
        again on an already removed node is a harmless no-op instead of
        rewriting the links of nodes it used to be adjacent to.
        """
        following = self.next
        preceding = self.prev
        if following is not None:
            following.prev = preceding
        if preceding is not None:
            preceding.next = following
        self.next = None
        self.prev = None
| 51 | ||
class TerminableThread(Thread):
    """Thread carrying a termination flag guarded by a condition variable.

    ``_terminate`` starts out False and is set by ``terminate()``;
    ``_cond`` is the condition that is notified when the flag changes.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the thread; all arguments are passed on to ``Thread``."""
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Set the termination flag and notify one waiter on ``_cond``."""
        with self._cond:
            # Must be the instance attribute: assigning a bare local name
            # here would leave the flag unchanged.
            self._terminate = True
            self._cond.notify()
| 62 | ||
| 63 | ||
class Scheduler(TerminableThread):
    """Thread that notifies waiters once a monotonic-clock time has passed.

    Pending events are kept in a doubly linked list of ``ScheduledEvent``
    nodes sorted by ``when``; the head of the list is the next event due.
    The list is only touched while holding ``self._cond``.
    """

    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        """Create the (not yet started) scheduler thread.

        Args:
            precision: Upper bound, in seconds, on a single sleep of the
                scheduler thread while events are pending.
        """
        # The base class initialiser creates ``_cond`` and ``_terminate``.
        super().__init__(target=self.__scheduler_thread, name='Scheduler')
        self.precision = precision
        self.__next_event_time = None
        self.__list = None
        self.daemon = True

    def __scheduler_thread(self):
        """Thread body: sleep until the head event is due, then notify it."""
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if self.__list is None:
                    # Nothing pending: sleep until notified.
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier
                    timeout = min(self.precision, self.__list.when - now)

                # A non-positive timeout means the head event is already due.
                if timeout is None or timeout > 0:
                    self._cond.wait(timeout)
                    now = time.monotonic()

                while self.__list is not None and self.__list.when <= now:
                    ev = self.__list
                    logger.info("Found schedulable event %s", ev.name)
                    self.__list = ev.next
                    ev.unlink()
                    # Clear the links so the waiter's own clean-up in
                    # wait_until() cannot touch the list through stale
                    # pointers.
                    ev.next = None
                    ev.prev = None
                    # NOTE(review): ev.cond is acquired while self._cond is
                    # held, whereas wait_until() takes self._cond while its
                    # caller holds cond.  If several events share a condition
                    # this lock order inversion looks like it could deadlock
                    # -- confirm.
                    with ev.cond:
                        ev.cond.notify_all()

    def wait_until(self, when, cond, name=None):
        """Queue an event due at ``when`` and block on ``cond``.

        ``cond`` has to be acquired on entry!  The call returns when
        ``cond.wait()`` returns, whatever the reason for the wake-up; the
        event is removed from the queue before returning.

        Args:
            when: Due time, compared against ``time.monotonic()``.
            cond: Condition to wait on; passed to the queued
                ``ScheduledEvent``, whose ``cond`` attribute the scheduler
                thread notifies once the event is due.
            name: Optional label used in the log message.
        """
        ev = ScheduledEvent(when, cond, name)
        with self._cond:
            head = self.__list
            if head is None:
                self.__list = ev
            elif head > ev:
                # New earliest event: link it in front of the current head
                # so that the rest of the queue stays attached.
                ev.next = head
                head.prev = ev
                self.__list = ev
            else:
                # Sorted insertion somewhere after the head.
                head.insert_after(ev)

            # Let the scheduler thread recompute its timeout.
            self._cond.notify()

        # This will release the lock on cond, allowing scheduler thread
        # to notify us if we are already due to be released
        cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        with self._cond:
            if self.__list is ev:
                self.__list = ev.next
            # No-op when the scheduler thread already removed the event,
            # because it clears the event's links on removal.
            ev.unlink()
            ev.next = None
            ev.prev = None
| 124 | ||
| 125 |