| 1 # | 1 # | 
| 2 # Repository abstraction for queuing | 2 # Repository abstraction for queuing | 
| 3 # | 3 # | 
| 4 | 4 | 
| 5 from scheduler import TerminableThread, QueuedEvent | 5 import weakref | 
|  | 6 from scheduler import QueueThread, QueuedEvent | 
| 6 | 7 | 
class FIFOEvent(QueuedEvent):
    """Queued event carrying a flag that tells its waiter it may proceed."""

    def __init__(self, cond, name=None):
        """Initialise the event with the "good to go" flag cleared.

        `cond` and `name` are forwarded unchanged to QueuedEvent.
        """
        # Cleared here; FIFO._fifo_thread sets it once the event reaches
        # the head of the queue.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Report this event as ordered before `other`, unconditionally."""
        # NOTE(review): whether a constant True gives first-in-first-out
        # order depends on how the queue's insert routine uses `<` --
        # confirm against the scheduler module.
        return True
| 14         self.daemon=True | 15 | 
class FIFO(QueueThread):
    """Queue thread that lets queued actions run one at a time.

    Callers block in `queue_action` until the worker thread
    (`_fifo_thread`) marks their event as good to go. The event list
    (`_list`, `_insert`, `_unlink`), `_cond` and `_terminate` are not
    defined here; presumably they come from QueueThread.
    """

    def __init__(self, **kwargs):
        """Create the thread with `_fifo_thread` as its target.

        All keyword arguments are forwarded unchanged to the base class.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Thread body: release the head event, then sleep until notified.

        Runs until `_terminate` is set, after which every event still in
        the list is notified so that its waiter can wake up and leave.
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # The event is not removed here: its waiter unlinks it
                    # in queue_action once the action has run. We only need
                    # ev.cond to flag and wake that waiter.
                    with ev.cond:
                        ev._goodtogo = True
                        ev.cond.notify_all()
                self._cond.wait()

            # Termination cleanup: wake every remaining waiter. They will
            # observe _terminate and skip their action.
            ev = self._list
            while ev:
                with ev.cond:
                    ev.cond.notify_all()
                    ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue `action` and block until it may run, then run it.

        Args:
            cond: Condition the caller already holds on entry. It is
                passed to the FIFOEvent and waited on through `ev.cond`
                (presumably the same object -- confirm in QueuedEvent).
            action: Zero-argument callable to invoke once released.
                Defaults to a no-op.
            name: Optional event name, forwarded to FIFOEvent.

        The action is skipped when the queue is terminating. The event is
        always unlinked and the worker thread notified afterwards, even if
        `action` raises; the exception then propagates to the caller.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)
            # If the queue is already terminating, the worker may have
            # finished its cleanup pass and nobody would ever notify us,
            # so pick the flag up now instead of waiting unconditionally.
            terminate_ = self._terminate

        goodtogo = False
        while not goodtogo and not terminate_:
            # This releases the lock on cond, allowing the queue manager
            # (worker) thread to notify us once we are ready to be released.
            ev.cond.wait()
            # wait() returns with the condition's lock held again, so the
            # flag can be read without re-entering the condition.
            goodtogo = ev._goodtogo
            # NOTE(review): the worker takes self._cond before ev.cond,
            # the opposite order of this acquisition -- confirm that this
            # cannot deadlock when a cond is shared or woken spuriously.
            with self._cond:
                terminate_ = self._terminate

        try:
            if not terminate_:
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()
|  | 65 | 
class Repository(FIFO):
    """FIFO queue thread associated with one named repository."""

    def __init__(self, name):
        """Create the queue thread for repository `name`.

        The thread is named 'RepositoryThread <name>', and `name` is kept
        on the instance as `repository_name`.
        """
        thread_name = 'RepositoryThread %s' % name
        super().__init__(name=thread_name)
        self.repository_name = name
|  | 70 | 
|  | 71 | 
# Registry of repository controllers keyed by name. Values are held weakly,
# so an entry disappears once nothing else references its controller.
repositories = weakref.WeakValueDictionary()


def get_controller(name):
    """Return the controller for `name`, creating and starting it if needed.

    Args:
        name: Repository name used as the registry key and passed to the
            Repository constructor.

    Returns:
        The registered controller for `name`; a new Repository is created,
        started and registered when none is currently registered.
    """
    # One .get() instead of `name in repositories` + `repositories[name]`:
    # with weakly held values the entry can vanish between the membership
    # test and the lookup, which surfaced as a KeyError.
    repo = repositories.get(name)
    if repo is None:
        repo = Repository(name)
        repo.start()
        repositories[name] = repo
    return repo
|  | 83 |