# HG changeset patch # User Tuomo Valkonen # Date 1516376505 0 # Node ID d72c4844e791adb714ba4f486e1b3ad664dfcec6 # Parent 4cad934aa9ce38269982308c812d955b89853cd9 Better borg output processing and some logging diff -r 4cad934aa9ce -r d72c4844e791 backup.py --- a/backup.py Fri Jan 19 14:42:27 2018 +0000 +++ b/backup.py Fri Jan 19 15:41:45 2018 +0000 @@ -3,9 +3,10 @@ # import config +import logging from instance import BorgInstance from queue import Queue -from threading import Thread +from threading import Thread, Lock class Backup: @@ -53,18 +54,42 @@ self.lastrun=None self.borg_instance=None self.thread=None + self.lock=Lock() def __block_when_running(self): - assert(self.borg_instance is None and self.thread is None) + self.lock.acquire() + not_running=self.borg_instance is None and self.thread is None + self.lock.release() + assert(not_running) def __listener(self): for status in iter(self.borg_instance.read, None): + t=status['type'] + if t=='progress_percent': + pass + elif t=='archive_progress': + pass + elif t=='progress_message': + pass + elif t=='progress_percent': + pass + elif t=='file_status': + pass + elif t=='log_message': + pass + elif t=='exception': + pass + elif t=='unparsed_error': + pass # What to do? print(status) - # What to do on error - # queue.put({'identifier': instance.identifier, - # 'operation': instance.operation, - # 'status': status}) + + logging.info('Borg subprocess finished; terminating listener thread') + + self.lock.acquire() + self.borg_instance=None + self.thread=None + self.lock.release() def __launch(self, queue, operation, archive_or_repository, *args): @@ -95,6 +120,15 @@ self.__launch(queue, 'prune', self.repository, [{'prefix': self.archive_prefix}] + self.prune_parameters) + # TODO: Decide exact (manual) abort mechanism. Perhaps two stages + def abort(self): + self.lock.acquire() + if self.borg_instance: + self.borg_instance.terminate() + if self.thread: + self.thread.terminate() + self.lock.release() + def join(self): if self.thread: self.thread.join() diff -r 4cad934aa9ce -r d72c4844e791 borgend.py --- a/borgend.py Fri Jan 19 14:42:27 2018 +0000 +++ b/borgend.py Fri Jan 19 15:41:45 2018 +0000 @@ -1,10 +1,15 @@ #!/usr/local/bin/python3 +import logging + +logging.basicConfig(#filename='example.log', + format='%(levelname)s:%(message)s', + level=logging.DEBUG) + from backup import Backup from config import settings from queue import Queue import ui -#import scheduler if __name__ == "__main__": #print(settings) @@ -15,6 +20,7 @@ backups=[None]*len(backupconfigs); for i in range(len(backupconfigs)): + logging.info('Setting up backup set %d' % i) backups[i]=Backup(i, backupconfigs[i]) queue=Queue() diff -r 4cad934aa9ce -r d72c4844e791 config.py --- a/config.py Fri Jan 19 14:42:27 2018 +0000 +++ b/config.py Fri Jan 19 15:41:45 2018 +0000 @@ -7,6 +7,7 @@ import os import xdg import string +import logging from functools import reduce # @@ -112,6 +113,8 @@ cfgfile=os.path.join(xdg.XDG_CONFIG_HOME, "borgend", "config.yaml") +logging.info("Reading configuration file %s" % cfgfile) + if not (os.path.exists(cfgfile) and os.path.isfile(cfgfile)): raise SystemExit(f'Configuration file required: {cfgfile}') diff -r 4cad934aa9ce -r d72c4844e791 instance.py --- a/instance.py Fri Jan 19 14:42:27 2018 +0000 +++ b/instance.py Fri Jan 19 15:41:45 2018 +0000 @@ -3,11 +3,13 @@ # import json +import logging from subprocess import Popen, PIPE from config import settings, arglistify +necessary_opts=['--log-json', '--progress'] + class BorgInstance: - def __init__(self, operation, archive_or_repository, args, argsl): self.operation=operation; self.args=args; @@ -15,48 +17,54 @@ self.argsl=argsl; def construct_cmdline(self): - cmd=([settings['borg']['executable'], '--log-json']+ + cmd=([settings['borg']['executable']]+necessary_opts+ arglistify(settings['borg']['common_parameters'])+ [self.operation]) tmp1=self.operation+'_parameters' if tmp1 in settings['borg']: cmd=cmd+arglistify(settings['borg'][tmp1]) - cmd=cmd+arglistify(self.args)+[self.archive_or_repository]+self.argsl - print(cmd) - return cmd + + return cmd+arglistify(self.args)+[self.archive_or_repository]+self.argsl def launch(self): - # What to do with stderr? Is it needed? - self.proc=Popen(self.construct_cmdline(), stdout=PIPE, stderr=PIPE) + # Borg prints json to stderr, so pipe it + cmd=self.construct_cmdline() + + logging.info('Launching ' + str(cmd)) + + self.proc=Popen(cmd, stderr=PIPE) def read(self): - line=self.proc.stdout.readline() - if line==b'': + try: line=self.proc.stderr.readline() - if line==b'': - return None - print('EEE'+str(line)) - return 'error' - else: - print('###' + str(line)) - # # What to do on error? stderr? - status=json.loads(line) - return status + except err: + logging.info('Pipe read failed: %s' % str(err)) + + return {'type': 'exception', 'exception': err} + + if line==b'': + + logging.info('Pipe EOF?') + + return None - # for line in iter(stream.readline, b''): - # status=json.loads(line) - # queue.put({'identifier': instance.identifier, - # 'operation': instance.operation, - # 'status': status}) - # out.close() + try: + return json.loads(line) + except: + logging.warning('JSON parse failed on: "%s"' % line) + + errmsg=line + for line in iter(self.proc.stderr.readline, b''): + errmsg=errmsg+line - # def read_output(): - # try: - # obj=self.queue.get_nowait() - # except Empty: - # obj=Empty - # return obj + return {'type': 'unparsed_error', 'message': str(errmsg)} + + def terminate(self): + self.proc.terminate() - + def wait(self): + return self.proc.wait() is not None + def has_terminated(self): + return self.proc.poll() is not None