#
# Repository abstraction for queuing
#
4 |
4 |
5 from scheduler import TerminableThread, QueuedEvent |
5 import weakref |
|
6 from scheduler import QueueThread, QueuedEvent |
6 |
7 |
class FIFOEvent(QueuedEvent):
    """Queue entry representing one caller waiting in a FIFO.

    The ``_goodtogo`` flag starts out False. ``FIFO._fifo_thread`` sets it
    to True, and ``FIFO.queue_action`` polls it to learn that its turn has
    come.
    """

    def __init__(self, cond, name=None):
        """Create an event that has not yet been released.

        Args:
            cond: Forwarded to ``QueuedEvent.__init__``; ``FIFO`` later
                accesses it as ``ev.cond`` to wait on and notify.
            name: Optional name, forwarded to ``QueuedEvent.__init__``.
        """
        # Set the flag first so it exists by the time the base initialiser
        # runs, exactly as in the original statement order.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Report this event as ordered before *other*, unconditionally."""
        # NOTE(review): how this affects the position chosen by
        # QueueThread._insert is not visible here -- confirm that it yields
        # the intended first-in, first-out order.
        return True
15 |
|
class FIFO(QueueThread):
    """Queue that releases waiting callers one at a time.

    A caller hands an action to ``queue_action``, which blocks until the
    worker method ``_fifo_thread`` flags the caller's event as ready, runs
    the action, then unlinks the event and wakes the worker again.

    Uses ``_cond``, ``_list``, ``_terminate``, ``_insert`` and ``_unlink``,
    none of which are defined in this class (presumably inherited from
    ``QueueThread`` -- confirm).
    """

    def __init__(self, **kwargs):
        """Initialise the base class with ``_fifo_thread`` as its target.

        Args:
            **kwargs: Passed through unchanged to ``QueueThread.__init__``.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Worker loop: flag the event at the head of the list as ready.

        ``self._cond`` is held throughout, except while waiting on it. On
        every wake-up the current head event, if any, is marked good-to-go
        and its waiters are notified. Once ``self._terminate`` is set, all
        events still linked are notified so that their waiters can observe
        the termination.
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We can only remove ev from the list when ev.cond
                    # allows, so it is merely flagged here; the waiter
                    # unlinks it in queue_action.
                    with ev.cond:
                        ev._goodtogo = True
                        # notify_all() is the supported spelling; the
                        # camelCase notifyAll() alias is deprecated.
                        ev.cond.notify_all()
                # Sleep until queue_action (or whoever sets _terminate)
                # notifies self._cond.
                self._cond.wait()

            # Termination cleanup: wake every remaining waiter without
            # setting _goodtogo, so each one sees _terminate instead.
            ev = self._list
            while ev:
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Wait for this request's turn in the queue, then run *action*.

        The caller must already hold *cond* when calling this method.

        Args:
            cond: Condition object held by the caller. It is waited on
                until the worker flags this request as ready.
            action: Zero-argument callable run once the request has been
                released. It is skipped when the queue is terminating.
            name: Optional name passed on to the ``FIFOEvent``.

        Raises:
            Whatever *action* raises; the event is unlinked and the worker
            notified before the exception propagates.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        while True:
            with ev.cond:
                goodtogo = ev._goodtogo
            with self._cond:
                terminate_ = self._terminate
            # Check before the first wait as well: if termination was
            # requested before this request was queued, the worker may
            # already have exited and nobody would ever notify ev.cond.
            if goodtogo or terminate_:
                break
            # This releases the lock on cond, allowing the queue manager
            # (worker) thread to notify us when we are ready to be released.
            ev.cond.wait()

        try:
            if not terminate_:
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()
|
65 |
|
class Repository(FIFO):
    """A FIFO queue dedicated to one named repository."""

    def __init__(self, name):
        """Set up the queue for the repository called *name*.

        Args:
            name: Repository name. It is embedded in the name handed to
                ``FIFO.__init__`` and kept as ``repository_name``.
        """
        thread_name = 'RepositoryThread %s' % name
        super().__init__(name=thread_name)
        self.repository_name = name
|
70 |
|
71 |
|
# Registry of Repository objects keyed by repository name.  Values are held
# through weak references, so an entry vanishes once nothing else references
# its object.  For that reason a membership test followed by indexing can
# raise KeyError if the entry is collected in between; fetch entries with a
# single .get() call instead.
repositories=weakref.WeakValueDictionary()
|
74 |
|
def get_controller(name):
    """Return the Repository registered under *name*, creating it if needed.

    Args:
        name: Repository name used as the key in ``repositories``.

    Returns:
        The existing Repository for *name*, or a newly created one on which
        ``start()`` has been called and which has been added to
        ``repositories``.
    """
    # One .get() both tests for and fetches the entry.  With weakly held
    # values, a separate "in" test followed by indexing can raise KeyError
    # when the object is collected between the two steps.
    repo = repositories.get(name)
    if repo is None:
        repo = Repository(name)
        repo.start()
        repositories[name] = repo
    return repo
|
83 |