borgend/backup.py

changeset 98
9052e427ea39
parent 97
96d5adbe0205
child 100
b141bed9e718
equal deleted inserted replaced
97:96d5adbe0205 98:9052e427ea39
18 from .instance import BorgInstance 18 from .instance import BorgInstance
19 from .scheduler import TerminableThread 19 from .scheduler import TerminableThread
20 20
21 _logger=logging.getLogger(__name__) 21 _logger=logging.getLogger(__name__)
22 22
23 JOIN_TIMEOUT=60 23 JOIN_TIMEOUT=10
24 24
25 # 25 #
26 # State and operation related helper classes 26 # State and operation related helper classes
27 # 27 #
28 28
29 class State(IntEnum): 29 class State(IntEnum):
30 # State 30 # State
31 INACTIVE=0 31 INACTIVE=0
32 SCHEDULED=1 32 PAUSED=1
33 QUEUED=2 33 SCHEDULED=2
34 ACTIVE=3 34 QUEUED=3
35 ACTIVE=4
35 36
36 37
37 class Errors(IntEnum): 38 class Errors(IntEnum):
38 OK=0 39 OK=0
39 BUSY=1 40 BUSY=1
238 239
239 240
240 def __init__(self, identifier, cfg, scheduler): 241 def __init__(self, identifier, cfg, scheduler):
241 self.identifier=identifier 242 self.identifier=identifier
242 self.__status_update_callback=None 243 self.__status_update_callback=None
244 self._pause=False
243 self.scheduler=scheduler 245 self.scheduler=scheduler
244 self.logger=None # set up in __decode_config once backup name is known 246 self.logger=None # set up in __decode_config once backup name is known
245 247
246 self.borg_instance=None 248 self.borg_instance=None
247 self.thread_log=None 249 self.thread_log=None
331 msg['msgid']=='LockErrorT' or # in docs 333 msg['msgid']=='LockErrorT' or # in docs
332 msg['msgid']=='LockErrorT')): # in docs 334 msg['msgid']=='LockErrorT')): # in docs
333 errors=Errors.BUSY 335 errors=Errors.BUSY
334 with self._cond: 336 with self._cond:
335 self.current_operation.add_error(errors) 337 self.current_operation.add_error(errors)
336 status, callback=self.__status_unlocked() 338 # Don't notify of errors if we are terminating or pausing
339 if not self._terminate_or_pause():
340 status, callback=self.__status_unlocked()
337 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE: 341 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
338 # Borg gives very little progress info in easy form, so try to extract it 342 # Borg gives very little progress info in easy form, so try to extract it
339 archive_number, of_total=check_prune_status(msg['message']) 343 archive_number, of_total=check_prune_status(msg['message'])
340 if archive_number!=None and of_total!=None: 344 if archive_number!=None and of_total!=None:
341 self.current_operation.detail['progress_current_secondary']=archive_number 345 self.current_operation.detail['progress_current_secondary']=archive_number
485 489
486 self.__launch(op) 490 self.__launch(op)
487 491
488 self.__wait_finish() 492 self.__wait_finish()
489 493
494 def _terminate_or_pause(self):
495 return self._terminate or self._pause
496
490 def __wait_finish(self): 497 def __wait_finish(self):
491 current=self.current_operation 498 current=self.current_operation
492 499
493 # Wait for main logger thread to terminate, or for us to be terminated 500 # Wait for main logger thread to terminate, or for us to be terminated
494 while not self.terminate and self.thread_res.is_alive(): 501 while not self._terminate_or_pause() and self.thread_res.is_alive():
495 self._cond.release() 502 self._cond.release()
496 self.thread_res.join(JOIN_TIMEOUT) 503 self.thread_res.join(JOIN_TIMEOUT)
497 self._cond.acquire() 504 self._cond.acquire()
498 505
499 # If terminate has been signalled, let outer termination handler 506 # If terminate or pause has been signalled, let outer termination handler
500 # take care of things (Within this Backup class, it would be cleanest 507 # take care of things (Within this Backup class, it would be cleanest
501 # to raise an exception instead, but in most other places it's better 508 # to raise an exception instead, but in most other places it's better
502 # to just check self._terminate, so we don't complicate things with 509 # to just check self._terminate, so we don't complicate things with
503 # an extra exception.) 510 # an extra exception.)
504 if self._terminate: 511 if self.thread_res.is_alive():
505 return 512 return
506 513
507 self.logger.debug('Waiting for borg and log subprocesses to terminate') 514 self.logger.debug('Waiting for borg and log subprocesses to terminate')
508 515
509 self._cond.release() 516 self._cond.release()
520 self.previous_operation=current 527 self.previous_operation=current
521 self.current_operation=None 528 self.current_operation=None
522 self.thread_res=None 529 self.thread_res=None
523 self.thread_log=None 530 self.thread_log=None
524 self.borg_instance=None 531 self.borg_instance=None
525 self.state=State.INACTIVE
526 self.__update_status()
527 532
528 def __main_thread(self): 533 def __main_thread(self):
529 with self._cond: 534 with self._cond:
530 while not self._terminate: 535 while not self._terminate:
531 try: 536 try:
532 assert(not self.current_operation) 537 assert(not self.current_operation)
533 self.__main_thread_wait_schedule() 538 self.__main_thread_wait_schedule()
534 if not self._terminate: 539 if (not self._terminate_or_pause() and self.scheduled_operation
540 and self.scheduled_operation.start_time <= MonotonicTime.now()):
535 self.__main_thread_queue_and_launch() 541 self.__main_thread_queue_and_launch()
536 except Exception as err: 542 except Exception as err:
537 self.logger.exception("Error with backup '%s'" % self.backup_name) 543 self.logger.exception("Exception in backup '%s'" % self.backup_name)
538 self.errors=Errors.ERRORS 544 finally:
539 self.__cleanup() 545 self.__cleanup()
540 546
541 self.__cleanup()
542
543 def __cleanup(self): 547 def __cleanup(self):
544 self.state=State.INACTIVE
545 self.scheduled_operation=None
546 self.current_operation=None
547 thread_log=self.thread_log 548 thread_log=self.thread_log
548 thread_res=self.thread_res 549 thread_res=self.thread_res
549 borg_instance=self.borg_instance 550 borg_instance=self.borg_instance
551 self.scheduled_operation=None
550 self.thread_log=None 552 self.thread_log=None
551 self.thread_res=None 553 self.thread_res=None
552 self.borg_instance=None 554 self.borg_instance=None
553 555
554 self._cond.release() 556 self._cond.release()
564 if thread_res: 566 if thread_res:
565 self.logger.debug("Waiting for result thread to terminate") 567 self.logger.debug("Waiting for result thread to terminate")
566 thread_res.join() 568 thread_res.join()
567 finally: 569 finally:
568 self._cond.acquire() 570 self._cond.acquire()
571 self.current_operation=None
569 572
570 # Main thread/2. Schedule next operation if there is no manually 573 # Main thread/2. Schedule next operation if there is no manually
571 # requested one 574 # requested one
572 def __main_thread_wait_schedule(self): 575 def __main_thread_wait_schedule(self):
573 op=None 576 op=None
574 if not self.scheduled_operation: 577 if self._pause:
575 op=self.__next_operation_unlocked() 578 self.logger.info("Waiting for resume to be signalled")
576 if op: 579
577 self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" % 580 self.state=State.PAUSED
578 (str(op.type), op.detail or 'none',
579 op.start_time.isoformat(),
580 op.start_time.__class__.__name__))
581
582 self.scheduled_operation=op
583 self.state=State.SCHEDULED
584 self.__update_status() 581 self.__update_status()
585 582
586 # Wait under scheduled wait
587 self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
588 else:
589 # Nothing scheduled - just wait
590 self.logger.info("Waiting for manual scheduling")
591
592 self.state=State.INACTIVE
593 self.__update_status()
594
595 self._cond.wait() 583 self._cond.wait()
584 else:
585 if not self.scheduled_operation:
586 op=self.__next_operation_unlocked()
587 if op:
588 self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
589 (str(op.type), op.detail or 'none',
590 op.start_time.isoformat(),
591 op.start_time.__class__.__name__))
592
593 self.scheduled_operation=op
594 self.state=State.SCHEDULED
595 self.__update_status()
596
597 # Wait under scheduled wait
598 self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
599 else:
600 # Nothing scheduled - just wait
601 self.logger.info("Waiting for manual scheduling")
602
603 self.state=State.INACTIVE
604 self.__update_status()
605
606 self._cond.wait()
596 607
597 # Main thread/3. If there is a scheduled operation (it might have been 608 # Main thread/3. If there is a scheduled operation (it might have been
598 # changed manually from 'op' created in __main_thread_wait_schedule above), 609 # changed manually from 'op' created in __main_thread_wait_schedule above),
599 # queue it on the repository, and launch the operation once repository 610 # queue it on the repository, and launch the operation once repository
600 # available 611 # available
604 self.state=State.QUEUED 615 self.state=State.QUEUED
605 self.__update_status() 616 self.__update_status()
606 res=self.repository.queue_action(self._cond, 617 res=self.repository.queue_action(self._cond,
607 action=self.__launch_and_wait, 618 action=self.__launch_and_wait,
608 name=self.backup_name) 619 name=self.backup_name)
609 if not res and not self._terminate: 620 if not res:
610 self.logger.debug("Queueing aborted") 621 self.logger.debug("Queueing aborted")
611 self.scheduled_operation=None
612 self.state=State.INACTIVE
613 self.__update_status()
614 622
615 def __next_operation_unlocked(self): 623 def __next_operation_unlocked(self):
616 listop=self.__next_operation_list() 624 listop=self.__next_operation_list()
617 if listop: 625 if listop:
618 return listop 626 return listop
743 def abort(self): 751 def abort(self):
744 with self._cond: 752 with self._cond:
745 if self.borg_instance: 753 if self.borg_instance:
746 self.borg_instance.terminate() 754 self.borg_instance.terminate()
747 755
756 def is_paused(self):
757 with self._cond:
758 paused=self.state==State.PAUSED
759 return paused
760
761 def pause(self):
762 with self._cond:
763 self.logger.debug('Pause signalled')
764 self.scheduled_operation=None
765 self._pause=True
766 self._cond.notify()
767
768 def resume(self):
769 with self._cond:
770 self.logger.debug('Resume signalled')
771 self._pause=False
772 self._cond.notify()
773

mercurial