Thu, 25 Jan 2018 22:34:55 +0000
Repository queuing fixes
#
# Repository abstraction for queuing
#

import weakref

import borgend
from scheduler import QueueThread, QueuedEvent

logger = borgend.logger.getChild(__name__)


class FIFOEvent(QueuedEvent):
    """Queue entry for a FIFO, carrying a "may run now" flag."""

    def __init__(self, cond, name=None):
        # Set to True by FIFO._fifo_thread once this event has been released.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        # An event never compares as less than another one.
        # NOTE(review): presumably QueueThread._insert orders entries with
        # __lt__, so this keeps arrival (FIFO) order -- confirm in scheduler.
        return False


class FIFO(QueueThread):
    """Queue thread that releases queued events one at a time.

    Relies on attributes provided by QueueThread (_cond, _list, _terminate,
    _insert, _unlink), which are defined outside this file.
    """

    def __init__(self, **kwargs):
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Thread body: release the head event, then wait to be notified.

        NOTE(review): this takes ev.cond while holding self._cond, whereas
        queue_action takes self._cond while its caller holds cond. That is
        the opposite acquisition order; check whether it can deadlock.
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We can only remove ev from the list when ev.cond allows
                    with ev.cond:
                        if not ev._goodtogo:
                            ev._goodtogo = True
                            # notify_all() is the non-deprecated spelling of
                            # notifyAll(); assumes a threading.Condition.
                            ev.cond.notify_all()
                # Sleep until queue_action (or another holder of _cond)
                # notifies us.
                self._cond.wait()

            # Termination cleanup: wake every remaining waiter. Their
            # _goodtogo flag is left untouched, so waiters that were never
            # released see False and skip their action.
            ev = self._list
            while ev:
                # We can only remove ev from the list when ev.cond allows
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue an event, wait on cond, and run action if released.

        cond has to be acquired on entry!

        Args:
            cond: condition object the caller already holds; it is waited
                on here and notified by the FIFO thread.
            action: zero-argument callable, called only if the event was
                released by the FIFO thread.
            name: optional event name, used in debug logging.

        Returns:
            True if the event had been released (so action was called),
            False if the wait ended without a release.

        Any exception raised by action propagates after the event has been
        unlinked from the queue.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        # This will release the lock on cond, allowing the queue manager
        # (scheduler) thread to notify us if we are already due for release.
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                #
                # TODO: action() has to unlink on finish; so should maybe
                # use weak references to event.
                # Or we have to make action take all the time, so make the
                # stdout thread.
                # OR: Easiest to just move finish-waiting into __launch_check
                # instead of at the outer level of the main loop.
                #
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo


class Repository(FIFO):
    """FIFO queue dedicated to one named repository."""

    def __init__(self, name):
        super().__init__(name='RepositoryThread %s' % name)
        self.repository_name = name


# Weak-valued registry of Repository objects, keyed by repository name.
# Entries can vanish whenever the last strong reference to a Repository is
# dropped, so lookups must never be split into "test membership, then index"
# (that pattern can raise KeyError); use a single .get() instead.
repositories = weakref.WeakValueDictionary()


def get_controller(name):
    """Return the Repository for name, creating and starting it if needed.

    NOTE(review): nothing here serialises concurrent callers, so two threads
    asking for the same new name could each create a Repository.
    """
    # One lookup that also takes a strong reference, so the entry cannot be
    # collected between checking for it and using it.
    repo = repositories.get(name)
    if repo is None:
        repo = Repository(name)
        repo.start()
        repositories[name] = repo
    return repo