borgend/backup.py

changeset 80:a409242121d5 (parent 79:b075b3db3044, child 85:56a000d15965)
#
# Borgend Backup instance
#

import logging
import time
from enum import IntEnum
from threading import Thread, Lock, Condition

from . import config
from . import loggers
from . import repository
from . import dreamtime
from .instance import BorgInstance
from .scheduler import TerminableThread

logger=loggers.get(__name__)

JOIN_TIMEOUT=60

#
# State and operation related helper classes
#

class State(IntEnum):
    # State
    INACTIVE=0
    SCHEDULED=1
    QUEUED=2
    ACTIVE=3


class Errors(IntEnum):
    OK=0
    BUSY=1
    OFFLINE=2
    ERRORS=3

    def combine(self, other):
        return max(self, other)

    def ok(self):
        return self==self.OK

    def __str__(self):
        return _errorstring[self]

_errorstring={
    Errors.OK: 'ok',
    Errors.BUSY: 'busy',
    Errors.OFFLINE: 'offline',
    Errors.ERRORS: 'errors'
}
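
# Example (illustrative): combining per-message severities keeps the most
# severe one, so Errors.OK.combine(Errors.BUSY) is Errors.BUSY and
# Errors.BUSY.combine(Errors.ERRORS) is Errors.ERRORS; only Errors.OK
# reports ok().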

class Operation:
    CREATE='create'
    PRUNE='prune'
    def __init__(self, operation, time, **kwargs):
        self.operation=operation
        self.time=time
        self.detail=kwargs

    def when(self):
        return self.time.realtime()


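# Illustrative sketch (uses the dreamtime interface as consumed elsewhere in
# this file; the 'reason' detail is just an example keyword):
#
#   op=Operation(Operation.CREATE, dreamtime.MonotonicTime.after(3600),
#                reason='example')
#   op.when()   # wall-clock timestamp of the scheduled run
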
class Status(Operation):
    def __init__(self, backup, op=None):
        if op:
            super().__init__(op.operation, op.time, **op.detail)
        else:
            super().__init__(None, None)

        self.name=backup.name
        self.state=backup.state
        self.errors=backup.errors

#
# Miscellaneous helper routines
#

loglevel_translation={
    'CRITICAL': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO
}

def translate_loglevel(x):
    if x in loglevel_translation:
        return loglevel_translation[x]
    else:
        return logging.ERROR
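
# Example: translate_loglevel('WARNING') == logging.WARNING; names not in the
# table fall back to logging.ERROR.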

def safe_get_int(t, x):
    if x in t:
        tmp=t[x]
        if isinstance(tmp, int):
            return tmp
    return None

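# Example: safe_get_int({'current': 5}, 'current') == 5, while a missing key
# or a non-integer value (e.g. safe_get_int({'current': 'n/a'}, 'current'))
# yields None.
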
#
# The Backup class
#

class Backup(TerminableThread):

    def __decode_config(self, cfg):
        loc0='Backup %d' % self.identifier

        self.backup_name=config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring backup '%s'" % self.backup_name)

        self.logger=logger.getChild(self.backup_name)

        loc="Backup '%s'" % self.backup_name

        reponame=config.check_string(cfg, 'repository',
                                     'Target repository', loc)

        self.repository=repository.find_repository(reponame)
        if not self.repository:
            raise Exception("Repository '%s' not configured" % reponame)

        self.archive_prefix=config.check_string(cfg, 'archive_prefix',
                                                'Archive prefix', loc)

        self.archive_template=config.check_string(cfg, 'archive_template',
                                                  'Archive template', loc)

        self.backup_interval=config.check_nonneg_int(cfg, 'backup_interval',
                                                     'Backup interval', loc,
                                                     config.defaults['backup_interval'])

        self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval',
                                                    'Retry interval', loc,
                                                    config.defaults['retry_interval'])


        scheduling=config.check_string(cfg, 'scheduling',
                                       'Scheduling mode', loc,
                                       default="dreamtime")

        if scheduling=="dreamtime":
            self.timeclass=dreamtime.DreamTime
        elif scheduling=="realtime":
            self.timeclass=dreamtime.MonotonicTime
        elif scheduling=="manual":
            self.backup_interval=0
        else:
            logger.error("Invalid scheduling mode '%s' for %s" % (scheduling, loc))

        self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc)

        self.borg_parameters=config.BorgParameters.from_config(cfg, loc)

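    # Illustrative configuration sketch for one backup entry (keys as read
    # above; the values and exact file syntax are hypothetical examples, not
    # taken from the borgend documentation):
    #
    #   name: documents
    #   repository: local-disk
    #   archive_prefix: 'documents-'
    #   archive_template: '{now}'
    #   backup_interval: 86400    # seconds between routine runs
    #   retry_interval: 900       # seconds before retrying a failed run
    #   scheduling: dreamtime     # or 'realtime' / 'manual'
    #   paths:
    #       - /home/user/Documents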

    def __init__(self, identifier, cfg, scheduler):
        self.identifier=identifier
        self.__status_update_callback=None
        self.scheduler=scheduler
        self.logger=None # set up in __decode_config once the backup name is known

        self.borg_instance=None
        self.thread_log=None
        self.thread_res=None

        self.current_operation=None
        self.scheduled_operation=None
        self.lastrun_when=None
        self.lastrun_finished=None
        self.state=State.INACTIVE
        self.errors=Errors.OK
        self.timeclass=dreamtime.DreamTime

        self.__decode_config(cfg)

        super().__init__(target = self.__main_thread, name = self.backup_name)
        self.daemon=True

    def is_running(self):
        with self._cond:
            running=self.__is_running_unlocked()
        return running

    def __is_running_unlocked(self):
        running=self.current_operation
        if not running:
            # Consistency check
            assert((not self.borg_instance and not self.thread_log and
                    not self.thread_res))
        return running

    def __block_when_running(self):
        running=self.is_running()
        assert(not running)

    def __log_listener(self):
        self.logger.debug('Log listener thread waiting for entries')
        success=True
        for msg in iter(self.borg_instance.read_log, None):
            self.logger.info(str(msg))
            t=msg['type']

            errormsg=None
            callback=None

            if t=='progress_percent':
                current=safe_get_int(msg, 'current')
                total=safe_get_int(msg, 'total')
                if current is not None and total is not None:
                    with self._cond:
                        self.current_operation.detail['progress_current']=current
                        self.current_operation.detail['progress_total']=total
                        status, callback=self.__status_unlocked()

            elif t=='archive_progress':
                original_size=safe_get_int(msg, 'original_size')
                compressed_size=safe_get_int(msg, 'compressed_size')
                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                if original_size is not None and compressed_size is not None and deduplicated_size is not None:
                    with self._cond:
                        self.current_operation.detail['original_size']=original_size
                        self.current_operation.detail['compressed_size']=compressed_size
                        self.current_operation.detail['deduplicated_size']=deduplicated_size
                        status, callback=self.__status_unlocked()

            elif t=='progress_message':
                pass

            elif t=='file_status':
                pass

            elif t=='log_message':
                if 'levelname' not in msg:
                    msg['levelname']='ERROR'
                if 'message' not in msg:
                    msg['message']='UNKNOWN'
                if 'name' not in msg:
                    msg['name']='borg'
                lvl=translate_loglevel(msg['levelname'])
                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                if lvl>=logging.ERROR:
                    errormsg=msg
                    errors=Errors.ERRORS
                    if ('msgid' in msg and
                        (msg['msgid']=='LockTimeout' or # observed in reality
                         msg['msgid']=='LockError' or   # in docs
                         msg['msgid']=='LockErrorT')):  # in docs
                        errors=Errors.BUSY
                    with self._cond:
                        self.errors=self.errors.combine(errors)
                        status, callback=self.__status_unlocked()
            else:
                self.logger.debug('Unrecognised log entry %s' % str(msg))

            if callback:
                callback(status, errors=errormsg)

        self.logger.debug('Waiting for borg subprocess to terminate in log thread')

        self.borg_instance.wait()

        self.logger.debug('Borg subprocess terminated; terminating log listener thread')

    def __result_listener(self):
        self.logger.debug('Result listener thread waiting for result')

        res=self.borg_instance.read_result()

        self.logger.debug('Borg result: %s' % str(res))

        with self._cond:
            if res is None and self.errors.ok():
                self.logger.error('No result from borg despite no error in log')
                self.errors=Errors.ERRORS


    def __do_launch(self, op, archive_or_repository,
                    common_params, op_params, paths=[]):

        inst=BorgInstance(op.operation, archive_or_repository,
                          common_params, op_params, paths)

        # Only the Repository object has access to the passphrase
        self.repository.launch_borg_instance(inst)

        self.logger.debug('Creating listener threads')

        t_log=Thread(target=self.__log_listener)
        t_log.daemon=True

        t_res=Thread(target=self.__result_listener)
        t_res.daemon=True

        self.thread_log=t_log
        self.thread_res=t_res
        self.borg_instance=inst
        self.current_operation=op
        # Update scheduled time to real starting time to schedule
        # next run relative to this
        self.current_operation.time=dreamtime.MonotonicTime.now()
        self.state=State.ACTIVE
        # Reset error status when starting a new operation
        self.errors=Errors.OK
        self.__update_status()

        t_log.start()
        t_res.start()


    def __launch(self, op):
        self.logger.debug("Launching '%s'" % str(op.operation))

        params=(config.borg_parameters
                +self.repository.borg_parameters
                +self.borg_parameters)

        if op.operation==Operation.CREATE:
            archive="%s::%s%s" % (self.repository.location,
                                  self.archive_prefix,
                                  self.archive_template)

            self.__do_launch(op, archive, params.common,
                             params.create, self.paths)
        elif op.operation==Operation.PRUNE:
            self.__do_launch(op, self.repository.location, params.common,
                             [{'prefix': self.archive_prefix}] + params.prune)

        else:
            raise NotImplementedError("Invalid operation '%s'" % str(op.operation))

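    # Roughly, a CREATE launch corresponds to a borg invocation of the shape
    #   borg create <common flags> <create flags> REPO::PREFIX+TEMPLATE PATH...
    # and a PRUNE launch to
    #   borg prune <common flags> --prefix PREFIX <prune flags> REPO
    # (the exact argument construction is delegated to BorgInstance, and the
    # launch itself to Repository, which holds the passphrase).
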
    # This must be called with self._cond held.
    def __launch_and_wait(self):
        op=self.scheduled_operation
        if not op:
            self.logger.debug("Queued operation aborted")
        else:
            self.scheduled_operation=None

            self.__launch(op)

            self.__wait_finish()

    def __wait_finish(self):
        # Wait for main logger thread to terminate, or for us to be terminated
        while not self._terminate and self.thread_res.is_alive():
            self._cond.release()
            self.thread_res.join(JOIN_TIMEOUT)
            self._cond.acquire()

        # If terminate has been signalled, let the outer termination handler
        # take care of things. (Within this Backup class, it would be cleanest
        # to raise an exception instead, but in most other places it is better
        # to just check self._terminate, so we don't complicate things with
        # an extra exception.)
        if self._terminate:
            return

        self.logger.debug('Waiting for borg and log subprocesses to terminate')

        self._cond.release()
        self.thread_log.join()
        self._cond.acquire()

        if not self.borg_instance.wait():
            self.logger.error('Borg subprocess did not terminate')
            self.errors=self.errors.combine(Errors.ERRORS)

        if self.current_operation.operation=='create':
            self.lastrun_when=self.current_operation.time.monotonic()
            self.lastrun_finished=time.monotonic()
        self.thread_res=None
        self.thread_log=None
        self.borg_instance=None
        self.current_operation=None
        self.state=State.INACTIVE
        self.__update_status()

    def __main_thread(self):
        with self._cond:
            try:
                while not self._terminate:
                    assert(not self.current_operation)
                    self.__main_thread_wait_schedule()
                    if not self._terminate:
                        self.__main_thread_queue_and_launch()
            except Exception as err:
                self.logger.exception("Error with backup '%s'" % self.backup_name)
                self.errors=Errors.ERRORS

            self.state=State.INACTIVE
            self.scheduled_operation=None

            # Clean up to terminate: kill borg instance and communication threads
            if self.borg_instance:
                self.logger.debug("Terminating a borg instance")
                self.borg_instance.terminate()

            # Store threads to use outside lock
            thread_log=self.thread_log
            thread_res=self.thread_res
            self.thread_log=None
            self.thread_res=None

        self.logger.debug("Waiting for log and result threads to terminate")

        if thread_log:
            thread_log.join()

        if thread_res:
            thread_res.join()

    # Main thread/2. Schedule the next operation if none has been manually
    # requested
    def __main_thread_wait_schedule(self):
        op=None
        if not self.scheduled_operation:
            op=self.__next_operation_unlocked()
        if op:
            self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" %
                             (str(op.operation), op.detail or 'none',
                              op.time.seconds_to(),
                              op.time.__class__.__name__))

            self.scheduled_operation=op
            self.state=State.SCHEDULED
            self.__update_status()

            # Wait until the scheduled time
            self.scheduler.wait_until(op.time, self._cond, self.backup_name)
        else:
            # Nothing scheduled - just wait
            self.logger.info("Waiting for manual scheduling")

            self.state=State.INACTIVE
            self.__update_status()

            self._cond.wait()

    # Main thread/3. If there is a scheduled operation (it might have been
    # changed manually from the 'op' created in __main_thread_wait_schedule
    # above), queue it on the repository, and launch the operation once the
    # repository becomes available
    def __main_thread_queue_and_launch(self):
        if self.scheduled_operation:
            self.logger.debug("Queuing")
            self.state=State.QUEUED
            self.__update_status()
            res=self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self.backup_name)
            if not res and not self._terminate:
                self.logger.debug("Queueing aborted")
                self.scheduled_operation=None
                self.state=State.INACTIVE
                self.__update_status()

    def __next_operation_unlocked(self):
        # TODO: pruning as well
        if not self.lastrun_finished:
            initial_interval=self.retry_interval
            if initial_interval==0:
                initial_interval=self.backup_interval
            if initial_interval==0:
                return None
            else:
                tm=self.timeclass.after(initial_interval)
                return Operation(Operation.CREATE, tm, reason='initial')
        elif not self.errors.ok():
            if self.retry_interval==0:
                return None
            else:
                tm=dreamtime.MonotonicTime(self.lastrun_finished+self.retry_interval)
                return Operation(Operation.CREATE, tm, reason='retry')
        else:
            if self.backup_interval==0:
                return None
            else:
                tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval)
                return Operation(Operation.CREATE, tm)

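    # Scheduling summary for __next_operation_unlocked (illustrative numbers):
    # with backup_interval=86400 and retry_interval=900, the first run is
    # scheduled retry_interval (falling back to backup_interval) after startup;
    # after a successful run the next one follows 86400s from the time the run
    # *started*, and after a failed run a retry follows 900s from the time it
    # *finished*. An interval of 0 disables the corresponding automatic run.
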
    def __status_unlocked(self):
        callback=self.__status_update_callback

        if self.current_operation:
            status=Status(self, self.current_operation)
        elif self.scheduled_operation:
            status=Status(self, self.scheduled_operation)
        else:
            status=Status(self)

        return status, callback

    def __update_status(self):
        status, callback = self.__status_unlocked()
        if callback:
            #self._cond.release()
            try:
                callback(status)
            except Exception:
                self.logger.exception("Status update error")
            #finally:
            #    self._cond.acquire()

    #
    # Interface functions
    #

    def set_status_update_callback(self, callback):
        with self._cond:
            self.__status_update_callback=callback

    def status(self):
        with self._cond:
            res=self.__status_unlocked()
        return res[0]

    def create(self):
        op=Operation(Operation.CREATE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    def prune(self):
        op=Operation(Operation.PRUNE, dreamtime.MonotonicTime.now(), reason='manual')
        with self._cond:
            self.scheduled_operation=op
            self._cond.notify()

    # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
    def abort(self):
        with self._cond:
            if self.borg_instance:
                self.borg_instance.terminate()
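
# Usage sketch (illustrative; assumes a config section `cfg` for one backup
# entry and a scheduler instance as constructed elsewhere in borgend; the
# callback signature follows the calls made above, and it may be invoked with
# the internal lock held, so it should return quickly):
#
#   backup=Backup(0, cfg, scheduler)
#   backup.set_status_update_callback(
#       lambda status, errors=None: print(status.name, status.state, status.errors))
#   backup.start()     # thread entry point is __main_thread
#   backup.create()    # request a manual 'create' operation now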
