diff -r b075b3db3044 -r a409242121d5 backup.py --- a/backup.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,537 +0,0 @@ -# -# Borgend Backup instance -# - -import config -import logging -import time -import loggers -import repository -import dreamtime -from enum import IntEnum -from instance import BorgInstance -from threading import Thread, Lock, Condition -from scheduler import TerminableThread - -logger=loggers.get(__name__) - -JOIN_TIMEOUT=60 - -# -# State and operation related helper classes -# - -class State(IntEnum): - # State - INACTIVE=0 - SCHEDULED=1 - QUEUED=2 - ACTIVE=3 - - -class Errors(IntEnum): - OK=0 - BUSY=1 - OFFLINE=2 - ERRORS=3 - - def combine(self, other): - return max(self, other) - - def ok(self): - return self==self.OK - - def __str__(self): - return _errorstring[self] - -_errorstring={ - Errors.OK: 'ok', - Errors.BUSY: 'busy', - Errors.OFFLINE: 'offline', - Errors.ERRORS: 'errors' -} - -class Operation: - CREATE='create' - PRUNE='prune' - def __init__(self, operation, time, **kwargs): - self.operation=operation - self.time=time - self.detail=kwargs - - def when(self): - return self.time.realtime() - - -class Status(Operation): - def __init__(self, backup, op=None): - if op: - super().__init__(op.operation, op.time, **op.detail) - else: - super().__init__(None, None) - - self.name=backup.name - self.state=backup.state - self.errors=backup.errors - -# -# Miscellaneous helper routines -# - -loglevel_translation={ - 'CRITICAL': logging.CRITICAL, - 'ERROR': logging.ERROR, - 'WARNING': logging.WARNING, - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO -} - -def translate_loglevel(x): - if x in loglevel_translation: - return loglevel_translation[x] - else: - return logging.ERROR - -def safe_get_int(t, x): - if x in t: - tmp=t[x] - if isinstance(tmp, int): - return tmp - return None - -# -# The Backup class -# - -class Backup(TerminableThread): - - def __decode_config(self, cfg): - loc0='Backup %d' % self.identifier - - self.backup_name=config.check_string(cfg, 'name', 'Name', loc0) - - logger.debug("Configuring backup '%s'" % self.backup_name) - - self.logger=logger.getChild(self.backup_name) - - loc="Backup '%s'" % self.backup_name - - reponame=config.check_string(cfg, 'repository', - 'Target repository', loc) - - self.repository=repository.find_repository(reponame) - if not self.repository: - raise Exception("Repository '%s' not configured" % reponame) - - self.archive_prefix=config.check_string(cfg, 'archive_prefix', - 'Archive prefix', loc) - - self.archive_template=config.check_string(cfg, 'archive_template', - 'Archive template', loc) - - self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', - 'Backup interval', loc, - config.defaults['backup_interval']) - - self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', - 'Retry interval', loc, - config.defaults['retry_interval']) - - - scheduling=config.check_string(cfg, 'scheduling', - 'Scheduling mode', loc, - default="dreamtime") - - if scheduling=="dreamtime": - self.timeclass=dreamtime.DreamTime - elif scheduling=="realtime": - self.timeclass=dreamtime.MonotonicTime - elif scheduling=="manual": - self.backup_interval=0 - else: - logging.error("Invalid time class '%s' for %s" % (scheduling, loc)) - - self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc) - - self.borg_parameters=config.BorgParameters.from_config(cfg, loc) - - - def __init__(self, identifier, cfg, scheduler): - self.identifier=identifier - self.__status_update_callback=None - self.scheduler=scheduler - self.logger=None # setup up in __decode_config once backup name is known - - self.borg_instance=None - self.thread_log=None - self.thread_res=None - - self.current_operation=None - self.scheduled_operation=None - self.lastrun_when=None - self.lastrun_finished=None - self.state=State.INACTIVE - self.errors=Errors.OK - self.timeclass=dreamtime.DreamTime - - self.__decode_config(cfg) - - super().__init__(target = self.__main_thread, name = self.backup_name) - self.daemon=True - - def is_running(self): - with self._cond: - running=self.__is_running_unlocked() - return running - - def __is_running_unlocked(self): - running=self.current_operation - if not running: - # Consistency check - assert((not self.borg_instance and not self.thread_log and - not self.thread_res)) - return running - - def __block_when_running(self): - running=self.is_running() - assert(not running) - - def __log_listener(self): - self.logger.debug('Log listener thread waiting for entries') - success=True - for msg in iter(self.borg_instance.read_log, None): - self.logger.info(str(msg)) - t=msg['type'] - - errormsg=None - callback=None - - if t=='progress_percent': - current=safe_get_int(msg, 'current') - total=safe_get_int(msg, 'total') - if current is not None and total is not None: - with self._cond: - self.current_operation.detail['progress_current']=current - self.current_operation.detail['progress_total']=total - status, callback=self.__status_unlocked() - - elif t=='archive_progress': - original_size=safe_get_int(msg, 'original_size') - compressed_size=safe_get_int(msg, 'compressed_size') - deduplicated_size=safe_get_int(msg, 'deduplicated_size') - if original_size is not None and original_size is not None and deduplicated_size is not None: - with self._cond: - self.current_operation.detail['original_size']=original_size - self.current_operation.detail['compressed_size']=compressed_size - self.current_operation.detail['deduplicated_size']=deduplicated_size - status, callback=self.__status_unlocked() - - elif t=='progress_message': - pass - - elif t=='file_status': - pass - - elif t=='log_message': - if 'levelname' not in msg: - msg['levelname']='ERROR' - if 'message' not in msg: - msg['message']='UNKNOWN' - if 'name' not in msg: - msg['name']='borg' - lvl=translate_loglevel(msg['levelname']) - self.logger.log(lvl, msg['name'] + ': ' + msg['message']) - if lvl>=logging.ERROR: - errormsg=msg - errors=Errors.ERRORS - if ('msgid' in msg and - (msg['msgid']=='LockTimeout' or # observed in reality - msg['msgid']=='LockErrorT' or # in docs - msg['msgid']=='LockErrorT')): # in docs - errors=Errors.BUSY - with self._cond: - self.errors=self.errors.combine(errors) - status, callback=self.__status_unlocked() - else: - self.logger.debug('Unrecognised log entry %s' % str(status)) - - if callback: - callback(status, errors=errormsg) - - self.logger.debug('Waiting for borg subprocess to terminate in log thread') - - self.borg_instance.wait() - - self.logger.debug('Borg subprocess terminated; terminating log listener thread') - - def __result_listener(self): - self.logger.debug('Result listener thread waiting for result') - - res=self.borg_instance.read_result() - - self.logger.debug('Borg result: %s' % str(res)) - - with self._cond: - if res is None and self.errors.ok(): - self.logger.error('No result from borg despite no error in log') - self.errors=Errors.ERRORS - - - def __do_launch(self, op, archive_or_repository, - common_params, op_params, paths=[]): - - inst=BorgInstance(op.operation, archive_or_repository, - common_params, op_params, paths) - - # Only the Repository object has access to the passphrase - self.repository.launch_borg_instance(inst) - - self.logger.debug('Creating listener threads') - - t_log=Thread(target=self.__log_listener) - t_log.daemon=True - - t_res=Thread(target=self.__result_listener) - t_res.daemon=True - - self.thread_log=t_log - self.thread_res=t_res - self.borg_instance=inst - self.current_operation=op - # Update scheduled time to real starting time to schedule - # next run relative to this - self.current_operation.time=dreamtime.MonotonicTime.now() - self.state=State.ACTIVE - # Reset error status when starting a new operation - self.errors=Errors.OK - self.__update_status() - - t_log.start() - t_res.start() - - - def __launch(self, op): - self.logger.debug("Launching '%s'" % str(op.operation)) - - params=(config.borg_parameters - +self.repository.borg_parameters - +self.borg_parameters) - - if op.operation==Operation.CREATE: - archive="%s::%s%s" % (self.repository.location, - self.archive_prefix, - self.archive_template) - - self.__do_launch(op, archive, params.common, - params.create, self.paths) - elif op.operation==Operation.PRUNE: - self.__do_launch(op, self.repository.location, params.common, - [{'prefix': self.archive_prefix}] + params.create) - - else: - raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) - - # This must be called with self._cond held. - def __launch_and_wait(self): - op=self.scheduled_operation - if not op: - self.logger.debug("Queued operation aborted") - else: - self.scheduled_operation=None - - self.__launch(op) - - self.__wait_finish() - - def __wait_finish(self): - # Wait for main logger thread to terminate, or for us to be terminated - while not self.terminate and self.thread_res.is_alive(): - self._cond.release() - self.thread_res.join(JOIN_TIMEOUT) - self._cond.acquire() - - # If terminate has been signalled, let outer termination handler - # take care of things (Within this Backup class, it would be cleanest - # to raise an exception instead, but in most other places it's better - # to just check self._terminate, so we don't complicate things with - # an extra exception.) - if self._terminate: - return - - self.logger.debug('Waiting for borg and log subprocesses to terminate') - - self._cond.release() - self.thread_log.join() - self._cond.acquire() - - if not self.borg_instance.wait(): - self.logger.error('Borg subprocess did not terminate') - self.errors=self.errors.combine(Errors.ERRORS) - - if self.current_operation.operation=='create': - self.lastrun_when=self.current_operation.time.monotonic() - self.lastrun_finished=time.monotonic() - self.thread_res=None - self.thread_log=None - self.borg_instance=None - self.current_operation=None - self.state=State.INACTIVE - self.__update_status() - - def __main_thread(self): - with self._cond: - try: - while not self._terminate: - assert(not self.current_operation) - self.__main_thread_wait_schedule() - if not self._terminate: - self.__main_thread_queue_and_launch() - except Exception as err: - self.logger.exception("Error with backup '%s'" % self.backup_name) - self.errors=Errors.ERRORS - - self.state=State.INACTIVE - self.scheduled_operation=None - - # Clean up to terminate: kill borg instance and communication threads - if self.borg_instance: - self.logger.debug("Terminating a borg instance") - self.borg_instance.terminate() - - # Store threads to use outside lock - thread_log=self.thread_log - thread_res=self.thread_res - self.thread_log=None - self.thread_res=None - - self.logger.debug("Waiting for log and result threads to terminate") - - if thread_log: - thread_log.join() - - if thread_res: - thread_res.join() - - # Main thread/2. Schedule next operation if there is no manually - # requested one - def __main_thread_wait_schedule(self): - op=None - if not self.scheduled_operation: - op=self.__next_operation_unlocked() - if op: - self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" % - (str(op.operation), op.detail or 'none', - op.time.seconds_to(), - op.time.__class__.__name__)) - - self.scheduled_operation=op - self.state=State.SCHEDULED - self.__update_status() - - # Wait under scheduled wait - self.scheduler.wait_until(op.time, self._cond, self.backup_name) - else: - # Nothing scheduled - just wait - self.logger.info("Waiting for manual scheduling") - - self.state=State.INACTIVE - self.__update_status() - - self._cond.wait() - - # Main thread/3. If there is a scheduled operation (it might have been - # changed manually from 'op' created in __main_thread_wait_schedule above), - # queue it on the repository, and launch the operation once repository - # available - def __main_thread_queue_and_launch(self): - if self.scheduled_operation: - self.logger.debug("Queuing") - self.state=State.QUEUED - self.__update_status() - res=self.repository.queue_action(self._cond, - action=self.__launch_and_wait, - name=self.backup_name) - if not res and not self._terminate: - self.logger.debug("Queueing aborted") - self.scheduled_operation=None - self.state=State.INACTIVE - self.__update_status() - - def __next_operation_unlocked(self): - # TODO: pruning as well - if not self.lastrun_finished: - initial_interval=self.retry_interval - if initial_interval==0: - initial_interval=self.backup_interval - if initial_interval==0: - return None - else: - tm=self.timeclass.after(initial_interval) - return Operation(Operation.CREATE, tm, reason='initial') - elif not self.errors.ok(): - if self.retry_interval==0: - return None - else: - tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval) - return Operation(Operation.CREATE, tm, reason='retry') - else: - if self.backup_interval==0: - return None - else: - tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval) - return Operation(Operation.CREATE, tm) - - def __status_unlocked(self): - callback=self.__status_update_callback - - if self.current_operation: - status=Status(self, self.current_operation) - elif self.scheduled_operation: - status=Status(self, self.scheduled_operation) - else: - status=Status(self) - - return status, callback - - def __update_status(self): - status, callback = self.__status_unlocked() - if callback: - #self._cond.release() - try: - callback(status) - except Exception: - self.logger.exception("Status update error") - #finally: - # self._cond.acquire() - - # - # Interface functions - # - - def set_status_update_callback(self, callback): - with self._cond: - self.__status_update_callback=callback - - def status(self): - with self._cond: - res=self.__status_unlocked() - return res[0] - - def create(self): - op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual') - with self._cond: - self.scheduled_operation=op - self._cond.notify() - - def prune(self): - op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual') - with self._cond: - self.scheduled_operation=op - self._cond.notify() - - # TODO: Decide exact (manual) abort mechanism. Perhaps two stages - def abort(self): - with self._cond: - if self.borg_instance: - self.borg_instance.terminate() -