backup.py

changeset 55
407af23d16bb
parent 54
cfcaa5f6ba33
child 58
170d69da51bb
equal deleted inserted replaced
54:cfcaa5f6ba33 55:407af23d16bb
15 logger=borgend.logger.getChild(__name__) 15 logger=borgend.logger.getChild(__name__)
16 16
17 # State 17 # State
18 INACTIVE=0 18 INACTIVE=0
19 SCHEDULED=1 19 SCHEDULED=1
20 ACTIVE=2 20 QUEUED=2
21 BUSY=3 21 ACTIVE=3
22 OFFLINE=4 22 BUSY=4
23 ERRORS=5 23 OFFLINE=5
24 ERRORS=6
24 25
25 def combine_state(state1, state2): 26 def combine_state(state1, state2):
26 return max(state1, state2) 27 return max(state1, state2)
27 28
28 loglevel_translation={ 29 loglevel_translation={
52 def __decode_config(self, cfg): 53 def __decode_config(self, cfg):
53 loc0='backup target %d' % self.identifier 54 loc0='backup target %d' % self.identifier
54 55
55 self._name=config.check_string(cfg, 'name', 'Name', loc0) 56 self._name=config.check_string(cfg, 'name', 'Name', loc0)
56 57
58 self.logger=logger.getChild(self._name)
59
57 self.loc='backup target "%s"' % self._name 60 self.loc='backup target "%s"' % self._name
58 61
59 reponame=config.check_string(cfg, 'repository', 62 reponame=config.check_string(cfg, 'repository',
60 'Target repository', self.loc) 63 'Target repository', self.loc)
61 64
103 106
104 def extract_passphrase(self): 107 def extract_passphrase(self):
105 acc=self.__keychain_account 108 acc=self.__keychain_account
106 if not self.__passphrase: 109 if not self.__passphrase:
107 if acc and acc!='': 110 if acc and acc!='':
108 logger.debug('Requesting passphrase') 111 self.logger.debug('Requesting passphrase')
109 try: 112 try:
110 pw=keyring.get_password("borg-backup", acc) 113 pw=keyring.get_password("borg-backup", acc)
111 except Exception as err: 114 except Exception as err:
112 logger.error('Failed to retrieve passphrase') 115 self.logger.error('Failed to retrieve passphrase')
113 raise err 116 raise err
114 else: 117 else:
115 logger.debug('Received passphrase') 118 self.logger.debug('Received passphrase')
116 self.__passphrase=pw 119 self.__passphrase=pw
117 else: 120 else:
118 self.__passphrase=None 121 self.__passphrase=None
119 return self.__passphrase 122 return self.__passphrase
120 123
121 def __init__(self, identifier, cfg, scheduler): 124 def __init__(self, identifier, cfg, scheduler):
122 self.identifier=identifier 125 self.identifier=identifier
123 self.config=config 126 self.config=config
124 self.lastrun_when=None 127 self.__status_update_callback=None
128 self.scheduler=scheduler
129 self.logger=None # setup up __decode_config once backup name is known
130
125 self.borg_instance=None 131 self.borg_instance=None
126 self.current_operation=None
127 self.thread_log=None 132 self.thread_log=None
128 self.thread_res=None 133 self.thread_res=None
134
135 self.current_operation=None
129 self.scheduled_operation=None 136 self.scheduled_operation=None
130 self.__status_update_callback=None 137 self.lastrun_when=None
131 self.state=INACTIVE 138 self.state=INACTIVE
132 self.scheduler=scheduler
133 139
134 self.__decode_config(cfg) 140 self.__decode_config(cfg)
135 141
136 super().__init__(target = self.__main_thread, name = self._name) 142 super().__init__(target = self.__main_thread, name = self._name)
137 self.daemon=True 143 self.daemon=True
152 def __block_when_running(self): 158 def __block_when_running(self):
153 running=self.is_running() 159 running=self.is_running()
154 assert(not running) 160 assert(not running)
155 161
156 def __log_listener(self): 162 def __log_listener(self):
157 logger.debug('Log listener thread waiting for entries') 163 self.logger.debug('Log listener thread waiting for entries')
158 success=True 164 success=True
159 for status in iter(self.borg_instance.read_log, None): 165 for status in iter(self.borg_instance.read_log, None):
160 logger.debug(str(status)) 166 self.logger.debug(str(status))
161 t=status['type'] 167 t=status['type']
162 168
163 errors_this_message=None 169 errors_this_message=None
164 callback=None 170 callback=None
165 171
195 if 'message' not in status: 201 if 'message' not in status:
196 status['message']='UNKNOWN' 202 status['message']='UNKNOWN'
197 if 'name' not in status: 203 if 'name' not in status:
198 status['name']='borg' 204 status['name']='borg'
199 lvl=translate_loglevel(status['levelname']) 205 lvl=translate_loglevel(status['levelname'])
200 logger.log(lvl, status['name'] + ': ' + status['message']) 206 self.logger.log(lvl, status['name'] + ': ' + status['message'])
201 if lvl>=logging.WARNING: 207 if lvl>=logging.WARNING:
202 errors_this_message=status 208 errors_this_message=status
203 state=ERRORS 209 state=ERRORS
204 if ('msgid' in status and 210 if ('msgid' in status and
205 (status['msgid']=='LockTimeout' or # observed in reality 211 (status['msgid']=='LockTimeout' or # observed in reality
208 state=BUSY 214 state=BUSY
209 with self._cond: 215 with self._cond:
210 self.state=combine_state(self.state, state) 216 self.state=combine_state(self.state, state)
211 status, callback=self.__status_unlocked() 217 status, callback=self.__status_unlocked()
212 else: 218 else:
213 logger.debug('Unrecognised log entry %s' % str(status)) 219 self.logger.debug('Unrecognised log entry %s' % str(status))
214 220
215 if callback: 221 if callback:
216 callback(self, status, errors=errors_this_message) 222 callback(status, errors=errors_this_message)
217 223
218 logger.debug('Waiting for borg subprocess to terminate in log thread') 224 self.logger.debug('Waiting for borg subprocess to terminate in log thread')
219 225
220 self.borg_instance.wait() 226 self.borg_instance.wait()
221 227
222 logger.debug('Borg subprocess terminated; terminating log listener thread') 228 self.logger.debug('Borg subprocess terminated; terminating log listener thread')
223 229
224 def __result_listener(self): 230 def __result_listener(self):
225 with self._cond: 231 # self.state=ACTIVE
226 status, callback=self.__status_unlocked() 232 # with self._cond:
227 if callback: 233 # status, callback=self.__status_unlocked()
228 callback(self, status) 234 # if callback:
229 235 # callback(status)
230 logger.debug('Result listener thread waiting for result') 236
237 self.logger.debug('Result listener thread waiting for result')
231 238
232 res=self.borg_instance.read_result() 239 res=self.borg_instance.read_result()
233 240
234 # Finish processing remaining errors 241 # Finish processing remaining errors
235 self.thread_log.join() 242 self.thread_log.join()
239 246
240 # If there were no errors, reset back to INACTIVE state 247 # If there were no errors, reset back to INACTIVE state
241 if state==ACTIVE: 248 if state==ACTIVE:
242 state=INACTIVE 249 state=INACTIVE
243 250
244 logger.debug('Borg result: %s' % str(res)) 251 self.logger.debug('Borg result: %s' % str(res))
245 252
246 if res is None and state==INACTIVE: 253 if res is None and state==INACTIVE:
247 logger.error('No result from borg despite no error in log') 254 self.logger.error('No result from borg despite no error in log')
248 state=ERRORS 255 state=ERRORS
249 256
250 logger.debug('Waiting for borg subprocess to terminate in result thread') 257 self.logger.debug('Waiting for borg subprocess to terminate in result thread')
251 258
252 if not self.borg_instance.wait(): 259 if not self.borg_instance.wait():
253 logger.critical('Borg subprocess did not terminate') 260 self.logger.error('Borg subprocess did not terminate')
254 state=combine_state(state, ERRORS) 261 state=combine_state(state, ERRORS)
255 262
256 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) 263 self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
257 264
258 with self._cond: 265 with self._cond:
259 if self.current_operation['operation']=='create': 266 if self.current_operation['operation']=='create':
260 self.lastrun_when=self.current_operation['when_monotonic'] 267 self.lastrun_when=self.current_operation['when_monotonic']
261 self.thread_res=None 268 self.thread_res=None
262 self.thread_log=None 269 self.thread_log=None
263 self.borg_instance=None 270 self.borg_instance=None
264 self.current_operation=None 271 self.current_operation=None
265 self.state=state 272 self.state=state
273 self.__update_status()
266 self._cond.notify() 274 self._cond.notify()
267 275
268 def __do_launch(self, op, archive_or_repository, *args): 276 def __do_launch(self, op, archive_or_repository, *args):
269 passphrase=self.extract_passphrase() 277 passphrase=self.extract_passphrase()
270 278
271 inst=BorgInstance(op['operation'], archive_or_repository, *args) 279 inst=BorgInstance(op['operation'], archive_or_repository, *args)
272 inst.launch(passphrase=passphrase) 280 inst.launch(passphrase=passphrase)
273 281
274 logger.debug('Creating listener threads') 282 self.logger.debug('Creating listener threads')
275 283
276 t_log=Thread(target=self.__log_listener) 284 t_log=Thread(target=self.__log_listener)
277 t_log.daemon=True 285 t_log.daemon=True
278 286
279 t_res=Thread(target=self.__result_listener) 287 t_res=Thread(target=self.__result_listener)
280 t_res.daemon=True 288 t_res.daemon=True
281 289
282 self.thread_log=t_log 290 self.thread_log=t_log
283 self.thread_res=t_res 291 self.thread_res=t_res
284 self.borg_instance=inst 292 self.borg_instance=inst
285 self.current_operation=op
286 self.current_operation['when_monotonic']=time.monotonic()
287 self.state=ACTIVE
288 293
289 t_log.start() 294 t_log.start()
290 t_res.start() 295 t_res.start()
291 296
292 def __launch(self, op): 297 def __launch(self, op):
293 if self.__is_running_unlocked(): 298 self.logger.debug("Launching '%s'" % op['operation'])
294 logging.info('Cannot start %s: already running %s' 299
295 % (operation, self.current_operation)) 300 if op['operation']=='create':
296 return False 301 archive="%s::%s%s" % (self.repository.repository_name,
302 self.archive_prefix,
303 self.archive_template)
304
305 self.__do_launch(op, archive,
306 self.common_parameters+self.create_parameters,
307 self.paths)
308 elif op['operation']=='prune':
309 self.__do_launch(op, self.repository.repository_name,
310 ([{'prefix': self.archive_prefix}] +
311 self.common_parameters +
312 self.prune_parameters))
297 else: 313 else:
314 raise NotImplementedError("Invalid operation '%s'" % op['operation'])
315
316 def __launch_check(self):
317 op=self.scheduled_operation
318 if not op:
319 self.logger.debug("Queued operation aborted")
320 else:
321 self.scheduled_operation=None
322 self.current_operation=op
323 self.current_operation['when_monotonic']=time.monotonic()
324
325 self.__launch(op)
326
327 self.state=ACTIVE
328 self.__update_status()
329
330
331 def __main_thread(self):
332 with self._cond:
298 try: 333 try:
299 logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) 334 while not self._terminate:
300 335 self.__main_thread_wait_finish()
301 if op['operation']=='create': 336 if not self._terminate:
302 archive="%s::%s%s" % (self.repository.repository_name, 337 self.__main_thread_wait_schedule()
303 self.archive_prefix, 338 if not self._terminate:
304 self.archive_template) 339 self.__main_thread_queue_and_launch()
305
306 self.__do_launch(op, archive,
307 self.common_parameters+self.create_parameters,
308 self.paths)
309 elif op['operation']=='prune':
310 self.__do_launch(op, self.repository.repository_name,
311 ([{'prefix': self.archive_prefix}] +
312 self.common_parameters +
313 self.prune_parameters))
314 else:
315 raise NotImplementedError("Invalid operation '%s'" % op['operation'])
316 except Exception as err: 340 except Exception as err:
317 logger.debug('Rescheduling after failure') 341 self.logger.exception("Error with backup '%s'" % self._name)
318 self.lastrun_when=time.monotonic() 342 self.lastrun_when=time.monotonic()
319 self.state=ERRORS 343 self.state=ERRORS
320 raise err 344 self.scheduled_operation=None
321 345
322 return True 346 # Clean up to terminate: kill borg instance and communication threads
323 347 if self.borg_instance:
324 def create(self): 348 self.logger.debug("Terminating a borg instance")
325 op={'operation': 'create', 'detail': 'manual'} 349 self.borg_instance.terminate()
326 with self._cond: 350
351 # Store threads to use outside lock
352 thread_log=self.thread_log
353 thread_res=self.thread_res
354
355 self.logger.debug("Waiting for log and result threads to terminate")
356
357 if thread_log:
358 thread_log.join()
359
360 if thread_res:
361 thread_res.join()
362
363
364 # Main thread/1. Wait while a current operation is running
365 def __main_thread_wait_finish(self):
366 while self.current_operation and not self._terminate:
367 self.logger.debug("Waiting for current operation to finish")
368 self._cond.wait()
369
370 # Main thread/2. Schedule next operation if there is no manually
371 # requested one
372 def __main_thread_wait_schedule(self):
373 op=None
374 if not self.scheduled_operation:
375 op=self.__next_operation_unlocked()
376 if op:
377 now=time.monotonic()
378 delay=max(0, op['when_monotonic']-now)
379 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
380 (op['operation'], op['detail'], delay))
381
327 self.scheduled_operation=op 382 self.scheduled_operation=op
328 self._cond.notify() 383 self.state=combine_state(self.state, SCHEDULED)
329 384 self.__update_status()
330 def prune(self): 385
331 op={'operation': 'prune', 'detail': 'manual'} 386 # Wait under scheduled wait
332 with self._cond: 387 self.scheduler.wait_until(now+delay, self._cond, self._name)
333 self.scheduled_operation=op 388 else:
334 self._cond.notify() 389 # Nothing scheduled - just wait
335 390 self.logger.debug("Waiting for manual scheduling")
336 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 391 self._cond.wait()
337 def abort(self): 392
338 with self._cond: 393 # Main thread/3. If there is a scheduled operation (it might have been
339 if self.borg_instance: 394 # changed manually from 'op' created in __main_thread_wait_schedule above),
340 self.borg_instance.terminate() 395 # queue it on the repository, and launch the operation once repository
396 # available
397 def __main_thread_queue_and_launch(self):
398 if self.scheduled_operation:
399 self.logger.debug("Queuing")
400 self.state=combine_state(self.state, QUEUED)
401 self.__update_status()
402 self.repository.queue_action(self._cond,
403 action=self.__launch_check,
404 name=self._name)
341 405
342 def __next_operation_unlocked(self): 406 def __next_operation_unlocked(self):
343 # TODO: pruning as well 407 # TODO: pruning as well
344 now=time.monotonic() 408 now=time.monotonic()
345 if not self.lastrun_when: 409 if not self.lastrun_when:
365 else: 429 else:
366 return {'operation': 'create', 430 return {'operation': 'create',
367 'detail': 'normal', 431 'detail': 'normal',
368 'when_monotonic': self.lastrun_when+self.backup_interval} 432 'when_monotonic': self.lastrun_when+self.backup_interval}
369 433
370 def __main_thread(self):
371 with self._cond:
372 while not self._terminate:
373 op=None
374 if not self.current_operation:
375 op=self.__next_operation_unlocked()
376 if not op:
377 self.__update_status()
378 self._cond.wait()
379 else:
380 now=time.monotonic()
381 delay=max(0, op['when_monotonic']-now)
382 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
383 (op['operation'], op['detail'], self._name, delay))
384
385 self.scheduled_operation=op
386 self.state=combine_state(self.state, SCHEDULED)
387
388 self.__update_status()
389 self.scheduler.wait_until(now+delay, self._cond, self._name)
390
391 if self.scheduled_operation:
392 op=self.scheduled_operation
393 self.scheduled_operation=None
394 self.repository.queue_action(self._cond, name=self._name,
395 action=lambda: self.__launch(op))
396 # Kill a running borg to cause log and result threads to terminate
397 if self.borg_instance:
398 logger.debug("Terminating a borg instance")
399 self.borg_instance.terminate()
400
401 # Store threads to use outside lock
402 thread_log=self.thread_log
403 thread_err=self.thread_err
404
405 logger.debug("Waiting for log and result threads to terminate")
406
407 if thread_log:
408 thread_log.join()
409
410 if thread_res:
411 thread_res.join()
412
413
414 def __status_unlocked(self): 434 def __status_unlocked(self):
415 callback=self.__status_update_callback 435 callback=self.__status_update_callback
416 436
417 if self.current_operation: 437 if self.current_operation:
418 status=self.current_operation 438 status=self.current_operation
419 status['type']='current' 439 status['type']='current'
420 # Errors should be set by listeners 440 # Errors should be set by listeners
421 else: 441 else:
422 if self.scheduled_operation: 442 if self.scheduled_operation:
423 status=self.scheduled_operation 443 status=self.scheduled_operation
424 status['type']='scheduled' 444 if self.state==QUEUED:
445 status['type']='queued'
446 else:
447 status['type']='scheduled'
425 else: 448 else:
426 status={'type': 'nothing'} 449 status={'type': 'nothing'}
427 450
428 status['name']=self._name 451 status['name']=self._name
429 status['state']=self.state 452 status['state']=self.state
438 return status, callback 461 return status, callback
439 462
440 def __update_status(self): 463 def __update_status(self):
441 status, callback = self.__status_unlocked() 464 status, callback = self.__status_unlocked()
442 if callback: 465 if callback:
443 self._cond.release() 466 #self._cond.release()
444 try: 467 try:
445 callback(self, status) 468 callback(status)
446 finally: 469 except Exception:
447 self._cond.acquire() 470 self.logger.exception("Status update error")
471 #finally:
472 # self._cond.acquire()
473
474 #
475 # Interface functions
476 #
448 477
449 def set_status_update_callback(self, callback): 478 def set_status_update_callback(self, callback):
450 with self._cond: 479 with self._cond:
451 self.__status_update_callback=callback 480 self.__status_update_callback=callback
452 481
453 def status(self): 482 def status(self):
454 with self._cond: 483 with self._cond:
455 res=self.__status_unlocked() 484 res=self.__status_unlocked()
456 return res[0] 485 return res[0]
457 486
487 def create(self):
488 op={'operation': 'create', 'detail': 'manual'}
489 with self._cond:
490 self.scheduled_operation=op
491 self._cond.notify()
492
493 def prune(self):
494 op={'operation': 'prune', 'detail': 'manual'}
495 with self._cond:
496 self.scheduled_operation=op
497 self._cond.notify()
498
499 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
500 def abort(self):
501 with self._cond:
502 if self.borg_instance:
503 self.borg_instance.terminate()
504

mercurial