borgend/dreamtime.py

Mon, 17 Sep 2018 19:31:22 -0500

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 17 Sep 2018 19:31:22 -0500
changeset 117
b509a4e34d7f
parent 113
6993964140bd
permissions
-rw-r--r--

xdg include fix?

#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements system wake/sleep detection for scheduling adjustments.
#

import platform
import time
import threading
import weakref
import datetime
import logging
import math

from .exprotect import protect_noreturn

logger=logging.getLogger(__name__)

_dreamtime_monitor=None

#
# Support classes for dealing with different times
#

# Time snapshotting to helps to create stable comparisons of different 
# subclasses of Time.
class Snapshot:
    def __init__(self):
        self._monotonic=None
        self._realtime=None
        self._dreamtime=None
        self._sleeping=None

    def monotonic(self):
        if self._monotonic is None:
            self._monotonic=time.monotonic()
        return self._monotonic

    def realtime(self):
        if self._realtime is None:
            self._realtime=time.time()
        return self._realtime

    def dreamtime_sleeping(self):
        if self._dreamtime is None:
            if _dreamtime_monitor:
                self._dreamtime, self._sleeping=_dreamtime_monitor.dreamtime_sleeping(snapshot=self)
            else:
                self._dreamtime, self._sleeping=self.monotonic(), False
        return self._dreamtime, self._sleeping

    def dreamtime(self):
        time, _=self.dreamtime_sleeping()
        return time

    def sleeping(self):
        _, sleeping=self.dreamtime_sleeping()
        return sleeping

# Python's default arguments are purely idiotic (aka. pythonic): generated
# only once. This makes sense in a purely functional language, which Python
# lightyears away from, but severely limits their usefulness in an imperative
# language. Decorators also seem clumsy for this, as one would have to tell
# the number of positional arguments for things to work nice, being able to
# pass the snapshot both positionally and as keyword. No luck.
# So have to do things the old-fashioned hard way.
def ensure_snapshot(snapshot):
    if not snapshot:
        return Snapshot()
    else:
        return snapshot

# The main Time class, for time advancing in various paces
class Time:
    def __init__(self, when):
        self._value=when

    @staticmethod
    def _now(snapshot):
        raise NotImplementedError

    def remaining(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return self._value-self._now(snapshot)

    # Mostly equal to remaining() but can be ∞ (math.inf) for DreamTime.
    def horizon(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return self.remaining(snapshot)

    def realtime(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return snapshot.realtime()+self.remaining(snapshot)

    def monotonic(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return snapshot.monotonic()+self.remaining(snapshot)

    @classmethod
    def now(cls, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return cls(cls._now(snapshot))

    @classmethod
    def from_realtime(cls, realtime, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return cls(realtime-snapshot.realtime()+cls._now(snapshot))

    @classmethod
    def from_monotonic(cls, monotonic, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return cls(monotonic-snapshot.monotonic()+cls._now(snapshot))

    @classmethod
    def after(cls, seconds, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return cls(cls._now(snapshot)+seconds)

    @classmethod
    def after_other(cls, other, seconds, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        if isinstance(other, cls):
            return cls(other._value+seconds)
        else:
            return cls.from_monotonic(other.monotonic(snapshot)+seconds,
                                      snapshot)

    def datetime(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return datetime.datetime.fromtimestamp(self.realtime(snapshot))

    def isoformat(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        return self.datetime(snapshot).isoformat()

    def __compare(self, other, fn):
        if isinstance(other, self.__class__):
            return fn(self._value, other._value)
        else:
            snapshot=Snapshot()
            return fn(self.monotonic(snapshot), other.monotonic(snapshot))

    def __lt__(self, other):
        return self.__compare(other, lambda x, y: x < y)

    def __gt__(self, other):
        return self.__compare(other, lambda x, y: x > y)

    def __le__(self, other):
        return self.__compare(other, lambda x, y: x <= y)

    def __ge__(self, other):
        return self.__compare(other, lambda x, y: x >= y)

    def __eq__(self, other):
        return self.__compare(other, lambda x, y: x == y)


class RealTime(Time):
    @staticmethod
    def _now(snapshot):
        return snapshot.realtime()

    def realtime(self, snapshot=None):
        return self._value


class MonotonicTime(Time):
    @staticmethod
    def _now(snapshot):
        return snapshot.monotonic()

    def monotonic(self, snapshot=None):
        return self._value


class DreamTime(Time):
    @staticmethod
    def _now(snapshot):
        return snapshot.dreamtime()

    def horizon(self, snapshot=None):
        snapshot=ensure_snapshot(snapshot)
        if snapshot.sleeping():
            return math.inf
        else:
            return self.remaining(snapshot)



if platform.system()=='Darwin':

    import Foundation
    import AppKit

    def do_callbacks(callbacks, woke):
        for callback in callbacks.values():
            try:
                callback(woke)
            except Exception:
                logger.exception("Error in sleep/wake notification callback")

    #
    # Wake up / sleep handling
    #

    class SleepHandler(Foundation.NSObject):
        """ Handle wake/sleep notifications """

        # We need to use the actual time.time() to monitor sleep, as
        # time.monotonic() many not run during sleep. But time.time()
        # may encounter adjustments, so we also use time.monotonic() to
        # monitor wake periods.

        def init(self):
            self.__sleeptime=None
            self.__dreamtime_last_sleep=0
            self.__monotonic_last_wakeup=time.monotonic()
            self.__slept=0 # Only used to store the statistic
            self.__lock=threading.Lock()
            self.__callbacks=weakref.WeakKeyDictionary()

            return self

        @protect_noreturn
        def handleSleepNotification_(self, aNotification):
            logger.info("System going to sleep")
            with self.__lock:
                self.__sleeptime=time.time()
                now_m=time.monotonic()
                self.__dreamtime_last_sleep=(self.__dreamtime_last_sleep+now_m
                                             -self.__monotonic_last_wakeup)
                #logger.debug("Sleeping; monotonic time now: %f; dreamtime_last_sleep: %f" % (now_m, self.__dreamtime_last_sleep))
                callbacks=self.__callbacks.copy()
            do_callbacks(callbacks, False)

        @protect_noreturn
        def handleWakeNotification_(self, aNotification):
            logger.info("System waking up from sleep")
            with self.__lock:
                if self.__sleeptime:
                    self.__monotonic_last_wakeup=time.monotonic()
                    now=time.time()
                    slept=max(0, now-self.__sleeptime)
                    self.__slept=self.__slept+slept
                    self.__sleeptime=None
                    logger.info("Slept %f seconds", slept)
                    #logger.debug("Slept %f seconds; total: %f; monotonic time now: %f" % (slept, self.__slept, self.__monotonic_last_wakeup))
                callbacks=self.__callbacks.copy()
            do_callbacks(callbacks, True)

        # Return dreamtime
        def dreamtime_sleeping(self, snapshot=Snapshot()):
            with self.__lock:
                # macOS "sleep" signals / status are complete bollocks: at least
                # when plugged in, the system is actually sometimes running all
                # night with the lid closed and sleepNotification delivered.
                # Therefore, we need our timers to work in a sane manner when
                # we should be sleeping!
                if self.__sleeptime is not None:
                    sleeping=True
                    now_dreamtime=self.__dreamtime_last_sleep
                else:
                    sleeping=False
                    now=snapshot.monotonic()
                    now_dreamtime=(self.__dreamtime_last_sleep
                                   +now-self.__monotonic_last_wakeup)
                    #logger.debug("Dreamtime request: last_sleep: %f now: %f; last wakeup: %f => now %f dreamtime "
                    #             % (self.__dreamtime_last_sleep, now, self.__monotonic_last_wakeup, now_dreamtime))
            return now_dreamtime, sleeping

        # Weirdo (Py)ObjC naming to stop it form choking up
        def addForObj_aCallback_(self, obj, callback):
            with self.__lock:
                self.__callbacks[obj]=callback

    # obj is to use a a key in a weak key dictionary
    def add_callback(obj, callback):
        global _dreamtime_monitor

        monitor=_dreamtime_monitor
        if not monitor:
            raise Exception("Dreamtime monitor not started")
        else:
            monitor.addForObj_aCallback_(obj, callback)

    def start_monitoring():
        global _dreamtime_monitor

        logger.debug("Starting to monitor system sleep")
        workspace = AppKit.NSWorkspace.sharedWorkspace()
        notification_center = workspace.notificationCenter()
        _dreamtime_monitor = SleepHandler.new()

        notification_center.addObserver_selector_name_object_(
            _dreamtime_monitor,
            "handleSleepNotification:",
            AppKit.NSWorkspaceWillSleepNotification,
            None)

        notification_center.addObserver_selector_name_object_(
            _dreamtime_monitor,
            "handleWakeNotification:",
            AppKit.NSWorkspaceDidWakeNotification,
            None)

else: # Not on macOS

    def add_callback(obj, callback):
        pass

    def start_monitoring():
        logger.warning(("No system sleep monitor implemented for '%s'"
                       % platform.system()))

mercurial