basic scheduling

Sat, 20 Jan 2018 15:08:16 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sat, 20 Jan 2018 15:08:16 +0000
changeset 8
7b2d2eac6a48
parent 7
e189d4a6cb8c
child 9
aa121291eb0e

basic scheduling

backup.py file | annotate | diff | comparison | revisions
borgend.py file | annotate | diff | comparison | revisions
scheduler.py file | annotate | diff | comparison | revisions
--- a/backup.py	Sat Jan 20 14:04:51 2018 +0000
+++ b/backup.py	Sat Jan 20 15:08:16 2018 +0000
@@ -7,7 +7,7 @@
 import time
 from instance import BorgInstance
 from queue import Queue
-from threading import Thread, Lock
+from threading import Thread, Lock, Timer
 
 loglevel_translation={
     'CRITICAL': logging.CRITICAL,
@@ -75,12 +75,23 @@
         self.borg_instance=None
         self.current_operation=None
         self.thread_log=None
-        self.thread_err=None
+        self.thread_res=None
+        self.timer=None
+        self.timer_operation=None
+        self.timer_time=None
         self.lock=Lock()
 
     def is_running(self):
         with self.lock:
-            running=self.borg_instance or self.thread_log or self.thread_err
+            running=self.__is_running_unlocked()
+        return running
+
+    def __is_running_unlocked(self):
+        running=self.current_operation
+        if not running:
+            # Consistency check
+            assert((not self.borg_instance and not self.thread_log and
+                    not self.thread_res))
         return running
 
     def __block_when_running(self):
@@ -124,11 +135,6 @@
             elif t=='unparsed_error':
                 success=False
 
-            #if (may_indicate_finished and 'finished' in status and
-            #    status['finished']):
-            #    logging.info('Borg subprocess finished succesfully')
-            #    success=status['finished']
-
         logging.debug('Waiting for borg subprocess to terminate in log thread')
 
         self.borg_instance.wait()
@@ -137,7 +143,7 @@
 
         with self.lock:
             self.thread_log=None
-            self.__cleanup_if_both_listeners_terminated()
+            self.__finish_and_reschedule_if_both_listeners_terminated()
 
 
     def __result_listener(self):
@@ -154,26 +160,26 @@
 
         logging.debug('Waiting for borg subprocess to terminate in result thread')
 
-        self.borg_instance.wait()
+        success=success and self.borg_instance.wait()
 
-        logging.debug('Borg subprocess terminated; terminating result listener thread')
+        logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))
 
         with self.lock:
             if self.current_operation=='create':
                 self.lastrun=self.time_started
                 self.lastrun_success=success
             self.thread_res=None
-            self.__cleanup_if_both_listeners_terminated()
+            self.__finish_and_reschedule_if_both_listeners_terminated()
 
-    def __cleanup_if_both_listeners_terminated(self):
+    def __finish_and_reschedule_if_both_listeners_terminated(self):
         if self.thread_res==None and self.thread_log==None:
             logging.debug('Both threads terminated')
             self.borg_instance=None
+            self.time_started=None
             self.current_operation=None
-            self.time_started=None
+            self.__schedule_unlocked()
 
-    def __launch(self, queue, operation, archive_or_repository, *args):
-
+    def __do_launch(self, queue, operation, archive_or_repository, *args):
         inst=BorgInstance(operation, archive_or_repository, *args)
         inst.launch()
 
@@ -193,23 +199,48 @@
         t_log.start()
         t_res.start()
 
-    def create(self, queue):
-        self.__block_when_running()
+    def __launch(self, operation, queue):
+        if self.__is_running_unlocked():
+            logging.info('Cannot start %s: already running %s'
+                         % (operation, self.current_operation))
+            return False
+        else:
+            if self.timer:
+                logging.debug('Unscheduling timed operation due to launch of operation')
+                self.timer=None
+                self.timer_operation=None
+                self.timer_time=None
+
+            logging.debug("Launching '%s' on '%s'" % (operation, self.name))
+
+            if operation=='create':
+                archive="%s::%s%s" % (self.repository,
+                                      self.archive_prefix,
+                                      self.archive_template)
 
-        archive="%s::%s%s" % (self.repository,
-                              self.archive_prefix,
-                              self.archive_template)
+                self.__do_launch(queue, operation, archive,
+                                 self.common_parameters+self.create_parameters,
+                                 self.paths)
+            elif operation=='prune':
+                self.__do_launch(queue, 'prune', self.repository,
+                                 ([{'prefix': self.archive_prefix}] + 
+                                  self.common_parameters +
+                                  self.prune_parameters))
+            else:
+                logging.error("Invalid operaton '%s'" % operation)
+                self.__schedule_unlocked()
 
-        self.__launch(queue, 'create', archive,
-                      self.common_parameters+self.create_parameters,
-                      self.paths)
+            return True
+
+    def create(self, queue):
+        with self.lock:
+            res=self.__launch('create', queue)
+        return res
 
     def prune(self, queue):
-        self.__block_when_running()
-        self.__launch(queue, 'prune', self.repository,
-                      ([{'prefix': self.archive_prefix}] + 
-                       self.common_parameters +
-                       self.prune_parameters))
+        with self.lock:
+            res=self.__launch('prune', queue)
+        return res
 
     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
     def abort(self):
@@ -227,7 +258,7 @@
 
 
     def join(self):
-        logging.debug('Waiting for borg listener thread to terminate')
+        logging.debug('Waiting for borg listener threads to terminate')
 
         with self.lock:
             thread_log=self.thread_log
@@ -241,9 +272,42 @@
 
         assert(self.thread_log==None and self.thread_res==None)
 
-    def next_action():
-        __block_when_running()
-        # TODO pruning as well
+    def __queue_timed_operation(self):
+        with self.lock:
+            operation=self.timer_operation
+            self.timer_operation=None
+            self.timer_time=None
+            self.timer=None
+
+            if self.__is_running_unlocked():
+                logging.info('Aborted queue operation, as an operation is already running')
+            else:
+                # TODO: Queue on 'repository' and online status for SSH, etc.
+
+                # TODO: UI comms. queue?
+                self.__launch(operation, None)
+
+    def __schedule_unlocked(self):
+        if self.current_operation:
+            return self.current_operation, None
+        else:
+            operation, when=self.__next_operation_unlocked()
+
+            if operation:
+                now=time.monotonic()
+                delay=max(0, when-now)
+                logging.info("Scheduling '%s' of '%s' in %d seconds" %
+                             (operation, self.name, delay))
+                tmr=Timer(delay, self.__queue_timed_operation)
+                self.timer_operation=operation
+                self.timer_time=when
+                self.timer=tmr
+                tmr.start()
+
+            return operation, time
+
+    def __next_operation_unlocked(self):
+        # TODO: pruning as well
         now=time.monotonic()
         if not self.lastrun:
             return 'create', now+self.retry_interval
@@ -255,4 +319,8 @@
             else:
                 return 'create', self.lastrun+self.backup_interval
 
+    def schedule(self):
+        with self.lock:
+            return self.__schedule_unlocked()
 
+
--- a/borgend.py	Sat Jan 20 14:04:51 2018 +0000
+++ b/borgend.py	Sat Jan 20 15:08:16 2018 +0000
@@ -9,12 +9,8 @@
 from backup import Backup
 from config import settings
 from queue import Queue
-import ui
+from ui import BorgendTray
 
-if __name__ == "__main__":
-    #print(settings)
-    #BorgendTray("Borgend").run()
-    pass
 
 backupconfigs=settings['backups']
 backups=[None]*len(backupconfigs);
@@ -24,6 +20,12 @@
     backups[i]=Backup(i, backupconfigs[i])
 
 queue=Queue()
-backups[0].create(queue)
+#print(backups[0].create(queue))
+backups[0].schedule()
+
+#backups[0].join()
 
-backups[0].join()
+if __name__ == "__main__":
+    #print(settings)
+    BorgendTray("Borgend").run()
+    pass
--- a/scheduler.py	Sat Jan 20 14:04:51 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,25 +0,0 @@
-#
-# Borgend scheduler
-#
-
-from Queue import Queue
-import sched
-import ui
-
-
-class Scheduler:
-
-    def __init__(self, backups):
-        self.backups=backups
-        self.eventqueue=Queue()
-        self.t=Thread(target=self.__scheduler)
-        t.start()
-
-    def __scheduler(sched):
-        timeout=???
-        q=sched.eventqueue
-        while True:
-            timeout, timerevent=next_timed_event(sched);
-            t=Timer(timeout, lambda: q.put(timerevent));
-            event=sq.get(True):
-            # Decide what to do

mercurial