diff -r be3ed25df789 -r db33dfa64ad6 backup.py --- a/backup.py Mon Jan 22 12:04:19 2018 +0000 +++ b/backup.py Mon Jan 22 18:16:51 2018 +0000 @@ -8,8 +8,8 @@ import keyring import borgend from instance import BorgInstance -from queue import Queue -from threading import Thread, Lock, Timer +from threading import Thread, Lock, Condition +from scheduler import TerminableThread logger=borgend.logger.getChild(__name__) @@ -46,14 +46,14 @@ return None -class Backup: +class Backup(TerminableThread): def __decode_config(self, cfg): loc0='backup target %d' % self.identifier - self.name=config.check_string(cfg, 'name', 'Name', loc0) + self._name=config.check_string(cfg, 'name', 'Name', loc0) - self.loc='backup target "%s"' % self.name + self.loc='backup target "%s"' % self._name self.repository=config.check_string(cfg, 'repository', 'Target repository', self.loc) @@ -115,25 +115,26 @@ self.__passphrase=None return self.__passphrase - def __init__(self, identifier, cfg): + def __init__(self, identifier, cfg, scheduler): self.identifier=identifier - - self.__decode_config(cfg) - self.config=config self.lastrun_when=None self.borg_instance=None self.current_operation=None self.thread_log=None self.thread_res=None - self.timer=None self.scheduled_operation=None - self.lock=Lock() self.__status_update_callback=None self.state=INACTIVE + self.scheduler=scheduler + + self.__decode_config(cfg) + + super().__init__(target = self.__main_thread, name = self._name) + self.daemon=True def is_running(self): - with self.lock: + with self._cond: running=self.__is_running_unlocked() return running @@ -163,7 +164,7 @@ current=safe_get_int(status, 'current') total=safe_get_int(status, 'total') if current is not None and total is not None: - with self.lock: + with self._cond: self.current_operation['progress_current']=current self.current_operation['progress_total']=total status, callback=self.__status_unlocked() @@ -173,7 +174,7 @@ compressed_size=safe_get_int(status, 'compressed_size') deduplicated_size=safe_get_int(status, 'deduplicated_size') if original_size is not None and original_size is not None and deduplicated_size is not None: - with self.lock: + with self._cond: self.current_operation['original_size']=original_size self.current_operation['compressed_size']=compressed_size self.current_operation['deduplicated_size']=deduplicated_size @@ -202,7 +203,7 @@ status['msgid']=='LockErrorT' or # in docs status['msgid']=='LockErrorT')): # in docs state=BUSY - with self.lock: + with self._cond: self.state=combine_state(self.state, state) status, callback=self.__status_unlocked() else: @@ -218,7 +219,7 @@ logger.debug('Borg subprocess terminated; terminating log listener thread') def __result_listener(self): - with self.lock: + with self._cond: status, callback=self.__status_unlocked() if callback: callback(self, status) @@ -230,7 +231,7 @@ # Finish processing remaining errors self.thread_log.join() - with self.lock: + with self._cond: state=self.state # If there were no errors, reset back to INACTIVE state @@ -251,7 +252,7 @@ logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) - with self.lock: + with self._cond: if self.current_operation['operation']=='create': self.lastrun_when=self.current_operation['when_monotonic'] self.thread_res=None @@ -259,12 +260,9 @@ self.borg_instance=None self.current_operation=None self.state=state - self.__schedule_unlocked() - status, callback=self.__status_unlocked() - if callback: - callback(self, status) + self._cond.notify() - def __do_launch(self, queue, op, archive_or_repository, *args): + def __do_launch(self, op, archive_or_repository, *args): passphrase=self.extract_passphrase() inst=BorgInstance(op['operation'], archive_or_repository, *args) @@ -281,7 +279,6 @@ self.thread_log=t_log self.thread_res=t_res self.borg_instance=inst - self.queue=queue self.current_operation=op self.current_operation['when_monotonic']=time.monotonic() self.state=ACTIVE @@ -289,30 +286,25 @@ t_log.start() t_res.start() - def __launch(self, op, queue): + def __launch(self, op): if self.__is_running_unlocked(): logging.info('Cannot start %s: already running %s' % (operation, self.current_operation)) return False else: - if self.timer: - logger.debug('Unscheduling timed operation due to launch of operation') - self.timer=None - self.scheduled_operation=None - try: - logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) + logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) if op['operation']=='create': archive="%s::%s%s" % (self.repository, self.archive_prefix, self.archive_template) - self.__do_launch(queue, op, archive, + self.__do_launch(op, archive, self.common_parameters+self.create_parameters, self.paths) elif op['operation']=='prune': - self.__do_launch(queue, op, self.repository, + self.__do_launch(op, self.repository, ([{'prefix': self.archive_prefix}] + self.common_parameters + self.prune_parameters)) @@ -322,66 +314,27 @@ logger.debug('Rescheduling after failure') self.lastrun_when=time.monotonic() self.state=ERRORS - self.__schedule_unlocked() raise err return True - def create(self, queue): + def create(self): op={'operation': 'create', 'detail': 'manual'} - with self.lock: - res=self.__launch(op, queue) - return res + with self._cond: + self.scheduled_operation=op + self._cond.notify() - def prune(self, queue): + def prune(self): op={'operation': 'prune', 'detail': 'manual'} - with self.lock: - res=self.__launch(op, queue) - return res + with self._cond: + self.scheduled_operation=op + self._cond.notify() # TODO: Decide exact (manual) abort mechanism. Perhaps two stages def abort(self): - with self.lock: + with self._cond: if self.borg_instance: self.borg_instance.terminate() - #thread_log=self.thread_log - #thread_res=self.thread_res - - #if thread_log: - # thread_log.terminate() - - #if thread_res: - # thread_res.terminate() - - - def join(self): - logger.debug('Waiting for borg listener threads to terminate') - - with self.lock: - thread_log=self.thread_log - thread_res=self.thread_res - - if thread_log: - thread_log.join() - - if thread_res: - thread_res.join() - - assert(self.thread_log==None and self.thread_res==None) - - def __queue_timed_operation(self): - with self.lock: - op=self.scheduled_operation - self.scheduled_operation=None - self.timer=None - - if self.__is_running_unlocked(): - logger.info('Aborted queue operation, as an operation is already running') - else: - # TODO: Queue on 'repository' and online status for SSH, etc. - - # TODO: UI comms. queue? - self.__launch(op, None) def __next_operation_unlocked(self): # TODO: pruning as well @@ -411,33 +364,49 @@ 'detail': 'normal', 'when_monotonic': self.lastrun_when+self.backup_interval} - def __schedule_unlocked(self): - if self.current_operation: - return self.current_operation - else: - op=self.__next_operation_unlocked() + def __main_thread(self): + with self._cond: + while not self._terminate: + op=None + if not self.current_operation: + op=self.__next_operation_unlocked() + if not op: + self.__update_status() + self._cond.wait() + else: + now=time.monotonic() + delay=max(0, op['when_monotonic']-now) + logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % + (op['operation'], op['detail'], self._name, delay)) + + self.scheduled_operation=op + self.state=combine_state(self.state, SCHEDULED) + + self.__update_status() + self.scheduler.wait_until(now+delay, self._cond, self._name) - if op: - now=time.monotonic() - delay=max(0, op['when_monotonic']-now) - logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % - (op['operation'], op['detail'], self.name, delay)) - tmr=Timer(delay, self.__queue_timed_operation) - self.scheduled_operation=op - self.timer=tmr - self.state=combine_state(self.state, SCHEDULED) - tmr.start() + if self.scheduled_operation: + op=self.scheduled_operation + self.scheduled_operation=None + self.__launch(op) + + # Kill a running borg to cause log and result threads to terminate + if self.borg_instance: + logger.debug("Terminating a borg instance") + self.borg_instance.terminate() - return op + # Store threads to use outside lock + thread_log=self.thread_log + thread_err=self.thread_err + + logger.debug("Waiting for log and result threads to terminate") - def schedule(self): - with self.lock: - return self.__schedule_unlocked() + if thread_log: + thread_log.join() - def status(self): - with self.lock: - res=self.__status_unlocked() - return res[0] + if thread_res: + thread_res.join() + def __status_unlocked(self): callback=self.__status_update_callback @@ -453,7 +422,7 @@ else: status={'type': 'nothing'} - status['name']=self.name + status['name']=self._name status['state']=self.state if 'detail' not in status: @@ -465,8 +434,21 @@ return status, callback + def __update_status(self): + status, callback = self.__status_unlocked() + if callback: + self._cond.release() + try: + callback(self, status) + finally: + self._cond.acquire() + def set_status_update_callback(self, callback): - with self.lock: + with self._cond: self.__status_update_callback=callback + def status(self): + with self._cond: + res=self.__status_unlocked() + return res[0]