borgend/scheduler.py

changeset 80
a409242121d5
parent 79
b075b3db3044
child 86
2fe66644c50d
equal deleted inserted replaced
79:b075b3db3044 80:a409242121d5
1 #
2 # Scheduler for Borgend
3 #
4 # This module simply provides a way for other threads to wait until a given time
5 #
6
7 import time
8 from threading import Condition, Thread
9
10 from . import loggers
11 from . import dreamtime
12
13 logger=loggers.get(__name__)
14
class QueuedEvent:
    """Node of an intrusive, doubly-linked, sorted event queue.

    Each event carries its own ``next``/``prev`` links.  Subclasses must
    implement ``__lt__`` to define the ordering used by ``insert_after``.
    """

    def __init__(self, cond, name=None):
        # Neighbours in the queue; both are None while the event is unlinked.
        self.next = None
        self.prev = None
        self.name = name
        self.cond = cond

    def __lt__(self, other):
        raise NotImplementedError

    def insert_after(self, ev):
        """Insert ``ev`` somewhere after ``self``, keeping the list sorted.

        ``ev`` is placed directly before the first successor it compares
        less than, or at the tail if there is no such successor.
        """
        # Iterative walk instead of recursion, so that long queues cannot
        # exhaust the interpreter's recursion limit.
        node = self
        while node.next and not ev < node.next:
            node = node.next
        node.insert_immediately_after(ev)

    def insert_immediately_after(self, ev):
        """Link the currently unlinked ``ev`` directly after ``self``."""
        assert ev.next is None and ev.prev is None
        following = self.next
        ev.prev = self
        ev.next = following
        # The old successor must point back at the new node; otherwise the
        # backward links are corrupted and a later unlink() breaks the list.
        if following is not None:
            following.prev = ev
        self.next = ev

    def insert_immediately_before(self, ev):
        """Link the currently unlinked ``ev`` directly before ``self``."""
        assert ev.next is None and ev.prev is None
        preceding = self.prev
        ev.next = self
        ev.prev = preceding
        # The old predecessor must point forward at the new node.
        if preceding is not None:
            preceding.next = ev
        self.prev = ev

    def unlink(self):
        """Remove this event from its list, joining its neighbours."""
        n = self.next
        p = self.prev
        if n:
            n.prev = p
        if p:
            p.next = n
        self.next = None
        self.prev = None
52
class ScheduledEvent(QueuedEvent):
    """Queued event ordered by the time at which it becomes due.

    The original annotation comment lists the expected argument types as
    (dreamtime.Time, threading.Cond, str) for ``when``, ``cond``, ``name``.
    """

    #@accepts(ScheduledEvent, dreamtime.Time, threading.Cond, str)
    def __init__(self, when, cond, name=None):
        super().__init__(cond, name=name)
        # Due time of the event; this is the sort key of the queue.
        self.when = when

    def __lt__(self, other):
        """Return True when this event is due before ``other``."""
        mine = self.when
        theirs = other.when
        return mine < theirs
61
class TerminableThread(Thread):
    """Thread with a condition variable and a cooperative termination flag.

    Subclasses are expected to poll ``self._terminate`` while holding
    ``self._cond`` and to wait on ``self._cond`` when idle.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate = False
        self._cond = Condition()

    def terminate(self):
        """Request termination and wake up a thread waiting on the condition."""
        with self._cond:
            # Must set the attribute on the instance: a bare local assignment
            # would leave the flag False and the thread would never exit.
            self._terminate = True
            self._cond.notify()
72
class QueueThread(TerminableThread):
    """Daemon thread owning a sorted linked list of queued events.

    ``self._list`` is the head of the list (None when empty).  ``_insert``
    notifies ``self._cond``, so it (and ``_resort``, which calls it) must be
    invoked with ``self._cond`` held.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self._list = None

    def _insert(self, ev):
        """Insert ``ev`` at its sorted position and wake the queue thread."""
        head = self._list
        if not head:
            # Empty queue: the event becomes the head.
            self._list = ev
        elif ev < head:
            # Sorts before the current head: becomes the new head.
            head.insert_immediately_before(ev)
            self._list = ev
        else:
            # Belongs somewhere after the head.
            head.insert_after(ev)

        self._cond.notify()

    def _unlink(self, ev):
        """Remove ``ev`` from the queue, advancing the head if necessary."""
        is_head = ev == self._list
        if is_head:
            self._list = ev.next
        ev.unlink()

    def _resort(self):
        """Rebuild the queue by re-inserting every event one at a time."""
        pending = self._list
        self._list = None
        while pending:
            ev, pending = pending, pending.next
            ev.unlink()
            self._insert(ev)
106
107
108
class Scheduler(QueueThread):
    """Thread that releases waiters when their scheduled time arrives.

    Other threads call ``wait_until`` with a time and a condition variable
    they hold; the scheduler thread notifies that condition once the event
    at the head of the queue is due.
    """

    # Default to precision of 60 seconds: the scheduler thread will never
    # sleep longer than that, to get quickly back on track with the schedule
    # when the computer wakes up from sleep
    def __init__(self, precision=60):
        """Create the scheduler thread.

        precision -- upper bound, in seconds, on a single sleep of the
        scheduler thread.
        """
        self.precision = precision
        self._next_event_time = None
        super().__init__(target=self._scheduler_thread, name='Scheduler')
        dreamtime.add_callback(self, self._wakeup_callback)

    def _scheduler_thread(self):
        """Main loop: sleep until the next event is due, then release it."""
        logger.debug("Scheduler thread started")
        with self._cond:
            while not self._terminate:
                now = time.monotonic()
                if not self._list:
                    timeout = None
                else:
                    # Wait at most precision seconds, or until next event if it
                    # comes earlier.  `now` is a monotonic clock reading, so
                    # the event time must be taken on the monotonic clock too,
                    # matching the due check below.
                    timeout = min(self.precision,
                                  self._list.when.monotonic() - now)

                # None means "no events": block until notified.  A timeout
                # of zero or less means the head event is already due, so
                # skip waiting altogether.
                if timeout is None or timeout > 0:
                    logger.debug("Scheduler waiting %d seconds" % (timeout or (-1)))
                    self._cond.wait(timeout)
                    now = time.monotonic()

                logger.debug("Scheduler timed out")

                while self._list and self._list.when.monotonic() <= now:
                    ev = self._list
                    logger.debug("Scheduler activating %s" % (ev.name or "(unknown)"))
                    # We are only allowed to remove ev from list when ev.cond allows
                    with ev.cond:
                        self._list = ev.next
                        ev.unlink()
                        ev.cond.notify_all()

    def _wakeup_callback(self):
        """Re-sort the queue; registered with dreamtime in __init__."""
        logger.debug("Rescheduling events after wakeup")
        with self._cond:
            self._resort()

    def _wait(self, ev):
        """Queue ``ev`` and block on ``ev.cond`` until it is notified.

        ``ev.cond`` must already be held by the caller.
        """
        # NOTE(review): this takes self._cond while holding ev.cond, whereas
        # the scheduler thread takes ev.cond while holding self._cond.  That
        # looks like a potential lock-order inversion -- confirm.
        with self._cond:
            self._insert(ev)

        # This will release the lock on cond, allowing queue manager (scheduler)
        # thread to notify us if we are already to be released
        ev.cond.wait()

        # If we were woken up by some other event, not the scheduler,
        # ensure the event is removed
        with self._cond:
            self._unlink(ev)

    # cond has to be acquired on entry!
    def wait_until(self, when, cond, name=None):
        """Block the calling thread on ``cond`` until ``when`` or a notify.

        when -- time object providing ``seconds_to()`` and ``monotonic()``
        cond -- condition variable, already acquired by the caller
        name -- optional label used in debug log messages
        """
        logger.debug("Scheduling '%s' in %s seconds [%s]" %
                     (name, when.seconds_to(), when.__class__.__name__))
        self._wait(ScheduledEvent(when, cond, name))
170

mercurial