| 5 import config |
5 import config |
| 6 import logging |
6 import logging |
| 7 import time |
7 import time |
| 8 from instance import BorgInstance |
8 from instance import BorgInstance |
| 9 from queue import Queue |
9 from queue import Queue |
| 10 from threading import Thread, Lock |
10 from threading import Thread, Lock, Timer |
| 11 |
11 |
| 12 loglevel_translation={ |
12 loglevel_translation={ |
| 13 'CRITICAL': logging.CRITICAL, |
13 'CRITICAL': logging.CRITICAL, |
| 14 'ERROR': logging.ERROR, |
14 'ERROR': logging.ERROR, |
| 15 'WARNING': logging.WARNING, |
15 'WARNING': logging.WARNING, |
| 73 self.lastrun=None |
73 self.lastrun=None |
| 74 self.lastrun_success=None |
74 self.lastrun_success=None |
| 75 self.borg_instance=None |
75 self.borg_instance=None |
| 76 self.current_operation=None |
76 self.current_operation=None |
| 77 self.thread_log=None |
77 self.thread_log=None |
| 78 self.thread_err=None |
78 self.thread_res=None |
| |
79 self.timer=None |
| |
80 self.timer_operation=None |
| |
81 self.timer_time=None |
| 79 self.lock=Lock() |
82 self.lock=Lock() |
| 80 |
83 |
| 81 def is_running(self): |
84 def is_running(self): |
| 82 with self.lock: |
85 with self.lock: |
| 83 running=self.borg_instance or self.thread_log or self.thread_err |
86 running=self.__is_running_unlocked() |
| |
87 return running |
| |
88 |
| |
89 def __is_running_unlocked(self): |
| |
90 running=self.current_operation |
| |
91 if not running: |
| |
92 # Consistency check |
| |
93 assert((not self.borg_instance and not self.thread_log and |
| |
94 not self.thread_res)) |
| 84 return running |
95 return running |
| 85 |
96 |
| 86 def __block_when_running(self): |
97 def __block_when_running(self): |
| 87 running=self.is_running() |
98 running=self.is_running() |
| 88 assert(not running) |
99 assert(not running) |
| 122 elif t=='exception': |
133 elif t=='exception': |
| 123 success=False |
134 success=False |
| 124 elif t=='unparsed_error': |
135 elif t=='unparsed_error': |
| 125 success=False |
136 success=False |
| 126 |
137 |
| 127 #if (may_indicate_finished and 'finished' in status and |
|
| 128 # status['finished']): |
|
| 129 # logging.info('Borg subprocess finished succesfully') |
|
| 130 # success=status['finished'] |
|
| 131 |
|
| 132 logging.debug('Waiting for borg subprocess to terminate in log thread') |
138 logging.debug('Waiting for borg subprocess to terminate in log thread') |
| 133 |
139 |
| 134 self.borg_instance.wait() |
140 self.borg_instance.wait() |
| 135 |
141 |
| 136 logging.debug('Borg subprocess terminated; terminating log listener thread') |
142 logging.debug('Borg subprocess terminated; terminating log listener thread') |
| 137 |
143 |
| 138 with self.lock: |
144 with self.lock: |
| 139 self.thread_log=None |
145 self.thread_log=None |
| 140 self.__cleanup_if_both_listeners_terminated() |
146 self.__finish_and_reschedule_if_both_listeners_terminated() |
| 141 |
147 |
| 142 |
148 |
| 143 def __result_listener(self): |
149 def __result_listener(self): |
| 144 logging.debug('Result listener thread waiting for result') |
150 logging.debug('Result listener thread waiting for result') |
| 145 |
151 |
| 152 if res==None: |
158 if res==None: |
| 153 success=False |
159 success=False |
| 154 |
160 |
| 155 logging.debug('Waiting for borg subprocess to terminate in result thread') |
161 logging.debug('Waiting for borg subprocess to terminate in result thread') |
| 156 |
162 |
| 157 self.borg_instance.wait() |
163 success=success and self.borg_instance.wait() |
| 158 |
164 |
| 159 logging.debug('Borg subprocess terminated; terminating result listener thread') |
165 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
| 160 |
166 |
| 161 with self.lock: |
167 with self.lock: |
| 162 if self.current_operation=='create': |
168 if self.current_operation=='create': |
| 163 self.lastrun=self.time_started |
169 self.lastrun=self.time_started |
| 164 self.lastrun_success=success |
170 self.lastrun_success=success |
| 165 self.thread_res=None |
171 self.thread_res=None |
| 166 self.__cleanup_if_both_listeners_terminated() |
172 self.__finish_and_reschedule_if_both_listeners_terminated() |
| 167 |
173 |
| 168 def __cleanup_if_both_listeners_terminated(self): |
174 def __finish_and_reschedule_if_both_listeners_terminated(self): |
| 169 if self.thread_res==None and self.thread_log==None: |
175 if self.thread_res==None and self.thread_log==None: |
| 170 logging.debug('Both threads terminated') |
176 logging.debug('Both threads terminated') |
| 171 self.borg_instance=None |
177 self.borg_instance=None |
| |
178 self.time_started=None |
| 172 self.current_operation=None |
179 self.current_operation=None |
| 173 self.time_started=None |
180 self.__schedule_unlocked() |
| 174 |
181 |
| 175 def __launch(self, queue, operation, archive_or_repository, *args): |
182 def __do_launch(self, queue, operation, archive_or_repository, *args): |
| 176 |
|
| 177 inst=BorgInstance(operation, archive_or_repository, *args) |
183 inst=BorgInstance(operation, archive_or_repository, *args) |
| 178 inst.launch() |
184 inst.launch() |
| 179 |
185 |
| 180 t_log=Thread(target=self.__log_listener) |
186 t_log=Thread(target=self.__log_listener) |
| 181 t_log.daemon=True |
187 t_log.daemon=True |
| 191 self.time_started=time.monotonic() |
197 self.time_started=time.monotonic() |
| 192 |
198 |
| 193 t_log.start() |
199 t_log.start() |
| 194 t_res.start() |
200 t_res.start() |
| 195 |
201 |
| |
202 def __launch(self, operation, queue): |
| |
203 if self.__is_running_unlocked(): |
| |
204 logging.info('Cannot start %s: already running %s' |
| |
205 % (operation, self.current_operation)) |
| |
206 return False |
| |
207 else: |
| |
208 if self.timer: |
| |
209 logging.debug('Unscheduling timed operation due to launch of operation') |
| |
210 self.timer=None |
| |
211 self.timer_operation=None |
| |
212 self.timer_time=None |
| |
213 |
| |
214 logging.debug("Launching '%s' on '%s'" % (operation, self.name)) |
| |
215 |
| |
216 if operation=='create': |
| |
217 archive="%s::%s%s" % (self.repository, |
| |
218 self.archive_prefix, |
| |
219 self.archive_template) |
| |
220 |
| |
221 self.__do_launch(queue, operation, archive, |
| |
222 self.common_parameters+self.create_parameters, |
| |
223 self.paths) |
| |
224 elif operation=='prune': |
| |
225 self.__do_launch(queue, 'prune', self.repository, |
| |
226 ([{'prefix': self.archive_prefix}] + |
| |
227 self.common_parameters + |
| |
228 self.prune_parameters)) |
| |
229 else: |
| |
230 logging.error("Invalid operaton '%s'" % operation) |
| |
231 self.__schedule_unlocked() |
| |
232 |
| |
233 return True |
| |
234 |
| 196 def create(self, queue): |
235 def create(self, queue): |
| 197 self.__block_when_running() |
236 with self.lock: |
| 198 |
237 res=self.__launch('create', queue) |
| 199 archive="%s::%s%s" % (self.repository, |
238 return res |
| 200 self.archive_prefix, |
|
| 201 self.archive_template) |
|
| 202 |
|
| 203 self.__launch(queue, 'create', archive, |
|
| 204 self.common_parameters+self.create_parameters, |
|
| 205 self.paths) |
|
| 206 |
239 |
| 207 def prune(self, queue): |
240 def prune(self, queue): |
| 208 self.__block_when_running() |
241 with self.lock: |
| 209 self.__launch(queue, 'prune', self.repository, |
242 res=self.__launch('prune', queue) |
| 210 ([{'prefix': self.archive_prefix}] + |
243 return res |
| 211 self.common_parameters + |
|
| 212 self.prune_parameters)) |
|
| 213 |
244 |
| 214 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
245 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
| 215 def abort(self): |
246 def abort(self): |
| 216 with self.lock: |
247 with self.lock: |
| 217 if self.borg_instance: |
248 if self.borg_instance: |
| 239 if thread_res: |
270 if thread_res: |
| 240 thread_res.join() |
271 thread_res.join() |
| 241 |
272 |
| 242 assert(self.thread_log==None and self.thread_res==None) |
273 assert(self.thread_log==None and self.thread_res==None) |
| 243 |
274 |
| 244 def next_action(): |
275 def __queue_timed_operation(self): |
| 245 __block_when_running() |
276 with self.lock: |
| 246 # TODO pruning as well |
277 operation=self.timer_operation |
| |
278 self.timer_operation=None |
| |
279 self.timer_time=None |
| |
280 self.timer=None |
| |
281 |
| |
282 if self.__is_running_unlocked(): |
| |
283 logging.info('Aborted queue operation, as an operation is already running') |
| |
284 else: |
| |
285 # TODO: Queue on 'repository' and online status for SSH, etc. |
| |
286 |
| |
287 # TODO: UI comms. queue? |
| |
288 self.__launch(operation, None) |
| |
289 |
| |
290 def __schedule_unlocked(self): |
| |
291 if self.current_operation: |
| |
292 return self.current_operation, None |
| |
293 else: |
| |
294 operation, when=self.__next_operation_unlocked() |
| |
295 |
| |
296 if operation: |
| |
297 now=time.monotonic() |
| |
298 delay=max(0, when-now) |
| |
299 logging.info("Scheduling '%s' of '%s' in %d seconds" % |
| |
300 (operation, self.name, delay)) |
| |
301 tmr=Timer(delay, self.__queue_timed_operation) |
| |
302 self.timer_operation=operation |
| |
303 self.timer_time=when |
| |
304 self.timer=tmr |
| |
305 tmr.start() |
| |
306 |
| |
307 return operation, time |
| |
308 |
| |
309 def __next_operation_unlocked(self): |
| |
310 # TODO: pruning as well |
| 247 now=time.monotonic() |
311 now=time.monotonic() |
| 248 if not self.lastrun: |
312 if not self.lastrun: |
| 249 return 'create', now+self.retry_interval |
313 return 'create', now+self.retry_interval |
| 250 elif not self.lastrun_success: |
314 elif not self.lastrun_success: |
| 251 return 'create', self.lastrun+self.retry_interval |
315 return 'create', self.lastrun+self.retry_interval |