backup.py

changeset 80
a409242121d5
parent 79
b075b3db3044
child 81
7bcd715f19e3
equal deleted inserted replaced
79:b075b3db3044 80:a409242121d5
1 #
2 # Borgend Backup instance
3 #
4
5 import config
6 import logging
7 import time
8 import loggers
9 import repository
10 import dreamtime
11 from enum import IntEnum
12 from instance import BorgInstance
13 from threading import Thread, Lock, Condition
14 from scheduler import TerminableThread
15
# Module-level logger; every Backup derives a per-backup child from it.
logger = loggers.get(__name__)

# Timeout, in seconds, handed to Thread.join() when polling for a running
# borg operation's result listener thread to finish.
JOIN_TIMEOUT = 60
19
20 #
21 # State and operation related helper classes
22 #
23
class State(IntEnum):
    """Scheduling/run state of a Backup (0 = idle ... 3 = borg running)."""
    INACTIVE, SCHEDULED, QUEUED, ACTIVE = range(4)
30
31
class Errors(IntEnum):
    """Error status of a backup; a larger value means a more severe status."""
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

    def combine(self, other):
        """Return whichever of ``self`` and ``other`` is more severe.

        On a tie, ``self`` is returned.
        """
        return other if other > self else self

    def ok(self):
        """True only for the OK status."""
        return self is Errors.OK

    def __str__(self):
        # Human-readable lower-case name, e.g. 'busy'
        return _errorstring[self]

# Display strings: each member's name in lower case.
_errorstring={member: member.name.lower() for member in Errors}
53
class Operation:
    """A borg operation (CREATE or PRUNE) due at ``time``.

    Extra keyword arguments are kept in the ``detail`` dict.
    """
    CREATE='create'
    PRUNE='prune'

    def __init__(self, operation, time, **kwargs):
        self.detail=kwargs
        self.operation, self.time = operation, time

    def when(self):
        """Return ``self.time.realtime()``."""
        return self.time.realtime()


class Status(Operation):
    """Snapshot of a backup's status, optionally for a given operation.

    When ``op`` is given, its operation, time and a copy of its details are
    taken over.  Otherwise these are None/empty.  The backup's ``name``,
    ``state`` and ``errors`` are read at construction time.
    """
    def __init__(self, backup, op=None):
        if not op:
            super().__init__(None, None)
        else:
            super().__init__(op.operation, op.time, **op.detail)

        self.name, self.state, self.errors = backup.name, backup.state, backup.errors
76
77 #
78 # Miscellaneous helper routines
79 #
80
# Level names as they appear in borg's log entries ('levelname') mapped to
# the corresponding constants of the logging module.
loglevel_translation={
    name: getattr(logging, name)
    for name in ('CRITICAL', 'ERROR', 'WARNING', 'DEBUG', 'INFO')
}

def translate_loglevel(x):
    """Return the logging level for level name ``x``.

    Unknown names map to logging.ERROR.
    """
    return loglevel_translation.get(x, logging.ERROR)
94
def safe_get_int(t, x):
    """Return ``t[x]`` when key ``x`` is present and its value is an int.

    Otherwise return None.  Uses ``isinstance``, so bools are accepted too.
    """
    if x not in t:
        return None
    value=t[x]
    return value if isinstance(value, int) else None
101
102 #
103 # The Backup class
104 #
105
class Backup(TerminableThread):
    """One configured backup, driven by its own daemon thread.

    The thread repeatedly works out the next operation (or waits for a
    manual request), queues it on the repository, launches borg and waits
    for it to finish.  Shared state is guarded by ``self._cond``.  That
    condition is not created in this class; presumably TerminableThread
    sets it up, together with the ``_terminate`` flag.
    """

    def __decode_config(self, cfg):
        """Populate this backup's settings from the configuration dict ``cfg``.

        Raises:
            Exception: if the referenced repository is not configured.
        """
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', loc,
                                                    config.defaults['retry_interval'])

        scheduling=config.check_string(cfg, 'scheduling',
                                       'Scheduling mode', loc,
                                       default="dreamtime")

        if scheduling=="dreamtime":
            self.timeclass=dreamtime.DreamTime
        elif scheduling=="realtime":
            self.timeclass=dreamtime.MonotonicTime
        elif scheduling=="manual":
            # Disable the regular interval; the time class keeps its default
            self.backup_interval=0
        else:
            # Not fatal: report on this backup's own logger (not the root
            # logger) and keep the default time class set in __init__.
            self.logger.error("Invalid time class '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)

    def __init__(self, identifier, cfg, scheduler):
        """Create the backup from ``cfg``.  The thread is not started here.

        Args:
            identifier: number used (via ``%d``) in config error locations.
            cfg: this backup's configuration dict.
            scheduler: object whose ``wait_until`` performs timed waits.
        """
        self.identifier=identifier
        self.__status_update_callback=None
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once backup name is known

        # Borg instance and listener threads of the running operation, if any
        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.current_operation=None
        self.scheduled_operation=None
        self.lastrun_when=None
        self.lastrun_finished=None
        self.state=State.INACTIVE
        self.errors=Errors.OK
        self.timeclass=dreamtime.DreamTime

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        """Return the current operation (truthy) while one is running."""
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check: nothing may be left over from a finished run
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        """Log-listener thread body: process borg's log entries.

        Progress entries update ``current_operation.detail``.  Error-level
        log messages raise ``self.errors`` (to BUSY for lock contention).
        Once the stream ends, waits for the borg subprocess.
        """
        self.logger.debug('Log listener thread waiting for entries')
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            t=msg.get('type')

            errormsg=None
            status=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if (original_size is not None and compressed_size is not None
                        and deduplicated_size is not None):
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t in ('progress_message', 'file_status'):
                # Recognised but deliberately ignored
                pass

            elif t=='log_message':
                # Fill in defaults for fields missing from the entry
                msg.setdefault('levelname', 'ERROR')
                msg.setdefault('message', 'UNKNOWN')
                msg.setdefault('name', 'borg')
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    # Lock contention means the repository is busy, not broken
                    if msg.get('msgid') in ('LockTimeout', # observed in reality
                                            'LockError',   # in docs
                                            'LockErrorT'): # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.errors=self.errors.combine(errors)
                        status, callback=self.__status_unlocked()
            else:
                # Log the offending entry itself ('status' is unset here)
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                # Called without the lock.  A failing callback must not kill
                # this thread, which would stop processing borg's log output.
                try:
                    callback(status, errors=errormsg)
                except Exception:
                    self.logger.exception("Status update error")

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        """Result-listener thread body: read borg's final result.

        A missing result is treated as an error unless the log already
        reported one.
        """
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error in log')
                self.errors=Errors.ERRORS

    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=None):
        """Launch borg for ``op`` and start its log/result listener threads.

        Called with ``self._cond`` held (via __launch_and_wait).  Resets the
        error status and marks the backup ACTIVE.
        """
        if paths is None:
            paths=[]

        inst=BorgInstance(op.operation, archive_or_repository,
                          common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.time=dreamtime.MonotonicTime.now()
        self.state=State.ACTIVE
        # Reset error status when starting a new operation
        self.errors=Errors.OK
        self.__update_status()

        t_log.start()
        t_res.start()

    def __launch(self, op):
        """Build the borg arguments for ``op`` and launch it.

        Raises:
            NotImplementedError: for an unknown operation type.
        """
        self.logger.debug("Launching '%s'" % str(op.operation))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.operation==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation==Operation.PRUNE:
            # Prune takes its own parameter set, not the create parameters.
            # NOTE(review): assumes config.BorgParameters provides a 'prune'
            # list alongside 'common' and 'create' — confirm in config.py.
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.prune)
        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))

    # This must be called with self._cond held.
    def __launch_and_wait(self):
        """Repository queue action: run the scheduled operation to completion."""
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
        """Wait for the running operation to finish and record its outcome.

        Called with ``self._cond`` held; the lock is released while joining
        the listener threads.  Returns early, leaving cleanup to
        __main_thread, if termination has been requested.
        """
        # Wait for the result listener thread to finish, or for us to be
        # terminated.  Must test the _terminate flag used elsewhere in this
        # class; self.terminate is not that flag.
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            try:
                self.thread_res.join(JOIN_TIMEOUT)
            finally:
                self._cond.acquire()

        # If terminate has been signalled, let outer termination handler
        # take care of things (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it's better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        try:
            self.thread_log.join()
        finally:
            self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors=self.errors.combine(Errors.ERRORS)

        # Only create runs drive the regular/retry schedule
        if self.current_operation.operation==Operation.CREATE:
            self.lastrun_when=self.current_operation.time.monotonic()
            self.lastrun_finished=time.monotonic()
        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None
        self.current_operation=None
        self.state=State.INACTIVE
        self.__update_status()

    def __main_thread(self):
        """Thread body: schedule and run operations until terminated."""
        with self._cond:
            try:
                while not self._terminate:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception:
                self.logger.exception("Error with backup '%s'" % self.backup_name)
                self.errors=Errors.ERRORS

            self.state=State.INACTIVE
            self.scheduled_operation=None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log=self.thread_log
            thread_res=self.thread_res
            self.thread_log=None
            self.thread_res=None

            # Publish the final (inactive, possibly erroneous) status
            self.__update_status()

        # Join outside the lock: the listener threads take self._cond to
        # record progress and errors, so joining them with it held can deadlock.
        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule next operation if there is no manually
    # requested one
    def __main_thread_wait_schedule(self):
        """Wait until the next operation is due, or for a manual request.

        Called with ``self._cond`` held.
        """
        if self.scheduled_operation:
            # A request from create()/prune() is already pending.  It may have
            # arrived while the previous operation ran, in which case its
            # notify() found no waiter; waiting on the condition now would
            # stall it until some later notification.
            return

        op=self.__next_operation_unlocked()
        if op:
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
                             (str(op.operation), op.detail or 'none',
                              op.time.seconds_to(),
                              op.time.__class__.__name__))

            self.scheduled_operation=op
            self.state=State.SCHEDULED
            self.__update_status()

            # Wait under scheduled wait
            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state=State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from 'op' created in __main_thread_wait_schedule above),
    # queue it on the repository, and launch the operation once repository
    # available
    def __main_thread_queue_and_launch(self):
        """Queue the scheduled operation on the repository and run it."""
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=State.QUEUED
            self.__update_status()
            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation=None
                self.state=State.INACTIVE
                self.__update_status()

    def __next_operation_unlocked(self):
        """Return the next automatic Operation, or None if there is none.

        - No create has finished yet: 'initial' create after retry_interval
          (or backup_interval if retry_interval is 0).
        - Last run ended with errors: 'retry' create retry_interval after
          it finished.
        - Otherwise: create backup_interval after the last run started.
        A zero interval disables the corresponding case.
        """
        # TODO: pruning as well
        if self.lastrun_finished is None:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                tm=self.timeclass.after(initial_interval)
                return Operation(Operation.CREATE, tm, reason='initial')
        elif not self.errors.ok():
            if self.retry_interval==0:
                return None
            else:
                tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval)
                return Operation(Operation.CREATE, tm, reason='retry')
        else:
            if self.backup_interval==0:
                return None
            else:
                tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval)
                return Operation(Operation.CREATE, tm)

    def __status_unlocked(self):
        """Return ``(Status, callback)``; caller must hold ``self._cond``.

        The status describes the running operation, else the scheduled one,
        else no operation.
        """
        callback=self.__status_update_callback

        if self.current_operation:
            status=Status(self, self.current_operation)
        elif self.scheduled_operation:
            status=Status(self, self.scheduled_operation)
        else:
            status=Status(self)

        return status, callback

    def __update_status(self):
        """Pass the current status to the registered callback, if any.

        The callback runs with ``self._cond`` still held; its exceptions are
        logged and suppressed.
        """
        status, callback = self.__status_unlocked()
        if callback:
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        """Register ``callback(status, ...)`` to receive status updates."""
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        """Return a Status snapshot of this backup."""
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        """Request a manual create operation as soon as possible."""
        op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        """Request a manual prune operation as soon as possible."""
        op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        """Terminate the running borg instance, if any."""
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()
537

mercurial