Sun, 25 Dec 2022 13:26:18 +0200
Prune also needs --glob-archives instead of --prefix
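Note: a minimal sketch of the parameter change the subject line describes, assuming (as Backup.__launch below suggests) that BorgInstance maps each {option: value} dictionary in op_params to a --option value argument of borg prune/list; the old 'prefix' form is inferred from the subject line only and does not appear in this file:

    # old, per the subject line: op_params included {'prefix': archive_prefix}
    # new, as passed in Backup.__launch below:
    op_params=[{'glob-archives': archive_prefix + '*'}]  # becomes --glob-archives 'PREFIX*'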
#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements the scheduling, running, and borg output processing
# for a specific configured backup.
#

import logging
import time
import datetime
import re
from enum import IntEnum
from threading import Thread, Lock, Condition

from . import config
from . import repository
from .dreamtime import MonotonicTime, DreamTime, RealTime
from .instance import BorgInstance
from .scheduler import TerminableThread
from .exprotect import protect_noreturn

_logger=logging.getLogger(__name__)

JOIN_TIMEOUT=10

#
# State and operation related helper classes
#

class State(IntEnum):
    # State
    INACTIVE=0
    PAUSED=1
    SCHEDULED=2
    QUEUED=3
    ACTIVE=4

    def __str__(self):
        return _statestring[self]

class Errors(IntEnum):
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

    def combine(self, other):
        return max(self, other)

    def ok(self):
        return self==self.OK

    def __str__(self):
        return _errorstring[self]

_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}

_statestring={
    State.INACTIVE: 'inactive',
    State.PAUSED: 'paused',
    State.SCHEDULED: 'scheduled',
    State.QUEUED: 'queued',
    State.ACTIVE: 'active'
}

class Operation:
    CREATE='create'
    PRUNE='prune'
    INFO='info'
    LIST='list'

    def __init__(self, type, start_time, **kwargs):
        self.type=type
        self.start_time=start_time
        self.finish_time=None
        self.detail=kwargs
        self.errors=Errors.OK

    def when(self):
        return self.start_time.realtime()

    def ok(self):
        return self.errors.ok()

    def add_error(self, error):
        self.errors=self.errors.combine(error)

    def name(self):
        if 'reason' in self.detail:
            return str(self.type) + '.' + self.detail['reason']
        else:
            return str(self.type)

class Status(Operation):
    def __init__(self, backup, op=None):
        op=backup.current_operation
        errorsop=backup.current_operation
        if not op:
            op=backup.scheduled_operation
        if not errorsop:
            errorsop=backup.previous_operation
        if op:
            super().__init__(op.type, op.start_time, **op.detail)
        else:
            super().__init__(None, None)
        if errorsop:
            self.errors=errorsop.errors
        self.name=backup.name
        self.state=backup.state

#
# Miscellaneous helper routines
#

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

def safe_get_int(t, x):
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None

def parse_borg_date(d, logger):
    try:
        res=datetime.datetime.strptime(d, '%Y-%m-%dT%H:%M:%S.%f')
    except Exception:
        logger.exception('Unable to parse date from borg: "%s"' % d)
        res=None
    return res

_checkpoint_str='.checkpoint'
_checkpoint_str_len=len(_checkpoint_str)

def is_checkpoint(name):
    i=name.rfind(_checkpoint_str)
    return i>=0 and i+_checkpoint_str_len==len(name)

def get_archive_time(a, logger):
    if not 'name' in a:
        logger.error('Borg archive list entry missing name')
        return None, False
    if is_checkpoint(a['name']):
        logger.debug('Skipping checkpoint in archive list')
        return None, True
    thistime=None
    if 'start' in a:
        thistime=parse_borg_date(a['start'], logger)
    if not thistime and 'time' in a:
        thistime=parse_borg_date(a['time'], logger)
    if not thistime:
        return None, False
    return thistime, True

_prune_progress_re=re.compile(r".*\(([0-9]+)/([0-9]+)\)$")

# Borg gives very little progress info in easy form, so try to extract it
def check_prune_status(msg):
    res=_prune_progress_re.match(msg)
    if res:
        c=res.groups()
        try:
            archive_no=int(c[0])
            of_total=int(c[1])
        except:
            pass
        else:
            return archive_no, of_total
    return None, None

#
# The Backup class
#

class Backup(TerminableThread):

    def __decode_config(self, cfg):
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        _logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=_logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository', 'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix', 'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template', 'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', 'Backup interval', loc, config.defaults['backup_interval'])

        self.prune_interval=config.check_nonneg_int(cfg, 'prune_interval', 'Prune interval', loc, config.defaults['prune_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', 'Retry interval', loc, config.defaults['retry_interval'])

        scheduling=config.check_string(cfg, 'scheduling', 'Scheduling mode', loc, default="dreamtime")

        if scheduling=="dreamtime":
            self.timeclass=DreamTime
        elif scheduling=="realtime":
            self.timeclass=MonotonicTime
        elif scheduling=="manual":
            self.backup_interval=0
        else:
            logging.error("Invalid time class '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)

    def __init__(self, identifier, cfg, scheduler):
        self.identifier=identifier
        self.__status_update_callback=None
        self._pause=False
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once backup name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.previous_operation=None
        self.current_operation=None
        self.scheduled_operation=None
        self.previous_operation_of_type={}
        self.state=State.INACTIVE
        self.timeclass=DreamTime
        self.start_time=self.timeclass.now()

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    @protect_noreturn
    def __log_listener(self):
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                operation_no=safe_get_int(msg, 'operation')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        self.current_operation.detail['operation_no']=operation_no
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if (original_size is not None and compressed_size is not None
                    and deduplicated_size is not None):
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockError' or   # in docs
                         msg['msgid']=='LockErrorT')):  # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.current_operation.add_error(errors)
                        # Don't notify of errors if we are terminating or pausing
                        if not self._terminate_or_pause():
                            status, callback=self.__status_unlocked()
                elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
                    # Borg gives very little progress info in easy form,
                    # so try to extract it
                    archive_number, of_total=check_prune_status(msg['message'])
                    if archive_number!=None and of_total!=None:
                        self.current_operation.detail['progress_current_secondary']=archive_number
                        self.current_operation.detail['progress_total_secondary']=of_total
                        status, callback=self.__status_unlocked()

            elif t=='question_prompt' or t=='question_prompt_retry':
                self.logger.error('Did not expect to receive question prompt from borg')
                with self._cond:
                    self.current_operation.add_error(Errors.ERRORS)
                # TODO: terminate borg? Send 'NO' reply?

            elif (t=='question_invalid_answer' or t=='question_accepted_default'
                  or t=='question_accepted_true' or t=='question_accepted_false'
                  or t=='question_env_answer'):
                pass

            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errorlog=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    @protect_noreturn
    def __result_listener(self):
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        if res is None:
            with self._cond:
                # Prune gives absolutely no result, so don't complain
                if (self.current_operation.ok() and
                    self.current_operation.type!=Operation.PRUNE):
                    self.logger.error('No result from borg despite no error in log')
                    self.current_operation.add_error(Errors.ERRORS)
        elif self.current_operation.type==Operation.LIST:
            self.__handle_list_result(res)
        # All other results are discarded

    def __handle_list_result(self, res):
        ok=True
        latest=None
        if 'archives' in res and isinstance(res['archives'], list):
            archives=res['archives']
            for a in archives:
                if not isinstance(a, dict):
                    self.logger.error('Borg archive list entry not a dictionary')
                    ok=False
                else:
                    thistime, this_ok=get_archive_time(a, self.logger)
                    if thistime and (not latest or thistime>latest):
                        latest=thistime
                    ok=ok and this_ok
        else:
            self.logger.error('Borg archive list missing "archives" entry')

        if not ok:
            with self._cond:
                self.current_operation.add_error(Errors.ERRORS)

        if latest:
            self.logger.info('borg info: Previous backup was on %s' % latest.isoformat())
            when=MonotonicTime.from_realtime(time.mktime(latest.timetuple()))
            op=Operation(Operation.CREATE, when, reason='listed')
            op.finish_time=when
            with self._cond:
                self.previous_operation_of_type[Operation.CREATE]=op
        else:
            self.logger.info('borg info: Could not discover a previous backup')

    def __do_launch(self, op, archive_or_repository, common_params, op_params, paths=[]):
        # Set up current_operation here so errors can be added to it in
        # __main_thread if there is an exception
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.start_time=MonotonicTime.now()

        self.logger.debug('Creating BorgInstance')
        inst=BorgInstance(op.type, archive_or_repository, common_params, op_params, paths)

        self.logger.debug('Launching BorgInstance via repository')
        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst

        # Reset error status when starting a new operation
        self.__update_status(State.ACTIVE)

        t_log.start()
        t_res.start()

    def __launch(self, op):
        self.logger.info("Launching '%s'" % str(op.type))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.type==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common, params.create, self.paths)
        elif op.type==Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'glob-archives': self.archive_prefix + '*'}] + params.prune)
        elif op.type==Operation.LIST:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'glob-archives': self.archive_prefix + '*'}])
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.type))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def _terminate_or_pause(self):
        return self._terminate or self._pause

    def __wait_finish(self):
        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate_or_pause() and self.thread_res.is_alive():
            self._cond.release()
            # Maybe wait for borg instead?
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate or pause has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self.thread_res.is_alive():
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.current_operation.add_error(Errors.ERRORS)

        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None

        self.__mark_current_finished()

    def __mark_current_finished(self):
        current=self.current_operation
        assert(current)
        current.finish_time=MonotonicTime.now()
        self.previous_operation_of_type[current.type]=current
        self.previous_operation=current
        self.current_operation=None

    @protect_noreturn
    def __main_thread(self):
        with self._cond:
            while not self._terminate:
                try:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if ((not self._terminate_or_pause()) and
                        self.scheduled_operation and
                        self.scheduled_operation.start_time <= MonotonicTime.now()):
                        self.__main_thread_queue_and_launch()
                except Exception as err:
                    self.logger.exception("Exception in backup '%s'" % self.backup_name)
                    current=self.current_operation
                    if current:
                        current.add_error(Errors.ERRORS)
                        self.__mark_current_finished()
                finally:
                    self.__cleanup()

    def __cleanup(self):
        thread_log=self.thread_log
        thread_res=self.thread_res
        borg_instance=self.borg_instance
        self.scheduled_operation=None
        self.thread_log=None
        self.thread_res=None
        self.borg_instance=None
        self._cond.release()
        try:
            if borg_instance:
                self.logger.debug("Terminating a borg instance")
                borg_instance.terminate()

            if thread_log:
                self.logger.debug("Waiting for log thread to terminate")
                thread_log.join()

            if thread_res:
                self.logger.debug("Waiting for result thread to terminate")
                thread_res.join()
        finally:
            self._cond.acquire()
            self.current_operation=None

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op=None
        if self._pause:
            self.logger.info("Waiting for resume to be signalled")
            self.__update_status(State.PAUSED)
            self._cond.wait()
        else:
            if not self.scheduled_operation:
                op=self.__next_operation_unlocked()
            if op:
                self.scheduled_operation=op
                self.__update_status(State.SCHEDULED)

                # Wait until the scheduled time
                eventname=op.name() + '@' + self.backup_name
                self.scheduler.wait_until(op.start_time, self._cond, eventname)
            else:
                # Nothing scheduled - just wait
                self.logger.info("Waiting for manual scheduling")
                self.__update_status(State.INACTIVE)
                self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once the repository
    # is available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.__update_status(State.QUEUED)
            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res:
                self.logger.debug("Queueing aborted")

    def __next_operation_unlocked(self):
        listop=self.__next_operation_list()
        if listop:
            return listop

        create=self.__next_operation_type(Operation.CREATE, self.backup_interval,
                                          important=True, initial_reason='initial')
        prune=self.__next_operation_type(Operation.PRUNE, self.prune_interval,
                                         important=False, initial_reason=None)

        if prune:
            self.logger.debug("prune scheduled at %s" % prune.start_time.isoformat())
        else:
            self.logger.debug("no prune scheduled")

        if not prune:
            return create
        elif not create:
            return prune
        elif create.start_time < prune.start_time:
            return create
        else:
            return prune

    def __next_operation_list(self):
        reason='initial'
        # Unless manual backup has been chosen (backup_interval<=0), perform
        # repository listing if no previous create operation known, or if we
        # just pruned the repository
        if self.backup_interval<=0:
            return None
        elif (self.previous_operation and
              self.previous_operation.type==Operation.PRUNE):
            tm=MonotonicTime.now()
            reason='post-prune'
        elif Operation.LIST in self.previous_operation_of_type:
            prev=self.previous_operation_of_type[Operation.LIST]
            if prev.ok():
                return None
            if self.retry_interval<=0:
                # Do not retry in case of errors if retry interval is <= 0
                return None
            # Attempt after retry interval
            tm=MonotonicTime.after_other(prev.finish_time, self.retry_interval)
        else:
            # Nothing has been attempted: run immediately
            tm=MonotonicTime.now()

        return Operation(Operation.LIST, tm, reason=reason)

    def __next_operation_type(self, optype, standard_interval,
                              important=False, initial_reason=None):
        if optype not in self.previous_operation_of_type:
            # No previous operation exists; perform immediately if important,
            # otherwise after the standard interval. Do not perform if manual
            # operation has been selected by setting standard_interval<=0.
            if standard_interval<=0:
                return None
            else:
                if important:
                    tm=MonotonicTime.now()
                else:
                    tm=self.timeclass.after_other(self.start_time, standard_interval)
                if initial_reason:
                    return Operation(optype, tm, reason=initial_reason)
                else:
                    return Operation(optype, tm)
        else:
            # Previous operation has been performed; perform after the
            # retry interval if there were errors, otherwise after the
            # standard interval.
            prev=self.previous_operation_of_type[optype]
            if not prev.ok():
                # Do not retry in case of errors if retry interval is <= 0
                if self.retry_interval<=0:
                    return None
                tm=MonotonicTime.after_other(prev.start_time, self.retry_interval)
                return Operation(optype, tm, reason='retry')
            elif standard_interval>0:
                tm=self.timeclass.after_other(prev.start_time, standard_interval)
                return Operation(optype, tm)
            else:
                # Manual operation if standard_interval is zero.
                return None
    def __status_unlocked(self):
        callback=self.__status_update_callback
        status=Status(self)
        return status, callback

    def __update_status(self, state):
        self.logger.debug("Entering %s state", str(state))
        self.state=state
        status, callback = self.__status_unlocked()
        if callback:
            #self._cond.release()
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")
            #finally:
            #    self._cond.acquire()

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        op=Operation(Operation.CREATE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        op=Operation(Operation.PRUNE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

    def is_paused(self):
        with self._cond:
            paused=(self.state==State.PAUSED)
        return paused

    def pause(self):
        with self._cond:
            self.logger.debug('Pause signalled')
            self.scheduled_operation=None
            self._pause=True
            self._cond.notify()

    def resume(self):
        with self._cond:
            self.logger.debug('Resume signalled')
            self._pause=False
            self._cond.notify()
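
For illustration, a self-contained sketch of what check_prune_status above extracts; the prune log line below is made up, shaped only to match the "(N/M)" counter suffix that the regex expects:

    import re

    _prune_progress_re=re.compile(r".*\(([0-9]+)/([0-9]+)\)$")

    # A hypothetical borg prune INFO message ending in an "(archive_no/of_total)" counter
    msg="Pruning archive: example-2022-12-25T13:26:18 (3/10)"
    m=_prune_progress_re.match(msg)
    if m:
        archive_no, of_total=int(m.group(1)), int(m.group(2))
        print(archive_no, of_total)  # -> 3 10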