changeset 55:407af23d16bb
author:      Tuomo Valkonen <tuomov@iki.fi>
date:        Tue, 23 Jan 2018 22:53:51 +0000
parent 54:   cfcaa5f6ba33
child 57:    121aa89fe670
files:       backup.py scheduler.py ui.py

Improved backup main thread loop, etc.
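
The first hunk of backup.py inserts a new QUEUED state between SCHEDULED and ACTIVE, renumbering the constants that follow. Since combine_state() (shown unchanged in the context lines) just takes the numeric maximum, this ordering decides which state wins when two states are merged. A small runnable illustration, reusing the definitions from the diff:

    # State constants and combine_state() as in backup.py after this change.
    INACTIVE, SCHEDULED, QUEUED, ACTIVE, BUSY, OFFLINE, ERRORS = range(7)

    def combine_state(state1, state2):
        return max(state1, state2)

    print(combine_state(SCHEDULED, QUEUED))  # 2, i.e. QUEUED wins over SCHEDULED
    print(combine_state(ACTIVE, ERRORS))     # 6, i.e. ERRORS dominates everything

ui.py refers to the constants only by name (backup.QUEUED and so on), so it picks up the renumbering automatically; it merely gains the two new table entries shown in its hunks below.
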
--- a/backup.py	Mon Jan 22 22:23:01 2018 +0000
+++ b/backup.py	Tue Jan 23 22:53:51 2018 +0000
@@ -17,10 +17,11 @@
 # State
 INACTIVE=0
 SCHEDULED=1
-ACTIVE=2
-BUSY=3
-OFFLINE=4
-ERRORS=5
+QUEUED=2
+ACTIVE=3
+BUSY=4
+OFFLINE=5
+ERRORS=6
 
 def combine_state(state1, state2):
     return max(state1, state2)
@@ -54,6 +55,8 @@
 
         self._name=config.check_string(cfg, 'name', 'Name', loc0)
 
+        self.logger=logger.getChild(self._name)
+
         self.loc='backup target "%s"' % self._name
 
         reponame=config.check_string(cfg, 'repository',
@@ -105,14 +108,14 @@
         acc=self.__keychain_account
         if not self.__passphrase:
             if acc and acc!='':
-                logger.debug('Requesting passphrase')
+                self.logger.debug('Requesting passphrase')
                 try:
                     pw=keyring.get_password("borg-backup", acc)
                 except Exception as err:
-                    logger.error('Failed to retrieve passphrase')
+                    self.logger.error('Failed to retrieve passphrase')
                     raise err
                 else:
-                    logger.debug('Received passphrase')
+                    self.logger.debug('Received passphrase')
                 self.__passphrase=pw
             else:
                 self.__passphrase=None
@@ -121,15 +124,18 @@
     def __init__(self, identifier, cfg, scheduler):
         self.identifier=identifier
         self.config=config
-        self.lastrun_when=None
+        self.__status_update_callback=None
+        self.scheduler=scheduler
+        self.logger=None # setup up __decode_config once backup name is known
+
         self.borg_instance=None
-        self.current_operation=None
         self.thread_log=None
         self.thread_res=None
+
+        self.current_operation=None
         self.scheduled_operation=None
-        self.__status_update_callback=None
+        self.lastrun_when=None
         self.state=INACTIVE
-        self.scheduler=scheduler
 
         self.__decode_config(cfg)
 
@@ -154,10 +160,10 @@
         assert(not running)
 
     def __log_listener(self):
-        logger.debug('Log listener thread waiting for entries')
+        self.logger.debug('Log listener thread waiting for entries')
         success=True
         for status in iter(self.borg_instance.read_log, None):
-            logger.debug(str(status))
+            self.logger.debug(str(status))
             t=status['type']
 
             errors_this_message=None
@@ -197,7 +203,7 @@
                 if 'name' not in status:
                     status['name']='borg'
                 lvl=translate_loglevel(status['levelname'])
-                logger.log(lvl, status['name'] + ': ' + status['message'])
+                self.logger.log(lvl, status['name'] + ': ' + status['message'])
                 if lvl>=logging.WARNING:
                     errors_this_message=status
                     state=ERRORS
@@ -210,24 +216,25 @@
                         self.state=combine_state(self.state, state)
                         status, callback=self.__status_unlocked()
             else:
-                logger.debug('Unrecognised log entry %s' % str(status))
+                self.logger.debug('Unrecognised log entry %s' % str(status))
 
             if callback:
-                callback(self, status, errors=errors_this_message)
+                callback(status, errors=errors_this_message)
 
-        logger.debug('Waiting for borg subprocess to terminate in log thread')
+        self.logger.debug('Waiting for borg subprocess to terminate in log thread')
 
         self.borg_instance.wait()
 
-        logger.debug('Borg subprocess terminated; terminating log listener thread')
+        self.logger.debug('Borg subprocess terminated; terminating log listener thread')
 
     def __result_listener(self):
-        with self._cond:
-            status, callback=self.__status_unlocked()
-        if callback:
-            callback(self, status)
+        # self.state=ACTIVE
+        # with self._cond:
+        #     status, callback=self.__status_unlocked()
+        # if callback:
+        #     callback(status)
 
-        logger.debug('Result listener thread waiting for result')
+        self.logger.debug('Result listener thread waiting for result')
 
         res=self.borg_instance.read_result()
 
@@ -241,19 +248,19 @@
         if state==ACTIVE:
             state=INACTIVE
 
-        logger.debug('Borg result: %s' % str(res))
+        self.logger.debug('Borg result: %s' % str(res))
 
         if res is None and state==INACTIVE:
-            logger.error('No result from borg despite no error in log')
+            self.logger.error('No result from borg despite no error in log')
             state=ERRORS
 
-        logger.debug('Waiting for borg subprocess to terminate in result thread')
+        self.logger.debug('Waiting for borg subprocess to terminate in result thread')
 
         if not self.borg_instance.wait():
-            logger.critical('Borg subprocess did not terminate')
+            self.logger.error('Borg subprocess did not terminate')
             state=combine_state(state, ERRORS)
 
-        logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
+        self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
 
         with self._cond:
             if self.current_operation['operation']=='create':
@@ -263,6 +270,7 @@
             self.borg_instance=None
             self.current_operation=None
             self.state=state
+            self.__update_status()
             self._cond.notify()
 
     def __do_launch(self, op, archive_or_repository, *args):
@@ -271,7 +279,7 @@
         inst=BorgInstance(op['operation'], archive_or_repository, *args)
         inst.launch(passphrase=passphrase)
 
-        logger.debug('Creating listener threads')
+        self.logger.debug('Creating listener threads')
 
         t_log=Thread(target=self.__log_listener)
         t_log.daemon=True
@@ -282,62 +290,118 @@
         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
-        self.current_operation=op
-        self.current_operation['when_monotonic']=time.monotonic()
-        self.state=ACTIVE
 
         t_log.start()
         t_res.start()
 
     def __launch(self, op):
-        if self.__is_running_unlocked():
-            logging.info('Cannot start %s: already running %s'
-                         % (operation, self.current_operation))
-            return False
+        self.logger.debug("Launching '%s'" % op['operation'])
+
+        if op['operation']=='create':
+            archive="%s::%s%s" % (self.repository.repository_name,
+                                  self.archive_prefix,
+                                  self.archive_template)
+
+            self.__do_launch(op, archive,
+                             self.common_parameters+self.create_parameters,
+                             self.paths)
+        elif op['operation']=='prune':
+            self.__do_launch(op, self.repository.repository_name,
+                             ([{'prefix': self.archive_prefix}] + 
+                              self.common_parameters +
+                              self.prune_parameters))
         else:
-            try:
-                logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))
-
-                if op['operation']=='create':
-                    archive="%s::%s%s" % (self.repository.repository_name,
-                                          self.archive_prefix,
-                                          self.archive_template)
+            raise NotImplementedError("Invalid operation '%s'" % op['operation'])
 
-                    self.__do_launch(op, archive,
-                                     self.common_parameters+self.create_parameters,
-                                     self.paths)
-                elif op['operation']=='prune':
-                    self.__do_launch(op, self.repository.repository_name,
-                                     ([{'prefix': self.archive_prefix}] + 
-                                      self.common_parameters +
-                                      self.prune_parameters))
-                else:
-                    raise NotImplementedError("Invalid operation '%s'" % op['operation'])
+    def __launch_check(self):
+        op=self.scheduled_operation
+        if not op:
+            self.logger.debug("Queued operation aborted")
+        else:
+            self.scheduled_operation=None
+            self.current_operation=op
+            self.current_operation['when_monotonic']=time.monotonic()
+
+            self.__launch(op)
+
+            self.state=ACTIVE
+            self.__update_status()
+
+
+    def __main_thread(self):
+        with self._cond:
+            try:
+                while not self._terminate:
+                    self.__main_thread_wait_finish()
+                    if not self._terminate:
+                        self.__main_thread_wait_schedule()
+                        if not self._terminate:
+                            self.__main_thread_queue_and_launch()
             except Exception as err:
-                logger.debug('Rescheduling after failure')
+                self.logger.exception("Error with backup '%s'" % self._name)
                 self.lastrun_when=time.monotonic()
                 self.state=ERRORS
-                raise err
+                self.scheduled_operation=None
 
-            return True
+            # Clean up to terminate: kill borg instance and communication threads
+            if self.borg_instance:
+                self.logger.debug("Terminating a borg instance")
+                self.borg_instance.terminate()
+
+            # Store threads to use outside lock
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        self.logger.debug("Waiting for log and result threads to terminate")
 
-    def create(self):
-        op={'operation': 'create', 'detail': 'manual'}
-        with self._cond:
-            self.scheduled_operation=op
-            self._cond.notify()
+        if thread_log:
+            thread_log.join()
+
+        if thread_res:
+            thread_res.join()
+
+
+    # Main thread/1. Wait while a current operation is running
+    def __main_thread_wait_finish(self):
+        while self.current_operation and not self._terminate:
+            self.logger.debug("Waiting for current operation to finish")
+            self._cond.wait()
 
-    def prune(self):
-        op={'operation': 'prune', 'detail': 'manual'}
-        with self._cond:
+    # Main thread/2. Schedule next operation if there is no manually
+    # requested one
+    def __main_thread_wait_schedule(self):
+        op=None
+        if not self.scheduled_operation:
+            op=self.__next_operation_unlocked()
+        if op:
+            now=time.monotonic()
+            delay=max(0, op['when_monotonic']-now)
+            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
+                             (op['operation'], op['detail'],  delay))
+
             self.scheduled_operation=op
-            self._cond.notify()
+            self.state=combine_state(self.state, SCHEDULED)
+            self.__update_status()
 
-    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
-    def abort(self):
-        with self._cond:
-            if self.borg_instance:
-                self.borg_instance.terminate()
+            # Wait under scheduled wait
+            self.scheduler.wait_until(now+delay, self._cond, self._name)
+        else:
+            # Nothing scheduled - just wait
+            self.logger.debug("Waiting for manual scheduling")
+            self._cond.wait()
+
+    # Main thread/3. If there is a scheduled operation (it might have been
+    # changed manually from 'op' created in __main_thread_wait_schedule above),
+    # queue it on the repository, and launch the operation once repository
+    # available
+    def __main_thread_queue_and_launch(self):
+        if self.scheduled_operation:
+            self.logger.debug("Queuing")
+            self.state=combine_state(self.state, QUEUED)
+            self.__update_status()
+            self.repository.queue_action(self._cond,
+                                         action=self.__launch_check,
+                                         name=self._name)
 
     def __next_operation_unlocked(self):
         # TODO: pruning as well
@@ -367,50 +431,6 @@
                         'detail': 'normal',
                         'when_monotonic': self.lastrun_when+self.backup_interval}
 
-    def __main_thread(self):
-        with self._cond:
-            while not self._terminate:
-                op=None
-                if not self.current_operation:
-                    op=self.__next_operation_unlocked()
-                if not op:
-                    self.__update_status()
-                    self._cond.wait()
-                else:
-                    now=time.monotonic()
-                    delay=max(0, op['when_monotonic']-now)
-                    logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
-                                (op['operation'], op['detail'], self._name, delay))
-
-                    self.scheduled_operation=op
-                    self.state=combine_state(self.state, SCHEDULED)
-
-                    self.__update_status()
-                    self.scheduler.wait_until(now+delay, self._cond, self._name)
-
-                if self.scheduled_operation:
-                    op=self.scheduled_operation
-                    self.scheduled_operation=None
-                    self.repository.queue_action(self._cond, name=self._name,
-                                                 action=lambda: self.__launch(op))
-            # Kill a running borg to cause log and result threads to terminate
-            if self.borg_instance:
-                logger.debug("Terminating a borg instance")
-                self.borg_instance.terminate()
-
-            # Store threads to use outside lock
-            thread_log=self.thread_log
-            thread_err=self.thread_err
-
-        logger.debug("Waiting for log and result threads to terminate")
-
-        if thread_log:
-            thread_log.join()
-
-        if thread_res:
-            thread_res.join()
-
-
     def __status_unlocked(self):
         callback=self.__status_update_callback
 
@@ -421,7 +441,10 @@
         else:
             if self.scheduled_operation:
                 status=self.scheduled_operation
-                status['type']='scheduled'
+                if self.state==QUEUED:
+                    status['type']='queued'
+                else:
+                    status['type']='scheduled'
             else:
                 status={'type': 'nothing'}
 
@@ -440,11 +463,17 @@
     def __update_status(self):
         status, callback = self.__status_unlocked()
         if callback:
-            self._cond.release()
+            #self._cond.release()
             try:
-                callback(self, status)
-            finally:
-                self._cond.acquire()
+                callback(status)
+            except Exception:
+                self.logger.exception("Status update error")
+            #finally:
+            #    self._cond.acquire()
+
+    #
+    # Interface functions
+    #
 
     def set_status_update_callback(self, callback):
         with self._cond:
@@ -455,3 +484,21 @@
             res=self.__status_unlocked()
         return res[0]
 
+    def create(self):
+        op={'operation': 'create', 'detail': 'manual'}
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
+
+    def prune(self):
+        op={'operation': 'prune', 'detail': 'manual'}
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()
+
+    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
+    def abort(self):
+        with self._cond:
+            if self.borg_instance:
+                self.borg_instance.terminate()
+
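
The core of the backup.py change above is the new three-phase main thread: __main_thread_wait_finish() waits while an operation is running, __main_thread_wait_schedule() waits for a manual or timed operation, and __main_thread_queue_and_launch() queues the operation on the repository and launches it via __launch_check() once the repository is free. The sketch below is not borgend code; it is a hypothetical, self-contained illustration of the Condition wait/notify pattern that the loop and create() share (manual scheduling sets scheduled_operation and notifies, the loop wakes up and launches it):

    # Hypothetical stand-alone sketch of the wait/notify pattern; names loosely
    # follow backup.py, but there is no scheduler, repository queue, or error
    # handling here.
    import threading
    import time

    class Worker:
        def __init__(self):
            self._cond = threading.Condition()
            self._terminate = False
            self.scheduled_operation = None

        def main_thread(self):
            with self._cond:
                while not self._terminate:
                    # Like __main_thread_wait_schedule(): sleep until something
                    # is scheduled manually or we are told to terminate.
                    while not self.scheduled_operation and not self._terminate:
                        self._cond.wait()
                    if self.scheduled_operation:
                        op, self.scheduled_operation = self.scheduled_operation, None
                        print("launching", op)  # stands in for __launch_check()

        def create(self):
            # Like Backup.create() in the diff: record the request, wake the loop.
            with self._cond:
                self.scheduled_operation = {'operation': 'create', 'detail': 'manual'}
                self._cond.notify()

        def terminate(self):
            with self._cond:
                self._terminate = True
                self._cond.notify()

    w = Worker()
    t = threading.Thread(target=w.main_thread)
    t.start()
    w.create()
    time.sleep(0.1)
    w.terminate()
    t.join()
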
--- a/scheduler.py	Mon Jan 22 22:23:01 2018 +0000
+++ b/scheduler.py	Tue Jan 23 22:53:51 2018 +0000
@@ -113,7 +113,7 @@
 
                 while self._list and self._list.when <= now:
                     ev=self._list
-                    logger.info("%s: Scheduling event %s" % self.name(), str(ev.name))
+                    logger.debug("Scheduling event %s" % (ev.name or "(unknown)"))
                     # We are only allowed to remove ev from list when ev.cond allows
                     with ev.cond:
                         self._list=ev.next
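
Besides demoting the message to debug level, the scheduler.py hunk fixes a formatting bug: in the removed line the % operator binds only to self.name(), so the two-placeholder format string is applied to a single argument and raises before logger.info() is even called. A small, hypothetical reproduction of that mistake:

    # Hypothetical values; reproduces the operator-precedence mistake from the
    # removed line, where % received one argument instead of a tuple.
    name = "scheduler"
    event_name = "mybackup"
    try:
        msg = "%s: Scheduling event %s" % name   # only `name` is given to %
    except TypeError as err:
        print(err)   # not enough arguments for format string
    # Correct forms: "%s: Scheduling event %s" % (name, event_name), or passing
    # the values to the logger as separate arguments.
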
--- a/ui.py	Mon Jan 22 22:23:01 2018 +0000
+++ b/ui.py	Tue Jan 23 22:53:51 2018 +0000
@@ -17,6 +17,7 @@
 traynames={
     backup.INACTIVE: 'B.',
     backup.SCHEDULED: 'B.',
+    backup.QUEUED: 'B:',
     backup.ACTIVE: 'B!',
     backup.BUSY: 'B⦙',
     backup.OFFLINE: 'B⦙',
@@ -26,6 +27,7 @@
 statestring={
     backup.INACTIVE: 'inactive',
     backup.SCHEDULED: 'scheduled',
+    backup.QUEUED: 'queued',
     backup.ACTIVE: 'active',
     backup.BUSY: 'busy',
     backup.OFFLINE: 'offline',
@@ -95,6 +97,8 @@
             if status['detail']!='normal':
                 detail=detail+status['detail']+' '
         title="%s (%s%s %s)" % (status['name'], detail, status['operation'], whenstr)
+    elif status['type']=='queued':
+        title="%s (queued)" % status['name']
     elif status['type']=='current':
         # Operation running
         progress=''
@@ -130,8 +134,8 @@
                 b=backups[index]
                 # Python closures suck dog's balls; hence the _index=index hack
                 # See also http://math.andrej.com/2009/04/09/pythons-lambda-is-broken/
-                cb=(lambda obj, status, _index=index, errors=None:
-                    self.__status_callback(obj, _index, status, errors))
+                cb=(lambda status, errors=None, _index=index:
+                    self.__status_callback(_index, status, errorlog=errors))
                 b.set_status_update_callback(cb)
                 self.statuses[index]=b.status()
 
@@ -177,7 +181,7 @@
             self.menu.update(menu)
             self.title=traynames[active]
 
-    def __status_callback(self, obj, index, status, errorlog):
+    def __status_callback(self, index, status, errorlog=None):
         logger.debug('Status callback: %s' % str(status))
 
         with self.lock:
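
The lambda rewrite above keeps the `_index=index` default-argument trick that the nearby comment (and the linked article) complains about: Python closures capture variables, not values, so without it every callback would see the last value of `index`. A minimal, self-contained demonstration of the difference:

    # Late binding: all three lambdas share the loop variable and see its final value.
    late = [lambda: index for index in range(3)]
    print([f() for f in late])        # [2, 2, 2]

    # Default-argument binding (the `_index=index` trick used in ui.py):
    # the value is captured when the lambda is defined.
    bound = [lambda _index=index: _index for index in range(3)]
    print([f() for f in bound])       # [0, 1, 2]
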
