borgend/backup.py

changeset 97
96d5adbe0205
parent 96
de8ac6c470d8
child 98
9052e427ea39
--- a/borgend/backup.py	Mon Jan 29 14:32:27 2018 +0000
+++ b/borgend/backup.py	Wed Jan 31 00:06:54 2018 +0000
@@ -8,12 +8,13 @@
 import logging
 import time
 import datetime
+import re
 from enum import IntEnum
 from threading import Thread, Lock, Condition
 
 from . import config
 from . import repository
-from . import dreamtime
+from .dreamtime import MonotonicTime, DreamTime, RealTime
 from .instance import BorgInstance
 from .scheduler import TerminableThread
 
@@ -61,25 +62,45 @@
     INFO='info'
     LIST='list'
 
-    def __init__(self, operation, time, **kwargs):
-        self.operation=operation
-        self.time=time
+    def __init__(self, type, start_time, **kwargs):
+        self.type=type
+        self.start_time=start_time
+        self.finish_time=None
         self.detail=kwargs
+        self.errors=Errors.OK
 
     def when(self):
-        return self.time.realtime()
+        return self.start_time.realtime()
+
+    def ok(self):
+        return self.errors.ok()
+
+    def add_error(self, error):
+        self.errors=self.errors.combine(error)
 
 
 class Status(Operation):
     def __init__(self, backup, op=None):
+        op=None
+        errorsop=None
+
+        if backup.current_operation:
+            op=backup.current_operation
+        elif backup.scheduled_operation:
+            op=backup.scheduled_operation
+            if backup.previous_operation:
+                errorsop=backup.previous_operation
+
         if op:
-            super().__init__(op.operation, op.time, **op.detail)
+            super().__init__(op.type, op.start_time, **op.detail)
+            if not errorsop:
+                errorsop=op
+            self.errors=errorsop.errors
         else:
             super().__init__(None, None)
 
         self.name=backup.name
         self.state=backup.state
-        self.errors=backup.errors
 
 #
 # Miscellaneous helper routines
@@ -139,6 +160,22 @@
 
     return thistime, True
 
+_prune_progress_re=re.compile(".*\(([0-9]+)/([0-9]+)\)$")
+# Borg gives very little progress info in easy form, so try to extrat it
+def check_prune_status(msg):
+    res=_prune_progress_re.match(msg)
+    if res:
+        c=res.groups()
+        try:
+            archive_no=int(c[0])
+            of_total=int(c[1])
+        except:
+            pass
+        else:
+            return archive_no, of_total
+    return None, None
+
+
 #
 # The Backup class
 #
@@ -173,6 +210,10 @@
                                                      'Backup interval', loc,
                                                      config.defaults['backup_interval'])
 
+        self.prune_interval=config.check_nonneg_int(cfg, 'prune_interval',
+                                                     'Prune interval', loc,
+                                                     config.defaults['prune_interval'])
+
         self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                     'Retry interval', loc,
                                                     config.defaults['retry_interval'])
@@ -183,9 +224,9 @@
                                       default="dreamtime")
 
         if scheduling=="dreamtime":
-            self.timeclass=dreamtime.DreamTime
+            self.timeclass=DreamTime
         elif scheduling=="realtime":
-            self.timeclass=dreamtime.MonotonicTime
+            self.timeclass=MonotonicTime
         elif scheduling=="manual":
             self.backup_interval=0
         else:
@@ -206,14 +247,12 @@
         self.thread_log=None
         self.thread_res=None
 
+        self.previous_operation=None
         self.current_operation=None
         self.scheduled_operation=None
-        self.last_operation_finished=None
-        self.last_create_when=None
-        self.last_create_finished=None
+        self.previous_operation_of_type={}
         self.state=State.INACTIVE
-        self.errors=Errors.OK
-        self.timeclass=dreamtime.DreamTime
+        self.timeclass=DreamTime
 
         self.__decode_config(cfg)
 
@@ -241,7 +280,7 @@
         self.logger.debug('Log listener thread waiting for entries')
         success=True
         for msg in iter(self.borg_instance.read_log, None):
-            self.logger.info(str(msg))
+            self.logger.debug(str(msg))
             t=msg['type']
 
             errormsg=None
@@ -293,13 +332,20 @@
                          msg['msgid']=='LockErrorT')): # in docs
                         errors=Errors.BUSY
                     with self._cond:
-                        self.errors=self.errors.combine(errors)
+                        self.current_operation.add_error(errors)
+                        status, callback=self.__status_unlocked()
+                elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
+                    # Borg gives very little progress info in easy form, so try to extrat it
+                    archive_number, of_total=check_prune_status(msg['message'])
+                    if archive_number!=None and of_total!=None:
+                        self.current_operation.detail['progress_current_secondary']=archive_number
+                        self.current_operation.detail['progress_total_secondary']=of_total
                         status, callback=self.__status_unlocked()
 
             elif t=='question_prompt' or t=='question_prompt_retry':
                 self.logger.error('Did not expect to receive question prompt from borg')
                 with self._cond:
-                    self.errors=self.errors.combine(Errors.ERRORS)
+                    self.current_operation.add_error(Errors.ERRORS)
                 # TODO: terminate org? Send 'NO' reply?
 
             elif (t=='question_invalid_answer' or t=='question_accepted_default'
@@ -328,14 +374,17 @@
 
         if res is None:
             with self._cond:
-                if self.errors.ok():
+                # Prune gives absolutely no result, so don't complain
+                if (self.current_operation.ok() and
+                    self.current_operation.type!=Operation.PRUNE):
                     self.logger.error('No result from borg despite no error in log')
-                    self.errors=Errors.ERRORS
-        elif self.current_operation.operation==Operation.CREATE:
-            with self._cond:
-                self.last_create_finished=time.monotonic()
-                self.last_create_when=self.current_operation.time.monotonic()
-        elif self.current_operation.operation==Operation.LIST:
+                    self.current_operation.add_error(Errors.ERRORS)
+        elif self.current_operation.type==Operation.LIST:
+            self.__handle_list_result(res)
+
+        # All other results are discarded
+
+    def __handle_list_result(self, res):
             ok=True
             latest=None
             if 'archives' in res and isinstance(res['archives'], list):
@@ -354,27 +403,24 @@
 
             if not ok:
                 with self._cond:
-                    self.errors=self.errors.combine(Errors.ERRORS)
+                    self.current_operation.add_error(Errors.ERRORS)
 
             if latest:
                 self.logger.info('borg info: Previous backup was on %s' % latest.isoformat())
-                realtime=time.mktime(latest.timetuple())
-                monotonic=realtime+time.monotonic()-time.time()
+                when=MonotonicTime.from_realtime(time.mktime(latest.timetuple()))
+                op=Operation(Operation.CREATE, when, reason='listed')
+                op.finish_time=when
                 with self._cond:
-                    self.last_create_finished=monotonic
-                    self.last_create_when=monotonic
+                    self.previous_operation_of_type[Operation.CREATE]=op
             else:
                 self.logger.info('borg info: Could not discover a previous backup')
-                with self._cond:
-                    self.last_create_finished=-1
-                    self.last_create_when=None
 
     def __do_launch(self, op, archive_or_repository,
                     common_params, op_params, paths=[]):
 
         self.logger.debug('Creating BorgInstance')
 
-        inst=BorgInstance(op.operation, archive_or_repository,
+        inst=BorgInstance(op.type, archive_or_repository,
                           common_params, op_params, paths)
 
         self.logger.debug('Launching BorgInstance via repository')
@@ -396,10 +442,9 @@
         self.current_operation=op
         # Update scheduled time to real starting time to schedule
         # next run relative to this
-        self.current_operation.time=dreamtime.MonotonicTime.now()
+        self.current_operation.start_time=MonotonicTime.now()
         self.state=State.ACTIVE
         # Reset error status when starting a new operation
-        self.errors=Errors.OK
         self.__update_status()
 
         t_log.start()
@@ -407,28 +452,28 @@
 
 
     def __launch(self, op):
-        self.logger.debug("Launching '%s'" % str(op.operation))
+        self.logger.debug("Launching '%s'" % str(op.type))
 
         params=(config.borg_parameters
                 +self.repository.borg_parameters
                 +self.borg_parameters)
 
-        if op.operation==Operation.CREATE:
+        if op.type==Operation.CREATE:
             archive="%s::%s%s" % (self.repository.location,
                                   self.archive_prefix,
                                   self.archive_template)
 
             self.__do_launch(op, archive, params.common,
                              params.create, self.paths)
-        elif op.operation==Operation.PRUNE:
+        elif op.type==Operation.PRUNE:
             self.__do_launch(op, self.repository.location, params.common,
-                             [{'prefix': self.archive_prefix}] + params.create)
+                             [{'prefix': self.archive_prefix}] + params.prune)
 
-        elif op.operation==Operation.LIST:
+        elif op.type==Operation.LIST:
             self.__do_launch(op, self.repository.location, params.common,
                              [{'prefix': self.archive_prefix}])
         else:
-            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
+            raise NotImplementedError("Invalid operation '%s'" % str(op.type))
 
     # This must be called with self._cond held.
     def __launch_and_wait(self):
@@ -443,6 +488,8 @@
             self.__wait_finish()
 
     def __wait_finish(self):
+        current=self.current_operation
+
         # Wait for main logger thread to terminate, or for us to be terminated
         while not self.terminate and self.thread_res.is_alive():
             self._cond.release()
@@ -465,14 +512,16 @@
 
         if not self.borg_instance.wait():
             self.logger.error('Borg subprocess did not terminate')
-            self.errors=self.errors.combine(Errors.ERRORS)
+            curent.add_error(Errors.ERRORS)
+
+        current.finish_time=MonotonicTime.now()
 
-        now=time.monotonic()
-        self.last_operation_finished=now
+        self.previous_operation_of_type[current.type]=current
+        self.previous_operation=current
+        self.current_operation=None
         self.thread_res=None
         self.thread_log=None
         self.borg_instance=None
-        self.current_operation=None
         self.state=State.INACTIVE
         self.__update_status()
 
@@ -525,17 +574,17 @@
         if not self.scheduled_operation:
             op=self.__next_operation_unlocked()
         if op:
-            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
-                             (str(op.operation), op.detail or 'none',
-                              op.time.seconds_to(),
-                              op.time.__class__.__name__))
+            self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
+                             (str(op.type), op.detail or 'none',
+                              op.start_time.isoformat(),
+                              op.start_time.__class__.__name__))
 
             self.scheduled_operation=op
             self.state=State.SCHEDULED
             self.__update_status()
 
             # Wait under scheduled wait
-            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
+            self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
         else:
             # Nothing scheduled - just wait
             self.logger.info("Waiting for manual scheduling")
@@ -564,52 +613,93 @@
                 self.__update_status()
 
     def __next_operation_unlocked(self):
-        if self.backup_interval==0:
-            # Manual backup
+        listop=self.__next_operation_list()
+        if listop:
+            return listop
+
+        create=self.__next_operation_type(Operation.CREATE,
+                                          self.backup_interval,
+                                          important=True,
+                                          initial_reason='initial');
+
+        prune=self.__next_operation_type(Operation.PRUNE,
+                                         self.prune_interval,
+                                         important=False,
+                                         initial_reason=None);
+
+        if not prune:
+            return create
+        elif not create:
+            return prune
+        elif create.start_time < prune.start_time:
+            return create
+        else:
+            return prune
+
+    def __next_operation_list(self):
+        reason='initial'
+        # Unless manual backup has been chosen (backup_interval<=0), perform 
+        # repository listing if no previous create operation known, or if we
+        # just pruned the repository
+        if self.backup_interval<=0:
             return None
-        elif self.last_create_finished==None:
-            # Don't know when last create finished: try to get it from
-            # archive list.
-            if not self.errors.ok() and self.retry_interval==0:
+        elif (self.previous_operation and
+              self.previous_operation.type==Operation.PRUNE):
+            tm=MonotonicTime.now()
+            reason='post-prune'
+        elif Operation.LIST in self.previous_operation_of_type:
+            prev=self.previous_operation_of_type[Operation.LIST]
+            if prev.ok():
+                return None
+            if self.retry_interval==0:
                 # Do not retry in case of errors if retry interval is zero
                 return None
-            elif self.last_operation_finished:
-                # Attempt ater retry interval if some operation has been run
-                # already
-                tm=dreamtime.MonotonicTime(self.last_operation_finished+self.retry_interval)
-            else:
-                # Nothing has been attempted: run immediately
-                tm=dreamtime.MonotonicTime.now()
-            return Operation(Operation.LIST, tm, reason='initial')
-        elif self.errors.ok() and self.last_create_finished<0:
-            # No backup exists; perform initial backup
-            tm=dreamtime.MonotonicTime.now()
-            return Operation(Operation.CREATE, tm, reason='initial')
-        elif not self.errors.ok():
-            # Retry create in case of errors unless retry has been disabled
-            if self.retry_interval==0:
+            # Attempt after retry interval
+            tm=MonotonicTime.after_other(prev.finish_time, self.retry_interval)
+        else:
+            # Nothing has been attempted: run immediately
+            tm=MonotonicTime.now()
+        return Operation(Operation.LIST, tm, reason=reason)
+
+    def __next_operation_type(self, optype, standard_interval,
+                              important=False,
+                              initial_reason=None):
+        if optype not in self.previous_operation_of_type:
+            # No previous operation exists; perform immediately
+            # if important, otherwise after standard interval.
+            # Do not perform if manual operation selected by
+            # setting standard_interval<=0
+            if standard_interval<=0:
                 return None
             else:
-                if self.last_create_finished<0:
-                    basetime=time.monotonic()
+                if important:
+                    tm=MonotonicTime.now()
+                else:
+                    tm=self.timeclass.after(standard_interval)
+                if initial_reason:
+                    return Operation(optype, tm, reason=initial_reason)
                 else:
-                    basetime=self.last_create_finished
-                tm=dreamtime.MonotonicTime(basetime+self.retry_interval)
-                return Operation(Operation.CREATE, tm, reason='retry')
+                    return Operation(optype, tm)
         else:
-            # All ok - run create at standard backup interval
-            tm=self.timeclass.from_monotonic(self.last_create_when+self.backup_interval)
-            return Operation(Operation.CREATE, tm)
+            # Previous operation has been performed; perform after
+            # retry interval if there were errors, otherwise after
+            # standard interval.
+            prev=self.previous_operation_of_type[optype]
+            if not prev.ok():
+                tm=MonotonicTime.after_other(prev.start_time,
+                                             self.retry_interval)
+                return Operation(optype, tm, reason='retry')
+            elif standard_interval>0:
+                tm=self.timeclass.after_other(prev.start_time,
+                                              standard_interval)
+                return Operation(optype, tm)
+            else:
+                # Manual operation is standard_interval is zero.
+                return None
 
     def __status_unlocked(self):
         callback=self.__status_update_callback
-
-        if self.current_operation:
-            status=Status(self, self.current_operation)
-        elif self.scheduled_operation:
-            status=Status(self, self.scheduled_operation)
-        else:
-            status=Status(self)
+        status=Status(self)
 
         return status, callback
 
@@ -638,13 +728,13 @@
         return res[0]
 
     def create(self):
-        op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual')
+        op=Operation(Operation.CREATE, MonotonicTime.now(), reason='manual')
         with self._cond:
             self.scheduled_operation=op
             self._cond.notify()
 
     def prune(self):
-        op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual')
+        op=Operation(Operation.PRUNE, MonotonicTime.now(), reason='manual')
         with self._cond:
             self.scheduled_operation=op
             self._cond.notify()

mercurial