Mon, 22 Jan 2018 21:07:34 +0000
Generalisation of scheduler thread to general queue threads
repository.py | file | annotate | diff | comparison | revisions | |
scheduler.py | file | annotate | diff | comparison | revisions |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/repository.py Mon Jan 22 21:07:34 2018 +0000 @@ -0,0 +1,14 @@ +# +# Repository abstraction for queuing +# + +from scheduler import TerminableThread, QueuedEvent + +class Repository(TerminableThread): + def __init__(self) + self.__next_event_time = None + self.__list = None + self._cond = Condition() + self._terminate = False + super().__init__(target = self.__scheduler_thread, name = 'Scheduler') + self.daemon=True
--- a/scheduler.py Mon Jan 22 19:14:47 2018 +0000 +++ b/scheduler.py Mon Jan 22 21:07:34 2018 +0000 @@ -10,19 +10,18 @@ logger=borgend.logger.getChild(__name__) -class ScheduledEvent: - def __init__(self, when, cond, name=None): +class QueuedEvent: + def __init__(self, cond, name=None): self.next=None self.prev=None - self.when=when self.name=name - self.cond=Condition() + self.cond=cond def __lt__(self, other): - return self.when < other.when + return False def __gt__(self, other): - return self.when > other.when + return True def insert_after(self, ev): if not self.next: @@ -49,6 +48,17 @@ if p: p.next=n +class ScheduledEvent(QueuedEvent): + def __init__(self, when, cond, name=None): + super().__init__(cond, name) + self.when=when + + def __lt__(self, other): + return self.when < other.when + + def __gt__(self, other): + return self.when > other.when + class TerminableThread(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -60,17 +70,54 @@ _terminate=True self._cond.notify() +class QueueThread(TerminableThread): + def __init__(self, *args, **kwargs): + self._list = None + super().__init__(*args, **kwargs) -class Scheduler(TerminableThread): + def _insert(self, ev): + if not self._list: + self._list=ev + elif self._list > ev: + ev.insert_immediately_after(self._list) + self._list=ev + else: + self._list.insert_immediately_after(ev) + + self._cond.notify() + + def _wait(self, ev): + with self._cond: + self._insert(ev) + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + + # If we were woken up by some other event, not the scheduler, + # ensure the event is removed + with self._cond: + if ev==self._list: + self._list=ev.next + ev.unlink() + + def _pop(self): + ev=self._list + logger.info("%s: Found event %s" % self.name(), str(ev.name)) + self._list=ev.next + ev.unlink() + ev.cond.acquire() + ev.cond.notifyAll() + ev.cond.release() + + +class Scheduler(QueueThread): # Default to precision of 60 seconds: the scheduler thread will never # sleep longer than that, to get quickly back on track with the schedule # when the computer wakes up from sleep def __init__(self, precision=60): self.precision = precision - self.__next_event_time = None - self.__list = None - self._cond = Condition() - self._terminate = False + self._next_event_time = None + self._list = None super().__init__(target = self.__scheduler_thread, name = 'Scheduler') self.daemon=True @@ -78,48 +125,21 @@ with self._cond: while not self._terminate: now = time.monotonic() - if not self.__list: + if not self._list: timeout = None else: # Wait at most precision seconds, or until next event if it # comes earlier - timeout=min(self.precision, self.__list.when-now) + timeout=min(self.precision, self._list.when-now) if not timeout or timeout>0: self._cond.wait(timeout) now = time.monotonic() - while self.__list and self.__list.when <= now: - ev=self.__list - logger.info("Found schedulable event %s" % str(ev.name)) - self.__list=ev.next - ev.unlink() - ev.cond.acquire() - ev.cond.notifyAll() - ev.cond.release() + while self._list and self._list.when <= now: + self._pop() # cond has to be acquired on entry! def wait_until(self, when, cond, name=None): - ev=ScheduledEvent(when, cond, name) - with self._cond: - if not self.__list: - self.__list=ev - elif self.__list > ev: - ev.insert_immediately_after(self.__list) - self.__list=ev - else: - self.__list.insert_immediately_after(ev) + self._wait(ScheduledEvent(when, cond, name)) - self._cond.notify() - # This will release the lock on cond, allowing scheduler thread - # to notify us if we are already to be released - cond.wait() - - # If we were woken up by some other event, not the scheduler, - # ensure the event is removed - with self._cond: - if ev==self.__list: - self.__list=ev.next - ev.unlink() - -