backup.py

Sat, 20 Jan 2018 20:34:23 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Sat, 20 Jan 2018 20:34:23 +0000
changeset 11
0bff53095f28
parent 10
76dbfb06eba0
child 12
16a8c63344c0
permissions
-rw-r--r--

New tray title: B. or B! depending on activity

#
# Borgend Backup instance
#

import config
import logging
import time
from instance import BorgInstance
from queue import Queue
from threading import Thread, Lock, Timer

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

class Backup:

    def __decode_config(self, cfg):
        loc0='backup target %d' % self.identifier

        self.name=config.check_string(cfg, 'name', 'Name', loc0)

        self.loc='backup target "%s"' % self.name

        self.repository=config.check_string(cfg, 'repository',
                                            'Target repository', self.loc)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', self.loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', self.loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', self.loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', self.loc,
                                                    config.defaults['retry_interval'])

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', self.loc)

        self.common_parameters=config.check_list_of_dicts(cfg, 'common_parameters',
                                                         'Borg parameters', self.loc,
                                                         default=[])

        self.create_parameters=config.check_list_of_dicts(cfg, 'create_parameters',
                                                         'Create parameters', self.loc,
                                                         default=[])

        self.prune_parameters=config.check_list_of_dicts(cfg, 'prune_parameters',
                                                         'Prune parameters', self.loc,
                                                         default=[])


    def __init__(self, identifier, cfg):
        self.identifier=identifier

        self.__decode_config(cfg)

        self.config=config
        self.lastrun=None
        self.lastrun_success=None
        self.borg_instance=None
        self.current_operation=None
        self.thread_log=None
        self.thread_res=None
        self.timer=None
        self.scheduled_operation=None
        self.lock=Lock()
        self.status_update_callback=None

    def is_running(self):
        with self.lock:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        logging.debug('Log listener thread waiting for entries')
        success=True
        for status in iter(self.borg_instance.read_log, None):
            logging.debug(str(status))
            t=status['type']
            #may_indicate_finished=False
            if t=='progress_percent':
                #may_indicate_finished=True
                # Temporary output
                if 'current' not in status:
                    status['current']=0
                if 'total' not in status:
                    status['total']=0
                print('%d / %d' % (status['current'], status['total']))
            elif t=='archive_progress':
                pass
            elif t=='progress_message':
                #may_indicate_finished=True
                pass
            elif t=='file_status':
                pass
            elif t=='log_message':
                if 'levelname' not in status:
                    status['levelname']='ERROR'
                if 'message' not in status:
                    status['message']='UNKNOWN'
                if 'name' not in status:
                    status['name']='borg'
                logging.log(translate_loglevel(status['levelname']),
                            status['name'] + ': ' + status['message'])
                # set success=False?
            elif t=='exception':
                success=False
            elif t=='unparsed_error':
                success=False

        logging.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        logging.debug('Borg subprocess terminated; terminating log listener thread')

        with self.lock:
            self.thread_log=None
            self.__finish_and_reschedule_if_both_listeners_terminated()


    def __result_listener(self):
        with self.lock:
            status, callback=self.__status_unlocked()
        if callback:
            callback(self, status)

        logging.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        success=True

        logging.debug('Borg result: %s' % str(res))

        if res==None:
            success=False

        logging.debug('Waiting for borg subprocess to terminate in result thread')

        success=success and self.borg_instance.wait()

        logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))

        with self.lock:
            if self.current_operation['operation']=='create':
                self.lastrun=self.current_operation['when_monotonic']
                self.lastrun_success=success
            self.thread_res=None
            self.__finish_and_reschedule_if_both_listeners_terminated()
            status, callback=self.__status_unlocked()
        if callback:
            callback(self, status)

    def __finish_and_reschedule_if_both_listeners_terminated(self):
        if self.thread_res==None and self.thread_log==None:
            logging.debug('Both threads terminated')
            self.borg_instance=None
            self.current_operation=None
            self.__schedule_unlocked()

    def __do_launch(self, queue, op, archive_or_repository, *args):
        inst=BorgInstance(op['operation'], archive_or_repository, *args)
        inst.launch()

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.queue=queue
        self.current_operation=op
        self.current_operation['when_monotonic']=time.monotonic()

        t_log.start()
        t_res.start()

    def __launch(self, op, queue):
        if self.__is_running_unlocked():
            logging.info('Cannot start %s: already running %s'
                         % (operation, self.current_operation))
            return False
        else:
            if self.timer:
                logging.debug('Unscheduling timed operation due to launch of operation')
                self.timer=None
                self.scheduled_operation=None

            logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name))

            if op['operation']=='create':
                archive="%s::%s%s" % (self.repository,
                                      self.archive_prefix,
                                      self.archive_template)

                self.__do_launch(queue, op, archive,
                                 self.common_parameters+self.create_parameters,
                                 self.paths)
            elif op['operation']=='prune':
                self.__do_launch(queue, op, self.repository,
                                 ([{'prefix': self.archive_prefix}] + 
                                  self.common_parameters +
                                  self.prune_parameters))
            else:
                logging.error("Invalid operaton '%s'" % op['operation'])
                self.__schedule_unlocked()

            return True

    def create(self, queue):
        op={'operation': 'create', 'detail': 'manual'}
        with self.lock:
            res=self.__launch(op, queue)
        return res

    def prune(self, queue):
        op={'operation': 'prune', 'detail': 'manual'}
        with self.lock:
            res=self.__launch(op, queue)
        return res

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self.lock:
            if self.borg_instance:
                self.borg_instance.terminate()
            thread_log=self.thread_log
            thread_res=self.thread_res

        if thread_log:
            thread_log.terminate()

        if thread_res:
            thread_res.terminate()


    def join(self):
        logging.debug('Waiting for borg listener threads to terminate')

        with self.lock:
            thread_log=self.thread_log
            thread_res=self.thread_res

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

        assert(self.thread_log==None and self.thread_res==None)

    def __queue_timed_operation(self):
        with self.lock:
            op=self.scheduled_operation
            self.scheduled_operation=None
            self.timer=None

            if self.__is_running_unlocked():
                logging.info('Aborted queue operation, as an operation is already running')
            else:
                # TODO: Queue on 'repository' and online status for SSH, etc.

                # TODO: UI comms. queue?
                self.__launch(op, None)

    def __next_operation_unlocked(self):
        # TODO: pruning as well
        now=time.monotonic()
        if not self.lastrun:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'initial',
                        'when_monotonic': now+initial_interval}
        elif not self.lastrun_success:
            if self.retry_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': 'retry',
                        'when_monotonic': self.lastrun+self.retry_interval}
        else:
            if self.backup_interval==0:
                return None
            else:
                return {'operation': 'create',
                        'detail': None,
                        'when_monotonic': self.lastrun+self.backup_interval}

    def __schedule_unlocked(self):
        if self.current_operation:
            return self.current_operation
        else:
            op=self.__next_operation_unlocked()

            if op:
                now=time.monotonic()
                delay=max(0, op['when_monotonic']-now)
                logging.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
                             (op['operation'], op['detail'], self.name, delay))
                tmr=Timer(delay, self.__queue_timed_operation)
                self.scheduled_operation=op
                self.timer=tmr
                tmr.start()

            return op

    def schedule(self):
        with self.lock:
            return self.__schedule_unlocked()

    def set_status_update_callback(self, callback):
        with self.lock:
            self.status_update_callback=callback

    def status(self):
        with self.lock:
            res=self.__status_unlocked()
        return res[0]

    def __status_unlocked(self):
        callback=self.status_update_callback
        if self.current_operation:
            status=self.current_operation
            status['type']='current'
        elif self.scheduled_operation:
            status=self.scheduled_operation
            status['type']='scheduled'
        else:
            status={'type': 'nothing'}

        status['name']=self.name

        if 'when_monotonic' in status:
            status['when']=(status['when_monotonic']
                            -time.monotonic()+time.time())

        return status, callback


mercurial