# HG changeset patch # User Tuomo Valkonen # Date 1516748031 0 # Node ID 407af23d16bb8d864ad381944dcdf511ee1793ed # Parent cfcaa5f6ba3336966b1bbdae891e1b8c72ec4af8 Improved backup main thread loop, etc. diff -r cfcaa5f6ba33 -r 407af23d16bb backup.py --- a/backup.py Mon Jan 22 22:23:01 2018 +0000 +++ b/backup.py Tue Jan 23 22:53:51 2018 +0000 @@ -17,10 +17,11 @@ # State INACTIVE=0 SCHEDULED=1 -ACTIVE=2 -BUSY=3 -OFFLINE=4 -ERRORS=5 +QUEUED=2 +ACTIVE=3 +BUSY=4 +OFFLINE=5 +ERRORS=6 def combine_state(state1, state2): return max(state1, state2) @@ -54,6 +55,8 @@ self._name=config.check_string(cfg, 'name', 'Name', loc0) + self.logger=logger.getChild(self._name) + self.loc='backup target "%s"' % self._name reponame=config.check_string(cfg, 'repository', @@ -105,14 +108,14 @@ acc=self.__keychain_account if not self.__passphrase: if acc and acc!='': - logger.debug('Requesting passphrase') + self.logger.debug('Requesting passphrase') try: pw=keyring.get_password("borg-backup", acc) except Exception as err: - logger.error('Failed to retrieve passphrase') + self.logger.error('Failed to retrieve passphrase') raise err else: - logger.debug('Received passphrase') + self.logger.debug('Received passphrase') self.__passphrase=pw else: self.__passphrase=None @@ -121,15 +124,18 @@ def __init__(self, identifier, cfg, scheduler): self.identifier=identifier self.config=config - self.lastrun_when=None + self.__status_update_callback=None + self.scheduler=scheduler + self.logger=None # setup up __decode_config once backup name is known + self.borg_instance=None - self.current_operation=None self.thread_log=None self.thread_res=None + + self.current_operation=None self.scheduled_operation=None - self.__status_update_callback=None + self.lastrun_when=None self.state=INACTIVE - self.scheduler=scheduler self.__decode_config(cfg) @@ -154,10 +160,10 @@ assert(not running) def __log_listener(self): - logger.debug('Log listener thread waiting for entries') + self.logger.debug('Log listener thread waiting for entries') success=True for status in iter(self.borg_instance.read_log, None): - logger.debug(str(status)) + self.logger.debug(str(status)) t=status['type'] errors_this_message=None @@ -197,7 +203,7 @@ if 'name' not in status: status['name']='borg' lvl=translate_loglevel(status['levelname']) - logger.log(lvl, status['name'] + ': ' + status['message']) + self.logger.log(lvl, status['name'] + ': ' + status['message']) if lvl>=logging.WARNING: errors_this_message=status state=ERRORS @@ -210,24 +216,25 @@ self.state=combine_state(self.state, state) status, callback=self.__status_unlocked() else: - logger.debug('Unrecognised log entry %s' % str(status)) + self.logger.debug('Unrecognised log entry %s' % str(status)) if callback: - callback(self, status, errors=errors_this_message) + callback(status, errors=errors_this_message) - logger.debug('Waiting for borg subprocess to terminate in log thread') + self.logger.debug('Waiting for borg subprocess to terminate in log thread') self.borg_instance.wait() - logger.debug('Borg subprocess terminated; terminating log listener thread') + self.logger.debug('Borg subprocess terminated; terminating log listener thread') def __result_listener(self): - with self._cond: - status, callback=self.__status_unlocked() - if callback: - callback(self, status) + # self.state=ACTIVE + # with self._cond: + # status, callback=self.__status_unlocked() + # if callback: + # callback(status) - logger.debug('Result listener thread waiting for result') + self.logger.debug('Result listener thread waiting for result') res=self.borg_instance.read_result() @@ -241,19 +248,19 @@ if state==ACTIVE: state=INACTIVE - logger.debug('Borg result: %s' % str(res)) + self.logger.debug('Borg result: %s' % str(res)) if res is None and state==INACTIVE: - logger.error('No result from borg despite no error in log') + self.logger.error('No result from borg despite no error in log') state=ERRORS - logger.debug('Waiting for borg subprocess to terminate in result thread') + self.logger.debug('Waiting for borg subprocess to terminate in result thread') if not self.borg_instance.wait(): - logger.critical('Borg subprocess did not terminate') + self.logger.error('Borg subprocess did not terminate') state=combine_state(state, ERRORS) - logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) + self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) with self._cond: if self.current_operation['operation']=='create': @@ -263,6 +270,7 @@ self.borg_instance=None self.current_operation=None self.state=state + self.__update_status() self._cond.notify() def __do_launch(self, op, archive_or_repository, *args): @@ -271,7 +279,7 @@ inst=BorgInstance(op['operation'], archive_or_repository, *args) inst.launch(passphrase=passphrase) - logger.debug('Creating listener threads') + self.logger.debug('Creating listener threads') t_log=Thread(target=self.__log_listener) t_log.daemon=True @@ -282,62 +290,118 @@ self.thread_log=t_log self.thread_res=t_res self.borg_instance=inst - self.current_operation=op - self.current_operation['when_monotonic']=time.monotonic() - self.state=ACTIVE t_log.start() t_res.start() def __launch(self, op): - if self.__is_running_unlocked(): - logging.info('Cannot start %s: already running %s' - % (operation, self.current_operation)) - return False + self.logger.debug("Launching '%s'" % op['operation']) + + if op['operation']=='create': + archive="%s::%s%s" % (self.repository.repository_name, + self.archive_prefix, + self.archive_template) + + self.__do_launch(op, archive, + self.common_parameters+self.create_parameters, + self.paths) + elif op['operation']=='prune': + self.__do_launch(op, self.repository.repository_name, + ([{'prefix': self.archive_prefix}] + + self.common_parameters + + self.prune_parameters)) else: - try: - logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) - - if op['operation']=='create': - archive="%s::%s%s" % (self.repository.repository_name, - self.archive_prefix, - self.archive_template) + raise NotImplementedError("Invalid operation '%s'" % op['operation']) - self.__do_launch(op, archive, - self.common_parameters+self.create_parameters, - self.paths) - elif op['operation']=='prune': - self.__do_launch(op, self.repository.repository_name, - ([{'prefix': self.archive_prefix}] + - self.common_parameters + - self.prune_parameters)) - else: - raise NotImplementedError("Invalid operation '%s'" % op['operation']) + def __launch_check(self): + op=self.scheduled_operation + if not op: + self.logger.debug("Queued operation aborted") + else: + self.scheduled_operation=None + self.current_operation=op + self.current_operation['when_monotonic']=time.monotonic() + + self.__launch(op) + + self.state=ACTIVE + self.__update_status() + + + def __main_thread(self): + with self._cond: + try: + while not self._terminate: + self.__main_thread_wait_finish() + if not self._terminate: + self.__main_thread_wait_schedule() + if not self._terminate: + self.__main_thread_queue_and_launch() except Exception as err: - logger.debug('Rescheduling after failure') + self.logger.exception("Error with backup '%s'" % self._name) self.lastrun_when=time.monotonic() self.state=ERRORS - raise err + self.scheduled_operation=None - return True + # Clean up to terminate: kill borg instance and communication threads + if self.borg_instance: + self.logger.debug("Terminating a borg instance") + self.borg_instance.terminate() + + # Store threads to use outside lock + thread_log=self.thread_log + thread_res=self.thread_res + + self.logger.debug("Waiting for log and result threads to terminate") - def create(self): - op={'operation': 'create', 'detail': 'manual'} - with self._cond: - self.scheduled_operation=op - self._cond.notify() + if thread_log: + thread_log.join() + + if thread_res: + thread_res.join() + + + # Main thread/1. Wait while a current operation is running + def __main_thread_wait_finish(self): + while self.current_operation and not self._terminate: + self.logger.debug("Waiting for current operation to finish") + self._cond.wait() - def prune(self): - op={'operation': 'prune', 'detail': 'manual'} - with self._cond: + # Main thread/2. Schedule next operation if there is no manually + # requested one + def __main_thread_wait_schedule(self): + op=None + if not self.scheduled_operation: + op=self.__next_operation_unlocked() + if op: + now=time.monotonic() + delay=max(0, op['when_monotonic']-now) + self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % + (op['operation'], op['detail'], delay)) + self.scheduled_operation=op - self._cond.notify() + self.state=combine_state(self.state, SCHEDULED) + self.__update_status() - # TODO: Decide exact (manual) abort mechanism. Perhaps two stages - def abort(self): - with self._cond: - if self.borg_instance: - self.borg_instance.terminate() + # Wait under scheduled wait + self.scheduler.wait_until(now+delay, self._cond, self._name) + else: + # Nothing scheduled - just wait + self.logger.debug("Waiting for manual scheduling") + self._cond.wait() + + # Main thread/3. If there is a scheduled operation (it might have been + # changed manually from 'op' created in __main_thread_wait_schedule above), + # queue it on the repository, and launch the operation once repository + # available + def __main_thread_queue_and_launch(self): + if self.scheduled_operation: + self.logger.debug("Queuing") + self.state=combine_state(self.state, QUEUED) + self.__update_status() + self.repository.queue_action(self._cond, + action=self.__launch_check, + name=self._name) def __next_operation_unlocked(self): # TODO: pruning as well @@ -367,50 +431,6 @@ 'detail': 'normal', 'when_monotonic': self.lastrun_when+self.backup_interval} - def __main_thread(self): - with self._cond: - while not self._terminate: - op=None - if not self.current_operation: - op=self.__next_operation_unlocked() - if not op: - self.__update_status() - self._cond.wait() - else: - now=time.monotonic() - delay=max(0, op['when_monotonic']-now) - logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % - (op['operation'], op['detail'], self._name, delay)) - - self.scheduled_operation=op - self.state=combine_state(self.state, SCHEDULED) - - self.__update_status() - self.scheduler.wait_until(now+delay, self._cond, self._name) - - if self.scheduled_operation: - op=self.scheduled_operation - self.scheduled_operation=None - self.repository.queue_action(self._cond, name=self._name, - action=lambda: self.__launch(op)) - # Kill a running borg to cause log and result threads to terminate - if self.borg_instance: - logger.debug("Terminating a borg instance") - self.borg_instance.terminate() - - # Store threads to use outside lock - thread_log=self.thread_log - thread_err=self.thread_err - - logger.debug("Waiting for log and result threads to terminate") - - if thread_log: - thread_log.join() - - if thread_res: - thread_res.join() - - def __status_unlocked(self): callback=self.__status_update_callback @@ -421,7 +441,10 @@ else: if self.scheduled_operation: status=self.scheduled_operation - status['type']='scheduled' + if self.state==QUEUED: + status['type']='queued' + else: + status['type']='scheduled' else: status={'type': 'nothing'} @@ -440,11 +463,17 @@ def __update_status(self): status, callback = self.__status_unlocked() if callback: - self._cond.release() + #self._cond.release() try: - callback(self, status) - finally: - self._cond.acquire() + callback(status) + except Exception: + self.logger.exception("Status update error") + #finally: + # self._cond.acquire() + + # + # Interface functions + # def set_status_update_callback(self, callback): with self._cond: @@ -455,3 +484,21 @@ res=self.__status_unlocked() return res[0] + def create(self): + op={'operation': 'create', 'detail': 'manual'} + with self._cond: + self.scheduled_operation=op + self._cond.notify() + + def prune(self): + op={'operation': 'prune', 'detail': 'manual'} + with self._cond: + self.scheduled_operation=op + self._cond.notify() + + # TODO: Decide exact (manual) abort mechanism. Perhaps two stages + def abort(self): + with self._cond: + if self.borg_instance: + self.borg_instance.terminate() + diff -r cfcaa5f6ba33 -r 407af23d16bb scheduler.py --- a/scheduler.py Mon Jan 22 22:23:01 2018 +0000 +++ b/scheduler.py Tue Jan 23 22:53:51 2018 +0000 @@ -113,7 +113,7 @@ while self._list and self._list.when <= now: ev=self._list - logger.info("%s: Scheduling event %s" % self.name(), str(ev.name)) + logger.debug("Scheduling event %s" % (ev.name or "(unknown)")) # We are only allowed to remove ev from list when ev.cond allows with ev.cond: self._list=ev.next diff -r cfcaa5f6ba33 -r 407af23d16bb ui.py --- a/ui.py Mon Jan 22 22:23:01 2018 +0000 +++ b/ui.py Tue Jan 23 22:53:51 2018 +0000 @@ -17,6 +17,7 @@ traynames={ backup.INACTIVE: 'B.', backup.SCHEDULED: 'B.', + backup.QUEUED: 'B:', backup.ACTIVE: 'B!', backup.BUSY: 'B⦙', backup.OFFLINE: 'B⦙', @@ -26,6 +27,7 @@ statestring={ backup.INACTIVE: 'inactive', backup.SCHEDULED: 'scheduled', + backup.QUEUED: 'queued', backup.ACTIVE: 'active', backup.BUSY: 'busy', backup.OFFLINE: 'offline', @@ -95,6 +97,8 @@ if status['detail']!='normal': detail=detail+status['detail']+' ' title="%s (%s%s %s)" % (status['name'], detail, status['operation'], whenstr) + elif status['type']=='queued': + title="%s (queued)" % status['name'] elif status['type']=='current': # Operation running progress='' @@ -130,8 +134,8 @@ b=backups[index] # Python closures suck dog's balls; hence the _index=index hack # See also http://math.andrej.com/2009/04/09/pythons-lambda-is-broken/ - cb=(lambda obj, status, _index=index, errors=None: - self.__status_callback(obj, _index, status, errors)) + cb=(lambda status, errors=None, _index=index: + self.__status_callback(_index, status, errorlog=errors)) b.set_status_update_callback(cb) self.statuses[index]=b.status() @@ -177,7 +181,7 @@ self.menu.update(menu) self.title=traynames[active] - def __status_callback(self, obj, index, status, errorlog): + def __status_callback(self, index, status, errorlog=None): logger.debug('Status callback: %s' % str(status)) with self.lock: