scheduler.py

changeset 54
cfcaa5f6ba33
parent 53
442c558bd632
child 55
407af23d16bb
equal deleted inserted replaced
53:442c558bd632 54:cfcaa5f6ba33
16 self.prev=None 16 self.prev=None
17 self.name=name 17 self.name=name
18 self.cond=cond 18 self.cond=cond
19 19
20 def __lt__(self, other): 20 def __lt__(self, other):
21 return False 21 raise NotImplementedError
22
23 def __gt__(self, other):
24 return True
25 22
26 def insert_after(self, ev): 23 def insert_after(self, ev):
27 if not self.next: 24 if not self.next:
28 ev.prev=self 25 ev.prev=self
29 self.next=ev 26 self.next=ev
30 ev.next=None 27 ev.next=None
31 elif self.next>ev: 28 elif ev<self.next:
32 self.insert_immediately_after(ev) 29 self.insert_immediately_after(ev)
33 else: 30 else:
34 self.next.insert_after(ev) 31 self.next.insert_after(ev)
35 32
36 def insert_immediately_after(self, ev): 33 def insert_immediately_after(self, ev):
48 if p: 45 if p:
49 p.next=n 46 p.next=n
50 47
51 class ScheduledEvent(QueuedEvent): 48 class ScheduledEvent(QueuedEvent):
52 def __init__(self, when, cond, name=None): 49 def __init__(self, when, cond, name=None):
53 super().__init__(cond, name) 50 super().__init__(cond, name=name)
54 self.when=when 51 self.when=when
55 52
56 def __lt__(self, other): 53 def __lt__(self, other):
57 return self.when < other.when 54 return self.when < other.when
58
59 def __gt__(self, other):
60 return self.when > other.when
61 55
62 class TerminableThread(Thread): 56 class TerminableThread(Thread):
63 def __init__(self, *args, **kwargs): 57 def __init__(self, *args, **kwargs):
64 super().__init__(*args, **kwargs) 58 super().__init__(*args, **kwargs)
65 self._terminate=False 59 self._terminate=False
70 _terminate=True 64 _terminate=True
71 self._cond.notify() 65 self._cond.notify()
72 66
73 class QueueThread(TerminableThread): 67 class QueueThread(TerminableThread):
74 def __init__(self, *args, **kwargs): 68 def __init__(self, *args, **kwargs):
69 super().__init__(*args, **kwargs)
70 self.daemon = True
75 self._list = None 71 self._list = None
76 super().__init__(*args, **kwargs)
77 72
78 def _insert(self, ev): 73 def _insert(self, ev):
79 if not self._list: 74 if not self._list:
80 self._list=ev 75 self._list=ev
81 elif self._list > ev: 76 elif ev<self._list:
82 ev.insert_immediately_after(self._list) 77 ev.insert_immediately_after(self._list)
83 self._list=ev 78 self._list=ev
84 else: 79 else:
85 self._list.insert_immediately_after(ev) 80 self._list.insert_immediately_after(ev)
86 81
87 self._cond.notify() 82 self._cond.notify()
88 83
89 def _wait(self, ev): 84 def _unlink(self, ev):
90 with self._cond: 85 if ev==self._list:
91 self._insert(ev) 86 self._list=ev.next
92 # This will release the lock on cond, allowing queue manager (scheduler)
93 # thread to notify us if we are already to be released
94 ev.cond.wait()
95
96 # If we were woken up by some other event, not the scheduler,
97 # ensure the event is removed
98 with self._cond:
99 if ev==self._list:
100 self._list=ev.next
101 ev.unlink()
102
103 def _pop(self):
104 ev=self._list
105 logger.info("%s: Found event %s" % self.name(), str(ev.name))
106 self._list=ev.next
107 ev.unlink() 87 ev.unlink()
108 ev.cond.acquire()
109 ev.cond.notifyAll()
110 ev.cond.release()
111 88
112 89
113 class Scheduler(QueueThread): 90 class Scheduler(QueueThread):
114 # Default to precision of 60 seconds: the scheduler thread will never 91 # Default to precision of 60 seconds: the scheduler thread will never
115 # sleep longer than that, to get quickly back on track with the schedule 92 # sleep longer than that, to get quickly back on track with the schedule
116 # when the computer wakes up from sleep 93 # when the computer wakes up from sleep
117 def __init__(self, precision=60): 94 def __init__(self, precision=60):
118 self.precision = precision 95 self.precision = precision
119 self._next_event_time = None 96 self._next_event_time = None
120 self._list = None 97 super().__init__(target = self._scheduler_thread, name = 'Scheduler')
121 super().__init__(target = self.__scheduler_thread, name = 'Scheduler')
122 self.daemon=True
123 98
124 def __scheduler_thread(self): 99 def _scheduler_thread(self):
125 with self._cond: 100 with self._cond:
126 while not self._terminate: 101 while not self._terminate:
127 now = time.monotonic() 102 now = time.monotonic()
128 if not self._list: 103 if not self._list:
129 timeout = None 104 timeout = None
135 if not timeout or timeout>0: 110 if not timeout or timeout>0:
136 self._cond.wait(timeout) 111 self._cond.wait(timeout)
137 now = time.monotonic() 112 now = time.monotonic()
138 113
139 while self._list and self._list.when <= now: 114 while self._list and self._list.when <= now:
140 self._pop() 115 ev=self._list
116 logger.info("%s: Scheduling event %s" % self.name(), str(ev.name))
117 # We are only allowed to remove ev from list when ev.cond allows
118 with ev.cond:
119 self._list=ev.next
120 ev.unlink()
121 ev.cond.notifyAll()
122
123 def _wait(self, ev):
124 with self._cond:
125 self._insert(ev)
126
127 # This will release the lock on cond, allowing queue manager (scheduler)
128 # thread to notify us if we are already to be released
129 ev.cond.wait()
130
131 # If we were woken up by some other event, not the scheduler,
132 # ensure the event is removed
133 with self._cond:
134 self._unlink(ev)
141 135
142 # cond has to be acquired on entry! 136 # cond has to be acquired on entry!
143 def wait_until(self, when, cond, name=None): 137 def wait_until(self, when, cond, name=None):
144 self._wait(ScheduledEvent(when, cond, name)) 138 self._wait(ScheduledEvent(when, cond, name))
145 139

mercurial