repository.py

changeset 54
cfcaa5f6ba33
parent 53
442c558bd632
child 64
6cfe6a89e810
--- a/repository.py	Mon Jan 22 21:07:34 2018 +0000
+++ b/repository.py	Mon Jan 22 22:23:01 2018 +0000
@@ -2,13 +2,82 @@
 # Repository abstraction for queuing
 #
 
-from scheduler import TerminableThread, QueuedEvent
+import weakref
+from scheduler import QueueThread, QueuedEvent
+
+class FIFOEvent(QueuedEvent):
+    def __init__(self, cond, name=None):
+        self._goodtogo=False
+        super().__init__(cond, name=name)
+
+    def __lt__(self, other):
+        return True
+
+class FIFO(QueueThread):
+    def __init__(self, **kwargs):
+        super().__init__(target = self._fifo_thread, **kwargs)
+
+    def _fifo_thread(self):
+        with self._cond:
+            while not self._terminate:
+                ev=self._list
+                if ev:
+                    # We can only remove ev from the list when ev.cond allows
+                    with ev.cond:
+                        ev._goodtogo=True
+                        ev.cond.notifyAll()
+                self._cond.wait()
+
+            # Termination cleanup
+            ev=self._list
+            while ev:
+                # We can only remove ev from the list when ev.cond allows
+                with ev.cond:
+                    ev.cond.notifyAll()
+                    ev=ev.next
+
+    # cond has to be acquired on entry!
+    def queue_action(self, cond, action=lambda: (), name=None):
+        ev=FIFOEvent(cond, name=name)
 
-class Repository(TerminableThread):
-    def __init__(self)
-        self.__next_event_time = None
-        self.__list = None
-        self._cond = Condition()
-        self._terminate = False
-        super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
-        self.daemon=True
+        with self._cond:
+            self._insert(ev)
+
+        goodtogo=False
+        terminate_=False
+        while not goodtogo and not terminate_:
+            # This will release the lock on cond, allowing queue manager (scheduler)
+            # thread to notify us if we are already to be released
+            ev.cond.wait()
+            with ev.cond:
+                goodtogo=ev._goodtogo
+            with self._cond:
+                terminate_=self._terminate
+
+        try:
+            if not terminate_:
+                action()
+        finally:
+            with self._cond:
+                self._unlink(ev)
+                # Let _fifo_thread proceed to next action
+                self._cond.notify()
+
+class Repository(FIFO):
+    def __init__(self, name):
+        super().__init__(name = 'RepositoryThread %s' % name)
+        self.repository_name=name
+
+
+# TODO: Should use weak references but they give KeyError
+repositories=weakref.WeakValueDictionary()
+
+def get_controller(name):
+    if name in repositories:
+        repo = repositories[name]
+    else:
+        repo = Repository(name)
+        repo.start()
+        repositories[name] = repo
+    return repo
+

mercurial