238 |
239 |
239 |
240 |
240 def __init__(self, identifier, cfg, scheduler): |
241 def __init__(self, identifier, cfg, scheduler): |
241 self.identifier=identifier |
242 self.identifier=identifier |
242 self.__status_update_callback=None |
243 self.__status_update_callback=None |
|
244 self._pause=False |
243 self.scheduler=scheduler |
245 self.scheduler=scheduler |
244 self.logger=None # set up in __decode_config once backup name is known |
246 self.logger=None # set up in __decode_config once backup name is known |
245 |
247 |
246 self.borg_instance=None |
248 self.borg_instance=None |
247 self.thread_log=None |
249 self.thread_log=None |
331 msg['msgid']=='LockErrorT' or # in docs |
333 msg['msgid']=='LockErrorT' or # in docs |
332 msg['msgid']=='LockErrorT')): # in docs |
334 msg['msgid']=='LockErrorT')): # in docs |
333 errors=Errors.BUSY |
335 errors=Errors.BUSY |
334 with self._cond: |
336 with self._cond: |
335 self.current_operation.add_error(errors) |
337 self.current_operation.add_error(errors) |
336 status, callback=self.__status_unlocked() |
338 # Don't notify of errors if we are terminating or pausing |
|
339 if not self._terminate_or_pause(): |
|
340 status, callback=self.__status_unlocked() |
337 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE: |
341 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE: |
338 # Borg gives very little progress info in easy form, so try to extract it |
342 # Borg gives very little progress info in easy form, so try to extract it |
339 archive_number, of_total=check_prune_status(msg['message']) |
343 archive_number, of_total=check_prune_status(msg['message']) |
340 if archive_number!=None and of_total!=None: |
344 if archive_number!=None and of_total!=None: |
341 self.current_operation.detail['progress_current_secondary']=archive_number |
345 self.current_operation.detail['progress_current_secondary']=archive_number |
485 |
489 |
486 self.__launch(op) |
490 self.__launch(op) |
487 |
491 |
488 self.__wait_finish() |
492 self.__wait_finish() |
489 |
493 |
|
494 def _terminate_or_pause(self): |
|
495 return self._terminate or self._pause |
|
496 |
490 def __wait_finish(self): |
497 def __wait_finish(self): |
491 current=self.current_operation |
498 current=self.current_operation |
492 |
499 |
493 # Wait for main logger thread to terminate, or for us to be terminated |
500 # Wait for main logger thread to terminate, or for us to be terminated |
494 while not self.terminate and self.thread_res.is_alive(): |
501 while not self._terminate_or_pause() and self.thread_res.is_alive(): |
495 self._cond.release() |
502 self._cond.release() |
496 self.thread_res.join(JOIN_TIMEOUT) |
503 self.thread_res.join(JOIN_TIMEOUT) |
497 self._cond.acquire() |
504 self._cond.acquire() |
498 |
505 |
499 # If terminate has been signalled, let outer termination handler |
506 # If terminate or pause has been signalled, let outer termination handler |
500 # take care of things (Within this Backup class, it would be cleanest |
507 # take care of things (Within this Backup class, it would be cleanest |
501 # to raise an exception instead, but in most other places it's better |
508 # to raise an exception instead, but in most other places it's better |
502 # to just check self._terminate, so we don't complicate things with |
509 # to just check self._terminate, so we don't complicate things with |
503 # an extra exception.) |
510 # an extra exception.) |
504 if self._terminate: |
511 if self.thread_res.is_alive(): |
505 return |
512 return |
506 |
513 |
507 self.logger.debug('Waiting for borg and log subprocesses to terminate') |
514 self.logger.debug('Waiting for borg and log subprocesses to terminate') |
508 |
515 |
509 self._cond.release() |
516 self._cond.release() |
520 self.previous_operation=current |
527 self.previous_operation=current |
521 self.current_operation=None |
528 self.current_operation=None |
522 self.thread_res=None |
529 self.thread_res=None |
523 self.thread_log=None |
530 self.thread_log=None |
524 self.borg_instance=None |
531 self.borg_instance=None |
525 self.state=State.INACTIVE |
|
526 self.__update_status() |
|
527 |
532 |
528 def __main_thread(self): |
533 def __main_thread(self): |
529 with self._cond: |
534 with self._cond: |
530 while not self._terminate: |
535 while not self._terminate: |
531 try: |
536 try: |
532 assert(not self.current_operation) |
537 assert(not self.current_operation) |
533 self.__main_thread_wait_schedule() |
538 self.__main_thread_wait_schedule() |
534 if not self._terminate: |
539 if (not self._terminate_or_pause() and self.scheduled_operation |
|
540 and self.scheduled_operation.start_time <= MonotonicTime.now()): |
535 self.__main_thread_queue_and_launch() |
541 self.__main_thread_queue_and_launch() |
536 except Exception as err: |
542 except Exception as err: |
537 self.logger.exception("Error with backup '%s'" % self.backup_name) |
543 self.logger.exception("Exception in backup '%s'" % self.backup_name) |
538 self.errors=Errors.ERRORS |
544 finally: |
539 self.__cleanup() |
545 self.__cleanup() |
540 |
546 |
541 self.__cleanup() |
|
542 |
|
543 def __cleanup(self): |
547 def __cleanup(self): |
544 self.state=State.INACTIVE |
|
545 self.scheduled_operation=None |
|
546 self.current_operation=None |
|
547 thread_log=self.thread_log |
548 thread_log=self.thread_log |
548 thread_res=self.thread_res |
549 thread_res=self.thread_res |
549 borg_instance=self.borg_instance |
550 borg_instance=self.borg_instance |
|
551 self.scheduled_operation=None |
550 self.thread_log=None |
552 self.thread_log=None |
551 self.thread_res=None |
553 self.thread_res=None |
552 self.borg_instance=None |
554 self.borg_instance=None |
553 |
555 |
554 self._cond.release() |
556 self._cond.release() |
564 if thread_res: |
566 if thread_res: |
565 self.logger.debug("Waiting for result thread to terminate") |
567 self.logger.debug("Waiting for result thread to terminate") |
566 thread_res.join() |
568 thread_res.join() |
567 finally: |
569 finally: |
568 self._cond.acquire() |
570 self._cond.acquire() |
|
571 self.current_operation=None |
569 |
572 |
570 # Main thread/2. Schedule next operation if there is no manually |
573 # Main thread/2. Schedule next operation if there is no manually |
571 # requested one |
574 # requested one |
572 def __main_thread_wait_schedule(self): |
575 def __main_thread_wait_schedule(self): |
573 op=None |
576 op=None |
574 if not self.scheduled_operation: |
577 if self._pause: |
575 op=self.__next_operation_unlocked() |
578 self.logger.info("Waiting for resume to be signalled") |
576 if op: |
579 |
577 self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % |
580 self.state=State.PAUSED |
578 (str(op.type), op.detail or 'none', |
|
579 op.start_time.isoformat(), |
|
580 op.start_time.__class__.__name__)) |
|
581 |
|
582 self.scheduled_operation=op |
|
583 self.state=State.SCHEDULED |
|
584 self.__update_status() |
581 self.__update_status() |
585 |
582 |
586 # Wait under scheduled wait |
|
587 self.scheduler.wait_until(op.start_time, self._cond, self.backup_name) |
|
588 else: |
|
589 # Nothing scheduled - just wait |
|
590 self.logger.info("Waiting for manual scheduling") |
|
591 |
|
592 self.state=State.INACTIVE |
|
593 self.__update_status() |
|
594 |
|
595 self._cond.wait() |
583 self._cond.wait() |
|
584 else: |
|
585 if not self.scheduled_operation: |
|
586 op=self.__next_operation_unlocked() |
|
587 if op: |
|
588 self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % |
|
589 (str(op.type), op.detail or 'none', |
|
590 op.start_time.isoformat(), |
|
591 op.start_time.__class__.__name__)) |
|
592 |
|
593 self.scheduled_operation=op |
|
594 self.state=State.SCHEDULED |
|
595 self.__update_status() |
|
596 |
|
597 # Wait under scheduled wait |
|
598 self.scheduler.wait_until(op.start_time, self._cond, self.backup_name) |
|
599 else: |
|
600 # Nothing scheduled - just wait |
|
601 self.logger.info("Waiting for manual scheduling") |
|
602 |
|
603 self.state=State.INACTIVE |
|
604 self.__update_status() |
|
605 |
|
606 self._cond.wait() |
596 |
607 |
597 # Main thread/3. If there is a scheduled operation (it might have been |
608 # Main thread/3. If there is a scheduled operation (it might have been |
598 # changed manually from 'op' created in __main_thread_wait_schedule above), |
609 # changed manually from 'op' created in __main_thread_wait_schedule above), |
599 # queue it on the repository, and launch the operation once repository |
610 # queue it on the repository, and launch the operation once repository |
600 # available |
611 # available |
604 self.state=State.QUEUED |
615 self.state=State.QUEUED |
605 self.__update_status() |
616 self.__update_status() |
606 res=self.repository.queue_action(self._cond, |
617 res=self.repository.queue_action(self._cond, |
607 action=self.__launch_and_wait, |
618 action=self.__launch_and_wait, |
608 name=self.backup_name) |
619 name=self.backup_name) |
609 if not res and not self._terminate: |
620 if not res: |
610 self.logger.debug("Queueing aborted") |
621 self.logger.debug("Queueing aborted") |
611 self.scheduled_operation=None |
|
612 self.state=State.INACTIVE |
|
613 self.__update_status() |
|
614 |
622 |
615 def __next_operation_unlocked(self): |
623 def __next_operation_unlocked(self): |
616 listop=self.__next_operation_list() |
624 listop=self.__next_operation_list() |
617 if listop: |
625 if listop: |
618 return listop |
626 return listop |