backup.py

Wed, 24 Jan 2018 20:18:45 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 24 Jan 2018 20:18:45 +0000
changeset 61
bc6c3d74e6ea
parent 58
170d69da51bb
child 62
b7d13b2ad67e
permissions
-rw-r--r--

Made combination error-state into an error-state matrix, as well as Enum

#
# Borgend Backup instance
#

import config
import logging
import time
import keyring
import borgend
import repository
from enum import IntEnum
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

# Module-level logger, obtained as a child (named after this module) of the
# logger exposed by the borgend module
logger=borgend.logger.getChild(__name__)

class State(IntEnum):
    """Integer-valued enumeration of the states a backup can be in."""

    INACTIVE = 0
    SCHEDULED = 1
    QUEUED = 2
    ACTIVE = 3


class Errors(IntEnum):
    """Integer-valued error status; combine() keeps the larger of two values."""

    OK = 0
    BUSY = 1
    OFFLINE = 2
    ERRORS = 3

    def combine(self, other):
        """Return whichever of ``self`` and ``other`` compares greater."""
        return other if other > self else self

    def ok(self):
        """Return True exactly when this value equals ``OK``."""
        return self == Errors.OK

    def __str__(self):
        """Return the display string registered in ``_errorstring``."""
        return _errorstring[self]

# Display strings for the members of Errors; Errors.__str__ looks its
# result up in this table.
_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}

def translate_loglevel(x):
    """Map a level name to its ``logging`` constant.

    Names that are not keys of ``loglevel_translation`` map to
    ``logging.ERROR``.
    """
    return loglevel_translation.get(x, logging.ERROR)

def safe_get_int(t, x):
    """Return ``t[x]`` if the key exists and the value is an ``int``.

    Returns ``None`` when the key is missing or the value is of another type.
    """
    if x not in t:
        return None
    value = t[x]
    return value if isinstance(value, int) else None

# Level names accepted by translate_loglevel, mapped to the constants of the
# same name in the logging module.
loglevel_translation = {
    level_name: getattr(logging, level_name)
    for level_name in ('CRITICAL', 'ERROR', 'WARNING', 'DEBUG', 'INFO')
}

class Backup(TerminableThread):

    def __decode_config(self, cfg):
        """Populate this backup's settings from ``cfg``.

        Every value is fetched through one of the ``config.check_*`` helpers,
        which are also handed a location string (presumably used in their
        error messages).  ``self.identifier`` must already be set: it forms
        the location string until the backup name has been read.  The
        per-backup logger is created here as a child of the module logger.

        If ``config.settings['extract_passphrases_at_startup']`` is set, the
        passphrase is requested immediately.  A failure there is logged but
        does not abort configuration, because ``extract_passphrase`` is
        called again when an operation is launched.
        """
        loc0 = 'backup target %d' % self.identifier

        self._name = config.check_string(cfg, 'name', 'Name', loc0)

        self.logger = logger.getChild(self._name)

        self.loc = 'backup target "%s"' % self._name

        reponame = config.check_string(cfg, 'repository',
                                       'Target repository', self.loc)

        self.repository = repository.get_controller(reponame)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', self.loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template', self.loc)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', self.loc,
            config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', self.loc,
            config.defaults['retry_interval'])

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths',
                                                           'Paths', self.loc)

        self.common_parameters = config.check_list_of_dicts(
            cfg, 'common_parameters', 'Borg parameters', self.loc,
            default=[])

        self.create_parameters = config.check_list_of_dicts(
            cfg, 'create_parameters', 'Create parameters', self.loc,
            default=[])

        self.prune_parameters = config.check_list_of_dicts(
            cfg, 'prune_parameters', 'Prune parameters', self.loc,
            default=[])

        self.__keychain_account = config.check_string(
            cfg, 'keychain_account', 'Keychain account', self.loc,
            default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.extract_passphrase()
            except Exception:
                # Deliberately best-effort: the passphrase is requested again
                # at launch time, so only record the failure instead of
                # silently discarding it.
                self.logger.warning(
                    'Could not extract passphrase at startup; '
                    'will retry when an operation is launched',
                    exc_info=True)

    def extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if needed.

        A passphrase that was already retrieved is returned as is.  Otherwise,
        if a keychain account is configured, the password stored for the
        "borg-backup" service and that account is requested from ``keyring``
        and remembered.  Without an account the passphrase is ``None``.

        Any exception raised by the keyring lookup is logged and re-raised.
        """
        account = self.__keychain_account

        # Already known: nothing to fetch
        if self.__passphrase:
            return self.__passphrase

        # No account configured: there is no passphrase to look up
        if not account:
            self.__passphrase = None
            return self.__passphrase

        self.logger.debug('Requesting passphrase')
        try:
            secret = keyring.get_password("borg-backup", account)
        except Exception as err:
            self.logger.error('Failed to retrieve passphrase')
            raise err
        self.logger.debug('Received passphrase')

        self.__passphrase = secret
        return self.__passphrase

    def __init__(self, identifier, cfg, scheduler):
        """Set up the initial state, decode ``cfg`` and initialise the thread.

        The configuration is decoded before the superclass initialiser runs
        because the backup name read from it is used as the thread name.
        The thread is marked as a daemon thread.
        """
        self.identifier = identifier
        self.scheduler = scheduler
        # NOTE(review): this stores the imported `config` module rather than
        # the `cfg` argument -- confirm that this is intended.
        self.config = config
        self.__status_update_callback = None
        # Set up in __decode_config once the backup name is known
        self.logger = None

        # Handles of the borg process and its two listener threads
        self.borg_instance = self.thread_log = self.thread_res = None

        # Operation bookkeeping
        self.current_operation = None
        self.scheduled_operation = None
        self.lastrun_when = None
        self.state = State.INACTIVE
        self.errors = Errors.OK

        self.__decode_config(cfg)

        super().__init__(target=self.__main_thread, name=self._name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (falsy if none), read under the lock."""
        with self._cond:
            return self.__is_running_unlocked()

    def __is_running_unlocked(self):
        """Return ``self.current_operation`` without taking the lock.

        When no operation is current, asserts that no borg instance and no
        listener threads are left over either.
        """
        operation = self.current_operation
        if operation:
            return operation
        # Consistency check
        assert not (self.borg_instance or self.thread_log or self.thread_res)
        return operation

    def __block_when_running(self):
        """Assert that no operation is currently running."""
        assert not self.is_running()

    def __log_listener(self):
        """Thread body: consume borg's log entries until the stream ends.

        Entries are read with ``self.borg_instance.read_log`` until it returns
        ``None``.  Progress entries update ``self.current_operation`` and log
        messages of level WARNING or above are merged into ``self.errors``;
        both happen while holding ``self._cond``.  Whenever the status was
        changed, the status callback (if one is set) is invoked after the
        lock has been released.  Finally waits for the borg subprocess.
        """
        self.logger.debug('Log listener thread waiting for entries')
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(msg))
            # .get(): an entry lacking 'type' is reported as unrecognised
            # below instead of ending this thread with a KeyError
            t=msg.get('type')

            errormsg=None
            callback=None
            status=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation['progress_current']=current
                        self.current_operation['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                # All three sizes must be present before any is published
                if (original_size is not None and
                    compressed_size is not None and
                    deduplicated_size is not None):
                    with self._cond:
                        self.current_operation['original_size']=original_size
                        self.current_operation['compressed_size']=compressed_size
                        self.current_operation['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                # Fill in defaults for missing fields; the (completed) entry
                # is what gets passed on to the callback as `errors`
                msg.setdefault('levelname', 'ERROR')
                msg.setdefault('message', 'UNKNOWN')
                msg.setdefault('name', 'borg')
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, '%s: %s', msg['name'], msg['message'])
                if lvl>=logging.WARNING:
                    errormsg=msg
                    errors=Errors.ERRORS
                    # Lock-related failures mean that the repository is in
                    # use, which is reported as BUSY rather than as an error
                    if msg.get('msgid') in ('LockTimeout', # observed in reality
                                            'LockErrorT',  # in docs
                                            'LockError'):  # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.errors=self.errors.combine(errors)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                # As in __update_status, a failing callback must not take
                # this thread down with it
                try:
                    callback(status, errors=errormsg)
                except Exception:
                    self.logger.exception("Status update error")

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Thread body: collect the borg result and finalise the operation.

        Reads the result from the borg instance, waits for the log listener
        thread so that all logged errors have been accounted for, and waits
        for the borg subprocess.  A missing result or a subprocess that did
        not terminate turns an otherwise clean error state into ``ERRORS``.
        Finally, under ``self._cond``, the run is recorded (for 'create'),
        the per-operation state is cleared, the status is published and the
        condition variable is notified.
        """
        self.logger.debug('Result listener thread waiting for result')

        # Take the handle now: __main_thread's cleanup resets
        # self.thread_log to None, which may happen while we are blocked in
        # read_result() below.
        thread_log=self.thread_log

        res=self.borg_instance.read_result()

        # Finish processing remaining errors
        if thread_log is not None:
            thread_log.join()

        with self._cond:
            errors=self.errors

        self.logger.debug('Borg result: %s' % str(res))

        if res is None:
            if errors.ok():
                self.logger.error('No result from borg despite no error in log')
                errors=Errors.ERRORS
            else:
                # Errors were already recorded from the log; do not claim
                # otherwise in the message
                self.logger.error('No result from borg')

        self.logger.debug('Waiting for borg subprocess to terminate in result thread')

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            if errors.ok():
                errors=Errors.ERRORS

        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))

        with self._cond:
            # Only a 'create' run counts as a backup run for scheduling
            if self.current_operation['operation']=='create':
                self.lastrun_when=self.current_operation['when_monotonic']
            self.thread_res=None
            self.thread_log=None
            self.borg_instance=None
            self.current_operation=None
            self.state=State.INACTIVE
            self.errors=errors
            self.__update_status()
            self._cond.notify()

    def __do_launch(self, op, archive_or_repository, *args):
        """Start borg for ``op`` together with its two listener threads.

        A ``BorgInstance`` is created from the operation name, the archive or
        repository, and the extra arguments, and launched with the passphrase
        from ``extract_passphrase``.  The instance and the daemon listener
        threads are stored on ``self`` before the threads are started.
        """
        passphrase = self.extract_passphrase()

        instance = BorgInstance(op['operation'], archive_or_repository, *args)
        instance.launch(passphrase=passphrase)

        self.logger.debug('Creating listener threads')

        log_thread = Thread(target=self.__log_listener, daemon=True)
        result_thread = Thread(target=self.__result_listener, daemon=True)

        # Publish the handles before the threads run, as they read them
        self.thread_log = log_thread
        self.thread_res = result_thread
        self.borg_instance = instance

        log_thread.start()
        result_thread.start()

    def __launch(self, op):
        """Launch borg for the operation named in ``op['operation']``.

        'create' targets the archive ``<repository>::<prefix><template>`` with
        the common and create parameters plus the configured paths; 'prune'
        targets the repository with a prefix parameter followed by the common
        and prune parameters.

        Raises:
            NotImplementedError: for any other operation name.
        """
        operation = op['operation']
        self.logger.debug("Launching '%s'" % operation)

        if operation == 'create':
            archive = "%s::%s%s" % (self.repository.repository_name,
                                    self.archive_prefix,
                                    self.archive_template)
            create_args = self.common_parameters + self.create_parameters
            self.__do_launch(op, archive, create_args, self.paths)
        elif operation == 'prune':
            self.__do_launch(op, self.repository.repository_name,
                             ([{'prefix': self.archive_prefix}] +
                              self.common_parameters +
                              self.prune_parameters))
        else:
            raise NotImplementedError("Invalid operation '%s'" % operation)

    # This must be called with self._cond held.
    def __launch_check(self):
        """Launch the scheduled operation, if there still is one.

        Must be called with ``self._cond`` held.  If the scheduled operation
        has disappeared in the meantime, only a debug message is logged.
        Otherwise the operation is launched and becomes the current one,
        stamped with the monotonic launch time.
        """
        op = self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
            return

        self.scheduled_operation = None

        self.__launch(op)

        op['when_monotonic'] = time.monotonic()
        self.current_operation = op
        self.state = State.ACTIVE
        # Reset error status when starting a new operation
        self.errors = Errors.OK
        self.__update_status()


    def __main_thread(self):
        """Thread body: cycle through wait-finish, schedule and launch.

        Runs under ``self._cond`` until ``self._terminate`` is set or an
        exception escapes one of the steps (which is logged and recorded as
        ``Errors.ERRORS``).  On the way out the state is reset, a borg
        instance that is still around is terminated, and the listener threads
        are joined after the lock has been released.
        """
        with self._cond:
            try:
                while not self._terminate:
                    self.__main_thread_wait_finish()
                    if self._terminate:
                        break
                    self.__main_thread_wait_schedule()
                    if self._terminate:
                        break
                    self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'" % self._name)
                self.errors = Errors.ERRORS

            self.state = State.INACTIVE
            self.scheduled_operation = None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Detach the thread handles so they can be joined outside the lock
            thread_log, self.thread_log = self.thread_log, None
            thread_res, self.thread_res = self.thread_res, None

        self.logger.debug("Waiting for log and result threads to terminate")

        for listener in (thread_log, thread_res):
            if listener:
                listener.join()


    # Main thread/1. Wait while a current operation is running
    def __main_thread_wait_finish(self):
        """Wait on ``self._cond`` while an operation is current.

        Returns as soon as there is no current operation or termination has
        been requested.
        """
        while True:
            if not self.current_operation or self._terminate:
                return
            self.logger.debug("Waiting for current operation to finish")
            self._cond.wait()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        """Schedule the next automatic operation and wait until it is due.

        If an operation has already been requested manually, return at once
        so that the caller can queue it: the ``notify()`` of that request may
        have been consumed before this method was entered, so waiting here
        would stall the request until some later notification.

        Otherwise the operation from ``__next_operation_unlocked`` is stored
        as the scheduled one and the scheduler is asked to wait until its due
        time.  If there is nothing to schedule, the state becomes INACTIVE
        and the method waits on ``self._cond`` for a manual request.
        """
        if self.scheduled_operation:
            # Manually requested operation is pending; do not overwrite the
            # state or block, go straight on to queuing
            return

        op=self.__next_operation_unlocked()
        if op:
            now=time.monotonic()
            delay=max(0, op['when_monotonic']-now)
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                             (op['operation'], op['detail'],  delay))

            self.scheduled_operation=op
            self.state=State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(now+delay, self._cond, self._name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state=State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        """Queue the scheduled operation, if any, on the repository.

        The state is set to QUEUED and published, and ``__launch_check`` is
        passed to the repository's ``queue_action`` as the action to run.
        Does nothing when no operation is scheduled.
        """
        if not self.scheduled_operation:
            return

        self.logger.debug("Queuing")
        self.state = State.QUEUED
        self.__update_status()
        self.repository.queue_action(self._cond,
                                     action=self.__launch_check,
                                     name=self._name)

    def __next_operation_unlocked(self):
        """Return the next automatic 'create' operation, or ``None``.

        * No run recorded yet: 'initial', due one retry interval from now
          (or one backup interval, if the retry interval is 0).
        * Last state not OK: 'retry', due one retry interval after the last
          run.
        * Otherwise: 'normal', due one backup interval after the last run.

        An interval of 0 disables the corresponding case and yields ``None``.
        The result is a dict with the keys 'operation', 'detail' and
        'when_monotonic'.
        """
        # TODO: pruning as well
        # `is None` rather than truthiness: lastrun_when holds a
        # time.monotonic() value, whose reference point is undefined, so a
        # value of zero must not be mistaken for "never run".
        if self.lastrun_when is None:
            detail='initial'
            base=time.monotonic()
            interval=self.retry_interval
            if interval==0:
                interval=self.backup_interval
        elif not self.errors.ok():
            detail='retry'
            base=self.lastrun_when
            interval=self.retry_interval
        else:
            detail='normal'
            base=self.lastrun_when
            interval=self.backup_interval

        if interval==0:
            return None

        return {'operation': 'create',
                'detail': detail,
                'when_monotonic': base+interval}

    def __status_unlocked(self):
        """Build a status snapshot; to be called with ``self._cond`` held.

        The snapshot is a copy of the current operation, else of the
        scheduled operation, else ``{'type': 'nothing'}``, extended with
        'name', 'state', 'errors', a default 'detail' of 'NONE', and - when
        'when_monotonic' is present - 'when', the same instant converted to
        wall-clock time.

        A copy is returned because callers hand the status to callbacks after
        releasing the lock, while the listener threads keep updating the
        operation dict itself.

        Returns:
            A ``(status, callback)`` tuple, where ``callback`` is the
            registered status update callback or ``None``.
        """
        callback=self.__status_update_callback

        if self.current_operation:
            status=dict(self.current_operation)
        elif self.scheduled_operation:
            status=dict(self.scheduled_operation)
        else:
            status={'type': 'nothing'}

        status['name']=self._name
        status['state']=self.state
        status['errors']=self.errors

        status.setdefault('detail', 'NONE')

        if 'when_monotonic' in status:
            # Shift from the monotonic clock to the wall clock
            status['when']=(status['when_monotonic']
                            -time.monotonic()+time.time())

        return status, callback

    def __update_status(self):
        """Pass the current status to the status callback, if one is set.

        The callback is invoked without this method releasing ``self._cond``.
        Exceptions raised by the callback are logged and not propagated.
        """
        status, callback = self.__status_unlocked()
        if not callback:
            return
        try:
            callback(status)
        except Exception:
            self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Register ``callback`` as the status update callback, under the lock."""
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        """Return the status dict from ``__status_unlocked``, read under the lock."""
        with self._cond:
            snapshot, _callback = self.__status_unlocked()
        return snapshot

    def create(self):
        """Request a manual 'create' operation and notify the condition variable."""
        request = {'operation': 'create', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation = request
            self._cond.notify()

    def prune(self):
        """Request a manual 'prune' operation and notify the condition variable."""
        request = {'operation': 'prune', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation = request
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the borg instance, if one exists, while holding the lock."""
        with self._cond:
            instance = self.borg_instance
            if instance:
                instance.terminate()

mercurial