Sun, 28 Jan 2018 19:27:34 +0000
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
49 | 1 | # |
2 | # Scheduler for Borgend | |
3 | # | |
4 | # This module simply provide a way for other threads to until a given time | |
5 | # | |
6 | ||
7 | import time | |
86
2fe66644c50d
Can use logging.getLogger directly now after proper packageisation
Tuomo Valkonen <tuomov@iki.fi>
parents:
80
diff
changeset
|
8 | import logging |
80
a409242121d5
Better package-like organisation
Tuomo Valkonen <tuomov@iki.fi>
parents:
79
diff
changeset
|
9 | from threading import Condition, Thread |
a409242121d5
Better package-like organisation
Tuomo Valkonen <tuomov@iki.fi>
parents:
79
diff
changeset
|
10 | |
a409242121d5
Better package-like organisation
Tuomo Valkonen <tuomov@iki.fi>
parents:
79
diff
changeset
|
11 | from . import dreamtime |
49 | 12 | |
86
2fe66644c50d
Can use logging.getLogger directly now after proper packageisation
Tuomo Valkonen <tuomov@iki.fi>
parents:
80
diff
changeset
|
13 | logger=logging.getLogger(__name__) |
49 | 14 | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
15 | class QueuedEvent: |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
16 | def __init__(self, cond, name=None): |
49 | 17 | self.next=None |
18 | self.prev=None | |
19 | self.name=name | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
20 | self.cond=cond |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
21 | self.linked=False |
49 | 22 | |
23 | def __lt__(self, other): | |
54 | 24 | raise NotImplementedError |
49 | 25 | |
26 | def insert_after(self, ev): | |
69
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
27 | if not self.next or ev<self.next: |
49 | 28 | self.insert_immediately_after(ev) |
29 | else: | |
30 | self.next.insert_after(ev) | |
31 | ||
32 | def insert_immediately_after(self, ev): | |
69
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
33 | assert(ev.next is None and ev.prev is None) |
49 | 34 | ev.prev=self |
35 | ev.next=self.next | |
36 | self.next=ev | |
37 | ||
69
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
38 | def insert_immediately_before(self, ev): |
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
39 | assert(ev.next is None and ev.prev is None) |
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
40 | ev.next=self |
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
41 | ev.prev=self.prev |
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
42 | self.prev=ev |
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
43 | |
49 | 44 | def unlink(self): |
45 | n=self.next | |
46 | p=self.prev | |
47 | if n: | |
48 | n.prev=p | |
49 | if p: | |
50 | p.next=n | |
76
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
51 | self.next=None |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
52 | self.prev=None |
49 | 53 | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
54 | class ScheduledEvent(QueuedEvent): |
78
83b43987e61e
Renamed the "sleep" module "dreamtime"
Tuomo Valkonen <tuomov@iki.fi>
parents:
76
diff
changeset
|
55 | #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str) |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
56 | def __init__(self, when, cond, name=None): |
54 | 57 | super().__init__(cond, name=name) |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
58 | self.when=when |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
59 | |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
60 | def __lt__(self, other): |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
61 | return self.when < other.when |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
62 | |
49 | 63 | class TerminableThread(Thread): |
64 | def __init__(self, *args, **kwargs): | |
65 | super().__init__(*args, **kwargs) | |
66 | self._terminate=False | |
67 | self._cond=Condition() | |
68 | ||
69 | def terminate(self): | |
70 | with self._cond: | |
71 | _terminate=True | |
72 | self._cond.notify() | |
73 | ||
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
74 | class QueueThread(TerminableThread): |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
75 | def __init__(self, *args, **kwargs): |
54 | 76 | super().__init__(*args, **kwargs) |
77 | self.daemon = True | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
78 | self._list = None |
49 | 79 | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
80 | def _insert(self, ev): |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
81 | assert(not ev.linked) |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
82 | if not self._list: |
75
2a44b9649212
UI refresh fix; added debug messages
Tuomo Valkonen <tuomov@iki.fi>
parents:
69
diff
changeset
|
83 | #logger.debug("Insert first") |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
84 | self._list=ev |
54 | 85 | elif ev<self._list: |
75
2a44b9649212
UI refresh fix; added debug messages
Tuomo Valkonen <tuomov@iki.fi>
parents:
69
diff
changeset
|
86 | #logger.debug("Insert beginning") |
69
8705e296c7a0
Scheduling list fix and simplifications
Tuomo Valkonen <tuomov@iki.fi>
parents:
59
diff
changeset
|
87 | self._list.insert_immediately_before(ev) |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
88 | self._list=ev |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
89 | else: |
75
2a44b9649212
UI refresh fix; added debug messages
Tuomo Valkonen <tuomov@iki.fi>
parents:
69
diff
changeset
|
90 | #logger.debug("Insert after") |
59
8d0a815022cc
Oops, accidentally calling the wrong function (+log message clarification)
Tuomo Valkonen <tuomov@iki.fi>
parents:
55
diff
changeset
|
91 | self._list.insert_after(ev) |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
92 | ev.linked=True |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
93 | |
54 | 94 | def _unlink(self, ev): |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
95 | assert(ev.linked) |
54 | 96 | if ev==self._list: |
97 | self._list=ev.next | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
98 | ev.unlink() |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
99 | ev.linked=False |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
100 | |
76
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
101 | def _resort(self): |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
102 | oldlist=self._list |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
103 | self._list=None |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
104 | while oldlist: |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
105 | ev=oldlist |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
106 | oldlist=oldlist.next |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
107 | ev.unlink() |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
108 | self._insert(ev) |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
109 | |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
110 | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
111 | |
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
112 | class Scheduler(QueueThread): |
49 | 113 | # Default to precision of 60 seconds: the scheduler thread will never |
114 | # sleep longer than that, to get quickly back on track with the schedule | |
115 | # when the computer wakes up from sleep | |
116 | def __init__(self, precision=60): | |
117 | self.precision = precision | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
118 | self._next_event_time = None |
54 | 119 | super().__init__(target = self._scheduler_thread, name = 'Scheduler') |
78
83b43987e61e
Renamed the "sleep" module "dreamtime"
Tuomo Valkonen <tuomov@iki.fi>
parents:
76
diff
changeset
|
120 | dreamtime.add_callback(self, self._wakeup_callback) |
49 | 121 | |
54 | 122 | def _scheduler_thread(self): |
75
2a44b9649212
UI refresh fix; added debug messages
Tuomo Valkonen <tuomov@iki.fi>
parents:
69
diff
changeset
|
123 | logger.debug("Scheduler thread started") |
49 | 124 | with self._cond: |
125 | while not self._terminate: | |
126 | now = time.monotonic() | |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
127 | if not self._list: |
49 | 128 | timeout = None |
129 | else: | |
130 | # Wait at most precision seconds, or until next event if it | |
131 | # comes earlier | |
76
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
132 | timeout=min(self.precision, self._list.when.realtime()-now) |
49 | 133 | |
134 | if not timeout or timeout>0: | |
86
2fe66644c50d
Can use logging.getLogger directly now after proper packageisation
Tuomo Valkonen <tuomov@iki.fi>
parents:
80
diff
changeset
|
135 | logger.debug("Scheduler waiting %s seconds" % str(timeout)) |
49 | 136 | self._cond.wait(timeout) |
137 | now = time.monotonic() | |
138 | ||
75
2a44b9649212
UI refresh fix; added debug messages
Tuomo Valkonen <tuomov@iki.fi>
parents:
69
diff
changeset
|
139 | logger.debug("Scheduler timed out") |
2a44b9649212
UI refresh fix; added debug messages
Tuomo Valkonen <tuomov@iki.fi>
parents:
69
diff
changeset
|
140 | |
76
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
141 | while self._list and self._list.when.monotonic() <= now: |
54 | 142 | ev=self._list |
59
8d0a815022cc
Oops, accidentally calling the wrong function (+log message clarification)
Tuomo Valkonen <tuomov@iki.fi>
parents:
55
diff
changeset
|
143 | logger.debug("Scheduler activating %s" % (ev.name or "(unknown)")) |
54 | 144 | # We are only allowed to remove ev from list when ev.cond allows |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
145 | self._unlink(ev) |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
146 | # We need to release the lock on self._cond before acquire |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
147 | # one ev.cond to avoid race conditions with self._wait |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
148 | self._cond.release() |
54 | 149 | with ev.cond: |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
150 | ev.cond.notify_all() |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
151 | self._cond.acquire() |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
152 | |
54 | 153 | |
76
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
154 | def _wakeup_callback(self): |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
155 | logger.debug("Rescheduling events after wakeup") |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
156 | with self._cond: |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
157 | self._resort() |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
158 | |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
159 | # It is required to have acquired the lock on ev.cond on entry |
54 | 160 | def _wait(self, ev): |
161 | with self._cond: | |
162 | self._insert(ev) | |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
163 | self._cond.notify() |
54 | 164 | |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
165 | # This will release the lock on cond, allowing the scheduler |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
166 | # thread to notify us if we are ready to be released |
54 | 167 | ev.cond.wait() |
168 | ||
169 | # If we were woken up by some other event, not the scheduler, | |
170 | # ensure the event is removed | |
87
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
171 | if ev.linked: |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
172 | # Deal with race conditions wrt. the two different locks |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
173 | # in the scheduler |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
174 | #ev.cond.release() |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
175 | with self._cond: |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
176 | self._unlink(ev) |
a214d475aa28
Better recovery from errors; fixes to potential race conditions in scheduler and repository queue
Tuomo Valkonen <tuomov@iki.fi>
parents:
86
diff
changeset
|
177 | #ev.cond.acquire() |
49 | 178 | |
179 | # cond has to be acquired on entry! | |
180 | def wait_until(self, when, cond, name=None): | |
76
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
181 | logger.debug("Scheduling '%s' in %s seconds [%s]" % |
4b08fca3ce34
Dreamtime scheduling: discount system sleep periods
Tuomo Valkonen <tuomov@iki.fi>
parents:
75
diff
changeset
|
182 | (name, when.seconds_to(), when.__class__.__name__)) |
53
442c558bd632
Generalisation of scheduler thread to general queue threads
Tuomo Valkonen <tuomov@iki.fi>
parents:
49
diff
changeset
|
183 | self._wait(ScheduledEvent(when, cond, name)) |
49 | 184 |