borgend/dreamtime.py

Wed, 07 Feb 2018 20:39:01 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 07 Feb 2018 20:39:01 +0000
changeset 113
6993964140bd
parent 111
c3bc27cf5ece
permissions
-rw-r--r--

Time snapshot fixes.
Python's default arguments are purely idiotic (aka. pythonic): generated
only once. This makes sense in a purely functional language, which Python
is light-years away from, but severely limits their usefulness in an imperative
language. Decorators also seem clumsy for this, as one would have to tell
the number of positional arguments for things to work nice, being able to
pass the snapshot both positionally and as keyword. No luck.
So have to do things the old-fashioned hard way.

#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements system wake/sleep detection for scheduling adjustments.
#

import platform
import time
import threading
import weakref
import datetime
import logging
import math

from .exprotect import protect_noreturn

logger=logging.getLogger(__name__)

_dreamtime_monitor=None

#
# Support classes for dealing with different times
#

# Time snapshotting helps to create stable comparisons of different
# subclasses of Time.
class Snapshot:
    """Lazily taken, frozen readings of the clocks used by Time.

    Every clock is read at most once, on first request; later requests get
    the cached value. Evaluating several Time objects against one snapshot
    therefore makes them all agree on what "now" is.
    """

    def __init__(self):
        # None marks a reading that has not been taken yet.
        self._monotonic=self._realtime=None
        self._dreamtime=self._sleeping=None

    def monotonic(self):
        """Return the time.monotonic() reading, taken on first use."""
        cached=self._monotonic
        if cached is None:
            cached=self._monotonic=time.monotonic()
        return cached

    def realtime(self):
        """Return the time.time() reading, taken on first use."""
        cached=self._realtime
        if cached is None:
            cached=self._realtime=time.time()
        return cached

    def dreamtime_sleeping(self):
        """Return the (dreamtime, sleeping) pair, taken on first use."""
        if self._dreamtime is not None:
            return self._dreamtime, self._sleeping

        monitor=_dreamtime_monitor
        if monitor:
            # The monitor is handed this snapshot so that it can use the
            # same monotonic reading as everybody else.
            reading=monitor.dreamtime_sleeping(snapshot=self)
        else:
            # No sleep monitor: dreamtime is plain monotonic time and we
            # are never reported as sleeping.
            reading=(self.monotonic(), False)

        self._dreamtime, self._sleeping=reading
        return self._dreamtime, self._sleeping

    def dreamtime(self):
        """Return only the dreamtime part of dreamtime_sleeping()."""
        return self.dreamtime_sleeping()[0]

    def sleeping(self):
        """Return only the sleeping flag of dreamtime_sleeping()."""
        return self.dreamtime_sleeping()[1]

# Python's default arguments are purely idiotic (aka. pythonic): generated
# only once. This makes sense in a purely functional language, which Python
# is light-years away from, but severely limits their usefulness in an imperative
# language. Decorators also seem clumsy for this, as one would have to tell
# the number of positional arguments for things to work nice, being able to
# pass the snapshot both positionally and as keyword. No luck.
# So have to do things the old-fashioned hard way.
def ensure_snapshot(snapshot):
    """Return `snapshot` if it is truthy, otherwise a brand new Snapshot.

    Used in place of a `snapshot=Snapshot()` default argument, which would
    be evaluated only once and then shared by every call.
    """
    return snapshot if snapshot else Snapshot()

# The main Time class, for time advancing in various paces
class Time:
    """A point in time, expressed on the clock chosen by the subclass.

    Subclasses select the clock by overriding `_now(snapshot)`. The stored
    value is a reading of that clock. Conversions to other clocks go through
    `remaining()`, i.e. the distance from "now" on the object's own clock,
    with "now" taken from a Snapshot so that all clocks are read consistently.

    Instances are comparable with each other, also across subclasses.
    Because `__eq__` is defined without `__hash__`, instances are unhashable.
    """

    def __init__(self, when):
        """Store `when`, a reading of this class's own clock."""
        self._value=when

    @staticmethod
    def _now(snapshot):
        """Return the current reading of this class's clock from `snapshot`.

        Must be overridden by subclasses.
        """
        raise NotImplementedError

    def remaining(self, snapshot=None):
        """Return how far this time is from now, on this object's clock.

        Negative if the time has already passed.
        """
        snapshot=ensure_snapshot(snapshot)
        return self._value-self._now(snapshot)

    # Mostly equal to remaining() but can be ∞ (math.inf) for DreamTime.
    def horizon(self, snapshot=None):
        """Return the remaining time; subclasses may return math.inf."""
        snapshot=ensure_snapshot(snapshot)
        return self.remaining(snapshot)

    def realtime(self, snapshot=None):
        """Return this time as a time.time()-style timestamp."""
        snapshot=ensure_snapshot(snapshot)
        return snapshot.realtime()+self.remaining(snapshot)

    def monotonic(self, snapshot=None):
        """Return this time as a time.monotonic()-style value."""
        snapshot=ensure_snapshot(snapshot)
        return snapshot.monotonic()+self.remaining(snapshot)

    @classmethod
    def now(cls, snapshot=None):
        """Return an instance representing the current time."""
        snapshot=ensure_snapshot(snapshot)
        return cls(cls._now(snapshot))

    @classmethod
    def from_realtime(cls, realtime, snapshot=None):
        """Return an instance for the time.time()-style value `realtime`."""
        snapshot=ensure_snapshot(snapshot)
        return cls(realtime-snapshot.realtime()+cls._now(snapshot))

    @classmethod
    def from_monotonic(cls, monotonic, snapshot=None):
        """Return an instance for the time.monotonic()-style `monotonic`."""
        snapshot=ensure_snapshot(snapshot)
        return cls(monotonic-snapshot.monotonic()+cls._now(snapshot))

    @classmethod
    def after(cls, seconds, snapshot=None):
        """Return an instance `seconds` after now on this class's clock."""
        snapshot=ensure_snapshot(snapshot)
        return cls(cls._now(snapshot)+seconds)

    @classmethod
    def after_other(cls, other, seconds, snapshot=None):
        """Return an instance `seconds` after the time `other`.

        If `other` already is an instance of `cls`, its value is offset
        directly; otherwise it is first converted through monotonic time.
        """
        snapshot=ensure_snapshot(snapshot)
        if isinstance(other, cls):
            return cls(other._value+seconds)
        else:
            return cls.from_monotonic(other.monotonic(snapshot)+seconds,
                                      snapshot)

    def datetime(self, snapshot=None):
        """Return this time as a datetime.datetime (via fromtimestamp)."""
        snapshot=ensure_snapshot(snapshot)
        return datetime.datetime.fromtimestamp(self.realtime(snapshot))

    def isoformat(self, snapshot=None):
        """Return `self.datetime(snapshot)` formatted with isoformat()."""
        snapshot=ensure_snapshot(snapshot)
        return self.datetime(snapshot).isoformat()

    def __compare(self, other, fn):
        """Apply the binary predicate `fn` to `self` and `other`.

        Objects of the same class are compared by their stored values.
        Anything else that offers a callable `monotonic` is compared after
        converting both sides to monotonic time using one shared snapshot.
        For every other operand (None, numbers, strings, ...) NotImplemented
        is returned, so that `==`/`!=` fall back to identity and ordering
        comparisons raise TypeError instead of an AttributeError from deep
        inside this method.
        """
        if isinstance(other, self.__class__):
            return fn(self._value, other._value)
        if not callable(getattr(other, 'monotonic', None)):
            return NotImplemented
        # A single snapshot for both sides keeps the comparison stable.
        snapshot=Snapshot()
        return fn(self.monotonic(snapshot), other.monotonic(snapshot))

    def __lt__(self, other):
        return self.__compare(other, lambda x, y: x < y)

    def __gt__(self, other):
        return self.__compare(other, lambda x, y: x > y)

    def __le__(self, other):
        return self.__compare(other, lambda x, y: x <= y)

    def __ge__(self, other):
        return self.__compare(other, lambda x, y: x >= y)

    def __eq__(self, other):
        return self.__compare(other, lambda x, y: x == y)


class RealTime(Time):
    """Time measured on the wall clock, i.e. Snapshot.realtime()."""

    @staticmethod
    def _now(snapshot):
        """Return the snapshot's wall-clock reading."""
        wall_clock_now=snapshot.realtime()
        return wall_clock_now

    def realtime(self, snapshot=None):
        """Return the stored value, which already is a wall-clock reading.

        `snapshot` is accepted for interface compatibility and not used.
        """
        return self._value


class MonotonicTime(Time):
    """Time measured on the monotonic clock, i.e. Snapshot.monotonic()."""

    @staticmethod
    def _now(snapshot):
        """Return the snapshot's monotonic reading."""
        monotonic_now=snapshot.monotonic()
        return monotonic_now

    def monotonic(self, snapshot=None):
        """Return the stored value, which already is a monotonic reading.

        `snapshot` is accepted for interface compatibility and not used.
        """
        return self._value


class DreamTime(Time):
    """Time measured in dreamtime, i.e. Snapshot.dreamtime()."""

    @staticmethod
    def _now(snapshot):
        """Return the snapshot's dreamtime reading."""
        dreamtime_now=snapshot.dreamtime()
        return dreamtime_now

    def horizon(self, snapshot=None):
        """Return remaining(), or math.inf while the snapshot says sleeping."""
        snapshot=ensure_snapshot(snapshot)
        return math.inf if snapshot.sleeping() else self.remaining(snapshot)



if platform.system()=='Darwin':

    import Foundation
    import AppKit

    def do_callbacks(callbacks, woke):
        """Call every callback in the mapping `callbacks` with `woke`.

        An exception raised by one callback is logged and does not prevent
        the remaining callbacks from being called.
        """
        for notify in callbacks.values():
            try:
                notify(woke)
            except Exception:
                logger.exception("Error in sleep/wake notification callback")

    #
    # Wake up / sleep handling
    #

    class SleepHandler(Foundation.NSObject):
        """ Handle wake/sleep notifications

        Keeps track of "dreamtime": time that advances with the monotonic
        clock between a wake-up and the next sleep notification, and stands
        still from a sleep notification until the following wake-up.
        Registered callbacks are called with False on sleep and True on wake.
        """

        # We need to use the actual time.time() to monitor sleep, as
        # time.monotonic() may not run during sleep. But time.time()
        # may encounter adjustments, so we also use time.monotonic() to
        # monitor wake periods.

        def init(self):
            # Wall-clock time of the pending sleep notification; None while
            # awake.
            self.__sleeptime=None
            # Dreamtime accumulated up to the latest sleep notification.
            self.__dreamtime_last_sleep=0
            # Monotonic time of the latest wake-up (initially: creation).
            self.__monotonic_last_wakeup=time.monotonic()
            self.__slept=0 # Only used to store the statistic
            self.__lock=threading.Lock()
            # Keyed weakly by the owning object, so an entry goes away
            # together with its owner.
            self.__callbacks=weakref.WeakKeyDictionary()

            return self

        @protect_noreturn
        def handleSleepNotification_(self, aNotification):
            """Record the start of sleep and notify callbacks with False."""
            logger.info("System going to sleep")
            with self.__lock:
                self.__sleeptime=time.time()
                now_m=time.monotonic()
                self.__dreamtime_last_sleep=(self.__dreamtime_last_sleep+now_m
                                             -self.__monotonic_last_wakeup)
                #logger.debug("Sleeping; monotonic time now: %f; dreamtime_last_sleep: %f" % (now_m, self.__dreamtime_last_sleep))
                callbacks=self.__callbacks.copy()
            # Callbacks run on the copy, outside the lock.
            do_callbacks(callbacks, False)

        @protect_noreturn
        def handleWakeNotification_(self, aNotification):
            """Record the end of sleep and notify callbacks with True."""
            logger.info("System waking up from sleep")
            with self.__lock:
                if self.__sleeptime:
                    self.__monotonic_last_wakeup=time.monotonic()
                    now=time.time()
                    # Clamp at zero in case the wall clock was set backwards.
                    slept=max(0, now-self.__sleeptime)
                    self.__slept=self.__slept+slept
                    self.__sleeptime=None
                    logger.info("Slept %f seconds", slept)
                    #logger.debug("Slept %f seconds; total: %f; monotonic time now: %f" % (slept, self.__slept, self.__monotonic_last_wakeup))
                callbacks=self.__callbacks.copy()
            # Callbacks run on the copy, outside the lock.
            do_callbacks(callbacks, True)

        # Return dreamtime
        def dreamtime_sleeping(self, snapshot=None):
            """Return the pair (current dreamtime, whether we are sleeping).

            `snapshot` supplies the monotonic "now"; a fresh Snapshot is
            created for each call that does not pass one. (A
            `snapshot=Snapshot()` default would be created only once, and
            its cached monotonic reading would freeze dreamtime for every
            caller relying on the default.)
            """
            snapshot=ensure_snapshot(snapshot)
            with self.__lock:
                # macOS "sleep" signals / status are complete bollocks: at least
                # when plugged in, the system is actually sometimes running all
                # night with the lid closed and sleepNotification delivered.
                # Therefore, we need our timers to work in a sane manner when
                # we should be sleeping!
                if self.__sleeptime is not None:
                    sleeping=True
                    now_dreamtime=self.__dreamtime_last_sleep
                else:
                    sleeping=False
                    now=snapshot.monotonic()
                    now_dreamtime=(self.__dreamtime_last_sleep
                                   +now-self.__monotonic_last_wakeup)
                    #logger.debug("Dreamtime request: last_sleep: %f now: %f; last wakeup: %f => now %f dreamtime "
                    #             % (self.__dreamtime_last_sleep, now, self.__monotonic_last_wakeup, now_dreamtime))
            return now_dreamtime, sleeping

        # Weirdo (Py)ObjC naming to stop it from choking
        def addForObj_aCallback_(self, obj, callback):
            """Register `callback` under the weakly held key `obj`."""
            with self.__lock:
                self.__callbacks[obj]=callback

    # obj is used as a key in a weak key dictionary
    def add_callback(obj, callback):
        """Register `callback` with the running sleep monitor, keyed by `obj`.

        Raises Exception if start_monitoring() has not created the monitor.
        """
        handler=_dreamtime_monitor
        if handler:
            handler.addForObj_aCallback_(obj, callback)
            return
        raise Exception("Dreamtime monitor not started")

    def start_monitoring():
        """Create the global SleepHandler and subscribe it to the workspace's
        will-sleep and did-wake notifications."""
        global _dreamtime_monitor

        logger.debug("Starting to monitor system sleep")
        center=AppKit.NSWorkspace.sharedWorkspace().notificationCenter()
        _dreamtime_monitor=SleepHandler.new()

        # (selector on the handler, notification to observe)
        subscriptions=(
            ("handleSleepNotification:",
             AppKit.NSWorkspaceWillSleepNotification),
            ("handleWakeNotification:",
             AppKit.NSWorkspaceDidWakeNotification),
        )
        for selector, notification_name in subscriptions:
            center.addObserver_selector_name_object_(
                _dreamtime_monitor, selector, notification_name, None)

else: # Not on macOS

    def add_callback(obj, callback):
        """Do nothing: there is no sleep monitor to register with here."""
        return None

    def start_monitoring():
        """Log a warning that sleep monitoring is unavailable on this system."""
        system_name=platform.system()
        logger.warning("No system sleep monitor implemented for '%s'"
                       % system_name)

mercurial