# HG changeset patch # User Tuomo Valkonen # Date 1516659781 0 # Node ID cfcaa5f6ba3336966b1bbdae891e1b8c72ec4af8 # Parent 442c558bd632adbbaf4d3edd07b58074a5ad6ae1 Basic repository queue diff -r 442c558bd632 -r cfcaa5f6ba33 backup.py --- a/backup.py Mon Jan 22 21:07:34 2018 +0000 +++ b/backup.py Mon Jan 22 22:23:01 2018 +0000 @@ -7,6 +7,7 @@ import time import keyring import borgend +import repository from instance import BorgInstance from threading import Thread, Lock, Condition from scheduler import TerminableThread @@ -55,8 +56,10 @@ self.loc='backup target "%s"' % self._name - self.repository=config.check_string(cfg, 'repository', - 'Target repository', self.loc) + reponame=config.check_string(cfg, 'repository', + 'Target repository', self.loc) + + self.repository=repository.get_controller(reponame) self.archive_prefix=config.check_string(cfg, 'archive_prefix', 'Archive prefix', self.loc) @@ -296,7 +299,7 @@ logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) if op['operation']=='create': - archive="%s::%s%s" % (self.repository, + archive="%s::%s%s" % (self.repository.repository_name, self.archive_prefix, self.archive_template) @@ -304,7 +307,7 @@ self.common_parameters+self.create_parameters, self.paths) elif op['operation']=='prune': - self.__do_launch(op, self.repository, + self.__do_launch(op, self.repository.repository_name, ([{'prefix': self.archive_prefix}] + self.common_parameters + self.prune_parameters)) @@ -388,8 +391,8 @@ if self.scheduled_operation: op=self.scheduled_operation self.scheduled_operation=None - self.__launch(op) - + self.repository.queue_action(self._cond, name=self._name, + action=lambda: self.__launch(op)) # Kill a running borg to cause log and result threads to terminate if self.borg_instance: logger.debug("Terminating a borg instance") diff -r 442c558bd632 -r cfcaa5f6ba33 repository.py --- a/repository.py Mon Jan 22 21:07:34 2018 +0000 +++ b/repository.py Mon Jan 22 22:23:01 2018 +0000 @@ -2,13 +2,82 @@ # Repository abstraction for queuing # -from scheduler import TerminableThread, QueuedEvent +import weakref +from scheduler import QueueThread, QueuedEvent + +class FIFOEvent(QueuedEvent): + def __init__(self, cond, name=None): + self._goodtogo=False + super().__init__(cond, name=name) + + def __lt__(self, other): + return True + +class FIFO(QueueThread): + def __init__(self, **kwargs): + super().__init__(target = self._fifo_thread, **kwargs) + + def _fifo_thread(self): + with self._cond: + while not self._terminate: + ev=self._list + if ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev._goodtogo=True + ev.cond.notifyAll() + self._cond.wait() + + # Termination cleanup + ev=self._list + while ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev.cond.notifyAll() + ev=ev.next + + # cond has to be acquired on entry! + def queue_action(self, cond, action=lambda: (), name=None): + ev=FIFOEvent(cond, name=name) -class Repository(TerminableThread): - def __init__(self) - self.__next_event_time = None - self.__list = None - self._cond = Condition() - self._terminate = False - super().__init__(target = self.__scheduler_thread, name = 'Scheduler') - self.daemon=True + with self._cond: + self._insert(ev) + + goodtogo=False + terminate_=False + while not goodtogo and not terminate_: + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + with ev.cond: + goodtogo=ev._goodtogo + with self._cond: + terminate_=self._terminate + + try: + if not terminate_: + action() + finally: + with self._cond: + self._unlink(ev) + # Let _fifo_thread proceed to next action + self._cond.notify() + +class Repository(FIFO): + def __init__(self, name): + super().__init__(name = 'RepositoryThread %s' % name) + self.repository_name=name + + +# TODO: Should use weak references but they give KeyError +repositories=weakref.WeakValueDictionary() + +def get_controller(name): + if name in repositories: + repo = repositories[name] + else: + repo = Repository(name) + repo.start() + repositories[name] = repo + return repo + diff -r 442c558bd632 -r cfcaa5f6ba33 scheduler.py --- a/scheduler.py Mon Jan 22 21:07:34 2018 +0000 +++ b/scheduler.py Mon Jan 22 22:23:01 2018 +0000 @@ -18,17 +18,14 @@ self.cond=cond def __lt__(self, other): - return False - - def __gt__(self, other): - return True + raise NotImplementedError def insert_after(self, ev): if not self.next: ev.prev=self self.next=ev ev.next=None - elif self.next>ev: + elif ev other.when - class TerminableThread(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -72,13 +66,14 @@ class QueueThread(TerminableThread): def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.daemon = True self._list = None - super().__init__(*args, **kwargs) def _insert(self, ev): if not self._list: self._list=ev - elif self._list > ev: + elif ev