backup.py

Tue, 23 Jan 2018 22:53:51 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Tue, 23 Jan 2018 22:53:51 +0000
changeset 55
407af23d16bb
parent 54
cfcaa5f6ba33
child 58
170d69da51bb
permissions
-rw-r--r--

Improved backup main thread loop, etc.

#
# Borgend Backup instance
#

import config
import logging
import time
import keyring
import borgend
import repository
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

# Module-level logger: a child of the borgend application logger, named after
# this module. Each Backup derives a further per-target child from it.
logger=borgend.logger.getChild(__name__)

# Backup states, as consecutive integers from 0 upwards. The numeric order is
# significant: combine_state() keeps the larger of two states, and a state of
# BUSY or above makes the next automatic 'create' use the retry interval.
(INACTIVE,
 SCHEDULED,
 QUEUED,
 ACTIVE,
 BUSY,
 OFFLINE,
 ERRORS) = range(7)

def combine_state(state1, state2):
    """Merge two backup states, keeping the higher-valued one.

    States are ordered integers where a larger value takes precedence
    (e.g. ERRORS over SCHEDULED), so merging is simply a maximum.
    """
    return state2 if state2 > state1 else state1

# Map borg's textual 'levelname' values to Python logging levels.
loglevel_translation=dict(
    CRITICAL=logging.CRITICAL,
    ERROR=logging.ERROR,
    WARNING=logging.WARNING,
    DEBUG=logging.DEBUG,
    INFO=logging.INFO,
)

def translate_loglevel(x):
    """Return the logging level for borg level name *x*.

    Unknown names are treated as errors (logging.ERROR), so unexpected
    messages are never silently downgraded.
    """
    return loglevel_translation.get(x, logging.ERROR)

def safe_get_int(t, x):
    """Return ``t[x]`` if key *x* is present and holds an int, else None.

    Note that bools pass the check, since ``bool`` is a subclass of ``int``.
    """
    value = t[x] if x in t else None
    return value if isinstance(value, int) else None


class Backup(TerminableThread):
    """One configured backup target, driven by its own thread.

    The thread body (``__main_thread``) repeatedly waits for a running borg
    operation to finish, schedules the next operation (or waits for a manual
    ``create``/``prune`` request), queues it on the repository and launches
    it. While borg runs, two helper threads consume its log stream
    (``__log_listener``) and its final result (``__result_listener``).

    Shared state is guarded by ``self._cond``; that attribute and
    ``self._terminate`` presumably come from ``TerminableThread`` (not
    visible here -- TODO confirm).
    """

    # borg 'msgid' values for lock problems. A warning/error carrying one of
    # these puts the target in the BUSY state instead of ERRORS.
    _LOCK_MSGIDS=('LockTimeout', # observed in reality
                  'LockError',   # in docs
                  'LockErrorT')  # in docs

    def __decode_config(self, cfg):
        """Validate the target configuration *cfg* and store its settings.

        Also creates the per-target logger and, when the global setting
        'extract_passphrases_at_startup' is on, tries to fetch the passphrase
        up front.
        """
        loc0='backup target %d' % self.identifier

        self._name=config.check_string(cfg, 'name', 'Name', loc0)

        self.logger=logger.getChild(self._name)

        self.loc='backup target "%s"' % self._name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', self.loc)

        self.repository=repository.get_controller(reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', self.loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', self.loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', self.loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', self.loc,
                                                    config.defaults['retry_interval'])

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', self.loc)

        self.common_parameters=config.check_list_of_dicts(cfg, 'common_parameters',
                                                         'Borg parameters', self.loc,
                                                         default=[])

        self.create_parameters=config.check_list_of_dicts(cfg, 'create_parameters',
                                                         'Create parameters', self.loc,
                                                         default=[])

        self.prune_parameters=config.check_list_of_dicts(cfg, 'prune_parameters',
                                                         'Prune parameters', self.loc,
                                                         default=[])

        self.__keychain_account=config.check_string(cfg, 'keychain_account',
                                                    'Keychain account', self.loc,
                                                    default='')

        self.__passphrase=None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.extract_passphrase()
            except Exception:
                # Best effort: extract_passphrase() has already logged the
                # failure, and it is tried again when an operation launches.
                pass

    def extract_passphrase(self):
        """Return the borg passphrase for this target, fetching it at most once.

        The passphrase is read from the keyring under service "borg-backup"
        and the configured keychain account, then cached. Returns None when
        no keychain account is configured. Keyring errors are logged and
        re-raised.
        """
        acc=self.__keychain_account
        if not self.__passphrase:
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw=keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                self.__passphrase=pw
            else:
                self.__passphrase=None
        return self.__passphrase

    def __init__(self, identifier, cfg, scheduler):
        """Create the (not yet started, daemon) thread for one backup target.

        *identifier* is the target's index (used in config error messages),
        *cfg* its configuration and *scheduler* the object used for timed
        waits.
        """
        self.identifier=identifier
        # NOTE(review): this stores the `config` module, not the `cfg`
        # argument; kept unchanged in case other code relies on it.
        self.config=config
        self.__status_update_callback=None
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once the name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.current_operation=None
        self.scheduled_operation=None
        self.lastrun_when=None
        self.state=INACTIVE

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self._name)
        self.daemon=True

    def is_running(self):
        """Return the current operation dict (truthy) if one is running."""
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        """Like is_running(), but the caller must hold ``self._cond``."""
        running=self.current_operation
        if not running:
            # Consistency check: no operation means no borg process/threads
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        """Assert that no operation is currently running."""
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        """Consume borg's log stream until it ends (runs in its own thread).

        Progress and error information is recorded in the current operation
        and the target state under the lock; the status-update callback, if
        any, is then invoked outside the lock together with the offending
        log entry (for warnings and errors).
        """
        self.logger.debug('Log listener thread waiting for entries')
        for entry in iter(self.borg_instance.read_log, None):
            self.logger.debug(str(entry))
            t=entry['type']

            errors_this_message=None
            status=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(entry, 'current')
                total=safe_get_int(entry, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation['progress_current']=current
                        self.current_operation['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(entry, 'original_size')
                compressed_size=safe_get_int(entry, 'compressed_size')
                deduplicated_size=safe_get_int(entry, 'deduplicated_size')
                if (original_size is not None and
                    compressed_size is not None and
                    deduplicated_size is not None):
                    with self._cond:
                        self.current_operation['original_size']=original_size
                        self.current_operation['compressed_size']=compressed_size
                        self.current_operation['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                # Fill in defaults for fields that may be missing
                if 'levelname' not in entry:
                    entry['levelname']='ERROR'
                if 'message' not in entry:
                    entry['message']='UNKNOWN'
                if 'name' not in entry:
                    entry['name']='borg'
                lvl=translate_loglevel(entry['levelname'])
                self.logger.log(lvl, entry['name'] + ': ' + entry['message'])
                if lvl>=logging.WARNING:
                    errors_this_message=entry
                    # Lock problems mean the repository is busy, not broken
                    if entry.get('msgid') in self._LOCK_MSGIDS:
                        state=BUSY
                    else:
                        state=ERRORS
                    with self._cond:
                        self.state=combine_state(self.state, state)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(entry))

            if callback:
                callback(status, errors=errors_this_message)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Wait for borg's result and retire the current operation.

        Runs in its own thread. After the log listener has drained, derives
        the end state (ACTIVE without errors becomes INACTIVE; a missing
        result or a borg process that does not terminate is an error), then
        under the lock records the run time of 'create' operations, clears
        the operation and its helpers, publishes the status and wakes the
        main thread.
        """
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        # Finish processing remaining errors
        self.thread_log.join()

        with self._cond:
            state=self.state

        # If there were no errors, reset back to INACTIVE state
        if state==ACTIVE:
            state=INACTIVE

        self.logger.debug('Borg result: %s' % str(res))

        if res is None and state==INACTIVE:
            self.logger.error('No result from borg despite no error in log')
            state=ERRORS

        self.logger.debug('Waiting for borg subprocess to terminate in result thread')

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            state=combine_state(state, ERRORS)

        self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))

        with self._cond:
            if self.current_operation['operation']=='create':
                self.lastrun_when=self.current_operation['when_monotonic']
            self.thread_res=None
            self.thread_log=None
            self.borg_instance=None
            self.current_operation=None
            self.state=state
            self.__update_status()
            self._cond.notify()

    def __do_launch(self, op, archive_or_repository, *args):
        """Start borg for *op* and the log/result listener threads."""
        passphrase=self.extract_passphrase()

        inst=BorgInstance(op['operation'], archive_or_repository, *args)
        inst.launch(passphrase=passphrase)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Build the borg arguments for operation *op* and launch it.

        Raises NotImplementedError for operations other than 'create' and
        'prune'.
        """
        self.logger.debug("Launching '%s'" % op['operation'])

        if op['operation']=='create':
            archive="%s::%s%s" % (self.repository.repository_name,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive,
                             self.common_parameters+self.create_parameters,
                             self.paths)
        elif op['operation']=='prune':
            # Only prune archives created by this target (matching prefix)
            self.__do_launch(op, self.repository.repository_name,
                             ([{'prefix': self.archive_prefix}] +
                              self.common_parameters +
                              self.prune_parameters))
        else:
            raise NotImplementedError("Invalid operation '%s'" % op['operation'])

    def __launch_check(self):
        """Repository-queue action: launch the scheduled operation, if any.

        The scheduled operation may have been cleared while waiting in the
        queue, in which case nothing is launched. Presumably invoked with
        ``self._cond`` held, since it was handed to queue_action together
        with that condition -- TODO confirm.
        """
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None
            self.current_operation=op
            self.current_operation['when_monotonic']=time.monotonic()

            self.__launch(op)

            self.state=ACTIVE
            self.__update_status()

    def __main_thread(self):
        """Thread body: wait, schedule, queue and launch until terminated.

        On an unexpected exception the error is logged, the target enters
        the ERRORS state (which is published via the status callback) and
        the loop ends. NOTE(review): because the try encloses the whole
        loop, this thread then stops scheduling further operations.
        On exit any running borg instance is terminated and the listener
        threads are joined outside the lock.
        """
        with self._cond:
            try:
                while not self._terminate:
                    self.__main_thread_wait_finish()
                    if self._terminate:
                        break
                    self.__main_thread_wait_schedule()
                    if self._terminate:
                        break
                    self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'" % self._name)
                self.lastrun_when=time.monotonic()
                self.state=ERRORS
                self.scheduled_operation=None
                # Let status listeners see the error state
                self.__update_status()

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log=self.thread_log
            thread_res=self.thread_res

        self.logger.debug("Waiting for log and result threads to terminate")

        # Joined without the lock: __result_listener needs it to finish
        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/1. Wait while a current operation is running
    def __main_thread_wait_finish(self):
        while self.current_operation and not self._terminate:
            self.logger.debug("Waiting for current operation to finish")
            self._cond.wait()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        # NOTE(review): if a manual operation is already pending on entry
        # (e.g. create() was called while another operation ran, and its
        # notify() was consumed in __main_thread_wait_finish), this falls
        # through to the plain wait below and the request waits for another
        # notification. Whether that can stall depends on queue_action's
        # blocking behaviour, which is not visible here -- verify.
        op=None
        if not self.scheduled_operation:
            op=self.__next_operation_unlocked()
        if op:
            now=time.monotonic()
            delay=max(0, op['when_monotonic']-now)
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                             (op['operation'], op['detail'],  delay))

            self.scheduled_operation=op
            self.state=combine_state(self.state, SCHEDULED)
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(now+delay, self._cond, self._name)
        else:
            # Nothing scheduled - just wait
            self.logger.debug("Waiting for manual scheduling")
            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=combine_state(self.state, QUEUED)
            self.__update_status()
            self.repository.queue_action(self._cond,
                                         action=self.__launch_check,
                                         name=self._name)

    def __next_operation_unlocked(self):
        """Compute the next automatic operation, or None if none is due.

        Before the first run the retry interval (falling back to the backup
        interval) is used as initial delay; after a run ending in BUSY or a
        worse state the retry interval is used, otherwise the backup
        interval. An interval of 0 disables the corresponding scheduling.
        """
        # TODO: pruning as well
        now=time.monotonic()
        if not self.lastrun_when:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'initial',
                        'when_monotonic': now+initial_interval}
        elif self.state>=BUSY:
            if self.retry_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'retry',
                        'when_monotonic': self.lastrun_when+self.retry_interval}
        else:
            if self.backup_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'normal',
                        'when_monotonic': self.lastrun_when+self.backup_interval}

    def __status_unlocked(self):
        """Build a status snapshot; the caller must hold ``self._cond``.

        Returns ``(status, callback)``: *status* is a fresh dict describing
        the current operation, else the scheduled/queued one, else nothing,
        extended with 'name', 'state', 'detail' and (when timed) a wall-clock
        'when'; *callback* is the registered status-update callback or None.
        """
        callback=self.__status_update_callback

        # Work on copies: callbacks may read the status outside the lock
        # while other threads keep updating the operation dicts.
        if self.current_operation:
            status=dict(self.current_operation)
            status['type']='current'
            # Errors should be set by listeners
        elif self.scheduled_operation:
            status=dict(self.scheduled_operation)
            if self.state==QUEUED:
                status['type']='queued'
            else:
                status['type']='scheduled'
        else:
            status={'type': 'nothing'}

        status['name']=self._name
        status['state']=self.state

        if 'detail' not in status:
            status['detail']='NONE'

        if 'when_monotonic' in status:
            # Convert the monotonic timestamp to wall-clock time
            status['when']=(status['when_monotonic']
                            -time.monotonic()+time.time())

        return status, callback

    def __update_status(self):
        """Publish the current status to the registered callback, if any.

        Every visible caller holds ``self._cond``, so the callback runs under
        the lock. Exceptions raised by the callback are logged, not raised.
        """
        status, callback = self.__status_unlocked()
        if callback:
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Register *callback* to receive status dicts on state changes."""
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        """Return a status dict for this backup target."""
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        """Request a manual 'create' operation and wake the main thread."""
        op={'operation': 'create', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        """Request a manual 'prune' operation and wake the main thread."""
        op={'operation': 'prune', 'detail': 'manual'}
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if any."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()

mercurial