scheduler.py

Sat, 27 Jan 2018 12:19:39 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sat, 27 Jan 2018 12:19:39 +0000
changeset 75
2a44b9649212
parent 69
8705e296c7a0
child 76
4b08fca3ce34
permissions
-rw-r--r--

UI refresh fix; added debug messages

#
# Scheduler for Borgend
#
# This module simply provide a way for other threads to until a given time
#

import time
import borgend
from threading import Condition, Lock, Thread

logger=borgend.logger.getChild(__name__)

class QueuedEvent:
    def __init__(self, cond, name=None):
        self.next=None
        self.prev=None
        self.name=name
        self.cond=cond

    def __lt__(self, other):
        raise NotImplementedError

    def insert_after(self, ev):
        if not self.next or ev<self.next:
            self.insert_immediately_after(ev)
        else:
            self.next.insert_after(ev)

    def insert_immediately_after(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.prev=self
        ev.next=self.next
        self.next=ev

    def insert_immediately_before(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.next=self
        ev.prev=self.prev
        self.prev=ev

    def unlink(self):
        n=self.next
        p=self.prev
        if n:
            n.prev=p
        if p:
            p.next=n

class ScheduledEvent(QueuedEvent):
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        self.when=when

    def __lt__(self, other):
        return self.when < other.when

class TerminableThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate=False
        self._cond=Condition()

    def terminate(self):
        with self._cond:
            _terminate=True
            self._cond.notify()

class QueueThread(TerminableThread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev):
        if not self._list:
            #logger.debug("Insert first")
            self._list=ev
        elif ev<self._list:
            #logger.debug("Insert beginning")
            self._list.insert_immediately_before(ev)
            self._list=ev
        else:
            #logger.debug("Insert after")
            self._list.insert_after(ev)

        self._cond.notify()

    def _unlink(self, ev):
        if ev==self._list:
            self._list=ev.next
        ev.unlink()


class Scheduler(QueueThread):
    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target = self._scheduler_thread, name = 'Scheduler')

    def _scheduler_thread(self):
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if not self._list:
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier
                    timeout=min(self.precision, self._list.when-now)

                if not timeout or timeout>0:
                    logger.debug("Scheduler waiting %d seconds" % (timeout or (-1)))
                    self._cond.wait(timeout)
                    now = time.monotonic()

                logger.debug("Scheduler timed out")

                while self._list and self._list.when <= now:
                    ev=self._list
                    logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                    # We are only allowed to remove ev from list when ev.cond allows
                    with ev.cond:
                        self._list=ev.next
                        ev.unlink()
                        ev.cond.notifyAll()

    def _wait(self, ev):
        with self._cond:
            self._insert(ev)

        # This will release the lock on cond, allowing queue manager (scheduler)
        # thread to notify us if we are already to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        with self._cond:
            self._unlink(ev)

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        logger.debug("Scheduling '%s' at %d" % (name, when))
        self._wait(ScheduledEvent(when, cond, name))

mercurial