diff -r 46c89e5a219f -r e189d4a6cb8c backup.py --- a/backup.py Fri Jan 19 16:53:13 2018 +0000 +++ b/backup.py Sat Jan 20 14:04:51 2018 +0000 @@ -74,30 +74,38 @@ self.lastrun_success=None self.borg_instance=None self.current_operation=None - self.thread=None + self.thread_log=None + self.thread_err=None self.lock=Lock() + def is_running(self): + with self.lock: + running=self.borg_instance or self.thread_log or self.thread_err + return running + def __block_when_running(self): - with self.lock: - not_running=self.borg_instance is None and self.thread is None - assert(not_running) + running=self.is_running() + assert(not running) - def __listener(self): - success=False - for status in iter(self.borg_instance.read, None): - logging.info(str(status)) + def __log_listener(self): + logging.debug('Log listener thread waiting for entries') + success=True + for status in iter(self.borg_instance.read_log, None): + logging.debug(str(status)) t=status['type'] + #may_indicate_finished=False if t=='progress_percent': - pass + #may_indicate_finished=True + # Temporary output + if 'current' not in status: + status['current']=0 + if 'total' not in status: + status['total']=0 + print('%d / %d' % (status['current'], status['total'])) elif t=='archive_progress': pass elif t=='progress_message': - if 'finished' in status: - logging.info('Borg subprocess finished succesfully') - success=status['finished'] - elif t=='progress_percent': - # Temporary output - print('%d / %d', status['current'], status['total']) + #may_indicate_finished=True pass elif t=='file_status': pass @@ -110,23 +118,57 @@ status['name']='borg' logging.log(translate_loglevel(status['levelname']), status['name'] + ': ' + status['message']) + # set success=False? elif t=='exception': - pass + success=False elif t=='unparsed_error': - pass + success=False - logging.info('Waiting for borg subprocess to terminate') + #if (may_indicate_finished and 'finished' in status and + # status['finished']): + # logging.info('Borg subprocess finished succesfully') + # success=status['finished'] + + logging.debug('Waiting for borg subprocess to terminate in log thread') self.borg_instance.wait() - logging.info('Borg subprocess terminated; terminating listener thread') + logging.debug('Borg subprocess terminated; terminating log listener thread') + + with self.lock: + self.thread_log=None + self.__cleanup_if_both_listeners_terminated() + + + def __result_listener(self): + logging.debug('Result listener thread waiting for result') + + res=self.borg_instance.read_result() + + success=True + + logging.debug('Borg result: %s' % str(res)) + + if res==None: + success=False + + logging.debug('Waiting for borg subprocess to terminate in result thread') + + self.borg_instance.wait() + + logging.debug('Borg subprocess terminated; terminating result listener thread') with self.lock: if self.current_operation=='create': self.lastrun=self.time_started self.lastrun_success=success + self.thread_res=None + self.__cleanup_if_both_listeners_terminated() + + def __cleanup_if_both_listeners_terminated(self): + if self.thread_res==None and self.thread_log==None: + logging.debug('Both threads terminated') self.borg_instance=None - self.thread=None self.current_operation=None self.time_started=None @@ -135,16 +177,21 @@ inst=BorgInstance(operation, archive_or_repository, *args) inst.launch() - t=Thread(target=self.__listener) - t.daemon=True + t_log=Thread(target=self.__log_listener) + t_log.daemon=True - self.thread=t + t_res=Thread(target=self.__result_listener) + t_res.daemon=True + + self.thread_log=t_log + self.thread_res=t_res self.borg_instance=inst self.queue=queue self.current_operation=operation self.time_started=time.monotonic() - t.start() + t_log.start() + t_res.start() def create(self, queue): self.__block_when_running() @@ -169,14 +216,33 @@ with self.lock: if self.borg_instance: self.borg_instance.terminate() - if self.thread: - self.thread.terminate() + thread_log=self.thread_log + thread_res=self.thread_res + + if thread_log: + thread_log.terminate() + + if thread_res: + thread_res.terminate() + def join(self): - if self.thread: - self.thread.join() + logging.debug('Waiting for borg listener thread to terminate') + + with self.lock: + thread_log=self.thread_log + thread_res=self.thread_res + + if thread_log: + thread_log.join() + + if thread_res: + thread_res.join() + + assert(self.thread_log==None and self.thread_res==None) def next_action(): + __block_when_running() # TODO pruning as well now=time.monotonic() if not self.lastrun: