backup.py

changeset 49
db33dfa64ad6
parent 48
be3ed25df789
child 50
2d8947351727
--- a/backup.py	Mon Jan 22 12:04:19 2018 +0000
+++ b/backup.py	Mon Jan 22 18:16:51 2018 +0000
@@ -8,8 +8,8 @@
 import keyring
 import borgend
 from instance import BorgInstance
-from queue import Queue
-from threading import Thread, Lock, Timer
+from threading import Thread, Lock, Condition
+from scheduler import TerminableThread
 
 logger=borgend.logger.getChild(__name__)
 
@@ -46,14 +46,14 @@
     return None
 
 
-class Backup:
+class Backup(TerminableThread):
 
     def __decode_config(self, cfg):
         loc0='backup target %d' % self.identifier
 
-        self.name=config.check_string(cfg, 'name', 'Name', loc0)
+        self._name=config.check_string(cfg, 'name', 'Name', loc0)
 
-        self.loc='backup target "%s"' % self.name
+        self.loc='backup target "%s"' % self._name
 
         self.repository=config.check_string(cfg, 'repository',
                                             'Target repository', self.loc)
@@ -115,25 +115,26 @@
                 self.__passphrase=None
         return self.__passphrase
 
-    def __init__(self, identifier, cfg):
+    def __init__(self, identifier, cfg, scheduler):
         self.identifier=identifier
-
-        self.__decode_config(cfg)
-
         self.config=config
         self.lastrun_when=None
         self.borg_instance=None
         self.current_operation=None
         self.thread_log=None
         self.thread_res=None
-        self.timer=None
         self.scheduled_operation=None
-        self.lock=Lock()
         self.__status_update_callback=None
         self.state=INACTIVE
+        self.scheduler=scheduler
+
+        self.__decode_config(cfg)
+
+        super().__init__(target = self.__main_thread, name = self._name)
+        self.daemon=True
 
     def is_running(self):
-        with self.lock:
+        with self._cond:
             running=self.__is_running_unlocked()
         return running
 
@@ -163,7 +164,7 @@
                 current=safe_get_int(status, 'current')
                 total=safe_get_int(status, 'total')
                 if current is not None and total is not None:
-                    with self.lock:
+                    with self._cond:
                         self.current_operation['progress_current']=current
                         self.current_operation['progress_total']=total
                         status, callback=self.__status_unlocked()
@@ -173,7 +174,7 @@
                 compressed_size=safe_get_int(status, 'compressed_size')
                 deduplicated_size=safe_get_int(status, 'deduplicated_size')
                 if original_size is not None and original_size is not None and deduplicated_size is not None:
-                    with self.lock:
+                    with self._cond:
                         self.current_operation['original_size']=original_size
                         self.current_operation['compressed_size']=compressed_size
                         self.current_operation['deduplicated_size']=deduplicated_size
@@ -202,7 +203,7 @@
                          status['msgid']=='LockErrorT' or # in docs
                          status['msgid']=='LockErrorT')): # in docs
                         state=BUSY
-                    with self.lock:
+                    with self._cond:
                         self.state=combine_state(self.state, state)
                         status, callback=self.__status_unlocked()
             else:
@@ -218,7 +219,7 @@
         logger.debug('Borg subprocess terminated; terminating log listener thread')
 
     def __result_listener(self):
-        with self.lock:
+        with self._cond:
             status, callback=self.__status_unlocked()
         if callback:
             callback(self, status)
@@ -230,7 +231,7 @@
         # Finish processing remaining errors
         self.thread_log.join()
 
-        with self.lock:
+        with self._cond:
             state=self.state
 
         # If there were no errors, reset back to INACTIVE state
@@ -251,7 +252,7 @@
 
         logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
 
-        with self.lock:
+        with self._cond:
             if self.current_operation['operation']=='create':
                 self.lastrun_when=self.current_operation['when_monotonic']
             self.thread_res=None
@@ -259,12 +260,9 @@
             self.borg_instance=None
             self.current_operation=None
             self.state=state
-            self.__schedule_unlocked()
-            status, callback=self.__status_unlocked()
-        if callback:
-            callback(self, status)
+            self._cond.notify()
 
-    def __do_launch(self, queue, op, archive_or_repository, *args):
+    def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()
 
         inst=BorgInstance(op['operation'], archive_or_repository, *args)
@@ -281,7 +279,6 @@
         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
-        self.queue=queue
         self.current_operation=op
         self.current_operation['when_monotonic']=time.monotonic()
         self.state=ACTIVE
@@ -289,30 +286,25 @@
         t_log.start()
         t_res.start()
 
-    def __launch(self, op, queue):
+    def __launch(self, op):
         if self.__is_running_unlocked():
             logging.info('Cannot start %s: already running %s'
                          % (operation, self.current_operation))
             return False
         else:
-            if self.timer:
-                logger.debug('Unscheduling timed operation due to launch of operation')
-                self.timer=None
-                self.scheduled_operation=None
-
             try:
-                logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name))
+                logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))
 
                 if op['operation']=='create':
                     archive="%s::%s%s" % (self.repository,
                                           self.archive_prefix,
                                           self.archive_template)
 
-                    self.__do_launch(queue, op, archive,
+                    self.__do_launch(op, archive,
                                      self.common_parameters+self.create_parameters,
                                      self.paths)
                 elif op['operation']=='prune':
-                    self.__do_launch(queue, op, self.repository,
+                    self.__do_launch(op, self.repository,
                                      ([{'prefix': self.archive_prefix}] + 
                                       self.common_parameters +
                                       self.prune_parameters))
@@ -322,66 +314,27 @@
                 logger.debug('Rescheduling after failure')
                 self.lastrun_when=time.monotonic()
                 self.state=ERRORS
-                self.__schedule_unlocked()
                 raise err
 
             return True
 
-    def create(self, queue):
+    def create(self):
         op={'operation': 'create', 'detail': 'manual'}
-        with self.lock:
-            res=self.__launch(op, queue)
-        return res
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
 
-    def prune(self, queue):
+    def prune(self):
         op={'operation': 'prune', 'detail': 'manual'}
-        with self.lock:
-            res=self.__launch(op, queue)
-        return res
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
 
     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
     def abort(self):
-        with self.lock:
+        with self._cond:
             if self.borg_instance:
                 self.borg_instance.terminate()
-            #thread_log=self.thread_log
-            #thread_res=self.thread_res
-
-        #if thread_log:
-        #    thread_log.terminate()
-
-        #if thread_res:
-        #    thread_res.terminate()
-
-
-    def join(self):
-        logger.debug('Waiting for borg listener threads to terminate')
-
-        with self.lock:
-            thread_log=self.thread_log
-            thread_res=self.thread_res
-
-        if thread_log:
-            thread_log.join()
-
-        if thread_res:
-            thread_res.join()
-
-        assert(self.thread_log==None and self.thread_res==None)
-
-    def __queue_timed_operation(self):
-        with self.lock:
-            op=self.scheduled_operation
-            self.scheduled_operation=None
-            self.timer=None
-
-            if self.__is_running_unlocked():
-                logger.info('Aborted queue operation, as an operation is already running')
-            else:
-                # TODO: Queue on 'repository' and online status for SSH, etc.
-
-                # TODO: UI comms. queue?
-                self.__launch(op, None)
 
     def __next_operation_unlocked(self):
         # TODO: pruning as well
@@ -411,33 +364,49 @@
                         'detail': 'normal',
                         'when_monotonic': self.lastrun_when+self.backup_interval}
 
-    def __schedule_unlocked(self):
-        if self.current_operation:
-            return self.current_operation
-        else:
-            op=self.__next_operation_unlocked()
+    def __main_thread(self):
+        with self._cond:
+            while not self._terminate:
+                op=None
+                if not self.current_operation:
+                    op=self.__next_operation_unlocked()
+                if not op:
+                    self.__update_status()
+                    self._cond.wait()
+                else:
+                    now=time.monotonic()
+                    delay=max(0, op['when_monotonic']-now)
+                    logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
+                                (op['operation'], op['detail'], self._name, delay))
+
+                    self.scheduled_operation=op
+                    self.state=combine_state(self.state, SCHEDULED)
+
+                    self.__update_status()
+                    self.scheduler.wait_until(now+delay, self._cond, self._name)
 
-            if op:
-                now=time.monotonic()
-                delay=max(0, op['when_monotonic']-now)
-                logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
-                            (op['operation'], op['detail'], self.name, delay))
-                tmr=Timer(delay, self.__queue_timed_operation)
-                self.scheduled_operation=op
-                self.timer=tmr
-                self.state=combine_state(self.state, SCHEDULED)
-                tmr.start()
+                if self.scheduled_operation:
+                    op=self.scheduled_operation
+                    self.scheduled_operation=None
+                    self.__launch(op)
+
+            # Kill a running borg to cause log and result threads to terminate
+            if self.borg_instance:
+                logger.debug("Terminating a borg instance")
+                self.borg_instance.terminate()
 
-            return op
+            # Store threads to use outside lock
+            thread_log=self.thread_log
+            thread_err=self.thread_err
+
+        logger.debug("Waiting for log and result threads to terminate")
 
-    def schedule(self):
-        with self.lock:
-            return self.__schedule_unlocked()
+        if thread_log:
+            thread_log.join()
 
-    def status(self):
-        with self.lock:
-            res=self.__status_unlocked()
-        return res[0]
+        if thread_res:
+            thread_res.join()
+
 
     def __status_unlocked(self):
         callback=self.__status_update_callback
@@ -453,7 +422,7 @@
             else:
                 status={'type': 'nothing'}
 
-        status['name']=self.name
+        status['name']=self._name
         status['state']=self.state
 
         if 'detail' not in status:
@@ -465,8 +434,21 @@
 
         return status, callback
 
+    def __update_status(self):
+        status, callback = self.__status_unlocked()
+        if callback:
+            self._cond.release()
+            try:
+                callback(self, status)
+            finally:
+                self._cond.acquire()
+
     def set_status_update_callback(self, callback):
-        with self.lock:
+        with self._cond:
             self.__status_update_callback=callback
 
+    def status(self):
+        with self._cond:
+            res=self.__status_unlocked()
+        return res[0]
 

mercurial