scheduler.py

Mon, 22 Jan 2018 19:14:47 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 22 Jan 2018 19:14:47 +0000
changeset 52
aef860e99323
parent 49
db33dfa64ad6
child 53
442c558bd632
permissions
-rw-r--r--

Have to clean up logger handlers in some cases to avoid duplicate messages

#
# Scheduler for Borgend
#
# This module simply provides a way for other threads to wait until a given time
#

import time
import borgend
from threading import Condition, Lock, Thread

# Module-level logger, created as a child of borgend's logger and named
# after this module
logger=borgend.logger.getChild(__name__)

class ScheduledEvent:
    """Node of a doubly linked list of events, ordered by their `when` time.

    Attributes:
        next, prev: neighbouring events in the list, or None at the ends.
        when: time at which the event falls due (compared with `<` / `>`).
        name: optional label, used only for logging.
        cond: the condition variable to notify when the event falls due.
    """

    def __init__(self, when, cond, name=None):
        self.next = None
        self.prev = None
        self.when = when
        self.name = name
        # Store the caller's condition: it is the one the waiting thread
        # blocks on, so it is the one that must be notified.
        self.cond = cond

    def __lt__(self, other):
        return self.when < other.when

    def __gt__(self, other):
        return self.when > other.when

    def insert_after(self, ev):
        """Insert `ev` somewhere after this node, keeping the list sorted.

        `ev` is placed in front of the first following node that is strictly
        later than it, so events with equal times keep insertion order.
        """
        node = self
        # Walk forward iteratively (no recursion depth limit on long lists)
        while node.next is not None and not node.next > ev:
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link `ev` directly after this node, in front of the current next."""
        ev.prev = self
        ev.next = self.next
        if ev.next is not None:
            ev.next.prev = ev
        self.next = ev

    def unlink(self):
        """Remove this node from its list by joining its neighbours.

        The node's own links are cleared afterwards, so calling unlink()
        again is a no-op and cannot disturb nodes that have been relinked
        in the meantime.
        """
        n = self.next
        p = self.prev
        if n is not None:
            n.prev = p
        if p is not None:
            p.next = n
        self.next = None
        self.prev = None

class TerminableThread(Thread):
    """Thread carrying a termination flag guarded by a condition variable.

    The thread's target is expected to check `self._terminate` while holding
    `self._cond`, and to wait on `self._cond` when idle.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Set the termination flag and wake up a thread waiting on `_cond`."""
        with self._cond:
            # Must be set on the instance; a bare local assignment would
            # leave the flag untouched.
            self._terminate = True
            self._cond.notify()


class Scheduler(TerminableThread):
    """Thread that wakes up waiting threads at requested times.

    Pending events are kept in a doubly linked list of ScheduledEvent nodes
    sorted by `when`, with the earliest event at the head (`self.__list`).
    Times are compared against `time.monotonic()`. The list is only touched
    while holding `self._cond`.
    """

    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        self.precision = precision
        self.__next_event_time = None
        self.__list = None
        self._cond = Condition()
        self._terminate = False
        super().__init__(target=self.__scheduler_thread, name='Scheduler')
        self.daemon = True

    def __scheduler_thread(self):
        """Thread body: sleep until the next event is due, then notify it."""
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if self.__list is None:
                    # Nothing scheduled: sleep until notified
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier
                    timeout = min(self.precision, self.__list.when - now)

                # Compare against None explicitly so that a timeout of
                # exactly zero is not mistaken for "no timeout"
                if timeout is None or timeout > 0:
                    self._cond.wait(timeout)
                    now = time.monotonic()

                # Pop and notify every event that has fallen due
                while self.__list is not None and self.__list.when <= now:
                    ev = self.__list
                    logger.info("Found schedulable event %s", ev.name)
                    self.__list = ev.next
                    ev.unlink()
                    # Clear the popped event's links so that the later
                    # unlink in wait_until() cannot touch live nodes
                    ev.next = None
                    ev.prev = None
                    # NOTE(review): ev.cond is acquired while holding
                    # self._cond, whereas wait_until() takes self._cond while
                    # holding cond. This opposite lock order looks
                    # deadlock-prone if a waiter is woken by something else
                    # just as its event falls due -- confirm.
                    with ev.cond:
                        ev.cond.notify_all()

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        """Block on `cond` until the scheduler signals that `when` has passed.

        Args:
            when: wake-up time, on the `time.monotonic()` clock.
            cond: condition variable to wait on; the caller must hold it.
            name: optional label for the event, used in log messages.

        The call may also return earlier if `cond` is notified by something
        other than the scheduler; the event is removed in either case.
        """
        ev = ScheduledEvent(when, cond, name)
        # Bind the caller's condition to the event explicitly: it is the one
        # the scheduler thread must notify to end the cond.wait() below
        ev.cond = cond
        with self._cond:
            head = self.__list
            if head is None:
                self.__list = ev
            elif head > ev:
                # New earliest event: link it in front of the whole list,
                # keeping the existing tail attached
                ev.next = head
                head.prev = ev
                self.__list = ev
            else:
                # Sorted insertion somewhere after the head
                head.insert_after(ev)

            self._cond.notify()
        # This will release the lock on cond, allowing scheduler thread
        # to notify us if we are already to be released
        cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        with self._cond:
            if ev is self.__list:
                self.__list = ev.next
            ev.unlink()
            ev.next = None
            ev.prev = None

mercurial