borgend/scheduler.py

Sun, 04 Feb 2018 09:38:55 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sun, 04 Feb 2018 09:38:55 +0000
changeset 104
d33e2d7dbeb1
parent 103
32f2154ef25e
child 106
a7bdc239ef62
permissions
-rw-r--r--

Added some debug log messages

#
# Borgend by Tuomo Valkonen, 2018
#
# This file is the scheduler: it provides e a way for other threads to
# wait until a given time; which may be "dreamtime" that discounts system
# sleep periods
#

import time
import logging
import math
from threading import Condition, Thread

from . import dreamtime

logger=logging.getLogger(__name__)

class QueuedEvent:
    def __init__(self, cond, name=None):
        self.next=None
        self.prev=None
        self.name=name
        self.cond=cond
        self.linked=False

    @staticmethod
    def snapshot():
        return None

    def is_before(self, other, snapshot=None):
        raise NotImplementedError

    def __lt__(self, other):
        return self.is_before(other, self.snapshot())

    def insert_after(self, ev, snapshot=None):
        if not snapshot:
            snapshot=self.snapshot()
        if not self.next or ev.is_before(self.next, snapshot):
            self.insert_immediately_after(ev)
        else:
            self.next.insert_after(ev, snapshot)

    def insert_immediately_after(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.prev=self
        ev.next=self.next
        self.next=ev

    def insert_immediately_before(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.next=self
        ev.prev=self.prev
        self.prev=ev

    def unlink(self):
        n=self.next
        p=self.prev
        if n:
            n.prev=p
        if p:
            p.next=n
        self.next=None
        self.prev=None

class ScheduledEvent(QueuedEvent):
    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        self.when=when

    @staticmethod
    def snapshot():
        return dreamtime.Snapshot()

    def is_before(self, other, snapshot=None):
        if not snapshot:
            snapshot=self.snapshot()
        return self.when.horizon(snapshot) < other.when.horizon(snapshot)

class TerminableThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate=False
        self._cond=Condition()

    def terminate(self):
        with self._cond:
            _terminate=True
            self._cond.notify()

class QueueThread(TerminableThread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev, snapshot=None):
        assert(not ev.linked)
        if not self._list:
            #logger.debug("Insert first")
            self._list=ev
        elif ev<self._list:
            #logger.debug("Insert beginning")
            self._list.insert_immediately_before(ev)
            self._list=ev
        else:
            #logger.debug("Insert after")
            if not snapshot:
                snapshot=ev.snapshot()
            self._list.insert_after(ev, snapshot)
        ev.linked=True

    def _unlink(self, ev):
        assert(ev.linked)
        if ev==self._list:
            self._list=ev.next
        ev.unlink()
        ev.linked=False

    def _resort(self, snapshot=None):
        oldlist=self._list
        self._list=None
        if oldlist and not snapshot:
            snapshot=oldlist.snapshot()
        while oldlist:
            ev=oldlist
            oldlist=oldlist.next
            ev.unlink()
            ev.linked=False
            self._insert(ev, snapshot)



class Scheduler(QueueThread):
    # Default to precision of 5 minutes: if there are finite-horizon events in
    # the queue, the scheduler thread will never sleep longer than that.
    # This is to quickly get back on track with the schedule when the computer
    # wakes up from sleep, if the sleep monitor is not working or is not
    # implemented for the particular operating system. However, if there are
    # only infinite-horizon events in the queue (meaning, DreamTime-scheduled
    # events, and the system is sleeping or "sleeping"), the scheduler will
    # also sleep. Hopefully this will also help the system stay properly asleep.
    def __init__(self, precision=60*5):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
        dreamtime.add_callback(self, self._sleepwake_callback)

    def _scheduler_thread(self):
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                try:
                    self.__schedule_one()
                except:
                    logger.exception("Bug in scheduler")

    def __schedule_one(self):
        snapshot = dreamtime.Snapshot()
        now = snapshot.monotonic()
        if not self._list:
            timeout = None
            delta = None
            nextname = None
        else:
            nextname=self._list.name
            delta = self._list.when.horizon(snapshot)-now
            if delta==math.inf:
                timeout=None
            else:
                timeout = min(self.precision, delta)

        if not timeout or timeout>0:
            logger.debug("Scheduler waiting %s seconds [next event '%s' in %s seconds]"
                         % (str(timeout), nextname, str(delta)))
            self._cond.wait(timeout)
            snapshot = dreamtime.Snapshot()
            now = snapshot.monotonic()
            logger.debug("Scheduler timed out")

        while self._list and self._list.when.horizon(snapshot) <= now:
            ev=self._list
            logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
            # We are only allowed to remove ev from list when ev.cond allows
            self._unlink(ev)
            # We need to release the lock on self._cond before acquire
            # one ev.cond to avoid race conditions with self._wait
            self._cond.release()
            with ev.cond:
                ev.cond.notify_all()
            self._cond.acquire()


    def _sleepwake_callback(self, woke):
        logger.debug("Rescheduling events after sleep/wakeup")
        with self._cond:
            self._resort()
            self._cond.notify()

    # It is required to have acquired the lock on ev.cond on entry
    def _wait(self, ev):
        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing the scheduler
        # thread to notify us if we are ready to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        if ev.linked:
            # Deal with race conditions wrt. the two different locks
            # in the scheduler
            #ev.cond.release()
            with self._cond:
                self._unlink(ev)
            #ev.cond.acquire()

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        logger.info("Scheduling '%s' in %0.01f seconds / on %s [%s]" %
                    (name, when.seconds_to(), when.isoformat(),
                     when.__class__.__name__))
        self._wait(ScheduledEvent(when, cond, name))

mercurial