borgend/backup.py

changeset 80
a409242121d5
parent 79
b075b3db3044
child 85
56a000d15965
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/borgend/backup.py	Sun Jan 28 11:54:46 2018 +0000
@@ -0,0 +1,538 @@
+#
+# Borgend Backup instance
+#
+
+import logging
+import time
+from enum import IntEnum
+from threading import Thread, Lock, Condition
+
+from . import config
+from . import loggers
+from . import repository
+from . import dreamtime
+from .instance import BorgInstance
+from .scheduler import TerminableThread
+
+logger=loggers.get(__name__)
+
+JOIN_TIMEOUT=60
+
+#
+# State and operation related helper classes
+#
+
+class State(IntEnum):
+    # State
+    INACTIVE=0
+    SCHEDULED=1
+    QUEUED=2
+    ACTIVE=3
+
+
+class Errors(IntEnum):
+    OK=0
+    BUSY=1
+    OFFLINE=2
+    ERRORS=3
+
+    def combine(self, other):
+        return max(self, other)
+
+    def ok(self):
+        return self==self.OK
+
+    def __str__(self):
+        return _errorstring[self]
+
+_errorstring={
+    Errors.OK: 'ok',
+    Errors.BUSY: 'busy',
+    Errors.OFFLINE: 'offline',
+    Errors.ERRORS: 'errors'
+}
+
+class Operation:
+    CREATE='create'
+    PRUNE='prune'
+    def __init__(self, operation, time, **kwargs):
+        self.operation=operation
+        self.time=time
+        self.detail=kwargs
+
+    def when(self):
+        return self.time.realtime()
+
+
+class Status(Operation):
+    def __init__(self, backup, op=None):
+        if op:
+            super().__init__(op.operation, op.time, **op.detail)
+        else:
+            super().__init__(None, None)
+
+        self.name=backup.name
+        self.state=backup.state
+        self.errors=backup.errors
+
+#
+# Miscellaneous helper routines
+#
+
+loglevel_translation={
+    'CRITICAL': logging.CRITICAL,
+    'ERROR': logging.ERROR,
+    'WARNING': logging.WARNING,
+    'DEBUG': logging.DEBUG,
+    'INFO': logging.INFO
+}
+
+def translate_loglevel(x):
+    if x in loglevel_translation:
+        return loglevel_translation[x]
+    else:
+        return logging.ERROR
+
+def safe_get_int(t, x):
+    if x in t:
+        tmp=t[x]
+        if isinstance(tmp, int):
+            return tmp
+    return None
+
+#
+# The Backup class
+#
+
+class Backup(TerminableThread):
+
+    def __decode_config(self, cfg):
+        loc0='Backup %d' % self.identifier
+
+        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)
+
+        logger.debug("Configuring backup '%s'" % self.backup_name)
+
+        self.logger=logger.getChild(self.backup_name)
+
+        loc="Backup '%s'" % self.backup_name
+
+        reponame=config.check_string(cfg, 'repository',
+                                     'Target repository', loc)
+
+        self.repository=repository.find_repository(reponame)
+        if not self.repository:
+            raise Exception("Repository '%s' not configured" % reponame)
+
+        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
+                                                'Archive prefix', loc)
+
+        self.archive_template=config.check_string(cfg, 'archive_template',
+                                                  'Archive template', loc)
+
+        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
+                                                     'Backup interval', loc,
+                                                     config.defaults['backup_interval'])
+
+        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
+                                                    'Retry interval', loc,
+                                                    config.defaults['retry_interval'])
+
+
+        scheduling=config.check_string(cfg, 'scheduling',
+                                      'Scheduling mode', loc,
+                                      default="dreamtime")
+
+        if scheduling=="dreamtime":
+            self.timeclass=dreamtime.DreamTime
+        elif scheduling=="realtime":
+            self.timeclass=dreamtime.MonotonicTime
+        elif scheduling=="manual":
+            self.backup_interval=0
+        else:
+            logging.error("Invalid time class '%s' for %s" % (scheduling, loc))
+
+        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)
+
+        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)
+
+
+    def __init__(self, identifier, cfg, scheduler):
+        self.identifier=identifier
+        self.__status_update_callback=None
+        self.scheduler=scheduler
+        self.logger=None # setup up in __decode_config once backup name is known
+
+        self.borg_instance=None
+        self.thread_log=None
+        self.thread_res=None
+
+        self.current_operation=None
+        self.scheduled_operation=None
+        self.lastrun_when=None
+        self.lastrun_finished=None
+        self.state=State.INACTIVE
+        self.errors=Errors.OK
+        self.timeclass=dreamtime.DreamTime
+
+        self.__decode_config(cfg)
+
+        super().__init__(target = self.__main_thread, name = self.backup_name)
+        self.daemon=True
+
+    def is_running(self):
+        with self._cond:
+            running=self.__is_running_unlocked()
+        return running
+
+    def __is_running_unlocked(self):
+        running=self.current_operation
+        if not running:
+            # Consistency check
+            assert((not self.borg_instance and not self.thread_log and
+                    not self.thread_res))
+        return running
+
+    def __block_when_running(self):
+        running=self.is_running()
+        assert(not running)
+
+    def __log_listener(self):
+        self.logger.debug('Log listener thread waiting for entries')
+        success=True
+        for msg in iter(self.borg_instance.read_log, None):
+            self.logger.info(str(msg))
+            t=msg['type']
+
+            errormsg=None
+            callback=None
+
+            if t=='progress_percent':
+                current=safe_get_int(msg, 'current')
+                total=safe_get_int(msg, 'total')
+                if current is not None and total is not None:
+                    with self._cond:
+                        self.current_operation.detail['progress_current']=current
+                        self.current_operation.detail['progress_total']=total
+                        status, callback=self.__status_unlocked()
+
+            elif t=='archive_progress':
+                original_size=safe_get_int(msg, 'original_size')
+                compressed_size=safe_get_int(msg, 'compressed_size')
+                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
+                if original_size is not None and original_size is not None and deduplicated_size is not None:
+                    with self._cond:
+                        self.current_operation.detail['original_size']=original_size
+                        self.current_operation.detail['compressed_size']=compressed_size
+                        self.current_operation.detail['deduplicated_size']=deduplicated_size
+                        status, callback=self.__status_unlocked()
+
+            elif t=='progress_message':
+                pass
+
+            elif t=='file_status':
+                pass
+
+            elif t=='log_message':
+                if 'levelname' not in msg:
+                    msg['levelname']='ERROR'
+                if 'message' not in msg:
+                    msg['message']='UNKNOWN'
+                if 'name' not in msg:
+                    msg['name']='borg'
+                lvl=translate_loglevel(msg['levelname'])
+                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
+                if lvl>=logging.ERROR:
+                    errormsg=msg
+                    errors=Errors.ERRORS
+                    if ('msgid' in msg and
+                        (msg['msgid']=='LockTimeout' or # observed in reality
+                         msg['msgid']=='LockErrorT' or # in docs
+                         msg['msgid']=='LockErrorT')): # in docs
+                        errors=Errors.BUSY
+                    with self._cond:
+                        self.errors=self.errors.combine(errors)
+                        status, callback=self.__status_unlocked()
+            else:
+                self.logger.debug('Unrecognised log entry %s' % str(status))
+
+            if callback:
+                callback(status, errors=errormsg)
+
+        self.logger.debug('Waiting for borg subprocess to terminate in log thread')
+
+        self.borg_instance.wait()
+
+        self.logger.debug('Borg subprocess terminated; terminating log listener thread')
+
+    def __result_listener(self):
+        self.logger.debug('Result listener thread waiting for result')
+
+        res=self.borg_instance.read_result()
+
+        self.logger.debug('Borg result: %s' % str(res))
+
+        with self._cond:
+            if res is None and self.errors.ok():
+                self.logger.error('No result from borg despite no error in log')
+                self.errors=Errors.ERRORS
+
+
+    def __do_launch(self, op, archive_or_repository,
+                    common_params, op_params, paths=[]):
+
+        inst=BorgInstance(op.operation, archive_or_repository,
+                          common_params, op_params, paths)
+
+        # Only the Repository object has access to the passphrase
+        self.repository.launch_borg_instance(inst)
+
+        self.logger.debug('Creating listener threads')
+
+        t_log=Thread(target=self.__log_listener)
+        t_log.daemon=True
+
+        t_res=Thread(target=self.__result_listener)
+        t_res.daemon=True
+
+        self.thread_log=t_log
+        self.thread_res=t_res
+        self.borg_instance=inst
+        self.current_operation=op
+        # Update scheduled time to real starting time to schedule
+        # next run relative to this
+        self.current_operation.time=dreamtime.MonotonicTime.now()
+        self.state=State.ACTIVE
+        # Reset error status when starting a new operation
+        self.errors=Errors.OK
+        self.__update_status()
+
+        t_log.start()
+        t_res.start()
+
+
+    def __launch(self, op):
+        self.logger.debug("Launching '%s'" % str(op.operation))
+
+        params=(config.borg_parameters
+                +self.repository.borg_parameters
+                +self.borg_parameters)
+
+        if op.operation==Operation.CREATE:
+            archive="%s::%s%s" % (self.repository.location,
+                                  self.archive_prefix,
+                                  self.archive_template)
+
+            self.__do_launch(op, archive, params.common,
+                             params.create, self.paths)
+        elif op.operation==Operation.PRUNE:
+            self.__do_launch(op, self.repository.location, params.common,
+                             [{'prefix': self.archive_prefix}] + params.create)
+
+        else:
+            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
+
+    # This must be called with self._cond held.
+    def __launch_and_wait(self):
+        op=self.scheduled_operation
+        if not op:
+            self.logger.debug("Queued operation aborted")
+        else:
+            self.scheduled_operation=None
+
+            self.__launch(op)
+
+            self.__wait_finish()
+
+    def __wait_finish(self):
+        # Wait for main logger thread to terminate, or for us to be terminated
+        while not self.terminate and self.thread_res.is_alive():
+            self._cond.release()
+            self.thread_res.join(JOIN_TIMEOUT)
+            self._cond.acquire()
+
+        # If terminate has been signalled, let outer termination handler
+        # take care of things (Within this Backup class, it would be cleanest
+        # to raise an exception instead, but in most other places it's better
+        # to just check self._terminate, so we don't complicate things with
+        # an extra exception.)
+        if self._terminate:
+            return
+
+        self.logger.debug('Waiting for borg and log subprocesses to terminate')
+
+        self._cond.release()
+        self.thread_log.join()
+        self._cond.acquire()
+
+        if not self.borg_instance.wait():
+            self.logger.error('Borg subprocess did not terminate')
+            self.errors=self.errors.combine(Errors.ERRORS)
+
+        if self.current_operation.operation=='create':
+            self.lastrun_when=self.current_operation.time.monotonic()
+            self.lastrun_finished=time.monotonic()
+        self.thread_res=None
+        self.thread_log=None
+        self.borg_instance=None
+        self.current_operation=None
+        self.state=State.INACTIVE
+        self.__update_status()
+
+    def __main_thread(self):
+        with self._cond:
+            try:
+                while not self._terminate:
+                    assert(not self.current_operation)
+                    self.__main_thread_wait_schedule()
+                    if not self._terminate:
+                        self.__main_thread_queue_and_launch()
+            except Exception as err:
+                self.logger.exception("Error with backup '%s'" % self.backup_name)
+                self.errors=Errors.ERRORS
+
+            self.state=State.INACTIVE
+            self.scheduled_operation=None
+
+            # Clean up to terminate: kill borg instance and communication threads
+            if self.borg_instance:
+                self.logger.debug("Terminating a borg instance")
+                self.borg_instance.terminate()
+
+            # Store threads to use outside lock
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+            self.thread_log=None
+            self.thread_res=None
+
+        self.logger.debug("Waiting for log and result threads to terminate")
+
+        if thread_log:
+            thread_log.join()
+
+        if thread_res:
+            thread_res.join()
+
+    # Main thread/2. Schedule next operation if there is no manually
+    # requested one
+    def __main_thread_wait_schedule(self):
+        op=None
+        if not self.scheduled_operation:
+            op=self.__next_operation_unlocked()
+        if op:
+            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
+                             (str(op.operation), op.detail or 'none',
+                              op.time.seconds_to(),
+                              op.time.__class__.__name__))
+
+            self.scheduled_operation=op
+            self.state=State.SCHEDULED
+            self.__update_status()
+
+            # Wait under scheduled wait
+            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
+        else:
+            # Nothing scheduled - just wait
+            self.logger.info("Waiting for manual scheduling")
+
+            self.state=State.INACTIVE
+            self.__update_status()
+
+            self._cond.wait()
+
+    # Main thread/3. If there is a scheduled operation (it might have been
+    # changed manually from 'op' created in __main_thread_wait_schedule above),
+    # queue it on the repository, and launch the operation once repository
+    # available
+    def __main_thread_queue_and_launch(self):
+        if self.scheduled_operation:
+            self.logger.debug("Queuing")
+            self.state=State.QUEUED
+            self.__update_status()
+            res=self.repository.queue_action(self._cond,
+                                             action=self.__launch_and_wait,
+                                             name=self.backup_name)
+            if not res and not self._terminate:
+                self.logger.debug("Queueing aborted")
+                self.scheduled_operation=None
+                self.state=State.INACTIVE
+                self.__update_status()
+
+    def __next_operation_unlocked(self):
+        # TODO: pruning as well
+        if not self.lastrun_finished:
+            initial_interval=self.retry_interval
+            if initial_interval==0:
+                initial_interval=self.backup_interval
+            if initial_interval==0:
+                return None
+            else:
+                tm=self.timeclass.after(initial_interval)
+                return Operation(Operation.CREATE, tm, reason='initial')
+        elif not self.errors.ok():
+            if self.retry_interval==0:
+                return None
+            else:
+                tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval)
+                return Operation(Operation.CREATE, tm, reason='retry')
+        else:
+            if self.backup_interval==0:
+                return None
+            else:
+                tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval)
+                return Operation(Operation.CREATE, tm)
+
+    def __status_unlocked(self):
+        callback=self.__status_update_callback
+
+        if self.current_operation:
+            status=Status(self, self.current_operation)
+        elif self.scheduled_operation:
+            status=Status(self, self.scheduled_operation)
+        else:
+            status=Status(self)
+
+        return status, callback
+
+    def __update_status(self):
+        status, callback = self.__status_unlocked()
+        if callback:
+            #self._cond.release()
+            try:
+                callback(status)
+            except Exception:
+                self.logger.exception("Status update error")
+            #finally:
+            #    self._cond.acquire()
+
+    #
+    # Interface functions
+    #
+
+    def set_status_update_callback(self, callback):
+        with self._cond:
+            self.__status_update_callback=callback
+
+    def status(self):
+        with self._cond:
+            res=self.__status_unlocked()
+        return res[0]
+
+    def create(self):
+        op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual')
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
+
+    def prune(self):
+        op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual')
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
+
+    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
+    def abort(self):
+        with self._cond:
+            if self.borg_instance:
+                self.borg_instance.terminate()
+

mercurial