borgend/backup.py

changeset 119
e5f29271089d
parent 116
4a56dda09745
child 146
e79cc90c7798
equal deleted inserted replaced
118:aadd60a24bc8 119:e5f29271089d
439 self.logger.info('borg info: Could not discover a previous backup') 439 self.logger.info('borg info: Could not discover a previous backup')
440 440
441 def __do_launch(self, op, archive_or_repository, 441 def __do_launch(self, op, archive_or_repository,
442 common_params, op_params, paths=[]): 442 common_params, op_params, paths=[]):
443 443
444 self.logger.debug('Creating BorgInstance') 444 # Set up current_operation here so errors can be added to it in
445 445 # __main_thread if there is an exception
446 inst=BorgInstance(op.type, archive_or_repository,
447 common_params, op_params, paths)
448
449 self.logger.debug('Launching BorgInstance via repository')
450
451 # Only the Repository object has access to the passphrase
452 self.repository.launch_borg_instance(inst)
453
454 self.logger.debug('Creating listener threads')
455
456 t_log=Thread(target=self.__log_listener)
457 t_log.daemon=True
458
459 t_res=Thread(target=self.__result_listener)
460 t_res.daemon=True
461
462 self.thread_log=t_log
463 self.thread_res=t_res
464 self.borg_instance=inst
465 self.current_operation=op 446 self.current_operation=op
466 # Update scheduled time to real starting time to schedule 447 # Update scheduled time to real starting time to schedule
467 # next run relative to this 448 # next run relative to this
468 self.current_operation.start_time=MonotonicTime.now() 449 self.current_operation.start_time=MonotonicTime.now()
450
451 self.logger.debug('Creating BorgInstance')
452
453 inst=BorgInstance(op.type, archive_or_repository,
454 common_params, op_params, paths)
455
456 self.logger.debug('Launching BorgInstance via repository')
457
458 # Only the Repository object has access to the passphrase
459 self.repository.launch_borg_instance(inst)
460
461 self.logger.debug('Creating listener threads')
462
463 t_log=Thread(target=self.__log_listener)
464 t_log.daemon=True
465
466 t_res=Thread(target=self.__result_listener)
467 t_res.daemon=True
468
469 self.thread_log=t_log
470 self.thread_res=t_res
471 self.borg_instance=inst
469 # Reset error status when starting a new operation 472 # Reset error status when starting a new operation
470 self.__update_status(State.ACTIVE) 473 self.__update_status(State.ACTIVE)
471 474
472 t_log.start() 475 t_log.start()
473 t_res.start() 476 t_res.start()
474
475 477
476 def __launch(self, op): 478 def __launch(self, op):
477 self.logger.info("Launching '%s'" % str(op.type)) 479 self.logger.info("Launching '%s'" % str(op.type))
478 480
479 params=(config.borg_parameters 481 params=(config.borg_parameters
511 513
512 def _terminate_or_pause(self): 514 def _terminate_or_pause(self):
513 return self._terminate or self._pause 515 return self._terminate or self._pause
514 516
515 def __wait_finish(self): 517 def __wait_finish(self):
516 current=self.current_operation
517
518 # Wait for main logger thread to terminate, or for us to be terminated 518 # Wait for main logger thread to terminate, or for us to be terminated
519 while not self._terminate_or_pause() and self.thread_res.is_alive(): 519 while not self._terminate_or_pause() and self.thread_res.is_alive():
520 self._cond.release() 520 self._cond.release()
521 # Maybe wait for borg instead? 521 # Maybe wait for borg instead?
522 self.thread_res.join(JOIN_TIMEOUT) 522 self.thread_res.join(JOIN_TIMEOUT)
536 self.thread_log.join() 536 self.thread_log.join()
537 self._cond.acquire() 537 self._cond.acquire()
538 538
539 if not self.borg_instance.wait(): 539 if not self.borg_instance.wait():
540 self.logger.error('Borg subprocess did not terminate') 540 self.logger.error('Borg subprocess did not terminate')
541 curent.add_error(Errors.ERRORS) 541 self.current_operation.add_error(Errors.ERRORS)
542
543 self.thread_res=None
544 self.thread_log=None
545 self.borg_instance=None
546
547 self.__mark_current_finished()
548
549 def __mark_current_finished(self):
550 current=self.current_operation
551
552 assert(current)
542 553
543 current.finish_time=MonotonicTime.now() 554 current.finish_time=MonotonicTime.now()
544 555
545 self.previous_operation_of_type[current.type]=current 556 self.previous_operation_of_type[current.type]=current
546 self.previous_operation=current 557 self.previous_operation=current
547 self.current_operation=None 558 self.current_operation=None
548 self.thread_res=None
549 self.thread_log=None
550 self.borg_instance=None
551 559
552 @protect_noreturn 560 @protect_noreturn
553 def __main_thread(self): 561 def __main_thread(self):
554 with self._cond: 562 with self._cond:
555 while not self._terminate: 563 while not self._terminate:
559 if ((not self._terminate_or_pause()) and self.scheduled_operation 567 if ((not self._terminate_or_pause()) and self.scheduled_operation
560 and self.scheduled_operation.start_time <= MonotonicTime.now()): 568 and self.scheduled_operation.start_time <= MonotonicTime.now()):
561 self.__main_thread_queue_and_launch() 569 self.__main_thread_queue_and_launch()
562 except Exception as err: 570 except Exception as err:
563 self.logger.exception("Exception in backup '%s'" % self.backup_name) 571 self.logger.exception("Exception in backup '%s'" % self.backup_name)
572 current=self.current_operation
573 if current:
574 current.add_error(Errors.ERRORS)
575 self.__mark_current_finished()
564 finally: 576 finally:
565 self.__cleanup() 577 self.__cleanup()
566 578
567 def __cleanup(self): 579 def __cleanup(self):
568 thread_log=self.thread_log 580 thread_log=self.thread_log

mercurial