scheduler.py

Sun, 28 Jan 2018 11:38:01 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sun, 28 Jan 2018 11:38:01 +0000
changeset 79
b075b3db3044
parent 78
83b43987e61e
permissions
-rw-r--r--

Cleaned up module organisation to simplify borgend.py and not have to import it in other modules.

#
# Scheduler for Borgend
#
# This module simply provide a way for other threads to until a given time
#

import time
import loggers
import dreamtime
from threading import Condition, Lock, Thread

logger=loggers.get(__name__)

class QueuedEvent:
    def __init__(self, cond, name=None):
        self.next=None
        self.prev=None
        self.name=name
        self.cond=cond

    def __lt__(self, other):
        raise NotImplementedError

    def insert_after(self, ev):
        if not self.next or ev<self.next:
            self.insert_immediately_after(ev)
        else:
            self.next.insert_after(ev)

    def insert_immediately_after(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.prev=self
        ev.next=self.next
        self.next=ev

    def insert_immediately_before(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.next=self
        ev.prev=self.prev
        self.prev=ev

    def unlink(self):
        n=self.next
        p=self.prev
        if n:
            n.prev=p
        if p:
            p.next=n
        self.next=None
        self.prev=None

class ScheduledEvent(QueuedEvent):
    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        self.when=when

    def __lt__(self, other):
        return self.when < other.when

class TerminableThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate=False
        self._cond=Condition()

    def terminate(self):
        with self._cond:
            _terminate=True
            self._cond.notify()

class QueueThread(TerminableThread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev):
        if not self._list:
            #logger.debug("Insert first")
            self._list=ev
        elif ev<self._list:
            #logger.debug("Insert beginning")
            self._list.insert_immediately_before(ev)
            self._list=ev
        else:
            #logger.debug("Insert after")
            self._list.insert_after(ev)

        self._cond.notify()

    def _unlink(self, ev):
        if ev==self._list:
            self._list=ev.next
        ev.unlink()

    def _resort(self):
        oldlist=self._list
        self._list=None
        while oldlist:
            ev=oldlist
            oldlist=oldlist.next
            ev.unlink()
            self._insert(ev)



class Scheduler(QueueThread):
    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
        dreamtime.add_callback(self, self._wakeup_callback)

    def _scheduler_thread(self):
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if not self._list:
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier
                    timeout=min(self.precision, self._list.when.realtime()-now)

                if not timeout or timeout>0:
                    logger.debug("Scheduler waiting %d seconds" % (timeout or (-1)))
                    self._cond.wait(timeout)
                    now = time.monotonic()

                logger.debug("Scheduler timed out")

                while self._list and self._list.when.monotonic() <= now:
                    ev=self._list
                    logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                    # We are only allowed to remove ev from list when ev.cond allows
                    with ev.cond:
                        self._list=ev.next
                        ev.unlink()
                        ev.cond.notifyAll()

    def _wakeup_callback(self):
        logger.debug("Rescheduling events after wakeup")
        with self._cond:
            self._resort()

    def _wait(self, ev):
        with self._cond:
            self._insert(ev)

        # This will release the lock on cond, allowing queue manager (scheduler)
        # thread to notify us if we are already to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        with self._cond:
            self._unlink(ev)

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        logger.debug("Scheduling '%s' in %s seconds [%s]" %
                     (name, when.seconds_to(), when.__class__.__name__))
        self._wait(ScheduledEvent(when, cond, name))

mercurial