backup.py

changeset 62
b7d13b2ad67e
parent 61
bc6c3d74e6ea
child 63
1fd6814a29fc
equal deleted inserted replaced
61:bc6c3d74e6ea 62:b7d13b2ad67e
13 from threading import Thread, Lock, Condition 13 from threading import Thread, Lock, Condition
14 from scheduler import TerminableThread 14 from scheduler import TerminableThread
15 15
16 logger=borgend.logger.getChild(__name__) 16 logger=borgend.logger.getChild(__name__)
17 17
18 #
19 # State and operation related helper classes
20 #
21
18 class State(IntEnum): 22 class State(IntEnum):
19 # State 23 # State
20 INACTIVE=0 24 INACTIVE=0
21 SCHEDULED=1 25 SCHEDULED=1
22 QUEUED=2 26 QUEUED=2
43 Errors.BUSY: 'busy', 47 Errors.BUSY: 'busy',
44 Errors.OFFLINE: 'offline', 48 Errors.OFFLINE: 'offline',
45 Errors.ERRORS: 'errors' 49 Errors.ERRORS: 'errors'
46 } 50 }
47 51
48 def translate_loglevel(x): 52 class Operation:
49 if x in loglevel_translation: 53 CREATE='create'
50 return loglevel_translation[x] 54 PRUNE='prune'
51 else: 55 def __init__(self, operation, when_monotonic, **kwargs):
52 return logging.ERROR 56 self.operation=operation
53 57 self.when_monotonic=when_monotonic
54 def safe_get_int(t, x): 58 self.detail=kwargs
55 if x in t: 59
56 tmp=t[x] 60 def when(self):
57 if isinstance(tmp, int): 61 return self.when_monotonic-time.monotonic()+time.time()
58 return tmp 62
59 return None 63
64 class Status(Operation):
65 def __init__(self, backup, op=None):
66 if op:
67 super().__init__(op.operation, op.when_monotonic,
68 detail=op.detail)
69 else:
70 super().__init__(None, None)
71
72 self.name=backup.name
73 self.state=backup.state
74 self.errors=backup.errors
75
76 #
77 # Miscellaneous helper routines
78 #
60 79
61 loglevel_translation={ 80 loglevel_translation={
62 'CRITICAL': logging.CRITICAL, 81 'CRITICAL': logging.CRITICAL,
63 'ERROR': logging.ERROR, 82 'ERROR': logging.ERROR,
64 'WARNING': logging.WARNING, 83 'WARNING': logging.WARNING,
65 'DEBUG': logging.DEBUG, 84 'DEBUG': logging.DEBUG,
66 'INFO': logging.INFO 85 'INFO': logging.INFO
67 } 86 }
87
88 def translate_loglevel(x):
89 if x in loglevel_translation:
90 return loglevel_translation[x]
91 else:
92 return logging.ERROR
93
94 def safe_get_int(t, x):
95 if x in t:
96 tmp=t[x]
97 if isinstance(tmp, int):
98 return tmp
99 return None
100
101 #
102 # The Backup class
103 #
68 104
69 class Backup(TerminableThread): 105 class Backup(TerminableThread):
70 106
71 def __decode_config(self, cfg): 107 def __decode_config(self, cfg):
72 loc0='backup target %d' % self.identifier 108 loc0='backup target %d' % self.identifier
191 if t=='progress_percent': 227 if t=='progress_percent':
192 current=safe_get_int(msg, 'current') 228 current=safe_get_int(msg, 'current')
193 total=safe_get_int(msg, 'total') 229 total=safe_get_int(msg, 'total')
194 if current is not None and total is not None: 230 if current is not None and total is not None:
195 with self._cond: 231 with self._cond:
196 self.current_operation['progress_current']=current 232 self.current_operation.detail['progress_current']=current
197 self.current_operation['progress_total']=total 233 self.current_operation.detail['progress_total']=total
198 status, callback=self.__status_unlocked() 234 status, callback=self.__status_unlocked()
199 235
200 elif t=='archive_progress': 236 elif t=='archive_progress':
201 original_size=safe_get_int(msg, 'original_size') 237 original_size=safe_get_int(msg, 'original_size')
202 compressed_size=safe_get_int(msg, 'compressed_size') 238 compressed_size=safe_get_int(msg, 'compressed_size')
203 deduplicated_size=safe_get_int(msg, 'deduplicated_size') 239 deduplicated_size=safe_get_int(msg, 'deduplicated_size')
204 if original_size is not None and original_size is not None and deduplicated_size is not None: 240 if original_size is not None and original_size is not None and deduplicated_size is not None:
205 with self._cond: 241 with self._cond:
206 self.current_operation['original_size']=original_size 242 self.current_operation.detail['original_size']=original_size
207 self.current_operation['compressed_size']=compressed_size 243 self.current_operation.detail['compressed_size']=compressed_size
208 self.current_operation['deduplicated_size']=deduplicated_size 244 self.current_operation.detail['deduplicated_size']=deduplicated_size
209 status, callback=self.__status_unlocked() 245 status, callback=self.__status_unlocked()
210 246
211 elif t=='progress_message': 247 elif t=='progress_message':
212 pass 248 pass
213 249
272 errors=Errors.ERRORS 308 errors=Errors.ERRORS
273 309
274 self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors)) 310 self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
275 311
276 with self._cond: 312 with self._cond:
277 if self.current_operation['operation']=='create': 313 if self.current_operation.operation=='create':
278 self.lastrun_when=self.current_operation['when_monotonic'] 314 self.lastrun_when=self.current_operation.when_monotonic
279 self.thread_res=None 315 self.thread_res=None
280 self.thread_log=None 316 self.thread_log=None
281 self.borg_instance=None 317 self.borg_instance=None
282 self.current_operation=None 318 self.current_operation=None
283 self.state=State.INACTIVE 319 self.state=State.INACTIVE
286 self._cond.notify() 322 self._cond.notify()
287 323
288 def __do_launch(self, op, archive_or_repository, *args): 324 def __do_launch(self, op, archive_or_repository, *args):
289 passphrase=self.extract_passphrase() 325 passphrase=self.extract_passphrase()
290 326
291 inst=BorgInstance(op['operation'], archive_or_repository, *args) 327 inst=BorgInstance(op.operation, archive_or_repository, *args)
292 inst.launch(passphrase=passphrase) 328 inst.launch(passphrase=passphrase)
293 329
294 self.logger.debug('Creating listener threads') 330 self.logger.debug('Creating listener threads')
295 331
296 t_log=Thread(target=self.__log_listener) 332 t_log=Thread(target=self.__log_listener)
305 341
306 t_log.start() 342 t_log.start()
307 t_res.start() 343 t_res.start()
308 344
309 def __launch(self, op): 345 def __launch(self, op):
310 self.logger.debug("Launching '%s'" % op['operation']) 346 self.logger.debug("Launching '%s'" % str(op.operation))
311 347
312 if op['operation']=='create': 348 if op.operation==Operation.CREATE:
313 archive="%s::%s%s" % (self.repository.repository_name, 349 archive="%s::%s%s" % (self.repository.repository_name,
314 self.archive_prefix, 350 self.archive_prefix,
315 self.archive_template) 351 self.archive_template)
316 352
317 self.__do_launch(op, archive, 353 self.__do_launch(op, archive,
318 self.common_parameters+self.create_parameters, 354 self.common_parameters+self.create_parameters,
319 self.paths) 355 self.paths)
320 elif op['operation']=='prune': 356 elif op.operation==Operation.PRUNE:
321 self.__do_launch(op, self.repository.repository_name, 357 self.__do_launch(op, self.repository.repository_name,
322 ([{'prefix': self.archive_prefix}] + 358 ([{'prefix': self.archive_prefix}] +
323 self.common_parameters + 359 self.common_parameters +
324 self.prune_parameters)) 360 self.prune_parameters))
325 else: 361 else:
326 raise NotImplementedError("Invalid operation '%s'" % op['operation']) 362 raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
327 363
328 # This must be called with self._cond held. 364 # This must be called with self._cond held.
329 def __launch_check(self): 365 def __launch_check(self):
330 op=self.scheduled_operation 366 op=self.scheduled_operation
331 if not op: 367 if not op:
334 self.scheduled_operation=None 370 self.scheduled_operation=None
335 371
336 self.__launch(op) 372 self.__launch(op)
337 373
338 self.current_operation=op 374 self.current_operation=op
339 self.current_operation['when_monotonic']=time.monotonic() 375 # Update scheduled time to real starting time to schedule
376 # next run relative to this
377 self.current_operation.when_monotonic=time.monotonic()
340 self.state=State.ACTIVE 378 self.state=State.ACTIVE
341 # Reset error status when starting a new operation 379 # Reset error status when starting a new operation
342 self.errors=Errors.OK 380 self.errors=Errors.OK
343 self.__update_status() 381 self.__update_status()
344 382
391 op=None 429 op=None
392 if not self.scheduled_operation: 430 if not self.scheduled_operation:
393 op=self.__next_operation_unlocked() 431 op=self.__next_operation_unlocked()
394 if op: 432 if op:
395 now=time.monotonic() 433 now=time.monotonic()
396 delay=max(0, op['when_monotonic']-now) 434 delay=max(0, op.when_monotonic-now)
397 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % 435 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
398 (op['operation'], op['detail'], delay)) 436 (str(op.operation), op.detail or 'none', delay))
399 437
400 self.scheduled_operation=op 438 self.scheduled_operation=op
401 self.state=State.SCHEDULED 439 self.state=State.SCHEDULED
402 self.__update_status() 440 self.__update_status()
403 441
433 if initial_interval==0: 471 if initial_interval==0:
434 initial_interval=self.backup_interval 472 initial_interval=self.backup_interval
435 if initial_interval==0: 473 if initial_interval==0:
436 return None 474 return None
437 else: 475 else:
438 return {'operation': 'create', 476 return Operation(Operation.CREATE, now+initial_interval,
439 'detail': 'initial', 477 reason='initial')
440 'when_monotonic': now+initial_interval}
441 elif not self.errors.ok(): 478 elif not self.errors.ok():
442 if self.retry_interval==0: 479 if self.retry_interval==0:
443 return None 480 return None
444 else: 481 else:
445 return {'operation': 'create', 482 return Operation(Operation.CREATE,
446 'detail': 'retry', 483 self.lastrun_when+self.retry_interval,
447 'when_monotonic': self.lastrun_when+self.retry_interval} 484 reason='retry')
448 else: 485 else:
449 if self.backup_interval==0: 486 if self.backup_interval==0:
450 return None 487 return None
451 else: 488 else:
452 return {'operation': 'create', 489 return Operation(Operation.CREATE,
453 'detail': 'normal', 490 self.lastrun_when+self.backup_interval)
454 'when_monotonic': self.lastrun_when+self.backup_interval}
455 491
456 def __status_unlocked(self): 492 def __status_unlocked(self):
457 callback=self.__status_update_callback 493 callback=self.__status_update_callback
458 494
459 if self.current_operation: 495 if self.current_operation:
460 status=self.current_operation 496 status=Status(self, self.current_operation)
461 elif self.scheduled_operation: 497 elif self.scheduled_operation:
462 status=self.scheduled_operation 498 status=Status(self, self.scheduled_operation)
463 else: 499 else:
464 status={'type': 'nothing'} 500 status=Status(self)
465
466 status['name']=self._name
467 status['state']=self.state
468 status['errors']=self.errors
469
470 if 'detail' not in status:
471 status['detail']='NONE'
472
473 if 'when_monotonic' in status:
474 status['when']=(status['when_monotonic']
475 -time.monotonic()+time.time())
476 501
477 return status, callback 502 return status, callback
478 503
479 def __update_status(self): 504 def __update_status(self):
480 status, callback = self.__status_unlocked() 505 status, callback = self.__status_unlocked()
499 with self._cond: 524 with self._cond:
500 res=self.__status_unlocked() 525 res=self.__status_unlocked()
501 return res[0] 526 return res[0]
502 527
503 def create(self): 528 def create(self):
504 op={'operation': 'create', 'detail': 'manual'} 529 op=Operation(Operation.CREATE, time.monotonic(), reason='manual')
505 with self._cond: 530 with self._cond:
506 self.scheduled_operation=op 531 self.scheduled_operation=op
507 self._cond.notify() 532 self._cond.notify()
508 533
509 def prune(self): 534 def prune(self):
510 op={'operation': 'prune', 'detail': 'manual'} 535 op=Operation(Operation.PRUNE, time.monotonic(), reason='manual')
511 with self._cond: 536 with self._cond:
512 self.scheduled_operation=op 537 self.scheduled_operation=op
513 self._cond.notify() 538 self._cond.notify()
514 539
515 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 540 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages

mercurial