#
# Borgend Backup instance
#

import config
import logging
import time
import loggers
import repository
import dreamtime
from enum import IntEnum
from instance import BorgInstance
from threading import Thread, Lock, Condition
from scheduler import TerminableThread

logger=loggers.get(__name__)

JOIN_TIMEOUT=60
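# Maximum time (seconds) a single join() on the result listener thread may
# block in __wait_finish before the termination flag is checked again.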
|
#
# State and operation related helper classes
#

class State(IntEnum):
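    """Activity state of a Backup, ordered from INACTIVE to ACTIVE."""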
|
    # State
    INACTIVE=0
    SCHEDULED=1
    QUEUED=2
    ACTIVE=3


class Errors(IntEnum):
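    """Error status of a backup run; combine() treats higher values as more severe."""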
|
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

    def combine(self, other):
        return max(self, other)

    def ok(self):
        return self==self.OK

    def __str__(self):
        return _errorstring[self]

_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}
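
# Since Errors is an IntEnum, combine() simply keeps the numerically larger
# (more severe) member: e.g. Errors.BUSY.combine(Errors.ERRORS) == Errors.ERRORS.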
|

class Operation:
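    """One borg operation (CREATE or PRUNE) together with its scheduled time.

    Any extra keyword arguments are kept in self.detail (for example
    reason='manual', or progress fields filled in later by the log listener).
    """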
|
    CREATE='create'
    PRUNE='prune'

    def __init__(self, operation, time, **kwargs):
        self.operation=operation
        self.time=time
        self.detail=kwargs

    def when(self):
        return self.time.realtime()


class Status(Operation):
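    """Snapshot of a Backup's name, state and errors, based on its current or scheduled operation."""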
|
    def __init__(self, backup, op=None):
        if op:
            super().__init__(op.operation, op.time, **op.detail)
        else:
            super().__init__(None, None)

        self.name=backup.name
        self.state=backup.state
        self.errors=backup.errors

#
# Miscellaneous helper routines
#

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR

def safe_get_int(t, x):
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None
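
# safe_get_int(d, 'key') returns d['key'] only if the key is present and holds
# an int; otherwise None. The log listener below uses it to guard against
# malformed progress entries.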
|

#
# The Backup class
#

class Backup(TerminableThread):
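    """A single configured backup task.

    Each Backup runs its own main thread, which schedules borg operations,
    queues them on the target Repository, and launches a BorgInstance whose
    log and result streams are consumed by two further listener threads.
    Shared state is protected by self._cond.
    """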
|
    def __decode_config(self, cfg):
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', loc,
                                                    config.defaults['retry_interval'])

        scheduling=config.check_string(cfg, 'scheduling',
                                       'Scheduling mode', loc,
                                       default="dreamtime")

        if scheduling=="dreamtime":
            self.timeclass=dreamtime.DreamTime
        elif scheduling=="realtime":
            self.timeclass=dreamtime.MonotonicTime
        elif scheduling=="manual":
            self.backup_interval=0
        else:
            self.logger.error("Invalid scheduling mode '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)
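
    # For reference, __decode_config expects cfg to contain roughly the
    # following keys (values are only illustrative; exact file syntax and
    # defaults are defined by the config module):
    #
    #     name:             'home'
    #     repository:       'myrepo'        # must match a configured Repository
    #     archive_prefix:   'home-'
    #     archive_template: '{now}'         # appended to the prefix
    #     backup_interval:  86400           # seconds; 0 disables scheduling
    #     retry_interval:   3600            # seconds; 0 disables retries
    #     scheduling:       'dreamtime'     # or 'realtime' / 'manual'
    #     paths:            ['/Users/someone']
    #
    # plus any Borg parameter lists handled by config.BorgParameters.from_config.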
|
    def __init__(self, identifier, cfg, scheduler):
        self.identifier=identifier
        self.__status_update_callback=None
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once the backup name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.current_operation=None
        self.scheduled_operation=None
        self.lastrun_when=None
        self.lastrun_finished=None
        self.state=State.INACTIVE
        self.errors=Errors.OK
        self.timeclass=dreamtime.DreamTime

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)
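
    # Listener for the borg instance's log stream. Each entry yielded by
    # read_log is expected to be a dict in the style of borg's JSON log
    # output, dispatched on its 'type' field ('progress_percent',
    # 'archive_progress', 'log_message', ...); errors reported here are
    # folded into self.errors.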
|
    def __log_listener(self):
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if original_size is not None and compressed_size is not None and deduplicated_size is not None:
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockErrorT' or  # in docs
                         msg['msgid']=='LockError')):   # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.errors=self.errors.combine(errors)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))
|

            if callback:
                callback(status, errors=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error in log')
                self.errors=Errors.ERRORS
|
    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=[]):

        inst=BorgInstance(op.operation, archive_or_repository,
                          common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.time=dreamtime.MonotonicTime.now()
        self.state=State.ACTIVE
        # Reset error status when starting a new operation
        self.errors=Errors.OK
        self.__update_status()

        t_log.start()
        t_res.start()
|
    def __launch(self, op):
        self.logger.debug("Launching '%s'" % str(op.operation))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.operation==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation==Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.prune)
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
|

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
        # Wait for the result listener thread to terminate, or for us to be terminated
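        # (join() may block for up to JOIN_TIMEOUT seconds at a time; the
        # condition lock is released around it so that other threads, such as
        # status queries or manual create/prune requests, are not held up.)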
|
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate has been signalled, let the outer termination handler
        # take care of things. (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors=self.errors.combine(Errors.ERRORS)

        if self.current_operation.operation=='create':
            self.lastrun_when=self.current_operation.time.monotonic()
            self.lastrun_finished=time.monotonic()
        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None
        self.current_operation=None
        self.state=State.INACTIVE
        self.__update_status()
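
    # Main thread/1. Top-level loop: wait for the next scheduled operation,
    # then queue and launch it; repeat until termination is requested, and
    # finally clean up any running borg instance and listener threads.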
|
    def __main_thread(self):
        with self._cond:
            try:
                while not self._terminate:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception as err:
                self.logger.exception("Error with backup '%s'" % self.backup_name)
                self.errors=Errors.ERRORS

            self.state=State.INACTIVE
            self.scheduled_operation=None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log=self.thread_log
            thread_res=self.thread_res
            self.thread_log=None
            self.thread_res=None

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()
|

    # Main thread/2. Schedule the next operation, if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        op=None
        if not self.scheduled_operation:
            op=self.__next_operation_unlocked()
        if op:
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
                             (str(op.operation), op.detail or 'none',
                              op.time.seconds_to(),
                              op.time.__class__.__name__))

            self.scheduled_operation=op
            self.state=State.SCHEDULED
            self.__update_status()

            # Wait until the scheduled time (or until we are woken up)
            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state=State.INACTIVE
            self.__update_status()

            self._cond.wait()
|

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from the 'op' created in __main_thread_wait_schedule
    # above), queue it on the repository, and launch the operation once the
    # repository becomes available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=State.QUEUED
            self.__update_status()
            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation=None
                self.state=State.INACTIVE
                self.__update_status()
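
    # Scheduling policy: if nothing has run yet, schedule an 'initial' backup
    # one retry_interval (or backup_interval, if retry_interval is zero) from
    # now; after a run with errors, schedule a 'retry' retry_interval after
    # the failed run finished; otherwise schedule the next backup
    # backup_interval after the previous run started. A zero interval
    # disables the corresponding case.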
|
    def __next_operation_unlocked(self):
        # TODO: pruning as well
        if not self.lastrun_finished:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                tm=self.timeclass.after(initial_interval)
                return Operation(Operation.CREATE, tm, reason='initial')
        elif not self.errors.ok():
            if self.retry_interval==0:
                return None
            else:
                tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval)
                return Operation(Operation.CREATE, tm, reason='retry')
        else:
            if self.backup_interval==0:
                return None
            else:
                tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval)
                return Operation(Operation.CREATE, tm)

    def __status_unlocked(self):
        callback=self.__status_update_callback

        if self.current_operation:
            status=Status(self, self.current_operation)
        elif self.scheduled_operation:
            status=Status(self, self.scheduled_operation)
        else:
            status=Status(self)

        return status, callback
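
    # Note: in the current implementation the status update callback is
    # invoked with self._cond still held (the release/acquire around it is
    # commented out), so callbacks should not re-enter methods that take the
    # same lock.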
|
    def __update_status(self):
        status, callback = self.__status_unlocked()
        if callback:
            #self._cond.release()
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")
            #finally:
            #    self._cond.acquire()
|

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()
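
# Rough usage sketch (illustrative only; the real wiring of backups, the
# scheduler and the UI lives elsewhere in borgend, and the variable names
# below are made up):
#
#     backup=Backup(0, backup_cfg, scheduler)
#     backup.set_status_update_callback(refresh_ui)   # hypothetical callback
#     backup.start()    # Thread.start(); runs the scheduling main thread
#     backup.create()   # request an immediate manual 'create'
#     backup.prune()    # request a manual 'prune'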
|