--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/backup.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,538 @@ +# +# Borgend Backup instance +# + +import logging +import time +from enum import IntEnum +from threading import Thread, Lock, Condition + +from . import config +from . import loggers +from . import repository +from . import dreamtime +from .instance import BorgInstance +from .scheduler import TerminableThread + +logger=loggers.get(__name__) + +JOIN_TIMEOUT=60 + +# +# State and operation related helper classes +# + +class State(IntEnum): + # State + INACTIVE=0 + SCHEDULED=1 + QUEUED=2 + ACTIVE=3 + + +class Errors(IntEnum): + OK=0 + BUSY=1 + OFFLINE=2 + ERRORS=3 + + def combine(self, other): + return max(self, other) + + def ok(self): + return self==self.OK + + def __str__(self): + return _errorstring[self] + +_errorstring={ + Errors.OK: 'ok', + Errors.BUSY: 'busy', + Errors.OFFLINE: 'offline', + Errors.ERRORS: 'errors' +} + +class Operation: + CREATE='create' + PRUNE='prune' + def __init__(self, operation, time, **kwargs): + self.operation=operation + self.time=time + self.detail=kwargs + + def when(self): + return self.time.realtime() + + +class Status(Operation): + def __init__(self, backup, op=None): + if op: + super().__init__(op.operation, op.time, **op.detail) + else: + super().__init__(None, None) + + self.name=backup.name + self.state=backup.state + self.errors=backup.errors + +# +# Miscellaneous helper routines +# + +loglevel_translation={ + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO +} + +def translate_loglevel(x): + if x in loglevel_translation: + return loglevel_translation[x] + else: + return logging.ERROR + +def safe_get_int(t, x): + if x in t: + tmp=t[x] + if isinstance(tmp, int): + return tmp + return None + +# +# The Backup class +# + +class Backup(TerminableThread): + + def __decode_config(self, cfg): + loc0='Backup %d' % self.identifier + + self.backup_name=config.check_string(cfg, 'name', 'Name', loc0) + + logger.debug("Configuring backup '%s'" % self.backup_name) + + self.logger=logger.getChild(self.backup_name) + + loc="Backup '%s'" % self.backup_name + + reponame=config.check_string(cfg, 'repository', + 'Target repository', loc) + + self.repository=repository.find_repository(reponame) + if not self.repository: + raise Exception("Repository '%s' not configured" % reponame) + + self.archive_prefix=config.check_string(cfg, 'archive_prefix', + 'Archive prefix', loc) + + self.archive_template=config.check_string(cfg, 'archive_template', + 'Archive template', loc) + + self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', + 'Backup interval', loc, + config.defaults['backup_interval']) + + self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', + 'Retry interval', loc, + config.defaults['retry_interval']) + + + scheduling=config.check_string(cfg, 'scheduling', + 'Scheduling mode', loc, + default="dreamtime") + + if scheduling=="dreamtime": + self.timeclass=dreamtime.DreamTime + elif scheduling=="realtime": + self.timeclass=dreamtime.MonotonicTime + elif scheduling=="manual": + self.backup_interval=0 + else: + logging.error("Invalid time class '%s' for %s" % (scheduling, loc)) + + self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc) + + self.borg_parameters=config.BorgParameters.from_config(cfg, loc) + + + def __init__(self, identifier, cfg, scheduler): + self.identifier=identifier + self.__status_update_callback=None + self.scheduler=scheduler + self.logger=None # setup up in __decode_config once backup name is known + + self.borg_instance=None + self.thread_log=None + self.thread_res=None + + self.current_operation=None + self.scheduled_operation=None + self.lastrun_when=None + self.lastrun_finished=None + self.state=State.INACTIVE + self.errors=Errors.OK + self.timeclass=dreamtime.DreamTime + + self.__decode_config(cfg) + + super().__init__(target = self.__main_thread, name = self.backup_name) + self.daemon=True + + def is_running(self): + with self._cond: + running=self.__is_running_unlocked() + return running + + def __is_running_unlocked(self): + running=self.current_operation + if not running: + # Consistency check + assert((not self.borg_instance and not self.thread_log and + not self.thread_res)) + return running + + def __block_when_running(self): + running=self.is_running() + assert(not running) + + def __log_listener(self): + self.logger.debug('Log listener thread waiting for entries') + success=True + for msg in iter(self.borg_instance.read_log, None): + self.logger.info(str(msg)) + t=msg['type'] + + errormsg=None + callback=None + + if t=='progress_percent': + current=safe_get_int(msg, 'current') + total=safe_get_int(msg, 'total') + if current is not None and total is not None: + with self._cond: + self.current_operation.detail['progress_current']=current + self.current_operation.detail['progress_total']=total + status, callback=self.__status_unlocked() + + elif t=='archive_progress': + original_size=safe_get_int(msg, 'original_size') + compressed_size=safe_get_int(msg, 'compressed_size') + deduplicated_size=safe_get_int(msg, 'deduplicated_size') + if original_size is not None and original_size is not None and deduplicated_size is not None: + with self._cond: + self.current_operation.detail['original_size']=original_size + self.current_operation.detail['compressed_size']=compressed_size + self.current_operation.detail['deduplicated_size']=deduplicated_size + status, callback=self.__status_unlocked() + + elif t=='progress_message': + pass + + elif t=='file_status': + pass + + elif t=='log_message': + if 'levelname' not in msg: + msg['levelname']='ERROR' + if 'message' not in msg: + msg['message']='UNKNOWN' + if 'name' not in msg: + msg['name']='borg' + lvl=translate_loglevel(msg['levelname']) + self.logger.log(lvl, msg['name'] + ': ' + msg['message']) + if lvl>=logging.ERROR: + errormsg=msg + errors=Errors.ERRORS + if ('msgid' in msg and + (msg['msgid']=='LockTimeout' or # observed in reality + msg['msgid']=='LockErrorT' or # in docs + msg['msgid']=='LockErrorT')): # in docs + errors=Errors.BUSY + with self._cond: + self.errors=self.errors.combine(errors) + status, callback=self.__status_unlocked() + else: + self.logger.debug('Unrecognised log entry %s' % str(status)) + + if callback: + callback(status, errors=errormsg) + + self.logger.debug('Waiting for borg subprocess to terminate in log thread') + + self.borg_instance.wait() + + self.logger.debug('Borg subprocess terminated; terminating log listener thread') + + def __result_listener(self): + self.logger.debug('Result listener thread waiting for result') + + res=self.borg_instance.read_result() + + self.logger.debug('Borg result: %s' % str(res)) + + with self._cond: + if res is None and self.errors.ok(): + self.logger.error('No result from borg despite no error in log') + self.errors=Errors.ERRORS + + + def __do_launch(self, op, archive_or_repository, + common_params, op_params, paths=[]): + + inst=BorgInstance(op.operation, archive_or_repository, + common_params, op_params, paths) + + # Only the Repository object has access to the passphrase + self.repository.launch_borg_instance(inst) + + self.logger.debug('Creating listener threads') + + t_log=Thread(target=self.__log_listener) + t_log.daemon=True + + t_res=Thread(target=self.__result_listener) + t_res.daemon=True + + self.thread_log=t_log + self.thread_res=t_res + self.borg_instance=inst + self.current_operation=op + # Update scheduled time to real starting time to schedule + # next run relative to this + self.current_operation.time=dreamtime.MonotonicTime.now() + self.state=State.ACTIVE + # Reset error status when starting a new operation + self.errors=Errors.OK + self.__update_status() + + t_log.start() + t_res.start() + + + def __launch(self, op): + self.logger.debug("Launching '%s'" % str(op.operation)) + + params=(config.borg_parameters + +self.repository.borg_parameters + +self.borg_parameters) + + if op.operation==Operation.CREATE: + archive="%s::%s%s" % (self.repository.location, + self.archive_prefix, + self.archive_template) + + self.__do_launch(op, archive, params.common, + params.create, self.paths) + elif op.operation==Operation.PRUNE: + self.__do_launch(op, self.repository.location, params.common, + [{'prefix': self.archive_prefix}] + params.create) + + else: + raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) + + # This must be called with self._cond held. + def __launch_and_wait(self): + op=self.scheduled_operation + if not op: + self.logger.debug("Queued operation aborted") + else: + self.scheduled_operation=None + + self.__launch(op) + + self.__wait_finish() + + def __wait_finish(self): + # Wait for main logger thread to terminate, or for us to be terminated + while not self.terminate and self.thread_res.is_alive(): + self._cond.release() + self.thread_res.join(JOIN_TIMEOUT) + self._cond.acquire() + + # If terminate has been signalled, let outer termination handler + # take care of things (Within this Backup class, it would be cleanest + # to raise an exception instead, but in most other places it's better + # to just check self._terminate, so we don't complicate things with + # an extra exception.) + if self._terminate: + return + + self.logger.debug('Waiting for borg and log subprocesses to terminate') + + self._cond.release() + self.thread_log.join() + self._cond.acquire() + + if not self.borg_instance.wait(): + self.logger.error('Borg subprocess did not terminate') + self.errors=self.errors.combine(Errors.ERRORS) + + if self.current_operation.operation=='create': + self.lastrun_when=self.current_operation.time.monotonic() + self.lastrun_finished=time.monotonic() + self.thread_res=None + self.thread_log=None + self.borg_instance=None + self.current_operation=None + self.state=State.INACTIVE + self.__update_status() + + def __main_thread(self): + with self._cond: + try: + while not self._terminate: + assert(not self.current_operation) + self.__main_thread_wait_schedule() + if not self._terminate: + self.__main_thread_queue_and_launch() + except Exception as err: + self.logger.exception("Error with backup '%s'" % self.backup_name) + self.errors=Errors.ERRORS + + self.state=State.INACTIVE + self.scheduled_operation=None + + # Clean up to terminate: kill borg instance and communication threads + if self.borg_instance: + self.logger.debug("Terminating a borg instance") + self.borg_instance.terminate() + + # Store threads to use outside lock + thread_log=self.thread_log + thread_res=self.thread_res + self.thread_log=None + self.thread_res=None + + self.logger.debug("Waiting for log and result threads to terminate") + + if thread_log: + thread_log.join() + + if thread_res: + thread_res.join() + + # Main thread/2. Schedule next operation if there is no manually + # requested one + def __main_thread_wait_schedule(self): + op=None + if not self.scheduled_operation: + op=self.__next_operation_unlocked() + if op: + self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" % + (str(op.operation), op.detail or 'none', + op.time.seconds_to(), + op.time.__class__.__name__)) + + self.scheduled_operation=op + self.state=State.SCHEDULED + self.__update_status() + + # Wait under scheduled wait + self.scheduler.wait_until(op.time, self._cond, self.backup_name) + else: + # Nothing scheduled - just wait + self.logger.info("Waiting for manual scheduling") + + self.state=State.INACTIVE + self.__update_status() + + self._cond.wait() + + # Main thread/3. If there is a scheduled operation (it might have been + # changed manually from 'op' created in __main_thread_wait_schedule above), + # queue it on the repository, and launch the operation once repository + # available + def __main_thread_queue_and_launch(self): + if self.scheduled_operation: + self.logger.debug("Queuing") + self.state=State.QUEUED + self.__update_status() + res=self.repository.queue_action(self._cond, + action=self.__launch_and_wait, + name=self.backup_name) + if not res and not self._terminate: + self.logger.debug("Queueing aborted") + self.scheduled_operation=None + self.state=State.INACTIVE + self.__update_status() + + def __next_operation_unlocked(self): + # TODO: pruning as well + if not self.lastrun_finished: + initial_interval=self.retry_interval + if initial_interval==0: + initial_interval=self.backup_interval + if initial_interval==0: + return None + else: + tm=self.timeclass.after(initial_interval) + return Operation(Operation.CREATE, tm, reason='initial') + elif not self.errors.ok(): + if self.retry_interval==0: + return None + else: + tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval) + return Operation(Operation.CREATE, tm, reason='retry') + else: + if self.backup_interval==0: + return None + else: + tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval) + return Operation(Operation.CREATE, tm) + + def __status_unlocked(self): + callback=self.__status_update_callback + + if self.current_operation: + status=Status(self, self.current_operation) + elif self.scheduled_operation: + status=Status(self, self.scheduled_operation) + else: + status=Status(self) + + return status, callback + + def __update_status(self): + status, callback = self.__status_unlocked() + if callback: + #self._cond.release() + try: + callback(status) + except Exception: + self.logger.exception("Status update error") + #finally: + # self._cond.acquire() + + # + # Interface functions + # + + def set_status_update_callback(self, callback): + with self._cond: + self.__status_update_callback=callback + + def status(self): + with self._cond: + res=self.__status_unlocked() + return res[0] + + def create(self): + op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual') + with self._cond: + self.scheduled_operation=op + self._cond.notify() + + def prune(self): + op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual') + with self._cond: + self.scheduled_operation=op + self._cond.notify() + + # TODO: Decide exact (manual) abort mechanism. Perhaps two stages + def abort(self): + with self._cond: + if self.borg_instance: + self.borg_instance.terminate() +