Tue, 06 Feb 2018 20:16:59 +0000
Changed dreamtime calculation.
The monotonic clock (time.monotonic()) sometimes advances unreliably
during sleep, so it cannot be used to calculate sleeping time. We have
to use time.time() for that instead. However, time.time() may be changed
by the user, so we only use it to track sleep periods, and still use
time.monotonic() to track wake periods.
#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements the scheduling, running, and borg output processing
# for a specific configured backup.
#

import logging
import time
import datetime
import re
from enum import IntEnum
from threading import Thread, Lock, Condition

from . import config
from . import repository
from .dreamtime import MonotonicTime, DreamTime, RealTime
from .instance import BorgInstance
from .scheduler import TerminableThread
from .exprotect import protect_noreturn

_logger = logging.getLogger(__name__)

# Timeout (passed to Thread.join) between checks for termination/pause
# while waiting for the result listener thread.
JOIN_TIMEOUT = 10

#
# State and operation related helper classes
#


class State(IntEnum):
    """Scheduling state of a backup."""

    INACTIVE = 0
    PAUSED = 1
    SCHEDULED = 2
    QUEUED = 3
    ACTIVE = 4

    def __str__(self):
        return _statestring[self]


class Errors(IntEnum):
    """Error level of an operation; larger values take precedence."""

    OK = 0
    BUSY = 1
    OFFLINE = 2
    ERRORS = 3

    def combine(self, other):
        """Return the larger of this error level and `other`."""
        return max(self, other)

    def ok(self):
        """Return True if this is the `OK` level."""
        return self == self.OK

    def __str__(self):
        return _errorstring[self]


_errorstring = {
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors',
}

_statestring = {
    State.INACTIVE: 'inactive',
    State.PAUSED: 'paused',
    State.SCHEDULED: 'scheduled',
    State.QUEUED: 'queued',
    State.ACTIVE: 'active',
}


class Operation:
    """A single borg operation together with its timing and error status.

    Attributes:
        type: one of the operation constants below (or None for an empty
            Status).
        start_time: scheduled, later actual, start time.
        finish_time: finish time, None until the operation has finished.
        detail: dictionary of extra keyword details (e.g. 'reason',
            progress counters).
        errors: accumulated `Errors` level.
    """

    CREATE = 'create'
    PRUNE = 'prune'
    INFO = 'info'
    LIST = 'list'

    # The parameter name `type` shadows the builtin, but it is part of the
    # constructor's interface, so it is kept as is.
    def __init__(self, type, start_time, **kwargs):
        self.type = type
        self.start_time = start_time
        self.finish_time = None
        self.detail = kwargs
        self.errors = Errors.OK

    def when(self):
        """Return the start time converted with its `realtime()` method."""
        return self.start_time.realtime()

    def ok(self):
        """Return True if no error has been recorded for the operation."""
        return self.errors.ok()

    def add_error(self, error):
        """Merge `error` into the accumulated error level."""
        self.errors = self.errors.combine(error)

    def name(self):
        """Return the operation type, suffixed by '.<reason>' if one is set."""
        if 'reason' in self.detail:
            return str(self.type) + '.' + self.detail['reason']
        else:
            return str(self.type)


class Status(Operation):
    """Snapshot of a backup's status.

    The operation details are copied from the backup's current operation,
    or failing that its scheduled operation. The error level is copied from
    the current operation, or failing that the previous operation.
    """

    def __init__(self, backup, op=None):
        # NOTE: the `op` argument is accepted for interface compatibility,
        # but its value is not used; it is always replaced by the backup's
        # current (or scheduled) operation.
        op = backup.current_operation
        errorsop = backup.current_operation

        if not op:
            op = backup.scheduled_operation

        if not errorsop:
            errorsop = backup.previous_operation

        if op:
            super().__init__(op.type, op.start_time, **op.detail)
        else:
            super().__init__(None, None)

        if errorsop:
            self.errors = errorsop.errors

        # NOTE: this instance attribute shadows the inherited name() method.
        self.name = backup.name
        self.state = backup.state


#
# Miscellaneous helper routines
#

loglevel_translation = {
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
}


def translate_loglevel(x):
    """Translate a level name to a `logging` level.

    Unknown names are treated as `logging.ERROR`.
    """
    return loglevel_translation.get(x, logging.ERROR)


def safe_get_int(t, x):
    """Return `t[x]` if the key exists and the value is an int, else None."""
    if x in t:
        tmp = t[x]
        if isinstance(tmp, int):
            return tmp
    return None


def parse_borg_date(d, logger):
    """Parse a '%Y-%m-%dT%H:%M:%S.%f' date string.

    Returns a `datetime.datetime`, or None (after logging the failure to
    `logger`) if `d` cannot be parsed.
    """
    try:
        res = datetime.datetime.strptime(d, '%Y-%m-%dT%H:%M:%S.%f')
    except (TypeError, ValueError):
        # strptime raises ValueError on a format mismatch and TypeError
        # when `d` is not a string.
        logger.exception('Unable parse date from borg: "%s"' % d)
        res = None
    return res


_checkpoint_str = '.checkpoint'
_checkpoint_str_len = len(_checkpoint_str)


def is_checkpoint(name):
    """Return True if the archive name ends with '.checkpoint'."""
    return name.endswith(_checkpoint_str)


def get_archive_time(a, logger):
    """Extract the time of an archive list entry.

    Returns a pair `(time, ok)`. `time` is a `datetime.datetime` or None;
    `ok` is False if the entry is missing its name or has no parseable
    'start' or 'time' field. Checkpoint archives are skipped, giving
    `(None, True)`.
    """
    if 'name' not in a:
        logger.error('Borg archive list entry missing name')
        return None, False
    if is_checkpoint(a['name']):
        logger.debug('Skipping checkpoint in archive list')
        return None, True

    thistime = None
    if 'start' in a:
        thistime = parse_borg_date(a['start'], logger)
    if not thistime and 'time' in a:
        thistime = parse_borg_date(a['time'], logger)
    if not thistime:
        return None, False

    return thistime, True


# Raw string: '\(' is not a valid escape sequence in an ordinary string.
_prune_progress_re = re.compile(r".*\(([0-9]+)/([0-9]+)\)$")


# Borg gives very little progress info in easy form, so try to extract it
def check_prune_status(msg):
    """Extract '(<current>/<total>)' from the end of a log message.

    Returns `(current, total)` as ints, or `(None, None)` if the message
    does not end with such a marker.
    """
    res = _prune_progress_re.match(msg)
    if res:
        c = res.groups()
        try:
            archive_no = int(c[0])
            of_total = int(c[1])
        except ValueError:
            pass
        else:
            return archive_no, of_total
    return None, None


#
# The Backup class
#
class Backup(TerminableThread):
    """Scheduler and runner thread for one configured backup.

    The thread repeatedly picks the next operation (list, create or prune),
    waits until it is due, queues it on the repository, launches borg, and
    processes borg's log and result output in two listener threads.

    Methods with an `_unlocked` suffix, and those documented as such,
    expect `self._cond` to be held by the caller.
    """

    def __decode_config(self, cfg):
        """Read and validate this backup's settings from `cfg`.

        Raises:
            Exception: if the configured repository is not known.
        """
        loc0 = 'Backup %d' % self.identifier

        self.backup_name = config.check_string(cfg, 'name', 'Name', loc0)

        _logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger = _logger.getChild(self.backup_name)

        loc = "Backup '%s'" % self.backup_name

        reponame = config.check_string(cfg, 'repository',
                                       'Target repository', loc)

        self.repository = repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template', loc)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', loc,
            config.defaults['backup_interval'])

        self.prune_interval = config.check_nonneg_int(
            cfg, 'prune_interval', 'Prune interval', loc,
            config.defaults['prune_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', loc,
            config.defaults['retry_interval'])

        scheduling = config.check_string(cfg, 'scheduling',
                                         'Scheduling mode', loc,
                                         default="dreamtime")

        if scheduling == "dreamtime":
            self.timeclass = DreamTime
        elif scheduling == "realtime":
            self.timeclass = MonotonicTime
        elif scheduling == "manual":
            self.backup_interval = 0
        else:
            # Report through this backup's own logger rather than the root
            # logger. The default time class set in __init__ stays in use.
            self.logger.error("Invalid time class '%s' for %s"
                              % (scheduling, loc))

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths',
                                                           'Paths', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

    def __init__(self, identifier, cfg, scheduler):
        """Create the backup thread from configuration entry `cfg`.

        Args:
            identifier: number used to identify the backup in configuration
                error messages before its name is known.
            cfg: configuration of this backup.
            scheduler: object providing `wait_until` for timed waits.
        """
        self.identifier = identifier
        self.__status_update_callback = None
        self._pause = False
        self.scheduler = scheduler
        self.logger = None  # set up in __decode_config once backup name is known
        self.borg_instance = None
        self.thread_log = None
        self.thread_res = None
        self.previous_operation = None
        self.current_operation = None
        self.scheduled_operation = None
        self.previous_operation_of_type = {}
        self.state = State.INACTIVE
        self.timeclass = DreamTime

        self.__decode_config(cfg)

        super().__init__(target=self.__main_thread, name=self.backup_name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (truthy) if one is running."""
        with self._cond:
            running = self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running = self.current_operation
        if not running:
            # Consistency check
            assert (not self.borg_instance and not self.thread_log and
                    not self.thread_res)
        return running

    def __block_when_running(self):
        running = self.is_running()
        assert not running

    @protect_noreturn
    def __log_listener(self):
        """Listener thread body: process borg's log output entry by entry."""
        self.logger.debug('Log listener thread waiting for entries')
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(msg))
            t = msg['type']

            errormsg = None
            callback = None
            status = None

            if t == 'progress_percent':
                current = safe_get_int(msg, 'current')
                total = safe_get_int(msg, 'total')
                operation_no = safe_get_int(msg, 'operation')
                if current is not None and total is not None:
                    with self._cond:
                        detail = self.current_operation.detail
                        detail['progress_current'] = current
                        detail['progress_total'] = total
                        detail['operation_no'] = operation_no
                        status, callback = self.__status_unlocked()

            elif t == 'archive_progress':
                original_size = safe_get_int(msg, 'original_size')
                compressed_size = safe_get_int(msg, 'compressed_size')
                deduplicated_size = safe_get_int(msg, 'deduplicated_size')
                # All three sizes must be present; the original code tested
                # original_size twice and never compressed_size.
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    with self._cond:
                        detail = self.current_operation.detail
                        detail['original_size'] = original_size
                        detail['compressed_size'] = compressed_size
                        detail['deduplicated_size'] = deduplicated_size
                        status, callback = self.__status_unlocked()

            elif t == 'progress_message':
                pass

            elif t == 'file_status':
                pass

            elif t == 'log_message':
                if 'levelname' not in msg:
                    msg['levelname'] = 'ERROR'
                if 'message' not in msg:
                    msg['message'] = 'UNKNOWN'
                if 'name' not in msg:
                    msg['name'] = 'borg'
                lvl = translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl >= logging.ERROR:
                    errormsg = msg
                    errors = Errors.ERRORS
                    # 'LockTimeout' has been observed in reality; the other
                    # two lock message IDs are listed in the borg docs.
                    # (The original compared against 'LockErrorT' twice.)
                    if msg.get('msgid') in ('LockTimeout', 'LockError',
                                            'LockErrorT'):
                        errors = Errors.BUSY
                    with self._cond:
                        self.current_operation.add_error(errors)
                        # Don't notify of errors if we are terminating
                        # or pausing
                        if not self._terminate_or_pause():
                            status, callback = self.__status_unlocked()
                elif (lvl == logging.INFO and
                      self.current_operation.type == Operation.PRUNE):
                    # Borg gives very little progress info in easy form,
                    # so try to extract it
                    archive_number, of_total = check_prune_status(
                        msg['message'])
                    if archive_number is not None and of_total is not None:
                        # Hold the lock like the other progress branches do
                        with self._cond:
                            detail = self.current_operation.detail
                            detail['progress_current_secondary'] = \
                                archive_number
                            detail['progress_total_secondary'] = of_total
                            status, callback = self.__status_unlocked()

            elif t == 'question_prompt' or t == 'question_prompt_retry':
                self.logger.error(
                    'Did not expect to receive question prompt from borg')
                with self._cond:
                    self.current_operation.add_error(Errors.ERRORS)
                # TODO: terminate borg? Send 'NO' reply?

            elif t in ('question_invalid_answer',
                       'question_accepted_default',
                       'question_accepted_true',
                       'question_accepted_false',
                       'question_env_answer'):
                pass

            else:
                # Log the entry itself; `status` is not meaningful here.
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errorlog=errormsg)

        self.logger.debug(
            'Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug(
            'Borg subprocess terminated; terminating log listener thread')

    @protect_noreturn
    def __result_listener(self):
        """Listener thread body: read and process borg's final result."""
        self.logger.debug('Result listener thread waiting for result')

        res = self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        if res is None:
            with self._cond:
                # Prune gives absolutely no result, so don't complain
                if (self.current_operation.ok() and
                        self.current_operation.type != Operation.PRUNE):
                    self.logger.error(
                        'No result from borg despite no error in log')
                    self.current_operation.add_error(Errors.ERRORS)
        elif self.current_operation.type == Operation.LIST:
            self.__handle_list_result(res)
        # All other results are discarded

    def __handle_list_result(self, res):
        """Record the latest archive in a list result as the previous backup."""
        ok = True
        latest = None
        if 'archives' in res and isinstance(res['archives'], list):
            archives = res['archives']
            for a in archives:
                if not isinstance(a, dict):
                    self.logger.error(
                        'Borg archive list entry not a dictionary')
                    ok = False
                else:
                    thistime, this_ok = get_archive_time(a, self.logger)
                    if thistime and (not latest or thistime > latest):
                        latest = thistime
                    ok = ok and this_ok
        else:
            # The original called an undefined bare `logger` here. A
            # malformed result is treated as an error like other malformed
            # list entries.
            self.logger.error('Borg archive list missing "archives" entry')
            ok = False

        if not ok:
            with self._cond:
                self.current_operation.add_error(Errors.ERRORS)

        if latest:
            self.logger.info('borg info: Previous backup was on %s'
                             % latest.isoformat())
            when = MonotonicTime.from_realtime(
                time.mktime(latest.timetuple()))
            op = Operation(Operation.CREATE, when, reason='listed')
            op.finish_time = when
            with self._cond:
                self.previous_operation_of_type[Operation.CREATE] = op
        else:
            self.logger.info(
                'borg info: Could not discover a previous backup')

    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=None):
        """Create and launch a borg instance and its listener threads.

        `paths` defaults to no paths; None is used instead of a mutable
        `[]` default so that no list is shared between calls.
        """
        if paths is None:
            paths = []

        self.logger.debug('Creating BorgInstance')

        inst = BorgInstance(op.type, archive_or_repository,
                            common_params, op_params, paths)

        self.logger.debug('Launching BorgInstance via repository')

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log = Thread(target=self.__log_listener)
        t_log.daemon = True

        t_res = Thread(target=self.__result_listener)
        t_res.daemon = True

        self.thread_log = t_log
        self.thread_res = t_res
        self.borg_instance = inst
        self.current_operation = op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.start_time = MonotonicTime.now()
        # Reset error status when starting a new operation
        self.__update_status(State.ACTIVE)

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Launch borg for `op` with parameters matching its type.

        Raises:
            NotImplementedError: if the operation type is not supported.
        """
        self.logger.debug("Launching '%s'" % str(op.type))

        params = (config.borg_parameters
                  + self.repository.borg_parameters
                  + self.borg_parameters)

        if op.type == Operation.CREATE:
            archive = "%s::%s%s" % (self.repository.location,
                                    self.archive_prefix,
                                    self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.type == Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.prune)
        elif op.type == Operation.LIST:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}])
        else:
            raise NotImplementedError("Invalid operation '%s'"
                                      % str(op.type))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op = self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation = None

            self.__launch(op)

            self.__wait_finish()

    def _terminate_or_pause(self):
        """Return True if termination or pause has been signalled."""
        return self._terminate or self._pause

    def __wait_finish(self):
        """Wait for the running operation to finish and record its outcome.

        Must be called with self._cond held; the lock is released while
        joining the listener threads.
        """
        current = self.current_operation

        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate_or_pause() and self.thread_res.is_alive():
            self._cond.release()
            try:
                self.thread_res.join(JOIN_TIMEOUT)
            finally:
                self._cond.acquire()

        # If terminate or pause has been signalled, let outer termination
        # handler take care of things (Within this Backup class, it would be
        # cleanest to raise an exception instead, but in most other places
        # it's better to just check self._terminate, so we don't complicate
        # things with an extra exception.)
        if self.thread_res.is_alive():
            return

        self.logger.debug(
            'Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        try:
            self.thread_log.join()
        finally:
            self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            # The original referenced the misspelt name `curent` here.
            current.add_error(Errors.ERRORS)

        current.finish_time = MonotonicTime.now()

        self.previous_operation_of_type[current.type] = current
        self.previous_operation = current
        self.current_operation = None
        self.thread_res = None
        self.thread_log = None
        self.borg_instance = None

    @protect_noreturn
    def __main_thread(self):
        """Thread body: schedule and run operations until terminated."""
        with self._cond:
            while not self._terminate:
                try:
                    assert not self.current_operation
                    self.__main_thread_wait_schedule()
                    if ((not self._terminate_or_pause()) and
                            self.scheduled_operation and
                            self.scheduled_operation.start_time
                            <= MonotonicTime.now()):
                        self.__main_thread_queue_and_launch()
                except Exception:
                    self.logger.exception("Exception in backup '%s'"
                                          % self.backup_name)
                finally:
                    self.__cleanup()

    def __cleanup(self):
        """Terminate any running borg instance and reset operation state.

        Must be called with self._cond held; the lock is released while
        terminating borg and joining the listener threads.
        """
        thread_log = self.thread_log
        thread_res = self.thread_res
        borg_instance = self.borg_instance
        self.scheduled_operation = None
        self.thread_log = None
        self.thread_res = None
        self.borg_instance = None

        self._cond.release()
        try:
            if borg_instance:
                self.logger.debug("Terminating a borg instance")
                borg_instance.terminate()

            if thread_log:
                self.logger.debug("Waiting for log thread to terminate")
                thread_log.join()

            if thread_res:
                self.logger.debug("Waiting for result thread to terminate")
                thread_res.join()
        finally:
            self._cond.acquire()
            self.current_operation = None

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op = None
        if self._pause:
            self.logger.info("Waiting for resume to be signalled")

            self.__update_status(State.PAUSED)

            self._cond.wait()
        else:
            if not self.scheduled_operation:
                op = self.__next_operation_unlocked()
            if op:
                self.scheduled_operation = op

                self.__update_status(State.SCHEDULED)

                # Wait under scheduled wait
                eventname = op.name() + '@' + self.backup_name
                self.scheduler.wait_until(op.start_time, self._cond,
                                          eventname)
            else:
                # Nothing scheduled - just wait
                self.logger.info("Waiting for manual scheduling")

                self.__update_status(State.INACTIVE)

                self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule
    # above), queue it on the repository, and launch the operation once
    # repository available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.__update_status(State.QUEUED)

            res = self.repository.queue_action(
                self._cond,
                action=self.__launch_and_wait,
                name=self.backup_name)
            if not res:
                self.logger.debug("Queueing aborted")

    def __next_operation_unlocked(self):
        """Return the next operation to schedule, or None if there is none.

        A pending list operation takes precedence; otherwise the earlier of
        the next create and prune operations is chosen.
        """
        listop = self.__next_operation_list()
        if listop:
            return listop

        create = self.__next_operation_type(Operation.CREATE,
                                            self.backup_interval,
                                            important=True,
                                            initial_reason='initial')

        prune = self.__next_operation_type(Operation.PRUNE,
                                           self.prune_interval,
                                           important=False,
                                           initial_reason=None)

        if not prune:
            return create
        elif not create:
            return prune
        elif create.start_time < prune.start_time:
            return create
        else:
            return prune

    def __next_operation_list(self):
        """Return a list operation if the repository should be listed."""
        reason = 'initial'
        # Unless manual backup has been chosen (backup_interval<=0), perform
        # repository listing if no previous create operation known, or if we
        # just pruned the repository
        if self.backup_interval <= 0:
            return None
        elif (self.previous_operation and
              self.previous_operation.type == Operation.PRUNE):
            tm = MonotonicTime.now()
            reason = 'post-prune'
        elif Operation.LIST in self.previous_operation_of_type:
            prev = self.previous_operation_of_type[Operation.LIST]
            if prev.ok():
                return None
            if self.retry_interval <= 0:
                # Do not retry in case of errors if retry interval is <= 0
                return None
            # Attempt after retry interval
            tm = MonotonicTime.after_other(prev.finish_time,
                                           self.retry_interval)
        else:
            # Nothing has been attempted: run immediately
            tm = MonotonicTime.now()
        return Operation(Operation.LIST, tm, reason=reason)

    def __next_operation_type(self, optype, standard_interval,
                              important=False,
                              initial_reason=None):
        """Return the next operation of type `optype`, or None.

        None is returned when the operation is manual-only
        (`standard_interval<=0`), or when the previous attempt failed and
        retrying is disabled (`retry_interval<=0`).
        """
        if optype not in self.previous_operation_of_type:
            # No previous operation exists; perform immediately
            # if important, otherwise after standard interval.
            # Do not perform if manual operation selected by
            # setting standard_interval<=0
            if standard_interval <= 0:
                return None
            else:
                if important:
                    tm = MonotonicTime.now()
                else:
                    tm = self.timeclass.after(standard_interval)
                if initial_reason:
                    return Operation(optype, tm, reason=initial_reason)
                else:
                    return Operation(optype, tm)
        else:
            # Previous operation has been performed; perform after
            # retry interval if there were errors, otherwise after
            # standard interval.
            prev = self.previous_operation_of_type[optype]
            if not prev.ok():
                # Do not retry in case of errors if retry interval is <= 0
                if self.retry_interval <= 0:
                    return None
                tm = MonotonicTime.after_other(prev.start_time,
                                               self.retry_interval)
                return Operation(optype, tm, reason='retry')
            elif standard_interval > 0:
                tm = self.timeclass.after_other(prev.start_time,
                                                standard_interval)
                return Operation(optype, tm)
            else:
                # Manual operation if standard_interval is zero.
                return None

    def __status_unlocked(self):
        """Return a `(Status, callback)` pair describing the current state."""
        callback = self.__status_update_callback
        status = Status(self)

        return status, callback

    def __update_status(self, state):
        """Enter `state` and notify the status update callback, if any.

        The callback is invoked without releasing self._cond.
        """
        self.logger.debug("Entering %s state", str(state))
        self.state = state
        status, callback = self.__status_unlocked()
        if callback:
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Set the callable invoked on status changes."""
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        """Return a `Status` snapshot of this backup."""
        with self._cond:
            res = self.__status_unlocked()
        return res[0]

    def create(self):
        """Manually schedule a create operation to start immediately."""
        op = Operation(Operation.CREATE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    def prune(self):
        """Manually schedule a prune operation to start immediately."""
        op = Operation(Operation.PRUNE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if any."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

    def is_paused(self):
        """Return True if the backup is in the PAUSED state."""
        with self._cond:
            paused = (self.state == State.PAUSED)
        return paused

    def pause(self):
        """Signal pause and drop any scheduled operation."""
        with self._cond:
            self.logger.debug('Pause signalled')
            self.scheduled_operation = None
            self._pause = True
            self._cond.notify()

    def resume(self):
        """Signal resume."""
        with self._cond:
            self.logger.debug('Resume signalled')
            self._pause = False
            self._cond.notify()