Fri, 26 Jan 2018 19:04:04 +0000
Separated repository configuration from backup configuration;
gave passphrase management to Repository object;
various fixes.
#
# Borgend Backup instance
#

import config
import logging
import time
import borgend
import repository
from enum import IntEnum
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

logger = borgend.logger.getChild(__name__)

# Timeout (seconds) of each individual join() on the result thread, so
# that a termination request is noticed while waiting for borg
JOIN_TIMEOUT = 60

# Borg message ids that indicate the repository lock is held by someone
# else; these are reported as Errors.BUSY instead of Errors.ERRORS
_BUSY_MSGIDS = (
    'LockTimeout',  # observed in reality
    'LockError',    # in docs
    'LockErrorT',   # in docs
)

#
# State and operation related helper classes
#


class State(IntEnum):
    """Scheduling state of a Backup."""
    INACTIVE = 0
    SCHEDULED = 1
    QUEUED = 2
    ACTIVE = 3


class Errors(IntEnum):
    """Error status of a Backup; a larger value takes precedence."""
    OK = 0
    BUSY = 1
    OFFLINE = 2
    ERRORS = 3

    def combine(self, other):
        """Return the more severe (numerically larger) of two statuses."""
        return Errors(max(self, other))

    def ok(self):
        """Return True if this status is Errors.OK."""
        return self == Errors.OK

    def __str__(self):
        return _errorstring[self]


_errorstring = {
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors',
}


class Operation:
    """An operation to run, with its time on the monotonic clock.

    Extra keyword arguments are stored in the `detail` dictionary.
    """
    CREATE = 'create'
    PRUNE = 'prune'

    def __init__(self, operation, when_monotonic, **kwargs):
        self.operation = operation
        self.when_monotonic = when_monotonic
        self.detail = kwargs

    def when(self):
        """Return the operation time translated from time.monotonic()
        to the time.time() clock, or None if no time is set."""
        if self.when_monotonic is None:
            return None
        return self.when_monotonic - time.monotonic() + time.time()


class Status(Operation):
    """Snapshot of a backup's name, state and errors, together with a
    copy of the given operation (if any)."""

    def __init__(self, backup, op=None):
        if op:
            super().__init__(op.operation, op.when_monotonic, **op.detail)
        else:
            super().__init__(None, None)

        self.name = backup.name
        self.state = backup.state
        self.errors = backup.errors


#
# Miscellaneous helper routines
#

loglevel_translation = {
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
}


def translate_loglevel(x):
    """Map a level name to a logging level; unknown names give ERROR."""
    if x in loglevel_translation:
        return loglevel_translation[x]
    return logging.ERROR


def safe_get_int(t, x):
    """Return t[x] if the key exists and the value is an int, else None."""
    if x in t:
        tmp = t[x]
        if isinstance(tmp, int):
            return tmp
    return None


#
# The Backup class
#

class Backup(TerminableThread):
    """A configured backup, run in its own thread.

    The main thread function schedules operations, queues them on the
    repository, launches borg, and tracks state and errors. All state is
    accessed with self._cond held.
    """

    def __decode_config(self, cfg):
        """Read and validate this backup's settings from `cfg`.

        Raises an Exception if the named repository is not configured.
        """
        loc0 = 'Backup %d' % self.identifier

        self.backup_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger = logger.getChild(self.backup_name)

        loc = "Backup '%s'" % self.backup_name

        reponame = config.check_string(cfg, 'repository',
                                       'Target repository', loc)

        self.repository = repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template', loc)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', loc,
            config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', loc,
            config.defaults['retry_interval'])

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths',
                                                           'Paths', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

    def __init__(self, identifier, cfg, scheduler):
        self.identifier = identifier
        self.__status_update_callback = None
        self.scheduler = scheduler
        self.logger = None  # set up in __decode_config once backup name is known
        self.borg_instance = None
        self.thread_log = None
        self.thread_res = None

        self.current_operation = None
        self.scheduled_operation = None
        self.lastrun_when = None
        self.lastrun_finished = None
        self.state = State.INACTIVE
        self.errors = Errors.OK

        self.__decode_config(cfg)

        # The thread name is only known after the configuration is decoded
        super().__init__(target=self.__main_thread, name=self.backup_name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (falsy when nothing is running)."""
        with self._cond:
            running = self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running = self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running = self.is_running()
        assert(not running)

    def __log_listener(self):
        """Thread function: read borg's log entries until the stream ends,
        updating progress details and error status along the way."""
        self.logger.debug('Log listener thread waiting for entries')
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            # An entry without a type is handled as unrecognised below
            t = msg['type'] if 'type' in msg else None

            errormsg = None
            callback = None

            if t == 'progress_percent':
                current = safe_get_int(msg, 'current')
                total = safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current'] = current
                        self.current_operation.detail['progress_total'] = total
                        status, callback = self.__status_unlocked()

            elif t == 'archive_progress':
                original_size = safe_get_int(msg, 'original_size')
                compressed_size = safe_get_int(msg, 'compressed_size')
                deduplicated_size = safe_get_int(msg, 'deduplicated_size')
                # All three sizes must be present to update the details
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    with self._cond:
                        self.current_operation.detail['original_size'] = original_size
                        self.current_operation.detail['compressed_size'] = compressed_size
                        self.current_operation.detail['deduplicated_size'] = deduplicated_size
                        status, callback = self.__status_unlocked()

            elif t == 'progress_message':
                pass

            elif t == 'file_status':
                pass

            elif t == 'log_message':
                if 'levelname' not in msg:
                    msg['levelname'] = 'ERROR'
                if 'message' not in msg:
                    msg['message'] = 'UNKNOWN'
                if 'name' not in msg:
                    msg['name'] = 'borg'
                lvl = translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl >= logging.ERROR:
                    errormsg = msg
                    errors = Errors.ERRORS
                    if 'msgid' in msg and msg['msgid'] in _BUSY_MSGIDS:
                        errors = Errors.BUSY
                    with self._cond:
                        self.errors = self.errors.combine(errors)
                        status, callback = self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            # `status` is always bound whenever `callback` has been set
            if callback:
                callback(status, errors=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Thread function: read borg's result and flag an error if there
        is none although the log reported no error."""
        self.logger.debug('Result listener thread waiting for result')

        res = self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error in log')
                self.errors = Errors.ERRORS

    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=None):
        """Start borg for `op`, start the listener threads, and mark the
        backup as active."""
        if paths is None:
            paths = []

        inst = BorgInstance(op.operation, archive_or_repository,
                            common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log = Thread(target=self.__log_listener)
        t_log.daemon = True

        t_res = Thread(target=self.__result_listener)
        t_res.daemon = True

        self.thread_log = t_log
        self.thread_res = t_res
        self.borg_instance = inst
        self.current_operation = op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.when_monotonic = time.monotonic()
        self.state = State.ACTIVE
        # Reset error status when starting a new operation
        self.errors = Errors.OK
        self.__update_status()

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Launch borg for the given operation.

        Raises NotImplementedError for an unknown operation.
        """
        self.logger.debug("Launching '%s'" % str(op.operation))

        params = (config.borg_parameters
                  + self.repository.borg_parameters
                  + self.borg_parameters)

        if op.operation == Operation.CREATE:
            archive = "%s::%s%s" % (self.repository.location,
                                    self.archive_prefix,
                                    self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation == Operation.PRUNE:
            # NOTE(review): prune is launched with params.create; confirm
            # whether prune-specific parameters were intended here
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.create)
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op = self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation = None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
        """Wait for the running operation to finish, then reset the
        running state. Called with self._cond held; the lock is released
        while joining the listener threads."""
        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            try:
                self.thread_res.join(JOIN_TIMEOUT)
            finally:
                self._cond.acquire()

        # If terminate has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        try:
            self.thread_log.join()
        finally:
            self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors = self.errors.combine(Errors.ERRORS)

        # Both timestamps are recorded together, and only for create
        # operations, since __next_operation_unlocked schedules creates
        # relative to them
        if self.current_operation.operation == Operation.CREATE:
            self.lastrun_when = self.current_operation.when_monotonic
            self.lastrun_finished = time.monotonic()
        self.thread_res = None
        self.thread_log = None
        self.borg_instance = None
        self.current_operation = None
        self.state = State.INACTIVE
        self.__update_status()

    def __main_thread(self):
        """Thread main function: schedule, queue and launch operations
        until termination is requested, then clean up."""
        with self._cond:
            try:
                while not self._terminate:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'" % self.backup_name)
                self.errors = Errors.ERRORS

            self.state = State.INACTIVE
            self.scheduled_operation = None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log = self.thread_log
            thread_res = self.thread_res
            self.thread_log = None
            self.thread_res = None

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        # A manual request may have arrived while the previous operation
        # was running, when nobody was waiting on the condition. Waiting
        # now would leave it pending until some later notify.
        if self.scheduled_operation:
            return

        op = self.__next_operation_unlocked()

        if op:
            now = time.monotonic()
            delay = max(0, op.when_monotonic - now)
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                             (str(op.operation), op.detail or 'none', delay))

            self.scheduled_operation = op
            self.state = State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(now + delay, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state = State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state = State.QUEUED
            self.__update_status()
            res = self.repository.queue_action(self._cond,
                                               action=self.__launch_and_wait,
                                               name=self.backup_name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation = None
                self.state = State.INACTIVE
                self.__update_status()

    def __next_operation_unlocked(self):
        """Return the next automatic Operation, or None if the relevant
        interval is zero."""
        # TODO: pruning as well
        now = time.monotonic()
        if self.lastrun_finished is None or self.lastrun_when is None:
            # No create has finished yet
            initial_interval = self.retry_interval
            if initial_interval == 0:
                initial_interval = self.backup_interval
            if initial_interval == 0:
                return None
            return Operation(Operation.CREATE, now + initial_interval,
                             reason='initial')
        elif not self.errors.ok():
            if self.retry_interval == 0:
                return None
            return Operation(Operation.CREATE,
                             self.lastrun_finished + self.retry_interval,
                             reason='retry')
        else:
            if self.backup_interval == 0:
                return None
            return Operation(Operation.CREATE,
                             self.lastrun_when + self.backup_interval)

    def __status_unlocked(self):
        """Return (status, callback) for the current operation, else the
        scheduled one, else no operation."""
        callback = self.__status_update_callback

        if self.current_operation:
            status = Status(self, self.current_operation)
        elif self.scheduled_operation:
            status = Status(self, self.scheduled_operation)
        else:
            status = Status(self)

        return status, callback

    def __update_status(self):
        status, callback = self.__status_unlocked()
        if callback:
            # The callback is invoked without releasing self._cond
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        with self._cond:
            res = self.__status_unlocked()
        return res[0]

    def create(self):
        """Request a manual create operation and wake the main thread."""
        op = Operation(Operation.CREATE, time.monotonic(), reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    def prune(self):
        """Request a manual prune operation and wake the main thread."""
        op = Operation(Operation.PRUNE, time.monotonic(), reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()