Also listen to stdout

Sat, 20 Jan 2018 14:04:51 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sat, 20 Jan 2018 14:04:51 +0000
changeset 7
e189d4a6cb8c
parent 6
46c89e5a219f
child 8
7b2d2eac6a48

Also listen to stdout

backup.py file | annotate | diff | comparison | revisions
instance.py file | annotate | diff | comparison | revisions
scheduler.py file | annotate | diff | comparison | revisions
--- a/backup.py	Fri Jan 19 16:53:13 2018 +0000
+++ b/backup.py	Sat Jan 20 14:04:51 2018 +0000
@@ -74,30 +74,38 @@
         self.lastrun_success=None
         self.borg_instance=None
         self.current_operation=None
-        self.thread=None
+        self.thread_log=None
+        self.thread_err=None
         self.lock=Lock()
 
+    def is_running(self):
+        with self.lock:
+            running=self.borg_instance or self.thread_log or self.thread_err
+        return running
+
     def __block_when_running(self):
-        with self.lock:
-            not_running=self.borg_instance is None and self.thread is None
-        assert(not_running)
+        running=self.is_running()
+        assert(not running)
 
-    def __listener(self):
-        success=False
-        for status in iter(self.borg_instance.read, None):
-            logging.info(str(status))
+    def __log_listener(self):
+        logging.debug('Log listener thread waiting for entries')
+        success=True
+        for status in iter(self.borg_instance.read_log, None):
+            logging.debug(str(status))
             t=status['type']
+            #may_indicate_finished=False
             if t=='progress_percent':
-                pass
+                #may_indicate_finished=True
+                # Temporary output
+                if 'current' not in status:
+                    status['current']=0
+                if 'total' not in status:
+                    status['total']=0
+                print('%d / %d' % (status['current'], status['total']))
             elif t=='archive_progress':
                 pass
             elif t=='progress_message':
-                if 'finished' in status:
-                    logging.info('Borg subprocess finished succesfully')
-                    success=status['finished']
-            elif t=='progress_percent':
-                # Temporary output
-                print('%d / %d', status['current'], status['total'])
+                #may_indicate_finished=True
                 pass
             elif t=='file_status':
                 pass
@@ -110,23 +118,57 @@
                     status['name']='borg'
                 logging.log(translate_loglevel(status['levelname']),
                             status['name'] + ': ' + status['message'])
+                # set success=False?
             elif t=='exception':
-                pass
+                success=False
             elif t=='unparsed_error':
-                pass
+                success=False
 
-        logging.info('Waiting for borg subprocess to terminate')
+            #if (may_indicate_finished and 'finished' in status and
+            #    status['finished']):
+            #    logging.info('Borg subprocess finished succesfully')
+            #    success=status['finished']
+
+        logging.debug('Waiting for borg subprocess to terminate in log thread')
 
         self.borg_instance.wait()
 
-        logging.info('Borg subprocess terminated; terminating listener thread')
+        logging.debug('Borg subprocess terminated; terminating log listener thread')
+
+        with self.lock:
+            self.thread_log=None
+            self.__cleanup_if_both_listeners_terminated()
+
+
+    def __result_listener(self):
+        logging.debug('Result listener thread waiting for result')
+
+        res=self.borg_instance.read_result()
+
+        success=True
+
+        logging.debug('Borg result: %s' % str(res))
+
+        if res==None:
+            success=False
+
+        logging.debug('Waiting for borg subprocess to terminate in result thread')
+
+        self.borg_instance.wait()
+
+        logging.debug('Borg subprocess terminated; terminating result listener thread')
 
         with self.lock:
             if self.current_operation=='create':
                 self.lastrun=self.time_started
                 self.lastrun_success=success
+            self.thread_res=None
+            self.__cleanup_if_both_listeners_terminated()
+
+    def __cleanup_if_both_listeners_terminated(self):
+        if self.thread_res==None and self.thread_log==None:
+            logging.debug('Both threads terminated')
             self.borg_instance=None
-            self.thread=None
             self.current_operation=None
             self.time_started=None
 
@@ -135,16 +177,21 @@
         inst=BorgInstance(operation, archive_or_repository, *args)
         inst.launch()
 
-        t=Thread(target=self.__listener)
-        t.daemon=True
+        t_log=Thread(target=self.__log_listener)
+        t_log.daemon=True
 
-        self.thread=t
+        t_res=Thread(target=self.__result_listener)
+        t_res.daemon=True
+
+        self.thread_log=t_log
+        self.thread_res=t_res
         self.borg_instance=inst
         self.queue=queue
         self.current_operation=operation
         self.time_started=time.monotonic()
 
-        t.start()
+        t_log.start()
+        t_res.start()
 
     def create(self, queue):
         self.__block_when_running()
@@ -169,14 +216,33 @@
         with self.lock:
             if self.borg_instance:
                 self.borg_instance.terminate()
-            if self.thread:
-                self.thread.terminate()
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        if thread_log:
+            thread_log.terminate()
+
+        if thread_res:
+            thread_res.terminate()
+
 
     def join(self):
-        if self.thread:
-            self.thread.join()
+        logging.debug('Waiting for borg listener thread to terminate')
+
+        with self.lock:
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        if thread_log:
+            thread_log.join()
+
+        if thread_res:
+            thread_res.join()
+
+        assert(self.thread_log==None and self.thread_res==None)
 
     def next_action():
+        __block_when_running()
         # TODO pruning as well
         now=time.monotonic()
         if not self.lastrun:
--- a/instance.py	Fri Jan 19 16:53:13 2018 +0000
+++ b/instance.py	Sat Jan 20 14:04:51 2018 +0000
@@ -9,6 +9,12 @@
 
 necessary_opts=['--log-json', '--progress']
 
+necessary_opts_for={
+    'create': ['--json'],
+    'info': ['--json'],
+    'list': ['--json'],
+}
+
 class BorgInstance:
     def __init__(self, operation, archive_or_repository, args, argsl):
         self.operation=operation;
@@ -24,27 +30,43 @@
         if tmp1 in settings['borg']:
             cmd=cmd+arglistify(settings['borg'][tmp1])
 
+        if self.operation in necessary_opts_for:
+            cmd=cmd+necessary_opts_for[self.operation]
+
         return cmd+arglistify(self.args)+[self.archive_or_repository]+self.argsl
 
     def launch(self):
-        # Borg prints json to stderr, so pipe it
         cmd=self.construct_cmdline()
 
         logging.info('Launching ' + str(cmd))
 
-        self.proc=Popen(cmd, stderr=PIPE)
+        self.proc=Popen(cmd, stdout=PIPE, stderr=PIPE)
 
-    def read(self):
+    def read_result(self):
+        stream=self.proc.stdout
+        line=stream.read(-1)
+        if line==b'':
+            logging.debug('Borg stdout pipe EOF?')
+            return None
+
         try:
-            line=self.proc.stderr.readline()
+            return json.loads(line)
+        except:
+            logging.warning('JSON parse failed on: "%s"' % line)
+            return None
+
+    def read_log(self):
+        stream=self.proc.stderr
+        try:
+            line=stream.readline()
         except err:
-            logging.info('Pipe read failed: %s' % str(err))
+            logging.debug('Pipe read failed: %s' % str(err))
 
             return {'type': 'exception', 'exception': err}
 
         if line==b'':
 
-            logging.info('Pipe EOF?')
+            logging.debug('Borg stderr pipe EOF?')
 
             return None
 
@@ -54,10 +76,10 @@
                 res['type']='UNKNOWN'
             return res
         except:
-            logging.warning('JSON parse failed on: "%s"' % line)
+            logging.debug('JSON parse failed on: "%s"' % line)
 
             errmsg=line
-            for line in iter(self.proc.stderr.readline, b''):
+            for line in iter(stream.readline, b''):
                 errmsg=errmsg+line
 
             return {'type': 'unparsed_error', 'message': str(errmsg)}
--- a/scheduler.py	Fri Jan 19 16:53:13 2018 +0000
+++ b/scheduler.py	Sat Jan 20 14:04:51 2018 +0000
@@ -3,22 +3,23 @@
 #
 
 from Queue import Queue
-from runborg import BorgInstance
 import sched
 import ui
 
-def scheduler(sched):
-    timeout=???
-    q=sched.eventqueue
-    while True:
-        timeout, timerevent=next_timed_event(sched);
-        t=Timer(timeout, lambda: q.put(timerevent));
-        event=sq.get(True):
-        # Decide what to do
 
 class Scheduler:
 
     def __init__(self, backups):
         self.backups=backups
         self.eventqueue=Queue()
-        self.t=Thread(target=scheduler, args=self)
+        self.t=Thread(target=self.__scheduler)
+        t.start()
+
+    def __scheduler(sched):
+        timeout=???
+        q=sched.eventqueue
+        while True:
+            timeout, timerevent=next_timed_event(sched);
+            t=Timer(timeout, lambda: q.put(timerevent));
+            event=sq.get(True):
+            # Decide what to do

mercurial