Sat, 20 Jan 2018 15:08:16 +0000
basic scheduling
#
# Borgend Backup instance
#

import config
import logging
import time
from instance import BorgInstance
from queue import Queue
from threading import Thread, Lock, Timer

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

class Backup:
    def __decode_config(self, cfg):
        loc0='backup target %d' % self.identifier

        self.name=config.check_string(cfg, 'name', 'Name', loc0)

        self.loc='backup target "%s"' % self.name

        self.repository=config.check_string(cfg, 'repository',
                                            'Target repository', self.loc)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', self.loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', self.loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', self.loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', self.loc,
                                                    config.defaults['retry_interval'])

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths',
                                                         'Paths', self.loc)

        self.common_parameters=config.check_list_of_dicts(cfg, 'common_parameters',
                                                          'Borg parameters', self.loc,
                                                          default=[])

        self.create_parameters=config.check_list_of_dicts(cfg, 'create_parameters',
                                                          'Create parameters', self.loc,
                                                          default=[])

        self.prune_parameters=config.check_list_of_dicts(cfg, 'prune_parameters',
                                                         'Prune parameters', self.loc,
                                                         default=[])

    def __init__(self, identifier, cfg):
        self.identifier=identifier
        self.__decode_config(cfg)
        self.config=config
        self.lastrun=None
        self.lastrun_success=None
        self.borg_instance=None
        self.current_operation=None
        self.thread_log=None
        self.thread_res=None
        self.timer=None
        self.timer_operation=None
        self.timer_time=None
        self.lock=Lock()

    def is_running(self):
        with self.lock:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and
                    not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        logging.debug('Log listener thread waiting for entries')
        success=True
        for status in iter(self.borg_instance.read_log, None):
            logging.debug(str(status))
            t=status['type']

            #may_indicate_finished=False
            if t=='progress_percent':
                #may_indicate_finished=True
                # Temporary output
                if 'current' not in status:
                    status['current']=0
                if 'total' not in status:
                    status['total']=0
                print('%d / %d' % (status['current'], status['total']))
            elif t=='archive_progress':
                pass
            elif t=='progress_message':
                #may_indicate_finished=True
                pass
            elif t=='file_status':
                pass
            elif t=='log_message':
                if 'levelname' not in status:
                    status['levelname']='ERROR'
                if 'message' not in status:
                    status['message']='UNKNOWN'
                if 'name' not in status:
                    status['name']='borg'
                logging.log(translate_loglevel(status['levelname']),
                            status['name'] + ': ' + status['message'])
                # set success=False?
            elif t=='exception':
                success=False
            elif t=='unparsed_error':
                success=False

        logging.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        logging.debug('Borg subprocess terminated; terminating log listener thread')

        with self.lock:
            self.thread_log=None
            self.__finish_and_reschedule_if_both_listeners_terminated()

    def __result_listener(self):
        logging.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()
        success=True

        logging.debug('Borg result: %s' % str(res))

        if res is None:
            success=False

        logging.debug('Waiting for borg subprocess to terminate in result thread')

        success=success and self.borg_instance.wait()

        logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))

        with self.lock:
            if self.current_operation=='create':
                self.lastrun=self.time_started
                self.lastrun_success=success
            self.thread_res=None
            self.__finish_and_reschedule_if_both_listeners_terminated()

    def __finish_and_reschedule_if_both_listeners_terminated(self):
        if self.thread_res is None and self.thread_log is None:
            logging.debug('Both threads terminated')
            self.borg_instance=None
            self.time_started=None
            self.current_operation=None
            self.__schedule_unlocked()

    def __do_launch(self, queue, operation, archive_or_repository, *args):
        inst=BorgInstance(operation, archive_or_repository, *args)
        inst.launch()

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.queue=queue
        self.current_operation=operation
        self.time_started=time.monotonic()

        t_log.start()
        t_res.start()

    def __launch(self, operation, queue):
        if self.__is_running_unlocked():
            logging.info('Cannot start %s: already running %s'
                         % (operation, self.current_operation))
            return False
        else:
            if self.timer:
                logging.debug('Unscheduling timed operation due to launch of operation')
                # Cancel the pending Timer so it does not fire while the
                # launched operation is running.
                self.timer.cancel()
                self.timer=None
                self.timer_operation=None
                self.timer_time=None

            logging.debug("Launching '%s' on '%s'" % (operation, self.name))

            if operation=='create':
                archive="%s::%s%s" % (self.repository,
                                      self.archive_prefix,
                                      self.archive_template)
                self.__do_launch(queue, operation, archive,
                                 self.common_parameters+self.create_parameters,
                                 self.paths)
            elif operation=='prune':
                self.__do_launch(queue, 'prune', self.repository,
                                 ([{'prefix': self.archive_prefix}]
                                  + self.common_parameters
                                  + self.prune_parameters))
            else:
                logging.error("Invalid operation '%s'" % operation)
                self.__schedule_unlocked()

            return True

    def create(self, queue):
        with self.lock:
            res=self.__launch('create', queue)
        return res

    def prune(self, queue):
        with self.lock:
            res=self.__launch('prune', queue)
        return res

    # TODO: Decide exact (manual) abort mechanism.
    # Perhaps two stages.
    def abort(self):
        with self.lock:
            if self.borg_instance:
                self.borg_instance.terminate()
            thread_log=self.thread_log
            thread_res=self.thread_res

        # Python threads cannot be forcibly terminated; once the borg
        # subprocess has been terminated above, both listener threads exit
        # on their own, so simply wait for them here.
        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    def join(self):
        logging.debug('Waiting for borg listener threads to terminate')
        with self.lock:
            thread_log=self.thread_log
            thread_res=self.thread_res

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

        assert(self.thread_log is None and self.thread_res is None)

    def __queue_timed_operation(self):
        with self.lock:
            operation=self.timer_operation
            self.timer_operation=None
            self.timer_time=None
            self.timer=None

            if self.__is_running_unlocked():
                logging.info('Aborted queued operation, as an operation is already running')
            else:
                # TODO: Queue on 'repository' and online status for SSH, etc.
                # TODO: UI comms. queue?
                self.__launch(operation, None)

    def __schedule_unlocked(self):
        if self.current_operation:
            return self.current_operation, None
        else:
            operation, when=self.__next_operation_unlocked()
            if operation:
                now=time.monotonic()
                delay=max(0, when-now)
                logging.info("Scheduling '%s' of '%s' in %d seconds"
                             % (operation, self.name, delay))
                tmr=Timer(delay, self.__queue_timed_operation)
                self.timer_operation=operation
                self.timer_time=when
                self.timer=tmr
                tmr.start()
            return operation, when

    def __next_operation_unlocked(self):
        # TODO: pruning as well
        now=time.monotonic()
        if not self.lastrun:
            return 'create', now+self.retry_interval
        elif not self.lastrun_success:
            return 'create', self.lastrun+self.retry_interval
        elif self.backup_interval==0:
            # Automatic backups disabled; nothing to schedule.
            return None, None
        else:
            return 'create', self.lastrun+self.backup_interval

    def schedule(self):
        with self.lock:
            return self.__schedule_unlocked()
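A rough usage sketch, for orientation only: the configuration values below are hypothetical, the module is assumed to be importable as backup, and the config and instance modules from the same source tree must be on the path.

    from backup import Backup   # assumed module name for the file above

    # Hypothetical backup target configuration; the keys are validated by
    # the config.check_* calls in Backup.__decode_config.
    cfg={
        'name': 'home',
        'repository': 'ssh://backup@example.invalid/./borg-repo',
        'archive_prefix': 'home-',
        'archive_template': '{now}',
        'paths': ['/home/user']
    }

    backup=Backup(0, cfg)
    backup.schedule()      # arm a Timer for the next automatic 'create'
    backup.create(None)    # or launch a backup immediately (no queue yet)
    backup.join()          # wait for the borg listener threads to finish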