Wed, 24 Jan 2018 00:20:10 +0000
launch error handling fix
#
# Borgend Backup instance
#

import config
import logging
import time
import keyring
import borgend
import repository
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

logger=borgend.logger.getChild(__name__)

# State. Values are ordered by severity: combine_state keeps the larger one,
# and __next_operation_unlocked treats anything >= BUSY as needing a retry.
INACTIVE=0
SCHEDULED=1
QUEUED=2
ACTIVE=3
BUSY=4
OFFLINE=5
ERRORS=6


def combine_state(state1, state2):
    """Return the more severe of two state values."""
    return max(state1, state2)


# Map borg's textual log level names to `logging` levels.
loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}


def translate_loglevel(x):
    """Translate a borg level name to a `logging` level.

    Unknown level names are treated as logging.ERROR.
    """
    return loglevel_translation.get(x, logging.ERROR)


def safe_get_int(t, x):
    """Return t[x] if the key exists and its value is an int, else None."""
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None


class Backup(TerminableThread):
    """A configured backup target, driven by its own daemon thread.

    The thread waits for the scheduled (or manually requested) operation,
    queues it on the target repository, and launches borg. Two listener
    threads then consume borg's log output and final result.
    """

    def __decode_config(self, cfg):
        """Read and validate this backup's settings from the dict `cfg`."""
        loc0='backup target %d' % self.identifier

        self._name=config.check_string(cfg, 'name', 'Name', loc0)

        self.logger=logger.getChild(self._name)

        self.loc='backup target "%s"' % self._name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', self.loc)

        self.repository=repository.get_controller(reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', self.loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', self.loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', self.loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', self.loc,
                                                    config.defaults['retry_interval'])

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', self.loc)

        self.common_parameters=config.check_list_of_dicts(cfg, 'common_parameters',
                                                          'Borg parameters', self.loc,
                                                          default=[])

        self.create_parameters=config.check_list_of_dicts(cfg, 'create_parameters',
                                                          'Create parameters', self.loc,
                                                          default=[])

        self.prune_parameters=config.check_list_of_dicts(cfg, 'prune_parameters',
                                                         'Prune parameters', self.loc,
                                                         default=[])

        self.__keychain_account=config.check_string(cfg, 'keychain_account',
                                                    'Keychain account', self.loc,
                                                    default='')

        self.__passphrase=None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.extract_passphrase()
            except Exception:
                # Best effort only: extract_passphrase logs keyring failures
                # itself, and __do_launch asks again before running borg.
                pass

    def extract_passphrase(self):
        """Return the borg passphrase for this backup.

        The passphrase is looked up in the keyring (service "borg-backup")
        under the configured keychain account and cached. Returns None when
        no keychain account is configured.

        Raises whatever keyring.get_password raises, after logging an error.
        """
        acc=self.__keychain_account
        if not self.__passphrase:
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw=keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase=pw
            else:
                self.__passphrase=None
        return self.__passphrase

    def __init__(self, identifier, cfg, scheduler):
        """Create the backup thread (not started).

        identifier: numeric index, used in config error messages.
        cfg: this backup's configuration dict.
        scheduler: provides wait_until() for timed waits.
        """
        self.identifier=identifier
        # NOTE(review): this stores the `config` module, not `cfg`; confirm
        # whether any caller relies on it before changing.
        self.config=config
        self.__status_update_callback=None
        self.scheduler=scheduler
        self.logger=None # setup up __decode_config once backup name is known

        # Per-run state; all reset by __result_listener when borg finishes.
        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.current_operation=None
        self.scheduled_operation=None
        # time.monotonic() start time of the last 'create' (or failed launch).
        self.lastrun_when=None
        self.state=INACTIVE

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self._name)

        self.daemon=True

    def is_running(self):
        """Return the current operation dict (truthy) if one is running."""
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        """Thread body: consume borg log entries until the log ends.

        Progress entries update current_operation. Warning-or-worse log
        messages raise the backup state to ERRORS, or to BUSY for repository
        lock errors. The status callback is invoked outside the lock.
        """
        self.logger.debug('Log listener thread waiting for entries')
        for status in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(status))
            # .get: a malformed entry without 'type' is reported as
            # unrecognised instead of killing this thread.
            t=status.get('type')

            errors_this_message=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(status, 'current')
                total=safe_get_int(status, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation['progress_current']=current
                        self.current_operation['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(status, 'original_size')
                compressed_size=safe_get_int(status, 'compressed_size')
                deduplicated_size=safe_get_int(status, 'deduplicated_size')
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    with self._cond:
                        self.current_operation['original_size']=original_size
                        self.current_operation['compressed_size']=compressed_size
                        self.current_operation['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                status.setdefault('levelname', 'ERROR')
                status.setdefault('message', 'UNKNOWN')
                status.setdefault('name', 'borg')
                lvl=translate_loglevel(status['levelname'])
                self.logger.log(lvl, status['name'] + ': ' + status['message'])
                if lvl>=logging.WARNING:
                    errors_this_message=status
                    state=ERRORS
                    if status.get('msgid') in ('LockTimeout', # observed in reality
                                               'LockError',   # in docs
                                               'LockErrorT'): # in docs
                        state=BUSY
                    with self._cond:
                        self.state=combine_state(self.state, state)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(status))

            if callback:
                callback(status, errors=errors_this_message)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Thread body: wait for borg's result and finalise the operation.

        Waits for the log listener to finish, derives the end state, clears
        all per-run state and wakes the main thread.
        """
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        # Finish processing remaining errors
        self.thread_log.join()

        with self._cond:
            state=self.state

        # If there were no errors, reset back to INACTIVE state
        if state==ACTIVE:
            state=INACTIVE

        self.logger.debug('Borg result: %s' % str(res))

        if res is None and state==INACTIVE:
            self.logger.error('No result from borg despite no error in log')
            state=ERRORS

        self.logger.debug('Waiting for borg subprocess to terminate in result thread')

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            state=combine_state(state, ERRORS)

        self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))

        with self._cond:
            # Only 'create' runs count towards the backup/retry schedule.
            if self.current_operation['operation']=='create':
                self.lastrun_when=self.current_operation['when_monotonic']
            self.thread_res=None
            self.thread_log=None
            self.borg_instance=None
            self.current_operation=None
            self.state=state
            self.__update_status()
            # Wake the main thread waiting in __main_thread_wait_finish.
            self._cond.notify()

    def __do_launch(self, op, archive_or_repository, *args):
        """Start a borg instance for `op` and its two listener threads."""
        passphrase=self.extract_passphrase()

        inst=BorgInstance(op['operation'], archive_or_repository, *args)
        inst.launch(passphrase=passphrase)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Build the borg arguments for a 'create' or 'prune' op and launch.

        Raises NotImplementedError for any other operation.
        """
        self.logger.debug("Launching '%s'" % op['operation'])

        if op['operation']=='create':
            archive="%s::%s%s" % (self.repository.repository_name,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive,
                             self.common_parameters+self.create_parameters,
                             self.paths)
        elif op['operation']=='prune':
            self.__do_launch(op, self.repository.repository_name,
                             ([{'prefix': self.archive_prefix}] +
                              self.common_parameters +
                              self.prune_parameters))
        else:
            raise NotImplementedError("Invalid operation '%s'" % op['operation'])

    def __launch_check(self):
        """Repository-queue action: launch the pending scheduled operation.

        Exceptions from launching propagate to __main_thread.
        """
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.current_operation=op
            self.current_operation['when_monotonic']=time.monotonic()
            self.state=ACTIVE

            self.__update_status()

    def __main_thread(self):
        """Thread body: loop over wait-finish / schedule / queue-and-launch.

        Holds self._cond except while waiting on it. An exception in one
        iteration is logged and recorded as an ERRORS run, and the loop
        continues so that a retry can be scheduled.
        """
        with self._cond:
            while not self._terminate:
                try:
                    self.__main_thread_wait_finish()
                    if not self._terminate:
                        self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
                except Exception:
                    self.logger.exception("Error with backup '%s'" % self._name)
                    # Recording the failed run makes __next_operation_unlocked
                    # schedule a retry after retry_interval.
                    self.lastrun_when=time.monotonic()
                    self.state=ERRORS
                    self.scheduled_operation=None
                    self.__update_status()

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock: the listener threads take
            # self._cond themselves, so they must not be joined while held.
            thread_log=self.thread_log
            thread_res=self.thread_res

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/1. Wait while a current operation is running
    def __main_thread_wait_finish(self):
        while self.current_operation and not self._terminate:
            self.logger.debug("Waiting for current operation to finish")
            self._cond.wait()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        if self.scheduled_operation:
            # An operation is already pending (e.g. requested via create()
            # while another one ran): do not block waiting for another
            # notification; step 3 queues it right away.
            return

        op=self.__next_operation_unlocked()
        if op:
            now=time.monotonic()
            delay=max(0, op['when_monotonic']-now)
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                             (op['operation'], op['detail'], delay))

            self.scheduled_operation=op
            self.state=combine_state(self.state, SCHEDULED)
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(now+delay, self._cond, self._name)
        else:
            # Nothing scheduled - just wait
            self.logger.debug("Waiting for manual scheduling")

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=combine_state(self.state, QUEUED)
            self.__update_status()
            self.repository.queue_action(self._cond,
                                         action=self.__launch_check,
                                         name=self._name)

    def __next_operation_unlocked(self):
        """Return the next automatic 'create' operation, or None.

        Before the first run the retry interval is used (falling back to
        the backup interval). After a run that ended BUSY or worse the retry
        interval applies, otherwise the backup interval. An interval of 0
        disables that kind of scheduling.
        """
        # TODO: pruning as well
        now=time.monotonic()
        if not self.lastrun_when:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'initial',
                        'when_monotonic': now+initial_interval}
        elif self.state>=BUSY:
            if self.retry_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'retry',
                        'when_monotonic': self.lastrun_when+self.retry_interval}
        else:
            if self.backup_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'normal',
                        'when_monotonic': self.lastrun_when+self.backup_interval}

    def __status_unlocked(self):
        """Build the status dict; caller must hold self._cond.

        Returns (status, callback). The status is the current or scheduled
        operation dict itself (annotated in place), or a fresh dict of type
        'nothing'.
        """
        callback=self.__status_update_callback

        if self.current_operation:
            status=self.current_operation
            status['type']='current'
            # Errors should be set by listeners
        else:
            if self.scheduled_operation:
                status=self.scheduled_operation
                if self.state==QUEUED:
                    status['type']='queued'
                else:
                    status['type']='scheduled'
            else:
                status={'type': 'nothing'}

        status['name']=self._name
        status['state']=self.state

        if 'detail' not in status:
            status['detail']='NONE'

        if 'when_monotonic' in status:
            # Convert the monotonic timestamp to wall-clock time.
            status['when']=(status['when_monotonic']
                            -time.monotonic()+time.time())

        return status, callback

    def __update_status(self):
        """Invoke the status callback, if set; callback errors are logged."""
        status, callback = self.__status_unlocked()
        if callback:
            # NOTE: the callback runs while self._cond is held by the caller.
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        """Request a manual 'create' operation and wake the main thread."""
        op={'operation': 'create', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        """Request a manual 'prune' operation and wake the main thread."""
        op={'operation': 'prune', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if any."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()