Sat, 20 Jan 2018 23:00:43 +0000
Added offline symbol B⦙ (no offline detection yet)
# # Borgend Backup instance # import config import logging import time from instance import BorgInstance from queue import Queue from threading import Thread, Lock, Timer loglevel_translation={ 'CRITICAL': logging.CRITICAL, 'ERROR': logging.ERROR, 'WARNING': logging.WARNING, 'DEBUG': logging.DEBUG, 'INFO': logging.INFO } def translate_loglevel(x): if x in loglevel_translation: return loglevel_translation[x] else: return logging.ERROR def safe_get_int(t, x): if x in t: tmp=t[x] if isinstance(tmp, int): return tmp return None class Backup: def __decode_config(self, cfg): loc0='backup target %d' % self.identifier self.name=config.check_string(cfg, 'name', 'Name', loc0) self.loc='backup target "%s"' % self.name self.repository=config.check_string(cfg, 'repository', 'Target repository', self.loc) self.archive_prefix=config.check_string(cfg, 'archive_prefix', 'Archive prefix', self.loc) self.archive_template=config.check_string(cfg, 'archive_template', 'Archive template', self.loc) self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', 'Backup interval', self.loc, config.defaults['backup_interval']) self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', 'Retry interval', self.loc, config.defaults['retry_interval']) self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', self.loc) self.common_parameters=config.check_list_of_dicts(cfg, 'common_parameters', 'Borg parameters', self.loc, default=[]) self.create_parameters=config.check_list_of_dicts(cfg, 'create_parameters', 'Create parameters', self.loc, default=[]) self.prune_parameters=config.check_list_of_dicts(cfg, 'prune_parameters', 'Prune parameters', self.loc, default=[]) def __init__(self, identifier, cfg): self.identifier=identifier self.__decode_config(cfg) self.config=config self.lastrun=None self.lastrun_success=None self.borg_instance=None self.current_operation=None self.thread_log=None self.thread_res=None self.timer=None self.scheduled_operation=None self.lock=Lock() self.status_update_callback=None def is_running(self): with self.lock: running=self.__is_running_unlocked() return running def __is_running_unlocked(self): running=self.current_operation if not running: # Consistency check assert((not self.borg_instance and not self.thread_log and not self.thread_res)) return running def __block_when_running(self): running=self.is_running() assert(not running) def __log_listener(self): logging.debug('Log listener thread waiting for entries') success=True for status in iter(self.borg_instance.read_log, None): logging.debug(str(status)) t=status['type'] errors_this_message=False callback=None if t=='progress_percent': current=safe_get_int(status, 'current') total=safe_get_int(status, 'total') if current is not None and total is not None: with self.lock: self.current_operation.progress_current=current self.current_operation.progress_total=total status, callback=self.__status_unlocked() elif t=='archive_progress': original_size=safe_get_int(status, 'original_size') compressed_size=safe_get_int(status, 'compressed_size') deduplicated_size=safe_get_int(status, 'deduplicated_size') if original_size is not None and original_size is not None and deduplicated_size is not None: with self.lock: self.current_operation.original_size=original_size self.current_operation.compressed_size=compressed_size self.current_operation.deduplicated_size=deduplicated_size status, callback=self.__status_unlocked() elif t=='progress_message': pass elif t=='file_status': pass elif t=='log_message': if 'levelname' not in status: status['levelname']='ERROR' if 'message' not in status: status['message']='UNKNOWN' if 'name' not in status: status['name']='borg' lvl=translate_loglevel(status['levelname']) logging.log(lvl, status['name'] + ': ' + status['message']) if lvl>=logging.WARNING: errors_this_message=True elif t=='exception': errors_this_message=True elif t=='unparsed_error': errors_this_message=True if errors_this_message: with self.lock: self.current_operation['errors']=True status, callback=self.__status_unlocked() if callback: callback(self, status) logging.debug('Waiting for borg subprocess to terminate in log thread') self.borg_instance.wait() logging.debug('Borg subprocess terminated; terminating log listener thread') def __result_listener(self): with self.lock: status, callback=self.__status_unlocked() if callback: callback(self, status) logging.debug('Result listener thread waiting for result') res=self.borg_instance.read_result() success=True logging.debug('Borg result: %s' % str(res)) if res==None: success=False logging.debug('Waiting for borg subprocess to terminate in result thread') success=success and self.borg_instance.wait() logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) self.thread_log.join() with self.lock: if self.current_operation['operation']=='create': self.lastrun=self.current_operation['when_monotonic'] self.lastrun_success=success self.thread_res=None self.thread_log=None self.borg_instance=None self.current_operation=None self.__schedule_unlocked() status, callback=self.__status_unlocked() if callback: callback(self, status) def __do_launch(self, queue, op, archive_or_repository, *args): inst=BorgInstance(op['operation'], archive_or_repository, *args) inst.launch() t_log=Thread(target=self.__log_listener) t_log.daemon=True t_res=Thread(target=self.__result_listener) t_res.daemon=True self.thread_log=t_log self.thread_res=t_res self.borg_instance=inst self.queue=queue self.current_operation=op self.current_operation['when_monotonic']=time.monotonic() t_log.start() t_res.start() def __launch(self, op, queue): if self.__is_running_unlocked(): logging.info('Cannot start %s: already running %s' % (operation, self.current_operation)) return False else: if self.timer: logging.debug('Unscheduling timed operation due to launch of operation') self.timer=None self.scheduled_operation=None logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) if op['operation']=='create': archive="%s::%s%s" % (self.repository, self.archive_prefix, self.archive_template) self.__do_launch(queue, op, archive, self.common_parameters+self.create_parameters, self.paths) elif op['operation']=='prune': self.__do_launch(queue, op, self.repository, ([{'prefix': self.archive_prefix}] + self.common_parameters + self.prune_parameters)) else: logging.error("Invalid operaton '%s'" % op['operation']) self.__schedule_unlocked() return True def create(self, queue): op={'operation': 'create', 'detail': 'manual'} with self.lock: res=self.__launch(op, queue) return res def prune(self, queue): op={'operation': 'prune', 'detail': 'manual'} with self.lock: res=self.__launch(op, queue) return res # TODO: Decide exact (manual) abort mechanism. Perhaps two stages def abort(self): with self.lock: if self.borg_instance: self.borg_instance.terminate() thread_log=self.thread_log thread_res=self.thread_res if thread_log: thread_log.terminate() if thread_res: thread_res.terminate() def join(self): logging.debug('Waiting for borg listener threads to terminate') with self.lock: thread_log=self.thread_log thread_res=self.thread_res if thread_log: thread_log.join() if thread_res: thread_res.join() assert(self.thread_log==None and self.thread_res==None) def __queue_timed_operation(self): with self.lock: op=self.scheduled_operation self.scheduled_operation=None self.timer=None if self.__is_running_unlocked(): logging.info('Aborted queue operation, as an operation is already running') else: # TODO: Queue on 'repository' and online status for SSH, etc. # TODO: UI comms. queue? self.__launch(op, None) def __next_operation_unlocked(self): # TODO: pruning as well now=time.monotonic() if not self.lastrun: initial_interval=self.retry_interval if initial_interval==0: initial_interval=self.backup_interval if initial_interval==0: return None else: return {'operation': 'create', 'detail': 'initial', 'when_monotonic': now+initial_interval} elif not self.lastrun_success: if self.retry_interval==0: return None else: return {'operation': 'create', 'detail': 'retry', 'when_monotonic': self.lastrun+self.retry_interval} else: if self.backup_interval==0: return None else: return {'operation': 'create', 'detail': None, 'when_monotonic': self.lastrun+self.backup_interval} def __schedule_unlocked(self): if self.current_operation: return self.current_operation else: op=self.__next_operation_unlocked() if op: now=time.monotonic() delay=max(0, op['when_monotonic']-now) logging.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % (op['operation'], op['detail'], self.name, delay)) tmr=Timer(delay, self.__queue_timed_operation) self.scheduled_operation=op self.timer=tmr tmr.start() return op def schedule(self): with self.lock: return self.__schedule_unlocked() def set_status_update_callback(self, callback): with self.lock: self.status_update_callback=callback def status(self): with self.lock: res=self.__status_unlocked() return res[0] def __status_unlocked(self): callback=self.status_update_callback if self.current_operation: status=self.current_operation status['type']='current' elif self.scheduled_operation: status=self.scheduled_operation status['type']='scheduled' else: status={'type': 'nothing'} status['name']=self.name if 'errors' not in status: status['errors']=False if 'when_monotonic' in status: status['when']=(status['when_monotonic'] -time.monotonic()+time.time()) return status, callback