borgend/scheduler.py

Wed, 07 Feb 2018 20:39:01 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 07 Feb 2018 20:39:01 +0000
changeset 113
6993964140bd
parent 106
a7bdc239ef62
permissions
-rw-r--r--

Time snapshot fixes.
Python's default arguments are purely idiotic (aka. pythonic): generated
only once. This makes sense in a purely functional language, which Python
lightyears away from, but severely limits their usefulness in an imperative
language. Decorators also seem clumsy for this, as one would have to tell
the number of positional arguments for things to work nice, being able to
pass the snapshot both positionally and as keyword. No luck.
So have to do things the old-fashioned hard way.

#
# Borgend by Tuomo Valkonen, 2018
#
# This file is the scheduler: it provides e a way for other threads to
# wait until a given time; which may be "dreamtime" that discounts system
# sleep periods
#

import time
import logging
import math
from threading import Condition, Thread

from . import dreamtime
from .exprotect import protect_noreturn

logger=logging.getLogger(__name__)

class QueuedEvent:
    def __init__(self, cond, name=None):
        self.next=None
        self.prev=None
        self.name=name
        self.cond=cond
        self.linked=False

    @staticmethod
    def snapshot():
        return None

    def is_before(self, other, snapshot=None):
        raise NotImplementedError

    def __lt__(self, other):
        return self.is_before(other, self.snapshot())

    def insert_after(self, ev, snapshot=None):
        if not snapshot:
            snapshot=self.snapshot()
        if not self.next or ev.is_before(self.next, snapshot):
            self.insert_immediately_after(ev)
        else:
            self.next.insert_after(ev, snapshot)

    def insert_immediately_after(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.prev=self
        ev.next=self.next
        self.next=ev

    def insert_immediately_before(self, ev):
        assert(ev.next is None and ev.prev is None)
        ev.next=self
        ev.prev=self.prev
        self.prev=ev

    def unlink(self):
        n=self.next
        p=self.prev
        if n:
            n.prev=p
        if p:
            p.next=n
        self.next=None
        self.prev=None

class ScheduledEvent(QueuedEvent):
    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        self.when=when

    @staticmethod
    def snapshot():
        return dreamtime.Snapshot()

    def is_before(self, other, snapshot=None):
        if not snapshot:
            snapshot=self.snapshot()
        return self.when.horizon(snapshot) < other.when.horizon(snapshot)

class TerminableThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate=False
        self._cond=Condition()

    def terminate(self):
        with self._cond:
            _terminate=True
            self._cond.notify()

class QueueThread(TerminableThread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev, snapshot=None):
        assert(not ev.linked)
        if not self._list:
            #logger.debug("Insert first")
            self._list=ev
        elif ev<self._list:
            #logger.debug("Insert beginning")
            self._list.insert_immediately_before(ev)
            self._list=ev
        else:
            #logger.debug("Insert after")
            if not snapshot:
                snapshot=ev.snapshot()
            self._list.insert_after(ev, snapshot)
        ev.linked=True

    def _unlink(self, ev):
        assert(ev.linked)
        if ev==self._list:
            self._list=ev.next
        ev.unlink()
        ev.linked=False

    def _resort(self, snapshot=None):
        oldlist=self._list
        self._list=None
        if oldlist and not snapshot:
            snapshot=oldlist.snapshot()
        while oldlist:
            ev=oldlist
            oldlist=oldlist.next
            ev.unlink()
            ev.linked=False
            self._insert(ev, snapshot)



class Scheduler(QueueThread):
    # Default to precision of 5 minutes: if there are finite-horizon events in
    # the queue, the scheduler thread will never sleep longer than that.
    # This is to quickly get back on track with the schedule when the computer
    # wakes up from sleep, if the sleep monitor is not working or is not
    # implemented for the particular operating system. However, if there are
    # only infinite-horizon events in the queue (meaning, DreamTime-scheduled
    # events, and the system is sleeping or "sleeping"), the scheduler will
    # also sleep. Hopefully this will also help the system stay properly asleep.
    def __init__(self, precision=60*5):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
        dreamtime.add_callback(self, self._sleepwake_callback)

    @protect_noreturn
    def _scheduler_thread(self):
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                snapshot = dreamtime.Snapshot()
                if not self._list:
                    timeout = None
                    delta = math.inf
                    nextname = None
                else:
                    nextname=self._list.name
                    delta = self._list.when.horizon(snapshot)
                    if delta==math.inf:
                        timeout=None
                    else:
                        timeout = min(self.precision, delta)

                if not timeout or timeout>0:
                    logger.debug("Scheduler waiting %s seconds [next event '%s' in %0.2f seconds]"
                                 % (str(timeout), nextname, delta))
                    self._cond.wait(timeout)
                    snapshot = dreamtime.Snapshot()
                    logger.debug("Scheduler timed out")

                while self._list and self._list.when.horizon(snapshot) <= 0:
                    ev=self._list
                    logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                    # We are only allowed to remove ev from list when ev.cond allows
                    self._unlink(ev)
                    # We need to release the lock on self._cond before acquire
                    # one ev.cond to avoid race conditions with self._wait
                    self._cond.release()
                    with ev.cond:
                        ev.cond.notify_all()
                    self._cond.acquire()

    @protect_noreturn
    def _sleepwake_callback(self, woke):
        logger.debug("Rescheduling events after sleep/wakeup")
        with self._cond:
            self._resort()
            self._cond.notify()

    # It is required to have acquired the lock on ev.cond on entry
    def _wait(self, ev):
        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing the scheduler
        # thread to notify us if we are ready to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        if ev.linked:
            # Deal with race conditions wrt. the two different locks
            # in the scheduler
            #ev.cond.release()
            with self._cond:
                self._unlink(ev)
            #ev.cond.acquire()

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        logger.info("Scheduling '%s' in %0.01f seconds / on %s [%s]" %
                    (name, when.remaining(), when.isoformat(),
                     when.__class__.__name__))
        self._wait(ScheduledEvent(when, cond, name))

mercurial