borgend/instance.py

Wed, 02 Feb 2022 11:46:05 +0200

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 02 Feb 2022 11:46:05 +0200
changeset 144
31227feaa05a
parent 120
109eaddc16e1
permissions
-rw-r--r--

Workaround to refresh timer loops on some configurations

#
# Borgend by Tuomo Valkonen, 2018
#
# This file implements a Borg launching interface.
#

import os
import json
import logging
from subprocess import Popen, PIPE

from .config import settings

logger=logging.getLogger(__name__)

necessary_opts=['--log-json', '--progress']

necessary_opts_for={
    'create': ['--json'],
    'info': ['--json'],
    'list': ['--json'],
    'prune': ['--list'],
}

# Conversion of config into command line
def arglistify(args):
    flatten=lambda l: [item for sublist in l for item in sublist]
    if args is None:
        return []
    else:
        # Insert --key=str(value) for 'key: value' in the config.
        # Boolean values are handled different, since borg does not take
        # --key=true type of arguments. If the value is true --key is inserted,
        # otherwise not.
        def mkarg(key, value):
            if isinstance(value, bool):
                if value:
                    return ['--' + key]
                else:
                    return []
            else:
                return ['--' + key, str(value)]
        return flatten([mkarg(key, d[key]) for d in args for key in d])

class BorgInstance:
    def __init__(self, operation, archive_or_repository,
                 common_params, op_params, paths):
        self.operation=operation;
        self.archive_or_repository=archive_or_repository;
        self.common_params=common_params
        self.op_params=op_params
        self.paths=paths
        self.proc=None

    def construct_cmdline(self):
        cmd=([settings['borg']['executable']]+necessary_opts+
             arglistify(self.common_params)+
             [self.operation])

        if self.operation in necessary_opts_for:
            cmd=cmd+necessary_opts_for[self.operation]

        return (cmd+arglistify(self.op_params)
                +[self.archive_or_repository]+self.paths)

    def launch(self, passphrase=None):
        cmd=self.construct_cmdline()

        logger.info('Launching ' + str(cmd))

        # Set passphrase if not, or set to empty if not known, so borg
        # won't hang waiting for it, which seems to happen even if we
        # close stdin.
        env=os.environ.copy()
        env['BORG_PASSPHRASE']=passphrase or ''

        # Workaround: if launched is a standalone app created with py2app,
        # borg will fail unless Python environment is reset.
        # TODO: Of course, this will fail if the system needs the variables
        # PYTHONPATH or PYTHONHOME set to certain values.
        if '_PY2APP_LAUNCHED_' in env:
            val=env['_PY2APP_LAUNCHED_']
            if val=='1':
                if 'PYTHONPATH' in env:
                    del env['PYTHONPATH']
                if 'PYTHONHOME' in env:
                    del env['PYTHONHOME']

        self.proc=Popen(cmd, env=env, stdout=PIPE, stderr=PIPE, stdin=PIPE)

        # We don't do passphrase input etc.
        self.proc.stdin.close()

    def read_result(self):
        stream=self.proc.stdout
        try:
            line=stream.read(-1)
        except Exception:
            logger.exception('Borg stdout pipe read failed')

        if line==b'':
            #logger.debug('Borg stdout pipe EOF?')
            return None

        try:
            return json.loads(line.decode())
        except Exception:
            logger.exception('JSON parse failed on stdout: %s' % str(line))
            return None

    def read_log(self):
        stream=self.proc.stderr
        try:
            line=stream.readline()
        except Exception:
            logger.exception('Pipe stderr pipe read failed')

            return {'type': 'log_message',
                    'levelname': 'CRITICAL',
                    'name': 'borgend.instance.BorgInstance',
                    'msgid': 'Borgend.Exception',
                    'message': err}

        if line==b'':
            #logger.debug('Borg stderr pipe EOF?')
            return None


        try:
            res=json.loads(line.decode())
            if 'type' not in res:
                res['type']='UNKNOWN'
            return res
        except:
            logger.exception('JSON parse failed on stderr: %s' % str(line))

            errmsg=line
            for line in iter(stream.readline, b''):
                errmsg=errmsg+line

            return {'type': 'log_message',
                    'levelname': 'ERROR',
                    'name': 'borgend.instance.BorgInstance',
                    'msgid': 'Borgend.JSONFail',
                    'message': str(errmsg)}

    def terminate(self):
        if self.proc:
            self.proc.terminate()

    # Returns True if has terminated
    def wait(self, timeout=None):
        if self.proc:
            return self.proc.wait(timeout=timeout) is not None
        else:
            return True

    def has_terminated(self):
        return not self.proc or (self.proc.poll() is not None)

mercurial