1 # |
1 # |
2 # Repository abstraction for queuing |
2 # Repository abstraction for queuing |
3 # |
3 # |
4 |
4 |
5 import weakref |
5 import weakref |
|
6 import borgend |
6 from scheduler import QueueThread, QueuedEvent |
7 from scheduler import QueueThread, QueuedEvent |
|
8 |
|
9 logger=borgend.logger.getChild(__name__) |
7 |
10 |
class FIFOEvent(QueuedEvent):
    """Queue entry for the FIFO queue defined in this module.

    On top of what ``QueuedEvent`` provides, each event carries a
    ``_goodtogo`` flag.  It starts out ``False``; the queuing code in this
    module reads it after waiting on the event's condition to decide
    whether the queued action may be executed.
    """

    def __init__(self, cond, name=None):
        """Create an event that has not yet been cleared to run.

        ``cond`` and ``name`` are forwarded unchanged to
        ``QueuedEvent.__init__``.
        """
        # The flag is set before the base initialiser runs, as in the
        # original code.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Report that this event never sorts before ``other``.

        The two revisions interleaved in the source disagreed here
        (``True`` vs. ``False``); this follows the later revision.
        NOTE(review): presumably the queue's insertion logic uses ``<`` to
        pick the position of a new event, so a constant ``False`` keeps
        arrival order -- confirm against the scheduler module.
        """
        return False
15 |
18 |
16 class FIFO(QueueThread): |
19 class FIFO(QueueThread): |
17 def __init__(self, **kwargs): |
20 def __init__(self, **kwargs): |
18 super().__init__(target = self._fifo_thread, **kwargs) |
21 super().__init__(target = self._fifo_thread, **kwargs) |
19 |
22 |
41 ev=FIFOEvent(cond, name=name) |
45 ev=FIFOEvent(cond, name=name) |
42 |
46 |
43 with self._cond: |
47 with self._cond: |
44 self._insert(ev) |
48 self._insert(ev) |
45 |
49 |
46 goodtogo=False |
50 # This will release the lock on cond, allowing queue manager (scheduler) |
47 terminate_=False |
51 # thread to notify us if we are already to be released |
48 while not goodtogo and not terminate_: |
52 logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') |
49 # This will release the lock on cond, allowing queue manager (scheduler) |
53 ev.cond.wait() |
50 # thread to notify us if we are already to be released |
|
51 ev.cond.wait() |
|
52 with ev.cond: |
|
53 goodtogo=ev._goodtogo |
|
54 with self._cond: |
|
55 terminate_=self._terminate |
|
56 |
54 |
57 try: |
55 try: |
58 if not terminate_: |
56 if ev._goodtogo: |
|
57 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') |
|
58 # |
|
59 # TODO: action() has to unlink on finish; so should maybe |
|
60 # use weak references to event. |
|
61 # Or we have to make action take all the time, so make the |
|
62 # stdout thread. |
|
63 # OR: Easiest to just move finish-waiting into __launch_check |
|
64 # instead of at the outer level of the main loop. |
|
65 # |
59 action() |
66 action() |
60 finally: |
67 finally: |
61 with self._cond: |
68 with self._cond: |
62 self._unlink(ev) |
69 self._unlink(ev) |
63 # Let _fifo_thread proceed to next action |
70 # Let _fifo_thread proceed to next action |
64 self._cond.notify() |
71 self._cond.notify() |
|
72 |
|
73 return ev._goodtogo |
65 |
74 |
class Repository(FIFO):
    """A FIFO queue associated with one named repository."""

    def __init__(self, name):
        """Set up the queue thread and remember the repository name.

        The underlying FIFO is given the thread name
        ``'RepositoryThread <name>'``; ``name`` itself is stored in
        ``repository_name``.
        """
        thread_name = 'RepositoryThread %s' % name
        super().__init__(name=thread_name)
        self.repository_name = name