--- a/backup.py Sat Jan 20 14:04:51 2018 +0000 +++ b/backup.py Sat Jan 20 15:08:16 2018 +0000 @@ -7,7 +7,7 @@ import time from instance import BorgInstance from queue import Queue -from threading import Thread, Lock +from threading import Thread, Lock, Timer loglevel_translation={ 'CRITICAL': logging.CRITICAL, @@ -75,12 +75,23 @@ self.borg_instance=None self.current_operation=None self.thread_log=None - self.thread_err=None + self.thread_res=None + self.timer=None + self.timer_operation=None + self.timer_time=None self.lock=Lock() def is_running(self): with self.lock: - running=self.borg_instance or self.thread_log or self.thread_err + running=self.__is_running_unlocked() + return running + + def __is_running_unlocked(self): + running=self.current_operation + if not running: + # Consistency check + assert((not self.borg_instance and not self.thread_log and + not self.thread_res)) return running def __block_when_running(self): @@ -124,11 +135,6 @@ elif t=='unparsed_error': success=False - #if (may_indicate_finished and 'finished' in status and - # status['finished']): - # logging.info('Borg subprocess finished succesfully') - # success=status['finished'] - logging.debug('Waiting for borg subprocess to terminate in log thread') self.borg_instance.wait() @@ -137,7 +143,7 @@ with self.lock: self.thread_log=None - self.__cleanup_if_both_listeners_terminated() + self.__finish_and_reschedule_if_both_listeners_terminated() def __result_listener(self): @@ -154,26 +160,26 @@ logging.debug('Waiting for borg subprocess to terminate in result thread') - self.borg_instance.wait() + success=success and self.borg_instance.wait() - logging.debug('Borg subprocess terminated; terminating result listener thread') + logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) with self.lock: if self.current_operation=='create': self.lastrun=self.time_started self.lastrun_success=success self.thread_res=None - self.__cleanup_if_both_listeners_terminated() + self.__finish_and_reschedule_if_both_listeners_terminated() - def __cleanup_if_both_listeners_terminated(self): + def __finish_and_reschedule_if_both_listeners_terminated(self): if self.thread_res==None and self.thread_log==None: logging.debug('Both threads terminated') self.borg_instance=None + self.time_started=None self.current_operation=None - self.time_started=None + self.__schedule_unlocked() - def __launch(self, queue, operation, archive_or_repository, *args): - + def __do_launch(self, queue, operation, archive_or_repository, *args): inst=BorgInstance(operation, archive_or_repository, *args) inst.launch() @@ -193,23 +199,48 @@ t_log.start() t_res.start() - def create(self, queue): - self.__block_when_running() + def __launch(self, operation, queue): + if self.__is_running_unlocked(): + logging.info('Cannot start %s: already running %s' + % (operation, self.current_operation)) + return False + else: + if self.timer: + logging.debug('Unscheduling timed operation due to launch of operation') + self.timer=None + self.timer_operation=None + self.timer_time=None + + logging.debug("Launching '%s' on '%s'" % (operation, self.name)) + + if operation=='create': + archive="%s::%s%s" % (self.repository, + self.archive_prefix, + self.archive_template) - archive="%s::%s%s" % (self.repository, - self.archive_prefix, - self.archive_template) + self.__do_launch(queue, operation, archive, + self.common_parameters+self.create_parameters, + self.paths) + elif operation=='prune': + self.__do_launch(queue, 'prune', self.repository, + ([{'prefix': self.archive_prefix}] + + self.common_parameters + + self.prune_parameters)) + else: + logging.error("Invalid operaton '%s'" % operation) + self.__schedule_unlocked() - self.__launch(queue, 'create', archive, - self.common_parameters+self.create_parameters, - self.paths) + return True + + def create(self, queue): + with self.lock: + res=self.__launch('create', queue) + return res def prune(self, queue): - self.__block_when_running() - self.__launch(queue, 'prune', self.repository, - ([{'prefix': self.archive_prefix}] + - self.common_parameters + - self.prune_parameters)) + with self.lock: + res=self.__launch('prune', queue) + return res # TODO: Decide exact (manual) abort mechanism. Perhaps two stages def abort(self): @@ -227,7 +258,7 @@ def join(self): - logging.debug('Waiting for borg listener thread to terminate') + logging.debug('Waiting for borg listener threads to terminate') with self.lock: thread_log=self.thread_log @@ -241,9 +272,42 @@ assert(self.thread_log==None and self.thread_res==None) - def next_action(): - __block_when_running() - # TODO pruning as well + def __queue_timed_operation(self): + with self.lock: + operation=self.timer_operation + self.timer_operation=None + self.timer_time=None + self.timer=None + + if self.__is_running_unlocked(): + logging.info('Aborted queue operation, as an operation is already running') + else: + # TODO: Queue on 'repository' and online status for SSH, etc. + + # TODO: UI comms. queue? + self.__launch(operation, None) + + def __schedule_unlocked(self): + if self.current_operation: + return self.current_operation, None + else: + operation, when=self.__next_operation_unlocked() + + if operation: + now=time.monotonic() + delay=max(0, when-now) + logging.info("Scheduling '%s' of '%s' in %d seconds" % + (operation, self.name, delay)) + tmr=Timer(delay, self.__queue_timed_operation) + self.timer_operation=operation + self.timer_time=when + self.timer=tmr + tmr.start() + + return operation, time + + def __next_operation_unlocked(self): + # TODO: pruning as well now=time.monotonic() if not self.lastrun: return 'create', now+self.retry_interval @@ -255,4 +319,8 @@ else: return 'create', self.lastrun+self.backup_interval + def schedule(self): + with self.lock: + return self.__schedule_unlocked() +