backup.py

changeset 8
7b2d2eac6a48
parent 7
e189d4a6cb8c
child 10
76dbfb06eba0
--- a/backup.py	Sat Jan 20 14:04:51 2018 +0000
+++ b/backup.py	Sat Jan 20 15:08:16 2018 +0000
@@ -7,7 +7,7 @@
 import time
 from instance import BorgInstance
 from queue import Queue
-from threading import Thread, Lock
+from threading import Thread, Lock, Timer
 
 loglevel_translation={
     'CRITICAL': logging.CRITICAL,
@@ -75,12 +75,23 @@
         self.borg_instance=None
         self.current_operation=None
         self.thread_log=None
-        self.thread_err=None
+        self.thread_res=None
+        self.timer=None
+        self.timer_operation=None
+        self.timer_time=None
         self.lock=Lock()
 
     def is_running(self):
         with self.lock:
-            running=self.borg_instance or self.thread_log or self.thread_err
+            running=self.__is_running_unlocked()
+        return running
+
+    def __is_running_unlocked(self):
+        running=self.current_operation
+        if not running:
+            # Consistency check
+            assert((not self.borg_instance and not self.thread_log and
+                    not self.thread_res))
         return running
 
     def __block_when_running(self):
@@ -124,11 +135,6 @@
             elif t=='unparsed_error':
                 success=False
 
-            #if (may_indicate_finished and 'finished' in status and
-            #    status['finished']):
-            #    logging.info('Borg subprocess finished succesfully')
-            #    success=status['finished']
-
         logging.debug('Waiting for borg subprocess to terminate in log thread')
 
         self.borg_instance.wait()
@@ -137,7 +143,7 @@
 
         with self.lock:
             self.thread_log=None
-            self.__cleanup_if_both_listeners_terminated()
+            self.__finish_and_reschedule_if_both_listeners_terminated()
 
 
     def __result_listener(self):
@@ -154,26 +160,26 @@
 
         logging.debug('Waiting for borg subprocess to terminate in result thread')
 
-        self.borg_instance.wait()
+        success=success and self.borg_instance.wait()
 
-        logging.debug('Borg subprocess terminated; terminating result listener thread')
+        logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))
 
         with self.lock:
             if self.current_operation=='create':
                 self.lastrun=self.time_started
                 self.lastrun_success=success
             self.thread_res=None
-            self.__cleanup_if_both_listeners_terminated()
+            self.__finish_and_reschedule_if_both_listeners_terminated()
 
-    def __cleanup_if_both_listeners_terminated(self):
+    def __finish_and_reschedule_if_both_listeners_terminated(self):
         if self.thread_res==None and self.thread_log==None:
             logging.debug('Both threads terminated')
             self.borg_instance=None
+            self.time_started=None
             self.current_operation=None
-            self.time_started=None
+            self.__schedule_unlocked()
 
-    def __launch(self, queue, operation, archive_or_repository, *args):
-
+    def __do_launch(self, queue, operation, archive_or_repository, *args):
         inst=BorgInstance(operation, archive_or_repository, *args)
         inst.launch()
 
@@ -193,23 +199,48 @@
         t_log.start()
         t_res.start()
 
-    def create(self, queue):
-        self.__block_when_running()
+    def __launch(self, operation, queue):
+        if self.__is_running_unlocked():
+            logging.info('Cannot start %s: already running %s'
+                         % (operation, self.current_operation))
+            return False
+        else:
+            if self.timer:
+                logging.debug('Unscheduling timed operation due to launch of operation')
+                self.timer=None
+                self.timer_operation=None
+                self.timer_time=None
+
+            logging.debug("Launching '%s' on '%s'" % (operation, self.name))
+
+            if operation=='create':
+                archive="%s::%s%s" % (self.repository,
+                                      self.archive_prefix,
+                                      self.archive_template)
 
-        archive="%s::%s%s" % (self.repository,
-                              self.archive_prefix,
-                              self.archive_template)
+                self.__do_launch(queue, operation, archive,
+                                 self.common_parameters+self.create_parameters,
+                                 self.paths)
+            elif operation=='prune':
+                self.__do_launch(queue, 'prune', self.repository,
+                                 ([{'prefix': self.archive_prefix}] + 
+                                  self.common_parameters +
+                                  self.prune_parameters))
+            else:
+                logging.error("Invalid operaton '%s'" % operation)
+                self.__schedule_unlocked()
 
-        self.__launch(queue, 'create', archive,
-                      self.common_parameters+self.create_parameters,
-                      self.paths)
+            return True
+
+    def create(self, queue):
+        with self.lock:
+            res=self.__launch('create', queue)
+        return res
 
     def prune(self, queue):
-        self.__block_when_running()
-        self.__launch(queue, 'prune', self.repository,
-                      ([{'prefix': self.archive_prefix}] + 
-                       self.common_parameters +
-                       self.prune_parameters))
+        with self.lock:
+            res=self.__launch('prune', queue)
+        return res
 
     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
     def abort(self):
@@ -227,7 +258,7 @@
 
 
     def join(self):
-        logging.debug('Waiting for borg listener thread to terminate')
+        logging.debug('Waiting for borg listener threads to terminate')
 
         with self.lock:
             thread_log=self.thread_log
@@ -241,9 +272,42 @@
 
         assert(self.thread_log==None and self.thread_res==None)
 
-    def next_action():
-        __block_when_running()
-        # TODO pruning as well
+    def __queue_timed_operation(self):
+        with self.lock:
+            operation=self.timer_operation
+            self.timer_operation=None
+            self.timer_time=None
+            self.timer=None
+
+            if self.__is_running_unlocked():
+                logging.info('Aborted queue operation, as an operation is already running')
+            else:
+                # TODO: Queue on 'repository' and online status for SSH, etc.
+
+                # TODO: UI comms. queue?
+                self.__launch(operation, None)
+
+    def __schedule_unlocked(self):
+        if self.current_operation:
+            return self.current_operation, None
+        else:
+            operation, when=self.__next_operation_unlocked()
+
+            if operation:
+                now=time.monotonic()
+                delay=max(0, when-now)
+                logging.info("Scheduling '%s' of '%s' in %d seconds" %
+                             (operation, self.name, delay))
+                tmr=Timer(delay, self.__queue_timed_operation)
+                self.timer_operation=operation
+                self.timer_time=when
+                self.timer=tmr
+                tmr.start()
+
+            return operation, time
+
+    def __next_operation_unlocked(self):
+        # TODO: pruning as well
         now=time.monotonic()
         if not self.lastrun:
             return 'create', now+self.retry_interval
@@ -255,4 +319,8 @@
             else:
                 return 'create', self.lastrun+self.backup_interval
 
+    def schedule(self):
+        with self.lock:
+            return self.__schedule_unlocked()
 
+

mercurial