163 success=success and self.borg_instance.wait() |
168 success=success and self.borg_instance.wait() |
164 |
169 |
165 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
170 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
166 |
171 |
167 with self.lock: |
172 with self.lock: |
168 if self.current_operation=='create': |
173 if self.current_operation['operation']=='create': |
169 self.lastrun=self.time_started |
174 self.lastrun=self.current_operation['when_monotonic'] |
170 self.lastrun_success=success |
175 self.lastrun_success=success |
171 self.thread_res=None |
176 self.thread_res=None |
172 self.__finish_and_reschedule_if_both_listeners_terminated() |
177 self.__finish_and_reschedule_if_both_listeners_terminated() |
|
178 status, callback=self.__status_unlocked() |
|
179 if callback: |
|
180 callback(self, status) |
173 |
181 |
174 def __finish_and_reschedule_if_both_listeners_terminated(self): |
182 def __finish_and_reschedule_if_both_listeners_terminated(self): |
175 if self.thread_res==None and self.thread_log==None: |
183 if self.thread_res==None and self.thread_log==None: |
176 logging.debug('Both threads terminated') |
184 logging.debug('Both threads terminated') |
177 self.borg_instance=None |
185 self.borg_instance=None |
178 self.time_started=None |
|
179 self.current_operation=None |
186 self.current_operation=None |
180 self.__schedule_unlocked() |
187 self.__schedule_unlocked() |
181 |
188 |
182 def __do_launch(self, queue, operation, archive_or_repository, *args): |
189 def __do_launch(self, queue, op, archive_or_repository, *args): |
183 inst=BorgInstance(operation, archive_or_repository, *args) |
190 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
184 inst.launch() |
191 inst.launch() |
185 |
192 |
186 t_log=Thread(target=self.__log_listener) |
193 t_log=Thread(target=self.__log_listener) |
187 t_log.daemon=True |
194 t_log.daemon=True |
188 |
195 |
191 |
198 |
192 self.thread_log=t_log |
199 self.thread_log=t_log |
193 self.thread_res=t_res |
200 self.thread_res=t_res |
194 self.borg_instance=inst |
201 self.borg_instance=inst |
195 self.queue=queue |
202 self.queue=queue |
196 self.current_operation=operation |
203 self.current_operation=op |
197 self.time_started=time.monotonic() |
204 self.current_operation['when_monotonic']=time.monotonic() |
198 |
205 |
199 t_log.start() |
206 t_log.start() |
200 t_res.start() |
207 t_res.start() |
201 |
208 |
202 def __launch(self, operation, queue): |
209 def __launch(self, op, queue): |
203 if self.__is_running_unlocked(): |
210 if self.__is_running_unlocked(): |
204 logging.info('Cannot start %s: already running %s' |
211 logging.info('Cannot start %s: already running %s' |
205 % (operation, self.current_operation)) |
212 % (operation, self.current_operation)) |
206 return False |
213 return False |
207 else: |
214 else: |
208 if self.timer: |
215 if self.timer: |
209 logging.debug('Unscheduling timed operation due to launch of operation') |
216 logging.debug('Unscheduling timed operation due to launch of operation') |
210 self.timer=None |
217 self.timer=None |
211 self.timer_operation=None |
218 self.scheduled_operation=None |
212 self.timer_time=None |
219 |
213 |
220 logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
214 logging.debug("Launching '%s' on '%s'" % (operation, self.name)) |
221 |
215 |
222 if op['operation']=='create': |
216 if operation=='create': |
|
217 archive="%s::%s%s" % (self.repository, |
223 archive="%s::%s%s" % (self.repository, |
218 self.archive_prefix, |
224 self.archive_prefix, |
219 self.archive_template) |
225 self.archive_template) |
220 |
226 |
221 self.__do_launch(queue, operation, archive, |
227 self.__do_launch(queue, op, archive, |
222 self.common_parameters+self.create_parameters, |
228 self.common_parameters+self.create_parameters, |
223 self.paths) |
229 self.paths) |
224 elif operation=='prune': |
230 elif op['operation']=='prune': |
225 self.__do_launch(queue, 'prune', self.repository, |
231 self.__do_launch(queue, op, self.repository, |
226 ([{'prefix': self.archive_prefix}] + |
232 ([{'prefix': self.archive_prefix}] + |
227 self.common_parameters + |
233 self.common_parameters + |
228 self.prune_parameters)) |
234 self.prune_parameters)) |
229 else: |
235 else: |
230 logging.error("Invalid operaton '%s'" % operation) |
236 logging.error("Invalid operaton '%s'" % op['operation']) |
231 self.__schedule_unlocked() |
237 self.__schedule_unlocked() |
232 |
238 |
233 return True |
239 return True |
234 |
240 |
235 def create(self, queue): |
241 def create(self, queue): |
236 with self.lock: |
242 op={'operation': 'create', 'detail': 'manual'} |
237 res=self.__launch('create', queue) |
243 with self.lock: |
|
244 res=self.__launch(op, queue) |
238 return res |
245 return res |
239 |
246 |
240 def prune(self, queue): |
247 def prune(self, queue): |
241 with self.lock: |
248 op={'operation': 'prune', 'detail': 'manual'} |
242 res=self.__launch('prune', queue) |
249 with self.lock: |
|
250 res=self.__launch(op, queue) |
243 return res |
251 return res |
244 |
252 |
245 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
253 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
246 def abort(self): |
254 def abort(self): |
247 with self.lock: |
255 with self.lock: |
272 |
280 |
273 assert(self.thread_log==None and self.thread_res==None) |
281 assert(self.thread_log==None and self.thread_res==None) |
274 |
282 |
275 def __queue_timed_operation(self): |
283 def __queue_timed_operation(self): |
276 with self.lock: |
284 with self.lock: |
277 operation=self.timer_operation |
285 op=self.scheduled_operation |
278 self.timer_operation=None |
286 self.scheduled_operation=None |
279 self.timer_time=None |
|
280 self.timer=None |
287 self.timer=None |
281 |
288 |
282 if self.__is_running_unlocked(): |
289 if self.__is_running_unlocked(): |
283 logging.info('Aborted queue operation, as an operation is already running') |
290 logging.info('Aborted queue operation, as an operation is already running') |
284 else: |
291 else: |
285 # TODO: Queue on 'repository' and online status for SSH, etc. |
292 # TODO: Queue on 'repository' and online status for SSH, etc. |
286 |
293 |
287 # TODO: UI comms. queue? |
294 # TODO: UI comms. queue? |
288 self.__launch(operation, None) |
295 self.__launch(op, None) |
289 |
|
290 def __schedule_unlocked(self): |
|
291 if self.current_operation: |
|
292 return self.current_operation, None |
|
293 else: |
|
294 operation, when=self.__next_operation_unlocked() |
|
295 |
|
296 if operation: |
|
297 now=time.monotonic() |
|
298 delay=max(0, when-now) |
|
299 logging.info("Scheduling '%s' of '%s' in %d seconds" % |
|
300 (operation, self.name, delay)) |
|
301 tmr=Timer(delay, self.__queue_timed_operation) |
|
302 self.timer_operation=operation |
|
303 self.timer_time=when |
|
304 self.timer=tmr |
|
305 tmr.start() |
|
306 |
|
307 return operation, time |
|
308 |
296 |
309 def __next_operation_unlocked(self): |
297 def __next_operation_unlocked(self): |
310 # TODO: pruning as well |
298 # TODO: pruning as well |
311 now=time.monotonic() |
299 now=time.monotonic() |
312 if not self.lastrun: |
300 if not self.lastrun: |
313 return 'create', now+self.retry_interval |
301 initial_interval=self.retry_interval |
|
302 if initial_interval==0: |
|
303 initial_interval=self.backup_interval |
|
304 if initial_interval==0: |
|
305 return None |
|
306 else: |
|
307 return {'operation': 'create', |
|
308 'detail': 'initial', |
|
309 'when_monotonic': now+initial_interval} |
314 elif not self.lastrun_success: |
310 elif not self.lastrun_success: |
315 return 'create', self.lastrun+self.retry_interval |
311 if self.retry_interval==0: |
|
312 return None |
|
313 else: |
|
314 return {'operation': 'create', |
|
315 'detail': 'retry', |
|
316 'when_monotonic': self.lastrun+self.retry_interval} |
316 else: |
317 else: |
317 if self.backup_interval==0: |
318 if self.backup_interval==0: |
318 return 'none', 0 |
319 return None |
319 else: |
320 else: |
320 return 'create', self.lastrun+self.backup_interval |
321 return {'operation': 'create', |
|
322 'detail': None, |
|
323 'when_monotonic': self.lastrun+self.backup_interval} |
|
324 |
|
325 def __schedule_unlocked(self): |
|
326 if self.current_operation: |
|
327 return self.current_operation |
|
328 else: |
|
329 op=self.__next_operation_unlocked() |
|
330 |
|
331 if op: |
|
332 now=time.monotonic() |
|
333 delay=max(0, op['when_monotonic']-now) |
|
334 logging.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
|
335 (op['operation'], op['detail'], self.name, delay)) |
|
336 tmr=Timer(delay, self.__queue_timed_operation) |
|
337 self.scheduled_operation=op |
|
338 self.timer=tmr |
|
339 tmr.start() |
|
340 |
|
341 return op |
321 |
342 |
322 def schedule(self): |
343 def schedule(self): |
323 with self.lock: |
344 with self.lock: |
324 return self.__schedule_unlocked() |
345 return self.__schedule_unlocked() |
325 |
346 |
326 |
347 def set_status_update_callback(self, callback): |
|
348 with self.lock: |
|
349 self.status_update_callback=callback |
|
350 |
|
351 def status(self): |
|
352 with self.lock: |
|
353 res=self.__status_unlocked() |
|
354 return res[0] |
|
355 |
|
356 def __status_unlocked(self): |
|
357 callback=self.status_update_callback |
|
358 if self.current_operation: |
|
359 status=self.current_operation |
|
360 status['type']='current' |
|
361 elif self.scheduled_operation: |
|
362 status=self.scheduled_operation |
|
363 status['type']='scheduled' |
|
364 else: |
|
365 status={'type': 'nothing'} |
|
366 |
|
367 status['name']=self.name |
|
368 |
|
369 if 'when_monotonic' in status: |
|
370 status['when']=(status['when_monotonic'] |
|
371 -time.monotonic()+time.time()) |
|
372 |
|
373 return status, callback |
|
374 |
|
375 |
|
376 |