backup.py
changeset 79:b075b3db3044 (parent 78:83b43987e61e)
author:      Tuomo Valkonen <tuomov@iki.fi>
date:        Sun, 28 Jan 2018 11:38:01 +0000
permissions: -rw-r--r--

Cleaned up module organisation to simplify borgend.py and not have to import it in other modules.

#
# Borgend Backup instance
#

import config
import logging
import time
import loggers
import repository
import dreamtime
from enum import IntEnum
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

logger=loggers.get(__name__)

JOIN_TIMEOUT=60

#
# State and operation related helper classes
#

class State(IntEnum):
    # Lifecycle: INACTIVE -> SCHEDULED -> QUEUED -> ACTIVE -> back to INACTIVE
    INACTIVE=0
    SCHEDULED=1
    QUEUED=2
    ACTIVE=3


class Errors(IntEnum):
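    # Aggregated error status of a backup; combine() keeps the more severe
    # of two states (larger value = worse).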
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

    def combine(self, other):
        return max(self, other)

    def ok(self):
        return self==self.OK

    def __str__(self):
        return _errorstring[self]

_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}

class Operation:
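    # A single borg operation ('create' or 'prune'): what to run, when to run
    # it, and free-form detail fields used for status reporting.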
    CREATE='create'
    PRUNE='prune'
    def __init__(self, operation, time, **kwargs):
        self.operation=operation
        self.time=time
        self.detail=kwargs

    def when(self):
        return self.time.realtime()


class Status(Operation):
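    # Snapshot of a Backup for status callbacks: the current or scheduled
    # operation (if any) plus the backup's name, state and error status.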
    def __init__(self, backup, op=None):
        if op:
            super().__init__(op.operation, op.time, **op.detail)
        else:
            super().__init__(None, None)

        self.name=backup.name
        self.state=backup.state
        self.errors=backup.errors

#
# Miscellaneous helper routines
#

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

def safe_get_int(t, x):
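    # Return t[x] if it exists and is an int, otherwise None.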
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None

#
# The Backup class
#

class Backup(TerminableThread):
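    # A single configured backup, run as a long-lived thread. The main loop
    # alternately schedules the next operation (or waits for a manual request)
    # and queues it on the repository; a running borg subprocess is watched by
    # two short-lived listener threads (__log_listener and __result_listener).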

    def __decode_config(self, cfg):
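        # Read and validate this backup's settings from the configuration
        # dictionary cfg.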
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', loc,
                                                    config.defaults['retry_interval'])


        scheduling=config.check_string(cfg, 'scheduling',
                                      'Scheduling mode', loc,
                                      default="dreamtime")

        if scheduling=="dreamtime":
            self.timeclass=dreamtime.DreamTime
        elif scheduling=="realtime":
            self.timeclass=dreamtime.MonotonicTime
        elif scheduling=="manual":
            self.backup_interval=0
        else:
            logging.error("Invalid time class '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)


    def __init__(self, identifier, cfg, scheduler):
        self.identifier=identifier
        self.__status_update_callback=None
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once the backup name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.current_operation=None
        self.scheduled_operation=None
        self.lastrun_when=None
        self.lastrun_finished=None
        self.state=State.INACTIVE
        self.errors=Errors.OK
        self.timeclass=dreamtime.DreamTime

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
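        # Runs in its own thread: consumes the borg JSON log stream, updates
        # progress and error state, and notifies the status callback.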
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if original_size is not None and compressed_size is not None and deduplicated_size is not None:
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockError' or # in docs
                         msg['msgid']=='LockErrorT')): # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.errors=self.errors.combine(errors)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errors=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
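        # Runs in its own thread: waits for borg's final result and flags an
        # error if none arrives even though the log reported no problems.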
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error in log')
                self.errors=Errors.ERRORS


    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=[]):
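        # Start a BorgInstance for op together with its log and result
        # listener threads. Called with self._cond held.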

        inst=BorgInstance(op.operation, archive_or_repository,
                          common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.time=dreamtime.MonotonicTime.now()
        self.state=State.ACTIVE
        # Reset error status when starting a new operation
        self.errors=Errors.OK
        self.__update_status()

        t_log.start()
        t_res.start()


    def __launch(self, op):
        self.logger.debug("Launching '%s'" % str(op.operation))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.operation==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation==Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.prune)

        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
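        # Called with self._cond held; temporarily releases it while joining
        # the listener threads, then records the finished run and resets
        # per-run state.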
        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors=self.errors.combine(Errors.ERRORS)

        if self.current_operation.operation==Operation.CREATE:
            self.lastrun_when=self.current_operation.time.monotonic()
            self.lastrun_finished=time.monotonic()
        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None
        self.current_operation=None
        self.state=State.INACTIVE
        self.__update_status()

    def __main_thread(self):
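        # Main thread: wait for the next scheduled or manual operation, queue
        # and launch it, and repeat until terminated; on exit, terminate any
        # running borg instance and join the listener threads.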
        with self._cond:
            try:
                while not self._terminate:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'" % self.backup_name)
                self.errors=Errors.ERRORS

            self.state=State.INACTIVE
            self.scheduled_operation=None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log=self.thread_log
            thread_res=self.thread_res
            self.thread_log=None
            self.thread_res=None

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op=None
        if not self.scheduled_operation:
            op=self.__next_operation_unlocked()
        if op:
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
                             (str(op.operation), op.detail or 'none',
                              op.time.seconds_to(),
                              op.time.__class__.__name__))

            self.scheduled_operation=op
            self.state=State.SCHEDULED
            self.__update_status()

            # Wait until the scheduled time (or until woken up early)
            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state=State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=State.QUEUED
            self.__update_status()
            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation=None
                self.state=State.INACTIVE
                self.__update_status()

    def __next_operation_unlocked(self):
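        # Decide the next automatically scheduled operation: the first run
        # after the retry/backup interval, a retry after errors, or a regular
        # backup after backup_interval; returns None when the relevant
        # interval is 0 (manual scheduling only).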
        # TODO: pruning as well
        if not self.lastrun_finished:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                tm=self.timeclass.after(initial_interval)
                return Operation(Operation.CREATE, tm, reason='initial')
        elif not self.errors.ok():
            if self.retry_interval==0:
                return None
            else:
                tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval)
                return Operation(Operation.CREATE, tm, reason='retry')
        else:
            if self.backup_interval==0:
                return None
            else:
                tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval)
                return Operation(Operation.CREATE, tm)

    def __status_unlocked(self):
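        # Must be called with self._cond held.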
        callback=self.__status_update_callback

        if self.current_operation:
            status=Status(self, self.current_operation)
        elif self.scheduled_operation:
            status=Status(self, self.scheduled_operation)
        else:
            status=Status(self)

        return status, callback

    def __update_status(self):
        status, callback = self.__status_unlocked()
        if callback:
            #self._cond.release()
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")
            #finally:
            #    self._cond.acquire()

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()
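
# Usage sketch (illustrative only; not part of this module). Assumes a
# per-backup configuration dictionary 'cfg' and a scheduler instance set up
# elsewhere in borgend; 'my_status_callback' is a hypothetical callable
# taking a Status.
#
#   backup=Backup(0, cfg, scheduler)
#   backup.set_status_update_callback(my_status_callback)
#   backup.start()    # thread start inherited via TerminableThread
#   backup.create()   # request an immediate manual 'create'
#   ...
#   backup.abort()    # terminate a running borg instance, if any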
