--- a/backup.py Wed Jan 24 09:19:42 2018 +0000 +++ b/backup.py Wed Jan 24 20:18:45 2018 +0000 @@ -8,30 +8,41 @@ import keyring import borgend import repository +from enum import IntEnum from instance import BorgInstance from threading import Thread, Lock, Condition from scheduler import TerminableThread logger=borgend.logger.getChild(__name__) -# State -INACTIVE=0 -SCHEDULED=1 -QUEUED=2 -ACTIVE=3 -BUSY=4 -OFFLINE=5 -ERRORS=6 +class State(IntEnum): + # State + INACTIVE=0 + SCHEDULED=1 + QUEUED=2 + ACTIVE=3 + + +class Errors(IntEnum): + OK=0 + BUSY=1 + OFFLINE=2 + ERRORS=3 -def combine_state(state1, state2): - return max(state1, state2) + def combine(self, other): + return max(self, other) + + def ok(self): + return self==self.OK -loglevel_translation={ - 'CRITICAL': logging.CRITICAL, - 'ERROR': logging.ERROR, - 'WARNING': logging.WARNING, - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO + def __str__(self): + return _errorstring[self] + +_errorstring={ + Errors.OK: 'ok', + Errors.BUSY: 'busy', + Errors.OFFLINE: 'offline', + Errors.ERRORS: 'errors' } def translate_loglevel(x): @@ -47,6 +58,13 @@ return tmp return None +loglevel_translation={ + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO +} class Backup(TerminableThread): @@ -135,7 +153,8 @@ self.current_operation=None self.scheduled_operation=None self.lastrun_when=None - self.state=INACTIVE + self.state=State.INACTIVE + self.errors=Errors.OK self.__decode_config(cfg) @@ -162,16 +181,16 @@ def __log_listener(self): self.logger.debug('Log listener thread waiting for entries') success=True - for status in iter(self.borg_instance.read_log, None): - self.logger.debug(str(status)) - t=status['type'] + for msg in iter(self.borg_instance.read_log, None): + self.logger.debug(str(msg)) + t=msg['type'] - errors_this_message=None + errormsg=None callback=None if t=='progress_percent': - current=safe_get_int(status, 'current') - total=safe_get_int(status, 'total') + current=safe_get_int(msg, 'current') + total=safe_get_int(msg, 'total') if current is not None and total is not None: with self._cond: self.current_operation['progress_current']=current @@ -179,9 +198,9 @@ status, callback=self.__status_unlocked() elif t=='archive_progress': - original_size=safe_get_int(status, 'original_size') - compressed_size=safe_get_int(status, 'compressed_size') - deduplicated_size=safe_get_int(status, 'deduplicated_size') + original_size=safe_get_int(msg, 'original_size') + compressed_size=safe_get_int(msg, 'compressed_size') + deduplicated_size=safe_get_int(msg, 'deduplicated_size') if original_size is not None and original_size is not None and deduplicated_size is not None: with self._cond: self.current_operation['original_size']=original_size @@ -196,30 +215,30 @@ pass elif t=='log_message': - if 'levelname' not in status: - status['levelname']='ERROR' - if 'message' not in status: - status['message']='UNKNOWN' - if 'name' not in status: - status['name']='borg' - lvl=translate_loglevel(status['levelname']) - self.logger.log(lvl, status['name'] + ': ' + status['message']) + if 'levelname' not in msg: + msg['levelname']='ERROR' + if 'message' not in msg: + msg['message']='UNKNOWN' + if 'name' not in msg: + msg['name']='borg' + lvl=translate_loglevel(msg['levelname']) + self.logger.log(lvl, msg['name'] + ': ' + msg['message']) if lvl>=logging.WARNING: - errors_this_message=status - state=ERRORS - if ('msgid' in status and - (status['msgid']=='LockTimeout' or # observed in reality - status['msgid']=='LockErrorT' or # in docs - status['msgid']=='LockErrorT')): # in docs - state=BUSY + errormsg=msg + errors=Errors.ERRORS + if ('msgid' in msg and + (msg['msgid']=='LockTimeout' or # observed in reality + msg['msgid']=='LockErrorT' or # in docs + msg['msgid']=='LockErrorT')): # in docs + errors=Errors.BUSY with self._cond: - self.state=combine_state(self.state, state) + self.errors=self.errors.combine(errors) status, callback=self.__status_unlocked() else: self.logger.debug('Unrecognised log entry %s' % str(status)) if callback: - callback(status, errors=errors_this_message) + callback(status, errors=errormsg) self.logger.debug('Waiting for borg subprocess to terminate in log thread') @@ -228,12 +247,6 @@ self.logger.debug('Borg subprocess terminated; terminating log listener thread') def __result_listener(self): - # self.state=ACTIVE - # with self._cond: - # status, callback=self.__status_unlocked() - # if callback: - # callback(status) - self.logger.debug('Result listener thread waiting for result') res=self.borg_instance.read_result() @@ -242,25 +255,23 @@ self.thread_log.join() with self._cond: - state=self.state - - # If there were no errors, reset back to INACTIVE state - if state==ACTIVE: - state=INACTIVE + errors=self.errors self.logger.debug('Borg result: %s' % str(res)) - if res is None and state==INACTIVE: + if res is None: self.logger.error('No result from borg despite no error in log') - state=ERRORS + if errors.ok(): + errors=Errors.ERRORS self.logger.debug('Waiting for borg subprocess to terminate in result thread') if not self.borg_instance.wait(): self.logger.error('Borg subprocess did not terminate') - state=combine_state(state, ERRORS) + if errors.ok(): + errors=Errors.ERRORS - self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) + self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors)) with self._cond: if self.current_operation['operation']=='create': @@ -269,7 +280,8 @@ self.thread_log=None self.borg_instance=None self.current_operation=None - self.state=state + self.state=State.INACTIVE + self.errors=errors self.__update_status() self._cond.notify() @@ -313,6 +325,7 @@ else: raise NotImplementedError("Invalid operation '%s'" % op['operation']) + # This must be called with self._cond held. def __launch_check(self): op=self.scheduled_operation if not op: @@ -324,7 +337,9 @@ self.current_operation=op self.current_operation['when_monotonic']=time.monotonic() - self.state=ACTIVE + self.state=State.ACTIVE + # Reset error status when starting a new operation + self.errors=Errors.OK self.__update_status() @@ -339,9 +354,10 @@ self.__main_thread_queue_and_launch() except Exception as err: self.logger.exception("Error with backup '%s'" % self._name) - self.lastrun_when=time.monotonic() - self.state=ERRORS - self.scheduled_operation=None + self.errors=Errors.ERRORS + + self.state=State.INACTIVE + self.scheduled_operation=None # Clean up to terminate: kill borg instance and communication threads if self.borg_instance: @@ -351,6 +367,8 @@ # Store threads to use outside lock thread_log=self.thread_log thread_res=self.thread_res + self.thread_log=None + self.thread_res=None self.logger.debug("Waiting for log and result threads to terminate") @@ -380,14 +398,18 @@ (op['operation'], op['detail'], delay)) self.scheduled_operation=op - self.state=combine_state(self.state, SCHEDULED) + self.state=State.SCHEDULED self.__update_status() # Wait under scheduled wait self.scheduler.wait_until(now+delay, self._cond, self._name) else: # Nothing scheduled - just wait - self.logger.debug("Waiting for manual scheduling") + self.logger.info("Waiting for manual scheduling") + + self.state=State.INACTIVE + self.__update_status() + self._cond.wait() # Main thread/3. If there is a scheduled operation (it might have been @@ -397,7 +419,7 @@ def __main_thread_queue_and_launch(self): if self.scheduled_operation: self.logger.debug("Queuing") - self.state=combine_state(self.state, QUEUED) + self.state=State.QUEUED self.__update_status() self.repository.queue_action(self._cond, action=self.__launch_check, @@ -416,7 +438,7 @@ return {'operation': 'create', 'detail': 'initial', 'when_monotonic': now+initial_interval} - elif self.state>=BUSY: + elif not self.errors.ok(): if self.retry_interval==0: return None else: @@ -436,20 +458,14 @@ if self.current_operation: status=self.current_operation - status['type']='current' - # Errors should be set by listeners + elif self.scheduled_operation: + status=self.scheduled_operation else: - if self.scheduled_operation: - status=self.scheduled_operation - if self.state==QUEUED: - status['type']='queued' - else: - status['type']='scheduled' - else: - status={'type': 'nothing'} + status={'type': 'nothing'} status['name']=self._name status['state']=self.state + status['errors']=self.errors if 'detail' not in status: status['detail']='NONE'