# HG changeset patch # User Tuomo Valkonen # Date 1516457091 0 # Node ID e189d4a6cb8ccb2aca75e54f17e1fada33ed9832 # Parent 46c89e5a219f326759d19bc7015fca8d08a88fc6 Also listen to stdout diff -r 46c89e5a219f -r e189d4a6cb8c backup.py --- a/backup.py Fri Jan 19 16:53:13 2018 +0000 +++ b/backup.py Sat Jan 20 14:04:51 2018 +0000 @@ -74,30 +74,38 @@ self.lastrun_success=None self.borg_instance=None self.current_operation=None - self.thread=None + self.thread_log=None + self.thread_err=None self.lock=Lock() + def is_running(self): + with self.lock: + running=self.borg_instance or self.thread_log or self.thread_err + return running + def __block_when_running(self): - with self.lock: - not_running=self.borg_instance is None and self.thread is None - assert(not_running) + running=self.is_running() + assert(not running) - def __listener(self): - success=False - for status in iter(self.borg_instance.read, None): - logging.info(str(status)) + def __log_listener(self): + logging.debug('Log listener thread waiting for entries') + success=True + for status in iter(self.borg_instance.read_log, None): + logging.debug(str(status)) t=status['type'] + #may_indicate_finished=False if t=='progress_percent': - pass + #may_indicate_finished=True + # Temporary output + if 'current' not in status: + status['current']=0 + if 'total' not in status: + status['total']=0 + print('%d / %d' % (status['current'], status['total'])) elif t=='archive_progress': pass elif t=='progress_message': - if 'finished' in status: - logging.info('Borg subprocess finished succesfully') - success=status['finished'] - elif t=='progress_percent': - # Temporary output - print('%d / %d', status['current'], status['total']) + #may_indicate_finished=True pass elif t=='file_status': pass @@ -110,23 +118,57 @@ status['name']='borg' logging.log(translate_loglevel(status['levelname']), status['name'] + ': ' + status['message']) + # set success=False? elif t=='exception': - pass + success=False elif t=='unparsed_error': - pass + success=False - logging.info('Waiting for borg subprocess to terminate') + #if (may_indicate_finished and 'finished' in status and + # status['finished']): + # logging.info('Borg subprocess finished succesfully') + # success=status['finished'] + + logging.debug('Waiting for borg subprocess to terminate in log thread') self.borg_instance.wait() - logging.info('Borg subprocess terminated; terminating listener thread') + logging.debug('Borg subprocess terminated; terminating log listener thread') + + with self.lock: + self.thread_log=None + self.__cleanup_if_both_listeners_terminated() + + + def __result_listener(self): + logging.debug('Result listener thread waiting for result') + + res=self.borg_instance.read_result() + + success=True + + logging.debug('Borg result: %s' % str(res)) + + if res==None: + success=False + + logging.debug('Waiting for borg subprocess to terminate in result thread') + + self.borg_instance.wait() + + logging.debug('Borg subprocess terminated; terminating result listener thread') with self.lock: if self.current_operation=='create': self.lastrun=self.time_started self.lastrun_success=success + self.thread_res=None + self.__cleanup_if_both_listeners_terminated() + + def __cleanup_if_both_listeners_terminated(self): + if self.thread_res==None and self.thread_log==None: + logging.debug('Both threads terminated') self.borg_instance=None - self.thread=None self.current_operation=None self.time_started=None @@ -135,16 +177,21 @@ inst=BorgInstance(operation, archive_or_repository, *args) inst.launch() - t=Thread(target=self.__listener) - t.daemon=True + t_log=Thread(target=self.__log_listener) + t_log.daemon=True - self.thread=t + t_res=Thread(target=self.__result_listener) + t_res.daemon=True + + self.thread_log=t_log + self.thread_res=t_res self.borg_instance=inst self.queue=queue self.current_operation=operation self.time_started=time.monotonic() - t.start() + t_log.start() + t_res.start() def create(self, queue): self.__block_when_running() @@ -169,14 +216,33 @@ with self.lock: if self.borg_instance: self.borg_instance.terminate() - if self.thread: - self.thread.terminate() + thread_log=self.thread_log + thread_res=self.thread_res + + if thread_log: + thread_log.terminate() + + if thread_res: + thread_res.terminate() + def join(self): - if self.thread: - self.thread.join() + logging.debug('Waiting for borg listener thread to terminate') + + with self.lock: + thread_log=self.thread_log + thread_res=self.thread_res + + if thread_log: + thread_log.join() + + if thread_res: + thread_res.join() + + assert(self.thread_log==None and self.thread_res==None) def next_action(): + __block_when_running() # TODO pruning as well now=time.monotonic() if not self.lastrun: diff -r 46c89e5a219f -r e189d4a6cb8c instance.py --- a/instance.py Fri Jan 19 16:53:13 2018 +0000 +++ b/instance.py Sat Jan 20 14:04:51 2018 +0000 @@ -9,6 +9,12 @@ necessary_opts=['--log-json', '--progress'] +necessary_opts_for={ + 'create': ['--json'], + 'info': ['--json'], + 'list': ['--json'], +} + class BorgInstance: def __init__(self, operation, archive_or_repository, args, argsl): self.operation=operation; @@ -24,27 +30,43 @@ if tmp1 in settings['borg']: cmd=cmd+arglistify(settings['borg'][tmp1]) + if self.operation in necessary_opts_for: + cmd=cmd+necessary_opts_for[self.operation] + return cmd+arglistify(self.args)+[self.archive_or_repository]+self.argsl def launch(self): - # Borg prints json to stderr, so pipe it cmd=self.construct_cmdline() logging.info('Launching ' + str(cmd)) - self.proc=Popen(cmd, stderr=PIPE) + self.proc=Popen(cmd, stdout=PIPE, stderr=PIPE) - def read(self): + def read_result(self): + stream=self.proc.stdout + line=stream.read(-1) + if line==b'': + logging.debug('Borg stdout pipe EOF?') + return None + try: - line=self.proc.stderr.readline() + return json.loads(line) + except: + logging.warning('JSON parse failed on: "%s"' % line) + return None + + def read_log(self): + stream=self.proc.stderr + try: + line=stream.readline() except err: - logging.info('Pipe read failed: %s' % str(err)) + logging.debug('Pipe read failed: %s' % str(err)) return {'type': 'exception', 'exception': err} if line==b'': - logging.info('Pipe EOF?') + logging.debug('Borg stderr pipe EOF?') return None @@ -54,10 +76,10 @@ res['type']='UNKNOWN' return res except: - logging.warning('JSON parse failed on: "%s"' % line) + logging.debug('JSON parse failed on: "%s"' % line) errmsg=line - for line in iter(self.proc.stderr.readline, b''): + for line in iter(stream.readline, b''): errmsg=errmsg+line return {'type': 'unparsed_error', 'message': str(errmsg)} diff -r 46c89e5a219f -r e189d4a6cb8c scheduler.py --- a/scheduler.py Fri Jan 19 16:53:13 2018 +0000 +++ b/scheduler.py Sat Jan 20 14:04:51 2018 +0000 @@ -3,22 +3,23 @@ # from Queue import Queue -from runborg import BorgInstance import sched import ui -def scheduler(sched): - timeout=??? - q=sched.eventqueue - while True: - timeout, timerevent=next_timed_event(sched); - t=Timer(timeout, lambda: q.put(timerevent)); - event=sq.get(True): - # Decide what to do class Scheduler: def __init__(self, backups): self.backups=backups self.eventqueue=Queue() - self.t=Thread(target=scheduler, args=self) + self.t=Thread(target=self.__scheduler) + t.start() + + def __scheduler(sched): + timeout=??? + q=sched.eventqueue + while True: + timeout, timerevent=next_timed_event(sched); + t=Timer(timeout, lambda: q.put(timerevent)); + event=sq.get(True): + # Decide what to do