backup.py

Fri, 26 Jan 2018 10:31:08 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Fri, 26 Jan 2018 10:31:08 +0000
changeset 71
a8a5ebb64e02
parent 70
3f794760d52e
child 74
4f56142e7497
permissions
-rw-r--r--

Changed retry timing to start from end of previous attempt instead of beginning

#
# Borgend Backup instance
#

import config
import logging
import time
import keyring
import borgend
import repository
from enum import IntEnum
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

# Module-level logger: a child of the borgend logger, named after this module.
# Each Backup derives its own per-target child logger from it.
logger=borgend.logger.getChild(__name__)

# Timeout (seconds) for each join() on the result listener thread in
# Backup.__wait_finish, so that the wait loop re-checks its exit condition
# periodically instead of blocking indefinitely.
JOIN_TIMEOUT=60

#
# State and operation related helper classes
#

class State(IntEnum):
    """Lifecycle state of a backup target.

    INACTIVE  -- nothing scheduled or running
    SCHEDULED -- an operation is waiting for its start time
    QUEUED    -- an operation is waiting for the repository
    ACTIVE    -- an operation is running
    """

    INACTIVE = 0
    SCHEDULED = 1
    QUEUED = 2
    ACTIVE = 3


class Errors(IntEnum):
    """Error status of a backup target, ordered by increasing severity.

    Because the values are ordered, two statuses are combined by taking
    the larger one.
    """

    OK = 0
    BUSY = 1
    OFFLINE = 2
    ERRORS = 3

    def combine(self, other):
        """Return the more severe of `self` and `other` (`self` on a tie)."""
        return other if other > self else self

    def ok(self):
        """Return True exactly when this status is OK."""
        return self == Errors.OK

    def __str__(self):
        """Return the lower-case human-readable name of this status."""
        return _errorstring[self]


# Human-readable names used by Errors.__str__
_errorstring = {
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors',
}

class Operation:
    """A borg operation together with the time it is due.

    `operation` is one of the class constants CREATE / PRUNE,
    `when_monotonic` is the due time on the time.monotonic() clock, and
    any extra keyword arguments are kept in the `detail` dict.
    """

    CREATE = 'create'
    PRUNE = 'prune'

    def __init__(self, operation, when_monotonic, **kwargs):
        self.operation = operation
        self.when_monotonic = when_monotonic
        self.detail = kwargs

    def when(self):
        """Return the due time converted to the time.time() clock."""
        # Offset from "now" on the monotonic clock, re-based on wall time
        remaining = self.when_monotonic - time.monotonic()
        return remaining + time.time()


class Status(Operation):
    """Snapshot of a backup's status, as handed to status callbacks.

    Carries the fields of the given operation `op` (or None for both
    `operation` and `when_monotonic` when there is no operation) plus the
    backup's `name`, `state` and `errors` at the time of construction.
    """

    def __init__(self, backup, op=None):
        if not op:
            super().__init__(None, None)
        else:
            # **op.detail gives the snapshot its own copy of the detail dict
            super().__init__(op.operation, op.when_monotonic, **op.detail)

        self.name = backup.name
        self.state = backup.state
        self.errors = backup.errors

#
# Miscellaneous helper routines
#

# Maps the level names found in borg log messages to `logging` levels
loglevel_translation = {
    levelname: getattr(logging, levelname)
    for levelname in ('CRITICAL', 'ERROR', 'WARNING', 'DEBUG', 'INFO')
}


def translate_loglevel(x):
    """Return the `logging` level for level name `x`.

    Names missing from `loglevel_translation` are treated as logging.ERROR.
    """
    return loglevel_translation.get(x, logging.ERROR)

def safe_get_int(t, x):
    """Return `t[x]` if `x` is in `t` and the value is an int, else None."""
    if x not in t:
        return None
    value = t[x]
    return value if isinstance(value, int) else None

#
# The Backup class
#

class Backup(TerminableThread):

    def __decode_config(self, cfg):
        """Read this backup target's settings from `cfg` via the config helpers.

        Sets the name, per-target logger, repository controller, archive
        naming, intervals, paths, borg parameter lists and keychain account
        on `self`. Whatever the `config.check_*` helpers raise propagates
        to the caller.
        """
        # Until the name is known, the target can only be described by its
        # numeric identifier
        self._name = config.check_string(cfg, 'name', 'Name',
                                         'backup target %d' % self.identifier)

        self.logger = logger.getChild(self._name)

        self.loc = 'backup target "%s"' % self._name
        where = self.loc

        self.repository = repository.get_controller(
            config.check_string(cfg, 'repository', 'Target repository', where))

        self.archive_prefix = config.check_string(
            cfg, 'archive_prefix', 'Archive prefix', where)

        self.archive_template = config.check_string(
            cfg, 'archive_template', 'Archive template', where)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', where,
            config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', where,
            config.defaults['retry_interval'])

        self.paths = config.check_nonempty_list_of_strings(
            cfg, 'paths', 'Paths', where)

        # The three borg parameter lists are read identically; the
        # attribute name equals the configuration key
        for key, label in (('common_parameters', 'Borg parameters'),
                           ('create_parameters', 'Create parameters'),
                           ('prune_parameters', 'Prune parameters')):
            setattr(self, key,
                    config.check_list_of_dicts(cfg, key, label, where,
                                               default=[]))

        self.__keychain_account = config.check_string(
            cfg, 'keychain_account', 'Keychain account', where, default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            # Best effort only: extract_passphrase() logs its own failure
            # and is called again when an operation is launched
            try:
                self.extract_passphrase()
            except Exception:
                pass

    def extract_passphrase(self):
        acc=self.__keychain_account
        if not self.__passphrase:
            if acc and acc!='':
                self.logger.debug('Requesting passphrase')
                try:
                    pw=keyring.get_password("borg-backup", acc)
                except Exception as err:
                    self.logger.error('Failed to retrieve passphrase')
                    raise err
                else:
                    self.logger.debug('Received passphrase')
                self.__passphrase=pw
            else:
                self.__passphrase=None
        return self.__passphrase

    def __init__(self, identifier, cfg, scheduler):
        """Set up a backup target from its configuration dict `cfg`.

        `identifier` is the target's number (used in messages until the
        name has been decoded) and `scheduler` provides the timed waits.
        The thread is created as a daemon but is not started here.
        """
        self.identifier = identifier
        self.scheduler = scheduler
        # NOTE(review): this stores the imported `config` module, not the
        # `cfg` argument -- confirm that is intended
        self.config = config
        self.__status_update_callback = None
        # Replaced by a per-target child logger in __decode_config, once
        # the backup name is known
        self.logger = None

        # No borg process or listener threads until an operation is launched
        self.borg_instance = self.thread_log = self.thread_res = None

        self.current_operation = self.scheduled_operation = None
        self.lastrun_when = self.lastrun_finished = None
        self.state = State.INACTIVE
        self.errors = Errors.OK

        # Must come before the thread initialisation below, which needs
        # self._name
        self.__decode_config(cfg)

        super().__init__(target=self.__main_thread, name=self._name)
        self.daemon = True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if original_size is not None and original_size is not None and deduplicated_size is not None:
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockErrorT' or # in docs
                         msg['msgid']=='LockErrorT')): # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.errors=self.errors.combine(errors)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(status))

            if callback:
                callback(status, errors=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None:
                self.logger.error('No result from borg despite no error in log')
                if errors.ok():
                    self.errors=self.errors.combine(Errors.ERRORS)


    def __do_launch(self, op, archive_or_repository, *args):
        """Start borg for `op` and the two threads listening to it.

        `archive_or_repository` and `*args` are passed on to BorgInstance.
        On return the backup is ACTIVE with a cleared error status, `op` is
        the current operation and both listener threads have been started.
        """
        secret = self.extract_passphrase()

        instance = BorgInstance(op.operation, archive_or_repository, *args)
        instance.launch(passphrase=secret)

        self.logger.debug('Creating listener threads')

        log_thread = Thread(target=self.__log_listener, daemon=True)
        result_thread = Thread(target=self.__result_listener, daemon=True)

        self.thread_log = log_thread
        self.thread_res = result_thread
        self.borg_instance = instance
        # Replace the scheduled time by the real starting time, so that the
        # next run is scheduled relative to this
        op.when_monotonic = time.monotonic()
        self.current_operation = op
        self.state = State.ACTIVE
        # A new operation starts with a clean error status
        self.errors = Errors.OK
        self.__update_status()

        # Start the listeners only after all of the state above is in place
        log_thread.start()
        result_thread.start()

    def __launch(self, op):
        """Launch borg for `op`, building the arguments for its operation.

        Raises NotImplementedError for operations other than CREATE and
        PRUNE.
        """
        kind = op.operation
        self.logger.debug("Launching '%s'" % str(kind))

        if kind == Operation.CREATE:
            # <repository>::<prefix><template>
            target = "%s::%s%s" % (self.repository.repository_name,
                                   self.archive_prefix,
                                   self.archive_template)
            options = self.common_parameters + self.create_parameters
            self.__do_launch(op, target, options, self.paths)
            return

        if kind == Operation.PRUNE:
            target = self.repository.repository_name
            # Restrict pruning to this target's archives
            options = ([{'prefix': self.archive_prefix}] +
                       self.common_parameters +
                       self.prune_parameters)
            self.__do_launch(op, target, options)
            return

        raise NotImplementedError("Invalid operation '%s'" % str(kind))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
        # Wait for main logger thread to terminate, or for us to be terminated
        while not self.terminate and self.thread_res.is_alive():
            self._cond.release()
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors=self.errors.combine(Errors.ERRORS)

        if self.current_operation.operation=='create':
            self.lastrun_when=self.current_operation.when_monotonic
            self.lastrun_finished=time.monotonic()
        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None
        self.current_operation=None
        self.state=State.INACTIVE
        self.__update_status()

    def __main_thread(self):
        """Thread body: schedule, queue and run operations until terminated.

        The loop runs with self._cond held. Any exception raised inside it
        is logged and recorded as Errors.ERRORS, after which the thread
        proceeds to the same clean-up as on termination: the borg instance
        (if any) is asked to terminate and the listener threads are joined
        once the lock has been released.
        """
        with self._cond:
            try:
                while not self._terminate:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    # Termination may have been requested during the wait
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception as err:
                self.logger.exception("Error with backup '%s'" % self._name)
                self.errors=Errors.ERRORS

            self.state=State.INACTIVE
            self.scheduled_operation=None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log=self.thread_log
            thread_res=self.thread_res
            self.thread_log=None
            self.thread_res=None

        self.logger.debug("Waiting for log and result threads to terminate")

        # Joined without the lock: both listener threads acquire self._cond
        # themselves
        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op=None
        if not self.scheduled_operation:
            op=self.__next_operation_unlocked()
        if op:
            now=time.monotonic()
            delay=max(0, op.when_monotonic-now)
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                             (str(op.operation), op.detail or 'none', delay))

            self.scheduled_operation=op
            self.state=State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(now+delay, self._cond, self._name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state=State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=State.QUEUED
            self.__update_status()
            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self._name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation=None
                self.state=State.INACTIVE
                self.__update_status()

    def __next_operation_unlocked(self):
        # TODO: pruning as well
        now=time.monotonic()
        if not self.lastrun_finished:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                return Operation(Operation.CREATE, now+initial_interval,
                                 reason='initial')
        elif not self.errors.ok():
            if self.retry_interval==0:
                return None
            else:
                return Operation(Operation.CREATE,
                                 self.lastrun_finished+self.retry_interval,
                                 reason='retry')
        else:
            if self.backup_interval==0:
                return None
            else:
                return Operation(Operation.CREATE,
                                 self.lastrun_when+self.backup_interval)

    def __status_unlocked(self):
        """Return `(status, callback)`; the caller holds the lock.

        The status describes the current operation if there is one, else
        the scheduled operation, else no operation at all. `callback` is
        the status update callback, possibly None.
        """
        callback = self.__status_update_callback

        # A running operation takes precedence over a scheduled one
        op = self.current_operation or self.scheduled_operation
        status = Status(self, op) if op else Status(self)

        return status, callback

    def __update_status(self):
        """Pass the present status to the status update callback, if set.

        Exceptions raised by the callback are logged and not propagated.
        This method does not release self._cond around the call.
        """
        status, callback = self.__status_unlocked()
        if not callback:
            return
        try:
            callback(status)
        except Exception:
            self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        """Return a Status snapshot of this backup, taken under the lock."""
        with self._cond:
            snapshot, _callback = self.__status_unlocked()
        return snapshot

    def create(self):
        """Request an immediate, manually triggered archive creation.

        Replaces any scheduled operation and notifies a thread waiting on
        the condition.
        """
        requested = Operation(Operation.CREATE, time.monotonic(),
                              reason='manual')
        with self._cond:
            self.scheduled_operation = requested
            self._cond.notify()

    def prune(self):
        """Request an immediate, manually triggered prune.

        Replaces any scheduled operation and notifies a thread waiting on
        the condition.
        """
        requested = Operation(Operation.PRUNE, time.monotonic(),
                              reason='manual')
        with self._cond:
            self.scheduled_operation = requested
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

mercurial