16 def __init__(self, cond, name=None): |
16 def __init__(self, cond, name=None): |
17 self.next=None |
17 self.next=None |
18 self.prev=None |
18 self.prev=None |
19 self.name=name |
19 self.name=name |
20 self.cond=cond |
20 self.cond=cond |
|
21 self.linked=False |
21 |
22 |
22 def __lt__(self, other): |
23 def __lt__(self, other): |
23 raise NotImplementedError |
24 raise NotImplementedError |
24 |
25 |
25 def insert_after(self, ev): |
26 def insert_after(self, ev): |
75 super().__init__(*args, **kwargs) |
76 super().__init__(*args, **kwargs) |
76 self.daemon = True |
77 self.daemon = True |
77 self._list = None |
78 self._list = None |
78 |
79 |
79 def _insert(self, ev): |
80 def _insert(self, ev): |
|
81 assert(not ev.linked) |
80 if not self._list: |
82 if not self._list: |
81 #logger.debug("Insert first") |
83 #logger.debug("Insert first") |
82 self._list=ev |
84 self._list=ev |
83 elif ev<self._list: |
85 elif ev<self._list: |
84 #logger.debug("Insert beginning") |
86 #logger.debug("Insert beginning") |
85 self._list.insert_immediately_before(ev) |
87 self._list.insert_immediately_before(ev) |
86 self._list=ev |
88 self._list=ev |
87 else: |
89 else: |
88 #logger.debug("Insert after") |
90 #logger.debug("Insert after") |
89 self._list.insert_after(ev) |
91 self._list.insert_after(ev) |
90 |
92 ev.linked=True |
91 self._cond.notify() |
|
92 |
93 |
93 def _unlink(self, ev): |
94 def _unlink(self, ev): |
|
95 assert(ev.linked) |
94 if ev==self._list: |
96 if ev==self._list: |
95 self._list=ev.next |
97 self._list=ev.next |
96 ev.unlink() |
98 ev.unlink() |
|
99 ev.linked=False |
97 |
100 |
98 def _resort(self): |
101 def _resort(self): |
99 oldlist=self._list |
102 oldlist=self._list |
100 self._list=None |
103 self._list=None |
101 while oldlist: |
104 while oldlist: |
137 |
140 |
138 while self._list and self._list.when.monotonic() <= now: |
141 while self._list and self._list.when.monotonic() <= now: |
139 ev=self._list |
142 ev=self._list |
140 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) |
143 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) |
141 # We are only allowed to remove ev from list when ev.cond allows |
144 # We are only allowed to remove ev from list when ev.cond allows |
|
145 self._unlink(ev) |
|
146 # We need to release the lock on self._cond before acquire |
|
147 # one ev.cond to avoid race conditions with self._wait |
|
148 self._cond.release() |
142 with ev.cond: |
149 with ev.cond: |
143 self._list=ev.next |
150 ev.cond.notify_all() |
144 ev.unlink() |
151 self._cond.acquire() |
145 ev.cond.notifyAll() |
152 |
146 |
153 |
147 def _wakeup_callback(self): |
154 def _wakeup_callback(self): |
148 logger.debug("Rescheduling events after wakeup") |
155 logger.debug("Rescheduling events after wakeup") |
149 with self._cond: |
156 with self._cond: |
150 self._resort() |
157 self._resort() |
151 |
158 |
|
159 # It is required to have acquired the lock on ev.cond on entry |
152 def _wait(self, ev): |
160 def _wait(self, ev): |
153 with self._cond: |
161 with self._cond: |
154 self._insert(ev) |
162 self._insert(ev) |
|
163 self._cond.notify() |
155 |
164 |
156 # This will release the lock on cond, allowing queue manager (scheduler) |
165 # This will release the lock on cond, allowing the scheduler |
157 # thread to notify us if we are already to be released |
166 # thread to notify us if we are ready to be released |
158 ev.cond.wait() |
167 ev.cond.wait() |
159 |
168 |
160 # If we were woken up by some other event, not the scheduler, |
169 # If we were woken up by some other event, not the scheduler, |
161 # ensure the event is removed |
170 # ensure the event is removed |
162 with self._cond: |
171 if ev.linked: |
163 self._unlink(ev) |
172 # Deal with race conditions wrt. the two different locks |
|
173 # in the scheduler |
|
174 #ev.cond.release() |
|
175 with self._cond: |
|
176 self._unlink(ev) |
|
177 #ev.cond.acquire() |
164 |
178 |
165 # cond has to be acquired on entry! |
179 # cond has to be acquired on entry! |
166 def wait_until(self, when, cond, name=None): |
180 def wait_until(self, when, cond, name=None): |
167 logger.debug("Scheduling '%s' in %s seconds [%s]" % |
181 logger.debug("Scheduling '%s' in %s seconds [%s]" % |
168 (name, when.seconds_to(), when.__class__.__name__)) |
182 (name, when.seconds_to(), when.__class__.__name__)) |