backup.py

changeset 61
bc6c3d74e6ea
parent 58
170d69da51bb
child 62
b7d13b2ad67e
--- a/backup.py	Wed Jan 24 09:19:42 2018 +0000
+++ b/backup.py	Wed Jan 24 20:18:45 2018 +0000
@@ -8,30 +8,41 @@
 import keyring
 import borgend
 import repository
+from enum import IntEnum
 from instance import BorgInstance
 from threading import Thread, Lock, Condition
 from scheduler import TerminableThread
 
 logger=borgend.logger.getChild(__name__)
 
-# State
-INACTIVE=0
-SCHEDULED=1
-QUEUED=2
-ACTIVE=3
-BUSY=4
-OFFLINE=5
-ERRORS=6
+class State(IntEnum):
+    # State
+    INACTIVE=0
+    SCHEDULED=1
+    QUEUED=2
+    ACTIVE=3
+
+
+class Errors(IntEnum):
+    OK=0
+    BUSY=1
+    OFFLINE=2
+    ERRORS=3
 
-def combine_state(state1, state2):
-    return max(state1, state2)
+    def combine(self, other):
+        return max(self, other)
+
+    def ok(self):
+        return self==self.OK
 
-loglevel_translation={
-    'CRITICAL': logging.CRITICAL,
-    'ERROR': logging.ERROR,
-    'WARNING': logging.WARNING,
-    'DEBUG': logging.DEBUG,
-    'INFO': logging.INFO
+    def __str__(self):
+        return _errorstring[self]
+
+_errorstring={
+    Errors.OK: 'ok',
+    Errors.BUSY: 'busy',
+    Errors.OFFLINE: 'offline',
+    Errors.ERRORS: 'errors'
 }
 
 def translate_loglevel(x):
@@ -47,6 +58,13 @@
             return tmp
     return None
 
+loglevel_translation={
+    'CRITICAL': logging.CRITICAL,
+    'ERROR': logging.ERROR,
+    'WARNING': logging.WARNING,
+    'DEBUG': logging.DEBUG,
+    'INFO': logging.INFO
+}
 
 class Backup(TerminableThread):
 
@@ -135,7 +153,8 @@
         self.current_operation=None
         self.scheduled_operation=None
         self.lastrun_when=None
-        self.state=INACTIVE
+        self.state=State.INACTIVE
+        self.errors=Errors.OK
 
         self.__decode_config(cfg)
 
@@ -162,16 +181,16 @@
     def __log_listener(self):
         self.logger.debug('Log listener thread waiting for entries')
         success=True
-        for status in iter(self.borg_instance.read_log, None):
-            self.logger.debug(str(status))
-            t=status['type']
+        for msg in iter(self.borg_instance.read_log, None):
+            self.logger.debug(str(msg))
+            t=msg['type']
 
-            errors_this_message=None
+            errormsg=None
             callback=None
 
             if t=='progress_percent':
-                current=safe_get_int(status, 'current')
-                total=safe_get_int(status, 'total')
+                current=safe_get_int(msg, 'current')
+                total=safe_get_int(msg, 'total')
                 if current is not None and total is not None:
                     with self._cond:
                         self.current_operation['progress_current']=current
@@ -179,9 +198,9 @@
                         status, callback=self.__status_unlocked()
 
             elif t=='archive_progress':
-                original_size=safe_get_int(status, 'original_size')
-                compressed_size=safe_get_int(status, 'compressed_size')
-                deduplicated_size=safe_get_int(status, 'deduplicated_size')
+                original_size=safe_get_int(msg, 'original_size')
+                compressed_size=safe_get_int(msg, 'compressed_size')
+                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                 if original_size is not None and original_size is not None and deduplicated_size is not None:
                     with self._cond:
                         self.current_operation['original_size']=original_size
@@ -196,30 +215,30 @@
                 pass
 
             elif t=='log_message':
-                if 'levelname' not in status:
-                    status['levelname']='ERROR'
-                if 'message' not in status:
-                    status['message']='UNKNOWN'
-                if 'name' not in status:
-                    status['name']='borg'
-                lvl=translate_loglevel(status['levelname'])
-                self.logger.log(lvl, status['name'] + ': ' + status['message'])
+                if 'levelname' not in msg:
+                    msg['levelname']='ERROR'
+                if 'message' not in msg:
+                    msg['message']='UNKNOWN'
+                if 'name' not in msg:
+                    msg['name']='borg'
+                lvl=translate_loglevel(msg['levelname'])
+                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                 if lvl>=logging.WARNING:
-                    errors_this_message=status
-                    state=ERRORS
-                    if ('msgid' in status and
-                        (status['msgid']=='LockTimeout' or # observed in reality
-                         status['msgid']=='LockErrorT' or # in docs
-                         status['msgid']=='LockErrorT')): # in docs
-                        state=BUSY
+                    errormsg=msg
+                    errors=Errors.ERRORS
+                    if ('msgid' in msg and
+                        (msg['msgid']=='LockTimeout' or # observed in reality
+                         msg['msgid']=='LockErrorT' or # in docs
+                         msg['msgid']=='LockErrorT')): # in docs
+                        errors=Errors.BUSY
                     with self._cond:
-                        self.state=combine_state(self.state, state)
+                        self.errors=self.errors.combine(errors)
                         status, callback=self.__status_unlocked()
             else:
                 self.logger.debug('Unrecognised log entry %s' % str(status))
 
             if callback:
-                callback(status, errors=errors_this_message)
+                callback(status, errors=errormsg)
 
         self.logger.debug('Waiting for borg subprocess to terminate in log thread')
 
@@ -228,12 +247,6 @@
         self.logger.debug('Borg subprocess terminated; terminating log listener thread')
 
     def __result_listener(self):
-        # self.state=ACTIVE
-        # with self._cond:
-        #     status, callback=self.__status_unlocked()
-        # if callback:
-        #     callback(status)
-
         self.logger.debug('Result listener thread waiting for result')
 
         res=self.borg_instance.read_result()
@@ -242,25 +255,23 @@
         self.thread_log.join()
 
         with self._cond:
-            state=self.state
-
-        # If there were no errors, reset back to INACTIVE state
-        if state==ACTIVE:
-            state=INACTIVE
+            errors=self.errors
 
         self.logger.debug('Borg result: %s' % str(res))
 
-        if res is None and state==INACTIVE:
+        if res is None:
             self.logger.error('No result from borg despite no error in log')
-            state=ERRORS
+            if errors.ok():
+                errors=Errors.ERRORS
 
         self.logger.debug('Waiting for borg subprocess to terminate in result thread')
 
         if not self.borg_instance.wait():
             self.logger.error('Borg subprocess did not terminate')
-            state=combine_state(state, ERRORS)
+            if errors.ok():
+                errors=Errors.ERRORS
 
-        self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
+        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
 
         with self._cond:
             if self.current_operation['operation']=='create':
@@ -269,7 +280,8 @@
             self.thread_log=None
             self.borg_instance=None
             self.current_operation=None
-            self.state=state
+            self.state=State.INACTIVE
+            self.errors=errors
             self.__update_status()
             self._cond.notify()
 
@@ -313,6 +325,7 @@
         else:
             raise NotImplementedError("Invalid operation '%s'" % op['operation'])
 
+    # This must be called with self._cond held.
     def __launch_check(self):
         op=self.scheduled_operation
         if not op:
@@ -324,7 +337,9 @@
 
             self.current_operation=op
             self.current_operation['when_monotonic']=time.monotonic()
-            self.state=ACTIVE
+            self.state=State.ACTIVE
+            # Reset error status when starting a new operation
+            self.errors=Errors.OK
             self.__update_status()
 
 
@@ -339,9 +354,10 @@
                             self.__main_thread_queue_and_launch()
             except Exception as err:
                 self.logger.exception("Error with backup '%s'" % self._name)
-                self.lastrun_when=time.monotonic()
-                self.state=ERRORS
-                self.scheduled_operation=None
+                self.errors=Errors.ERRORS
+
+            self.state=State.INACTIVE
+            self.scheduled_operation=None
 
             # Clean up to terminate: kill borg instance and communication threads
             if self.borg_instance:
@@ -351,6 +367,8 @@
             # Store threads to use outside lock
             thread_log=self.thread_log
             thread_res=self.thread_res
+            self.thread_log=None
+            self.thread_res=None
 
         self.logger.debug("Waiting for log and result threads to terminate")
 
@@ -380,14 +398,18 @@
                              (op['operation'], op['detail'],  delay))
 
             self.scheduled_operation=op
-            self.state=combine_state(self.state, SCHEDULED)
+            self.state=State.SCHEDULED
             self.__update_status()
 
             # Wait under scheduled wait
             self.scheduler.wait_until(now+delay, self._cond, self._name)
         else:
             # Nothing scheduled - just wait
-            self.logger.debug("Waiting for manual scheduling")
+            self.logger.info("Waiting for manual scheduling")
+
+            self.state=State.INACTIVE
+            self.__update_status()
+
             self._cond.wait()
 
     # Main thread/3. If there is a scheduled operation (it might have been
@@ -397,7 +419,7 @@
     def __main_thread_queue_and_launch(self):
         if self.scheduled_operation:
             self.logger.debug("Queuing")
-            self.state=combine_state(self.state, QUEUED)
+            self.state=State.QUEUED
             self.__update_status()
             self.repository.queue_action(self._cond,
                                          action=self.__launch_check,
@@ -416,7 +438,7 @@
                 return {'operation': 'create',
                         'detail': 'initial',
                         'when_monotonic': now+initial_interval}
-        elif self.state>=BUSY:
+        elif not self.errors.ok():
             if self.retry_interval==0:
                 return None
             else:
@@ -436,20 +458,14 @@
 
         if self.current_operation:
             status=self.current_operation
-            status['type']='current'
-            # Errors should be set by listeners
+        elif self.scheduled_operation:
+            status=self.scheduled_operation
         else:
-            if self.scheduled_operation:
-                status=self.scheduled_operation
-                if self.state==QUEUED:
-                    status['type']='queued'
-                else:
-                    status['type']='scheduled'
-            else:
-                status={'type': 'nothing'}
+            status={'type': 'nothing'}
 
         status['name']=self._name
         status['state']=self.state
+        status['errors']=self.errors
 
         if 'detail' not in status:
             status['detail']='NONE'

mercurial