borgend/backup.py

author      Tuomo Valkonen <tuomov@iki.fi>
date        Sun, 25 Dec 2022 13:26:18 +0200
changeset   147:c42d69c44170
parent      146:e79cc90c7798

Prune also needs --glob-archives instead of --prefix

#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements the scheduling, running, and borg output processing
# for a specific configured backup.
#

import logging
import time
import datetime
import re
from enum import IntEnum
from threading import Thread, Lock, Condition

from . import config
from . import repository
from .dreamtime import MonotonicTime, DreamTime, RealTime
from .instance import BorgInstance
from .scheduler import TerminableThread
from .exprotect import protect_noreturn

_logger=logging.getLogger(__name__)

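# Timeout (seconds) for each join() attempt on the result listener thread,
# between re-checks for termination or pause.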
JOIN_TIMEOUT=10

#
# State and operation related helper classes
#

class State(IntEnum):
    # State
    INACTIVE=0
    PAUSED=1
    SCHEDULED=2
    QUEUED=3
    ACTIVE=4

    def __str__(self):
        return _statestring[self]

class Errors(IntEnum):
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

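    # combine() relies on the IntEnum ordering: higher values are more
    # severe, so the worst status seen wins.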
    def combine(self, other):
        return max(self, other)

    def ok(self):
        return self==self.OK

    def __str__(self):
        return _errorstring[self]

_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}

_statestring={
    State.INACTIVE: 'inactive',
    State.PAUSED: 'paused',
    State.SCHEDULED: 'scheduled',
    State.QUEUED: 'queued',
    State.ACTIVE: 'active'
}

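# A single borg operation (create/prune/info/list) together with its start
# and finish times, free-form detail fields, and accumulated error status.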
class Operation:
    CREATE='create'
    PRUNE='prune'
    INFO='info'
    LIST='list'

    def __init__(self, type, start_time, **kwargs):
        self.type=type
        self.start_time=start_time
        self.finish_time=None
        self.detail=kwargs
        self.errors=Errors.OK

    def when(self):
        return self.start_time.realtime()

    def ok(self):
        return self.errors.ok()

    def add_error(self, error):
        self.errors=self.errors.combine(error)

    def name(self):
        if 'reason' in self.detail:
            return str(self.type) + '.' + self.detail['reason']
        else:
            return str(self.type)


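# Snapshot of a backup for status callbacks: based on the current operation
# (falling back to the scheduled one), with errors taken from the current or
# previous operation, plus the backup name and state.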
class Status(Operation):
    def __init__(self, backup):
        op=backup.current_operation
        errorsop=backup.current_operation

        if not op:
            op=backup.scheduled_operation

        if not errorsop:
            errorsop=backup.previous_operation

        if op:
            super().__init__(op.type, op.start_time, **op.detail)
        else:
            super().__init__(None, None)

        if errorsop:
            self.errors=errorsop.errors

        self.name=backup.name
        self.state=backup.state

#
# Miscellaneous helper routines
#

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

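# Return t[x] if it exists and is an int, otherwise None.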
def safe_get_int(t, x):
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None

def parse_borg_date(d, logger):
    try:
        res=datetime.datetime.strptime(d, '%Y-%m-%dT%H:%M:%S.%f')
    except Exception:
        logger.exception('Unable to parse date from borg: "%s"' % d)
        res=None
    return res

_checkpoint_str='.checkpoint'
_checkpoint_str_len=len(_checkpoint_str)

def is_checkpoint(name):
    i=name.rfind(_checkpoint_str)
    return i>=0 and i+_checkpoint_str_len==len(name)

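# Extract the start time of a borg archive list entry. Returns (time, ok);
# checkpoint archives are skipped and reported as (None, True).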
def get_archive_time(a, logger):
    if not 'name' in a:
        logger.error('Borg archive list entry missing name')
        return None, False
    if is_checkpoint(a['name']):
        logger.debug('Skipping checkpoint in archive list')
        return None, True

    thistime=None
    if 'start' in a:
        thistime=parse_borg_date(a['start'], logger)
    if not thistime and 'time' in a:
        thistime=parse_borg_date(a['time'], logger)
    if not thistime:
        return None, False

    return thistime, True

_prune_progress_re=re.compile(r".*\(([0-9]+)/([0-9]+)\)$")
# Borg gives very little progress info in easy form, so try to extract it
# from log messages ending in "(archive_no/total)".
def check_prune_status(msg):
    res=_prune_progress_re.match(msg)
    if res:
        c=res.groups()
        try:
            archive_no=int(c[0])
            of_total=int(c[1])
        except ValueError:
            pass
        else:
            return archive_no, of_total
    return None, None


#
# The Backup class
#

class Backup(TerminableThread):

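    # One Backup instance corresponds to one configured backup: a thread that
    # schedules and runs borg create/prune/list operations on its repository.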
    def __decode_config(self, cfg):
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        _logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=_logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', loc,
                                                     config.defaults['backup_interval'])

        self.prune_interval=config.check_nonneg_int(cfg, 'prune_interval',
                                                     'Prune interval', loc,
                                                     config.defaults['prune_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', loc,
                                                    config.defaults['retry_interval'])


        scheduling=config.check_string(cfg, 'scheduling',
                                      'Scheduling mode', loc,
                                      default="dreamtime")

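        # 'dreamtime' schedules intervals in DreamTime and 'realtime' in
        # MonotonicTime (see the dreamtime module); 'manual' disables the
        # automatic backup interval.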
        if scheduling=="dreamtime":
            self.timeclass=DreamTime
        elif scheduling=="realtime":
            self.timeclass=MonotonicTime
        elif scheduling=="manual":
            self.backup_interval=0
        else:
            logging.error("Invalid time class '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)


    def __init__(self, identifier, cfg, scheduler):
        self.identifier=identifier
        self.__status_update_callback=None
        self._pause=False
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once the backup name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.previous_operation=None
        self.current_operation=None
        self.scheduled_operation=None
        self.previous_operation_of_type={}
        self.state=State.INACTIVE
        self.timeclass=DreamTime
        self.start_time=self.timeclass.now()

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    @protect_noreturn
    def __log_listener(self):
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                operation_no=safe_get_int(msg, 'operation')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        self.current_operation.detail['operation_no']=operation_no
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if original_size is not None and compressed_size is not None and deduplicated_size is not None:
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockErrorT' or # in docs
                         msg['msgid']=='LockError')): # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.current_operation.add_error(errors)
                        # Don't notify of errors if we are terminating or pausing
                        if not self._terminate_or_pause():
                            status, callback=self.__status_unlocked()
                elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
                    # Borg gives very little progress info in easy form, so try to extract it
                    archive_number, of_total=check_prune_status(msg['message'])
                    if archive_number is not None and of_total is not None:
                        with self._cond:
                            self.current_operation.detail['progress_current_secondary']=archive_number
                            self.current_operation.detail['progress_total_secondary']=of_total
                            status, callback=self.__status_unlocked()

            elif t=='question_prompt' or t=='question_prompt_retry':
                self.logger.error('Did not expect to receive question prompt from borg')
                with self._cond:
                    self.current_operation.add_error(Errors.ERRORS)
                # TODO: terminate borg? Send 'NO' reply?

            elif (t=='question_invalid_answer' or t=='question_accepted_default'
                  or t=='question_accepted_true' or t=='question_accepted_false'
                  or t=='question_env_answer'):
                pass

            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errorlog=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    @protect_noreturn
    def __result_listener(self):
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        if res is None:
            with self._cond:
                # Prune gives absolutely no result, so don't complain
                if (self.current_operation.ok() and
                    self.current_operation.type!=Operation.PRUNE):
                    self.logger.error('No result from borg despite no error in log')
                    self.current_operation.add_error(Errors.ERRORS)
        elif self.current_operation.type==Operation.LIST:
            self.__handle_list_result(res)

        # All other results are discarded

    def __handle_list_result(self, res):
        ok=True
        latest=None
        if 'archives' in res and isinstance(res['archives'], list):
            archives=res['archives']
            for a in archives:
                if not isinstance(a, dict):
                    self.logger.error('Borg archive list entry not a dictionary')
                    ok=False
                else:
                    thistime, this_ok=get_archive_time(a, self.logger)
                    if thistime and (not latest or thistime>latest):
                        latest=thistime
                    ok=ok and this_ok
        else:
            self.logger.error('Borg archive list missing "archives" entry')
            ok=False

        if not ok:
            with self._cond:
                self.current_operation.add_error(Errors.ERRORS)

        if latest:
            self.logger.info('borg info: Previous backup was on %s' % latest.isoformat())
            when=MonotonicTime.from_realtime(time.mktime(latest.timetuple()))
            op=Operation(Operation.CREATE, when, reason='listed')
            op.finish_time=when
            with self._cond:
                self.previous_operation_of_type[Operation.CREATE]=op
        else:
            self.logger.info('borg info: Could not discover a previous backup')

    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=[]):

        # Set up current_operation here so errors can be added to it in
        # __main_thread if there is an exception
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.start_time=MonotonicTime.now()

        self.logger.debug('Creating BorgInstance')

        inst=BorgInstance(op.type, archive_or_repository,
                          common_params, op_params, paths)

        self.logger.debug('Launching BorgInstance via repository')

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        # Reset error status when starting a new operation
        self.__update_status(State.ACTIVE)

        t_log.start()
        t_res.start()

    def __launch(self, op):
        self.logger.info("Launching '%s'" % str(op.type))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.type==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.type==Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'glob-archives': self.archive_prefix + '*'}] + params.prune)

        elif op.type==Operation.LIST:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'glob-archives': self.archive_prefix + '*'}])
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.type))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def _terminate_or_pause(self):
        return self._terminate or self._pause

    def __wait_finish(self):
        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate_or_pause() and self.thread_res.is_alive():
            self._cond.release()
            # Maybe wait for borg instead?
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate or pause has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self.thread_res.is_alive():
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.current_operation.add_error(Errors.ERRORS)

        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None

        self.__mark_current_finished()

    def __mark_current_finished(self):
        current=self.current_operation

        assert(current)

        current.finish_time=MonotonicTime.now()

        self.previous_operation_of_type[current.type]=current
        self.previous_operation=current
        self.current_operation=None

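    # Main thread/1. Main loop: wait for the next scheduled or manually
    # requested operation, run it, and clean up, until termination is
    # signalled.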
    @protect_noreturn
    def __main_thread(self):
        with self._cond:
            while not self._terminate:
                try:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if ((not self._terminate_or_pause()) and self.scheduled_operation
                        and self.scheduled_operation.start_time <= MonotonicTime.now()):
                        self.__main_thread_queue_and_launch()
                except Exception:
                    self.logger.exception("Exception in backup '%s'" % self.backup_name)
                    current=self.current_operation
                    if current:
                        current.add_error(Errors.ERRORS)
                        self.__mark_current_finished()
                finally:
                    self.__cleanup()

    def __cleanup(self):
        thread_log=self.thread_log
        thread_res=self.thread_res
        borg_instance=self.borg_instance
        self.scheduled_operation=None
        self.thread_log=None
        self.thread_res=None
        self.borg_instance=None

        self._cond.release()
        try:
            if borg_instance:
                self.logger.debug("Terminating a borg instance")
                borg_instance.terminate()

            if thread_log:
                self.logger.debug("Waiting for log thread to terminate")
                thread_log.join()

            if thread_res:
                self.logger.debug("Waiting for result thread to terminate")
                thread_res.join()
        finally:
            self._cond.acquire()
            self.current_operation=None

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op=None
        if self._pause:
            self.logger.info("Waiting for resume to be signalled")

            self.__update_status(State.PAUSED)

            self._cond.wait()
        else:
            if not self.scheduled_operation:
                op=self.__next_operation_unlocked()
            if op:
                self.scheduled_operation=op

                self.__update_status(State.SCHEDULED)

                # Wait until the scheduled start time (or until woken up)
                eventname=op.name() + '@' + self.backup_name
                self.scheduler.wait_until(op.start_time, self._cond, eventname)
            else:
                # Nothing scheduled - just wait
                self.logger.info("Waiting for manual scheduling")

                self.__update_status(State.INACTIVE)

                self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from the 'op' created in __main_thread_wait_schedule
    # above), queue it on the repository, and launch the operation once the
    # repository becomes available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:

            self.__update_status(State.QUEUED)

            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res:
                self.logger.debug("Queueing aborted")

    def __next_operation_unlocked(self):
        listop=self.__next_operation_list()
        if listop:
            return listop

        create=self.__next_operation_type(Operation.CREATE,
                                          self.backup_interval,
                                          important=True,
                                          initial_reason='initial')

        prune=self.__next_operation_type(Operation.PRUNE,
                                         self.prune_interval,
                                         important=False,
                                         initial_reason=None)

        if prune:
            self.logger.debug("prune scheduled at %s " % prune.start_time.isoformat())
        else:
            self.logger.debug("no prune scheduled")

        if not prune:
            return create
        elif not create:
            return prune
        elif create.start_time < prune.start_time:
            return create
        else:
            return prune

    def __next_operation_list(self):
        reason='initial'
        # Unless manual backup has been chosen (backup_interval<=0), perform 
        # repository listing if no previous create operation known, or if we
        # just pruned the repository
        if self.backup_interval<=0:
            return None
        elif (self.previous_operation and
              self.previous_operation.type==Operation.PRUNE):
            tm=MonotonicTime.now()
            reason='post-prune'
        elif Operation.LIST in self.previous_operation_of_type:
            prev=self.previous_operation_of_type[Operation.LIST]
            if prev.ok():
                return None
            if self.retry_interval<=0:
                # Do not retry in case of errors if retry interval is <= 0
                return None
            # Attempt after retry interval
            tm=MonotonicTime.after_other(prev.finish_time, self.retry_interval)
        else:
            # Nothing has been attempted: run immediately
            tm=MonotonicTime.now()
        return Operation(Operation.LIST, tm, reason=reason)

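    # Schedule the next operation of 'optype': immediately (if 'important') or
    # after standard_interval for the first run, after standard_interval
    # following a successful run, and after retry_interval following a failed
    # one. Returns None when the operation is manual-only (standard_interval
    # or retry_interval <= 0).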
    def __next_operation_type(self, optype, standard_interval,
                              important=False,
                              initial_reason=None):
        if optype not in self.previous_operation_of_type:
            # No previous operation exists; perform immediately
            # if important, otherwise after standard interval.
            # Do not perform if manual operation selected by
            # setting standard_interval<=0
            if standard_interval<=0:
                return None
            else:
                if important:
                    tm=MonotonicTime.now()
                else:
                    tm=self.timeclass.after_other(self.start_time, standard_interval)
                if initial_reason:
                    return Operation(optype, tm, reason=initial_reason)
                else:
                    return Operation(optype, tm)
        else:
            # Previous operation has been performed; perform after
            # retry interval if there were errors, otherwise after
            # standard interval.
            prev=self.previous_operation_of_type[optype]
            if not prev.ok():
                # Do not retry in case of errors if retry interval is <= 0
                if self.retry_interval<=0:
                    return None
                tm=MonotonicTime.after_other(prev.start_time,
                                             self.retry_interval)
                return Operation(optype, tm, reason='retry')
            elif standard_interval>0:
                tm=self.timeclass.after_other(prev.start_time,
                                              standard_interval)
                return Operation(optype, tm)
            else:
                # Manual operation if standard_interval is <= 0.
                return None

    def __status_unlocked(self):
        callback=self.__status_update_callback
        status=Status(self)

        return status, callback

    def __update_status(self, state):
        self.logger.debug("Entering %s state", str(state))
        self.state=state
        status, callback = self.__status_unlocked()
        if callback:
            #self._cond.release()
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")
            #finally:
            #    self._cond.acquire()

    #
    # Interface functions
    #
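    # All of these acquire self._cond; they may be called from other threads.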

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        op=Operation(Operation.CREATE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        op=Operation(Operation.PRUNE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

    def is_paused(self):
        with self._cond:
            paused=(self.state==State.PAUSED)
        return paused

    def pause(self):
        with self._cond:
            self.logger.debug('Pause signalled')
            self.scheduled_operation=None
            self._pause=True
            self._cond.notify()

    def resume(self):
        with self._cond:
            self.logger.debug('Resume signalled')
            self._pause=False
            self._cond.notify()
