borgend/repository.py

Wed, 07 Feb 2018 20:39:01 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 07 Feb 2018 20:39:01 +0000
changeset 113
6993964140bd
parent 106
a7bdc239ef62
child 121
1279af7591f0
permissions
-rw-r--r--

Time snapshot fixes.
Python's default arguments are purely idiotic (aka. pythonic): generated
only once. This makes sense in a purely functional language, which Python
lightyears away from, but severely limits their usefulness in an imperative
language. Decorators also seem clumsy for this, as one would have to tell
the number of positional arguments for things to work nice, being able to
pass the snapshot both positionally and as keyword. No luck.
So have to do things the old-fashioned hard way.

#
# Borgend by Tuomo Valkonen, 2018
#
# Repository abstraction for queuing.
#

import weakref
import keyring
import logging

from . import config
from .scheduler import QueueThread, QueuedEvent
from .exprotect import protect_noreturn

logger=logging.getLogger(__name__)

class FIFOEvent(QueuedEvent):
    def __init__(self, cond, name=None):
        self._goodtogo=False
        super().__init__(cond, name=name)

    def is_before(self, other, snapshot=None):
        return False

# This FIFO essentially a fancy semaphore: Each thread waits on its own
# Condition, so can also be woken up from other executing threads.
# If they are woken up by the FIFO, then a "good to go" flag is set;
# and a specified action executed. Otherwise this is not done.
class FIFO(QueueThread):
    def __init__(self, **kwargs):
        super().__init__(target = self._fifo_thread, **kwargs)

    @protect_noreturn
    def _fifo_thread(self):
        with self._cond:
            while not self._terminate:
                ev=self._list
                if ev:
                    # We have to release lock on self._cond before obtaining
                    # one on ev.cond to avoid race conditions with
                    # self.queue_acion
                    self._cond.release()
                    with ev.cond:
                        # Just set "good to go" flag and notify the queued
                        # thread. To keep blocking other thread, it is the
                        # job of the queued thred to remove itself.
                        if not ev._goodtogo:
                            ev._goodtogo=True
                            ev.cond.notify_all()
                    self._cond.acquire()
                self._cond.wait()

            self.logger.debug('Terminating')

            # Termination cleanup
            ev=self._list
            while ev:
                # We can only remove ev from the list when ev.cond allows
                with ev.cond:
                    ev.cond.notifyAll()
                    ev=ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        ev=FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)
            self._cond.notify()

        # This will release the lock on cond, allowing queue manager (scheduler)
        # thread to notify us if we are ready to be released
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo

repositories=weakref.WeakValueDictionary()

class Repository(FIFO):
    def __decode_config(self, cfg):
        loc0='Repository %d' % self.identifier

        self.repository_name=config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'" % self.repository_name)

        loc = 'Repository "%s"'

        self.logger=logger.getChild(self.repository_name)

        self.location=config.check_string(cfg, 'location',
                                         'Target repository location', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account=config.check_string(cfg, 'keychain_account',
                                                    'Keychain account', loc,
                                                    default='')

        self.__passphrase=None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.__extract_passphrase()
            except Exception:
                pass

    def __init__(self, identifier, cfg):
        self.identifier=identifier
        self.__decode_config(cfg)
        super().__init__(name = 'RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name]=self

    def __extract_passphrase(self):
        if not self.__passphrase:
            acc=self.__keychain_account
            if acc and acc!='':
                self.logger.debug('Requesting passphrase')
                try:
                    pw=keyring.get_password("borg-backup", acc)
                except Exception as err:
                    self.logger.error('Failed to retrieve passphrase')
                    raise err
                else:
                    self.logger.debug('Received passphrase')
                self.__passphrase=pw
            else:
                self.__passphrase=None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        try:
            self.logger.debug('launch_borg_instance: entering _cond')
            with self._cond:
                self.logger.debug('launch_borg_instance: entering __extract_passphrase')
                passphrase=self.__extract_passphrase()
        except Exception as err:
            self.logger.error('Aborting operation due to failure to obtain passphrase')
            raise err
        else:
            inst.launch(passphrase=passphrase)

def find_repository(name):
    if name in repositories:
        return repositories[name]
    else:
        return None

mercurial