|
1 # |
|
2 # Borgend Backup instance |
|
3 # |
|
4 |
|
5 import logging |
|
6 import time |
|
7 from enum import IntEnum |
|
8 from threading import Thread, Lock, Condition |
|
9 |
|
10 from . import config |
|
11 from . import loggers |
|
12 from . import repository |
|
13 from . import dreamtime |
|
14 from .instance import BorgInstance |
|
15 from .scheduler import TerminableThread |
|
16 |
|
# Module-level logger; each Backup derives its own child logger from it.
logger = loggers.get(__name__)

# Timeout, in seconds, of each join() attempt on the result listener thread
# while waiting for a running operation to finish.
JOIN_TIMEOUT = 60
|
20 |
|
21 # |
|
22 # State and operation related helper classes |
|
23 # |
|
24 |
|
class State(IntEnum):
    """Scheduling state of a backup.

    The numeric values are ordered
    INACTIVE < SCHEDULED < QUEUED < ACTIVE.
    """

    INACTIVE = 0
    SCHEDULED = 1
    QUEUED = 2
    ACTIVE = 3
|
31 |
|
32 |
|
class Errors(IntEnum):
    """Error status of a backup.

    Members are ordered so that the numerically larger one wins when two
    statuses are merged with combine().
    """

    OK = 0
    BUSY = 1
    OFFLINE = 2
    ERRORS = 3

    def combine(self, other):
        """Return whichever of *self* and *other* compares greater."""
        return other if other > self else self

    def ok(self):
        """Return True when this status is OK."""
        return self == Errors.OK

    def __str__(self):
        """Return the lower-case label looked up in _errorstring."""
        return _errorstring[self]


# Human-readable labels returned by Errors.__str__.
_errorstring = {
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors',
}
|
54 |
|
class Operation:
    """A borg operation together with the time it is due and extra details.

    Attributes:
        operation: the operation identifier, normally ``CREATE`` or
            ``PRUNE``; ``Status`` passes ``None`` when there is no
            operation.
        time: time object of the operation (``None`` when there is no
            operation); ``when()`` calls its ``realtime()`` method.
        detail: dict holding the extra keyword arguments given to the
            constructor (for example ``reason`` or progress figures).
    """

    CREATE = 'create'
    PRUNE = 'prune'

    def __init__(self, operation, time, **kwargs):
        self.operation = operation
        self.time = time
        self.detail = kwargs

    def when(self):
        """Return ``self.time.realtime()``, or ``None`` if no time is set.

        A ``Status`` built without an operation carries ``time=None``;
        returning ``None`` here avoids an AttributeError in that case.
        """
        if self.time is None:
            return None
        return self.time.realtime()
|
65 |
|
66 |
|
class Status(Operation):
    """Snapshot of a backup's status, layered on the relevant operation.

    The operation fields (``operation``, ``time``, ``detail``) are copied
    from *op* when one is given and are ``None``/``None``/empty otherwise;
    ``name``, ``state`` and ``errors`` are copied from *backup*.
    """

    def __init__(self, backup, op=None):
        if op:
            operation, when, detail = op.operation, op.time, op.detail
        else:
            operation, when, detail = None, None, {}
        # The detail mapping is re-expanded into keyword arguments, so this
        # object gets its own dict rather than sharing op.detail.
        super().__init__(operation, when, **detail)

        self.name = backup.name
        self.state = backup.state
        self.errors = backup.errors
|
77 |
|
78 # |
|
79 # Miscellaneous helper routines |
|
80 # |
|
81 |
|
# Maps the level names found in borg log messages to logging levels.
loglevel_translation = {
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
}


def translate_loglevel(x):
    """Return the logging level for level name *x*.

    Names missing from loglevel_translation map to logging.ERROR.
    """
    return loglevel_translation.get(x, logging.ERROR)
|
95 |
|
def safe_get_int(t, x):
    """Return ``t[x]`` if *x* is in *t* and the value is an int, else None."""
    if x not in t:
        return None
    candidate = t[x]
    return candidate if isinstance(candidate, int) else None
|
102 |
|
103 # |
|
104 # The Backup class |
|
105 # |
|
106 |
|
class Backup(TerminableThread):
    """Scheduling thread for a single configured backup.

    An instance decodes its configuration section and then runs
    ``__main_thread`` as its thread target: it picks the next operation,
    queues it on the target repository, launches borg through a
    ``BorgInstance`` and follows the run with a log listener thread and a
    result listener thread.

    ``self._cond`` and ``self._terminate`` are used throughout but are not
    defined in this class; presumably they are provided by
    ``TerminableThread`` -- NOTE(review): confirm against the scheduler
    module. Methods whose name ends in ``_unlocked`` do no locking
    themselves; the public wrappers call them with ``self._cond`` held.
    """

    def __decode_config(self, cfg):
        """Read this backup's settings from its configuration section *cfg*.

        Sets ``backup_name``, ``logger``, ``repository``,
        ``archive_prefix``, ``archive_template``, ``backup_interval``,
        ``retry_interval``, ``paths`` and ``borg_parameters``, and selects
        ``timeclass`` from the ``scheduling`` option.

        Raises:
            Exception: if the named repository is not configured.
        """
        loc0 = 'Backup %d' % self.identifier

        self.backup_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger = logger.getChild(self.backup_name)

        loc = "Backup '%s'" % self.backup_name

        reponame = config.check_string(cfg, 'repository',
                                       'Target repository', loc)

        self.repository = repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix = config.check_string(cfg, 'archive_prefix',
                                                  'Archive prefix', loc)

        self.archive_template = config.check_string(cfg, 'archive_template',
                                                    'Archive template', loc)

        self.backup_interval = config.check_nonneg_int(
            cfg, 'backup_interval', 'Backup interval', loc,
            config.defaults['backup_interval'])

        self.retry_interval = config.check_nonneg_int(
            cfg, 'retry_interval', 'Retry interval', loc,
            config.defaults['retry_interval'])

        scheduling = config.check_string(cfg, 'scheduling',
                                         'Scheduling mode', loc,
                                         default="dreamtime")

        if scheduling == "dreamtime":
            self.timeclass = dreamtime.DreamTime
        elif scheduling == "realtime":
            self.timeclass = dreamtime.MonotonicTime
        elif scheduling == "manual":
            # A zero interval disables periodic scheduling in
            # __next_operation_unlocked.
            self.backup_interval = 0
        else:
            # Report through this backup's own logger (set above) rather
            # than the root logger; the default timeclass stays in effect.
            self.logger.error("Invalid time class '%s' for %s"
                              % (scheduling, loc))

        self.paths = config.check_nonempty_list_of_strings(cfg, 'paths',
                                                           'Paths', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

    def __init__(self, identifier, cfg, scheduler):
        """Create the backup thread (not started) from its configuration.

        Args:
            identifier: number of this backup, used in messages until the
                backup name has been read from the configuration.
            cfg: configuration section of this backup.
            scheduler: object whose ``wait_until`` is used for timed waits.
        """
        self.identifier = identifier
        self.__status_update_callback = None
        self.scheduler = scheduler
        self.logger = None  # set up in __decode_config once the name is known

        self.borg_instance = None
        self.thread_log = None
        self.thread_res = None

        self.current_operation = None
        self.scheduled_operation = None
        self.lastrun_when = None
        self.lastrun_finished = None
        self.state = State.INACTIVE
        self.errors = Errors.OK
        self.timeclass = dreamtime.DreamTime

        # The configuration supplies backup_name, which is needed as the
        # thread name below.
        self.__decode_config(cfg)

        super().__init__(target=self.__main_thread, name=self.backup_name)
        self.daemon = True

    def is_running(self):
        """Return the current operation (truthy) if one is running."""
        with self._cond:
            running = self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        """Return ``current_operation``; falsy when nothing is running."""
        running = self.current_operation
        if not running:
            # Consistency check: no borg instance or listener thread may
            # exist without a current operation.
            assert (not self.borg_instance and not self.thread_log and
                    not self.thread_res)
        return running

    def __block_when_running(self):
        """Assert that no operation is currently running."""
        running = self.is_running()
        assert not running

    def __log_listener(self):
        """Thread target: consume borg's log entries until the stream ends.

        Progress entries update ``current_operation.detail``; log messages
        at ERROR level or above are merged into ``self.errors``. After
        such an update the status callback, if one is set, is invoked
        outside the lock.
        """
        self.logger.debug('Log listener thread waiting for entries')

        # read_log is called repeatedly until it returns None.
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            # An entry without a type is treated as unrecognised instead of
            # killing this thread with a KeyError.
            t = msg['type'] if 'type' in msg else None

            errormsg = None
            callback = None
            status = None

            if t == 'progress_percent':
                current = safe_get_int(msg, 'current')
                total = safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        detail = self.current_operation.detail
                        detail['progress_current'] = current
                        detail['progress_total'] = total
                        status, callback = self.__status_unlocked()

            elif t == 'archive_progress':
                original_size = safe_get_int(msg, 'original_size')
                compressed_size = safe_get_int(msg, 'compressed_size')
                deduplicated_size = safe_get_int(msg, 'deduplicated_size')
                # All three figures must be present; compressed_size was
                # previously never checked.
                if (original_size is not None and
                        compressed_size is not None and
                        deduplicated_size is not None):
                    with self._cond:
                        detail = self.current_operation.detail
                        detail['original_size'] = original_size
                        detail['compressed_size'] = compressed_size
                        detail['deduplicated_size'] = deduplicated_size
                        status, callback = self.__status_unlocked()

            elif t == 'progress_message':
                pass

            elif t == 'file_status':
                pass

            elif t == 'log_message':
                # Fill in defaults for fields borg may have omitted.
                if 'levelname' not in msg:
                    msg['levelname'] = 'ERROR'
                if 'message' not in msg:
                    msg['message'] = 'UNKNOWN'
                if 'name' not in msg:
                    msg['name'] = 'borg'
                lvl = translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl >= logging.ERROR:
                    errormsg = msg
                    errors = Errors.ERRORS
                    # Lock-related failures mean the repository is busy.
                    # 'LockTimeout' has been observed in practice; the
                    # other two ids are taken from borg's documentation.
                    if ('msgid' in msg and
                            msg['msgid'] in ('LockTimeout', 'LockError',
                                             'LockErrorT')):
                        errors = Errors.BUSY
                    with self._cond:
                        self.errors = self.errors.combine(errors)
                        status, callback = self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                # Guarded as in __update_status: a failing callback must
                # not stop this thread from reading borg's log.
                try:
                    callback(status, errors=errormsg)
                except Exception:
                    self.logger.exception("Status update error")

        self.logger.debug('Waiting for borg subprocess to terminate '
                          'in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating '
                          'log listener thread')

    def __result_listener(self):
        """Thread target: read borg's result and flag a missing result."""
        self.logger.debug('Result listener thread waiting for result')

        res = self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error '
                                  'in log')
                self.errors = Errors.ERRORS

    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=None):
        """Start borg for *op* and the two listener threads.

        Args:
            op: the Operation to run; its ``time`` is overwritten with the
                actual start time.
            archive_or_repository: borg target of the operation.
            common_params: common borg parameters.
            op_params: operation-specific borg parameters.
            paths: paths handed to borg; defaults to an empty list.
        """
        if paths is None:
            paths = []

        inst = BorgInstance(op.operation, archive_or_repository,
                            common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log = Thread(target=self.__log_listener)
        t_log.daemon = True

        t_res = Thread(target=self.__result_listener)
        t_res.daemon = True

        self.thread_log = t_log
        self.thread_res = t_res
        self.borg_instance = inst
        self.current_operation = op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.time = dreamtime.MonotonicTime.now()
        self.state = State.ACTIVE
        # Reset error status when starting a new operation
        self.errors = Errors.OK
        self.__update_status()

        # The listeners are started only after the fields they read
        # (borg_instance, current_operation) have been assigned.
        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Build the borg parameters for *op* and launch it.

        Raises:
            NotImplementedError: if the operation is neither CREATE nor
                PRUNE.
        """
        self.logger.debug("Launching '%s'" % str(op.operation))

        params = (config.borg_parameters
                  + self.repository.borg_parameters
                  + self.borg_parameters)

        if op.operation == Operation.CREATE:
            archive = "%s::%s%s" % (self.repository.location,
                                    self.archive_prefix,
                                    self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation == Operation.PRUNE:
            # NOTE(review): prune is launched with the *create* parameters;
            # if BorgParameters has prune-specific parameters they are
            # probably what is wanted here -- confirm against config.
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}]
                             + params.create)
        else:
            raise NotImplementedError("Invalid operation '%s'"
                                      % str(op.operation))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        """Launch the scheduled operation, if still present, and wait."""
        op = self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation = None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
        """Wait for the launched operation to finish and record the outcome.

        Called with ``self._cond`` held; the lock is released while joining
        the listener threads and re-acquired afterwards.
        """
        # Wait for the result listener thread to terminate, or for us to be
        # terminated. The flag tested is self._terminate, as everywhere
        # else in this class.
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            try:
                self.thread_res.join(JOIN_TIMEOUT)
            finally:
                self._cond.acquire()

        # If terminate has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses '
                          'to terminate')

        self._cond.release()
        try:
            self.thread_log.join()
        finally:
            self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors = self.errors.combine(Errors.ERRORS)

        # Only create runs feed the timestamps used for scheduling.
        if self.current_operation.operation == Operation.CREATE:
            self.lastrun_when = self.current_operation.time.monotonic()
            self.lastrun_finished = time.monotonic()
        self.thread_res = None
        self.thread_log = None
        self.borg_instance = None
        self.current_operation = None
        self.state = State.INACTIVE
        self.__update_status()

    def __main_thread(self):
        """Thread target: schedule and run operations until terminated."""
        with self._cond:
            try:
                while not self._terminate:
                    assert not self.current_operation
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'"
                                      % self.backup_name)
                self.errors = Errors.ERRORS

            self.state = State.INACTIVE
            self.scheduled_operation = None

            # Clean up to terminate: kill borg instance and communication
            # threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log = self.thread_log
            thread_res = self.thread_res
            self.thread_log = None
            self.thread_res = None

        self.logger.debug("Waiting for log and result threads to terminate")

        # Joined without the lock: both listeners acquire self._cond.
        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        """Schedule the next automatic operation and wait until it is due."""
        if self.scheduled_operation:
            # A manual request is already pending, e.g. create() or prune()
            # was called while the previous operation was running and its
            # notify() found nobody waiting. Waiting here would stall that
            # request until the next notification, so proceed directly to
            # queueing.
            return

        op = self.__next_operation_unlocked()
        if op:
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
                             (str(op.operation), op.detail or 'none',
                              op.time.seconds_to(),
                              op.time.__class__.__name__))

            self.scheduled_operation = op
            self.state = State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state = State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        """Queue the scheduled operation on the repository and run it."""
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state = State.QUEUED
            self.__update_status()
            res = self.repository.queue_action(self._cond,
                                               action=self.__launch_and_wait,
                                               name=self.backup_name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation = None
                self.state = State.INACTIVE
                self.__update_status()

    def __next_operation_unlocked(self):
        """Return the next automatic Operation, or None if none is due.

        Before the first finished create run, the retry interval (or, if
        that is zero, the backup interval) is used; after a run with
        errors, the retry interval counted from the end of that run;
        otherwise the backup interval counted from the start of the last
        run. A zero interval yields None.
        """
        # TODO: pruning as well
        if not self.lastrun_finished:
            initial_interval = self.retry_interval
            if initial_interval == 0:
                initial_interval = self.backup_interval
            if initial_interval == 0:
                return None
            tm = self.timeclass.after(initial_interval)
            return Operation(Operation.CREATE, tm, reason='initial')

        if not self.errors.ok():
            if self.retry_interval == 0:
                return None
            tm = dreamtime.MonotonicTime(self.lastrun_finished
                                         + self.retry_interval)
            return Operation(Operation.CREATE, tm, reason='retry')

        if self.backup_interval == 0:
            return None
        tm = self.timeclass.from_monotonic(self.lastrun_when
                                           + self.backup_interval)
        return Operation(Operation.CREATE, tm)

    def __status_unlocked(self):
        """Return ``(status, callback)`` for the current state.

        The status is based on the current operation if there is one, else
        on the scheduled operation, else on no operation at all.
        """
        callback = self.__status_update_callback

        if self.current_operation:
            status = Status(self, self.current_operation)
        elif self.scheduled_operation:
            status = Status(self, self.scheduled_operation)
        else:
            status = Status(self)

        return status, callback

    def __update_status(self):
        """Pass the current status to the status callback, if one is set.

        The callback is invoked without releasing ``self._cond`` (the
        release/re-acquire pair that used to surround it is disabled);
        exceptions raised by the callback are logged and suppressed.
        """
        status, callback = self.__status_unlocked()
        if callback:
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Set the callable that receives status updates."""
        with self._cond:
            self.__status_update_callback = callback

    def status(self):
        """Return a Status snapshot of this backup."""
        with self._cond:
            res = self.__status_unlocked()
        return res[0]

    def create(self):
        """Request a manual create operation and wake the main thread."""
        op = Operation(Operation.CREATE, dreamtime.MonotonicTime.now(),
                       reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    def prune(self):
        """Request a manual prune operation and wake the main thread."""
        op = Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(),
                       reason='manual')
        with self._cond:
            self.scheduled_operation = op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if there is one."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()
|
538 |