Mon, 22 Jan 2018 22:23:01 +0000
Basic repository queue
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
diff
changeset
|
1 | # |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
diff
changeset
|
2 | # Repository abstraction for queuing |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
diff
changeset
|
3 | # |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
diff
changeset
|
4 | |
54 | 5 | import weakref |
6 | from scheduler import QueueThread, QueuedEvent | |
7 | ||
8 | class FIFOEvent(QueuedEvent): | |
9 | def __init__(self, cond, name=None): | |
10 | self._goodtogo=False | |
11 | super().__init__(cond, name=name) | |
12 | ||
13 | def __lt__(self, other): | |
14 | return True | |
15 | ||
16 | class FIFO(QueueThread): | |
17 | def __init__(self, **kwargs): | |
18 | super().__init__(target = self._fifo_thread, **kwargs) | |
19 | ||
20 | def _fifo_thread(self): | |
21 | with self._cond: | |
22 | while not self._terminate: | |
23 | ev=self._list | |
24 | if ev: | |
25 | # We can only remove ev from the list when ev.cond allows | |
26 | with ev.cond: | |
27 | ev._goodtogo=True | |
28 | ev.cond.notifyAll() | |
29 | self._cond.wait() | |
30 | ||
31 | # Termination cleanup | |
32 | ev=self._list | |
33 | while ev: | |
34 | # We can only remove ev from the list when ev.cond allows | |
35 | with ev.cond: | |
36 | ev.cond.notifyAll() | |
37 | ev=ev.next | |
38 | ||
39 | # cond has to be acquired on entry! | |
40 | def queue_action(self, cond, action=lambda: (), name=None): | |
41 | ev=FIFOEvent(cond, name=name) | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
diff
changeset
|
42 | |
54 | 43 | with self._cond: |
44 | self._insert(ev) | |
45 | ||
46 | goodtogo=False | |
47 | terminate_=False | |
48 | while not goodtogo and not terminate_: | |
49 | # This will release the lock on cond, allowing queue manager (scheduler) | |
50 | # thread to notify us if we are already to be released | |
51 | ev.cond.wait() | |
52 | with ev.cond: | |
53 | goodtogo=ev._goodtogo | |
54 | with self._cond: | |
55 | terminate_=self._terminate | |
56 | ||
57 | try: | |
58 | if not terminate_: | |
59 | action() | |
60 | finally: | |
61 | with self._cond: | |
62 | self._unlink(ev) | |
63 | # Let _fifo_thread proceed to next action | |
64 | self._cond.notify() | |
65 | ||
66 | class Repository(FIFO): | |
67 | def __init__(self, name): | |
68 | super().__init__(name = 'RepositoryThread %s' % name) | |
69 | self.repository_name=name | |
70 | ||
71 | ||
72 | # TODO: Should use weak references but they give KeyError | |
73 | repositories=weakref.WeakValueDictionary() | |
74 | ||
75 | def get_controller(name): | |
76 | if name in repositories: | |
77 | repo = repositories[name] | |
78 | else: | |
79 | repo = Repository(name) | |
80 | repo.start() | |
81 | repositories[name] = repo | |
82 | return repo | |
83 |