103 |
106 |
def extract_passphrase(self):
    """Return the repository passphrase, fetching it from the keyring on demand.

    A passphrase that has already been retrieved is cached on the instance
    and returned without another keyring query.

    Returns:
        The cached or freshly retrieved passphrase, or None when no keychain
        account is configured.

    Raises:
        Exception: whatever ``keyring.get_password`` raised; the failure is
        logged and re-raised unchanged.
    """
    # Cached value from an earlier call: nothing to do.
    if self.__passphrase:
        return self.__passphrase

    acc = self.__keychain_account
    if not acc:
        # No (or empty) keychain account configured.
        self.__passphrase = None
        return None

    self.logger.debug('Requesting passphrase')
    try:
        pw = keyring.get_password("borg-backup", acc)
    except Exception:
        self.logger.error('Failed to retrieve passphrase')
        # Bare raise keeps the original exception and traceback intact.
        raise

    self.logger.debug('Received passphrase')
    self.__passphrase = pw
    return self.__passphrase
120 |
123 |
def __init__(self, identifier, cfg, scheduler):
    """Initialise the backup's state, decode its configuration and set up the thread.

    Args:
        identifier: stored as ``self.identifier``.
        cfg: configuration passed to ``__decode_config``.
        scheduler: stored as ``self.scheduler``; used by the main thread for
            timed waits.
    """
    self.identifier = identifier
    # NOTE(review): this stores the name `config`, not the `cfg` parameter.
    # Presumably `config` is a module imported at file level -- confirm.
    self.config = config
    self.__status_update_callback = None
    self.scheduler = scheduler
    # Placeholder until the backup name is known; presumably set up by
    # __decode_config below -- confirm.
    self.logger = None

    # Running borg process and its listener threads (None while idle).
    self.borg_instance = None
    self.thread_log = None
    self.thread_res = None

    # Operation bookkeeping.
    self.current_operation = None
    self.scheduled_operation = None
    self.lastrun_when = None
    self.state = INACTIVE

    self.__decode_config(cfg)

    # self._name is read here before the base initialiser runs, so it is
    # presumably assigned by __decode_config -- confirm.
    super().__init__(target=self.__main_thread, name=self._name)
    self.daemon = True
239 |
246 |
240 # If there were no errors, reset back to INACTIVE state |
247 # If there were no errors, reset back to INACTIVE state |
241 if state==ACTIVE: |
248 if state==ACTIVE: |
242 state=INACTIVE |
249 state=INACTIVE |
243 |
250 |
244 logger.debug('Borg result: %s' % str(res)) |
251 self.logger.debug('Borg result: %s' % str(res)) |
245 |
252 |
246 if res is None and state==INACTIVE: |
253 if res is None and state==INACTIVE: |
247 logger.error('No result from borg despite no error in log') |
254 self.logger.error('No result from borg despite no error in log') |
248 state=ERRORS |
255 state=ERRORS |
249 |
256 |
250 logger.debug('Waiting for borg subprocess to terminate in result thread') |
257 self.logger.debug('Waiting for borg subprocess to terminate in result thread') |
251 |
258 |
252 if not self.borg_instance.wait(): |
259 if not self.borg_instance.wait(): |
253 logger.critical('Borg subprocess did not terminate') |
260 self.logger.error('Borg subprocess did not terminate') |
254 state=combine_state(state, ERRORS) |
261 state=combine_state(state, ERRORS) |
255 |
262 |
256 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
263 self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
257 |
264 |
258 with self._cond: |
265 with self._cond: |
259 if self.current_operation['operation']=='create': |
266 if self.current_operation['operation']=='create': |
260 self.lastrun_when=self.current_operation['when_monotonic'] |
267 self.lastrun_when=self.current_operation['when_monotonic'] |
261 self.thread_res=None |
268 self.thread_res=None |
262 self.thread_log=None |
269 self.thread_log=None |
263 self.borg_instance=None |
270 self.borg_instance=None |
264 self.current_operation=None |
271 self.current_operation=None |
265 self.state=state |
272 self.state=state |
|
273 self.__update_status() |
266 self._cond.notify() |
274 self._cond.notify() |
267 |
275 |
def __do_launch(self, op, archive_or_repository, *args):
    """Start a borg instance for ``op`` together with its listener threads.

    Args:
        op: operation dict; ``op['operation']`` is passed to BorgInstance.
        archive_or_repository: target passed through to BorgInstance.
        *args: further arguments passed through to BorgInstance.

    Raises:
        Exception: anything raised by ``extract_passphrase`` or by
        constructing/launching the instance propagates to the caller.
    """
    passphrase = self.extract_passphrase()

    inst = BorgInstance(op['operation'], archive_or_repository, *args)
    inst.launch(passphrase=passphrase)

    self.logger.debug('Creating listener threads')

    t_log = Thread(target=self.__log_listener)
    t_log.daemon = True

    t_res = Thread(target=self.__result_listener)
    t_res.daemon = True

    # Publish the instance and threads before starting the listeners.
    self.thread_log = t_log
    self.thread_res = t_res
    self.borg_instance = inst

    t_log.start()
    t_res.start()
291 |
296 |
def __launch(self, op):
    """Translate ``op`` into borg arguments and launch it via ``__do_launch``.

    Args:
        op: operation dict; ``op['operation']`` must be 'create' or 'prune'.

    Raises:
        NotImplementedError: for any other operation name.
    """
    operation = op['operation']
    self.logger.debug("Launching '%s'" % operation)

    if operation == 'create':
        # Archive specifier: <repository>::<prefix><template>
        archive = "%s::%s%s" % (self.repository.repository_name,
                                self.archive_prefix,
                                self.archive_template)

        self.__do_launch(op, archive,
                         self.common_parameters + self.create_parameters,
                         self.paths)
    elif operation == 'prune':
        self.__do_launch(op, self.repository.repository_name,
                         ([{'prefix': self.archive_prefix}] +
                          self.common_parameters +
                          self.prune_parameters))
    else:
        raise NotImplementedError("Invalid operation '%s'" % operation)
|
315 |
|
def __launch_check(self):
    """Launch the pending scheduled operation, if one is still pending.

    Passed as the ``action`` to ``repository.queue_action``. When the
    scheduled operation has been cleared in the meantime, only a debug
    message is logged.
    """
    op = self.scheduled_operation
    if not op:
        self.logger.debug("Queued operation aborted")
        return

    # Promote the scheduled operation to the current one and timestamp it.
    self.scheduled_operation = None
    self.current_operation = op
    self.current_operation['when_monotonic'] = time.monotonic()

    self.__launch(op)

    self.state = ACTIVE
    self.__update_status()
|
329 |
|
330 |
|
def __main_thread(self):
    """Thread body: run the wait / schedule / launch cycle until terminated.

    The cycle runs while holding ``self._cond``. An exception raised inside
    the cycle is logged, recorded as ERRORS state, and ends the cycle.
    Afterwards any borg instance is terminated and the listener threads are
    joined outside the lock.
    """
    with self._cond:
        try:
            while not self._terminate:
                self.__main_thread_wait_finish()
                if not self._terminate:
                    self.__main_thread_wait_schedule()
                if not self._terminate:
                    self.__main_thread_queue_and_launch()
        except Exception:
            self.logger.exception("Error with backup '%s'" % self._name)
            self.lastrun_when = time.monotonic()
            self.state = ERRORS
            self.scheduled_operation = None

        # Clean up to terminate: kill borg instance and communication threads
        if self.borg_instance:
            self.logger.debug("Terminating a borg instance")
            self.borg_instance.terminate()

        # Store threads to use outside lock
        thread_log = self.thread_log
        thread_res = self.thread_res

    self.logger.debug("Waiting for log and result threads to terminate")

    # Joined without holding the lock; the result listener visible in this
    # file takes self._cond itself before it finishes.
    if thread_log:
        thread_log.join()

    if thread_res:
        thread_res.join()
|
362 |
|
363 |
|
# Main thread/1. Wait while a current operation is running
def __main_thread_wait_finish(self):
    """Block on ``self._cond`` until no operation is current or termination is requested.

    Calls ``self._cond.wait()``, so the caller is expected to hold the
    condition already.
    """
    while self.current_operation and not self._terminate:
        self.logger.debug("Waiting for current operation to finish")
        self._cond.wait()
|
369 |
|
# Main thread/2. Schedule next operation if there is no manually
# requested one
def __main_thread_wait_schedule(self):
    """Pick the next automatic operation and wait until it is due.

    If an operation is already pending (e.g. requested manually), return
    immediately so it can be queued; waiting here would stall it until some
    unrelated notification arrived. Otherwise either schedule the operation
    returned by ``__next_operation_unlocked`` and wait for its start time,
    or, when there is none, wait on the condition.

    Calls ``self._cond.wait()``, so the caller is expected to hold the
    condition already.
    """
    if self.scheduled_operation:
        return

    op = self.__next_operation_unlocked()
    if op:
        now = time.monotonic()
        # Never wait a negative time for an operation that is overdue.
        delay = max(0, op['when_monotonic'] - now)
        self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                         (op['operation'], op['detail'], delay))

        self.scheduled_operation = op
        self.state = combine_state(self.state, SCHEDULED)
        self.__update_status()

        # Wait under scheduled wait
        self.scheduler.wait_until(now + delay, self._cond, self._name)
    else:
        # Nothing scheduled - just wait
        self.logger.debug("Waiting for manual scheduling")
        self._cond.wait()
# Main thread/3. If there is a scheduled operation (it might have been
# changed manually from 'op' created in __main_thread_wait_schedule above),
# queue it on the repository, and launch the operation once the repository
# is available
def __main_thread_queue_and_launch(self):
    """Queue the pending operation on the repository; do nothing if none is pending.

    Marks the backup as QUEUED, publishes the status, then passes
    ``__launch_check`` to ``repository.queue_action`` as the action.
    """
    if not self.scheduled_operation:
        return

    self.logger.debug("Queuing")
    self.state = combine_state(self.state, QUEUED)
    self.__update_status()
    self.repository.queue_action(self._cond,
                                 action=self.__launch_check,
                                 name=self._name)
341 |
405 |
342 def __next_operation_unlocked(self): |
406 def __next_operation_unlocked(self): |
343 # TODO: pruning as well |
407 # TODO: pruning as well |
344 now=time.monotonic() |
408 now=time.monotonic() |
345 if not self.lastrun_when: |
409 if not self.lastrun_when: |
365 else: |
429 else: |
366 return {'operation': 'create', |
430 return {'operation': 'create', |
367 'detail': 'normal', |
431 'detail': 'normal', |
368 'when_monotonic': self.lastrun_when+self.backup_interval} |
432 'when_monotonic': self.lastrun_when+self.backup_interval} |
369 |
433 |
370 def __main_thread(self): |
|
371 with self._cond: |
|
372 while not self._terminate: |
|
373 op=None |
|
374 if not self.current_operation: |
|
375 op=self.__next_operation_unlocked() |
|
376 if not op: |
|
377 self.__update_status() |
|
378 self._cond.wait() |
|
379 else: |
|
380 now=time.monotonic() |
|
381 delay=max(0, op['when_monotonic']-now) |
|
382 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
|
383 (op['operation'], op['detail'], self._name, delay)) |
|
384 |
|
385 self.scheduled_operation=op |
|
386 self.state=combine_state(self.state, SCHEDULED) |
|
387 |
|
388 self.__update_status() |
|
389 self.scheduler.wait_until(now+delay, self._cond, self._name) |
|
390 |
|
391 if self.scheduled_operation: |
|
392 op=self.scheduled_operation |
|
393 self.scheduled_operation=None |
|
394 self.repository.queue_action(self._cond, name=self._name, |
|
395 action=lambda: self.__launch(op)) |
|
396 # Kill a running borg to cause log and result threads to terminate |
|
397 if self.borg_instance: |
|
398 logger.debug("Terminating a borg instance") |
|
399 self.borg_instance.terminate() |
|
400 |
|
401 # Store threads to use outside lock |
|
402 thread_log=self.thread_log |
|
403 thread_err=self.thread_err |
|
404 |
|
405 logger.debug("Waiting for log and result threads to terminate") |
|
406 |
|
407 if thread_log: |
|
408 thread_log.join() |
|
409 |
|
410 if thread_res: |
|
411 thread_res.join() |
|
412 |
|
413 |
|
414 def __status_unlocked(self): |
434 def __status_unlocked(self): |
415 callback=self.__status_update_callback |
435 callback=self.__status_update_callback |
416 |
436 |
417 if self.current_operation: |
437 if self.current_operation: |
418 status=self.current_operation |
438 status=self.current_operation |
419 status['type']='current' |
439 status['type']='current' |
420 # Errors should be set by listeners |
440 # Errors should be set by listeners |
421 else: |
441 else: |
422 if self.scheduled_operation: |
442 if self.scheduled_operation: |
423 status=self.scheduled_operation |
443 status=self.scheduled_operation |
424 status['type']='scheduled' |
444 if self.state==QUEUED: |
|
445 status['type']='queued' |
|
446 else: |
|
447 status['type']='scheduled' |
425 else: |
448 else: |
426 status={'type': 'nothing'} |
449 status={'type': 'nothing'} |
427 |
450 |
428 status['name']=self._name |
451 status['name']=self._name |
429 status['state']=self.state |
452 status['state']=self.state |