borgend/scheduler.py

changeset 106
a7bdc239ef62
parent 104
d33e2d7dbeb1
child 113
6993964140bd
equal deleted inserted replaced
105:55043f86c0b5 106:a7bdc239ef62
10 import logging 10 import logging
11 import math 11 import math
12 from threading import Condition, Thread 12 from threading import Condition, Thread
13 13
14 from . import dreamtime 14 from . import dreamtime
15 from .exprotect import protect_noreturn
15 16
16 logger=logging.getLogger(__name__) 17 logger=logging.getLogger(__name__)
17 18
18 class QueuedEvent: 19 class QueuedEvent:
19 def __init__(self, cond, name=None): 20 def __init__(self, cond, name=None):
145 self.precision = precision 146 self.precision = precision
146 self._next_event_time = None 147 self._next_event_time = None
147 super().__init__(target = self._scheduler_thread, name = 'Scheduler') 148 super().__init__(target = self._scheduler_thread, name = 'Scheduler')
148 dreamtime.add_callback(self, self._sleepwake_callback) 149 dreamtime.add_callback(self, self._sleepwake_callback)
149 150
151 @protect_noreturn
150 def _scheduler_thread(self): 152 def _scheduler_thread(self):
151 logger.debug("Scheduler thread started") 153 logger.debug("Scheduler thread started")
152 with self._cond: 154 with self._cond:
153 while not self._terminate: 155 while not self._terminate:
154 try: 156 snapshot = dreamtime.Snapshot()
155 self.__schedule_one() 157 now = snapshot.monotonic()
156 except: 158 if not self._list:
157 logger.exception("Bug in scheduler") 159 timeout = None
158 160 delta = None
159 def __schedule_one(self): 161 nextname = None
160 snapshot = dreamtime.Snapshot() 162 else:
161 now = snapshot.monotonic() 163 nextname=self._list.name
162 if not self._list: 164 delta = self._list.when.horizon(snapshot)-now
163 timeout = None 165 if delta==math.inf:
164 delta = None 166 timeout=None
165 nextname = None 167 else:
166 else: 168 timeout = min(self.precision, delta)
167 nextname=self._list.name 169
168 delta = self._list.when.horizon(snapshot)-now 170 if not timeout or timeout>0:
169 if delta==math.inf: 171 logger.debug("Scheduler waiting %s seconds [next event '%s' in %s seconds]"
170 timeout=None 172 % (str(timeout), nextname, str(delta)))
171 else: 173 self._cond.wait(timeout)
172 timeout = min(self.precision, delta) 174 snapshot = dreamtime.Snapshot()
173 175 now = snapshot.monotonic()
174 if not timeout or timeout>0: 176 logger.debug("Scheduler timed out")
175 logger.debug("Scheduler waiting %s seconds [next event '%s' in %s seconds]" 177
176 % (str(timeout), nextname, str(delta))) 178 while self._list and self._list.when.horizon(snapshot) <= now:
177 self._cond.wait(timeout) 179 ev=self._list
178 snapshot = dreamtime.Snapshot() 180 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
179 now = snapshot.monotonic() 181 # We are only allowed to remove ev from list when ev.cond allows
180 logger.debug("Scheduler timed out") 182 self._unlink(ev)
181 183 # We need to release the lock on self._cond before acquire
182 while self._list and self._list.when.horizon(snapshot) <= now: 184 # one ev.cond to avoid race conditions with self._wait
183 ev=self._list 185 self._cond.release()
184 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) 186 with ev.cond:
185 # We are only allowed to remove ev from list when ev.cond allows 187 ev.cond.notify_all()
186 self._unlink(ev) 188 self._cond.acquire()
187 # We need to release the lock on self._cond before acquire 189
188 # one ev.cond to avoid race conditions with self._wait 190 @protect_noreturn
189 self._cond.release()
190 with ev.cond:
191 ev.cond.notify_all()
192 self._cond.acquire()
193
194
195 def _sleepwake_callback(self, woke): 191 def _sleepwake_callback(self, woke):
196 logger.debug("Rescheduling events after sleep/wakeup") 192 logger.debug("Rescheduling events after sleep/wakeup")
197 with self._cond: 193 with self._cond:
198 self._resort() 194 self._resort()
199 self._cond.notify() 195 self._cond.notify()

mercurial