439 self.logger.info('borg info: Could not discover a previous backup') |
439 self.logger.info('borg info: Could not discover a previous backup') |
440 |
440 |
441 def __do_launch(self, op, archive_or_repository, |
441 def __do_launch(self, op, archive_or_repository, |
442 common_params, op_params, paths=[]): |
442 common_params, op_params, paths=[]): |
443 |
443 |
444 self.logger.debug('Creating BorgInstance') |
444 # Set up current_operation here so errors can be added to it in |
445 |
445 # __main_thread if there is an exception |
446 inst=BorgInstance(op.type, archive_or_repository, |
|
447 common_params, op_params, paths) |
|
448 |
|
449 self.logger.debug('Launching BorgInstance via repository') |
|
450 |
|
451 # Only the Repository object has access to the passphrase |
|
452 self.repository.launch_borg_instance(inst) |
|
453 |
|
454 self.logger.debug('Creating listener threads') |
|
455 |
|
456 t_log=Thread(target=self.__log_listener) |
|
457 t_log.daemon=True |
|
458 |
|
459 t_res=Thread(target=self.__result_listener) |
|
460 t_res.daemon=True |
|
461 |
|
462 self.thread_log=t_log |
|
463 self.thread_res=t_res |
|
464 self.borg_instance=inst |
|
465 self.current_operation=op |
446 self.current_operation=op |
466 # Update scheduled time to real starting time to schedule |
447 # Update scheduled time to real starting time to schedule |
467 # next run relative to this |
448 # next run relative to this |
468 self.current_operation.start_time=MonotonicTime.now() |
449 self.current_operation.start_time=MonotonicTime.now() |
|
450 |
|
451 self.logger.debug('Creating BorgInstance') |
|
452 |
|
453 inst=BorgInstance(op.type, archive_or_repository, |
|
454 common_params, op_params, paths) |
|
455 |
|
456 self.logger.debug('Launching BorgInstance via repository') |
|
457 |
|
458 # Only the Repository object has access to the passphrase |
|
459 self.repository.launch_borg_instance(inst) |
|
460 |
|
461 self.logger.debug('Creating listener threads') |
|
462 |
|
463 t_log=Thread(target=self.__log_listener) |
|
464 t_log.daemon=True |
|
465 |
|
466 t_res=Thread(target=self.__result_listener) |
|
467 t_res.daemon=True |
|
468 |
|
469 self.thread_log=t_log |
|
470 self.thread_res=t_res |
|
471 self.borg_instance=inst |
469 # Reset error status when starting a new operation |
472 # Reset error status when starting a new operation |
470 self.__update_status(State.ACTIVE) |
473 self.__update_status(State.ACTIVE) |
471 |
474 |
472 t_log.start() |
475 t_log.start() |
473 t_res.start() |
476 t_res.start() |
474 |
|
475 |
477 |
476 def __launch(self, op): |
478 def __launch(self, op): |
477 self.logger.info("Launching '%s'" % str(op.type)) |
479 self.logger.info("Launching '%s'" % str(op.type)) |
478 |
480 |
479 params=(config.borg_parameters |
481 params=(config.borg_parameters |
511 |
513 |
512 def _terminate_or_pause(self): |
514 def _terminate_or_pause(self): |
513 return self._terminate or self._pause |
515 return self._terminate or self._pause |
514 |
516 |
515 def __wait_finish(self): |
517 def __wait_finish(self): |
516 current=self.current_operation |
|
517 |
|
518 # Wait for main logger thread to terminate, or for us to be terminated |
518 # Wait for main logger thread to terminate, or for us to be terminated |
519 while not self._terminate_or_pause() and self.thread_res.is_alive(): |
519 while not self._terminate_or_pause() and self.thread_res.is_alive(): |
520 self._cond.release() |
520 self._cond.release() |
521 # Maybe wait for borg instead? |
521 # Maybe wait for borg instead? |
522 self.thread_res.join(JOIN_TIMEOUT) |
522 self.thread_res.join(JOIN_TIMEOUT) |
536 self.thread_log.join() |
536 self.thread_log.join() |
537 self._cond.acquire() |
537 self._cond.acquire() |
538 |
538 |
539 if not self.borg_instance.wait(): |
539 if not self.borg_instance.wait(): |
540 self.logger.error('Borg subprocess did not terminate') |
540 self.logger.error('Borg subprocess did not terminate') |
541 curent.add_error(Errors.ERRORS) |
541 self.current_operation.add_error(Errors.ERRORS) |
|
542 |
|
543 self.thread_res=None |
|
544 self.thread_log=None |
|
545 self.borg_instance=None |
|
546 |
|
547 self.__mark_current_finished() |
|
548 |
|
549 def __mark_current_finished(self): |
|
550 current=self.current_operation |
|
551 |
|
552 assert(current) |
542 |
553 |
543 current.finish_time=MonotonicTime.now() |
554 current.finish_time=MonotonicTime.now() |
544 |
555 |
545 self.previous_operation_of_type[current.type]=current |
556 self.previous_operation_of_type[current.type]=current |
546 self.previous_operation=current |
557 self.previous_operation=current |
547 self.current_operation=None |
558 self.current_operation=None |
548 self.thread_res=None |
|
549 self.thread_log=None |
|
550 self.borg_instance=None |
|
551 |
559 |
552 @protect_noreturn |
560 @protect_noreturn |
553 def __main_thread(self): |
561 def __main_thread(self): |
554 with self._cond: |
562 with self._cond: |
555 while not self._terminate: |
563 while not self._terminate: |
559 if ((not self._terminate_or_pause()) and self.scheduled_operation |
567 if ((not self._terminate_or_pause()) and self.scheduled_operation |
560 and self.scheduled_operation.start_time <= MonotonicTime.now()): |
568 and self.scheduled_operation.start_time <= MonotonicTime.now()): |
561 self.__main_thread_queue_and_launch() |
569 self.__main_thread_queue_and_launch() |
562 except Exception as err: |
570 except Exception as err: |
563 self.logger.exception("Exception in backup '%s'" % self.backup_name) |
571 self.logger.exception("Exception in backup '%s'" % self.backup_name) |
|
572 current=self.current_operation |
|
573 if current: |
|
574 current.add_error(Errors.ERRORS) |
|
575 self.__mark_current_finished() |
564 finally: |
576 finally: |
565 self.__cleanup() |
577 self.__cleanup() |
566 |
578 |
567 def __cleanup(self): |
579 def __cleanup(self): |
568 thread_log=self.thread_log |
580 thread_log=self.thread_log |