--- a/repository.py Mon Jan 22 21:07:34 2018 +0000 +++ b/repository.py Mon Jan 22 22:23:01 2018 +0000 @@ -2,13 +2,82 @@ # Repository abstraction for queuing # -from scheduler import TerminableThread, QueuedEvent +import weakref +from scheduler import QueueThread, QueuedEvent + +class FIFOEvent(QueuedEvent): + def __init__(self, cond, name=None): + self._goodtogo=False + super().__init__(cond, name=name) + + def __lt__(self, other): + return True + +class FIFO(QueueThread): + def __init__(self, **kwargs): + super().__init__(target = self._fifo_thread, **kwargs) + + def _fifo_thread(self): + with self._cond: + while not self._terminate: + ev=self._list + if ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev._goodtogo=True + ev.cond.notifyAll() + self._cond.wait() + + # Termination cleanup + ev=self._list + while ev: + # We can only remove ev from the list when ev.cond allows + with ev.cond: + ev.cond.notifyAll() + ev=ev.next + + # cond has to be acquired on entry! + def queue_action(self, cond, action=lambda: (), name=None): + ev=FIFOEvent(cond, name=name) -class Repository(TerminableThread): - def __init__(self) - self.__next_event_time = None - self.__list = None - self._cond = Condition() - self._terminate = False - super().__init__(target = self.__scheduler_thread, name = 'Scheduler') - self.daemon=True + with self._cond: + self._insert(ev) + + goodtogo=False + terminate_=False + while not goodtogo and not terminate_: + # This will release the lock on cond, allowing queue manager (scheduler) + # thread to notify us if we are already to be released + ev.cond.wait() + with ev.cond: + goodtogo=ev._goodtogo + with self._cond: + terminate_=self._terminate + + try: + if not terminate_: + action() + finally: + with self._cond: + self._unlink(ev) + # Let _fifo_thread proceed to next action + self._cond.notify() + +class Repository(FIFO): + def __init__(self, name): + super().__init__(name = 'RepositoryThread %s' % name) + self.repository_name=name + + +# TODO: Should use weak references but they give KeyError +repositories=weakref.WeakValueDictionary() + +def get_controller(name): + if name in repositories: + repo = repositories[name] + else: + repo = Repository(name) + repo.start() + repositories[name] = repo + return repo +