1 # |
|
2 # Borgend Backup instance |
|
3 # |
|
4 |
|
5 import config |
|
6 import logging |
|
7 import time |
|
8 import loggers |
|
9 import repository |
|
10 import dreamtime |
|
11 from enum import IntEnum |
|
12 from instance import BorgInstance |
|
13 from threading import Thread, Lock, Condition |
|
14 from scheduler import TerminableThread |
|
15 |
|
# Module-level logger; Backup.__decode_config derives one child logger per
# backup from it via logger.getChild().
logger=loggers.get(__name__)

# Timeout, in seconds, passed to each join() attempt on the result listener
# thread in Backup.__wait_finish, so that the wait loop can periodically
# re-check its termination condition.
JOIN_TIMEOUT=60
|
19 |
|
20 # |
|
21 # State and operation related helper classes |
|
22 # |
|
23 |
|
class State(IntEnum):
    """Lifecycle state of a backup, ordered from idle to running."""

    # No operation is scheduled, queued or running
    INACTIVE = 0
    # An operation has been scheduled and is awaiting its start time
    SCHEDULED = 1
    # An operation is queued on the repository
    QUEUED = 2
    # A borg operation is currently running
    ACTIVE = 3
|
30 |
|
31 |
|
32 class Errors(IntEnum): |
|
33 OK=0 |
|
34 BUSY=1 |
|
35 OFFLINE=2 |
|
36 ERRORS=3 |
|
37 |
|
38 def combine(self, other): |
|
39 return max(self, other) |
|
40 |
|
41 def ok(self): |
|
42 return self==self.OK |
|
43 |
|
44 def __str__(self): |
|
45 return _errorstring[self] |
|
46 |
|
47 _errorstring={ |
|
48 Errors.OK: 'ok', |
|
49 Errors.BUSY: 'busy', |
|
50 Errors.OFFLINE: 'offline', |
|
51 Errors.ERRORS: 'errors' |
|
52 } |
|
53 |
|
class Operation:
    """An operation to run at a given time, with free-form detail.

    Attributes:
        operation: the operation name (`CREATE` or `PRUNE`; Status also
            passes None).
        time: the time object for the operation; `when()` calls its
            `realtime()` method.
        detail: dict built from the extra keyword arguments.
    """

    CREATE = 'create'
    PRUNE = 'prune'

    def __init__(self, operation, time, **kwargs):
        self.operation, self.time, self.detail = operation, time, kwargs

    def when(self):
        """Return `self.time.realtime()`."""
        scheduled = self.time
        return scheduled.realtime()
|
64 |
|
65 |
|
class Status(Operation):
    """Snapshot of a backup's name, state and errors plus one operation.

    When `op` is given, its operation, time and detail are copied (the
    detail entries go through keyword arguments, so this object holds its
    own dict). Without `op`, operation and time are None and detail is
    empty.
    """

    def __init__(self, backup, op=None):
        if not op:
            super().__init__(None, None)
        else:
            super().__init__(op.operation, op.time, **op.detail)

        self.name, self.state, self.errors = (
            backup.name, backup.state, backup.errors)
|
76 |
|
77 # |
|
78 # Miscellaneous helper routines |
|
79 # |
|
80 |
|
# Maps the level names handled here to `logging` level numbers
loglevel_translation = {
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
}


def translate_loglevel(x):
    """Return the `logging` level for level name `x`.

    Names missing from `loglevel_translation` map to `logging.ERROR`.
    """
    return loglevel_translation.get(x, logging.ERROR)
|
94 |
|
def safe_get_int(t, x):
    """Return `t[x]` if the key is present and the value is an `int`.

    Returns None when `x` is not in `t` or the value is not an `int`
    instance. Note that `bool` values pass the `isinstance` check, since
    `bool` is a subclass of `int`.
    """
    if x not in t:
        return None
    candidate = t[x]
    return candidate if isinstance(candidate, int) else None
|
101 |
|
102 # |
|
103 # The Backup class |
|
104 # |
|
105 |
|
class Backup(TerminableThread):
    """A single configured backup, driven by its own thread.

    The thread body (`__main_thread`) repeatedly waits for the next
    scheduled or manually requested operation, queues it on the target
    repository, launches a `BorgInstance` for it, and tracks the resulting
    state (`State`) and error status (`Errors`). Two helper threads per
    running operation consume borg's log stream and its final result.

    Shared state is guarded by `self._cond`; this class never creates it,
    so it is presumably set up by `TerminableThread` together with the
    `_terminate` flag -- confirm against scheduler.py.
    """

    # borg message ids that mean the repository lock could not be obtained;
    # such errors are reported as Errors.BUSY rather than Errors.ERRORS.
    _BUSY_MSGIDS = frozenset((
        'LockTimeout',  # observed in reality
        'LockError',    # in docs
        'LockErrorT',   # in docs
    ))

    def __decode_config(self, cfg):
        """Read this backup's settings from `cfg` into instance attributes.

        Also creates the per-backup child logger once the name is known.

        Raises:
            Exception: if the repository named in `cfg` is not configured.
        """
        loc0 = 'Backup %d' % self.identifier

        self.backup_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'", self.backup_name)

        self.logger = logger.getChild(self.backup_name)

        loc = "Backup '%s'" % self.backup_name

        reponame = config.check_string(cfg, 'repository',
                                       'Target repository', loc)

        self.repository = repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template', loc)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', loc,
            config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', loc,
            config.defaults['retry_interval'])

        scheduling = config.check_string(cfg, 'scheduling',
                                         'Scheduling mode', loc,
                                         default="dreamtime")

        if scheduling == "dreamtime":
            self.timeclass = dreamtime.DreamTime
        elif scheduling == "realtime":
            self.timeclass = dreamtime.MonotonicTime
        elif scheduling == "manual":
            # An interval of 0 disables regular scheduling in
            # __next_operation_unlocked
            self.backup_interval = 0
        else:
            # Best effort: report through this backup's own logger and keep
            # the time class chosen in __init__.
            self.logger.error("Invalid time class '%s' for %s",
                              scheduling, loc)

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths',
                                                           'Paths', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

    def __init__(self, identifier, cfg, scheduler):
        """Set up the backup from its configuration.

        Args:
            identifier: number of this backup, used in configuration error
                locations before the name is known.
            cfg: configuration of this backup, decoded by
                `__decode_config`.
            scheduler: object whose `wait_until` is used for timed waits.
        """
        self.identifier = identifier
        self.__status_update_callback = None
        self.scheduler = scheduler
        self.logger = None  # set up in __decode_config once backup name is known

        self.borg_instance = None
        self.thread_log = None
        self.thread_res = None

        self.current_operation = None
        self.scheduled_operation = None
        self.lastrun_when = None
        self.lastrun_finished = None
        self.state = State.INACTIVE
        self.errors = Errors.OK
        self.timeclass = dreamtime.DreamTime

        self.__decode_config(cfg)

        # The thread name comes from the configuration, hence the late
        # base-class initialisation.
        super().__init__(target=self.__main_thread, name=self.backup_name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (truthy) if one is running."""
        with self._cond:
            running = self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        """As `is_running`, but the caller must hold `self._cond`."""
        running = self.current_operation
        if not running:
            # Consistency check
            assert (not self.borg_instance and not self.thread_log and
                    not self.thread_res)
        return running

    def __block_when_running(self):
        """Assert that no operation is currently running."""
        running = self.is_running()
        assert not running

    def __set_operation_detail(self, **detail):
        """Store `detail` on the current operation; return (status, callback)."""
        with self._cond:
            self.current_operation.detail.update(detail)
            return self.__status_unlocked()

    def __log_listener(self):
        """Thread body: consume borg's log entries until the stream ends.

        Progress entries update the current operation's detail; log
        messages are forwarded to this backup's logger and, at ERROR level
        or above, folded into `self.errors`. The status callback, if any,
        is invoked outside the lock after each entry that changed the
        status.
        """
        self.logger.debug('Log listener thread waiting for entries')
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            t = msg['type']

            errormsg = None
            callback = None
            status = None

            if t == 'progress_percent':
                current = safe_get_int(msg, 'current')
                total = safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    status, callback = self.__set_operation_detail(
                        progress_current=current,
                        progress_total=total)

            elif t == 'archive_progress':
                original_size = safe_get_int(msg, 'original_size')
                compressed_size = safe_get_int(msg, 'compressed_size')
                deduplicated_size = safe_get_int(msg, 'deduplicated_size')
                # All three sizes must be present (the original tested
                # original_size twice and never compressed_size)
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    status, callback = self.__set_operation_detail(
                        original_size=original_size,
                        compressed_size=compressed_size,
                        deduplicated_size=deduplicated_size)

            elif t == 'progress_message':
                pass

            elif t == 'file_status':
                pass

            elif t == 'log_message':
                # Fill in defaults for missing fields
                msg.setdefault('levelname', 'ERROR')
                msg.setdefault('message', 'UNKNOWN')
                msg.setdefault('name', 'borg')
                lvl = translate_loglevel(msg['levelname'])
                self.logger.log(lvl, '%s: %s', msg['name'], msg['message'])
                if lvl >= logging.ERROR:
                    errormsg = msg
                    errors = Errors.ERRORS
                    if msg.get('msgid') in self._BUSY_MSGIDS:
                        errors = Errors.BUSY
                    with self._cond:
                        self.errors = self.errors.combine(errors)
                        status, callback = self.__status_unlocked()
            else:
                # Log the entry itself; no status exists in this branch
                self.logger.debug('Unrecognised log entry %s', str(msg))

            if callback:
                # Guarded like __update_status, so that a failing callback
                # cannot kill the thread that drains borg's log.
                try:
                    callback(status, errors=errormsg)
                except Exception:
                    self.logger.exception("Status update error")

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Thread body: read borg's result and flag a missing one as an error."""
        self.logger.debug('Result listener thread waiting for result')

        res = self.borg_instance.read_result()

        self.logger.debug('Borg result: %s', str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error in log')
                self.errors = Errors.ERRORS

    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=None):
        """Start borg for `op` together with its two listener threads.

        Args:
            op: the `Operation` to run; its time is reset to the actual
                start time.
            archive_or_repository: target passed on to `BorgInstance`.
            common_params: common borg parameters.
            op_params: operation-specific borg parameters.
            paths: paths passed on to `BorgInstance`; defaults to an empty
                list.
        """
        if paths is None:
            paths = []

        inst = BorgInstance(op.operation, archive_or_repository,
                            common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log = Thread(target=self.__log_listener)
        t_log.daemon = True

        t_res = Thread(target=self.__result_listener)
        t_res.daemon = True

        self.thread_log = t_log
        self.thread_res = t_res
        self.borg_instance = inst
        self.current_operation = op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.time = dreamtime.MonotonicTime.now()
        self.state = State.ACTIVE
        # Reset error status when starting a new operation
        self.errors = Errors.OK
        self.__update_status()

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Assemble borg parameters for `op` and launch it.

        Raises:
            NotImplementedError: if `op.operation` is neither create nor
                prune.
        """
        self.logger.debug("Launching '%s'", str(op.operation))

        params = (config.borg_parameters
                  + self.repository.borg_parameters
                  + self.borg_parameters)

        if op.operation == Operation.CREATE:
            archive = "%s::%s%s" % (self.repository.location,
                                    self.archive_prefix,
                                    self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation == Operation.PRUNE:
            # NOTE(review): prune is launched with the *create* parameters;
            # if BorgParameters has a dedicated prune set, that is probably
            # what was intended -- confirm against config.py.
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.create)

        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        """Launch the scheduled operation, if still set, and wait for it."""
        op = self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
            return

        self.scheduled_operation = None
        self.__launch(op)
        self.__wait_finish()

    def __wait_finish(self):
        """Wait for the running operation to finish, then reset the state.

        Must be called with `self._cond` held; the lock is released while
        joining the listener threads. Returns early, leaving clean-up to
        `__main_thread`, if termination has been signalled.
        """
        # Wait for main logger thread to terminate, or for us to be terminated.
        # This tests the _terminate flag, as everywhere else in this class;
        # the original tested `self.terminate`, which presumably names the
        # terminate() method and is then always truthy -- confirm against
        # TerminableThread.
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors = self.errors.combine(Errors.ERRORS)

        # Only archive creation counts as a "run" for scheduling purposes.
        # NOTE(review): both timestamps are kept under this check so that
        # lastrun_when is never None while lastrun_finished is set, which
        # __next_operation_unlocked relies on.
        if self.current_operation.operation == Operation.CREATE:
            self.lastrun_when = self.current_operation.time.monotonic()
            self.lastrun_finished = time.monotonic()
        self.thread_res = None
        self.thread_log = None
        self.borg_instance = None
        self.current_operation = None
        self.state = State.INACTIVE
        self.__update_status()

    def __main_thread(self):
        """Thread body: schedule, queue and run operations until terminated."""
        with self._cond:
            try:
                while not self._terminate:
                    assert not self.current_operation
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'", self.backup_name)
                self.errors = Errors.ERRORS

            self.state = State.INACTIVE
            self.scheduled_operation = None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log = self.thread_log
            thread_res = self.thread_res
            self.thread_log = None
            self.thread_res = None

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        """Wait until an operation is due or has been requested manually.

        Must be called with `self._cond` held.
        """
        if self.scheduled_operation:
            # A manual request is already pending (create()/prune() may set
            # it while the previous operation is still running). Waiting on
            # the condition here would miss it, as its notification has
            # already been delivered, so proceed to queueing at once.
            return

        op = self.__next_operation_unlocked()
        if op:
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]",
                             str(op.operation), op.detail or 'none',
                             op.time.seconds_to(),
                             op.time.__class__.__name__)

            self.scheduled_operation = op
            self.state = State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state = State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        """Queue the scheduled operation on the repository and run it."""
        if not self.scheduled_operation:
            return

        self.logger.debug("Queuing")
        self.state = State.QUEUED
        self.__update_status()
        res = self.repository.queue_action(self._cond,
                                           action=self.__launch_and_wait,
                                           name=self.backup_name)
        if not res and not self._terminate:
            self.logger.debug("Queueing aborted")
            self.scheduled_operation = None
            self.state = State.INACTIVE
            self.__update_status()

    def __next_operation_unlocked(self):
        """Return the next automatically scheduled operation, or None.

        An interval of 0 disables the corresponding kind of scheduling.
        Must be called with `self._cond` held.
        """
        # TODO: pruning as well
        if self.lastrun_finished is None:
            # No finished run yet: start after the retry interval, or after
            # the backup interval if retries are disabled.
            initial_interval = self.retry_interval or self.backup_interval
            if initial_interval == 0:
                return None
            tm = self.timeclass.after(initial_interval)
            return Operation(Operation.CREATE, tm, reason='initial')

        if not self.errors.ok():
            if self.retry_interval == 0:
                return None
            tm = dreamtime.MonotonicTime(
                self.lastrun_finished + self.retry_interval)
            return Operation(Operation.CREATE, tm, reason='retry')

        if self.backup_interval == 0:
            return None
        tm = self.timeclass.from_monotonic(
            self.lastrun_when + self.backup_interval)
        return Operation(Operation.CREATE, tm)

    def __status_unlocked(self):
        """Return `(status, callback)`; the caller must hold `self._cond`.

        The status describes the current operation if there is one, else
        the scheduled one, else no operation at all.
        """
        callback = self.__status_update_callback

        op = self.current_operation or self.scheduled_operation
        status = Status(self, op) if op else Status(self)

        return status, callback

    def __update_status(self):
        """Pass the current status to the status callback, if one is set.

        NOTE: the callback is invoked without releasing `self._cond`.
        Exceptions it raises are logged and otherwise ignored.
        """
        status, callback = self.__status_unlocked()
        if callback:
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Set the callable invoked on status changes.

        It is called as `callback(status)` and, from the log listener, as
        `callback(status, errors=msg)`.
        """
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        """Return a `Status` snapshot of this backup."""
        with self._cond:
            res = self.__status_unlocked()
        return res[0]

    def __request_manual(self, operation):
        """Schedule `operation` for immediate manual execution and notify."""
        op = Operation(operation, dreamtime.MonotonicTime.now(),
                       reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    def create(self):
        """Request an archive creation to be run now."""
        self.__request_manual(Operation.CREATE)

    def prune(self):
        """Request a prune to be run now."""
        self.__request_manual(Operation.PRUNE)

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if any."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()
|
537 |
|