borgend/backup.py

author       Tuomo Valkonen <tuomov@iki.fi>
date         Wed, 07 Feb 2018 20:39:01 +0000
changeset    113:6993964140bd
parent       109:246190bfd501
child        115:df0de44d2c4b
permissions  -rw-r--r--

Time snapshot fixes.
Python's default arguments are purely idiotic (aka. pythonic): they are
evaluated only once. This makes sense in a purely functional language, which
Python is light-years away from, but severely limits their usefulness in an
imperative language. Decorators also seem clumsy for this, as one would have
to spell out the number of positional arguments for things to work nicely
while still being able to pass the snapshot both positionally and as a
keyword. No luck.
So we have to do things the old-fashioned hard way.
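A minimal sketch of the pitfall (illustrative only, not part of the module):
the default expression is evaluated once, when the function is defined, so a
time snapshot taken as a default value is frozen, and the usual workaround is
a None default with the snapshot taken inside the call.

import datetime

def snapshot_frozen(when=datetime.datetime.now()):
    # Default computed once, at definition time; every call sees the same value.
    return when

def snapshot_fresh(when=None):
    # The old-fashioned hard way: take the snapshot per call.
    if when is None:
        when=datetime.datetime.now()
    return when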

#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements the scheduling, running, and borg output processing
# for a specific configured backup.
#

import logging
import time
import datetime
import re
from enum import IntEnum
from threading import Thread, Lock, Condition

from . import config
from . import repository
from .dreamtime import MonotonicTime, DreamTime, RealTime
from .instance import BorgInstance
from .scheduler import TerminableThread
from .exprotect import protect_noreturn

_logger=logging.getLogger(__name__)

JOIN_TIMEOUT=10

#
# State and operation related helper classes
#

class State(IntEnum):
    # State
    INACTIVE=0
    PAUSED=1
    SCHEDULED=2
    QUEUED=3
    ACTIVE=4

    def __str__(self):
        return _statestring[self]

class Errors(IntEnum):
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

    def combine(self, other):
        return max(self, other)

    def ok(self):
        return self==self.OK

    def __str__(self):
        return _errorstring[self]

_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}

_statestring={
    State.INACTIVE: 'inactive',
    State.PAUSED: 'paused',
    State.SCHEDULED: 'scheduled',
    State.QUEUED: 'queued',
    State.ACTIVE: 'active'
}

class Operation:
    CREATE='create'
    PRUNE='prune'
    INFO='info'
    LIST='list'

    def __init__(self, type, start_time, **kwargs):
        self.type=type
        self.start_time=start_time
        self.finish_time=None
        self.detail=kwargs
        self.errors=Errors.OK

    def when(self):
        return self.start_time.realtime()

    def ok(self):
        return self.errors.ok()

    def add_error(self, error):
        self.errors=self.errors.combine(error)

    def name(self):
        if 'reason' in self.detail:
            return str(self.type) + '.' + self.detail['reason']
        else:
            return str(self.type)


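# A read-only snapshot of a backup's state for status callbacks: operation
# details are copied from the current (or scheduled) operation, and the error
# status from the current (or previous) operation.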
class Status(Operation):
    def __init__(self, backup, op=None):
        if op is None:
            op=backup.current_operation
        errorsop=backup.current_operation

        if not op:
            op=backup.scheduled_operation

        if not errorsop:
            errorsop=backup.previous_operation

        if op:
            super().__init__(op.type, op.start_time, **op.detail)
        else:
            super().__init__(None, None)

        if errorsop:
            self.errors=errorsop.errors

        self.name=backup.name
        self.state=backup.state

#
# Miscellaneous helper routines
#

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

def safe_get_int(t, x):
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None

def parse_borg_date(d, logger):
    try:
        res=datetime.datetime.strptime(d, '%Y-%m-%dT%H:%M:%S.%f')
    except Exception:
        logger.exception('Unable to parse date from borg: "%s"' % d)
        res=None
    return res

_checkpoint_str='.checkpoint'
_checkpoint_str_len=len(_checkpoint_str)

def is_checkpoint(name):
    i=name.rfind(_checkpoint_str)
    return i>=0 and i+_checkpoint_str_len==len(name)

def get_archive_time(a, logger):
    if not 'name' in a:
        logger.error('Borg archive list entry missing name')
        return None, False
    if is_checkpoint(a['name']):
        logger.debug('Skipping checkpoint in archive list')
        return None, True

    thistime=None
    if 'start' in a:
        thistime=parse_borg_date(a['start'], logger)
    if not thistime and 'time' in a:
        thistime=parse_borg_date(a['time'], logger)
    if not thistime:
        return None, False

    return thistime, True

_prune_progress_re=re.compile(r".*\(([0-9]+)/([0-9]+)\)$")
# Borg gives very little progress info in easy form, so try to extract it
def check_prune_status(msg):
    res=_prune_progress_re.match(msg)
    if res:
        c=res.groups()
        try:
            archive_no=int(c[0])
            of_total=int(c[1])
        except ValueError:
            pass
        else:
            return archive_no, of_total
    return None, None


#
# The Backup class
#

class Backup(TerminableThread):

    def __decode_config(self, cfg):
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        _logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=_logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', loc,
                                                     config.defaults['backup_interval'])

        self.prune_interval=config.check_nonneg_int(cfg, 'prune_interval',
                                                     'Prune interval', loc,
                                                     config.defaults['prune_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', loc,
                                                    config.defaults['retry_interval'])


        scheduling=config.check_string(cfg, 'scheduling',
                                      'Scheduling mode', loc,
                                      default="dreamtime")

        if scheduling=="dreamtime":
            self.timeclass=DreamTime
        elif scheduling=="realtime":
            self.timeclass=MonotonicTime
        elif scheduling=="manual":
            self.backup_interval=0
        else:
            self.logger.error("Invalid time class '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)


    def __init__(self, identifier, cfg, scheduler):
        self.identifier=identifier
        self.__status_update_callback=None
        self._pause=False
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once backup name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.previous_operation=None
        self.current_operation=None
        self.scheduled_operation=None
        self.previous_operation_of_type={}
        self.state=State.INACTIVE
        self.timeclass=DreamTime

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

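    # Listener thread: processes borg's JSON log stream, updating operation
    # progress details and error status, and firing the status update callback.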
    @protect_noreturn
    def __log_listener(self):
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                operation_no=safe_get_int(msg, 'operation')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        self.current_operation.detail['operation_no']=operation_no
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if original_size is not None and compressed_size is not None and deduplicated_size is not None:
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockError' or # in docs
                         msg['msgid']=='LockErrorT')): # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.current_operation.add_error(errors)
                        # Don't notify of errors if we are terminating or pausing
                        if not self._terminate_or_pause():
                            status, callback=self.__status_unlocked()
                elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
                    # Borg gives very little progress info in easy form, so try to extract it
                    archive_number, of_total=check_prune_status(msg['message'])
                    if archive_number is not None and of_total is not None:
                        with self._cond:
                            self.current_operation.detail['progress_current_secondary']=archive_number
                            self.current_operation.detail['progress_total_secondary']=of_total
                            status, callback=self.__status_unlocked()

            elif t=='question_prompt' or t=='question_prompt_retry':
                self.logger.error('Did not expect to receive question prompt from borg')
                with self._cond:
                    self.current_operation.add_error(Errors.ERRORS)
                # TODO: terminate org? Send 'NO' reply?

            elif (t=='question_invalid_answer' or t=='question_accepted_default'
                  or t=='question_accepted_true' or t=='question_accepted_false'
                  or t=='question_env_answer'):
                pass

            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errorlog=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

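    # Listener thread: reads borg's final JSON result and records errors or,
    # for list operations, the time of the latest existing archive.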
    @protect_noreturn
    def __result_listener(self):
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        if res is None:
            with self._cond:
                # Prune gives absolutely no result, so don't complain
                if (self.current_operation.ok() and
                    self.current_operation.type!=Operation.PRUNE):
                    self.logger.error('No result from borg despite no error in log')
                    self.current_operation.add_error(Errors.ERRORS)
        elif self.current_operation.type==Operation.LIST:
            self.__handle_list_result(res)

        # All other results are discarded

    def __handle_list_result(self, res):
        ok=True
        latest=None
        if 'archives' in res and isinstance(res['archives'], list):
            archives=res['archives']
            for a in archives:
                if not isinstance(a, dict):
                    self.logger.error('Borg archive list entry not a dictionary')
                    ok=False
                else:
                    thistime, this_ok=get_archive_time(a, self.logger)
                    if thistime and (not latest or thistime>latest):
                        latest=thistime
                    ok=ok and this_ok
        else:
            self.logger.error('Borg archive list missing "archives" entry')

        if not ok:
            with self._cond:
                self.current_operation.add_error(Errors.ERRORS)

        if latest:
            self.logger.info('borg info: Previous backup was on %s' % latest.isoformat())
            when=MonotonicTime.from_realtime(time.mktime(latest.timetuple()))
            op=Operation(Operation.CREATE, when, reason='listed')
            op.finish_time=when
            with self._cond:
                self.previous_operation_of_type[Operation.CREATE]=op
        else:
            self.logger.info('borg info: Could not discover a previous backup')

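    # Launch a borg instance for the given operation and start the log and
    # result listener threads for it.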
    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=[]):

        self.logger.debug('Creating BorgInstance')

        inst=BorgInstance(op.type, archive_or_repository,
                          common_params, op_params, paths)

        self.logger.debug('Launching BorgInstance via repository')

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.start_time=MonotonicTime.now()
        # Reset error status when starting a new operation
        self.__update_status(State.ACTIVE)

        t_log.start()
        t_res.start()


    def __launch(self, op):
        self.logger.debug("Launching '%s'" % str(op.type))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.type==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.type==Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.prune)

        elif op.type==Operation.LIST:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}])
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.type))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def _terminate_or_pause(self):
        return self._terminate or self._pause

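    # Wait for the listener threads and the borg subprocess to finish, then
    # record the completed operation. Must be called with self._cond held.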
    def __wait_finish(self):
        current=self.current_operation

        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate_or_pause() and self.thread_res.is_alive():
            self._cond.release()
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate or pause has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self.thread_res.is_alive():
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            current.add_error(Errors.ERRORS)

        current.finish_time=MonotonicTime.now()

        self.previous_operation_of_type[current.type]=current
        self.previous_operation=current
        self.current_operation=None
        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None

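    # Main thread: repeatedly schedule the next operation, wait for its start
    # time (or manual triggering), then queue it on the repository and run it.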
    @protect_noreturn
    def __main_thread(self):
        with self._cond:
            while not self._terminate:
                try:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if ((not self._terminate_or_pause()) and self.scheduled_operation
                        and self.scheduled_operation.start_time <= MonotonicTime.now()):
                        self.__main_thread_queue_and_launch()
                except Exception:
                    self.logger.exception("Exception in backup '%s'" % self.backup_name)
                finally:
                    self.__cleanup()

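    # Clean up after an iteration of the main loop: terminate any running borg
    # instance and join the listener threads.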
    def __cleanup(self):
        thread_log=self.thread_log
        thread_res=self.thread_res
        borg_instance=self.borg_instance
        self.scheduled_operation=None
        self.thread_log=None
        self.thread_res=None
        self.borg_instance=None

        self._cond.release()
        try:
            if borg_instance:
                self.logger.debug("Terminating a borg instance")
                borg_instance.terminate()

            if thread_log:
                self.logger.debug("Waiting for log thread to terminate")
                thread_log.join()

            if thread_res:
                self.logger.debug("Waiting for result thread to terminate")
                thread_res.join()
        finally:
            self._cond.acquire()
            self.current_operation=None

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op=None
        if self._pause:
            self.logger.info("Waiting for resume to be signalled")

            self.__update_status(State.PAUSED)

            self._cond.wait()
        else:
            if not self.scheduled_operation:
                op=self.__next_operation_unlocked()
            if op:
                self.scheduled_operation=op

                self.__update_status(State.SCHEDULED)

                # Wait under scheduled wait
                eventname=op.name() + '@' + self.backup_name
                self.scheduler.wait_until(op.start_time, self._cond, eventname)
            else:
                # Nothing scheduled - just wait
                self.logger.info("Waiting for manual scheduling")

                self.__update_status(State.INACTIVE)

                self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once the repository
    # becomes available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:

            self.__update_status(State.QUEUED)

            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res:
                self.logger.debug("Queueing aborted")

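    # Determine the next operation to schedule: a repository listing if one is
    # needed, otherwise whichever of create and prune is due first.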
    def __next_operation_unlocked(self):
        listop=self.__next_operation_list()
        if listop:
            return listop

        create=self.__next_operation_type(Operation.CREATE,
                                          self.backup_interval,
                                          important=True,
                                          initial_reason='initial')

        prune=self.__next_operation_type(Operation.PRUNE,
                                         self.prune_interval,
                                         important=False,
                                         initial_reason=None)

        if not prune:
            return create
        elif not create:
            return prune
        elif create.start_time < prune.start_time:
            return create
        else:
            return prune

    def __next_operation_list(self):
        reason='initial'
        # Unless manual backup has been chosen (backup_interval<=0), perform a
        # repository listing if no previous create operation is known, or if we
        # just pruned the repository
        if self.backup_interval<=0:
            return None
        elif (self.previous_operation and
              self.previous_operation.type==Operation.PRUNE):
            tm=MonotonicTime.now()
            reason='post-prune'
        elif Operation.LIST in self.previous_operation_of_type:
            prev=self.previous_operation_of_type[Operation.LIST]
            if prev.ok():
                return None
            if self.retry_interval<=0:
                # Do not retry in case of errors if retry interval is <= 0
                return None
            # Attempt after retry interval
            tm=MonotonicTime.after_other(prev.finish_time, self.retry_interval)
        else:
            # Nothing has been attempted: run immediately
            tm=MonotonicTime.now()
        return Operation(Operation.LIST, tm, reason=reason)

    def __next_operation_type(self, optype, standard_interval,
                              important=False,
                              initial_reason=None):
        if optype not in self.previous_operation_of_type:
            # No previous operation exists; perform immediately
            # if important, otherwise after standard interval.
            # Do not perform if manual operation selected by
            # setting standard_interval<=0
            if standard_interval<=0:
                return None
            else:
                if important:
                    tm=MonotonicTime.now()
                else:
                    tm=self.timeclass.after(standard_interval)
                if initial_reason:
                    return Operation(optype, tm, reason=initial_reason)
                else:
                    return Operation(optype, tm)
        else:
            # Previous operation has been performed; perform after
            # retry interval if there were errors, otherwise after
            # standard interval.
            prev=self.previous_operation_of_type[optype]
            if not prev.ok():
                # Do not retry in case of errors if retry interval is <= 0
                if self.retry_interval<=0:
                    return None
                tm=MonotonicTime.after_other(prev.start_time,
                                             self.retry_interval)
                return Operation(optype, tm, reason='retry')
            elif standard_interval>0:
                tm=self.timeclass.after_other(prev.start_time,
                                              standard_interval)
                return Operation(optype, tm)
            else:
                # Manual operation if standard_interval<=0.
                return None

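    # Construct a Status snapshot and return it together with the registered
    # status update callback. Must be called with self._cond held.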
    def __status_unlocked(self):
        callback=self.__status_update_callback
        status=Status(self)

        return status, callback

    def __update_status(self, state):
        self.logger.debug("Entering %s state", str(state))
        self.state=state
        status, callback = self.__status_unlocked()
        if callback:
            #self._cond.release()
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")
            #finally:
            #    self._cond.acquire()

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        op=Operation(Operation.CREATE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        op=Operation(Operation.PRUNE, MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

    def is_paused(self):
        with self._cond:
            paused=(self.state==State.PAUSED)
        return paused

    def pause(self):
        with self._cond:
            self.logger.debug('Pause signalled')
            self.scheduled_operation=None
            self._pause=True
            self._cond.notify()

    def resume(self):
        with self._cond:
            self.logger.debug('Resume signalled')
            self._pause=False
            self._cond.notify()
