# HG changeset patch # User Tuomo Valkonen # Date 1516645011 0 # Node ID db33dfa64ad69666e884cc887ff3f239c7fd178d # Parent be3ed25df789630808c6ae00f9f1b025ac7acb71 Improved scheduler diff -r be3ed25df789 -r db33dfa64ad6 backup.py --- a/backup.py Mon Jan 22 12:04:19 2018 +0000 +++ b/backup.py Mon Jan 22 18:16:51 2018 +0000 @@ -8,8 +8,8 @@ import keyring import borgend from instance import BorgInstance -from queue import Queue -from threading import Thread, Lock, Timer +from threading import Thread, Lock, Condition +from scheduler import TerminableThread logger=borgend.logger.getChild(__name__) @@ -46,14 +46,14 @@ return None -class Backup: +class Backup(TerminableThread): def __decode_config(self, cfg): loc0='backup target %d' % self.identifier - self.name=config.check_string(cfg, 'name', 'Name', loc0) + self._name=config.check_string(cfg, 'name', 'Name', loc0) - self.loc='backup target "%s"' % self.name + self.loc='backup target "%s"' % self._name self.repository=config.check_string(cfg, 'repository', 'Target repository', self.loc) @@ -115,25 +115,26 @@ self.__passphrase=None return self.__passphrase - def __init__(self, identifier, cfg): + def __init__(self, identifier, cfg, scheduler): self.identifier=identifier - - self.__decode_config(cfg) - self.config=config self.lastrun_when=None self.borg_instance=None self.current_operation=None self.thread_log=None self.thread_res=None - self.timer=None self.scheduled_operation=None - self.lock=Lock() self.__status_update_callback=None self.state=INACTIVE + self.scheduler=scheduler + + self.__decode_config(cfg) + + super().__init__(target = self.__main_thread, name = self._name) + self.daemon=True def is_running(self): - with self.lock: + with self._cond: running=self.__is_running_unlocked() return running @@ -163,7 +164,7 @@ current=safe_get_int(status, 'current') total=safe_get_int(status, 'total') if current is not None and total is not None: - with self.lock: + with self._cond: self.current_operation['progress_current']=current self.current_operation['progress_total']=total status, callback=self.__status_unlocked() @@ -173,7 +174,7 @@ compressed_size=safe_get_int(status, 'compressed_size') deduplicated_size=safe_get_int(status, 'deduplicated_size') if original_size is not None and original_size is not None and deduplicated_size is not None: - with self.lock: + with self._cond: self.current_operation['original_size']=original_size self.current_operation['compressed_size']=compressed_size self.current_operation['deduplicated_size']=deduplicated_size @@ -202,7 +203,7 @@ status['msgid']=='LockErrorT' or # in docs status['msgid']=='LockErrorT')): # in docs state=BUSY - with self.lock: + with self._cond: self.state=combine_state(self.state, state) status, callback=self.__status_unlocked() else: @@ -218,7 +219,7 @@ logger.debug('Borg subprocess terminated; terminating log listener thread') def __result_listener(self): - with self.lock: + with self._cond: status, callback=self.__status_unlocked() if callback: callback(self, status) @@ -230,7 +231,7 @@ # Finish processing remaining errors self.thread_log.join() - with self.lock: + with self._cond: state=self.state # If there were no errors, reset back to INACTIVE state @@ -251,7 +252,7 @@ logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) - with self.lock: + with self._cond: if self.current_operation['operation']=='create': self.lastrun_when=self.current_operation['when_monotonic'] self.thread_res=None @@ -259,12 +260,9 @@ self.borg_instance=None self.current_operation=None self.state=state - self.__schedule_unlocked() - status, callback=self.__status_unlocked() - if callback: - callback(self, status) + self._cond.notify() - def __do_launch(self, queue, op, archive_or_repository, *args): + def __do_launch(self, op, archive_or_repository, *args): passphrase=self.extract_passphrase() inst=BorgInstance(op['operation'], archive_or_repository, *args) @@ -281,7 +279,6 @@ self.thread_log=t_log self.thread_res=t_res self.borg_instance=inst - self.queue=queue self.current_operation=op self.current_operation['when_monotonic']=time.monotonic() self.state=ACTIVE @@ -289,30 +286,25 @@ t_log.start() t_res.start() - def __launch(self, op, queue): + def __launch(self, op): if self.__is_running_unlocked(): logging.info('Cannot start %s: already running %s' % (operation, self.current_operation)) return False else: - if self.timer: - logger.debug('Unscheduling timed operation due to launch of operation') - self.timer=None - self.scheduled_operation=None - try: - logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) + logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) if op['operation']=='create': archive="%s::%s%s" % (self.repository, self.archive_prefix, self.archive_template) - self.__do_launch(queue, op, archive, + self.__do_launch(op, archive, self.common_parameters+self.create_parameters, self.paths) elif op['operation']=='prune': - self.__do_launch(queue, op, self.repository, + self.__do_launch(op, self.repository, ([{'prefix': self.archive_prefix}] + self.common_parameters + self.prune_parameters)) @@ -322,66 +314,27 @@ logger.debug('Rescheduling after failure') self.lastrun_when=time.monotonic() self.state=ERRORS - self.__schedule_unlocked() raise err return True - def create(self, queue): + def create(self): op={'operation': 'create', 'detail': 'manual'} - with self.lock: - res=self.__launch(op, queue) - return res + with self._cond: + self.scheduled_operation=op + self._cond.notify() - def prune(self, queue): + def prune(self): op={'operation': 'prune', 'detail': 'manual'} - with self.lock: - res=self.__launch(op, queue) - return res + with self._cond: + self.scheduled_operation=op + self._cond.notify() # TODO: Decide exact (manual) abort mechanism. Perhaps two stages def abort(self): - with self.lock: + with self._cond: if self.borg_instance: self.borg_instance.terminate() - #thread_log=self.thread_log - #thread_res=self.thread_res - - #if thread_log: - # thread_log.terminate() - - #if thread_res: - # thread_res.terminate() - - - def join(self): - logger.debug('Waiting for borg listener threads to terminate') - - with self.lock: - thread_log=self.thread_log - thread_res=self.thread_res - - if thread_log: - thread_log.join() - - if thread_res: - thread_res.join() - - assert(self.thread_log==None and self.thread_res==None) - - def __queue_timed_operation(self): - with self.lock: - op=self.scheduled_operation - self.scheduled_operation=None - self.timer=None - - if self.__is_running_unlocked(): - logger.info('Aborted queue operation, as an operation is already running') - else: - # TODO: Queue on 'repository' and online status for SSH, etc. - - # TODO: UI comms. queue? - self.__launch(op, None) def __next_operation_unlocked(self): # TODO: pruning as well @@ -411,33 +364,49 @@ 'detail': 'normal', 'when_monotonic': self.lastrun_when+self.backup_interval} - def __schedule_unlocked(self): - if self.current_operation: - return self.current_operation - else: - op=self.__next_operation_unlocked() + def __main_thread(self): + with self._cond: + while not self._terminate: + op=None + if not self.current_operation: + op=self.__next_operation_unlocked() + if not op: + self.__update_status() + self._cond.wait() + else: + now=time.monotonic() + delay=max(0, op['when_monotonic']-now) + logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % + (op['operation'], op['detail'], self._name, delay)) + + self.scheduled_operation=op + self.state=combine_state(self.state, SCHEDULED) + + self.__update_status() + self.scheduler.wait_until(now+delay, self._cond, self._name) - if op: - now=time.monotonic() - delay=max(0, op['when_monotonic']-now) - logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % - (op['operation'], op['detail'], self.name, delay)) - tmr=Timer(delay, self.__queue_timed_operation) - self.scheduled_operation=op - self.timer=tmr - self.state=combine_state(self.state, SCHEDULED) - tmr.start() + if self.scheduled_operation: + op=self.scheduled_operation + self.scheduled_operation=None + self.__launch(op) + + # Kill a running borg to cause log and result threads to terminate + if self.borg_instance: + logger.debug("Terminating a borg instance") + self.borg_instance.terminate() - return op + # Store threads to use outside lock + thread_log=self.thread_log + thread_err=self.thread_err + + logger.debug("Waiting for log and result threads to terminate") - def schedule(self): - with self.lock: - return self.__schedule_unlocked() + if thread_log: + thread_log.join() - def status(self): - with self.lock: - res=self.__status_unlocked() - return res[0] + if thread_res: + thread_res.join() + def __status_unlocked(self): callback=self.__status_update_callback @@ -453,7 +422,7 @@ else: status={'type': 'nothing'} - status['name']=self.name + status['name']=self._name status['state']=self.state if 'detail' not in status: @@ -465,8 +434,21 @@ return status, callback + def __update_status(self): + status, callback = self.__status_unlocked() + if callback: + self._cond.release() + try: + callback(self, status) + finally: + self._cond.acquire() + def set_status_update_callback(self, callback): - with self.lock: + with self._cond: self.__status_update_callback=callback + def status(self): + with self._cond: + res=self.__status_unlocked() + return res[0] diff -r be3ed25df789 -r db33dfa64ad6 borgend.py --- a/borgend.py Mon Jan 22 12:04:19 2018 +0000 +++ b/borgend.py Mon Jan 22 18:16:51 2018 +0000 @@ -7,7 +7,6 @@ import argparse import platform import utils -from fifolog import FIFOHandler # # Branding @@ -30,14 +29,23 @@ logger=logging.getLogger(appname) logger.setLevel(loglevel) +stderrlog=logging.StreamHandler() +logger.addHandler(stderrlog) logger.propagate=True # +# Import our own modules. This needs to be done here +# for the things above to be available to them +# + +import config +from scheduler import Scheduler +from fifolog import FIFOHandler + +# # Argument processing # -import config - def do_args(): parser=argparse.ArgumentParser( description=appname_stylised + ': BorgBackup scheduler and tray icon.', @@ -86,11 +94,15 @@ backupconfigs=config.settings['backups'] backups=[None]*len(backupconfigs); + scheduler = Scheduler() + try: + scheduler.start() + for i in range(len(backupconfigs)): logger.info('Setting up backup set %d' % i) - backups[i]=Backup(i, backupconfigs[i]) - backups[i].schedule() + backups[i]=Backup(i, backupconfigs[i], scheduler) + backups[i].start() if args.notray or platform.system()!='Darwin': pass @@ -104,6 +116,8 @@ backups[i].abort() backups=[] except Exception as err: + # TODO: Should write errors here to stderr; + # perhaps add an extra stderr logger for error level messages utils.log_exception(logger, err, detail='Exiting') if tray: tray.quit() diff -r be3ed25df789 -r db33dfa64ad6 scheduler.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scheduler.py Mon Jan 22 18:16:51 2018 +0000 @@ -0,0 +1,125 @@ +# +# Scheduler for Borgend +# +# This module simply provide a way for other threads to until a given time +# + +import time +import borgend +from threading import Condition, Lock, Thread + +logger=borgend.logger.getChild(__name__) + +class ScheduledEvent: + def __init__(self, when, cond, name=None): + self.next=None + self.prev=None + self.when=when + self.name=name + self.cond=Condition() + + def __lt__(self, other): + return self.when < other.when + + def __gt__(self, other): + return self.when > other.when + + def insert_after(self, ev): + if not self.next: + ev.prev=self + self.next=ev + ev.next=None + elif self.next>ev: + self.insert_immediately_after(ev) + else: + self.next.insert_after(ev) + + def insert_immediately_after(self, ev): + ev.prev=self + ev.next=self.next + if ev.next: + ev.next.prev=ev + self.next=ev + + def unlink(self): + n=self.next + p=self.prev + if n: + n.prev=p + if p: + p.next=n + +class TerminableThread(Thread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._terminate=False + self._cond=Condition() + + def terminate(self): + with self._cond: + _terminate=True + self._cond.notify() + + +class Scheduler(TerminableThread): + # Default to precision of 60 seconds: the scheduler thread will never + # sleep longer than that, to get quickly back on track with the schedule + # when the computer wakes up from sleep + def __init__(self, precision=60): + self.precision = precision + self.__next_event_time = None + self.__list = None + self._cond = Condition() + self._terminate = False + super().__init__(target = self.__scheduler_thread, name = 'Scheduler') + self.daemon=True + + def __scheduler_thread(self): + with self._cond: + while not self._terminate: + now = time.monotonic() + if not self.__list: + timeout = None + else: + # Wait at most precision seconds, or until next event if it + # comes earlier + timeout=min(self.precision, self.__list.when-now) + + if not timeout or timeout>0: + self._cond.wait(timeout) + now = time.monotonic() + + while self.__list and self.__list.when <= now: + ev=self.__list + logger.info("Found schedulable event %s" % str(ev.name)) + self.__list=ev.next + ev.unlink() + ev.cond.acquire() + ev.cond.notifyAll() + ev.cond.release() + + # cond has to be acquired on entry! + def wait_until(self, when, cond, name=None): + ev=ScheduledEvent(when, cond, name) + with self._cond: + if not self.__list: + self.__list=ev + elif self.__list > ev: + ev.insert_immediately_after(self.__list) + self.__list=ev + else: + self.__list.insert_immediately_after(ev) + + self._cond.notify() + # This will release the lock on cond, allowing scheduler thread + # to notify us if we are already to be released + cond.wait() + + # If we were woken up by some other event, not the scheduler, + # ensure the event is removed + with self._cond: + if ev==self.__list: + self.__list=ev.next + ev.unlink() + + diff -r be3ed25df789 -r db33dfa64ad6 ui.py --- a/ui.py Mon Jan 22 12:04:19 2018 +0000 +++ b/ui.py Mon Jan 22 18:16:51 2018 +0000 @@ -206,7 +206,7 @@ #sender.state=not sender.state logger.debug("Manually backup '%s'", b.name) try: - b.create(None) + b.create() except Exception as err: utils.log_exception(logger, err) notification_workaround(borgend.appname_stylised,