Tue, 23 Jan 2018 22:55:00 +0000
merge
# # Repository abstraction for queuing # import weakref from scheduler import QueueThread, QueuedEvent class FIFOEvent(QueuedEvent): def __init__(self, cond, name=None): self._goodtogo=False super().__init__(cond, name=name) def __lt__(self, other): return True class FIFO(QueueThread): def __init__(self, **kwargs): super().__init__(target = self._fifo_thread, **kwargs) def _fifo_thread(self): with self._cond: while not self._terminate: ev=self._list if ev: # We can only remove ev from the list when ev.cond allows with ev.cond: ev._goodtogo=True ev.cond.notifyAll() self._cond.wait() # Termination cleanup ev=self._list while ev: # We can only remove ev from the list when ev.cond allows with ev.cond: ev.cond.notifyAll() ev=ev.next # cond has to be acquired on entry! def queue_action(self, cond, action=lambda: (), name=None): ev=FIFOEvent(cond, name=name) with self._cond: self._insert(ev) goodtogo=False terminate_=False while not goodtogo and not terminate_: # This will release the lock on cond, allowing queue manager (scheduler) # thread to notify us if we are already to be released ev.cond.wait() with ev.cond: goodtogo=ev._goodtogo with self._cond: terminate_=self._terminate try: if not terminate_: action() finally: with self._cond: self._unlink(ev) # Let _fifo_thread proceed to next action self._cond.notify() class Repository(FIFO): def __init__(self, name): super().__init__(name = 'RepositoryThread %s' % name) self.repository_name=name # TODO: Should use weak references but they give KeyError repositories=weakref.WeakValueDictionary() def get_controller(name): if name in repositories: repo = repositories[name] else: repo = Repository(name) repo.start() repositories[name] = repo return repo