scheduler.py

Mon, 22 Jan 2018 21:07:34 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 22 Jan 2018 21:07:34 +0000
changeset 53
442c558bd632
parent 49
db33dfa64ad6
child 54
cfcaa5f6ba33
permissions
-rw-r--r--

Generalisation of scheduler thread to general queue threads

#
# Scheduler for Borgend
#
# This module simply provides a way for other threads to wait until a given time
#

import time
import borgend
from threading import Condition, Lock, Thread

logger=borgend.logger.getChild(__name__)

class QueuedEvent:
    """Node of a doubly linked queue of events.

    Each event carries a condition object (``cond``) and an optional
    ``name``, plus ``next``/``prev`` links to its neighbours in the queue.
    Ordering between events is decided by ``__lt__``/``__gt__``; this base
    class reports every event as greater than (and never less than) any
    other, and subclasses may override the comparisons.
    """

    def __init__(self, cond, name=None):
        """Create an unlinked event.

        cond -- condition object associated with the event
        name -- optional label for the event
        """
        self.next = None
        self.prev = None
        self.name = name
        self.cond = cond

    def __lt__(self, other):
        return False

    def __gt__(self, other):
        return True

    def insert_after(self, ev):
        """Insert `ev` somewhere after this node, respecting the ordering.

        Walks forward from this node and places `ev` directly before the
        first following node that compares greater than `ev`, or at the end
        of the list if there is no such node.
        """
        # Iterative walk rather than recursion, so that long queues cannot
        # exhaust the interpreter's recursion limit.
        node = self
        while node.next is not None and not node.next > ev:
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link `ev` into the list directly after this node."""
        ev.prev = self
        ev.next = self.next
        if ev.next is not None:
            ev.next.prev = ev
        self.next = ev

    def unlink(self):
        """Remove this node from the list it is linked into.

        The neighbours are joined to each other and this node's own links
        are cleared, so that calling unlink() again on an already removed
        node is a harmless no-op instead of rewriting the links of nodes
        it is no longer adjacent to.
        """
        following = self.next
        preceding = self.prev
        if following is not None:
            following.prev = preceding
        if preceding is not None:
            preceding.next = following
        self.next = None
        self.prev = None

class ScheduledEvent(QueuedEvent):
    """Queued event that carries a time, `when`, used for ordering.

    Two scheduled events compare by their `when` values.
    """

    def __init__(self, when, cond, name=None):
        """Create an event for time `when` with condition `cond`."""
        self.when = when
        QueuedEvent.__init__(self, cond, name)

    def __lt__(self, other):
        # Earlier time sorts first
        mine, theirs = self.when, other.when
        return mine < theirs

    def __gt__(self, other):
        # Later time sorts last
        mine, theirs = self.when, other.when
        return mine > theirs

class TerminableThread(Thread):
    """Thread with a termination flag guarded by a condition variable.

    The thread's main loop is expected to hold `self._cond` and check
    `self._terminate`; terminate() sets the flag and wakes up the loop.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the thread; all arguments are passed on to Thread."""
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Request termination and notify a waiter on `self._cond`."""
        with self._cond:
            # Must be set on the instance: a bare local assignment would
            # leave the flag untouched and the thread would never stop.
            self._terminate = True
            self._cond.notify()

class QueueThread(TerminableThread):
    """Thread that manages an ordered linked list of queued events.

    `self._list` is the head of the queue (or None when empty). All
    manipulation of the queue is done with `self._cond` held.
    """

    def __init__(self, *args, **kwargs):
        """Initialise an empty queue; arguments are passed on to the thread."""
        self._list = None
        super().__init__(*args, **kwargs)

    def _insert(self, ev):
        """Insert `ev` into the queue in order and wake the queue thread.

        `self._cond` has to be held by the caller.
        """
        head = self._list
        if head is None:
            ev.prev = None
            ev.next = None
            self._list = ev
        elif head > ev:
            # ev belongs before the current head: link it in front, keeping
            # the rest of the queue attached behind the old head.
            ev.prev = None
            ev.next = head
            head.prev = ev
            self._list = ev
        else:
            # Let the list find the correct position after the head
            head.insert_after(ev)

        self._cond.notify()

    def _remove(self, ev):
        """Take `ev` out of the queue; a no-op if it is no longer queued.

        `self._cond` has to be held by the caller.
        """
        if ev is self._list:
            self._list = ev.next
        ev.unlink()
        # Clear the event's own links so that a later repeated removal
        # (the waiter cleans up after being woken) cannot touch nodes the
        # event is no longer adjacent to.
        ev.next = None
        ev.prev = None

    def _wait(self, ev):
        """Queue `ev` and block on `ev.cond` until it is notified.

        `ev.cond` has to be acquired by the caller on entry.
        """
        with self._cond:
            self._insert(ev)
        # This will release the lock on cond, allowing queue manager (scheduler)
        # thread to notify us if we are already to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        # NOTE(review): self._cond is taken here while ev.cond is held,
        # whereas _pop takes ev.cond while its caller holds self._cond;
        # confirm this lock ordering cannot deadlock with shared conds.
        with self._cond:
            self._remove(ev)

    def _pop(self):
        """Remove the head event from the queue and notify all its waiters.

        `self._cond` has to be held by the caller and the queue must not
        be empty.
        """
        ev = self._list
        # Thread.name is a property, not a method; let logging do the
        # formatting lazily.
        logger.info("%s: Found event %s", self.name, ev.name)
        self._remove(ev)
        with ev.cond:
            ev.cond.notify_all()


class Scheduler(QueueThread):
    """Queue thread that releases waiters when their scheduled time arrives.

    Times are compared against time.monotonic().
    """

    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        """Create the (daemon) scheduler thread.

        precision -- maximum number of seconds the thread sleeps in one go
        """
        self.precision = precision
        self._next_event_time = None
        self._list = None
        super().__init__(target=self.__scheduler_thread, name='Scheduler')
        self.daemon = True

    def __scheduler_thread(self):
        """Main loop: sleep until the next event is due, then release it."""
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if self._list is None:
                    # Nothing queued: sleep until notified
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier
                    timeout = min(self.precision, self._list.when - now)

                # Explicit None check: a timeout of exactly zero means the
                # head event is already due, so there is nothing to wait for.
                if timeout is None or timeout > 0:
                    self._cond.wait(timeout)
                    now = time.monotonic()

                # Release every event whose time has come
                while self._list is not None and self._list.when <= now:
                    self._pop()

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        """Block the calling thread on `cond` until the scheduler releases it.

        when -- time to wait until, on the time.monotonic() clock
        cond -- condition to wait on; has to be acquired by the caller
        name -- optional label for the event, used in logging
        """
        self._wait(ScheduledEvent(when, cond, name))

mercurial