scheduler.py

Wed, 24 Jan 2018 09:19:42 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 24 Jan 2018 09:19:42 +0000
changeset 60
10bd7e3906d9
parent 59
8d0a815022cc
child 69
8705e296c7a0
permissions
-rw-r--r--

py2app fails on several dependencies (xdg, pyyaml) that are satisfied
(pip reports installed), so remove them; need to be installed manually

#
# Scheduler for Borgend
#
# This module simply provides a way for other threads to wait until a given time
#

import time
import borgend
from threading import Condition, Lock, Thread

# Module-level logger: a child of borgend.logger named after this module
logger=borgend.logger.getChild(__name__)

class QueuedEvent:
    """Node of a sorted, doubly-linked queue of events.

    Each node carries the condition variable (`cond`) associated with the
    event and an optional human-readable `name`.  Ordering between nodes is
    defined by `__lt__`, which subclasses must implement.
    """

    def __init__(self, cond, name=None):
        self.next = None
        self.prev = None
        self.name = name
        self.cond = cond

    def __lt__(self, other):
        """Ordering predicate; must be provided by subclasses."""
        raise NotImplementedError

    def insert_after(self, ev):
        """Insert `ev` somewhere after this node, keeping the queue sorted.

        `ev` is placed immediately before the first later node that it
        compares less than, or at the tail if there is no such node.  Nodes
        that compare equal keep their insertion order.
        """
        # Walk iteratively rather than recursing once per node, so that a
        # long queue cannot exhaust the interpreter's recursion limit.
        node = self
        while node.next is not None and not ev < node.next:
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link `ev` directly behind this node."""
        ev.prev = self
        ev.next = self.next
        if ev.next is not None:
            ev.next.prev = ev
        self.next = ev

    def unlink(self):
        """Remove this node from the queue it is linked into.

        The neighbours are joined together and this node's own links are
        cleared, so that calling `unlink()` again later is a harmless no-op
        instead of rewriting the links of nodes that are still queued.
        """
        n = self.next
        p = self.prev
        if n is not None:
            n.prev = p
        if p is not None:
            p.next = n
        self.next = None
        self.prev = None

class ScheduledEvent(QueuedEvent):
    """Queue entry that is ordered by its due time, `when`."""

    def __init__(self, when, cond, name=None):
        # Let the base class set up the links, name and condition variable
        super().__init__(cond, name)
        self.when = when

    def __lt__(self, other):
        """Return whether this event is due strictly before `other`."""
        mine, theirs = self.when, other.when
        return mine < theirs

class TerminableThread(Thread):
    """Thread carrying a termination flag and a condition variable.

    `_terminate` starts out False; `terminate()` sets it and notifies
    `_cond`, so a thread body waiting on `_cond` can wake up and see the
    request.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Request termination and wake a thread waiting on `_cond`."""
        with self._cond:
            # Must be set on the instance: a bare local assignment would
            # leave the flag read by the thread body unchanged.
            self._terminate = True
            self._cond.notify()

class QueueThread(TerminableThread):
    """Daemon thread that owns a sorted, linked queue of events.

    `_list` is the head of the queue (None when empty).  `_insert` and
    `_unlink` do no locking of their own; `_insert` calls
    `self._cond.notify()`, which requires the caller to hold `self._cond`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev):
        """Insert `ev` at its sorted position and notify `self._cond`."""
        head = self._list
        if head is None:
            self._list = ev
        elif ev < head:
            # `ev` becomes the new head: link it *in front of* the old head.
            # (Linking the old head behind `ev` with insert_immediately_after
            # would overwrite the old head's `next` and drop the rest of the
            # queue.)
            ev.prev = None
            ev.next = head
            head.prev = ev
            self._list = ev
        else:
            head.insert_after(ev)

        # Wake the waiter on self._cond so it can take the new event into
        # account
        self._cond.notify()

    def _unlink(self, ev):
        """Remove `ev` from the queue, advancing the head if `ev` is the head."""
        if ev is self._list:
            self._list = ev.next
        ev.unlink()
        # Drop the event's own references so that unlinking it a second time
        # through this method cannot disturb nodes that are still queued.
        ev.next = None
        ev.prev = None


class Scheduler(QueueThread):
    """Thread that wakes up waiters once their requested time has arrived.

    Other threads call `wait_until(when, cond)` while holding `cond`; the
    scheduler thread notifies `cond` once `time.monotonic()` has reached
    `when`.
    """

    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        self.precision = precision
        self._next_event_time = None
        super().__init__(target=self._scheduler_thread, name='Scheduler')

    def _scheduler_thread(self):
        """Thread body: sleep until the head event is due, then fire it."""
        # NOTE(review): this thread takes self._cond and then ev.cond, while
        # _wait() holds ev.cond and then takes self._cond.  The opposite
        # acquisition order looks deadlock-prone -- confirm.
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if not self._list:
                    # Nothing queued: sleep until notified
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier
                    timeout = min(self.precision, self._list.when - now)

                # A non-positive timeout means the head event is already due
                if timeout is None or timeout > 0:
                    self._cond.wait(timeout)
                    now = time.monotonic()

                while self._list and self._list.when <= now:
                    ev = self._list
                    logger.debug("Scheduler activating %s",
                                 ev.name or "(unknown)")
                    # We are only allowed to remove ev from list when ev.cond allows
                    with ev.cond:
                        self._list = ev.next
                        ev.unlink()
                        # Clear the event's own links: the woken waiter calls
                        # _unlink(ev) again, and stale links would then
                        # rewrite the links of nodes that are still queued.
                        ev.next = None
                        ev.prev = None
                        ev.cond.notify_all()

    def _wait(self, ev):
        """Queue `ev` and block on `ev.cond` until it is notified.

        `ev.cond` must be held by the caller.
        """
        with self._cond:
            self._insert(ev)

        try:
            # This will release the lock on cond, allowing queue manager
            # (scheduler) thread to notify us if we are already due to be
            # released
            ev.cond.wait()
        finally:
            # If we were woken up by some other event, not the scheduler
            # (or the wait raised), ensure the event is removed
            with self._cond:
                self._unlink(ev)

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        """Block until `cond` is notified; the scheduler notifies it at `when`.

        `when` is compared against `time.monotonic()` by the scheduler thread.
        """
        self._wait(ScheduledEvent(when, cond, name))

mercurial