borgend/scheduler.py

changeset 87
a214d475aa28
parent 86
2fe66644c50d
child 89
51cc2e25af38
equal deleted inserted replaced
86:2fe66644c50d 87:a214d475aa28
16 def __init__(self, cond, name=None): 16 def __init__(self, cond, name=None):
17 self.next=None 17 self.next=None
18 self.prev=None 18 self.prev=None
19 self.name=name 19 self.name=name
20 self.cond=cond 20 self.cond=cond
21 self.linked=False
21 22
22 def __lt__(self, other): 23 def __lt__(self, other):
23 raise NotImplementedError 24 raise NotImplementedError
24 25
25 def insert_after(self, ev): 26 def insert_after(self, ev):
75 super().__init__(*args, **kwargs) 76 super().__init__(*args, **kwargs)
76 self.daemon = True 77 self.daemon = True
77 self._list = None 78 self._list = None
78 79
79 def _insert(self, ev): 80 def _insert(self, ev):
81 assert(not ev.linked)
80 if not self._list: 82 if not self._list:
81 #logger.debug("Insert first") 83 #logger.debug("Insert first")
82 self._list=ev 84 self._list=ev
83 elif ev<self._list: 85 elif ev<self._list:
84 #logger.debug("Insert beginning") 86 #logger.debug("Insert beginning")
85 self._list.insert_immediately_before(ev) 87 self._list.insert_immediately_before(ev)
86 self._list=ev 88 self._list=ev
87 else: 89 else:
88 #logger.debug("Insert after") 90 #logger.debug("Insert after")
89 self._list.insert_after(ev) 91 self._list.insert_after(ev)
90 92 ev.linked=True
91 self._cond.notify()
92 93
93 def _unlink(self, ev): 94 def _unlink(self, ev):
95 assert(ev.linked)
94 if ev==self._list: 96 if ev==self._list:
95 self._list=ev.next 97 self._list=ev.next
96 ev.unlink() 98 ev.unlink()
99 ev.linked=False
97 100
98 def _resort(self): 101 def _resort(self):
99 oldlist=self._list 102 oldlist=self._list
100 self._list=None 103 self._list=None
101 while oldlist: 104 while oldlist:
137 140
138 while self._list and self._list.when.monotonic() <= now: 141 while self._list and self._list.when.monotonic() <= now:
139 ev=self._list 142 ev=self._list
140 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) 143 logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
141 # We are only allowed to remove ev from list when ev.cond allows 144 # We are only allowed to remove ev from list when ev.cond allows
145 self._unlink(ev)
146 # We need to release the lock on self._cond before acquire
147 # one ev.cond to avoid race conditions with self._wait
148 self._cond.release()
142 with ev.cond: 149 with ev.cond:
143 self._list=ev.next 150 ev.cond.notify_all()
144 ev.unlink() 151 self._cond.acquire()
145 ev.cond.notifyAll() 152
146 153
147 def _wakeup_callback(self): 154 def _wakeup_callback(self):
148 logger.debug("Rescheduling events after wakeup") 155 logger.debug("Rescheduling events after wakeup")
149 with self._cond: 156 with self._cond:
150 self._resort() 157 self._resort()
151 158
159 # It is required to have acquired the lock on ev.cond on entry
152 def _wait(self, ev): 160 def _wait(self, ev):
153 with self._cond: 161 with self._cond:
154 self._insert(ev) 162 self._insert(ev)
163 self._cond.notify()
155 164
156 # This will release the lock on cond, allowing queue manager (scheduler) 165 # This will release the lock on cond, allowing the scheduler
157 # thread to notify us if we are already to be released 166 # thread to notify us if we are ready to be released
158 ev.cond.wait() 167 ev.cond.wait()
159 168
160 # If we were woken up by some other event, not the scheduler, 169 # If we were woken up by some other event, not the scheduler,
161 # ensure the event is removed 170 # ensure the event is removed
162 with self._cond: 171 if ev.linked:
163 self._unlink(ev) 172 # Deal with race conditions wrt. the two different locks
173 # in the scheduler
174 #ev.cond.release()
175 with self._cond:
176 self._unlink(ev)
177 #ev.cond.acquire()
164 178
165 # cond has to be acquired on entry! 179 # cond has to be acquired on entry!
166 def wait_until(self, when, cond, name=None): 180 def wait_until(self, when, cond, name=None):
167 logger.debug("Scheduling '%s' in %s seconds [%s]" % 181 logger.debug("Scheduling '%s' in %s seconds [%s]" %
168 (name, when.seconds_to(), when.__class__.__name__)) 182 (name, when.seconds_to(), when.__class__.__name__))

mercurial