Mon, 22 Jan 2018 21:07:34 +0000
Generalisation of scheduler thread to general queue threads
#
# Borgend Backup instance
#

import config
import logging
import time
import keyring
import borgend
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

logger = borgend.logger.getChild(__name__)

# State. The numeric order matters: combine_state() keeps the larger value,
# so a higher-numbered state overrides a lower-numbered one.
INACTIVE = 0
SCHEDULED = 1
ACTIVE = 2
BUSY = 3
OFFLINE = 4
ERRORS = 5


def combine_state(state1, state2):
    """Return whichever of the two states has the higher numeric value."""
    return max(state1, state2)


# Borg 'levelname' strings mapped to the logging module's numeric levels.
loglevel_translation = {
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}


def translate_loglevel(x):
    """Translate a level name to a logging level; unknown names map to ERROR."""
    return loglevel_translation.get(x, logging.ERROR)


def safe_get_int(t, x):
    """Return t[x] if the key exists and the value is an int, else None."""
    tmp = t.get(x)
    if isinstance(tmp, int):
        return tmp
    return None


class Backup(TerminableThread):
    """Thread that schedules and supervises borg runs for one backup target.

    The main thread decides when the next operation is due and launches it;
    two helper threads per launched operation read borg's log stream and its
    final result. Shared state is guarded by self._cond, which together with
    self._terminate is expected to be provided by TerminableThread.
    """

    def __decode_config(self, cfg):
        """Validate cfg through the config.check_* helpers and store the
        resulting settings as attributes of this instance."""
        loc0 = 'backup target %d' % self.identifier

        self._name = config.check_string(cfg, 'name', 'Name', loc0)

        # Once the name is known, use it to locate errors in the configuration
        self.loc = 'backup target "%s"' % self._name

        self.repository = config.check_string(cfg, 'repository',
                                              'Target repository', self.loc)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', self.loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template',
                                                    self.loc)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', self.loc,
            config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', self.loc,
            config.defaults['retry_interval'])

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths',
                                                           'Paths', self.loc)

        self.common_parameters = config.check_list_of_dicts(
            cfg, 'common_parameters', 'Borg parameters', self.loc, default=[])

        self.create_parameters = config.check_list_of_dicts(
            cfg, 'create_parameters', 'Create parameters', self.loc,
            default=[])

        self.prune_parameters = config.check_list_of_dicts(
            cfg, 'prune_parameters', 'Prune parameters', self.loc, default=[])

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account',
                                                      self.loc, default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.extract_passphrase()
            except Exception:
                # Best effort only: the passphrase stays unset, so
                # __do_launch() asks for it again when an operation starts.
                logger.debug('Could not extract passphrase at startup',
                             exc_info=True)

    def extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring on first use.

        Returns None when no keychain account is configured. Any exception
        raised by keyring.get_password is logged and re-raised.
        """
        acc = self.__keychain_account
        if not self.__passphrase:
            if acc:
                logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    logger.debug('Received passphrase')
                self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def __init__(self, identifier, cfg, scheduler):
        """Set up the backup target.

        identifier -- integer used to locate configuration errors before the
                      target's name is known
        cfg        -- configuration of this target, decoded via config.check_*
        scheduler  -- object whose wait_until() the main thread sleeps on
        """
        self.identifier = identifier
        # NOTE(review): this stores the imported config module, not cfg --
        # confirm that this is intended.
        self.config = config
        self.lastrun_when = None
        self.borg_instance = None
        self.current_operation = None
        self.thread_log = None
        self.thread_res = None
        self.scheduled_operation = None
        self.__status_update_callback = None
        self.state = INACTIVE
        self.scheduler = scheduler

        self.__decode_config(cfg)

        super().__init__(target=self.__main_thread, name=self._name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (truthy) if one is running."""
        with self._cond:
            running = self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        """Like is_running(), but the caller must already hold self._cond."""
        running = self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        """Assert that no operation is currently running."""
        running = self.is_running()
        assert(not running)

    def __log_listener(self):
        """Log listener thread: process borg's log entries until read_log()
        returns None, updating progress and state and notifying the status
        callback, then wait for the borg subprocess."""
        logger.debug('Log listener thread waiting for entries')
        for status in iter(self.borg_instance.read_log, None):
            logger.debug(str(status))
            t = status['type']

            errors_this_message = None
            callback = None

            if t == 'progress_percent':
                current = safe_get_int(status, 'current')
                total = safe_get_int(status, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation['progress_current'] = current
                        self.current_operation['progress_total'] = total
                        status, callback = self.__status_unlocked()

            elif t == 'archive_progress':
                original_size = safe_get_int(status, 'original_size')
                compressed_size = safe_get_int(status, 'compressed_size')
                deduplicated_size = safe_get_int(status, 'deduplicated_size')
                # All three sizes must be present before any is recorded
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    with self._cond:
                        op = self.current_operation
                        op['original_size'] = original_size
                        op['compressed_size'] = compressed_size
                        op['deduplicated_size'] = deduplicated_size
                        status, callback = self.__status_unlocked()

            elif t == 'progress_message':
                pass

            elif t == 'file_status':
                pass

            elif t == 'log_message':
                status.setdefault('levelname', 'ERROR')
                status.setdefault('message', 'UNKNOWN')
                status.setdefault('name', 'borg')
                lvl = translate_loglevel(status['levelname'])
                logger.log(lvl, status['name'] + ': ' + status['message'])
                if lvl >= logging.WARNING:
                    # Keep the borg message itself: 'status' is rebound to
                    # the overall status below.
                    errors_this_message = status
                    state = ERRORS
                    # A locked repository means "busy", not "failed".
                    # 'LockTimeout' was observed in reality; 'LockError' and
                    # 'LockErrorT' are listed in the borg documentation.
                    if status.get('msgid') in ('LockTimeout',
                                               'LockError',
                                               'LockErrorT'):
                        state = BUSY
                    with self._cond:
                        self.state = combine_state(self.state, state)
                        status, callback = self.__status_unlocked()
            else:
                logger.debug('Unrecognised log entry %s' % str(status))

            # Invoked outside the lock
            if callback:
                callback(self, status, errors=errors_this_message)

        logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Result listener thread: read borg's result, determine the end
        state, clear the per-operation attributes and wake the main thread."""
        with self._cond:
            status, callback = self.__status_unlocked()
        if callback:
            callback(self, status)

        logger.debug('Result listener thread waiting for result')

        res = self.borg_instance.read_result()

        # Finish processing remaining errors
        self.thread_log.join()

        with self._cond:
            state = self.state

        # If there were no errors, reset back to INACTIVE state
        if state == ACTIVE:
            state = INACTIVE

        logger.debug('Borg result: %s' % str(res))

        if res is None and state == INACTIVE:
            logger.error('No result from borg despite no error in log')
            state = ERRORS

        logger.debug('Waiting for borg subprocess to terminate in result thread')

        if not self.borg_instance.wait():
            logger.critical('Borg subprocess did not terminate')
            state = combine_state(state, ERRORS)

        logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))

        with self._cond:
            # Only 'create' runs move the reference point for scheduling
            if self.current_operation['operation'] == 'create':
                self.lastrun_when = self.current_operation['when_monotonic']
            self.thread_res = None
            self.thread_log = None
            self.borg_instance = None
            self.current_operation = None
            self.state = state
            self._cond.notify()

    def __do_launch(self, op, archive_or_repository, *args):
        """Start a borg instance for op and its two listener threads.

        Called with self._cond held (via __launch from the main thread).
        """
        passphrase = self.extract_passphrase()

        inst = BorgInstance(op['operation'], archive_or_repository, *args)
        inst.launch(passphrase=passphrase)

        logger.debug('Creating listener threads')

        t_log = Thread(target=self.__log_listener)
        t_log.daemon = True

        t_res = Thread(target=self.__result_listener)
        t_res.daemon = True

        # Publish all per-operation state before the listeners start
        self.thread_log = t_log
        self.thread_res = t_res
        self.borg_instance = inst
        self.current_operation = op
        self.current_operation['when_monotonic'] = time.monotonic()
        self.state = ACTIVE

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Launch op unless an operation is already running.

        Returns True if the operation was started and False if another one
        is still running. On failure the state is set to ERRORS, lastrun_when
        is reset to now, and the exception is re-raised.
        """
        if self.__is_running_unlocked():
            logger.info('Cannot start %s: already running %s'
                        % (op['operation'], self.current_operation))
            return False

        try:
            logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))

            if op['operation'] == 'create':
                archive = "%s::%s%s" % (self.repository,
                                        self.archive_prefix,
                                        self.archive_template)

                self.__do_launch(op, archive,
                                 self.common_parameters+self.create_parameters,
                                 self.paths)
            elif op['operation'] == 'prune':
                self.__do_launch(op, self.repository,
                                 ([{'prefix': self.archive_prefix}] +
                                  self.common_parameters +
                                  self.prune_parameters))
            else:
                raise NotImplementedError("Invalid operation '%s'" % op['operation'])
        except Exception:
            logger.debug('Rescheduling after failure')
            self.lastrun_when = time.monotonic()
            self.state = ERRORS
            raise

        return True

    def create(self):
        """Request a manual 'create' operation and wake the main thread."""
        op = {'operation': 'create', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    def prune(self):
        """Request a manual 'prune' operation and wake the main thread."""
        op = {'operation': 'prune', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if there is one."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

    def __next_operation_unlocked(self):
        """Return the next automatic operation as a dict, or None if the
        relevant interval is configured as 0."""
        # TODO: pruning as well
        now = time.monotonic()
        if not self.lastrun_when:
            # Nothing has run yet: use the retry interval, falling back to
            # the backup interval, counted from now.
            initial_interval = self.retry_interval
            if initial_interval == 0:
                initial_interval = self.backup_interval
            if initial_interval == 0:
                return None
            return {'operation': 'create',
                    'detail': 'initial',
                    'when_monotonic': now+initial_interval}
        elif self.state >= BUSY:
            # BUSY, OFFLINE or ERRORS: retry
            if self.retry_interval == 0:
                return None
            return {'operation': 'create',
                    'detail': 'retry',
                    'when_monotonic': self.lastrun_when+self.retry_interval}
        else:
            if self.backup_interval == 0:
                return None
            return {'operation': 'create',
                    'detail': 'normal',
                    'when_monotonic': self.lastrun_when+self.backup_interval}

    def __main_thread(self):
        """Thread body: schedule and launch operations until termination is
        requested, then stop borg and join the listener threads."""
        with self._cond:
            while not self._terminate:
                op = None
                if not self.current_operation:
                    op = self.__next_operation_unlocked()
                if not op:
                    self.__update_status()
                    self._cond.wait()
                else:
                    now = time.monotonic()
                    delay = max(0, op['when_monotonic']-now)
                    logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
                                (op['operation'], op['detail'], self._name, delay))

                    self.scheduled_operation = op
                    self.state = combine_state(self.state, SCHEDULED)

                    self.__update_status()
                    self.scheduler.wait_until(now+delay, self._cond, self._name)

                # Either the operation scheduled above, or one that
                # create()/prune() stored while this thread was waiting
                if self.scheduled_operation:
                    op = self.scheduled_operation
                    self.scheduled_operation = None
                    self.__launch(op)

            # Kill a running borg to cause log and result threads to terminate
            if self.borg_instance:
                logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log = self.thread_log
            thread_res = self.thread_res

        logger.debug("Waiting for log and result threads to terminate")

        # Joined outside the lock: the listeners acquire self._cond themselves
        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    def __status_unlocked(self):
        """Return (status, callback); the caller must hold self._cond.

        status is the current operation's dict, else the scheduled
        operation's dict, else a fresh {'type': 'nothing'} dict, updated in
        place with 'type', 'name', 'state', 'detail' and, when
        'when_monotonic' is present, a wall-clock 'when'.
        """
        callback = self.__status_update_callback

        if self.current_operation:
            status = self.current_operation
            status['type'] = 'current'
            # Errors should be set by listeners
        elif self.scheduled_operation:
            status = self.scheduled_operation
            status['type'] = 'scheduled'
        else:
            status = {'type': 'nothing'}

        status['name'] = self._name
        status['state'] = self.state

        if 'detail' not in status:
            status['detail'] = 'NONE'

        if 'when_monotonic' in status:
            # Convert the monotonic timestamp to wall-clock time
            status['when'] = (status['when_monotonic']
                              - time.monotonic() + time.time())

        return status, callback

    def __update_status(self):
        """Invoke the status callback, if set, with the current status.

        Must be called with self._cond held; the lock is released for the
        duration of the callback and re-acquired afterwards.
        """
        status, callback = self.__status_unlocked()
        if callback:
            self._cond.release()
            try:
                callback(self, status)
            finally:
                self._cond.acquire()

    def set_status_update_callback(self, callback):
        """Set the callable to be invoked on status changes."""
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        """Return the current status dict (see __status_unlocked)."""
        with self._cond:
            res = self.__status_unlocked()
        return res[0]