Improved scheduler

author       Tuomo Valkonen <tuomov@iki.fi>
date         Mon, 22 Jan 2018 18:16:51 +0000
changeset    49:db33dfa64ad6
parent       48:be3ed25df789
child        50:2d8947351727

backup.py
borgend.py
scheduler.py
ui.py
--- a/backup.py	Mon Jan 22 12:04:19 2018 +0000
+++ b/backup.py	Mon Jan 22 18:16:51 2018 +0000
@@ -8,8 +8,8 @@
 import keyring
 import borgend
 from instance import BorgInstance
-from queue import Queue
-from threading import Thread, Lock, Timer
+from threading import Thread, Lock, Condition
+from scheduler import TerminableThread
 
 logger=borgend.logger.getChild(__name__)
 
@@ -46,14 +46,14 @@
     return None
 
 
-class Backup:
+class Backup(TerminableThread):
 
     def __decode_config(self, cfg):
         loc0='backup target %d' % self.identifier
 
-        self.name=config.check_string(cfg, 'name', 'Name', loc0)
+        self._name=config.check_string(cfg, 'name', 'Name', loc0)
 
-        self.loc='backup target "%s"' % self.name
+        self.loc='backup target "%s"' % self._name
 
         self.repository=config.check_string(cfg, 'repository',
                                             'Target repository', self.loc)
@@ -115,25 +115,26 @@
                 self.__passphrase=None
         return self.__passphrase
 
-    def __init__(self, identifier, cfg):
+    def __init__(self, identifier, cfg, scheduler):
         self.identifier=identifier
-
-        self.__decode_config(cfg)
-
         self.config=config
         self.lastrun_when=None
         self.borg_instance=None
         self.current_operation=None
         self.thread_log=None
         self.thread_res=None
-        self.timer=None
         self.scheduled_operation=None
-        self.lock=Lock()
         self.__status_update_callback=None
         self.state=INACTIVE
+        self.scheduler=scheduler
+
+        self.__decode_config(cfg)
+
+        super().__init__(target = self.__main_thread, name = self._name)
+        self.daemon=True
 
     def is_running(self):
-        with self.lock:
+        with self._cond:
             running=self.__is_running_unlocked()
         return running
 
@@ -163,7 +164,7 @@
                 current=safe_get_int(status, 'current')
                 total=safe_get_int(status, 'total')
                 if current is not None and total is not None:
-                    with self.lock:
+                    with self._cond:
                         self.current_operation['progress_current']=current
                         self.current_operation['progress_total']=total
                         status, callback=self.__status_unlocked()
@@ -173,7 +174,7 @@
                 compressed_size=safe_get_int(status, 'compressed_size')
                 deduplicated_size=safe_get_int(status, 'deduplicated_size')
                 if original_size is not None and original_size is not None and deduplicated_size is not None:
-                    with self.lock:
+                    with self._cond:
                         self.current_operation['original_size']=original_size
                         self.current_operation['compressed_size']=compressed_size
                         self.current_operation['deduplicated_size']=deduplicated_size
@@ -202,7 +203,7 @@
                          status['msgid']=='LockErrorT' or # in docs
                          status['msgid']=='LockErrorT')): # in docs
                         state=BUSY
-                    with self.lock:
+                    with self._cond:
                         self.state=combine_state(self.state, state)
                         status, callback=self.__status_unlocked()
             else:
@@ -218,7 +219,7 @@
         logger.debug('Borg subprocess terminated; terminating log listener thread')
 
     def __result_listener(self):
-        with self.lock:
+        with self._cond:
             status, callback=self.__status_unlocked()
         if callback:
             callback(self, status)
@@ -230,7 +231,7 @@
         # Finish processing remaining errors
         self.thread_log.join()
 
-        with self.lock:
+        with self._cond:
             state=self.state
 
         # If there were no errors, reset back to INACTIVE state
@@ -251,7 +252,7 @@
 
         logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
 
-        with self.lock:
+        with self._cond:
             if self.current_operation['operation']=='create':
                 self.lastrun_when=self.current_operation['when_monotonic']
             self.thread_res=None
@@ -259,12 +260,9 @@
             self.borg_instance=None
             self.current_operation=None
             self.state=state
-            self.__schedule_unlocked()
-            status, callback=self.__status_unlocked()
-        if callback:
-            callback(self, status)
+            self._cond.notify()
 
-    def __do_launch(self, queue, op, archive_or_repository, *args):
+    def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()
 
         inst=BorgInstance(op['operation'], archive_or_repository, *args)
@@ -281,7 +279,6 @@
         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
-        self.queue=queue
         self.current_operation=op
         self.current_operation['when_monotonic']=time.monotonic()
         self.state=ACTIVE
@@ -289,30 +286,25 @@
         t_log.start()
         t_res.start()
 
-    def __launch(self, op, queue):
+    def __launch(self, op):
         if self.__is_running_unlocked():
             logging.info('Cannot start %s: already running %s'
                          % (operation, self.current_operation))
             return False
         else:
-            if self.timer:
-                logger.debug('Unscheduling timed operation due to launch of operation')
-                self.timer=None
-                self.scheduled_operation=None
-
             try:
-                logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name))
+                logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))
 
                 if op['operation']=='create':
                     archive="%s::%s%s" % (self.repository,
                                           self.archive_prefix,
                                           self.archive_template)
 
-                    self.__do_launch(queue, op, archive,
+                    self.__do_launch(op, archive,
                                      self.common_parameters+self.create_parameters,
                                      self.paths)
                 elif op['operation']=='prune':
-                    self.__do_launch(queue, op, self.repository,
+                    self.__do_launch(op, self.repository,
                                      ([{'prefix': self.archive_prefix}] + 
                                       self.common_parameters +
                                       self.prune_parameters))
@@ -322,66 +314,27 @@
                 logger.debug('Rescheduling after failure')
                 self.lastrun_when=time.monotonic()
                 self.state=ERRORS
-                self.__schedule_unlocked()
                 raise err
 
             return True
 
-    def create(self, queue):
+    def create(self):
         op={'operation': 'create', 'detail': 'manual'}
-        with self.lock:
-            res=self.__launch(op, queue)
-        return res
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
 
-    def prune(self, queue):
+    def prune(self):
         op={'operation': 'prune', 'detail': 'manual'}
-        with self.lock:
-            res=self.__launch(op, queue)
-        return res
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
 
     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
     def abort(self):
-        with self.lock:
+        with self._cond:
             if self.borg_instance:
                 self.borg_instance.terminate()
-            #thread_log=self.thread_log
-            #thread_res=self.thread_res
-
-        #if thread_log:
-        #    thread_log.terminate()
-
-        #if thread_res:
-        #    thread_res.terminate()
-
-
-    def join(self):
-        logger.debug('Waiting for borg listener threads to terminate')
-
-        with self.lock:
-            thread_log=self.thread_log
-            thread_res=self.thread_res
-
-        if thread_log:
-            thread_log.join()
-
-        if thread_res:
-            thread_res.join()
-
-        assert(self.thread_log==None and self.thread_res==None)
-
-    def __queue_timed_operation(self):
-        with self.lock:
-            op=self.scheduled_operation
-            self.scheduled_operation=None
-            self.timer=None
-
-            if self.__is_running_unlocked():
-                logger.info('Aborted queue operation, as an operation is already running')
-            else:
-                # TODO: Queue on 'repository' and online status for SSH, etc.
-
-                # TODO: UI comms. queue?
-                self.__launch(op, None)
 
     def __next_operation_unlocked(self):
         # TODO: pruning as well
@@ -411,33 +364,49 @@
                         'detail': 'normal',
                         'when_monotonic': self.lastrun_when+self.backup_interval}
 
-    def __schedule_unlocked(self):
-        if self.current_operation:
-            return self.current_operation
-        else:
-            op=self.__next_operation_unlocked()
+    def __main_thread(self):
+        with self._cond:
+            while not self._terminate:
+                op=None
+                if not self.current_operation:
+                    op=self.__next_operation_unlocked()
+                if not op:
+                    self.__update_status()
+                    self._cond.wait()
+                else:
+                    now=time.monotonic()
+                    delay=max(0, op['when_monotonic']-now)
+                    logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
+                                (op['operation'], op['detail'], self._name, delay))
+
+                    self.scheduled_operation=op
+                    self.state=combine_state(self.state, SCHEDULED)
+
+                    self.__update_status()
+                    self.scheduler.wait_until(now+delay, self._cond, self._name)
 
-            if op:
-                now=time.monotonic()
-                delay=max(0, op['when_monotonic']-now)
-                logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
-                            (op['operation'], op['detail'], self.name, delay))
-                tmr=Timer(delay, self.__queue_timed_operation)
-                self.scheduled_operation=op
-                self.timer=tmr
-                self.state=combine_state(self.state, SCHEDULED)
-                tmr.start()
+                if self.scheduled_operation:
+                    op=self.scheduled_operation
+                    self.scheduled_operation=None
+                    self.__launch(op)
+
+            # Kill a running borg to cause log and result threads to terminate
+            if self.borg_instance:
+                logger.debug("Terminating a borg instance")
+                self.borg_instance.terminate()
 
-            return op
+            # Store thread references so they can be joined outside the lock
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        logger.debug("Waiting for log and result threads to terminate")
 
-    def schedule(self):
-        with self.lock:
-            return self.__schedule_unlocked()
+        if thread_log:
+            thread_log.join()
 
-    def status(self):
-        with self.lock:
-            res=self.__status_unlocked()
-        return res[0]
+        if thread_res:
+            thread_res.join()
+
 
     def __status_unlocked(self):
         callback=self.__status_update_callback
@@ -453,7 +422,7 @@
             else:
                 status={'type': 'nothing'}
 
-        status['name']=self.name
+        status['name']=self._name
         status['state']=self.state
 
         if 'detail' not in status:
@@ -465,8 +434,21 @@
 
         return status, callback
 
+    def __update_status(self):
+        status, callback = self.__status_unlocked()
+        if callback:
+            self._cond.release()
+            try:
+                callback(self, status)
+            finally:
+                self._cond.acquire()
+
     def set_status_update_callback(self, callback):
-        with self.lock:
+        with self._cond:
             self.__status_update_callback=callback
 
+    def status(self):
+        with self._cond:
+            res=self.__status_unlocked()
+        return res[0]
 
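The backup.py changes above drop the per-target threading.Timer: each backup
target is now a long-lived thread that sleeps on a Condition until an
operation becomes due or is requested manually. A minimal, self-contained
sketch of that pattern follows; Worker, queue_operation and the print call
are illustrative only, not names from the changeset.

    import time
    from threading import Condition, Thread

    class Worker(Thread):
        # One long-lived thread per target, woken through a Condition
        # instead of ad hoc threading.Timer objects.
        def __init__(self, name):
            super().__init__(name=name, daemon=True)
            self._cond = Condition()
            self._terminate = False
            self._pending = None     # operation queued by another thread

        def queue_operation(self, op):
            # Called from another thread (UI or scheduler) to request work.
            with self._cond:
                self._pending = op
                self._cond.notify()

        def terminate(self):
            with self._cond:
                self._terminate = True
                self._cond.notify()

        def run(self):
            with self._cond:
                while not self._terminate:
                    if self._pending is None:
                        self._cond.wait()   # lock released while waiting
                        continue
                    op, self._pending = self._pending, None
                    print("%s: launching '%s'" % (self.name, op))

    if __name__ == '__main__':
        w = Worker('demo target')
        w.start()
        w.queue_operation('create')
        time.sleep(0.5)
        w.terminate()
        w.join()

In the diff, the manual create() and prune() calls play the role of
queue_operation() here, and TerminableThread.terminate() that of terminate().
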
--- a/borgend.py	Mon Jan 22 12:04:19 2018 +0000
+++ b/borgend.py	Mon Jan 22 18:16:51 2018 +0000
@@ -7,7 +7,6 @@
 import argparse
 import platform
 import utils
-from fifolog import FIFOHandler
 
 #
 # Branding
@@ -30,14 +29,23 @@
 
 logger=logging.getLogger(appname)
 logger.setLevel(loglevel)
+stderrlog=logging.StreamHandler()
+logger.addHandler(stderrlog)
 logger.propagate=True
 
 #
+# Import our own modules. This needs to be done here so that the
+# logger and other globals defined above are available to them
+#
+
+import config
+from scheduler import Scheduler
+from fifolog import FIFOHandler
+
+#
 # Argument processing
 #
 
-import config
-
 def do_args():
     parser=argparse.ArgumentParser(
         description=appname_stylised + ': BorgBackup scheduler and tray icon.',
@@ -86,11 +94,15 @@
         backupconfigs=config.settings['backups']
         backups=[None]*len(backupconfigs);
 
+        scheduler = Scheduler()
+
         try:
+            scheduler.start()
+
             for i in range(len(backupconfigs)):
                 logger.info('Setting up backup set %d' % i)
-                backups[i]=Backup(i, backupconfigs[i])
-                backups[i].schedule()
+                backups[i]=Backup(i, backupconfigs[i], scheduler)
+                backups[i].start()
 
             if args.notray or platform.system()!='Darwin':
                 pass
@@ -104,6 +116,8 @@
                     backups[i].abort()
             backups=[]
     except Exception as err:
+        # TODO: Should write errors here to stderr;
+        # perhaps add an extra stderr logger for error level messages
         utils.log_exception(logger, err, detail='Exiting')
         if tray:
             tray.quit()
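
With these borgend.py changes the central Scheduler thread is created and
started before the backup threads, and each Backup receives it in its
constructor. A rough sketch of the new startup order, assuming the patched
modules are importable on their own and with backup_configs standing in for
config.settings['backups']:

    from scheduler import Scheduler
    from backup import Backup

    backup_configs = []            # placeholder for the parsed configuration

    scheduler = Scheduler()        # central timing thread, runs as a daemon
    scheduler.start()

    backups = [Backup(i, cfg, scheduler)
               for i, cfg in enumerate(backup_configs)]
    for b in backups:
        b.start()                  # each Backup is now a thread of its own
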
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scheduler.py	Mon Jan 22 18:16:51 2018 +0000
@@ -0,0 +1,125 @@
+#
+# Scheduler for Borgend
+#
+# This module simply provides a way for other threads to wait until a given time
+#
+
+import time
+import borgend
+from threading import Condition, Lock, Thread
+
+logger=borgend.logger.getChild(__name__)
+
+class ScheduledEvent:
+    def __init__(self, when, cond, name=None):
+        self.next=None
+        self.prev=None
+        self.when=when
+        self.name=name
+        self.cond=cond
+
+    def __lt__(self, other):
+        return self.when < other.when
+
+    def __gt__(self, other):
+        return self.when > other.when
+
+    def insert_after(self, ev):
+        if not self.next:
+            ev.prev=self
+            self.next=ev
+            ev.next=None
+        elif self.next>ev:
+            self.insert_immediately_after(ev)
+        else:
+            self.next.insert_after(ev)
+
+    def insert_immediately_after(self, ev):
+        ev.prev=self
+        ev.next=self.next
+        if ev.next:
+            ev.next.prev=ev
+        self.next=ev
+
+    def unlink(self):
+        n=self.next
+        p=self.prev
+        if n:
+            n.prev=p
+        if p:
+            p.next=n
+
+class TerminableThread(Thread):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._terminate=False
+        self._cond=Condition()
+
+    def terminate(self):
+        with self._cond:
+            self._terminate=True
+            self._cond.notify()
+
+
+class Scheduler(TerminableThread):
+    # Default to precision of 60 seconds: the scheduler thread will never
+    # sleep longer than that, to get quickly back on track with the schedule
+    # when the computer wakes up from sleep
+    def __init__(self, precision=60):
+        self.precision = precision
+        self.__next_event_time = None
+        self.__list = None
+        self._cond = Condition()
+        self._terminate = False
+        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
+        self.daemon=True
+
+    def __scheduler_thread(self):
+        with self._cond:
+            while not self._terminate:
+                now = time.monotonic()
+                if not self.__list:
+                    timeout = None
+                else:
+                    # Wait at most precision seconds, or until next event if it
+                    # comes earlier
+                    timeout=min(self.precision, self.__list.when-now)
+
+                if not timeout or timeout>0:
+                    self._cond.wait(timeout)
+                    now = time.monotonic()
+
+                while self.__list and self.__list.when <= now:
+                    ev=self.__list
+                    logger.info("Found schedulable event %s" % str(ev.name))
+                    self.__list=ev.next
+                    ev.unlink()
+                    ev.cond.acquire()
+                    ev.cond.notify_all()
+                    ev.cond.release()
+
+    # cond has to be acquired on entry!
+    def wait_until(self, when, cond, name=None):
+        ev=ScheduledEvent(when, cond, name)
+        with self._cond:
+            if not self.__list:
+                self.__list=ev
+            elif self.__list > ev:
+                ev.insert_immediately_after(self.__list)
+                self.__list=ev
+            else:
+                self.__list.insert_immediately_after(ev)
+
+            self._cond.notify()
+        # cond.wait() releases the lock on cond, allowing the scheduler
+        # thread to notify us even if the event is already due
+        cond.wait()
+
+        # If we were woken up by some other event, not the scheduler,
+        # ensure the event is removed
+        with self._cond:
+            if ev==self.__list:
+                self.__list=ev.next
+            ev.unlink()
+
+
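scheduler.py lets any thread park itself until a deadline: the caller holds
its own Condition, wait_until() links a ScheduledEvent into the time-ordered
list, and the scheduler thread notifies that Condition once the deadline has
passed, re-checking at least every precision seconds so it catches up quickly
after the machine wakes from sleep. A minimal usage sketch, assuming
scheduler.py can be imported outside the full application; wait_demo is an
illustrative name only.

    import time
    from threading import Condition, Thread
    from scheduler import Scheduler

    def wait_demo(scheduler, delay, name):
        cond = Condition()
        with cond:                  # wait_until() expects cond to be held
            scheduler.wait_until(time.monotonic() + delay, cond, name)
        print('%s released after ~%d s' % (name, delay))

    if __name__ == '__main__':
        s = Scheduler(precision=1)  # check every second for this short demo
        s.start()
        workers = [Thread(target=wait_demo, args=(s, d, 'event-%d' % d))
                   for d in (1, 2, 3)]
        for t in workers:
            t.start()
        for t in workers:
            t.join()
        s.terminate()
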
--- a/ui.py	Mon Jan 22 12:04:19 2018 +0000
+++ b/ui.py	Mon Jan 22 18:16:51 2018 +0000
@@ -206,7 +206,7 @@
         #sender.state=not sender.state
         logger.debug("Manually backup '%s'", b.name)
         try:
-            b.create(None)
+            b.create()
         except Exception as err:
             utils.log_exception(logger, err)
             notification_workaround(borgend.appname_stylised,
