| 1 # |
1 # |
| 2 # Repository abstraction for queuing |
2 # Repository abstraction for queuing |
| 3 # |
3 # |
| 4 |
4 |
| 5 from scheduler import TerminableThread, QueuedEvent |
5 import weakref |
| |
6 from scheduler import QueueThread, QueuedEvent |
| 6 |
7 |
# NOTE(review): this span interleaves two revisions row by row. Rows that
# begin with "| N" appear to belong to the earlier revision (a Repository
# class derived from TerminableThread); rows that begin with "N" appear to
# belong to the later revision (a FIFOEvent class derived from QueuedEvent).
# Read each side on its own -- TODO confirm against the real file.
#
# Later-revision side, FIFOEvent:
#   __init__(cond, name=None) clears the _goodtogo flag and then forwards
#   cond and name to the base-class initializer.
#   __lt__ returns True regardless of the other operand.
#
# Earlier-revision side, Repository:
#   __init__ sets its private next-event-time and list fields to None,
#   creates a Condition, clears _terminate, initializes the base class with
#   __scheduler_thread as target and 'Scheduler' as name, and sets daemon.
#   NOTE(review): `def __init__(self)` is shown without a trailing colon --
#   confirm whether that is an artifact of the rendering.
| 7 class Repository(TerminableThread): |
8 class FIFOEvent(QueuedEvent): |
| 8 def __init__(self) |
9 def __init__(self, cond, name=None): |
| 9 self.__next_event_time = None |
10 self._goodtogo=False |
| 10 self.__list = None |
11 super().__init__(cond, name=name) |
| 11 self._cond = Condition() |
12 |
| 12 self._terminate = False |
13 def __lt__(self, other): |
| 13 super().__init__(target = self.__scheduler_thread, name = 'Scheduler') |
14 return True |
| 14 self.daemon=True |
15 |
| |
class FIFO(QueueThread):
    """Queue thread that lets queued callers run their actions one by one.

    Callers enter through queue_action(), which links a FIFOEvent into the
    inherited event list and blocks until the worker thread flags that event.
    The list, its lock (self._cond), the terminate flag and the
    _insert/_unlink helpers are inherited and not shown here.
    """

    def __init__(self, **kwargs):
        """Initialize the thread with _fifo_thread as its target.

        All keyword arguments are forwarded to the base-class initializer.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Worker loop: flag the event at self._list, then sleep until notified.

        Runs until self._terminate is set. On every pass the event currently
        referenced by self._list (if any) is marked good to go and the threads
        waiting on its condition are woken. After termination, the waiters of
        every event still linked are woken so that they can see the terminate
        flag.
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We can only remove ev from the list when ev.cond allows
                    with ev.cond:
                        ev._goodtogo = True
                        # notify_all() is the supported spelling; notifyAll()
                        # is a deprecated alias.
                        ev.cond.notify_all()
                self._cond.wait()

            # Termination cleanup
            ev = self._list
            while ev:
                # We can only remove ev from the list when ev.cond allows
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue up, wait for our turn, run action, then leave the queue.

        Args:
            cond: condition the caller already holds; it is waited on until
                the worker thread flags this call's event.
            action: zero-argument callable run once the event is flagged.
                It is skipped if the queue is terminating.
            name: optional name passed on to the FIFOEvent.

        The event is unlinked and the worker thread notified even when
        action raises; the exception then propagates to the caller.
        """
        # NOTE(review): self._cond is taken here while cond is held, whereas
        # _fifo_thread takes ev.cond while holding self._cond. Confirm that
        # this opposite ordering cannot deadlock.
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        goodtogo = False
        terminate_ = False
        while not goodtogo and not terminate_:
            # This will release the lock on cond, allowing the queue manager
            # (scheduler) thread to notify us once we are ready to be released
            ev.cond.wait()
            with ev.cond:
                goodtogo = ev._goodtogo
            with self._cond:
                terminate_ = self._terminate

        try:
            if not terminate_:
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()
| |
65 |
| |
class Repository(FIFO):
    """FIFO queue thread that carries the name of a repository."""

    def __init__(self, name):
        """Create the queue thread for the repository called *name*.

        The thread name is 'RepositoryThread ' followed by the repository
        name; the plain name is kept in ``repository_name``.
        """
        thread_name = 'RepositoryThread %s' % name
        super().__init__(name=thread_name)
        self.repository_name = name
| |
70 |
| |
71 |
| |
# Registry of repositories keyed by name. Values are held weakly, so an entry
# vanishes once nothing else references its Repository.
repositories = weakref.WeakValueDictionary()


def get_controller(name):
    """Return the Repository registered under *name*, creating it if needed.

    A newly created Repository is started and registered before it is
    returned.

    Args:
        name: repository name used as the registry key.

    Returns:
        The existing or newly created Repository.
    """
    # A single get() yields either a strong reference or None. Testing
    # ``name in repositories`` and indexing afterwards can raise KeyError
    # when the entry is collected between the two steps.
    repo = repositories.get(name)
    if repo is None:
        repo = Repository(name)
        repo.start()
        repositories[name] = repo
    return repo
| |
83 |