Sun, 28 Jan 2018 11:54:46 +0000
Better package-like organisation
--- a/backup.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,537 +0,0 @@ -# -# Borgend Backup instance -# - -import config -import logging -import time -import loggers -import repository -import dreamtime -from enum import IntEnum -from instance import BorgInstance -from threading import Thread, Lock, Condition -from scheduler import TerminableThread - -logger=loggers.get(__name__) - -JOIN_TIMEOUT=60 - -# -# State and operation related helper classes -# - -class State(IntEnum): - # State - INACTIVE=0 - SCHEDULED=1 - QUEUED=2 - ACTIVE=3 - - -class Errors(IntEnum): - OK=0 - BUSY=1 - OFFLINE=2 - ERRORS=3 - - def combine(self, other): - return max(self, other) - - def ok(self): - return self==self.OK - - def __str__(self): - return _errorstring[self] - -_errorstring={ - Errors.OK: 'ok', - Errors.BUSY: 'busy', - Errors.OFFLINE: 'offline', - Errors.ERRORS: 'errors' -} - -class Operation: - CREATE='create' - PRUNE='prune' - def __init__(self, operation, time, **kwargs): - self.operation=operation - self.time=time - self.detail=kwargs - - def when(self): - return self.time.realtime() - - -class Status(Operation): - def __init__(self, backup, op=None): - if op: - super().__init__(op.operation, op.time, **op.detail) - else: - super().__init__(None, None) - - self.name=backup.name - self.state=backup.state - self.errors=backup.errors - -# -# Miscellaneous helper routines -# - -loglevel_translation={ - 'CRITICAL': logging.CRITICAL, - 'ERROR': logging.ERROR, - 'WARNING': logging.WARNING, - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO -} - -def translate_loglevel(x): - if x in loglevel_translation: - return loglevel_translation[x] - else: - return logging.ERROR - -def safe_get_int(t, x): - if x in t: - tmp=t[x] - if isinstance(tmp, int): - return tmp - return None - -# -# The Backup class -# - -class Backup(TerminableThread): - - def __decode_config(self, cfg): - loc0='Backup %d' % self.identifier - - self.backup_name=config.check_string(cfg, 'name', 'Name', loc0) - - logger.debug("Configuring backup '%s'" % self.backup_name) - - self.logger=logger.getChild(self.backup_name) - - loc="Backup '%s'" % self.backup_name - - reponame=config.check_string(cfg, 'repository', - 'Target repository', loc) - - self.repository=repository.find_repository(reponame) - if not self.repository: - raise Exception("Repository '%s' not configured" % reponame) - - self.archive_prefix=config.check_string(cfg, 'archive_prefix', - 'Archive prefix', loc) - - self.archive_template=config.check_string(cfg, 'archive_template', - 'Archive template', loc) - - self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', - 'Backup interval', loc, - config.defaults['backup_interval']) - - self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', - 'Retry interval', loc, - config.defaults['retry_interval']) - - - scheduling=config.check_string(cfg, 'scheduling', - 'Scheduling mode', loc, - default="dreamtime") - - if scheduling=="dreamtime": - self.timeclass=dreamtime.DreamTime - elif scheduling=="realtime": - self.timeclass=dreamtime.MonotonicTime - elif scheduling=="manual": - self.backup_interval=0 - else: - logging.error("Invalid time class '%s' for %s" % (scheduling, loc)) - - self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc) - - self.borg_parameters=config.BorgParameters.from_config(cfg, loc) - - - def __init__(self, identifier, cfg, scheduler): - self.identifier=identifier - self.__status_update_callback=None - self.scheduler=scheduler - self.logger=None # setup up in __decode_config once backup name is known - - self.borg_instance=None - self.thread_log=None - self.thread_res=None - - self.current_operation=None - self.scheduled_operation=None - self.lastrun_when=None - self.lastrun_finished=None - self.state=State.INACTIVE - self.errors=Errors.OK - self.timeclass=dreamtime.DreamTime - - self.__decode_config(cfg) - - super().__init__(target = self.__main_thread, name = self.backup_name) - self.daemon=True - - def is_running(self): - with self._cond: - running=self.__is_running_unlocked() - return running - - def __is_running_unlocked(self): - running=self.current_operation - if not running: - # Consistency check - assert((not self.borg_instance and not self.thread_log and - not self.thread_res)) - return running - - def __block_when_running(self): - running=self.is_running() - assert(not running) - - def __log_listener(self): - self.logger.debug('Log listener thread waiting for entries') - success=True - for msg in iter(self.borg_instance.read_log, None): - self.logger.info(str(msg)) - t=msg['type'] - - errormsg=None - callback=None - - if t=='progress_percent': - current=safe_get_int(msg, 'current') - total=safe_get_int(msg, 'total') - if current is not None and total is not None: - with self._cond: - self.current_operation.detail['progress_current']=current - self.current_operation.detail['progress_total']=total - status, callback=self.__status_unlocked() - - elif t=='archive_progress': - original_size=safe_get_int(msg, 'original_size') - compressed_size=safe_get_int(msg, 'compressed_size') - deduplicated_size=safe_get_int(msg, 'deduplicated_size') - if original_size is not None and original_size is not None and deduplicated_size is not None: - with self._cond: - self.current_operation.detail['original_size']=original_size - self.current_operation.detail['compressed_size']=compressed_size - self.current_operation.detail['deduplicated_size']=deduplicated_size - status, callback=self.__status_unlocked() - - elif t=='progress_message': - pass - - elif t=='file_status': - pass - - elif t=='log_message': - if 'levelname' not in msg: - msg['levelname']='ERROR' - if 'message' not in msg: - msg['message']='UNKNOWN' - if 'name' not in msg: - msg['name']='borg' - lvl=translate_loglevel(msg['levelname']) - self.logger.log(lvl, msg['name'] + ': ' + msg['message']) - if lvl>=logging.ERROR: - errormsg=msg - errors=Errors.ERRORS - if ('msgid' in msg and - (msg['msgid']=='LockTimeout' or # observed in reality - msg['msgid']=='LockErrorT' or # in docs - msg['msgid']=='LockErrorT')): # in docs - errors=Errors.BUSY - with self._cond: - self.errors=self.errors.combine(errors) - status, callback=self.__status_unlocked() - else: - self.logger.debug('Unrecognised log entry %s' % str(status)) - - if callback: - callback(status, errors=errormsg) - - self.logger.debug('Waiting for borg subprocess to terminate in log thread') - - self.borg_instance.wait() - - self.logger.debug('Borg subprocess terminated; terminating log listener thread') - - def __result_listener(self): - self.logger.debug('Result listener thread waiting for result') - - res=self.borg_instance.read_result() - - self.logger.debug('Borg result: %s' % str(res)) - - with self._cond: - if res is None and self.errors.ok(): - self.logger.error('No result from borg despite no error in log') - self.errors=Errors.ERRORS - - - def __do_launch(self, op, archive_or_repository, - common_params, op_params, paths=[]): - - inst=BorgInstance(op.operation, archive_or_repository, - common_params, op_params, paths) - - # Only the Repository object has access to the passphrase - self.repository.launch_borg_instance(inst) - - self.logger.debug('Creating listener threads') - - t_log=Thread(target=self.__log_listener) - t_log.daemon=True - - t_res=Thread(target=self.__result_listener) - t_res.daemon=True - - self.thread_log=t_log - self.thread_res=t_res - self.borg_instance=inst - self.current_operation=op - # Update scheduled time to real starting time to schedule - # next run relative to this - self.current_operation.time=dreamtime.MonotonicTime.now() - self.state=State.ACTIVE - # Reset error status when starting a new operation - self.errors=Errors.OK - self.__update_status() - - t_log.start() - t_res.start() - - - def __launch(self, op): - self.logger.debug("Launching '%s'" % str(op.operation)) - - params=(config.borg_parameters - +self.repository.borg_parameters - +self.borg_parameters) - - if op.operation==Operation.CREATE: - archive="%s::%s%s" % (self.repository.location, - self.archive_prefix, - self.archive_template) - - self.__do_launch(op, archive, params.common, - params.create, self.paths) - elif op.operation==Operation.PRUNE: - self.__do_launch(op, self.repository.location, params.common, - [{'prefix': self.archive_prefix}] + params.create) - - else: - raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) - - # This must be called with self._cond held. - def __launch_and_wait(self): - op=self.scheduled_operation - if not op: - self.logger.debug("Queued operation aborted") - else: - self.scheduled_operation=None - - self.__launch(op) - - self.__wait_finish() - - def __wait_finish(self): - # Wait for main logger thread to terminate, or for us to be terminated - while not self.terminate and self.thread_res.is_alive(): - self._cond.release() - self.thread_res.join(JOIN_TIMEOUT) - self._cond.acquire() - - # If terminate has been signalled, let outer termination handler - # take care of things (Within this Backup class, it would be cleanest - # to raise an exception instead, but in most other places it's better - # to just check self._terminate, so we don't complicate things with - # an extra exception.) - if self._terminate: - return - - self.logger.debug('Waiting for borg and log subprocesses to terminate') - - self._cond.release() - self.thread_log.join() - self._cond.acquire() - - if not self.borg_instance.wait(): - self.logger.error('Borg subprocess did not terminate') - self.errors=self.errors.combine(Errors.ERRORS) - - if self.current_operation.operation=='create': - self.lastrun_when=self.current_operation.time.monotonic() - self.lastrun_finished=time.monotonic() - self.thread_res=None - self.thread_log=None - self.borg_instance=None - self.current_operation=None - self.state=State.INACTIVE - self.__update_status() - - def __main_thread(self): - with self._cond: - try: - while not self._terminate: - assert(not self.current_operation) - self.__main_thread_wait_schedule() - if not self._terminate: - self.__main_thread_queue_and_launch() - except Exception as err: - self.logger.exception("Error with backup '%s'" % self.backup_name) - self.errors=Errors.ERRORS - - self.state=State.INACTIVE - self.scheduled_operation=None - - # Clean up to terminate: kill borg instance and communication threads - if self.borg_instance: - self.logger.debug("Terminating a borg instance") - self.borg_instance.terminate() - - # Store threads to use outside lock - thread_log=self.thread_log - thread_res=self.thread_res - self.thread_log=None - self.thread_res=None - - self.logger.debug("Waiting for log and result threads to terminate") - - if thread_log: - thread_log.join() - - if thread_res: - thread_res.join() - - # Main thread/2. Schedule next operation if there is no manually - # requested one - def __main_thread_wait_schedule(self): - op=None - if not self.scheduled_operation: - op=self.__next_operation_unlocked() - if op: - self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" % - (str(op.operation), op.detail or 'none', - op.time.seconds_to(), - op.time.__class__.__name__)) - - self.scheduled_operation=op - self.state=State.SCHEDULED - self.__update_status() - - # Wait under scheduled wait - self.scheduler.wait_until(op.time, self._cond, self.backup_name) - else: - # Nothing scheduled - just wait - self.logger.info("Waiting for manual scheduling") - - self.state=State.INACTIVE - self.__update_status() - - self._cond.wait() - - # Main thread/3. If there is a scheduled operation (it might have been - # changed manually from 'op' created in __main_thread_wait_schedule above), - # queue it on the repository, and launch the operation once repository - # available - def __main_thread_queue_and_launch(self): - if self.scheduled_operation: - self.logger.debug("Queuing") - self.state=State.QUEUED - self.__update_status() - res=self.repository.queue_action(self._cond, - action=self.__launch_and_wait, - name=self.backup_name) - if not res and not self._terminate: - self.logger.debug("Queueing aborted") - self.scheduled_operation=None - self.state=State.INACTIVE - self.__update_status() - - def __next_operation_unlocked(self): - # TODO: pruning as well - if not self.lastrun_finished: - initial_interval=self.retry_interval - if initial_interval==0: - initial_interval=self.backup_interval - if initial_interval==0: - return None - else: - tm=self.timeclass.after(initial_interval) - return Operation(Operation.CREATE, tm, reason='initial') - elif not self.errors.ok(): - if self.retry_interval==0: - return None - else: - tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval) - return Operation(Operation.CREATE, tm, reason='retry') - else: - if self.backup_interval==0: - return None - else: - tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval) - return Operation(Operation.CREATE, tm) - - def __status_unlocked(self): - callback=self.__status_update_callback - - if self.current_operation: - status=Status(self, self.current_operation) - elif self.scheduled_operation: - status=Status(self, self.scheduled_operation) - else: - status=Status(self) - - return status, callback - - def __update_status(self): - status, callback = self.__status_unlocked() - if callback: - #self._cond.release() - try: - callback(status) - except Exception: - self.logger.exception("Status update error") - #finally: - # self._cond.acquire() - - # - # Interface functions - # - - def set_status_update_callback(self, callback): - with self._cond: - self.__status_update_callback=callback - - def status(self): - with self._cond: - res=self.__status_unlocked() - return res[0] - - def create(self): - op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual') - with self._cond: - self.scheduled_operation=op - self._cond.notify() - - def prune(self): - op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual') - with self._cond: - self.scheduled_operation=op - self._cond.notify() - - # TODO: Decide exact (manual) abort mechanism. Perhaps two stages - def abort(self): - with self._cond: - if self.borg_instance: - self.borg_instance.terminate() -
--- a/borgend.py Sun Jan 28 11:38:01 2018 +0000 +++ b/borgend.py Sun Jan 28 11:54:46 2018 +0000 @@ -5,9 +5,10 @@ import sys import argparse import platform -import branding +import logging # Own modules needed at this stage -import locations +import borgend.branding as branding +import borgend.locations as locations # # Argument processing @@ -36,14 +37,12 @@ # Done parsing args, import our own modules, and launch everything # -import branding -import config -import dreamtime -import loggers -import logging -from scheduler import Scheduler -from repository import Repository -from backup import Backup +import borgend.config as config +import borgend.dreamtime as dreamtime +import borgend.loggers as loggers +from borgend.scheduler import Scheduler +from borgend.repository import Repository +from borgend.backup import Backup logger=loggers.mainlogger @@ -85,7 +84,7 @@ scheduler.join() else: # Start UI, and let it handle exit control - from ui import BorgendTray + from borgend.ui import BorgendTray tray=BorgendTray(backups); tray.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/backup.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,538 @@ +# +# Borgend Backup instance +# + +import logging +import time +from enum import IntEnum +from threading import Thread, Lock, Condition + +from . import config +from . import loggers +from . import repository +from . import dreamtime +from .instance import BorgInstance +from .scheduler import TerminableThread + +logger=loggers.get(__name__) + +JOIN_TIMEOUT=60 + +# +# State and operation related helper classes +# + +class State(IntEnum): + # State + INACTIVE=0 + SCHEDULED=1 + QUEUED=2 + ACTIVE=3 + + +class Errors(IntEnum): + OK=0 + BUSY=1 + OFFLINE=2 + ERRORS=3 + + def combine(self, other): + return max(self, other) + + def ok(self): + return self==self.OK + + def __str__(self): + return _errorstring[self] + +_errorstring={ + Errors.OK: 'ok', + Errors.BUSY: 'busy', + Errors.OFFLINE: 'offline', + Errors.ERRORS: 'errors' +} + +class Operation: + CREATE='create' + PRUNE='prune' + def __init__(self, operation, time, **kwargs): + self.operation=operation + self.time=time + self.detail=kwargs + + def when(self): + return self.time.realtime() + + +class Status(Operation): + def __init__(self, backup, op=None): + if op: + super().__init__(op.operation, op.time, **op.detail) + else: + super().__init__(None, None) + + self.name=backup.name + self.state=backup.state + self.errors=backup.errors + +# +# Miscellaneous helper routines +# + +loglevel_translation={ + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO +} + +def translate_loglevel(x): + if x in loglevel_translation: + return loglevel_translation[x] + else: + return logging.ERROR + +def safe_get_int(t, x): + if x in t: + tmp=t[x] + if isinstance(tmp, int): + return tmp + return None + +# +# The Backup class +# + +class Backup(TerminableThread): + + def __decode_config(self, cfg): + loc0='Backup %d' % self.identifier + + self.backup_name=config.check_string(cfg, 'name', 'Name', loc0) + + logger.debug("Configuring backup '%s'" % self.backup_name) + + self.logger=logger.getChild(self.backup_name) + + loc="Backup '%s'" % self.backup_name + + reponame=config.check_string(cfg, 'repository', + 'Target repository', loc) + + self.repository=repository.find_repository(reponame) + if not self.repository: + raise Exception("Repository '%s' not configured" % reponame) + + self.archive_prefix=config.check_string(cfg, 'archive_prefix', + 'Archive prefix', loc) + + self.archive_template=config.check_string(cfg, 'archive_template', + 'Archive template', loc) + + self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', + 'Backup interval', loc, + config.defaults['backup_interval']) + + self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', + 'Retry interval', loc, + config.defaults['retry_interval']) + + + scheduling=config.check_string(cfg, 'scheduling', + 'Scheduling mode', loc, + default="dreamtime") + + if scheduling=="dreamtime": + self.timeclass=dreamtime.DreamTime + elif scheduling=="realtime": + self.timeclass=dreamtime.MonotonicTime + elif scheduling=="manual": + self.backup_interval=0 + else: + logging.error("Invalid time class '%s' for %s" % (scheduling, loc)) + + self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc) + + self.borg_parameters=config.BorgParameters.from_config(cfg, loc) + + + def __init__(self, identifier, cfg, scheduler): + self.identifier=identifier + self.__status_update_callback=None + self.scheduler=scheduler + self.logger=None # setup up in __decode_config once backup name is known + + self.borg_instance=None + self.thread_log=None + self.thread_res=None + + self.current_operation=None + self.scheduled_operation=None + self.lastrun_when=None + self.lastrun_finished=None + self.state=State.INACTIVE + self.errors=Errors.OK + self.timeclass=dreamtime.DreamTime + + self.__decode_config(cfg) + + super().__init__(target = self.__main_thread, name = self.backup_name) + self.daemon=True + + def is_running(self): + with self._cond: + running=self.__is_running_unlocked() + return running + + def __is_running_unlocked(self): + running=self.current_operation + if not running: + # Consistency check + assert((not self.borg_instance and not self.thread_log and + not self.thread_res)) + return running + + def __block_when_running(self): + running=self.is_running() + assert(not running) + + def __log_listener(self): + self.logger.debug('Log listener thread waiting for entries') + success=True + for msg in iter(self.borg_instance.read_log, None): + self.logger.info(str(msg)) + t=msg['type'] + + errormsg=None + callback=None + + if t=='progress_percent': + current=safe_get_int(msg, 'current') + total=safe_get_int(msg, 'total') + if current is not None and total is not None: + with self._cond: + self.current_operation.detail['progress_current']=current + self.current_operation.detail['progress_total']=total + status, callback=self.__status_unlocked() + + elif t=='archive_progress': + original_size=safe_get_int(msg, 'original_size') + compressed_size=safe_get_int(msg, 'compressed_size') + deduplicated_size=safe_get_int(msg, 'deduplicated_size') + if original_size is not None and original_size is not None and deduplicated_size is not None: + with self._cond: + self.current_operation.detail['original_size']=original_size + self.current_operation.detail['compressed_size']=compressed_size + self.current_operation.detail['deduplicated_size']=deduplicated_size + status, callback=self.__status_unlocked() + + elif t=='progress_message': + pass + + elif t=='file_status': + pass + + elif t=='log_message': + if 'levelname' not in msg: + msg['levelname']='ERROR' + if 'message' not in msg: + msg['message']='UNKNOWN' + if 'name' not in msg: + msg['name']='borg' + lvl=translate_loglevel(msg['levelname']) + self.logger.log(lvl, msg['name'] + ': ' + msg['message']) + if lvl>=logging.ERROR: + errormsg=msg + errors=Errors.ERRORS + if ('msgid' in msg and + (msg['msgid']=='LockTimeout' or # observed in reality + msg['msgid']=='LockErrorT' or # in docs + msg['msgid']=='LockErrorT')): # in docs + errors=Errors.BUSY + with self._cond: + self.errors=self.errors.combine(errors) + status, callback=self.__status_unlocked() + else: + self.logger.debug('Unrecognised log entry %s' % str(status)) + + if callback: + callback(status, errors=errormsg) + + self.logger.debug('Waiting for borg subprocess to terminate in log thread') + + self.borg_instance.wait() + + self.logger.debug('Borg subprocess terminated; terminating log listener thread') + + def __result_listener(self): + self.logger.debug('Result listener thread waiting for result') + + res=self.borg_instance.read_result() + + self.logger.debug('Borg result: %s' % str(res)) + + with self._cond: + if res is None and self.errors.ok(): + self.logger.error('No result from borg despite no error in log') + self.errors=Errors.ERRORS + + + def __do_launch(self, op, archive_or_repository, + common_params, op_params, paths=[]): + + inst=BorgInstance(op.operation, archive_or_repository, + common_params, op_params, paths) + + # Only the Repository object has access to the passphrase + self.repository.launch_borg_instance(inst) + + self.logger.debug('Creating listener threads') + + t_log=Thread(target=self.__log_listener) + t_log.daemon=True + + t_res=Thread(target=self.__result_listener) + t_res.daemon=True + + self.thread_log=t_log + self.thread_res=t_res + self.borg_instance=inst + self.current_operation=op + # Update scheduled time to real starting time to schedule + # next run relative to this + self.current_operation.time=dreamtime.MonotonicTime.now() + self.state=State.ACTIVE + # Reset error status when starting a new operation + self.errors=Errors.OK + self.__update_status() + + t_log.start() + t_res.start() + + + def __launch(self, op): + self.logger.debug("Launching '%s'" % str(op.operation)) + + params=(config.borg_parameters + +self.repository.borg_parameters + +self.borg_parameters) + + if op.operation==Operation.CREATE: + archive="%s::%s%s" % (self.repository.location, + self.archive_prefix, + self.archive_template) + + self.__do_launch(op, archive, params.common, + params.create, self.paths) + elif op.operation==Operation.PRUNE: + self.__do_launch(op, self.repository.location, params.common, + [{'prefix': self.archive_prefix}] + params.create) + + else: + raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) + + # This must be called with self._cond held. + def __launch_and_wait(self): + op=self.scheduled_operation + if not op: + self.logger.debug("Queued operation aborted") + else: + self.scheduled_operation=None + + self.__launch(op) + + self.__wait_finish() + + def __wait_finish(self): + # Wait for main logger thread to terminate, or for us to be terminated + while not self.terminate and self.thread_res.is_alive(): + self._cond.release() + self.thread_res.join(JOIN_TIMEOUT) + self._cond.acquire() + + # If terminate has been signalled, let outer termination handler + # take care of things (Within this Backup class, it would be cleanest + # to raise an exception instead, but in most other places it's better + # to just check self._terminate, so we don't complicate things with + # an extra exception.) + if self._terminate: + return + + self.logger.debug('Waiting for borg and log subprocesses to terminate') + + self._cond.release() + self.thread_log.join() + self._cond.acquire() + + if not self.borg_instance.wait(): + self.logger.error('Borg subprocess did not terminate') + self.errors=self.errors.combine(Errors.ERRORS) + + if self.current_operation.operation=='create': + self.lastrun_when=self.current_operation.time.monotonic() + self.lastrun_finished=time.monotonic() + self.thread_res=None + self.thread_log=None + self.borg_instance=None + self.current_operation=None + self.state=State.INACTIVE + self.__update_status() + + def __main_thread(self): + with self._cond: + try: + while not self._terminate: + assert(not self.current_operation) + self.__main_thread_wait_schedule() + if not self._terminate: + self.__main_thread_queue_and_launch() + except Exception as err: + self.logger.exception("Error with backup '%s'" % self.backup_name) + self.errors=Errors.ERRORS + + self.state=State.INACTIVE + self.scheduled_operation=None + + # Clean up to terminate: kill borg instance and communication threads + if self.borg_instance: + self.logger.debug("Terminating a borg instance") + self.borg_instance.terminate() + + # Store threads to use outside lock + thread_log=self.thread_log + thread_res=self.thread_res + self.thread_log=None + self.thread_res=None + + self.logger.debug("Waiting for log and result threads to terminate") + + if thread_log: + thread_log.join() + + if thread_res: + thread_res.join() + + # Main thread/2. Schedule next operation if there is no manually + # requested one + def __main_thread_wait_schedule(self): + op=None + if not self.scheduled_operation: + op=self.__next_operation_unlocked() + if op: + self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" % + (str(op.operation), op.detail or 'none', + op.time.seconds_to(), + op.time.__class__.__name__)) + + self.scheduled_operation=op + self.state=State.SCHEDULED + self.__update_status() + + # Wait under scheduled wait + self.scheduler.wait_until(op.time, self._cond, self.backup_name) + else: + # Nothing scheduled - just wait + self.logger.info("Waiting for manual scheduling") + + self.state=State.INACTIVE + self.__update_status() + + self._cond.wait() + + # Main thread/3. If there is a scheduled operation (it might have been + # changed manually from 'op' created in __main_thread_wait_schedule above), + # queue it on the repository, and launch the operation once repository + # available + def __main_thread_queue_and_launch(self): + if self.scheduled_operation: + self.logger.debug("Queuing") + self.state=State.QUEUED + self.__update_status() + res=self.repository.queue_action(self._cond, + action=self.__launch_and_wait, + name=self.backup_name) + if not res and not self._terminate: + self.logger.debug("Queueing aborted") + self.scheduled_operation=None + self.state=State.INACTIVE + self.__update_status() + + def __next_operation_unlocked(self): + # TODO: pruning as well + if not self.lastrun_finished: + initial_interval=self.retry_interval + if initial_interval==0: + initial_interval=self.backup_interval + if initial_interval==0: + return None + else: + tm=self.timeclass.after(initial_interval) + return Operation(Operation.CREATE, tm, reason='initial') + elif not self.errors.ok(): + if self.retry_interval==0: + return None + else: + tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval) + return Operation(Operation.CREATE, tm, reason='retry') + else: + if self.backup_interval==0: + return None + else: + tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval) + return Operation(Operation.CREATE, tm) + + def __status_unlocked(self): + callback=self.__status_update_callback + + if self.current_operation: + status=Status(self, self.current_operation) + elif self.scheduled_operation: + status=Status(self, self.scheduled_operation) + else: + status=Status(self) + + return status, callback + + def __update_status(self): + status, callback = self.__status_unlocked() + if callback: + #self._cond.release() + try: + callback(status) + except Exception: + self.logger.exception("Status update error") + #finally: + # self._cond.acquire() + + # + # Interface functions + # + + def set_status_update_callback(self, callback): + with self._cond: + self.__status_update_callback=callback + + def status(self): + with self._cond: + res=self.__status_unlocked() + return res[0] + + def create(self): + op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual') + with self._cond: + self.scheduled_operation=op + self._cond.notify() + + def prune(self): + op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual') + with self._cond: + self.scheduled_operation=op + self._cond.notify() + + # TODO: Decide exact (manual) abort mechanism. Perhaps two stages + def abort(self): + with self._cond: + if self.borg_instance: + self.borg_instance.terminate() +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/branding.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,6 @@ +# +# Branding +# + +appname="borgend" +appname_stylised="Borgend"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/config.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,174 @@ +#!/usr/local/bin/python3 +# +# Borgend configuration loader +# + +import yaml +import io +import os +import string +import logging +import platform +from functools import reduce + +from . import loggers +from . import branding +from . import locations + +logger=loggers.get(__name__) + +# +# Defaults +# + +defaults={ + # borg + # Default: backup every 6 hours (21600 seconds) + 'backup_interval': 21600, + # Default: retry every 15 minutes if unable to connect / unfinished backup + 'retry_interval': 900, + # Extract passphrases at startup or on demand? + 'extract_passphrases_at_startup': True, + # Do not insert a quit menu entry (useful for installing on computers of + # inexperienced users) + 'no_quit_menu_entry': False, + # Borg settings + 'borg': { + 'executable': 'borg', + 'common_parameters': [], + 'create_parameters': [], + 'prune_parameters': [], + } +} + +# +# Type checking etc. +# + +def error(x): + raise AssertionError(x) + +def check_field(cfg, field, descr, loc, default, check): + if field in cfg: + tmp=cfg[field] + if not check(tmp): + error("%s is of invalid type for %s" % (field, loc)) + return tmp + else: + if default is not None: + return default + else: + error("%s is not configured for %s" % (field, loc)) + +def check_bool(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: isinstance(x, bool)) + +def check_string(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: isinstance(x, str)) + +def check_dict(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: isinstance(x, dict)) + +def check_list(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: isinstance(x, list)) + +def is_list_of(x, chk): + if x is None: + return True + elif isinstance(x, list): + return reduce(lambda y, z: y and chk(z), x, True) + else: + return False + +def check_list_of_dicts(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: is_list_of(x, lambda z: isinstance(z, dict))) + +def check_list_of_strings(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: is_list_of(x, lambda z: isinstance(z, str))) + +def check_nonempty_list_of_strings(cfg, field, descr, loc): + return check_list_of_strings(cfg, field, descr, loc) and cfg[field] + + +def check_nonneg_int(cfg, field, descr, loc, default=None): + return check_field(cfg, field, descr, loc, default, + lambda x: isinstance(x, int) and x>=0) + + +# +# Borg command line parameter configuration helper routines and classes +# + +class BorgParameters: + def __init__(self, common, create, prune): + self.common=common or [] + self.create=create or [] + self.prune=prune or [] + + def from_config(cfg, loc): + common=check_list_of_dicts(cfg, 'common_parameters', + 'Borg parameters', loc, default=[]) + + create=check_list_of_dicts(cfg, 'create_parameters', + 'Create parameters', loc, default=[]) + + prune=check_list_of_dicts(cfg, 'prune_parameters', + 'Prune parameters', loc, default=[]) + + return BorgParameters(common, create, prune) + + def __add__(self, other): + common=self.common+other.common + create=self.create+other.create + prune=self.prune+other.prune + return BorgParameters(common, create, prune) + +# +# Load config on module load +# + +def expand_env(cfg, env): + if isinstance(cfg, dict): + out={key: expand_env(val, env) for key, val in cfg.items()} + elif isinstance(cfg, list): + out=[expand_env(val, env) for val in cfg] + elif isinstance(cfg, str): + out=string.Template(cfg).substitute(os.environ) + else: + out=cfg + + return out + +if not (os.path.exists(locations.cfgfile) and os.path.isfile(locations.cfgfile)): + raise SystemExit("Configuration file required: %s" % locations.cfgfile) + +logger.info("Reading configuration %s missing" % locations.cfgfile) + +with io.open(locations.cfgfile, 'r') as file: + settings=expand_env(yaml.load(file), os.environ); + +# +# Verify basic settings +# + +def check_and_set(cfg, field, loc, defa, fn): + cfg[field]=fn(cfg, field, field, loc, defa[field]) + +check_and_set(settings, 'backup_interval', 'top-level', defaults, check_nonneg_int) +check_and_set(settings, 'retry_interval', 'top-level', defaults, check_nonneg_int) +check_and_set(settings, 'extract_passphrases_at_startup', 'top-level', defaults, check_nonneg_int) +check_and_set(settings, 'no_quit_menu_entry', 'top-level', defaults, check_bool) +check_and_set(settings, 'borg', 'top-level', defaults, check_dict) +# Check parameters within 'borg' +if True: + check_and_set(settings['borg'], 'executable', 'borg', + defaults['borg'], check_string) + + borg_parameters=BorgParameters.from_config(settings['borg'], "top-level") +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/dreamtime.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,213 @@ +# +# Wake/sleep detection for scheduling adjustments +# + +import Foundation +import AppKit +import platform +import time +import threading +import weakref +import datetime + +from . import loggers + +logger=loggers.get(__name__) + +_dreamtime_monitor=None + +# +# Support classes for dealing with different times +# + +# Return difference (delay) of "dreamtime" to monotonic time +def dreamtime_difference(): + if _dreamtime_monitor: + return _dreamtime_monitor.diff() + else: + return time.monotonic() + +# Return "dreamtime" +def dreamtime(): + return max(0, time.monotonic()-dreamtime_difference()) + +class Time: + def realtime(self): + raise NotImplementedError + + def monotonic(self): + raise NotImplementedError + + @staticmethod + def _now(): + raise NotImplementedError + + @classmethod + def now(cls): + return cls(cls._now()) + + @classmethod + def from_realtime(cls, realtime): + return cls(realtime-time.time()+cls._now()) + + @classmethod + def from_monotonic(cls, monotonic): + return cls(monotonic-time.monotonic()+cls._now()) + + @classmethod + def after(cls, seconds): + return cls(cls._now()+seconds) + + def datetime(self): + return datetime.datetime.fromtimestamp(self.realtime()) + + def seconds_to(self): + return self._value-self._now() + + def __lt__(self, other): + return self.monotonic() < other.monotonic() + + def __gt__(self, other): + return self.monotonic() > other.monotonic() + + def __le__(self, other): + return self.monotonic() <= other.monotonic() + + def __ge__(self, other): + return self.monotonic() >= other.monotonic() + + def __eq__(self, other): + return self.monotonic() == other.realtime() + +class RealTime(Time): + def __init__(self, when): + self._value=when + + def realtime(self): + return self._value + + def monotonic(self): + return self._value+(time.monotonic()-time.time()) + + @staticmethod + def _now(): + return time.time() + +class MonotonicTime(Time): + def __init__(self, when): + self._value=when + + def realtime(self): + return self._value+(time.time()-time.monotonic()) + + def monotonic(self): + return self._value + + @staticmethod + def _now(): + return time.monotonic() + +class DreamTime(Time): + def __init__(self, when): + self._value=when + + def realtime(self): + return self._value+(time.time()-dreamtime()) + + # Important: monotonic is "static" within a wakeup period + # and does not need to call time.monotonic(), as it gets compared + # to a specific time.monotonic() realisation + def monotonic(self): + return self._value+dreamtime_difference() + + @staticmethod + def _now(): + return dreamtime() + +# +# Wake up / sleep handling +# + +class SleepHandler(Foundation.NSObject): + """ Handle wake/sleep notifications """ + + def init(self): + self.__sleeptime=None + self.__slept=0 + self.__epoch=time.monotonic() + self.__lock=threading.Lock() + self.__callbacks=weakref.WeakKeyDictionary() + + return self + + def handleSleepNotification_(self, aNotification): + logger.info("System going to sleep") + now=time.monotonic() + with self.__lock: + self.__sleeptime=now + + def handleWakeNotification_(self, aNotification): + logger.info("System waking up from sleep") + try: + now=time.monotonic() + with self.__lock: + if self.__sleeptime: + slept=max(0, now-self.__sleeptime) + logger.info("Slept %f seconds" % slept) + self.__slept=self.__slept+slept + self.__sleeptime=None + callbacks=self.__callbacks.copy() + except: + logger.exception("Bug in wakeup handler") + + for callback in callbacks.values(): + try: + callback() + except Exception: + logger.exception("Error in wake notification callback") + + # Return difference to time.monotonic() + def diff(self): + with self.__lock: + diff=self.__epoch+self.__slept + return diff + + # Weirdo (Py)ObjC naming to stop it form choking up + def addForObj_aCallback_(self, obj, callback): + with self.__lock: + self.__callbacks[obj]=callback + +# obj is to use a a key in a weak key dictionary +def add_callback(obj, callback): + global _dreamtime_monitor + + monitor=_dreamtime_monitor + if not monitor: + raise Exception("Dreamtime monitor not started") + else: + monitor.addForObj_aCallback_(obj, callback) + +def start_monitoring(): + global _dreamtime_monitor + + if platform.system()=='Darwin': + logger.debug("Starting to monitor system sleep") + workspace = AppKit.NSWorkspace.sharedWorkspace() + notification_center = workspace.notificationCenter() + _dreamtime_monitor = SleepHandler.new() + + notification_center.addObserver_selector_name_object_( + _dreamtime_monitor, + "handleSleepNotification:", + AppKit.NSWorkspaceWillSleepNotification, + None) + + notification_center.addObserver_selector_name_object_( + _dreamtime_monitor, + "handleWakeNotification:", + AppKit.NSWorkspaceDidWakeNotification, + None) + else: + logger.warning(("No system sleep monitor implemented for '%s'" + % platform.system())) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/fifolog.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,18 @@ +# +# FIFO memory logger +# + +from logging.handlers import BufferingHandler + +class FIFOHandler(BufferingHandler): + def shouldFlush(self, record): + return False + + def emit(self, record): + self.buffer.append(record) + l=len(self.buffer) + if l>self.capacity: + self.buffer=self.buffer[(l-self.capacity):(l-1)] + + def formatAll(self): + return [self.format(record) for record in self.buffer]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/instance.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,135 @@ +# +# Borgend borg launcher / processor +# + +import os +import json +import logging +from subprocess import Popen, PIPE + +from . import loggers +from .config import settings + +logger=loggers.get(__name__) + +necessary_opts=['--log-json', '--progress'] + +necessary_opts_for={ + 'create': ['--json'], + 'info': ['--json'], + 'list': ['--json'], +} + +# Conversion of config into command line +def arglistify(args): + flatten=lambda l: [item for sublist in l for item in sublist] + if args is None: + return [] + else: + return flatten([['--' + key, str(d[key])] for d in args for key in d]) + +class BorgInstance: + def __init__(self, operation, archive_or_repository, + common_params, op_params, paths): + self.operation=operation; + self.archive_or_repository=archive_or_repository; + self.common_params=common_params + self.op_params=op_params + self.paths=paths + + def construct_cmdline(self): + cmd=([settings['borg']['executable']]+necessary_opts+ + arglistify(self.common_params)+ + [self.operation]) + + if self.operation in necessary_opts_for: + cmd=cmd+necessary_opts_for[self.operation] + + return (cmd+arglistify(self.op_params) + +[self.archive_or_repository]+self.paths) + + def launch(self, passphrase=None): + cmd=self.construct_cmdline() + + logger.info('Launching ' + str(cmd)) + + # Set passphrase if not, or set to empty if not known, so borg + # won't hang waiting for it, which seems to happen even if we + # close stdin. + env=os.environ.copy() + env['BORG_PASSPHRASE']=passphrase or '' + + # Workaround: if launched is a standalone app created with py2app, + # borg will fail unless Python environment is reset. + # TODO: Of course, this will fail if the system needs the variables + # PYTHONPATH or PYTHONHOME set to certain values. + if '_PY2APP_LAUNCHED_' in env: + val=env['_PY2APP_LAUNCHED_'] + if val=='1': + del env['PYTHONPATH'] + del env['PYTHONHOME'] + + self.proc=Popen(cmd, env=env, stdout=PIPE, stderr=PIPE, stdin=PIPE) + + # We don't do passphrase input etc. + self.proc.stdin.close() + + def read_result(self): + stream=self.proc.stdout + line=stream.read(-1) + if line==b'': + logger.debug('Borg stdout pipe EOF?') + return None + + try: + return json.loads(line) + except Exception as err: + logger.warning('JSON parse failed on: "%s"' % line) + return None + + def read_log(self): + stream=self.proc.stderr + try: + line=stream.readline() + except err: + logger.debug('Pipe read failed: %s' % str(err)) + + return {'type': 'log_message', + 'levelname': 'CRITICAL', + 'name': 'borgend.instance.BorgInstance', + 'msgid': 'Borgend.Exception', + 'message': err} + + if line==b'': + + logger.debug('Borg stderr pipe EOF?') + + return None + + try: + res=json.loads(line) + if 'type' not in res: + res['type']='UNKNOWN' + return res + except: + logger.debug('JSON parse failed on: "%s"' % str(line)) + + errmsg=line + for line in iter(stream.readline, b''): + errmsg=errmsg+line + + return {'type': 'log_message', + 'levelname': 'ERROR', + 'name': 'borgend.instance.BorgInstance', + 'msgid': 'Borgend.JSONFail', + 'message': str(errmsg)} + + def terminate(self): + self.proc.terminate() + + def wait(self): + return self.proc.wait() is not None + + def has_terminated(self): + return self.proc.poll() is not None +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/locations.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,18 @@ +# +# Locations +# + +import os +import platform + +from . import branding + +if platform.system()!='Darwin': + import xdg + cfgfile=os.path.join(xdg.XDG_CONFIG_HOME, branding.appname, "config.yaml") + logs_dir=os.path.join(xdg.XDG_DATA_HOME, branding.appname, "logs") +else: + import rumps + __base=rumps.application_support(branding.appname) + cfgfile=os.path.join(__base, "config.yaml") + logs_dir=os.path.join(__base, "logs")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/loggers.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,61 @@ +# +# Loggers +# + +import os +import logging +import logging.handlers +import atexit + +from .fifolog import FIFOHandler +from . import branding +from . import locations + +# +# Logging configuration +# + +loglevel=logging.INFO +logfmt="%(asctime)s:%(levelname)s:%(name)s:%(message)s" +fifo_capacity=1000 +fifo_fmt="%(asctime)s:%(levelname)s:%(message)s" + +# +# Setting up the main logger with fifo, stderr, and rotating files output +# + +mainlogger=logging.getLogger(branding.appname) +mainlogger.setLevel(loglevel) +mainlogger.propagate=True + +mainlogger.handlers.clear() + +# Internal FIFO history +fifo=FIFOHandler(fifo_capacity) +fifo.setFormatter(logging.Formatter(fifo_fmt)) +mainlogger.addHandler(fifo) + +# stderr +stderrlog=logging.StreamHandler() +stderrlog.setLevel(logging.WARNING) +mainlogger.addHandler(stderrlog) + +# Rotating files +if not os.path.isdir(locations.logs_dir): + os.makedirs(locations.logs_dir) + +fileslog=logging.handlers.TimedRotatingFileHandler( + os.path.join(locations.logs_dir, branding.appname+'.log'), + when='D', interval=1) +fileslog.setFormatter(logging.Formatter(logfmt)) +mainlogger.addHandler(fileslog) + +atexit.register(logging.shutdown) + +# +# Routine for obtaining sub-loggers +# + +def get(name): + return mainlogger.getChild(name) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/repository.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,143 @@ +# +# Repository abstraction for queuing +# + +import weakref +import keyring + +from . import loggers +from . import config +from .scheduler import QueueThread, QueuedEvent + +logger=loggers.get(__name__) + +class FIFOEvent(QueuedEvent): + def __init__(self, cond, name=None): + self._goodtogo=False + super().__init__(cond, name=name) + + def __lt__(self, other): + return False + +class FIFO(QueueThread): + def __init__(self, **kwargs): + super().__init__(target = self._fifo_thread, **kwargs) + + def _fifo_thread(self): + with self._cond: + while not self._terminate: + ev=self._list + if ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + if not ev._goodtogo: + ev._goodtogo=True + ev.cond.notifyAll() + self._cond.wait() + + # Termination cleanup + ev=self._list + while ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev.cond.notifyAll() + ev=ev.next + + # cond has to be acquired on entry! + def queue_action(self, cond, action=lambda: (), name=None): + ev=FIFOEvent(cond, name=name) + + with self._cond: + self._insert(ev) + + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') + ev.cond.wait() + + try: + if ev._goodtogo: + logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') + # + # TODO: action() has to unlink on finish; so should maybe + # use weak references to event. + # Or we have to make action take all the time, so make the + # stdout thread. + # OR: Easiest to just move finish-waiting into __launch_check + # instead of at the outer level of the main loop. + # + action() + finally: + with self._cond: + self._unlink(ev) + # Let _fifo_thread proceed to next action + self._cond.notify() + + return ev._goodtogo + +repositories=weakref.WeakValueDictionary() + +class Repository(FIFO): + def __decode_config(self, cfg): + loc0='Repository %d' % self.identifier + + self.repository_name=config.check_string(cfg, 'name', 'Name', loc0) + + logger.debug("Configuring repository '%s'" % self.repository_name) + + loc = 'Repository "%s"' + + self.logger=logger.getChild(self.repository_name) + + self.location=config.check_string(cfg, 'location', + 'Target repository location', loc) + + self.borg_parameters=config.BorgParameters.from_config(cfg, loc) + + self.__keychain_account=config.check_string(cfg, 'keychain_account', + 'Keychain account', loc, + default='') + + self.__passphrase=None + + if config.settings['extract_passphrases_at_startup']: + try: + self.extract_passphrase() + except Exception: + pass + + def __init__(self, identifier, cfg): + self.identifier=identifier + self.__decode_config(cfg) + super().__init__(name = 'RepositoryThread %s' % self.repository_name) + repositories[self.repository_name]=self + + def __extract_passphrase(self): + acc=self.__keychain_account + if not self.__passphrase: + if acc and acc!='': + self.logger.debug('Requesting passphrase') + try: + pw=keyring.get_password("borg-backup", acc) + except Exception as err: + self.logger.error('Failed to retrieve passphrase') + raise err + else: + self.logger.debug('Received passphrase') + self.__passphrase=pw + else: + self.__passphrase=None + return self.__passphrase + + def launch_borg_instance(self, inst): + with self._cond: + passphrase=self.__extract_passphrase() + inst.launch(passphrase=passphrase) + +def find_repository(name): + if name in repositories: + return repositories[name] + else: + return None + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/scheduler.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,170 @@ +# +# Scheduler for Borgend +# +# This module simply provide a way for other threads to until a given time +# + +import time +from threading import Condition, Thread + +from . import loggers +from . import dreamtime + +logger=loggers.get(__name__) + +class QueuedEvent: + def __init__(self, cond, name=None): + self.next=None + self.prev=None + self.name=name + self.cond=cond + + def __lt__(self, other): + raise NotImplementedError + + def insert_after(self, ev): + if not self.next or ev<self.next: + self.insert_immediately_after(ev) + else: + self.next.insert_after(ev) + + def insert_immediately_after(self, ev): + assert(ev.next is None and ev.prev is None) + ev.prev=self + ev.next=self.next + self.next=ev + + def insert_immediately_before(self, ev): + assert(ev.next is None and ev.prev is None) + ev.next=self + ev.prev=self.prev + self.prev=ev + + def unlink(self): + n=self.next + p=self.prev + if n: + n.prev=p + if p: + p.next=n + self.next=None + self.prev=None + +class ScheduledEvent(QueuedEvent): + #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) + def __init__(self, when, cond, name=None): + super().__init__(cond, name=name) + self.when=when + + def __lt__(self, other): + return self.when < other.when + +class TerminableThread(Thread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._terminate=False + self._cond=Condition() + + def terminate(self): + with self._cond: + _terminate=True + self._cond.notify() + +class QueueThread(TerminableThread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.daemon = True + self._list = None + + def _insert(self, ev): + if not self._list: + #logger.debug("Insert first") + self._list=ev + elif ev<self._list: + #logger.debug("Insert beginning") + self._list.insert_immediately_before(ev) + self._list=ev + else: + #logger.debug("Insert after") + self._list.insert_after(ev) + + self._cond.notify() + + def _unlink(self, ev): + if ev==self._list: + self._list=ev.next + ev.unlink() + + def _resort(self): + oldlist=self._list + self._list=None + while oldlist: + ev=oldlist + oldlist=oldlist.next + ev.unlink() + self._insert(ev) + + + +class Scheduler(QueueThread): + # Default to precision of 60 seconds: the scheduler thread will never + # sleep longer than that, to get quickly back on track with the schedule + # when the computer wakes up from sleep + def __init__(self, precision=60): + self.precision = precision + self._next_event_time = None + super().__init__(target = self._scheduler_thread, name = 'Scheduler') + dreamtime.add_callback(self, self._wakeup_callback) + + def _scheduler_thread(self): + logger.debug("Scheduler thread started") + with self._cond: + while not self._terminate: + now = time.monotonic() + if not self._list: + timeout = None + else: + # Wait at most precision seconds, or until next event if it + # comes earlier + timeout=min(self.precision, self._list.when.realtime()-now) + + if not timeout or timeout>0: + logger.debug("Scheduler waiting %d seconds" % (timeout or (-1))) + self._cond.wait(timeout) + now = time.monotonic() + + logger.debug("Scheduler timed out") + + while self._list and self._list.when.monotonic() <= now: + ev=self._list + logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) + # We are only allowed to remove ev from list when ev.cond allows + with ev.cond: + self._list=ev.next + ev.unlink() + ev.cond.notifyAll() + + def _wakeup_callback(self): + logger.debug("Rescheduling events after wakeup") + with self._cond: + self._resort() + + def _wait(self, ev): + with self._cond: + self._insert(ev) + + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + + # If we were woken up by some other event, not the scheduler, + # ensure the event is removed + with self._cond: + self._unlink(ev) + + # cond has to be acquired on entry! + def wait_until(self, when, cond, name=None): + logger.debug("Scheduling '%s' in %s seconds [%s]" % + (name, when.seconds_to(), when.__class__.__name__)) + self._wait(ScheduledEvent(when, cond, name)) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/borgend/ui.py Sun Jan 28 11:54:46 2018 +0000 @@ -0,0 +1,311 @@ +# +# Borgend MacOS UI +# + +import rumps +import time +import datetime +import objc +from threading import Lock, Timer + +from . import loggers +from . import backup +from . import dreamtime +from . import branding +from .config import settings + +logger=loggers.get(__name__) + +traynames_ok={ + backup.State.INACTIVE: 'B.', + backup.State.SCHEDULED: 'B.', + backup.State.QUEUED: 'B:', + backup.State.ACTIVE: 'B!', +} + +traynames_errors={ + # The first one should never be used + backup.Errors.OK: traynames_ok[backup.State.INACTIVE], + backup.Errors.BUSY: 'B⦙', + backup.Errors.OFFLINE: 'B⦙', + backup.Errors.ERRORS: 'B?' +} + +def trayname(ste): + state=ste[0] + errors=ste[1] + if not errors.ok(): + return traynames_errors[errors] + else: + return traynames_ok[state] + +def combine_state(a, b): + return (max(a[0], b[0]), max(a[1], b[1])) + +# Refresh the menu at most once a second to reduce flicker +refresh_interval=1.0 + +# Workaround to rumps brokenness; +# see https://github.com/jaredks/rumps/issues/59 +def notification_workaround(title, subtitle, message): + try: + NSDictionary = objc.lookUpClass("NSDictionary") + d=NSDictionary() + + rumps.notification(title, subtitle, message, data=d) + except Exception as err: + logger.exception("Failed to display notification") + +# Based on code snatched from +# https://stackoverflow.com/questions/12523586/python-format-size-application-converting-b-to-kb-mb-gb-tb/37423778 +def humanbytes(B): + 'Return the given bytes as a human friendly KB, MB, GB, or TB string' + B = float(B) + KB = float(1024) + MB = float(KB ** 2) # 1,048,576 + GB = float(KB ** 3) # 1,073,741,824 + TB = float(KB ** 4) # 1,099,511,627,776 + + if B < KB: + return '{0}B'.format(B) + elif KB <= B < MB: + return '{0:.2f}KB'.format(B/KB) + elif MB <= B < GB: + return '{0:.2f}MB'.format(B/MB) + elif GB <= B < TB: + return '{0:.2f}GB'.format(B/GB) + elif TB <= B: + return '{0:.2f}TB'.format(B/TB) + +def make_title(status): + def add_info(info, new): + if info: + return "%s; %s" % (info, new) + else: + return new + + info=None + this_need_reconstruct=None + + if not status.errors.ok(): + info=add_info(info, str(status.errors)) + + if status.state==backup.State.SCHEDULED: + # Operation scheduled + when=status.when() + now=time.time() + + if when<now: + whenstr='overdue' + info='' + else: + tnow=datetime.datetime.fromtimestamp(now) + twhen=datetime.datetime.fromtimestamp(when) + tendtoday=twhen.replace(hour=23,minute=59,second=59) + tendtomorrow=tendtoday+datetime.timedelta(days=1) + diff=datetime.timedelta(seconds=when-now) + + if twhen>tendtomorrow: + whenday=datetime.date.fromtimestamp(when) + whenstr='on %s' % twhen.date().isoformat() + this_need_reconstruct=tendtoday+datetime.timedelta(seconds=1) + elif diff.seconds>=12*60*60: # 12 hours + whenstr='tomorrow' + this_need_reconstruct=twhen-datetime.timedelta(hours=12) + else: + twhen=time.localtime(when) + if twhen.tm_sec>30: + # Round up minute display to avoid user confusion + twhen=time.localtime(when+30) + whenstr='at %02d:%02d' % (twhen.tm_hour, twhen.tm_min) + + this_info='' + if 'reason' in status.detail: + this_info=status.detail['reason'] + ' ' + + when_how_sched= "%s%s %s" % (this_info, status.operation, whenstr) + + info=add_info(info, when_how_sched) + + elif status.state==backup.State.QUEUED: + info=add_info(info, "queued") + elif status.state==backup.State.ACTIVE: + # Operation running + progress='' + d=status.detail + if 'progress_current' in d and 'progress_total' in d: + progress=' %d%%' % (d['progress_current']/d['progress_total']) + elif 'original_size' in d and 'deduplicated_size' in d: + progress=' %s→%s' % (humanbytes(d['original_size']), + humanbytes(d['deduplicated_size'])) + + howrunning = "running %s%s" % (status.operation, progress) + + info=add_info(info, howrunning) + else: + pass + + if info: + title=status.name + ' (' + info + ')' + else: + title=status.name + + return title, (status.state, status.errors), this_need_reconstruct + +class BorgendTray(rumps.App): + def __init__(self, backups): + self.lock=Lock() + self.backups=backups + self.refresh_timer=None + self.refresh_timer_time=None + self.statuses=[None]*len(backups) + + for index in range(len(backups)): + self.statuses[index]=backups[index].status() + + menu, title=self.build_menu_and_timer() + + super().__init__(title, menu=menu, quit_button=None) + + for index in range(len(backups)): + # Python closures suck dog's balls; hence the _index=index hack + # See also http://math.andrej.com/2009/04/09/pythons-lambda-is-broken/ + cb=(lambda status, errors=None, _index=index: + self.__status_callback(_index, status, errorlog=errors)) + backups[index].set_status_update_callback(cb) + + dreamtime.add_callback(self, self.refresh_ui) + + def __rebuild_menu(self): + menu=[] + state=(backup.State.INACTIVE, backup.Errors.OK) + need_reconstruct=None + for index in range(len(self.backups)): + b=self.backups[index] + title, this_state, this_need_reconstruct=make_title(self.statuses[index]) + # Python closures suck dog's balls... + # first and the last program I write in Python until somebody + # fixes this brain damage + cbm=lambda sender, _b=b: self.__menu_select_backup(sender, _b) + item=rumps.MenuItem(title, callback=cbm) + if not this_state[1].ok(): + item.state=-1 + elif this_state[0]==backup.State.SCHEDULED or this_state[0]==backup.State.QUEUED: + item.state=1 + menu.append(item) + state=combine_state(state, this_state) + + # Do we have to automatically update menu display? + if not need_reconstruct: + need_reconstruct=this_need_reconstruct + elif this_need_reconstruct: + need_reconstruct=min(need_reconstruct, this_need_reconstruct) + + menu_log=rumps.MenuItem("Show log", callback=lambda _: showlog()) + menu.append(menu_log) + + if not settings['no_quit_menu_entry']: + menu_quit=rumps.MenuItem("Quit...", callback=lambda _: self.quit()) + menu.append(menu_quit) + + return menu, state, need_reconstruct + + def build_menu_and_timer(self): + if self.refresh_timer: + self.refresh_timer.cancel() + self.refresh_timer=None + self.refresh_timer_time=None + logger.debug('Rebuilding menu') + menu, state, need_reconstruct=self.__rebuild_menu() + title=trayname(state) + + if need_reconstruct: + when=time.mktime(need_reconstruct.timetuple()) + delay=when-time.time() + self.refresh_timer=Timer(delay, self.refresh_ui) + self.refresh_timer_time=need_reconstruct + self.refresh_timer.start() + + return menu, title + + def refresh_ui(self): + with self.lock: + menu, title=self.build_menu_and_timer() + self.menu.clear() + self.menu.update(menu) + self.title=title + + def __status_callback(self, index, status, errorlog=None): + logger.debug("Tray status callback") + with self.lock: + self.statuses[index]=status + # Time the refresh if it has not been timed, or if the timer + # is timing for the "long-term" (refresh_timer_time set) + if not self.refresh_timer or self.refresh_timer_time: + logger.debug("Timing refresh") + self.refresh_timer=Timer(refresh_interval, self.refresh_ui) + # refresh_timer_time is only set for "long-term timers" + self.refresh_timer_time=None + self.refresh_timer.start() + + if errorlog: + if 'msgid' not in errorlog or not isinstance(errorlog['msgid'], str): + msgid='UnknownError' + else: + msgid=errorlog['msgid'] + + logger.debug("Opening notification for error %s '%s'", + msgid, errorlog['message']) + + notification_workaround(branding.appname_stylised, + msgid, errorlog['message']) + + def quit(self): + rumps.quit_application() + + def __menu_select_backup(self, sender, b): + #sender.state=not sender.state + logger.debug("Manually backup '%s'", b.name) + try: + b.create() + except Exception as err: + logger.exception("Failure to initialise backup") + notification_workaround(branding.appname_stylised, + err.__class__.__name__, str(err)) + +# +# Log window +# + +logwindow=[None] +logwindow_lock=Lock() + +def showlog(): + try: + w=None + with logwindow_lock: + if not logwindow[0]: + lines=borgend.fifolog.formatAll() + msg="\n".join(lines[0:]) + w=rumps.Window(title=borgend.appname_stylised+' log', + default_text=msg, + ok='Close', + dimensions=(640,320)) + logwindow[0]=w + if w: + try: + w.run() + finally: + with logwindow_lock: + logwindow[0]=None + except Exception as err: + logger.exception("Failed to display log") + +# +# Notification click response => show log window +# + +@rumps.notifications +def notification_center(_): + showlog() +
--- a/branding.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,6 +0,0 @@ -# -# Branding -# - -appname="borgend" -appname_stylised="Borgend"
--- a/config.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,173 +0,0 @@ -#!/usr/local/bin/python3 -# -# Borgend configuration loader -# - -import yaml -import io -import os -import string -import logging -import platform -from functools import reduce -import loggers -import branding -import locations - -logger=loggers.get(__name__) - -# -# Defaults -# - -defaults={ - # borg - # Default: backup every 6 hours (21600 seconds) - 'backup_interval': 21600, - # Default: retry every 15 minutes if unable to connect / unfinished backup - 'retry_interval': 900, - # Extract passphrases at startup or on demand? - 'extract_passphrases_at_startup': True, - # Do not insert a quit menu entry (useful for installing on computers of - # inexperienced users) - 'no_quit_menu_entry': False, - # Borg settings - 'borg': { - 'executable': 'borg', - 'common_parameters': [], - 'create_parameters': [], - 'prune_parameters': [], - } -} - -# -# Type checking etc. -# - -def error(x): - raise AssertionError(x) - -def check_field(cfg, field, descr, loc, default, check): - if field in cfg: - tmp=cfg[field] - if not check(tmp): - error("%s is of invalid type for %s" % (field, loc)) - return tmp - else: - if default is not None: - return default - else: - error("%s is not configured for %s" % (field, loc)) - -def check_bool(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: isinstance(x, bool)) - -def check_string(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: isinstance(x, str)) - -def check_dict(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: isinstance(x, dict)) - -def check_list(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: isinstance(x, list)) - -def is_list_of(x, chk): - if x is None: - return True - elif isinstance(x, list): - return reduce(lambda y, z: y and chk(z), x, True) - else: - return False - -def check_list_of_dicts(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: is_list_of(x, lambda z: isinstance(z, dict))) - -def check_list_of_strings(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: is_list_of(x, lambda z: isinstance(z, str))) - -def check_nonempty_list_of_strings(cfg, field, descr, loc): - return check_list_of_strings(cfg, field, descr, loc) and cfg[field] - - -def check_nonneg_int(cfg, field, descr, loc, default=None): - return check_field(cfg, field, descr, loc, default, - lambda x: isinstance(x, int) and x>=0) - - -# -# Borg command line parameter configuration helper routines and classes -# - -class BorgParameters: - def __init__(self, common, create, prune): - self.common=common or [] - self.create=create or [] - self.prune=prune or [] - - def from_config(cfg, loc): - common=check_list_of_dicts(cfg, 'common_parameters', - 'Borg parameters', loc, default=[]) - - create=check_list_of_dicts(cfg, 'create_parameters', - 'Create parameters', loc, default=[]) - - prune=check_list_of_dicts(cfg, 'prune_parameters', - 'Prune parameters', loc, default=[]) - - return BorgParameters(common, create, prune) - - def __add__(self, other): - common=self.common+other.common - create=self.create+other.create - prune=self.prune+other.prune - return BorgParameters(common, create, prune) - -# -# Load config on module load -# - -def expand_env(cfg, env): - if isinstance(cfg, dict): - out={key: expand_env(val, env) for key, val in cfg.items()} - elif isinstance(cfg, list): - out=[expand_env(val, env) for val in cfg] - elif isinstance(cfg, str): - out=string.Template(cfg).substitute(os.environ) - else: - out=cfg - - return out - -if not (os.path.exists(locations.cfgfile) and os.path.isfile(locations.cfgfile)): - raise SystemExit("Configuration file required: %s" % locations.cfgfile) - -logger.info("Reading configuration %s missing" % locations.cfgfile) - -with io.open(locations.cfgfile, 'r') as file: - settings=expand_env(yaml.load(file), os.environ); - -# -# Verify basic settings -# - -def check_and_set(cfg, field, loc, defa, fn): - cfg[field]=fn(cfg, field, field, loc, defa[field]) - -check_and_set(settings, 'backup_interval', 'top-level', defaults, check_nonneg_int) -check_and_set(settings, 'retry_interval', 'top-level', defaults, check_nonneg_int) -check_and_set(settings, 'extract_passphrases_at_startup', 'top-level', defaults, check_nonneg_int) -check_and_set(settings, 'no_quit_menu_entry', 'top-level', defaults, check_bool) -check_and_set(settings, 'borg', 'top-level', defaults, check_dict) -# Check parameters within 'borg' -if True: - check_and_set(settings['borg'], 'executable', 'borg', - defaults['borg'], check_string) - - borg_parameters=BorgParameters.from_config(settings['borg'], "top-level") -
--- a/dreamtime.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,212 +0,0 @@ -# -# Wake/sleep detection for scheduling adjustments -# - -import Foundation -import AppKit -import platform -import time -import threading -import weakref -import datetime -import loggers - -logger=loggers.get(__name__) - -_dreamtime_monitor=None - -# -# Support classes for dealing with different times -# - -# Return difference (delay) of "dreamtime" to monotonic time -def dreamtime_difference(): - if _dreamtime_monitor: - return _dreamtime_monitor.diff() - else: - return time.monotonic() - -# Return "dreamtime" -def dreamtime(): - return max(0, time.monotonic()-dreamtime_difference()) - -class Time: - def realtime(self): - raise NotImplementedError - - def monotonic(self): - raise NotImplementedError - - @staticmethod - def _now(): - raise NotImplementedError - - @classmethod - def now(cls): - return cls(cls._now()) - - @classmethod - def from_realtime(cls, realtime): - return cls(realtime-time.time()+cls._now()) - - @classmethod - def from_monotonic(cls, monotonic): - return cls(monotonic-time.monotonic()+cls._now()) - - @classmethod - def after(cls, seconds): - return cls(cls._now()+seconds) - - def datetime(self): - return datetime.datetime.fromtimestamp(self.realtime()) - - def seconds_to(self): - return self._value-self._now() - - def __lt__(self, other): - return self.monotonic() < other.monotonic() - - def __gt__(self, other): - return self.monotonic() > other.monotonic() - - def __le__(self, other): - return self.monotonic() <= other.monotonic() - - def __ge__(self, other): - return self.monotonic() >= other.monotonic() - - def __eq__(self, other): - return self.monotonic() == other.realtime() - -class RealTime(Time): - def __init__(self, when): - self._value=when - - def realtime(self): - return self._value - - def monotonic(self): - return self._value+(time.monotonic()-time.time()) - - @staticmethod - def _now(): - return time.time() - -class MonotonicTime(Time): - def __init__(self, when): - self._value=when - - def realtime(self): - return self._value+(time.time()-time.monotonic()) - - def monotonic(self): - return self._value - - @staticmethod - def _now(): - return time.monotonic() - -class DreamTime(Time): - def __init__(self, when): - self._value=when - - def realtime(self): - return self._value+(time.time()-dreamtime()) - - # Important: monotonic is "static" within a wakeup period - # and does not need to call time.monotonic(), as it gets compared - # to a specific time.monotonic() realisation - def monotonic(self): - return self._value+dreamtime_difference() - - @staticmethod - def _now(): - return dreamtime() - -# -# Wake up / sleep handling -# - -class SleepHandler(Foundation.NSObject): - """ Handle wake/sleep notifications """ - - def init(self): - self.__sleeptime=None - self.__slept=0 - self.__epoch=time.monotonic() - self.__lock=threading.Lock() - self.__callbacks=weakref.WeakKeyDictionary() - - return self - - def handleSleepNotification_(self, aNotification): - logger.info("System going to sleep") - now=time.monotonic() - with self.__lock: - self.__sleeptime=now - - def handleWakeNotification_(self, aNotification): - logger.info("System waking up from sleep") - try: - now=time.monotonic() - with self.__lock: - if self.__sleeptime: - slept=max(0, now-self.__sleeptime) - logger.info("Slept %f seconds" % slept) - self.__slept=self.__slept+slept - self.__sleeptime=None - callbacks=self.__callbacks.copy() - except: - logger.exception("Bug in wakeup handler") - - for callback in callbacks.values(): - try: - callback() - except Exception: - logger.exception("Error in wake notification callback") - - # Return difference to time.monotonic() - def diff(self): - with self.__lock: - diff=self.__epoch+self.__slept - return diff - - # Weirdo (Py)ObjC naming to stop it form choking up - def addForObj_aCallback_(self, obj, callback): - with self.__lock: - self.__callbacks[obj]=callback - -# obj is to use a a key in a weak key dictionary -def add_callback(obj, callback): - global _dreamtime_monitor - - monitor=_dreamtime_monitor - if not monitor: - raise Exception("Dreamtime monitor not started") - else: - monitor.addForObj_aCallback_(obj, callback) - -def start_monitoring(): - global _dreamtime_monitor - - if platform.system()=='Darwin': - logger.debug("Starting to monitor system sleep") - workspace = AppKit.NSWorkspace.sharedWorkspace() - notification_center = workspace.notificationCenter() - _dreamtime_monitor = SleepHandler.new() - - notification_center.addObserver_selector_name_object_( - _dreamtime_monitor, - "handleSleepNotification:", - AppKit.NSWorkspaceWillSleepNotification, - None) - - notification_center.addObserver_selector_name_object_( - _dreamtime_monitor, - "handleWakeNotification:", - AppKit.NSWorkspaceDidWakeNotification, - None) - else: - logger.warning(("No system sleep monitor implemented for '%s'" - % platform.system())) -
--- a/fifolog.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -# -# FIFO memory logger -# - -from logging.handlers import BufferingHandler - -class FIFOHandler(BufferingHandler): - def shouldFlush(self, record): - return False - - def emit(self, record): - self.buffer.append(record) - l=len(self.buffer) - if l>self.capacity: - self.buffer=self.buffer[(l-self.capacity):(l-1)] - - def formatAll(self): - return [self.format(record) for record in self.buffer]
--- a/instance.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,134 +0,0 @@ -# -# Borgend borg launcher / processor -# - -import json -import logging -import os -import loggers -from config import settings -from subprocess import Popen, PIPE - -logger=loggers.get(__name__) - -necessary_opts=['--log-json', '--progress'] - -necessary_opts_for={ - 'create': ['--json'], - 'info': ['--json'], - 'list': ['--json'], -} - -# Conversion of config into command line -def arglistify(args): - flatten=lambda l: [item for sublist in l for item in sublist] - if args is None: - return [] - else: - return flatten([['--' + key, str(d[key])] for d in args for key in d]) - -class BorgInstance: - def __init__(self, operation, archive_or_repository, - common_params, op_params, paths): - self.operation=operation; - self.archive_or_repository=archive_or_repository; - self.common_params=common_params - self.op_params=op_params - self.paths=paths - - def construct_cmdline(self): - cmd=([settings['borg']['executable']]+necessary_opts+ - arglistify(self.common_params)+ - [self.operation]) - - if self.operation in necessary_opts_for: - cmd=cmd+necessary_opts_for[self.operation] - - return (cmd+arglistify(self.op_params) - +[self.archive_or_repository]+self.paths) - - def launch(self, passphrase=None): - cmd=self.construct_cmdline() - - logger.info('Launching ' + str(cmd)) - - # Set passphrase if not, or set to empty if not known, so borg - # won't hang waiting for it, which seems to happen even if we - # close stdin. - env=os.environ.copy() - env['BORG_PASSPHRASE']=passphrase or '' - - # Workaround: if launched is a standalone app created with py2app, - # borg will fail unless Python environment is reset. - # TODO: Of course, this will fail if the system needs the variables - # PYTHONPATH or PYTHONHOME set to certain values. - if '_PY2APP_LAUNCHED_' in env: - val=env['_PY2APP_LAUNCHED_'] - if val=='1': - del env['PYTHONPATH'] - del env['PYTHONHOME'] - - self.proc=Popen(cmd, env=env, stdout=PIPE, stderr=PIPE, stdin=PIPE) - - # We don't do passphrase input etc. - self.proc.stdin.close() - - def read_result(self): - stream=self.proc.stdout - line=stream.read(-1) - if line==b'': - logger.debug('Borg stdout pipe EOF?') - return None - - try: - return json.loads(line) - except Exception as err: - logger.warning('JSON parse failed on: "%s"' % line) - return None - - def read_log(self): - stream=self.proc.stderr - try: - line=stream.readline() - except err: - logger.debug('Pipe read failed: %s' % str(err)) - - return {'type': 'log_message', - 'levelname': 'CRITICAL', - 'name': 'borgend.instance.BorgInstance', - 'msgid': 'Borgend.Exception', - 'message': err} - - if line==b'': - - logger.debug('Borg stderr pipe EOF?') - - return None - - try: - res=json.loads(line) - if 'type' not in res: - res['type']='UNKNOWN' - return res - except: - logger.debug('JSON parse failed on: "%s"' % str(line)) - - errmsg=line - for line in iter(stream.readline, b''): - errmsg=errmsg+line - - return {'type': 'log_message', - 'levelname': 'ERROR', - 'name': 'borgend.instance.BorgInstance', - 'msgid': 'Borgend.JSONFail', - 'message': str(errmsg)} - - def terminate(self): - self.proc.terminate() - - def wait(self): - return self.proc.wait() is not None - - def has_terminated(self): - return self.proc.poll() is not None -
--- a/locations.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,17 +0,0 @@ -# -# Locations -# - -import os -import platform -import branding - -if platform.system()!='Darwin': - import xdg - cfgfile=os.path.join(xdg.XDG_CONFIG_HOME, branding.appname, "config.yaml") - logs_dir=os.path.join(xdg.XDG_DATA_HOME, branding.appname, "logs") -else: - import rumps - __base=rumps.application_support(branding.appname) - cfgfile=os.path.join(__base, "config.yaml") - logs_dir=os.path.join(__base, "logs")
--- a/loggers.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,61 +0,0 @@ -# -# Loggers -# - -import os -import logging -import logging.handlers -import atexit - -from fifolog import FIFOHandler -import branding -import locations - -# -# Logging configuration -# - -loglevel=logging.INFO -logfmt="%(asctime)s:%(levelname)s:%(name)s:%(message)s" -fifo_capacity=1000 -fifo_fmt="%(asctime)s:%(levelname)s:%(message)s" - -# -# Setting up the main logger with fifo, stderr, and rotating files output -# - -mainlogger=logging.getLogger(branding.appname) -mainlogger.setLevel(loglevel) -mainlogger.propagate=True - -mainlogger.handlers.clear() - -# Internal FIFO history -fifo=FIFOHandler(fifo_capacity) -fifo.setFormatter(logging.Formatter(fifo_fmt)) -mainlogger.addHandler(fifo) - -# stderr -stderrlog=logging.StreamHandler() -stderrlog.setLevel(logging.WARNING) -mainlogger.addHandler(stderrlog) - -# Rotating files -if not os.path.isdir(locations.logs_dir): - os.makedirs(locations.logs_dir) - -fileslog=logging.handlers.TimedRotatingFileHandler( - os.path.join(locations.logs_dir, branding.appname+'.log'), - when='D', interval=1) -fileslog.setFormatter(logging.Formatter(logfmt)) -mainlogger.addHandler(fileslog) - -atexit.register(logging.shutdown) - -# -# Routine for obtaining sub-loggers -# - -def get(name): - return mainlogger.getChild(name) -
--- a/repository.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,142 +0,0 @@ -# -# Repository abstraction for queuing -# - -import weakref -import keyring -import loggers -import config -from scheduler import QueueThread, QueuedEvent - -logger=loggers.get(__name__) - -class FIFOEvent(QueuedEvent): - def __init__(self, cond, name=None): - self._goodtogo=False - super().__init__(cond, name=name) - - def __lt__(self, other): - return False - -class FIFO(QueueThread): - def __init__(self, **kwargs): - super().__init__(target = self._fifo_thread, **kwargs) - - def _fifo_thread(self): - with self._cond: - while not self._terminate: - ev=self._list - if ev: - # We can only remove ev from the list when ev.cond allows - with ev.cond: - if not ev._goodtogo: - ev._goodtogo=True - ev.cond.notifyAll() - self._cond.wait() - - # Termination cleanup - ev=self._list - while ev: - # We can only remove ev from the list when ev.cond allows - with ev.cond: - ev.cond.notifyAll() - ev=ev.next - - # cond has to be acquired on entry! - def queue_action(self, cond, action=lambda: (), name=None): - ev=FIFOEvent(cond, name=name) - - with self._cond: - self._insert(ev) - - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released - logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') - ev.cond.wait() - - try: - if ev._goodtogo: - logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') - # - # TODO: action() has to unlink on finish; so should maybe - # use weak references to event. - # Or we have to make action take all the time, so make the - # stdout thread. - # OR: Easiest to just move finish-waiting into __launch_check - # instead of at the outer level of the main loop. - # - action() - finally: - with self._cond: - self._unlink(ev) - # Let _fifo_thread proceed to next action - self._cond.notify() - - return ev._goodtogo - -repositories=weakref.WeakValueDictionary() - -class Repository(FIFO): - def __decode_config(self, cfg): - loc0='Repository %d' % self.identifier - - self.repository_name=config.check_string(cfg, 'name', 'Name', loc0) - - logger.debug("Configuring repository '%s'" % self.repository_name) - - loc = 'Repository "%s"' - - self.logger=logger.getChild(self.repository_name) - - self.location=config.check_string(cfg, 'location', - 'Target repository location', loc) - - self.borg_parameters=config.BorgParameters.from_config(cfg, loc) - - self.__keychain_account=config.check_string(cfg, 'keychain_account', - 'Keychain account', loc, - default='') - - self.__passphrase=None - - if config.settings['extract_passphrases_at_startup']: - try: - self.extract_passphrase() - except Exception: - pass - - def __init__(self, identifier, cfg): - self.identifier=identifier - self.__decode_config(cfg) - super().__init__(name = 'RepositoryThread %s' % self.repository_name) - repositories[self.repository_name]=self - - def __extract_passphrase(self): - acc=self.__keychain_account - if not self.__passphrase: - if acc and acc!='': - self.logger.debug('Requesting passphrase') - try: - pw=keyring.get_password("borg-backup", acc) - except Exception as err: - self.logger.error('Failed to retrieve passphrase') - raise err - else: - self.logger.debug('Received passphrase') - self.__passphrase=pw - else: - self.__passphrase=None - return self.__passphrase - - def launch_borg_instance(self, inst): - with self._cond: - passphrase=self.__extract_passphrase() - inst.launch(passphrase=passphrase) - -def find_repository(name): - if name in repositories: - return repositories[name] - else: - return None - -
--- a/scheduler.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,169 +0,0 @@ -# -# Scheduler for Borgend -# -# This module simply provide a way for other threads to until a given time -# - -import time -import loggers -import dreamtime -from threading import Condition, Lock, Thread - -logger=loggers.get(__name__) - -class QueuedEvent: - def __init__(self, cond, name=None): - self.next=None - self.prev=None - self.name=name - self.cond=cond - - def __lt__(self, other): - raise NotImplementedError - - def insert_after(self, ev): - if not self.next or ev<self.next: - self.insert_immediately_after(ev) - else: - self.next.insert_after(ev) - - def insert_immediately_after(self, ev): - assert(ev.next is None and ev.prev is None) - ev.prev=self - ev.next=self.next - self.next=ev - - def insert_immediately_before(self, ev): - assert(ev.next is None and ev.prev is None) - ev.next=self - ev.prev=self.prev - self.prev=ev - - def unlink(self): - n=self.next - p=self.prev - if n: - n.prev=p - if p: - p.next=n - self.next=None - self.prev=None - -class ScheduledEvent(QueuedEvent): - #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) - def __init__(self, when, cond, name=None): - super().__init__(cond, name=name) - self.when=when - - def __lt__(self, other): - return self.when < other.when - -class TerminableThread(Thread): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._terminate=False - self._cond=Condition() - - def terminate(self): - with self._cond: - _terminate=True - self._cond.notify() - -class QueueThread(TerminableThread): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.daemon = True - self._list = None - - def _insert(self, ev): - if not self._list: - #logger.debug("Insert first") - self._list=ev - elif ev<self._list: - #logger.debug("Insert beginning") - self._list.insert_immediately_before(ev) - self._list=ev - else: - #logger.debug("Insert after") - self._list.insert_after(ev) - - self._cond.notify() - - def _unlink(self, ev): - if ev==self._list: - self._list=ev.next - ev.unlink() - - def _resort(self): - oldlist=self._list - self._list=None - while oldlist: - ev=oldlist - oldlist=oldlist.next - ev.unlink() - self._insert(ev) - - - -class Scheduler(QueueThread): - # Default to precision of 60 seconds: the scheduler thread will never - # sleep longer than that, to get quickly back on track with the schedule - # when the computer wakes up from sleep - def __init__(self, precision=60): - self.precision = precision - self._next_event_time = None - super().__init__(target = self._scheduler_thread, name = 'Scheduler') - dreamtime.add_callback(self, self._wakeup_callback) - - def _scheduler_thread(self): - logger.debug("Scheduler thread started") - with self._cond: - while not self._terminate: - now = time.monotonic() - if not self._list: - timeout = None - else: - # Wait at most precision seconds, or until next event if it - # comes earlier - timeout=min(self.precision, self._list.when.realtime()-now) - - if not timeout or timeout>0: - logger.debug("Scheduler waiting %d seconds" % (timeout or (-1))) - self._cond.wait(timeout) - now = time.monotonic() - - logger.debug("Scheduler timed out") - - while self._list and self._list.when.monotonic() <= now: - ev=self._list - logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) - # We are only allowed to remove ev from list when ev.cond allows - with ev.cond: - self._list=ev.next - ev.unlink() - ev.cond.notifyAll() - - def _wakeup_callback(self): - logger.debug("Rescheduling events after wakeup") - with self._cond: - self._resort() - - def _wait(self, ev): - with self._cond: - self._insert(ev) - - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released - ev.cond.wait() - - # If we were woken up by some other event, not the scheduler, - # ensure the event is removed - with self._cond: - self._unlink(ev) - - # cond has to be acquired on entry! - def wait_until(self, when, cond, name=None): - logger.debug("Scheduling '%s' in %s seconds [%s]" % - (name, when.seconds_to(), when.__class__.__name__)) - self._wait(ScheduledEvent(when, cond, name)) -
--- a/ui.py Sun Jan 28 11:38:01 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,311 +0,0 @@ -# -# Borgend MacOS UI -# - -import rumps -import time -import datetime -import objc -from threading import Lock, Timer - -import loggers -import backup -import dreamtime -import branding -from config import settings - -logger=loggers.get(__name__) - -traynames_ok={ - backup.State.INACTIVE: 'B.', - backup.State.SCHEDULED: 'B.', - backup.State.QUEUED: 'B:', - backup.State.ACTIVE: 'B!', -} - -traynames_errors={ - # The first one should never be used - backup.Errors.OK: traynames_ok[backup.State.INACTIVE], - backup.Errors.BUSY: 'B⦙', - backup.Errors.OFFLINE: 'B⦙', - backup.Errors.ERRORS: 'B?' -} - -def trayname(ste): - state=ste[0] - errors=ste[1] - if not errors.ok(): - return traynames_errors[errors] - else: - return traynames_ok[state] - -def combine_state(a, b): - return (max(a[0], b[0]), max(a[1], b[1])) - -# Refresh the menu at most once a second to reduce flicker -refresh_interval=1.0 - -# Workaround to rumps brokenness; -# see https://github.com/jaredks/rumps/issues/59 -def notification_workaround(title, subtitle, message): - try: - NSDictionary = objc.lookUpClass("NSDictionary") - d=NSDictionary() - - rumps.notification(title, subtitle, message, data=d) - except Exception as err: - logger.exception("Failed to display notification") - -# Based on code snatched from -# https://stackoverflow.com/questions/12523586/python-format-size-application-converting-b-to-kb-mb-gb-tb/37423778 -def humanbytes(B): - 'Return the given bytes as a human friendly KB, MB, GB, or TB string' - B = float(B) - KB = float(1024) - MB = float(KB ** 2) # 1,048,576 - GB = float(KB ** 3) # 1,073,741,824 - TB = float(KB ** 4) # 1,099,511,627,776 - - if B < KB: - return '{0}B'.format(B) - elif KB <= B < MB: - return '{0:.2f}KB'.format(B/KB) - elif MB <= B < GB: - return '{0:.2f}MB'.format(B/MB) - elif GB <= B < TB: - return '{0:.2f}GB'.format(B/GB) - elif TB <= B: - return '{0:.2f}TB'.format(B/TB) - -def make_title(status): - def add_info(info, new): - if info: - return "%s; %s" % (info, new) - else: - return new - - info=None - this_need_reconstruct=None - - if not status.errors.ok(): - info=add_info(info, str(status.errors)) - - if status.state==backup.State.SCHEDULED: - # Operation scheduled - when=status.when() - now=time.time() - - if when<now: - whenstr='overdue' - info='' - else: - tnow=datetime.datetime.fromtimestamp(now) - twhen=datetime.datetime.fromtimestamp(when) - tendtoday=twhen.replace(hour=23,minute=59,second=59) - tendtomorrow=tendtoday+datetime.timedelta(days=1) - diff=datetime.timedelta(seconds=when-now) - - if twhen>tendtomorrow: - whenday=datetime.date.fromtimestamp(when) - whenstr='on %s' % twhen.date().isoformat() - this_need_reconstruct=tendtoday+datetime.timedelta(seconds=1) - elif diff.seconds>=12*60*60: # 12 hours - whenstr='tomorrow' - this_need_reconstruct=twhen-datetime.timedelta(hours=12) - else: - twhen=time.localtime(when) - if twhen.tm_sec>30: - # Round up minute display to avoid user confusion - twhen=time.localtime(when+30) - whenstr='at %02d:%02d' % (twhen.tm_hour, twhen.tm_min) - - this_info='' - if 'reason' in status.detail: - this_info=status.detail['reason'] + ' ' - - when_how_sched= "%s%s %s" % (this_info, status.operation, whenstr) - - info=add_info(info, when_how_sched) - - elif status.state==backup.State.QUEUED: - info=add_info(info, "queued") - elif status.state==backup.State.ACTIVE: - # Operation running - progress='' - d=status.detail - if 'progress_current' in d and 'progress_total' in d: - progress=' %d%%' % (d['progress_current']/d['progress_total']) - elif 'original_size' in d and 'deduplicated_size' in d: - progress=' %s→%s' % (humanbytes(d['original_size']), - humanbytes(d['deduplicated_size'])) - - howrunning = "running %s%s" % (status.operation, progress) - - info=add_info(info, howrunning) - else: - pass - - if info: - title=status.name + ' (' + info + ')' - else: - title=status.name - - return title, (status.state, status.errors), this_need_reconstruct - -class BorgendTray(rumps.App): - def __init__(self, backups): - self.lock=Lock() - self.backups=backups - self.refresh_timer=None - self.refresh_timer_time=None - self.statuses=[None]*len(backups) - - for index in range(len(backups)): - self.statuses[index]=backups[index].status() - - menu, title=self.build_menu_and_timer() - - super().__init__(title, menu=menu, quit_button=None) - - for index in range(len(backups)): - # Python closures suck dog's balls; hence the _index=index hack - # See also http://math.andrej.com/2009/04/09/pythons-lambda-is-broken/ - cb=(lambda status, errors=None, _index=index: - self.__status_callback(_index, status, errorlog=errors)) - backups[index].set_status_update_callback(cb) - - dreamtime.add_callback(self, self.refresh_ui) - - def __rebuild_menu(self): - menu=[] - state=(backup.State.INACTIVE, backup.Errors.OK) - need_reconstruct=None - for index in range(len(self.backups)): - b=self.backups[index] - title, this_state, this_need_reconstruct=make_title(self.statuses[index]) - # Python closures suck dog's balls... - # first and the last program I write in Python until somebody - # fixes this brain damage - cbm=lambda sender, _b=b: self.__menu_select_backup(sender, _b) - item=rumps.MenuItem(title, callback=cbm) - if not this_state[1].ok(): - item.state=-1 - elif this_state[0]==backup.State.SCHEDULED or this_state[0]==backup.State.QUEUED: - item.state=1 - menu.append(item) - state=combine_state(state, this_state) - - # Do we have to automatically update menu display? - if not need_reconstruct: - need_reconstruct=this_need_reconstruct - elif this_need_reconstruct: - need_reconstruct=min(need_reconstruct, this_need_reconstruct) - - menu_log=rumps.MenuItem("Show log", callback=lambda _: showlog()) - menu.append(menu_log) - - if not settings['no_quit_menu_entry']: - menu_quit=rumps.MenuItem("Quit...", callback=lambda _: self.quit()) - menu.append(menu_quit) - - return menu, state, need_reconstruct - - def build_menu_and_timer(self): - if self.refresh_timer: - self.refresh_timer.cancel() - self.refresh_timer=None - self.refresh_timer_time=None - logger.debug('Rebuilding menu') - menu, state, need_reconstruct=self.__rebuild_menu() - title=trayname(state) - - if need_reconstruct: - when=time.mktime(need_reconstruct.timetuple()) - delay=when-time.time() - self.refresh_timer=Timer(delay, self.refresh_ui) - self.refresh_timer_time=need_reconstruct - self.refresh_timer.start() - - return menu, title - - def refresh_ui(self): - with self.lock: - menu, title=self.build_menu_and_timer() - self.menu.clear() - self.menu.update(menu) - self.title=title - - def __status_callback(self, index, status, errorlog=None): - logger.debug("Tray status callback") - with self.lock: - self.statuses[index]=status - # Time the refresh if it has not been timed, or if the timer - # is timing for the "long-term" (refresh_timer_time set) - if not self.refresh_timer or self.refresh_timer_time: - logger.debug("Timing refresh") - self.refresh_timer=Timer(refresh_interval, self.refresh_ui) - # refresh_timer_time is only set for "long-term timers" - self.refresh_timer_time=None - self.refresh_timer.start() - - if errorlog: - if 'msgid' not in errorlog or not isinstance(errorlog['msgid'], str): - msgid='UnknownError' - else: - msgid=errorlog['msgid'] - - logger.debug("Opening notification for error %s '%s'", - msgid, errorlog['message']) - - notification_workaround(branding.appname_stylised, - msgid, errorlog['message']) - - def quit(self): - rumps.quit_application() - - def __menu_select_backup(self, sender, b): - #sender.state=not sender.state - logger.debug("Manually backup '%s'", b.name) - try: - b.create() - except Exception as err: - logger.exception("Failure to initialise backup") - notification_workaround(branding.appname_stylised, - err.__class__.__name__, str(err)) - -# -# Log window -# - -logwindow=[None] -logwindow_lock=Lock() - -def showlog(): - try: - w=None - with logwindow_lock: - if not logwindow[0]: - lines=borgend.fifolog.formatAll() - msg="\n".join(lines[0:]) - w=rumps.Window(title=borgend.appname_stylised+' log', - default_text=msg, - ok='Close', - dimensions=(640,320)) - logwindow[0]=w - if w: - try: - w.run() - finally: - with logwindow_lock: - logwindow[0]=None - except Exception as err: - logger.exception("Failed to display log") - -# -# Notification click response => show log window -# - -@rumps.notifications -def notification_center(_): - showlog() -