| 1 # |
1 # |
| 2 # Repository abstraction for queuing |
2 # Repository abstraction for queuing |
| 3 # |
3 # |
| 4 |
4 |
| 5 import weakref |
5 import weakref |
| |
6 import borgend |
| 6 from scheduler import QueueThread, QueuedEvent |
7 from scheduler import QueueThread, QueuedEvent |
| |
8 |
| |
9 logger=borgend.logger.getChild(__name__) |
| 7 |
10 |
class FIFOEvent(QueuedEvent):
    """Queue entry used by the FIFO queue in this module.

    Adds a ``_goodtogo`` flag on top of ``QueuedEvent`` and defines an
    ordering in which no event is ever "less than" another.
    """

    def __init__(self, cond, name=None):
        """Create an event that has not yet been cleared to run.

        ``cond`` and ``name`` are handed unchanged to the ``QueuedEvent``
        initialiser.
        """
        # Starts out False; the queuing code later in this file reads this
        # flag after waiting on the event's condition to decide whether the
        # queued action may run.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Return False for every ``other``.

        The earlier revision returned True here. NOTE(review): presumably
        the queue's insert routine uses ``<`` to position new entries, so
        never comparing as smaller keeps a new event behind those already
        queued (first in, first out) -- confirm against the scheduler's
        insert logic.
        """
        return False
| 15 |
18 |
class FIFO(QueueThread):
    """Queue thread whose target is the class's ``_fifo_thread`` method."""

    def __init__(self, **kwargs):
        """Initialise the base class with ``_fifo_thread`` as ``target``.

        Every keyword argument is forwarded unchanged to the base-class
        initialiser. ``_fifo_thread`` is expected to be defined on this
        class; it is not visible in this excerpt.
        """
        worker = self._fifo_thread
        super().__init__(target=worker, **kwargs)
| 19 |
22 |
| 41 ev=FIFOEvent(cond, name=name) |
45 ev=FIFOEvent(cond, name=name) |
| 42 |
46 |
| 43 with self._cond: |
47 with self._cond: |
| 44 self._insert(ev) |
48 self._insert(ev) |
| 45 |
49 |
| 46 goodtogo=False |
50 # This will release the lock on cond, allowing queue manager (scheduler) |
| 47 terminate_=False |
51 # thread to notify us if we are already to be released |
| 48 while not goodtogo and not terminate_: |
52 logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') |
| 49 # This will release the lock on cond, allowing queue manager (scheduler) |
53 ev.cond.wait() |
| 50 # thread to notify us if we are already to be released |
|
| 51 ev.cond.wait() |
|
| 52 with ev.cond: |
|
| 53 goodtogo=ev._goodtogo |
|
| 54 with self._cond: |
|
| 55 terminate_=self._terminate |
|
| 56 |
54 |
| 57 try: |
55 try: |
| 58 if not terminate_: |
56 if ev._goodtogo: |
| |
57 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') |
| |
58 # |
| |
59 # TODO: action() has to unlink on finish; so should maybe |
| |
60 # use weak references to event. |
| |
61 # Or we have to make action take all the time, so make the |
| |
62 # stdout thread. |
| |
63 # OR: Easiest to just move finish-waiting into __launch_check |
| |
64 # instead of at the outer level of the main loop. |
| |
65 # |
| 59 action() |
66 action() |
| 60 finally: |
67 finally: |
| 61 with self._cond: |
68 with self._cond: |
| 62 self._unlink(ev) |
69 self._unlink(ev) |
| 63 # Let _fifo_thread proceed to next action |
70 # Let _fifo_thread proceed to next action |
| 64 self._cond.notify() |
71 self._cond.notify() |
| |
72 |
| |
73 return ev._goodtogo |
| 65 |
74 |
class Repository(FIFO):
    """FIFO queue associated with a named repository."""

    def __init__(self, name):
        """Initialise the FIFO and remember the repository name.

        ``name`` is formatted into ``'RepositoryThread <name>'``, which is
        passed as the ``name`` keyword to the FIFO initialiser, and is also
        stored as ``self.repository_name``.
        """
        label = 'RepositoryThread %s' % name
        super().__init__(name=label)
        self.repository_name = name