borgend/scheduler.py

changeset 101
3068b0de12ee
parent 91
f53aa2007a84
child 102
0d43cd568f3c
equal deleted inserted replaced
100:b141bed9e718 101:3068b0de12ee
20 self.prev=None 20 self.prev=None
21 self.name=name 21 self.name=name
22 self.cond=cond 22 self.cond=cond
23 self.linked=False 23 self.linked=False
24 24
25 @staticmethod
26 def snapshot():
27 return None
28
29 def is_before(self, other, snapshot=None):
30 raise NotImplementedError
31
25 def __lt__(self, other): 32 def __lt__(self, other):
26 raise NotImplementedError 33 return self.is_before(other, self.snapshot())
27 34
28 def insert_after(self, ev): 35 def insert_after(self, ev, snapshot=None):
29 if not self.next or ev<self.next: 36 if not snapshot:
37 snapshot=self.snapshot()
38 if not self.next or ev.is_before(self.next, snapshot):
30 self.insert_immediately_after(ev) 39 self.insert_immediately_after(ev)
31 else: 40 else:
32 self.next.insert_after(ev) 41 self.next.insert_after(ev, snapshot)
33 42
34 def insert_immediately_after(self, ev): 43 def insert_immediately_after(self, ev):
35 assert(ev.next is None and ev.prev is None) 44 assert(ev.next is None and ev.prev is None)
36 ev.prev=self 45 ev.prev=self
37 ev.next=self.next 46 ev.next=self.next
57 #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) 66 #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
58 def __init__(self, when, cond, name=None): 67 def __init__(self, when, cond, name=None):
59 super().__init__(cond, name=name) 68 super().__init__(cond, name=name)
60 self.when=when 69 self.when=when
61 70
62 def __lt__(self, other): 71 @staticmethod
63 return self.when < other.when 72 def snapshot():
73 return dreamtime.Snapshot()
74
75 def is_before(self, other, snapshot=None):
76 if not snapshot:
77 snapshot=self.snapshot()
78 return self.when.horizon(snapshot) < other.when.horizon(snapshot)
64 79
65 class TerminableThread(Thread): 80 class TerminableThread(Thread):
66 def __init__(self, *args, **kwargs): 81 def __init__(self, *args, **kwargs):
67 super().__init__(*args, **kwargs) 82 super().__init__(*args, **kwargs)
68 self._terminate=False 83 self._terminate=False
77 def __init__(self, *args, **kwargs): 92 def __init__(self, *args, **kwargs):
78 super().__init__(*args, **kwargs) 93 super().__init__(*args, **kwargs)
79 self.daemon = True 94 self.daemon = True
80 self._list = None 95 self._list = None
81 96
82 def _insert(self, ev): 97 def _insert(self, ev, snapshot=None):
83 assert(not ev.linked) 98 assert(not ev.linked)
84 if not self._list: 99 if not self._list:
85 #logger.debug("Insert first") 100 #logger.debug("Insert first")
86 self._list=ev 101 self._list=ev
87 elif ev<self._list: 102 elif ev<self._list:
88 #logger.debug("Insert beginning") 103 #logger.debug("Insert beginning")
89 self._list.insert_immediately_before(ev) 104 self._list.insert_immediately_before(ev)
90 self._list=ev 105 self._list=ev
91 else: 106 else:
92 #logger.debug("Insert after") 107 #logger.debug("Insert after")
93 self._list.insert_after(ev) 108 if not snapshot:
109 snapshot=ev.snapshot()
110 self._list.insert_after(ev, snapshot)
94 ev.linked=True 111 ev.linked=True
95 112
96 def _unlink(self, ev): 113 def _unlink(self, ev):
97 assert(ev.linked) 114 assert(ev.linked)
98 if ev==self._list: 115 if ev==self._list:
99 self._list=ev.next 116 self._list=ev.next
100 ev.unlink() 117 ev.unlink()
101 ev.linked=False 118 ev.linked=False
102 119
103 def _resort(self): 120 def _resort(self, snapshot=None):
104 oldlist=self._list 121 oldlist=self._list
105 self._list=None 122 self._list=None
123 if oldlist and not snapshot:
124 snapshot=oldlist.snapshot()
106 while oldlist: 125 while oldlist:
107 ev=oldlist 126 ev=oldlist
108 oldlist=oldlist.next 127 oldlist=oldlist.next
109 ev.unlink() 128 ev.unlink()
110 ev.linked=False 129 ev.linked=False
111 self._insert(ev) 130 self._insert(ev, snapshot)
112 131
113 132
114 133
115 class Scheduler(QueueThread): 134 class Scheduler(QueueThread):
116 # Default to precision of 60 seconds: the scheduler thread will never 135 # Default to precision of 60 seconds: the scheduler thread will never
118 # when the computer wakes up from sleep 137 # when the computer wakes up from sleep
119 def __init__(self, precision=60): 138 def __init__(self, precision=60):
120 self.precision = precision 139 self.precision = precision
121 self._next_event_time = None 140 self._next_event_time = None
122 super().__init__(target = self._scheduler_thread, name = 'Scheduler') 141 super().__init__(target = self._scheduler_thread, name = 'Scheduler')
123 dreamtime.add_callback(self, self._wakeup_callback) 142 dreamtime.add_callback(self, self._sleepwake_callback)
124 143
125 def _scheduler_thread(self): 144 def _scheduler_thread(self):
126 logger.debug("Scheduler thread started") 145 logger.debug("Scheduler thread started")
127 with self._cond: 146 with self._cond:
128 while not self._terminate: 147 while not self._terminate:
129 now = time.monotonic() 148 snapshot = dreamtime.Snapshot()
149 now = snapshot.monotonic()
130 if not self._list: 150 if not self._list:
131 timeout = None 151 timeout = None
132 else: 152 else:
133 # Wait at most precision seconds, or until next event if it 153 # Wait at most precision seconds, or until next event if it
134 # comes earlier 154 # comes earlier
135 timeout=min(self.precision, self._list.when.realtime()-now) 155 delta = self._list.when.monotonic(snapshot)-now
156 timeout = min(self.precision, delta)
136 157
137 if not timeout or timeout>0: 158 if not timeout or timeout>0:
138 logger.debug("Scheduler waiting %s seconds" % str(timeout)) 159 logger.debug("Scheduler waiting %s seconds" % str(timeout))
139 self._cond.wait(timeout) 160 self._cond.wait(timeout)
140 now = time.monotonic() 161 snapshot = dreamtime.Snapshot()
162 now = snapshot.monotonic()
141 163
142 logger.debug("Scheduler timed out") 164 logger.debug("Scheduler timed out")
143 165
144 while self._list and self._list.when.monotonic() <= now: 166 while self._list and self._list.when.horizon(snapshot) <= now:
145 ev=self._list 167 ev=self._list
146 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) 168 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
147 # We are only allowed to remove ev from list when ev.cond allows 169 # We are only allowed to remove ev from list when ev.cond allows
148 self._unlink(ev) 170 self._unlink(ev)
149 # We need to release the lock on self._cond before acquire 171 # We need to release the lock on self._cond before acquire
152 with ev.cond: 174 with ev.cond:
153 ev.cond.notify_all() 175 ev.cond.notify_all()
154 self._cond.acquire() 176 self._cond.acquire()
155 177
156 178
157 def _wakeup_callback(self): 179 def _sleepwake_callback(self, woke):
158 logger.debug("Rescheduling events after wakeup") 180 logger.debug("Rescheduling events after sleep/wakeup")
159 with self._cond: 181 with self._cond:
160 self._resort() 182 self._resort()
161 183
162 # It is required to have acquired the lock on ev.cond on entry 184 # It is required to have acquired the lock on ev.cond on entry
163 def _wait(self, ev): 185 def _wait(self, ev):

mercurial