| 103 |
106 |
| 104 def extract_passphrase(self): |
107 def extract_passphrase(self): |
| 105 acc=self.__keychain_account |
108 acc=self.__keychain_account |
| 106 if not self.__passphrase: |
109 if not self.__passphrase: |
| 107 if acc and acc!='': |
110 if acc and acc!='': |
| 108 logger.debug('Requesting passphrase') |
111 self.logger.debug('Requesting passphrase') |
| 109 try: |
112 try: |
| 110 pw=keyring.get_password("borg-backup", acc) |
113 pw=keyring.get_password("borg-backup", acc) |
| 111 except Exception as err: |
114 except Exception as err: |
| 112 logger.error('Failed to retrieve passphrase') |
115 self.logger.error('Failed to retrieve passphrase') |
| 113 raise err |
116 raise err |
| 114 else: |
117 else: |
| 115 logger.debug('Received passphrase') |
118 self.logger.debug('Received passphrase') |
| 116 self.__passphrase=pw |
119 self.__passphrase=pw |
| 117 else: |
120 else: |
| 118 self.__passphrase=None |
121 self.__passphrase=None |
| 119 return self.__passphrase |
122 return self.__passphrase |
| 120 |
123 |
| 121 def __init__(self, identifier, cfg, scheduler): |
124 def __init__(self, identifier, cfg, scheduler): |
| 122 self.identifier=identifier |
125 self.identifier=identifier |
| 123 self.config=config |
126 self.config=config |
| 124 self.lastrun_when=None |
127 self.__status_update_callback=None |
| |
128 self.scheduler=scheduler |
| |
129 self.logger=None # setup up __decode_config once backup name is known |
| |
130 |
| 125 self.borg_instance=None |
131 self.borg_instance=None |
| 126 self.current_operation=None |
|
| 127 self.thread_log=None |
132 self.thread_log=None |
| 128 self.thread_res=None |
133 self.thread_res=None |
| |
134 |
| |
135 self.current_operation=None |
| 129 self.scheduled_operation=None |
136 self.scheduled_operation=None |
| 130 self.__status_update_callback=None |
137 self.lastrun_when=None |
| 131 self.state=INACTIVE |
138 self.state=INACTIVE |
| 132 self.scheduler=scheduler |
|
| 133 |
139 |
| 134 self.__decode_config(cfg) |
140 self.__decode_config(cfg) |
| 135 |
141 |
| 136 super().__init__(target = self.__main_thread, name = self._name) |
142 super().__init__(target = self.__main_thread, name = self._name) |
| 137 self.daemon=True |
143 self.daemon=True |
| 239 |
246 |
| 240 # If there were no errors, reset back to INACTIVE state |
247 # If there were no errors, reset back to INACTIVE state |
| 241 if state==ACTIVE: |
248 if state==ACTIVE: |
| 242 state=INACTIVE |
249 state=INACTIVE |
| 243 |
250 |
| 244 logger.debug('Borg result: %s' % str(res)) |
251 self.logger.debug('Borg result: %s' % str(res)) |
| 245 |
252 |
| 246 if res is None and state==INACTIVE: |
253 if res is None and state==INACTIVE: |
| 247 logger.error('No result from borg despite no error in log') |
254 self.logger.error('No result from borg despite no error in log') |
| 248 state=ERRORS |
255 state=ERRORS |
| 249 |
256 |
| 250 logger.debug('Waiting for borg subprocess to terminate in result thread') |
257 self.logger.debug('Waiting for borg subprocess to terminate in result thread') |
| 251 |
258 |
| 252 if not self.borg_instance.wait(): |
259 if not self.borg_instance.wait(): |
| 253 logger.critical('Borg subprocess did not terminate') |
260 self.logger.error('Borg subprocess did not terminate') |
| 254 state=combine_state(state, ERRORS) |
261 state=combine_state(state, ERRORS) |
| 255 |
262 |
| 256 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
263 self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
| 257 |
264 |
| 258 with self._cond: |
265 with self._cond: |
| 259 if self.current_operation['operation']=='create': |
266 if self.current_operation['operation']=='create': |
| 260 self.lastrun_when=self.current_operation['when_monotonic'] |
267 self.lastrun_when=self.current_operation['when_monotonic'] |
| 261 self.thread_res=None |
268 self.thread_res=None |
| 262 self.thread_log=None |
269 self.thread_log=None |
| 263 self.borg_instance=None |
270 self.borg_instance=None |
| 264 self.current_operation=None |
271 self.current_operation=None |
| 265 self.state=state |
272 self.state=state |
| |
273 self.__update_status() |
| 266 self._cond.notify() |
274 self._cond.notify() |
| 267 |
275 |
| 268 def __do_launch(self, op, archive_or_repository, *args): |
276 def __do_launch(self, op, archive_or_repository, *args): |
| 269 passphrase=self.extract_passphrase() |
277 passphrase=self.extract_passphrase() |
| 270 |
278 |
| 271 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
279 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
| 272 inst.launch(passphrase=passphrase) |
280 inst.launch(passphrase=passphrase) |
| 273 |
281 |
| 274 logger.debug('Creating listener threads') |
282 self.logger.debug('Creating listener threads') |
| 275 |
283 |
| 276 t_log=Thread(target=self.__log_listener) |
284 t_log=Thread(target=self.__log_listener) |
| 277 t_log.daemon=True |
285 t_log.daemon=True |
| 278 |
286 |
| 279 t_res=Thread(target=self.__result_listener) |
287 t_res=Thread(target=self.__result_listener) |
| 280 t_res.daemon=True |
288 t_res.daemon=True |
| 281 |
289 |
| 282 self.thread_log=t_log |
290 self.thread_log=t_log |
| 283 self.thread_res=t_res |
291 self.thread_res=t_res |
| 284 self.borg_instance=inst |
292 self.borg_instance=inst |
| 285 self.current_operation=op |
|
| 286 self.current_operation['when_monotonic']=time.monotonic() |
|
| 287 self.state=ACTIVE |
|
| 288 |
293 |
| 289 t_log.start() |
294 t_log.start() |
| 290 t_res.start() |
295 t_res.start() |
| 291 |
296 |
| 292 def __launch(self, op): |
297 def __launch(self, op): |
| 293 if self.__is_running_unlocked(): |
298 self.logger.debug("Launching '%s'" % op['operation']) |
| 294 logging.info('Cannot start %s: already running %s' |
299 |
| 295 % (operation, self.current_operation)) |
300 if op['operation']=='create': |
| 296 return False |
301 archive="%s::%s%s" % (self.repository.repository_name, |
| |
302 self.archive_prefix, |
| |
303 self.archive_template) |
| |
304 |
| |
305 self.__do_launch(op, archive, |
| |
306 self.common_parameters+self.create_parameters, |
| |
307 self.paths) |
| |
308 elif op['operation']=='prune': |
| |
309 self.__do_launch(op, self.repository.repository_name, |
| |
310 ([{'prefix': self.archive_prefix}] + |
| |
311 self.common_parameters + |
| |
312 self.prune_parameters)) |
| 297 else: |
313 else: |
| |
314 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
| |
315 |
| |
316 def __launch_check(self): |
| |
317 op=self.scheduled_operation |
| |
318 if not op: |
| |
319 self.logger.debug("Queued operation aborted") |
| |
320 else: |
| |
321 self.scheduled_operation=None |
| |
322 self.current_operation=op |
| |
323 self.current_operation['when_monotonic']=time.monotonic() |
| |
324 |
| |
325 self.__launch(op) |
| |
326 |
| |
327 self.state=ACTIVE |
| |
328 self.__update_status() |
| |
329 |
| |
330 |
| |
331 def __main_thread(self): |
| |
332 with self._cond: |
| 298 try: |
333 try: |
| 299 logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) |
334 while not self._terminate: |
| 300 |
335 self.__main_thread_wait_finish() |
| 301 if op['operation']=='create': |
336 if not self._terminate: |
| 302 archive="%s::%s%s" % (self.repository.repository_name, |
337 self.__main_thread_wait_schedule() |
| 303 self.archive_prefix, |
338 if not self._terminate: |
| 304 self.archive_template) |
339 self.__main_thread_queue_and_launch() |
| 305 |
|
| 306 self.__do_launch(op, archive, |
|
| 307 self.common_parameters+self.create_parameters, |
|
| 308 self.paths) |
|
| 309 elif op['operation']=='prune': |
|
| 310 self.__do_launch(op, self.repository.repository_name, |
|
| 311 ([{'prefix': self.archive_prefix}] + |
|
| 312 self.common_parameters + |
|
| 313 self.prune_parameters)) |
|
| 314 else: |
|
| 315 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
|
| 316 except Exception as err: |
340 except Exception as err: |
| 317 logger.debug('Rescheduling after failure') |
341 self.logger.exception("Error with backup '%s'" % self._name) |
| 318 self.lastrun_when=time.monotonic() |
342 self.lastrun_when=time.monotonic() |
| 319 self.state=ERRORS |
343 self.state=ERRORS |
| 320 raise err |
344 self.scheduled_operation=None |
| 321 |
345 |
| 322 return True |
346 # Clean up to terminate: kill borg instance and communication threads |
| 323 |
347 if self.borg_instance: |
| 324 def create(self): |
348 self.logger.debug("Terminating a borg instance") |
| 325 op={'operation': 'create', 'detail': 'manual'} |
349 self.borg_instance.terminate() |
| 326 with self._cond: |
350 |
| |
351 # Store threads to use outside lock |
| |
352 thread_log=self.thread_log |
| |
353 thread_res=self.thread_res |
| |
354 |
| |
355 self.logger.debug("Waiting for log and result threads to terminate") |
| |
356 |
| |
357 if thread_log: |
| |
358 thread_log.join() |
| |
359 |
| |
360 if thread_res: |
| |
361 thread_res.join() |
| |
362 |
| |
363 |
| |
364 # Main thread/1. Wait while a current operation is running |
| |
365 def __main_thread_wait_finish(self): |
| |
366 while self.current_operation and not self._terminate: |
| |
367 self.logger.debug("Waiting for current operation to finish") |
| |
368 self._cond.wait() |
| |
369 |
| |
370 # Main thread/2. Schedule next operation if there is no manually |
| |
371 # requested one |
| |
372 def __main_thread_wait_schedule(self): |
| |
373 op=None |
| |
374 if not self.scheduled_operation: |
| |
375 op=self.__next_operation_unlocked() |
| |
376 if op: |
| |
377 now=time.monotonic() |
| |
378 delay=max(0, op['when_monotonic']-now) |
| |
379 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % |
| |
380 (op['operation'], op['detail'], delay)) |
| |
381 |
| 327 self.scheduled_operation=op |
382 self.scheduled_operation=op |
| 328 self._cond.notify() |
383 self.state=combine_state(self.state, SCHEDULED) |
| 329 |
384 self.__update_status() |
| 330 def prune(self): |
385 |
| 331 op={'operation': 'prune', 'detail': 'manual'} |
386 # Wait under scheduled wait |
| 332 with self._cond: |
387 self.scheduler.wait_until(now+delay, self._cond, self._name) |
| 333 self.scheduled_operation=op |
388 else: |
| 334 self._cond.notify() |
389 # Nothing scheduled - just wait |
| 335 |
390 self.logger.debug("Waiting for manual scheduling") |
| 336 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
391 self._cond.wait() |
| 337 def abort(self): |
392 |
| 338 with self._cond: |
393 # Main thread/3. If there is a scheduled operation (it might have been |
| 339 if self.borg_instance: |
394 # changed manually from 'op' created in __main_thread_wait_schedule above), |
| 340 self.borg_instance.terminate() |
395 # queue it on the repository, and launch the operation once repository |
| |
396 # available |
| |
397 def __main_thread_queue_and_launch(self): |
| |
398 if self.scheduled_operation: |
| |
399 self.logger.debug("Queuing") |
| |
400 self.state=combine_state(self.state, QUEUED) |
| |
401 self.__update_status() |
| |
402 self.repository.queue_action(self._cond, |
| |
403 action=self.__launch_check, |
| |
404 name=self._name) |
| 341 |
405 |
| 342 def __next_operation_unlocked(self): |
406 def __next_operation_unlocked(self): |
| 343 # TODO: pruning as well |
407 # TODO: pruning as well |
| 344 now=time.monotonic() |
408 now=time.monotonic() |
| 345 if not self.lastrun_when: |
409 if not self.lastrun_when: |
| 365 else: |
429 else: |
| 366 return {'operation': 'create', |
430 return {'operation': 'create', |
| 367 'detail': 'normal', |
431 'detail': 'normal', |
| 368 'when_monotonic': self.lastrun_when+self.backup_interval} |
432 'when_monotonic': self.lastrun_when+self.backup_interval} |
| 369 |
433 |
| 370 def __main_thread(self): |
|
| 371 with self._cond: |
|
| 372 while not self._terminate: |
|
| 373 op=None |
|
| 374 if not self.current_operation: |
|
| 375 op=self.__next_operation_unlocked() |
|
| 376 if not op: |
|
| 377 self.__update_status() |
|
| 378 self._cond.wait() |
|
| 379 else: |
|
| 380 now=time.monotonic() |
|
| 381 delay=max(0, op['when_monotonic']-now) |
|
| 382 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
|
| 383 (op['operation'], op['detail'], self._name, delay)) |
|
| 384 |
|
| 385 self.scheduled_operation=op |
|
| 386 self.state=combine_state(self.state, SCHEDULED) |
|
| 387 |
|
| 388 self.__update_status() |
|
| 389 self.scheduler.wait_until(now+delay, self._cond, self._name) |
|
| 390 |
|
| 391 if self.scheduled_operation: |
|
| 392 op=self.scheduled_operation |
|
| 393 self.scheduled_operation=None |
|
| 394 self.repository.queue_action(self._cond, name=self._name, |
|
| 395 action=lambda: self.__launch(op)) |
|
| 396 # Kill a running borg to cause log and result threads to terminate |
|
| 397 if self.borg_instance: |
|
| 398 logger.debug("Terminating a borg instance") |
|
| 399 self.borg_instance.terminate() |
|
| 400 |
|
| 401 # Store threads to use outside lock |
|
| 402 thread_log=self.thread_log |
|
| 403 thread_err=self.thread_err |
|
| 404 |
|
| 405 logger.debug("Waiting for log and result threads to terminate") |
|
| 406 |
|
| 407 if thread_log: |
|
| 408 thread_log.join() |
|
| 409 |
|
| 410 if thread_res: |
|
| 411 thread_res.join() |
|
| 412 |
|
| 413 |
|
| 414 def __status_unlocked(self): |
434 def __status_unlocked(self): |
| 415 callback=self.__status_update_callback |
435 callback=self.__status_update_callback |
| 416 |
436 |
| 417 if self.current_operation: |
437 if self.current_operation: |
| 418 status=self.current_operation |
438 status=self.current_operation |
| 419 status['type']='current' |
439 status['type']='current' |
| 420 # Errors should be set by listeners |
440 # Errors should be set by listeners |
| 421 else: |
441 else: |
| 422 if self.scheduled_operation: |
442 if self.scheduled_operation: |
| 423 status=self.scheduled_operation |
443 status=self.scheduled_operation |
| 424 status['type']='scheduled' |
444 if self.state==QUEUED: |
| |
445 status['type']='queued' |
| |
446 else: |
| |
447 status['type']='scheduled' |
| 425 else: |
448 else: |
| 426 status={'type': 'nothing'} |
449 status={'type': 'nothing'} |
| 427 |
450 |
| 428 status['name']=self._name |
451 status['name']=self._name |
| 429 status['state']=self.state |
452 status['state']=self.state |