scheduler.py

changeset 76
4b08fca3ce34
parent 75
2a44b9649212
child 78
83b43987e61e
equal deleted inserted replaced
75:2a44b9649212 76:4b08fca3ce34
4 # This module simply provide a way for other threads to until a given time 4 # This module simply provide a way for other threads to until a given time
5 # 5 #
6 6
7 import time 7 import time
8 import borgend 8 import borgend
9 import sleep
9 from threading import Condition, Lock, Thread 10 from threading import Condition, Lock, Thread
10 11
11 logger=borgend.logger.getChild(__name__) 12 logger=borgend.logger.getChild(__name__)
12 13
13 class QueuedEvent: 14 class QueuedEvent:
43 p=self.prev 44 p=self.prev
44 if n: 45 if n:
45 n.prev=p 46 n.prev=p
46 if p: 47 if p:
47 p.next=n 48 p.next=n
49 self.next=None
50 self.prev=None
48 51
49 class ScheduledEvent(QueuedEvent): 52 class ScheduledEvent(QueuedEvent):
53 #@accepts(ScheduledEvent, sleep.Time, threading.Cond, str)
50 def __init__(self, when, cond, name=None): 54 def __init__(self, when, cond, name=None):
51 super().__init__(cond, name=name) 55 super().__init__(cond, name=name)
52 self.when=when 56 self.when=when
53 57
54 def __lt__(self, other): 58 def __lt__(self, other):
88 def _unlink(self, ev): 92 def _unlink(self, ev):
89 if ev==self._list: 93 if ev==self._list:
90 self._list=ev.next 94 self._list=ev.next
91 ev.unlink() 95 ev.unlink()
92 96
97 def _resort(self):
98 oldlist=self._list
99 self._list=None
100 while oldlist:
101 ev=oldlist
102 oldlist=oldlist.next
103 ev.unlink()
104 self._insert(ev)
105
106
93 107
94 class Scheduler(QueueThread): 108 class Scheduler(QueueThread):
95 # Default to precision of 60 seconds: the scheduler thread will never 109 # Default to precision of 60 seconds: the scheduler thread will never
96 # sleep longer than that, to get quickly back on track with the schedule 110 # sleep longer than that, to get quickly back on track with the schedule
97 # when the computer wakes up from sleep 111 # when the computer wakes up from sleep
98 def __init__(self, precision=60): 112 def __init__(self, precision=60):
99 self.precision = precision 113 self.precision = precision
100 self._next_event_time = None 114 self._next_event_time = None
101 super().__init__(target = self._scheduler_thread, name = 'Scheduler') 115 super().__init__(target = self._scheduler_thread, name = 'Scheduler')
116 sleep.add_callback(self, self._wakeup_callback)
102 117
103 def _scheduler_thread(self): 118 def _scheduler_thread(self):
104 logger.debug("Scheduler thread started") 119 logger.debug("Scheduler thread started")
105 with self._cond: 120 with self._cond:
106 while not self._terminate: 121 while not self._terminate:
108 if not self._list: 123 if not self._list:
109 timeout = None 124 timeout = None
110 else: 125 else:
111 # Wait at most precision seconds, or until next event if it 126 # Wait at most precision seconds, or until next event if it
112 # comes earlier 127 # comes earlier
113 timeout=min(self.precision, self._list.when-now) 128 timeout=min(self.precision, self._list.when.realtime()-now)
114 129
115 if not timeout or timeout>0: 130 if not timeout or timeout>0:
116 logger.debug("Scheduler waiting %d seconds" % (timeout or (-1))) 131 logger.debug("Scheduler waiting %d seconds" % (timeout or (-1)))
117 self._cond.wait(timeout) 132 self._cond.wait(timeout)
118 now = time.monotonic() 133 now = time.monotonic()
119 134
120 logger.debug("Scheduler timed out") 135 logger.debug("Scheduler timed out")
121 136
122 while self._list and self._list.when <= now: 137 while self._list and self._list.when.monotonic() <= now:
123 ev=self._list 138 ev=self._list
124 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) 139 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
125 # We are only allowed to remove ev from list when ev.cond allows 140 # We are only allowed to remove ev from list when ev.cond allows
126 with ev.cond: 141 with ev.cond:
127 self._list=ev.next 142 self._list=ev.next
128 ev.unlink() 143 ev.unlink()
129 ev.cond.notifyAll() 144 ev.cond.notifyAll()
145
146 def _wakeup_callback(self):
147 logger.debug("Rescheduling events after wakeup")
148 with self._cond:
149 self._resort()
130 150
131 def _wait(self, ev): 151 def _wait(self, ev):
132 with self._cond: 152 with self._cond:
133 self._insert(ev) 153 self._insert(ev)
134 154
141 with self._cond: 161 with self._cond:
142 self._unlink(ev) 162 self._unlink(ev)
143 163
144 # cond has to be acquired on entry! 164 # cond has to be acquired on entry!
145 def wait_until(self, when, cond, name=None): 165 def wait_until(self, when, cond, name=None):
146 logger.debug("Scheduling '%s' at %d" % (name, when)) 166 logger.debug("Scheduling '%s' in %s seconds [%s]" %
167 (name, when.seconds_to(), when.__class__.__name__))
147 self._wait(ScheduledEvent(when, cond, name)) 168 self._wait(ScheduledEvent(when, cond, name))
148 169

mercurial