borgend/repository.py

Mon, 05 Feb 2018 10:25:17 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 05 Feb 2018 10:25:17 +0000
changeset 106
a7bdc239ef62
parent 101
3068b0de12ee
child 121
1279af7591f0
permissions
-rw-r--r--

Added exception protection decorators to callbacks.
If callbacks crash, there's rarely anything in the logs otherwise.

#
# Borgend by Tuomo Valkonen, 2018
#
# Repository abstraction for queuing.
#

import weakref
import keyring
import logging

from . import config
from .scheduler import QueueThread, QueuedEvent
from .exprotect import protect_noreturn

# Module-level logger, named after this module.
logger=logging.getLogger(__name__)

class FIFOEvent(QueuedEvent):
    """Queue entry used by FIFO; carries a "good to go" flag.

    The flag starts out False and is only flipped to True by the FIFO
    thread when it decides that this entry may proceed.
    """

    def __init__(self, cond, name=None):
        """Clear the flag, then hand `cond` and `name` to QueuedEvent."""
        self._goodtogo = False
        super().__init__(cond, name=name)

    def is_before(self, other, snapshot=None):
        """Always answer False, regardless of `other` and `snapshot`.

        NOTE(review): presumably this makes the queue keep plain insertion
        order -- confirm against the insertion logic in QueueThread.
        """
        return False

# This FIFO is essentially a fancy semaphore: each thread waits on its own
# Condition, so it can also be woken up by other executing threads.
# If a thread is woken up by the FIFO, a "good to go" flag is set and the
# specified action is executed. Otherwise neither of these is done.
class FIFO(QueueThread):
    """First-in-first-out gate for actions, run by a dedicated thread.

    Every queued thread waits on its own Condition, so it can also be woken
    up by threads other than this one. Only when the FIFO thread performs
    the wake-up is the event's "good to go" flag set and the queued action
    executed; after any other wake-up the action is skipped.
    """

    def __init__(self, **kwargs):
        """Create the thread with `_fifo_thread` as its target.

        All keyword arguments are passed through to QueueThread unchanged.
        """
        super().__init__(target = self._fifo_thread, **kwargs)

    @protect_noreturn
    def _fifo_thread(self):
        """Thread body: release the head of the queue, one event at a time.

        Runs until `self._terminate` is set, then notifies every event
        still in the list so that its waiting thread can give up.
        """
        with self._cond:
            while not self._terminate:
                ev=self._list
                if ev:
                    # We have to release the lock on self._cond before
                    # obtaining one on ev.cond to avoid race conditions with
                    # self.queue_action (which holds ev.cond and then takes
                    # self._cond).
                    self._cond.release()
                    try:
                        with ev.cond:
                            # Just set the "good to go" flag and notify the
                            # queued thread. To keep blocking other threads,
                            # it is the job of the queued thread to remove
                            # itself from the list.
                            if not ev._goodtogo:
                                ev._goodtogo=True
                                ev.cond.notify_all()
                    finally:
                        # Always re-take the lock: the enclosing `with`
                        # releases it on exit, even if the above raised.
                        self._cond.acquire()

                    # While self._cond was released, the head may have
                    # unlinked itself, or termination may have been
                    # requested. Those notifications found nobody waiting
                    # and are lost, so re-examine the state instead of
                    # sleeping on a wake-up that will never come.
                    if self._terminate or self._list is not ev:
                        continue

                self._cond.wait()

            self.logger.debug('Terminating')

            # Termination cleanup: wake everyone still queued. Their
            # "good to go" flag is left untouched, so they skip the action.
            # NOTE(review): unlike the main loop, this takes ev.cond while
            # still holding self._cond; check that this lock order cannot
            # deadlock against queue_action during shutdown.
            ev=self._list
            while ev:
                # We can only touch ev when ev.cond allows
                with ev.cond:
                    ev.cond.notify_all()
                    ev=ev.next

    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue the calling thread and run `action` once it is released.

        `cond` has to be acquired on entry! It is released while waiting
        and held again when this method returns.

        Parameters:
            cond: the Condition the calling thread waits on.
            action: zero-argument callable, executed only if the wake-up
                came from the FIFO thread.
            name: optional label for the event, used in debug logging.

        Returns True if the FIFO thread released this event (and `action`
        was therefore called), False if the thread was woken up otherwise.
        An exception raised by `action` propagates after the event has been
        removed from the queue.
        """
        ev=FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing the queue manager
        # (scheduler) thread to notify us if we are ready to be released
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                action()
        finally:
            # Remove ourselves whether or not the action ran or raised;
            # otherwise the events behind us would stay blocked.
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo

# Registry of Repository objects keyed by repository name. Values are held
# weakly, so an entry does not by itself keep its repository alive.
repositories=weakref.WeakValueDictionary()

class Repository(FIFO):
    """A configured borg repository that also acts as a FIFO queue.

    On construction the instance reads its settings from a configuration
    mapping and registers itself in the module-level `repositories`
    dictionary under its name.
    """

    def __decode_config(self, cfg):
        """Read this repository's settings from `cfg` into attributes.

        Sets `repository_name`, `logger`, `location`, `borg_parameters` and
        the private keychain account / passphrase fields. If the
        'extract_passphrases_at_startup' setting is enabled, a best-effort
        attempt to fetch the passphrase is made right away.
        """
        # Until the name is known, the numeric identifier is the only label
        # available to pass to the config helpers.
        loc0='Repository %d' % self.identifier

        self.repository_name=config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Label for the remaining config lookups. The name has to be
        # interpolated here; otherwise a literal '%s' is passed along.
        loc='Repository "%s"' % self.repository_name

        self.logger=logger.getChild(self.repository_name)

        self.location=config.check_string(cfg, 'location',
                                         'Target repository location', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account=config.check_string(cfg, 'keychain_account',
                                                    'Keychain account', loc,
                                                    default='')

        self.__passphrase=None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.__extract_passphrase()
            except Exception:
                # Deliberately best effort: launch_borg_instance asks for
                # the passphrase again. Leave a trace in the log instead of
                # swallowing the failure silently.
                self.logger.warning('Could not obtain passphrase at startup; '
                                    'will retry when it is needed',
                                    exc_info=True)

    def __init__(self, identifier, cfg):
        """Configure the repository from `cfg` and register it by name.

        Parameters:
            identifier: number used to label this repository (formatted
                with %d) before its name has been read.
            cfg: configuration mapping for this repository.
        """
        self.identifier=identifier
        # The configuration is decoded first because the thread name passed
        # to the base class is derived from the repository name.
        self.__decode_config(cfg)
        super().__init__(name = 'RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name]=self

    def __extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if needed.

        The keyring is queried (service "borg-backup") only when no
        passphrase is held yet and a keychain account is configured.
        Returns None when no account is configured. Exceptions raised by
        the keyring are logged and re-raised.
        """
        if not self.__passphrase:
            acc=self.__keychain_account
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw=keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                self.__passphrase=pw
            else:
                self.__passphrase=None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Obtain the passphrase and launch `inst` with it.

        The passphrase is fetched while holding `self._cond`. If that
        fails, the error is logged and re-raised, and `inst` is not
        launched.
        """
        try:
            self.logger.debug('launch_borg_instance: entering _cond')
            with self._cond:
                self.logger.debug('launch_borg_instance: entering __extract_passphrase')
                passphrase=self.__extract_passphrase()
        except Exception:
            self.logger.error('Aborting operation due to failure to obtain passphrase')
            raise
        else:
            inst.launch(passphrase=passphrase)

def find_repository(name):
    """Return the repository registered under `name`, or None if absent.

    A single `get` is used rather than a membership test followed by
    indexing: `repositories` holds its values weakly, so an entry could
    disappear between the two steps and make the indexing raise KeyError.
    """
    return repositories.get(name)

mercurial