backup.py

changeset 7
e189d4a6cb8c
parent 6
46c89e5a219f
child 8
7b2d2eac6a48
--- a/backup.py	Fri Jan 19 16:53:13 2018 +0000
+++ b/backup.py	Sat Jan 20 14:04:51 2018 +0000
@@ -74,30 +74,38 @@
         self.lastrun_success=None
         self.borg_instance=None
         self.current_operation=None
-        self.thread=None
+        self.thread_log=None
+        self.thread_err=None
         self.lock=Lock()
 
+    def is_running(self):
+        with self.lock:
+            running=self.borg_instance or self.thread_log or self.thread_err
+        return running
+
     def __block_when_running(self):
-        with self.lock:
-            not_running=self.borg_instance is None and self.thread is None
-        assert(not_running)
+        running=self.is_running()
+        assert(not running)
 
-    def __listener(self):
-        success=False
-        for status in iter(self.borg_instance.read, None):
-            logging.info(str(status))
+    def __log_listener(self):
+        logging.debug('Log listener thread waiting for entries')
+        success=True
+        for status in iter(self.borg_instance.read_log, None):
+            logging.debug(str(status))
             t=status['type']
+            #may_indicate_finished=False
             if t=='progress_percent':
-                pass
+                #may_indicate_finished=True
+                # Temporary output
+                if 'current' not in status:
+                    status['current']=0
+                if 'total' not in status:
+                    status['total']=0
+                print('%d / %d' % (status['current'], status['total']))
             elif t=='archive_progress':
                 pass
             elif t=='progress_message':
-                if 'finished' in status:
-                    logging.info('Borg subprocess finished succesfully')
-                    success=status['finished']
-            elif t=='progress_percent':
-                # Temporary output
-                print('%d / %d', status['current'], status['total'])
+                #may_indicate_finished=True
                 pass
             elif t=='file_status':
                 pass
@@ -110,23 +118,57 @@
                     status['name']='borg'
                 logging.log(translate_loglevel(status['levelname']),
                             status['name'] + ': ' + status['message'])
+                # set success=False?
             elif t=='exception':
-                pass
+                success=False
             elif t=='unparsed_error':
-                pass
+                success=False
 
-        logging.info('Waiting for borg subprocess to terminate')
+            #if (may_indicate_finished and 'finished' in status and
+            #    status['finished']):
+            #    logging.info('Borg subprocess finished succesfully')
+            #    success=status['finished']
+
+        logging.debug('Waiting for borg subprocess to terminate in log thread')
 
         self.borg_instance.wait()
 
-        logging.info('Borg subprocess terminated; terminating listener thread')
+        logging.debug('Borg subprocess terminated; terminating log listener thread')
+
+        with self.lock:
+            self.thread_log=None
+            self.__cleanup_if_both_listeners_terminated()
+
+
+    def __result_listener(self):
+        logging.debug('Result listener thread waiting for result')
+
+        res=self.borg_instance.read_result()
+
+        success=True
+
+        logging.debug('Borg result: %s' % str(res))
+
+        if res==None:
+            success=False
+
+        logging.debug('Waiting for borg subprocess to terminate in result thread')
+
+        self.borg_instance.wait()
+
+        logging.debug('Borg subprocess terminated; terminating result listener thread')
 
         with self.lock:
             if self.current_operation=='create':
                 self.lastrun=self.time_started
                 self.lastrun_success=success
+            self.thread_res=None
+            self.__cleanup_if_both_listeners_terminated()
+
+    def __cleanup_if_both_listeners_terminated(self):
+        if self.thread_res==None and self.thread_log==None:
+            logging.debug('Both threads terminated')
             self.borg_instance=None
-            self.thread=None
             self.current_operation=None
             self.time_started=None
 
@@ -135,16 +177,21 @@
         inst=BorgInstance(operation, archive_or_repository, *args)
         inst.launch()
 
-        t=Thread(target=self.__listener)
-        t.daemon=True
+        t_log=Thread(target=self.__log_listener)
+        t_log.daemon=True
 
-        self.thread=t
+        t_res=Thread(target=self.__result_listener)
+        t_res.daemon=True
+
+        self.thread_log=t_log
+        self.thread_res=t_res
         self.borg_instance=inst
         self.queue=queue
         self.current_operation=operation
         self.time_started=time.monotonic()
 
-        t.start()
+        t_log.start()
+        t_res.start()
 
     def create(self, queue):
         self.__block_when_running()
@@ -169,14 +216,33 @@
         with self.lock:
             if self.borg_instance:
                 self.borg_instance.terminate()
-            if self.thread:
-                self.thread.terminate()
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        if thread_log:
+            thread_log.terminate()
+
+        if thread_res:
+            thread_res.terminate()
+
 
     def join(self):
-        if self.thread:
-            self.thread.join()
+        logging.debug('Waiting for borg listener thread to terminate')
+
+        with self.lock:
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        if thread_log:
+            thread_log.join()
+
+        if thread_res:
+            thread_res.join()
+
+        assert(self.thread_log==None and self.thread_res==None)
 
     def next_action():
+        __block_when_running()
         # TODO pruning as well
         now=time.monotonic()
         if not self.lastrun:

mercurial