Better borg output processing and some logging

Fri, 19 Jan 2018 15:41:45 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Fri, 19 Jan 2018 15:41:45 +0000
changeset 4
d72c4844e791
parent 3
4cad934aa9ce
child 5
4c5514b2fa76

Better borg output processing and some logging

backup.py file | annotate | diff | comparison | revisions
borgend.py file | annotate | diff | comparison | revisions
config.py file | annotate | diff | comparison | revisions
instance.py file | annotate | diff | comparison | revisions
--- a/backup.py	Fri Jan 19 14:42:27 2018 +0000
+++ b/backup.py	Fri Jan 19 15:41:45 2018 +0000
@@ -3,9 +3,10 @@
 #
 
 import config
+import logging
 from instance import BorgInstance
 from queue import Queue
-from threading import Thread
+from threading import Thread, Lock
 
 class Backup:
 
@@ -53,18 +54,42 @@
         self.lastrun=None
         self.borg_instance=None
         self.thread=None
+        self.lock=Lock()
 
     def __block_when_running(self):
-        assert(self.borg_instance is None and self.thread is None)
+        self.lock.acquire()
+        not_running=self.borg_instance is None and self.thread is None
+        self.lock.release()
+        assert(not_running)
 
     def __listener(self):
         for status in iter(self.borg_instance.read, None):
+            t=status['type']
+            if t=='progress_percent':
+                pass
+            elif t=='archive_progress':
+                pass
+            elif t=='progress_message':
+                pass
+            elif t=='progress_percent':
+                pass
+            elif t=='file_status':
+                pass
+            elif t=='log_message':
+                pass
+            elif t=='exception':
+                pass
+            elif t=='unparsed_error':
+                pass
             # What to do?
             print(status)
-        # What to do on error
-            #     queue.put({'identifier': instance.identifier,
-            #                'operation': instance.operation,
-            #                'status': status})
+
+        logging.info('Borg subprocess finished; terminating listener thread')
+
+        self.lock.acquire()
+        self.borg_instance=None
+        self.thread=None
+        self.lock.release()
 
     def __launch(self, queue, operation, archive_or_repository, *args):
 
@@ -95,6 +120,15 @@
         self.__launch(queue, 'prune', self.repository,
                       [{'prefix': self.archive_prefix}] + self.prune_parameters)
 
+    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
+    def abort(self):
+        self.lock.acquire()
+        if self.borg_instance:
+            self.borg_instance.terminate()
+        if self.thread:
+            self.thread.terminate()
+        self.lock.release()
+
     def join(self):
         if self.thread:
             self.thread.join()
--- a/borgend.py	Fri Jan 19 14:42:27 2018 +0000
+++ b/borgend.py	Fri Jan 19 15:41:45 2018 +0000
@@ -1,10 +1,15 @@
 #!/usr/local/bin/python3
 
+import logging
+
+logging.basicConfig(#filename='example.log',
+                    format='%(levelname)s:%(message)s',
+                    level=logging.DEBUG)
+
 from backup import Backup
 from config import settings
 from queue import Queue
 import ui
-#import scheduler
 
 if __name__ == "__main__":
     #print(settings)
@@ -15,6 +20,7 @@
 backups=[None]*len(backupconfigs);
 
 for i in range(len(backupconfigs)):
+    logging.info('Setting up backup set %d' % i)
     backups[i]=Backup(i, backupconfigs[i])
 
 queue=Queue()
--- a/config.py	Fri Jan 19 14:42:27 2018 +0000
+++ b/config.py	Fri Jan 19 15:41:45 2018 +0000
@@ -7,6 +7,7 @@
 import os
 import xdg
 import string
+import logging
 from functools import reduce
 
 #
@@ -112,6 +113,8 @@
 
 cfgfile=os.path.join(xdg.XDG_CONFIG_HOME, "borgend", "config.yaml")
 
+logging.info("Reading configuration file %s" % cfgfile)
+
 if not (os.path.exists(cfgfile) and os.path.isfile(cfgfile)):
     raise SystemExit(f'Configuration file required: {cfgfile}')
 
--- a/instance.py	Fri Jan 19 14:42:27 2018 +0000
+++ b/instance.py	Fri Jan 19 15:41:45 2018 +0000
@@ -3,11 +3,13 @@
 #
 
 import json
+import logging
 from subprocess import Popen, PIPE
 from config import settings, arglistify
 
+necessary_opts=['--log-json', '--progress']
+
 class BorgInstance:
-
     def __init__(self, operation, archive_or_repository, args, argsl):
         self.operation=operation;
         self.args=args;
@@ -15,48 +17,54 @@
         self.argsl=argsl;
 
     def construct_cmdline(self):
-        cmd=([settings['borg']['executable'], '--log-json']+
+        cmd=([settings['borg']['executable']]+necessary_opts+
              arglistify(settings['borg']['common_parameters'])+
              [self.operation])
         tmp1=self.operation+'_parameters'
         if tmp1 in settings['borg']:
             cmd=cmd+arglistify(settings['borg'][tmp1])
-        cmd=cmd+arglistify(self.args)+[self.archive_or_repository]+self.argsl
-        print(cmd)
-        return cmd
+
+        return cmd+arglistify(self.args)+[self.archive_or_repository]+self.argsl
 
     def launch(self):
-        # What to do with stderr? Is it needed?
-        self.proc=Popen(self.construct_cmdline(), stdout=PIPE, stderr=PIPE)
+        # Borg prints json to stderr, so pipe it
+        cmd=self.construct_cmdline()
+
+        logging.info('Launching ' + str(cmd))
+
+        self.proc=Popen(cmd, stderr=PIPE)
 
     def read(self):
-        line=self.proc.stdout.readline()
-        if line==b'':
+        try:
             line=self.proc.stderr.readline()
-            if line==b'':
-                return None
-            print('EEE'+str(line))
-            return 'error'
-        else:
-            print('###' + str(line))
-            # # What to do on error? stderr?
-            status=json.loads(line)
-            return status
+        except err:
+            logging.info('Pipe read failed: %s' % str(err))
+
+            return {'type': 'exception', 'exception': err}
+
+        if line==b'':
+
+            logging.info('Pipe EOF?')
+
+            return None
 
-    # for line in iter(stream.readline, b''):
-    #     status=json.loads(line)
-    #     queue.put({'identifier': instance.identifier,
-    #                'operation': instance.operation,
-    #                'status': status})
-    # out.close()
+        try:
+            return json.loads(line)
+        except:
+            logging.warning('JSON parse failed on: "%s"' % line)
+
+            errmsg=line
+            for line in iter(self.proc.stderr.readline, b''):
+                errmsg=errmsg+line
 
-    # def read_output():
-    #     try:
-    #         obj=self.queue.get_nowait()
-    #     except Empty:
-    #         obj=Empty
-    #     return obj
+            return {'type': 'unparsed_error', 'message': str(errmsg)}
+
+    def terminate(self):
+        self.proc.terminate()
 
-
+    def wait(self):
+        return self.proc.wait() is not None
 
+    def has_terminated(self):
+        return self.proc.poll() is not None
 

mercurial