Basic repository queue

Mon, 22 Jan 2018 22:23:01 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Mon, 22 Jan 2018 22:23:01 +0000
changeset 54
cfcaa5f6ba33
parent 53
442c558bd632
child 55
407af23d16bb
child 56
d63f6e9a4633

Basic repository queue

backup.py file | annotate | diff | comparison | revisions
repository.py file | annotate | diff | comparison | revisions
scheduler.py file | annotate | diff | comparison | revisions
--- a/backup.py	Mon Jan 22 21:07:34 2018 +0000
+++ b/backup.py	Mon Jan 22 22:23:01 2018 +0000
@@ -7,6 +7,7 @@
 import time
 import keyring
 import borgend
+import repository
 from instance import BorgInstance
 from threading import Thread, Lock, Condition
 from scheduler import TerminableThread
@@ -55,8 +56,10 @@
 
         self.loc='backup target "%s"' % self._name
 
-        self.repository=config.check_string(cfg, 'repository',
-                                            'Target repository', self.loc)
+        reponame=config.check_string(cfg, 'repository',
+                                     'Target repository', self.loc)
+
+        self.repository=repository.get_controller(reponame)
 
         self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                 'Archive prefix', self.loc)
@@ -296,7 +299,7 @@
                 logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))
 
                 if op['operation']=='create':
-                    archive="%s::%s%s" % (self.repository,
+                    archive="%s::%s%s" % (self.repository.repository_name,
                                           self.archive_prefix,
                                           self.archive_template)
 
@@ -304,7 +307,7 @@
                                      self.common_parameters+self.create_parameters,
                                      self.paths)
                 elif op['operation']=='prune':
-                    self.__do_launch(op, self.repository,
+                    self.__do_launch(op, self.repository.repository_name,
                                      ([{'prefix': self.archive_prefix}] + 
                                       self.common_parameters +
                                       self.prune_parameters))
@@ -388,8 +391,8 @@
                 if self.scheduled_operation:
                     op=self.scheduled_operation
                     self.scheduled_operation=None
-                    self.__launch(op)
-
+                    self.repository.queue_action(self._cond, name=self._name,
+                                                 action=lambda: self.__launch(op))
             # Kill a running borg to cause log and result threads to terminate
             if self.borg_instance:
                 logger.debug("Terminating a borg instance")
--- a/repository.py	Mon Jan 22 21:07:34 2018 +0000
+++ b/repository.py	Mon Jan 22 22:23:01 2018 +0000
@@ -2,13 +2,82 @@
 # Repository abstraction for queuing
 #
 
-from scheduler import TerminableThread, QueuedEvent
+import weakref
+from scheduler import QueueThread, QueuedEvent
+
+class FIFOEvent(QueuedEvent):
+    def __init__(self, cond, name=None):
+        self._goodtogo=False
+        super().__init__(cond, name=name)
+
+    def __lt__(self, other):
+        return True
+
+class FIFO(QueueThread):
+    def __init__(self, **kwargs):
+        super().__init__(target = self._fifo_thread, **kwargs)
+
+    def _fifo_thread(self):
+        with self._cond:
+            while not self._terminate:
+                ev=self._list
+                if ev:
+                    # We can only remove ev from the list when ev.cond allows
+                    with ev.cond:
+                        ev._goodtogo=True
+                        ev.cond.notifyAll()
+                self._cond.wait()
+
+            # Termination cleanup
+            ev=self._list
+            while ev:
+                # We can only remove ev from the list when ev.cond allows
+                with ev.cond:
+                    ev.cond.notifyAll()
+                    ev=ev.next
+
+    # cond has to be acquired on entry!
+    def queue_action(self, cond, action=lambda: (), name=None):
+        ev=FIFOEvent(cond, name=name)
 
-class Repository(TerminableThread):
-    def __init__(self)
-        self.__next_event_time = None
-        self.__list = None
-        self._cond = Condition()
-        self._terminate = False
-        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
-        self.daemon=True
+        with self._cond:
+            self._insert(ev)
+
+        goodtogo=False
+        terminate_=False
+        while not goodtogo and not terminate_:
+            # This will release the lock on cond, allowing queue manager (scheduler)
+            # thread to notify us if we are already to be released
+            ev.cond.wait()
+            with ev.cond:
+                goodtogo=ev._goodtogo
+            with self._cond:
+                terminate_=self._terminate
+
+        try:
+            if not terminate_:
+                action()
+        finally:
+            with self._cond:
+                self._unlink(ev)
+                # Let _fifo_thread proceed to next action
+                self._cond.notify()
+
+class Repository(FIFO):
+    def __init__(self, name):
+        super().__init__(name = 'RepositoryThread %s' % name)
+        self.repository_name=name
+
+
+# TODO: Should use weak references but they give KeyError
+repositories=weakref.WeakValueDictionary()
+
+def get_controller(name):
+    if name in repositories:
+        repo = repositories[name]
+    else:
+        repo = Repository(name)
+        repo.start()
+        repositories[name] = repo
+    return repo
+
--- a/scheduler.py	Mon Jan 22 21:07:34 2018 +0000
+++ b/scheduler.py	Mon Jan 22 22:23:01 2018 +0000
@@ -18,17 +18,14 @@
         self.cond=cond
 
     def __lt__(self, other):
-        return False
-
-    def __gt__(self, other):
-        return True
+        raise NotImplementedError
 
     def insert_after(self, ev):
         if not self.next:
             ev.prev=self
             self.next=ev
             ev.next=None
-        elif self.next>ev:
+        elif ev<self.next:
             self.insert_immediately_after(ev)
         else:
             self.next.insert_after(ev)
@@ -50,15 +47,12 @@
 
 class ScheduledEvent(QueuedEvent):
     def __init__(self, when, cond, name=None):
-        super().__init__(cond, name)
+        super().__init__(cond, name=name)
         self.when=when
 
     def __lt__(self, other):
         return self.when < other.when
 
-    def __gt__(self, other):
-        return self.when > other.when
-
 class TerminableThread(Thread):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -72,13 +66,14 @@
 
 class QueueThread(TerminableThread):
     def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.daemon = True
         self._list = None
-        super().__init__(*args, **kwargs)
 
     def _insert(self, ev):
         if not self._list:
             self._list=ev
-        elif self._list > ev:
+        elif ev<self._list:
             ev.insert_immediately_after(self._list)
             self._list=ev
         else:
@@ -86,28 +81,10 @@
 
         self._cond.notify()
 
-    def _wait(self, ev):
-        with self._cond:
-            self._insert(ev)
-        # This will release the lock on cond, allowing queue manager (scheduler)
-        # thread to notify us if we are already to be released
-        ev.cond.wait()
-
-        # If we were woken up by some other event, not the scheduler,
-        # ensure the event is removed
-        with self._cond:
-            if ev==self._list:
-                self._list=ev.next
-            ev.unlink()
-
-    def _pop(self):
-        ev=self._list
-        logger.info("%s: Found event %s" % self.name(), str(ev.name))
-        self._list=ev.next
+    def _unlink(self, ev):
+        if ev==self._list:
+            self._list=ev.next
         ev.unlink()
-        ev.cond.acquire()
-        ev.cond.notifyAll()
-        ev.cond.release()
 
 
 class Scheduler(QueueThread):
@@ -117,11 +94,9 @@
     def __init__(self, precision=60):
         self.precision = precision
         self._next_event_time = None
-        self._list = None
-        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
-        self.daemon=True
+        super().__init__(target = self._scheduler_thread, name = 'Scheduler')
 
-    def __scheduler_thread(self):
+    def _scheduler_thread(self):
         with self._cond:
             while not self._terminate:
                 now = time.monotonic()
@@ -137,7 +112,26 @@
                     now = time.monotonic()
 
                 while self._list and self._list.when <= now:
-                    self._pop()
+                    ev=self._list
+                    logger.info("%s: Scheduling event %s" % self.name(), str(ev.name))
+                    # We are only allowed to remove ev from list when ev.cond allows
+                    with ev.cond:
+                        self._list=ev.next
+                        ev.unlink()
+                        ev.cond.notifyAll()
+
+    def _wait(self, ev):
+        with self._cond:
+            self._insert(ev)
+
+        # This will release the lock on cond, allowing queue manager (scheduler)
+        # thread to notify us if we are already to be released
+        ev.cond.wait()
+
+        # If we were woken up by some other event, not the scheduler,
+        # ensure the event is removed
+        with self._cond:
+            self._unlink(ev)
 
     # cond has to be acquired on entry!
     def wait_until(self, when, cond, name=None):

mercurial