Wed, 24 Jan 2018 22:04:16 +0000
Turned Status and Operation into classes instead of dictionaries
#
# Borgend Backup instance
#

import config
import logging
import time
import keyring
import borgend
import repository
from enum import IntEnum
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

logger = borgend.logger.getChild(__name__)

#
# State and operation related helper classes
#


class State(IntEnum):
    """Scheduling state of a backup target."""

    INACTIVE = 0
    SCHEDULED = 1
    QUEUED = 2
    ACTIVE = 3


class Errors(IntEnum):
    """Error status of a backup target, ordered by increasing severity."""

    OK = 0
    BUSY = 1
    OFFLINE = 2
    ERRORS = 3

    def combine(self, other):
        """Return the more severe (numerically larger) of the two statuses."""
        return max(self, other)

    def ok(self):
        """Return True if this status is `Errors.OK`."""
        return self == Errors.OK

    def __str__(self):
        return _errorstring[self]


# Human-readable names used by Errors.__str__
_errorstring = {
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors',
}


class Operation:
    """A borg operation together with its (monotonic) scheduled time.

    Any extra keyword arguments are stored verbatim in the `detail` dict
    (e.g. `reason='manual'`); progress information is added to the same
    dict while the operation runs.
    """

    CREATE = 'create'
    PRUNE = 'prune'

    def __init__(self, operation, when_monotonic, **kwargs):
        self.operation = operation
        self.when_monotonic = when_monotonic
        self.detail = kwargs

    def when(self):
        """Return the scheduled time converted to wall-clock (`time.time()`) seconds.

        Returns None if no time is set, as is the case for a `Status`
        created without an operation.
        """
        if self.when_monotonic is None:
            return None
        return self.when_monotonic - time.monotonic() + time.time()


class Status(Operation):
    """Snapshot of a backup's name, state and errors plus an optional operation."""

    def __init__(self, backup, op=None):
        if op:
            # NOTE(review): this stores the operation's detail dict nested
            # under the key 'detail' (self.detail == {'detail': op.detail})
            # rather than flattening it with **op.detail. Kept as-is because
            # consumers outside this file may rely on the shape -- confirm.
            super().__init__(op.operation, op.when_monotonic,
                             detail=op.detail)
        else:
            super().__init__(None, None)

        self.name = backup.name
        self.state = backup.state
        self.errors = backup.errors


#
# Miscellaneous helper routines
#

loglevel_translation = {
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
}

# Borg message IDs that mean the repository is locked by someone else; these
# are reported as Errors.BUSY instead of Errors.ERRORS. 'LockTimeout' has
# been observed in practice; the other two come from the borg documentation.
_BUSY_MSGIDS = frozenset(('LockTimeout', 'LockError', 'LockErrorT'))


def translate_loglevel(x):
    """Map a borg level name to a `logging` level; unknown names map to ERROR."""
    return loglevel_translation.get(x, logging.ERROR)


def safe_get_int(t, x):
    """Return `t[x]` if the key exists and the value is an int, else None."""
    if x in t:
        value = t[x]
        if isinstance(value, int):
            return value
    return None


#
# The Backup class
#

class Backup(TerminableThread):
    """Thread that schedules, queues and supervises borg runs for one target.

    The condition variable `self._cond` and the flag `self._terminate` are
    not defined in this file; they are presumably provided by
    `TerminableThread`.
    """

    def __decode_config(self, cfg):
        """Read and validate this backup target's settings from `cfg`."""
        loc0 = 'backup target %d' % self.identifier

        self._name = config.check_string(cfg, 'name', 'Name', loc0)

        self.logger = logger.getChild(self._name)

        self.loc = 'backup target "%s"' % self._name

        reponame = config.check_string(cfg, 'repository',
                                       'Target repository', self.loc)

        self.repository = repository.get_controller(reponame)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', self.loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template', self.loc)

        self.backup_interval = config.check_nonneg_int(cfg, 'backup_interval',
                                                       'Backup interval', self.loc,
                                                       config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(cfg, 'retry_interval',
                                                      'Retry interval', self.loc,
                                                      config.defaults['retry_interval'])

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', self.loc)

        self.common_parameters = config.check_list_of_dicts(cfg, 'common_parameters',
                                                            'Borg parameters', self.loc,
                                                            default=[])

        self.create_parameters = config.check_list_of_dicts(cfg, 'create_parameters',
                                                            'Create parameters', self.loc,
                                                            default=[])

        self.prune_parameters = config.check_list_of_dicts(cfg, 'prune_parameters',
                                                           'Prune parameters', self.loc,
                                                           default=[])

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', self.loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.extract_passphrase()
            except Exception:
                # Best effort only: extract_passphrase() has already logged
                # the failure, and __do_launch() asks for the passphrase
                # again before each run.
                self.logger.debug('Passphrase extraction at startup failed',
                                  exc_info=True)

    def extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if not cached.

        Returns None when no keychain account is configured. Any exception
        raised by the keyring lookup is logged and re-raised.
        """
        acc = self.__keychain_account
        if not self.__passphrase:
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def __init__(self, identifier, cfg, scheduler):
        """Set up the backup target `identifier` from configuration `cfg`.

        `scheduler` is used for timed waits in the main thread.
        """
        self.identifier = identifier
        # NOTE(review): this stores the `config` *module*, not `cfg` --
        # confirm which one users of `self.config` expect.
        self.config = config
        self.__status_update_callback = None
        self.scheduler = scheduler
        self.logger = None  # set up in __decode_config once backup name is known

        self.borg_instance = None
        self.thread_log = None
        self.thread_res = None

        self.current_operation = None
        self.scheduled_operation = None
        self.lastrun_when = None
        self.state = State.INACTIVE
        self.errors = Errors.OK

        self.__decode_config(cfg)

        super().__init__(target=self.__main_thread, name=self._name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (truthy) if one is running, else a falsy value."""
        with self._cond:
            running = self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running = self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running = self.is_running()
        assert(not running)

    def __log_listener(self):
        """Thread body: consume borg's log entries until the log stream ends.

        Progress entries update the current operation's detail dict; log
        messages at WARNING or above are merged into `self.errors`. The
        status callback, if set, is invoked outside the lock.
        """
        self.logger.debug('Log listener thread waiting for entries')
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(msg))
            # An entry without 'type' falls into the "unrecognised" branch
            # instead of killing this thread with a KeyError.
            t = msg.get('type')

            errormsg = None
            callback = None
            status = None

            if t == 'progress_percent':
                current = safe_get_int(msg, 'current')
                total = safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current'] = current
                        self.current_operation.detail['progress_total'] = total
                        status, callback = self.__status_unlocked()

            elif t == 'archive_progress':
                original_size = safe_get_int(msg, 'original_size')
                compressed_size = safe_get_int(msg, 'compressed_size')
                deduplicated_size = safe_get_int(msg, 'deduplicated_size')
                # All three sizes must be valid before any is published.
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    with self._cond:
                        self.current_operation.detail['original_size'] = original_size
                        self.current_operation.detail['compressed_size'] = compressed_size
                        self.current_operation.detail['deduplicated_size'] = deduplicated_size
                        status, callback = self.__status_unlocked()

            elif t == 'progress_message':
                pass

            elif t == 'file_status':
                pass

            elif t == 'log_message':
                # Fill in defaults for fields borg did not provide
                if 'levelname' not in msg:
                    msg['levelname'] = 'ERROR'
                if 'message' not in msg:
                    msg['message'] = 'UNKNOWN'
                if 'name' not in msg:
                    msg['name'] = 'borg'
                lvl = translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl >= logging.WARNING:
                    errormsg = msg
                    errors = Errors.ERRORS
                    if msg.get('msgid') in _BUSY_MSGIDS:
                        errors = Errors.BUSY
                    with self._cond:
                        self.errors = self.errors.combine(errors)
                        status, callback = self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errors=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Thread body: collect borg's result, then finalise the operation.

        Waits for the log listener to finish, folds any failure into the
        error state, clears the per-run attributes and wakes the main thread.
        """
        self.logger.debug('Result listener thread waiting for result')

        res = self.borg_instance.read_result()

        # Finish processing remaining errors. A local reference is used
        # because the main thread's termination cleanup sets
        # self.thread_log to None.
        thread_log = self.thread_log
        if thread_log:
            thread_log.join()

        with self._cond:
            errors = self.errors

        self.logger.debug('Borg result: %s' % str(res))

        if res is None:
            self.logger.error('No result from borg despite no error in log')
            if errors.ok():
                errors = Errors.ERRORS

        self.logger.debug('Waiting for borg subprocess to terminate in result thread')

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            if errors.ok():
                errors = Errors.ERRORS

        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))

        with self._cond:
            # Only 'create' runs count as a backup run for scheduling
            if self.current_operation.operation == Operation.CREATE:
                self.lastrun_when = self.current_operation.when_monotonic
            self.thread_res = None
            self.thread_log = None
            self.borg_instance = None
            self.current_operation = None
            self.state = State.INACTIVE
            self.errors = errors
            self.__update_status()
            self._cond.notify()

    def __do_launch(self, op, archive_or_repository, *args):
        """Start a borg instance for `op` and its two listener threads."""
        passphrase = self.extract_passphrase()

        inst = BorgInstance(op.operation, archive_or_repository, *args)
        inst.launch(passphrase=passphrase)

        self.logger.debug('Creating listener threads')

        t_log = Thread(target=self.__log_listener)
        t_log.daemon = True

        t_res = Thread(target=self.__result_listener)
        t_res.daemon = True

        # Publish the instance and threads before starting the threads,
        # since both thread bodies read these attributes.
        self.thread_log = t_log
        self.thread_res = t_res
        self.borg_instance = inst

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Launch `op`; raises NotImplementedError for an unknown operation."""
        self.logger.debug("Launching '%s'" % str(op.operation))

        if op.operation == Operation.CREATE:
            archive = "%s::%s%s" % (self.repository.repository_name,
                                    self.archive_prefix,
                                    self.archive_template)

            self.__do_launch(op, archive,
                             self.common_parameters + self.create_parameters,
                             self.paths)
        elif op.operation == Operation.PRUNE:
            self.__do_launch(op, self.repository.repository_name,
                             ([{'prefix': self.archive_prefix}] +
                              self.common_parameters +
                              self.prune_parameters))
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))

    # This must be called with self._cond held.
    def __launch_check(self):
        op = self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation = None

            self.__launch(op)

            self.current_operation = op
            # Update scheduled time to real starting time to schedule
            # next run relative to this
            self.current_operation.when_monotonic = time.monotonic()
            self.state = State.ACTIVE
            # Reset error status when starting a new operation
            self.errors = Errors.OK
            self.__update_status()

    def __main_thread(self):
        """Thread body: loop wait-finish / schedule / queue until terminated."""
        with self._cond:
            try:
                while not self._terminate:
                    self.__main_thread_wait_finish()
                    if not self._terminate:
                        self.__main_thread_wait_schedule()
                        if not self._terminate:
                            self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'" % self._name)
                self.errors = Errors.ERRORS

            self.state = State.INACTIVE
            self.scheduled_operation = None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log = self.thread_log
            thread_res = self.thread_res
            self.thread_log = None
            self.thread_res = None

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/1. Wait while a current operation is running
    def __main_thread_wait_finish(self):
        while self.current_operation and not self._terminate:
            self.logger.debug("Waiting for current operation to finish")
            self._cond.wait()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        if self.scheduled_operation:
            # A manually requested operation is already pending: do not
            # wait here, so that the caller proceeds straight to queueing.
            return

        op = self.__next_operation_unlocked()

        if op:
            now = time.monotonic()
            delay = max(0, op.when_monotonic - now)
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                             (str(op.operation), op.detail or 'none', delay))

            self.scheduled_operation = op
            self.state = State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(now + delay, self._cond, self._name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state = State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state = State.QUEUED
            self.__update_status()
            self.repository.queue_action(self._cond,
                                         action=self.__launch_check,
                                         name=self._name)

    def __next_operation_unlocked(self):
        """Return the next automatic `Operation`, or None if none is due.

        An interval of 0 disables the corresponding automatic scheduling.
        """
        # TODO: pruning as well
        now = time.monotonic()
        if self.lastrun_when is None:
            # No run yet: start after the retry interval, falling back to
            # the backup interval if retries are disabled.
            initial_interval = self.retry_interval
            if initial_interval == 0:
                initial_interval = self.backup_interval
            if initial_interval == 0:
                return None
            return Operation(Operation.CREATE, now + initial_interval,
                             reason='initial')
        elif not self.errors.ok():
            if self.retry_interval == 0:
                return None
            return Operation(Operation.CREATE,
                             self.lastrun_when + self.retry_interval,
                             reason='retry')
        else:
            if self.backup_interval == 0:
                return None
            return Operation(Operation.CREATE,
                             self.lastrun_when + self.backup_interval)

    def __status_unlocked(self):
        """Return `(status, callback)` for the current, else scheduled, operation."""
        callback = self.__status_update_callback

        if self.current_operation:
            status = Status(self, self.current_operation)
        elif self.scheduled_operation:
            status = Status(self, self.scheduled_operation)
        else:
            status = Status(self)

        return status, callback

    def __update_status(self):
        """Invoke the status callback, logging (not propagating) its errors.

        Every caller in this class holds self._cond, and the lock is not
        released around the callback (an earlier release/re-acquire around
        the call is disabled).
        """
        status, callback = self.__status_unlocked()
        if callback:
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Set the callable invoked with a `Status` on every status change."""
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        """Return a `Status` snapshot of this backup."""
        with self._cond:
            res = self.__status_unlocked()
        return res[0]

    def create(self):
        """Manually request a 'create' operation and wake the main thread."""
        op = Operation(Operation.CREATE, time.monotonic(), reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    def prune(self):
        """Manually request a 'prune' operation and wake the main thread."""
        op = Operation(Operation.PRUNE, time.monotonic(), reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if any."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()