Mon, 22 Jan 2018 22:23:01 +0000
Basic repository queue
backup.py | file | annotate | diff | comparison | revisions | |
repository.py | file | annotate | diff | comparison | revisions | |
scheduler.py | file | annotate | diff | comparison | revisions |
--- a/backup.py Mon Jan 22 21:07:34 2018 +0000 +++ b/backup.py Mon Jan 22 22:23:01 2018 +0000 @@ -7,6 +7,7 @@ import time import keyring import borgend +import repository from instance import BorgInstance from threading import Thread, Lock, Condition from scheduler import TerminableThread @@ -55,8 +56,10 @@ self.loc='backup target "%s"' % self._name - self.repository=config.check_string(cfg, 'repository', - 'Target repository', self.loc) + reponame=config.check_string(cfg, 'repository', + 'Target repository', self.loc) + + self.repository=repository.get_controller(reponame) self.archive_prefix=config.check_string(cfg, 'archive_prefix', 'Archive prefix', self.loc) @@ -296,7 +299,7 @@ logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) if op['operation']=='create': - archive="%s::%s%s" % (self.repository, + archive="%s::%s%s" % (self.repository.repository_name, self.archive_prefix, self.archive_template) @@ -304,7 +307,7 @@ self.common_parameters+self.create_parameters, self.paths) elif op['operation']=='prune': - self.__do_launch(op, self.repository, + self.__do_launch(op, self.repository.repository_name, ([{'prefix': self.archive_prefix}] + self.common_parameters + self.prune_parameters)) @@ -388,8 +391,8 @@ if self.scheduled_operation: op=self.scheduled_operation self.scheduled_operation=None - self.__launch(op) - + self.repository.queue_action(self._cond, name=self._name, + action=lambda: self.__launch(op)) # Kill a running borg to cause log and result threads to terminate if self.borg_instance: logger.debug("Terminating a borg instance")
--- a/repository.py Mon Jan 22 21:07:34 2018 +0000 +++ b/repository.py Mon Jan 22 22:23:01 2018 +0000 @@ -2,13 +2,82 @@ # Repository abstraction for queuing # -from scheduler import TerminableThread, QueuedEvent +import weakref +from scheduler import QueueThread, QueuedEvent + +class FIFOEvent(QueuedEvent): + def __init__(self, cond, name=None): + self._goodtogo=False + super().__init__(cond, name=name) + + def __lt__(self, other): + return True + +class FIFO(QueueThread): + def __init__(self, **kwargs): + super().__init__(target = self._fifo_thread, **kwargs) + + def _fifo_thread(self): + with self._cond: + while not self._terminate: + ev=self._list + if ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev._goodtogo=True + ev.cond.notifyAll() + self._cond.wait() + + # Termination cleanup + ev=self._list + while ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev.cond.notifyAll() + ev=ev.next + + # cond has to be acquired on entry! + def queue_action(self, cond, action=lambda: (), name=None): + ev=FIFOEvent(cond, name=name) -class Repository(TerminableThread): - def __init__(self) - self.__next_event_time = None - self.__list = None - self._cond = Condition() - self._terminate = False - super().__init__(target = self.__scheduler_thread, name = 'Scheduler') - self.daemon=True + with self._cond: + self._insert(ev) + + goodtogo=False + terminate_=False + while not goodtogo and not terminate_: + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + with ev.cond: + goodtogo=ev._goodtogo + with self._cond: + terminate_=self._terminate + + try: + if not terminate_: + action() + finally: + with self._cond: + self._unlink(ev) + # Let _fifo_thread proceed to next action + self._cond.notify() + +class Repository(FIFO): + def __init__(self, name): + super().__init__(name = 'RepositoryThread %s' % name) + self.repository_name=name + + +# TODO: Should use weak references but they give KeyError +repositories=weakref.WeakValueDictionary() + +def get_controller(name): + if name in repositories: + repo = repositories[name] + else: + repo = Repository(name) + repo.start() + repositories[name] = repo + return repo +
--- a/scheduler.py Mon Jan 22 21:07:34 2018 +0000 +++ b/scheduler.py Mon Jan 22 22:23:01 2018 +0000 @@ -18,17 +18,14 @@ self.cond=cond def __lt__(self, other): - return False - - def __gt__(self, other): - return True + raise NotImplementedError def insert_after(self, ev): if not self.next: ev.prev=self self.next=ev ev.next=None - elif self.next>ev: + elif ev<self.next: self.insert_immediately_after(ev) else: self.next.insert_after(ev) @@ -50,15 +47,12 @@ class ScheduledEvent(QueuedEvent): def __init__(self, when, cond, name=None): - super().__init__(cond, name) + super().__init__(cond, name=name) self.when=when def __lt__(self, other): return self.when < other.when - def __gt__(self, other): - return self.when > other.when - class TerminableThread(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -72,13 +66,14 @@ class QueueThread(TerminableThread): def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.daemon = True self._list = None - super().__init__(*args, **kwargs) def _insert(self, ev): if not self._list: self._list=ev - elif self._list > ev: + elif ev<self._list: ev.insert_immediately_after(self._list) self._list=ev else: @@ -86,28 +81,10 @@ self._cond.notify() - def _wait(self, ev): - with self._cond: - self._insert(ev) - # This will release the lock on cond, allowing queue manager (scheduler) - # thread to notify us if we are already to be released - ev.cond.wait() - - # If we were woken up by some other event, not the scheduler, - # ensure the event is removed - with self._cond: - if ev==self._list: - self._list=ev.next - ev.unlink() - - def _pop(self): - ev=self._list - logger.info("%s: Found event %s" % self.name(), str(ev.name)) - self._list=ev.next + def _unlink(self, ev): + if ev==self._list: + self._list=ev.next ev.unlink() - ev.cond.acquire() - ev.cond.notifyAll() - ev.cond.release() class Scheduler(QueueThread): @@ -117,11 +94,9 @@ def __init__(self, precision=60): self.precision = precision self._next_event_time = None - self._list = None - super().__init__(target = self.__scheduler_thread, name = 'Scheduler') - self.daemon=True + super().__init__(target = self._scheduler_thread, name = 'Scheduler') - def __scheduler_thread(self): + def _scheduler_thread(self): with self._cond: while not self._terminate: now = time.monotonic() @@ -137,7 +112,26 @@ now = time.monotonic() while self._list and self._list.when <= now: - self._pop() + ev=self._list + logger.info("%s: Scheduling event %s" % self.name(), str(ev.name)) + # We are only allowed to remove ev from list when ev.cond allows + with ev.cond: + self._list=ev.next + ev.unlink() + ev.cond.notifyAll() + + def _wait(self, ev): + with self._cond: + self._insert(ev) + + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + + # If we were woken up by some other event, not the scheduler, + # ensure the event is removed + with self._cond: + self._unlink(ev) # cond has to be acquired on entry! def wait_until(self, when, cond, name=None):