borgend/backup.py

changeset 97
96d5adbe0205
parent 96
de8ac6c470d8
child 98
9052e427ea39
equal deleted inserted replaced
96:de8ac6c470d8 97:96d5adbe0205
6 # 6 #
7 7
8 import logging 8 import logging
9 import time 9 import time
10 import datetime 10 import datetime
11 import re
11 from enum import IntEnum 12 from enum import IntEnum
12 from threading import Thread, Lock, Condition 13 from threading import Thread, Lock, Condition
13 14
14 from . import config 15 from . import config
15 from . import repository 16 from . import repository
16 from . import dreamtime 17 from .dreamtime import MonotonicTime, DreamTime, RealTime
17 from .instance import BorgInstance 18 from .instance import BorgInstance
18 from .scheduler import TerminableThread 19 from .scheduler import TerminableThread
19 20
20 _logger=logging.getLogger(__name__) 21 _logger=logging.getLogger(__name__)
21 22
59 CREATE='create' 60 CREATE='create'
60 PRUNE='prune' 61 PRUNE='prune'
61 INFO='info' 62 INFO='info'
62 LIST='list' 63 LIST='list'
63 64
64 def __init__(self, operation, time, **kwargs): 65 def __init__(self, type, start_time, **kwargs):
65 self.operation=operation 66 self.type=type
66 self.time=time 67 self.start_time=start_time
68 self.finish_time=None
67 self.detail=kwargs 69 self.detail=kwargs
70 self.errors=Errors.OK
68 71
69 def when(self): 72 def when(self):
70 return self.time.realtime() 73 return self.start_time.realtime()
74
75 def ok(self):
76 return self.errors.ok()
77
78 def add_error(self, error):
79 self.errors=self.errors.combine(error)
71 80
72 81
73 class Status(Operation): 82 class Status(Operation):
74 def __init__(self, backup, op=None): 83 def __init__(self, backup, op=None):
84 op=None
85 errorsop=None
86
87 if backup.current_operation:
88 op=backup.current_operation
89 elif backup.scheduled_operation:
90 op=backup.scheduled_operation
91 if backup.previous_operation:
92 errorsop=backup.previous_operation
93
75 if op: 94 if op:
76 super().__init__(op.operation, op.time, **op.detail) 95 super().__init__(op.type, op.start_time, **op.detail)
96 if not errorsop:
97 errorsop=op
98 self.errors=errorsop.errors
77 else: 99 else:
78 super().__init__(None, None) 100 super().__init__(None, None)
79 101
80 self.name=backup.name 102 self.name=backup.name
81 self.state=backup.state 103 self.state=backup.state
82 self.errors=backup.errors
83 104
84 # 105 #
85 # Miscellaneous helper routines 106 # Miscellaneous helper routines
86 # 107 #
87 108
137 if not thistime: 158 if not thistime:
138 return None, False 159 return None, False
139 160
140 return thistime, True 161 return thistime, True
141 162
163 _prune_progress_re=re.compile(".*\(([0-9]+)/([0-9]+)\)$")
164 # Borg gives very little progress info in easy form, so try to extrat it
165 def check_prune_status(msg):
166 res=_prune_progress_re.match(msg)
167 if res:
168 c=res.groups()
169 try:
170 archive_no=int(c[0])
171 of_total=int(c[1])
172 except:
173 pass
174 else:
175 return archive_no, of_total
176 return None, None
177
178
142 # 179 #
143 # The Backup class 180 # The Backup class
144 # 181 #
145 182
146 class Backup(TerminableThread): 183 class Backup(TerminableThread):
171 208
172 self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval', 209 self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
173 'Backup interval', loc, 210 'Backup interval', loc,
174 config.defaults['backup_interval']) 211 config.defaults['backup_interval'])
175 212
213 self.prune_interval=config.check_nonneg_int(cfg, 'prune_interval',
214 'Prune interval', loc,
215 config.defaults['prune_interval'])
216
176 self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', 217 self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
177 'Retry interval', loc, 218 'Retry interval', loc,
178 config.defaults['retry_interval']) 219 config.defaults['retry_interval'])
179 220
180 221
181 scheduling=config.check_string(cfg, 'scheduling', 222 scheduling=config.check_string(cfg, 'scheduling',
182 'Scheduling mode', loc, 223 'Scheduling mode', loc,
183 default="dreamtime") 224 default="dreamtime")
184 225
185 if scheduling=="dreamtime": 226 if scheduling=="dreamtime":
186 self.timeclass=dreamtime.DreamTime 227 self.timeclass=DreamTime
187 elif scheduling=="realtime": 228 elif scheduling=="realtime":
188 self.timeclass=dreamtime.MonotonicTime 229 self.timeclass=MonotonicTime
189 elif scheduling=="manual": 230 elif scheduling=="manual":
190 self.backup_interval=0 231 self.backup_interval=0
191 else: 232 else:
192 logging.error("Invalid time class '%s' for %s" % (scheduling, loc)) 233 logging.error("Invalid time class '%s' for %s" % (scheduling, loc))
193 234
204 245
205 self.borg_instance=None 246 self.borg_instance=None
206 self.thread_log=None 247 self.thread_log=None
207 self.thread_res=None 248 self.thread_res=None
208 249
250 self.previous_operation=None
209 self.current_operation=None 251 self.current_operation=None
210 self.scheduled_operation=None 252 self.scheduled_operation=None
211 self.last_operation_finished=None 253 self.previous_operation_of_type={}
212 self.last_create_when=None
213 self.last_create_finished=None
214 self.state=State.INACTIVE 254 self.state=State.INACTIVE
215 self.errors=Errors.OK 255 self.timeclass=DreamTime
216 self.timeclass=dreamtime.DreamTime
217 256
218 self.__decode_config(cfg) 257 self.__decode_config(cfg)
219 258
220 super().__init__(target = self.__main_thread, name = self.backup_name) 259 super().__init__(target = self.__main_thread, name = self.backup_name)
221 self.daemon=True 260 self.daemon=True
239 278
240 def __log_listener(self): 279 def __log_listener(self):
241 self.logger.debug('Log listener thread waiting for entries') 280 self.logger.debug('Log listener thread waiting for entries')
242 success=True 281 success=True
243 for msg in iter(self.borg_instance.read_log, None): 282 for msg in iter(self.borg_instance.read_log, None):
244 self.logger.info(str(msg)) 283 self.logger.debug(str(msg))
245 t=msg['type'] 284 t=msg['type']
246 285
247 errormsg=None 286 errormsg=None
248 callback=None 287 callback=None
249 288
291 (msg['msgid']=='LockTimeout' or # observed in reality 330 (msg['msgid']=='LockTimeout' or # observed in reality
292 msg['msgid']=='LockErrorT' or # in docs 331 msg['msgid']=='LockErrorT' or # in docs
293 msg['msgid']=='LockErrorT')): # in docs 332 msg['msgid']=='LockErrorT')): # in docs
294 errors=Errors.BUSY 333 errors=Errors.BUSY
295 with self._cond: 334 with self._cond:
296 self.errors=self.errors.combine(errors) 335 self.current_operation.add_error(errors)
336 status, callback=self.__status_unlocked()
337 elif lvl==logging.INFO and self.current_operation.type==Operation.PRUNE:
338 # Borg gives very little progress info in easy form, so try to extrat it
339 archive_number, of_total=check_prune_status(msg['message'])
340 if archive_number!=None and of_total!=None:
341 self.current_operation.detail['progress_current_secondary']=archive_number
342 self.current_operation.detail['progress_total_secondary']=of_total
297 status, callback=self.__status_unlocked() 343 status, callback=self.__status_unlocked()
298 344
299 elif t=='question_prompt' or t=='question_prompt_retry': 345 elif t=='question_prompt' or t=='question_prompt_retry':
300 self.logger.error('Did not expect to receive question prompt from borg') 346 self.logger.error('Did not expect to receive question prompt from borg')
301 with self._cond: 347 with self._cond:
302 self.errors=self.errors.combine(Errors.ERRORS) 348 self.current_operation.add_error(Errors.ERRORS)
303 # TODO: terminate org? Send 'NO' reply? 349 # TODO: terminate org? Send 'NO' reply?
304 350
305 elif (t=='question_invalid_answer' or t=='question_accepted_default' 351 elif (t=='question_invalid_answer' or t=='question_accepted_default'
306 or t=='question_accepted_true' or t=='question_accepted_false' 352 or t=='question_accepted_true' or t=='question_accepted_false'
307 or t=='question_env_answer'): 353 or t=='question_env_answer'):
326 372
327 self.logger.debug('Borg result: %s' % str(res)) 373 self.logger.debug('Borg result: %s' % str(res))
328 374
329 if res is None: 375 if res is None:
330 with self._cond: 376 with self._cond:
331 if self.errors.ok(): 377 # Prune gives absolutely no result, so don't complain
378 if (self.current_operation.ok() and
379 self.current_operation.type!=Operation.PRUNE):
332 self.logger.error('No result from borg despite no error in log') 380 self.logger.error('No result from borg despite no error in log')
333 self.errors=Errors.ERRORS 381 self.current_operation.add_error(Errors.ERRORS)
334 elif self.current_operation.operation==Operation.CREATE: 382 elif self.current_operation.type==Operation.LIST:
335 with self._cond: 383 self.__handle_list_result(res)
336 self.last_create_finished=time.monotonic() 384
337 self.last_create_when=self.current_operation.time.monotonic() 385 # All other results are discarded
338 elif self.current_operation.operation==Operation.LIST: 386
387 def __handle_list_result(self, res):
339 ok=True 388 ok=True
340 latest=None 389 latest=None
341 if 'archives' in res and isinstance(res['archives'], list): 390 if 'archives' in res and isinstance(res['archives'], list):
342 archives=res['archives'] 391 archives=res['archives']
343 for a in archives: 392 for a in archives:
352 else: 401 else:
353 logger.error('Borg archive list missing "archives" entry') 402 logger.error('Borg archive list missing "archives" entry')
354 403
355 if not ok: 404 if not ok:
356 with self._cond: 405 with self._cond:
357 self.errors=self.errors.combine(Errors.ERRORS) 406 self.current_operation.add_error(Errors.ERRORS)
358 407
359 if latest: 408 if latest:
360 self.logger.info('borg info: Previous backup was on %s' % latest.isoformat()) 409 self.logger.info('borg info: Previous backup was on %s' % latest.isoformat())
361 realtime=time.mktime(latest.timetuple()) 410 when=MonotonicTime.from_realtime(time.mktime(latest.timetuple()))
362 monotonic=realtime+time.monotonic()-time.time() 411 op=Operation(Operation.CREATE, when, reason='listed')
412 op.finish_time=when
363 with self._cond: 413 with self._cond:
364 self.last_create_finished=monotonic 414 self.previous_operation_of_type[Operation.CREATE]=op
365 self.last_create_when=monotonic
366 else: 415 else:
367 self.logger.info('borg info: Could not discover a previous backup') 416 self.logger.info('borg info: Could not discover a previous backup')
368 with self._cond:
369 self.last_create_finished=-1
370 self.last_create_when=None
371 417
372 def __do_launch(self, op, archive_or_repository, 418 def __do_launch(self, op, archive_or_repository,
373 common_params, op_params, paths=[]): 419 common_params, op_params, paths=[]):
374 420
375 self.logger.debug('Creating BorgInstance') 421 self.logger.debug('Creating BorgInstance')
376 422
377 inst=BorgInstance(op.operation, archive_or_repository, 423 inst=BorgInstance(op.type, archive_or_repository,
378 common_params, op_params, paths) 424 common_params, op_params, paths)
379 425
380 self.logger.debug('Launching BorgInstance via repository') 426 self.logger.debug('Launching BorgInstance via repository')
381 427
382 # Only the Repository object has access to the passphrase 428 # Only the Repository object has access to the passphrase
394 self.thread_res=t_res 440 self.thread_res=t_res
395 self.borg_instance=inst 441 self.borg_instance=inst
396 self.current_operation=op 442 self.current_operation=op
397 # Update scheduled time to real starting time to schedule 443 # Update scheduled time to real starting time to schedule
398 # next run relative to this 444 # next run relative to this
399 self.current_operation.time=dreamtime.MonotonicTime.now() 445 self.current_operation.start_time=MonotonicTime.now()
400 self.state=State.ACTIVE 446 self.state=State.ACTIVE
401 # Reset error status when starting a new operation 447 # Reset error status when starting a new operation
402 self.errors=Errors.OK
403 self.__update_status() 448 self.__update_status()
404 449
405 t_log.start() 450 t_log.start()
406 t_res.start() 451 t_res.start()
407 452
408 453
409 def __launch(self, op): 454 def __launch(self, op):
410 self.logger.debug("Launching '%s'" % str(op.operation)) 455 self.logger.debug("Launching '%s'" % str(op.type))
411 456
412 params=(config.borg_parameters 457 params=(config.borg_parameters
413 +self.repository.borg_parameters 458 +self.repository.borg_parameters
414 +self.borg_parameters) 459 +self.borg_parameters)
415 460
416 if op.operation==Operation.CREATE: 461 if op.type==Operation.CREATE:
417 archive="%s::%s%s" % (self.repository.location, 462 archive="%s::%s%s" % (self.repository.location,
418 self.archive_prefix, 463 self.archive_prefix,
419 self.archive_template) 464 self.archive_template)
420 465
421 self.__do_launch(op, archive, params.common, 466 self.__do_launch(op, archive, params.common,
422 params.create, self.paths) 467 params.create, self.paths)
423 elif op.operation==Operation.PRUNE: 468 elif op.type==Operation.PRUNE:
424 self.__do_launch(op, self.repository.location, params.common, 469 self.__do_launch(op, self.repository.location, params.common,
425 [{'prefix': self.archive_prefix}] + params.create) 470 [{'prefix': self.archive_prefix}] + params.prune)
426 471
427 elif op.operation==Operation.LIST: 472 elif op.type==Operation.LIST:
428 self.__do_launch(op, self.repository.location, params.common, 473 self.__do_launch(op, self.repository.location, params.common,
429 [{'prefix': self.archive_prefix}]) 474 [{'prefix': self.archive_prefix}])
430 else: 475 else:
431 raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) 476 raise NotImplementedError("Invalid operation '%s'" % str(op.type))
432 477
433 # This must be called with self._cond held. 478 # This must be called with self._cond held.
434 def __launch_and_wait(self): 479 def __launch_and_wait(self):
435 op=self.scheduled_operation 480 op=self.scheduled_operation
436 if not op: 481 if not op:
441 self.__launch(op) 486 self.__launch(op)
442 487
443 self.__wait_finish() 488 self.__wait_finish()
444 489
445 def __wait_finish(self): 490 def __wait_finish(self):
491 current=self.current_operation
492
446 # Wait for main logger thread to terminate, or for us to be terminated 493 # Wait for main logger thread to terminate, or for us to be terminated
447 while not self.terminate and self.thread_res.is_alive(): 494 while not self.terminate and self.thread_res.is_alive():
448 self._cond.release() 495 self._cond.release()
449 self.thread_res.join(JOIN_TIMEOUT) 496 self.thread_res.join(JOIN_TIMEOUT)
450 self._cond.acquire() 497 self._cond.acquire()
463 self.thread_log.join() 510 self.thread_log.join()
464 self._cond.acquire() 511 self._cond.acquire()
465 512
466 if not self.borg_instance.wait(): 513 if not self.borg_instance.wait():
467 self.logger.error('Borg subprocess did not terminate') 514 self.logger.error('Borg subprocess did not terminate')
468 self.errors=self.errors.combine(Errors.ERRORS) 515 curent.add_error(Errors.ERRORS)
469 516
470 now=time.monotonic() 517 current.finish_time=MonotonicTime.now()
471 self.last_operation_finished=now 518
519 self.previous_operation_of_type[current.type]=current
520 self.previous_operation=current
521 self.current_operation=None
472 self.thread_res=None 522 self.thread_res=None
473 self.thread_log=None 523 self.thread_log=None
474 self.borg_instance=None 524 self.borg_instance=None
475 self.current_operation=None
476 self.state=State.INACTIVE 525 self.state=State.INACTIVE
477 self.__update_status() 526 self.__update_status()
478 527
479 def __main_thread(self): 528 def __main_thread(self):
480 with self._cond: 529 with self._cond:
523 def __main_thread_wait_schedule(self): 572 def __main_thread_wait_schedule(self):
524 op=None 573 op=None
525 if not self.scheduled_operation: 574 if not self.scheduled_operation:
526 op=self.__next_operation_unlocked() 575 op=self.__next_operation_unlocked()
527 if op: 576 if op:
528 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" % 577 self.logger.info("Scheduling '%s' (detail: %s) on %s [%s]" %
529 (str(op.operation), op.detail or 'none', 578 (str(op.type), op.detail or 'none',
530 op.time.seconds_to(), 579 op.start_time.isoformat(),
531 op.time.__class__.__name__)) 580 op.start_time.__class__.__name__))
532 581
533 self.scheduled_operation=op 582 self.scheduled_operation=op
534 self.state=State.SCHEDULED 583 self.state=State.SCHEDULED
535 self.__update_status() 584 self.__update_status()
536 585
537 # Wait under scheduled wait 586 # Wait under scheduled wait
538 self.scheduler.wait_until(op.time, self._cond, self.backup_name) 587 self.scheduler.wait_until(op.start_time, self._cond, self.backup_name)
539 else: 588 else:
540 # Nothing scheduled - just wait 589 # Nothing scheduled - just wait
541 self.logger.info("Waiting for manual scheduling") 590 self.logger.info("Waiting for manual scheduling")
542 591
543 self.state=State.INACTIVE 592 self.state=State.INACTIVE
562 self.scheduled_operation=None 611 self.scheduled_operation=None
563 self.state=State.INACTIVE 612 self.state=State.INACTIVE
564 self.__update_status() 613 self.__update_status()
565 614
566 def __next_operation_unlocked(self): 615 def __next_operation_unlocked(self):
567 if self.backup_interval==0: 616 listop=self.__next_operation_list()
568 # Manual backup 617 if listop:
618 return listop
619
620 create=self.__next_operation_type(Operation.CREATE,
621 self.backup_interval,
622 important=True,
623 initial_reason='initial');
624
625 prune=self.__next_operation_type(Operation.PRUNE,
626 self.prune_interval,
627 important=False,
628 initial_reason=None);
629
630 if not prune:
631 return create
632 elif not create:
633 return prune
634 elif create.start_time < prune.start_time:
635 return create
636 else:
637 return prune
638
639 def __next_operation_list(self):
640 reason='initial'
641 # Unless manual backup has been chosen (backup_interval<=0), perform
642 # repository listing if no previous create operation known, or if we
643 # just pruned the repository
644 if self.backup_interval<=0:
569 return None 645 return None
570 elif self.last_create_finished==None: 646 elif (self.previous_operation and
571 # Don't know when last create finished: try to get it from 647 self.previous_operation.type==Operation.PRUNE):
572 # archive list. 648 tm=MonotonicTime.now()
573 if not self.errors.ok() and self.retry_interval==0: 649 reason='post-prune'
650 elif Operation.LIST in self.previous_operation_of_type:
651 prev=self.previous_operation_of_type[Operation.LIST]
652 if prev.ok():
653 return None
654 if self.retry_interval==0:
574 # Do not retry in case of errors if retry interval is zero 655 # Do not retry in case of errors if retry interval is zero
575 return None 656 return None
576 elif self.last_operation_finished: 657 # Attempt after retry interval
577 # Attempt ater retry interval if some operation has been run 658 tm=MonotonicTime.after_other(prev.finish_time, self.retry_interval)
578 # already 659 else:
579 tm=dreamtime.MonotonicTime(self.last_operation_finished+self.retry_interval) 660 # Nothing has been attempted: run immediately
580 else: 661 tm=MonotonicTime.now()
581 # Nothing has been attempted: run immediately 662 return Operation(Operation.LIST, tm, reason=reason)
582 tm=dreamtime.MonotonicTime.now() 663
583 return Operation(Operation.LIST, tm, reason='initial') 664 def __next_operation_type(self, optype, standard_interval,
584 elif self.errors.ok() and self.last_create_finished<0: 665 important=False,
585 # No backup exists; perform initial backup 666 initial_reason=None):
586 tm=dreamtime.MonotonicTime.now() 667 if optype not in self.previous_operation_of_type:
587 return Operation(Operation.CREATE, tm, reason='initial') 668 # No previous operation exists; perform immediately
588 elif not self.errors.ok(): 669 # if important, otherwise after standard interval.
589 # Retry create in case of errors unless retry has been disabled 670 # Do not perform if manual operation selected by
590 if self.retry_interval==0: 671 # setting standard_interval<=0
672 if standard_interval<=0:
591 return None 673 return None
592 else: 674 else:
593 if self.last_create_finished<0: 675 if important:
594 basetime=time.monotonic() 676 tm=MonotonicTime.now()
595 else: 677 else:
596 basetime=self.last_create_finished 678 tm=self.timeclass.after(standard_interval)
597 tm=dreamtime.MonotonicTime(basetime+self.retry_interval) 679 if initial_reason:
598 return Operation(Operation.CREATE, tm, reason='retry') 680 return Operation(optype, tm, reason=initial_reason)
599 else: 681 else:
600 # All ok - run create at standard backup interval 682 return Operation(optype, tm)
601 tm=self.timeclass.from_monotonic(self.last_create_when+self.backup_interval) 683 else:
602 return Operation(Operation.CREATE, tm) 684 # Previous operation has been performed; perform after
685 # retry interval if there were errors, otherwise after
686 # standard interval.
687 prev=self.previous_operation_of_type[optype]
688 if not prev.ok():
689 tm=MonotonicTime.after_other(prev.start_time,
690 self.retry_interval)
691 return Operation(optype, tm, reason='retry')
692 elif standard_interval>0:
693 tm=self.timeclass.after_other(prev.start_time,
694 standard_interval)
695 return Operation(optype, tm)
696 else:
697 # Manual operation is standard_interval is zero.
698 return None
603 699
604 def __status_unlocked(self): 700 def __status_unlocked(self):
605 callback=self.__status_update_callback 701 callback=self.__status_update_callback
606 702 status=Status(self)
607 if self.current_operation:
608 status=Status(self, self.current_operation)
609 elif self.scheduled_operation:
610 status=Status(self, self.scheduled_operation)
611 else:
612 status=Status(self)
613 703
614 return status, callback 704 return status, callback
615 705
616 def __update_status(self): 706 def __update_status(self):
617 status, callback = self.__status_unlocked() 707 status, callback = self.__status_unlocked()
636 with self._cond: 726 with self._cond:
637 res=self.__status_unlocked() 727 res=self.__status_unlocked()
638 return res[0] 728 return res[0]
639 729
640 def create(self): 730 def create(self):
641 op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual') 731 op=Operation(Operation.CREATE, MonotonicTime.now(), reason='manual')
642 with self._cond: 732 with self._cond:
643 self.scheduled_operation=op 733 self.scheduled_operation=op
644 self._cond.notify() 734 self._cond.notify()
645 735
646 def prune(self): 736 def prune(self):
647 op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual') 737 op=Operation(Operation.PRUNE, MonotonicTime.now(), reason='manual')
648 with self._cond: 738 with self._cond:
649 self.scheduled_operation=op 739 self.scheduled_operation=op
650 self._cond.notify() 740 self._cond.notify()
651 741
652 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 742 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages

mercurial