borgend/scheduler.py

Mon, 17 Sep 2018 19:31:22 -0500

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 17 Sep 2018 19:31:22 -0500
changeset 117
b509a4e34d7f
parent 113
6993964140bd
permissions
-rw-r--r--

xdg include fix?

#
# Borgend by Tuomo Valkonen, 2018
#
# This file is the scheduler: it provides e a way for other threads to
# wait until a given time; which may be "dreamtime" that discounts system
# sleep periods
#

import time
import logging
import math
from threading import Condition, Thread

from . import dreamtime
from .exprotect import protect_noreturn

logger=logging.getLogger(__name__)

class QueuedEvent:
    def __init__(self, cond, name=None):
        self.next=None
        self.prev=None
        self.name=name
        self.cond=cond
        self.linked=False

    @staticmethod
    def snapshot():
        return None

    def is_before(self, other, snapshot=None):
        raise NotImplementedError

    def __lt__(self, other):
        return self.is_before(other, self.snapshot())

    def insert_after(self, ev, snapshot=None):
        if not snapshot:
            snapshot=self.snapshot()
        if not self.next or ev.is_before(self.next, snapshot):
            self.insert_immediately_after(ev)
        else:
            self.next.insert_after(ev, snapshot)

    def insert_immediately_after(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.prev=self
        ev.next=self.next
        self.next=ev

    def insert_immediately_before(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.next=self
        ev.prev=self.prev
        self.prev=ev

    def unlink(self):
        n=self.next
        p=self.prev
        if n:
            n.prev=p
        if p:
            p.next=n
        self.next=None
        self.prev=None

class ScheduledEvent(QueuedEvent):
    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        self.when=when

    @staticmethod
    def snapshot():
        return dreamtime.Snapshot()

    def is_before(self, other, snapshot=None):
        if not snapshot:
            snapshot=self.snapshot()
        return self.when.horizon(snapshot) < other.when.horizon(snapshot)

class TerminableThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate=False
        self._cond=Condition()

    def terminate(self):
        with self._cond:
            _terminate=True
            self._cond.notify()

class QueueThread(TerminableThread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev, snapshot=None):
        assert(not ev.linked)
        if not self._list:
            #logger.debug("Insert first")
            self._list=ev
        elif ev<self._list:
            #logger.debug("Insert beginning")
            self._list.insert_immediately_before(ev)
            self._list=ev
        else:
            #logger.debug("Insert after")
            if not snapshot:
                snapshot=ev.snapshot()
            self._list.insert_after(ev, snapshot)
        ev.linked=True

    def _unlink(self, ev):
        assert(ev.linked)
        if ev==self._list:
            self._list=ev.next
        ev.unlink()
        ev.linked=False

    def _resort(self, snapshot=None):
        oldlist=self._list
        self._list=None
        if oldlist and not snapshot:
            snapshot=oldlist.snapshot()
        while oldlist:
            ev=oldlist
            oldlist=oldlist.next
            ev.unlink()
            ev.linked=False
            self._insert(ev, snapshot)



class Scheduler(QueueThread):
    # Default to precision of 5 minutes: if there are finite-horizon events in
    # the queue, the scheduler thread will never sleep longer than that.
    # This is to quickly get back on track with the schedule when the computer
    # wakes up from sleep, if the sleep monitor is not working or is not
    # implemented for the particular operating system. However, if there are
    # only infinite-horizon events in the queue (meaning, DreamTime-scheduled
    # events, and the system is sleeping or "sleeping"), the scheduler will
    # also sleep. Hopefully this will also help the system stay properly asleep.
    def __init__(self, precision=60*5):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
        dreamtime.add_callback(self, self._sleepwake_callback)

    @protect_noreturn
    def _scheduler_thread(self):
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                snapshot = dreamtime.Snapshot()
                if not self._list:
                    timeout = None
                    delta = math.inf
                    nextname = None
                else:
                    nextname=self._list.name
                    delta = self._list.when.horizon(snapshot)
                    if delta==math.inf:
                        timeout=None
                    else:
                        timeout = min(self.precision, delta)

                if not timeout or timeout>0:
                    logger.debug("Scheduler waiting %s seconds [next event '%s' in %0.2f seconds]"
                                 % (str(timeout), nextname, delta))
                    self._cond.wait(timeout)
                    snapshot = dreamtime.Snapshot()
                    logger.debug("Scheduler timed out")

                while self._list and self._list.when.horizon(snapshot) <= 0:
                    ev=self._list
                    logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                    # We are only allowed to remove ev from list when ev.cond allows
                    self._unlink(ev)
                    # We need to release the lock on self._cond before acquire
                    # one ev.cond to avoid race conditions with self._wait
                    self._cond.release()
                    with ev.cond:
                        ev.cond.notify_all()
                    self._cond.acquire()

    @protect_noreturn
    def _sleepwake_callback(self, woke):
        logger.debug("Rescheduling events after sleep/wakeup")
        with self._cond:
            self._resort()
            self._cond.notify()

    # It is required to have acquired the lock on ev.cond on entry
    def _wait(self, ev):
        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing the scheduler
        # thread to notify us if we are ready to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        if ev.linked:
            # Deal with race conditions wrt. the two different locks
            # in the scheduler
            #ev.cond.release()
            with self._cond:
                self._unlink(ev)
            #ev.cond.acquire()

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        logger.info("Scheduling '%s' in %0.01f seconds / on %s [%s]" %
                    (name, when.remaining(), when.isoformat(),
                     when.__class__.__name__))
        self._wait(ScheduledEvent(when, cond, name))

mercurial