285 def __result_listener(self): |
287 def __result_listener(self): |
286 self.logger.debug('Result listener thread waiting for result') |
288 self.logger.debug('Result listener thread waiting for result') |
287 |
289 |
288 res=self.borg_instance.read_result() |
290 res=self.borg_instance.read_result() |
289 |
291 |
290 # Finish processing remaining errors |
|
291 self.thread_log.join() |
|
292 |
|
293 with self._cond: |
|
294 errors=self.errors |
|
295 |
|
296 self.logger.debug('Borg result: %s' % str(res)) |
292 self.logger.debug('Borg result: %s' % str(res)) |
297 |
293 |
298 if res is None: |
294 with self._cond: |
299 self.logger.error('No result from borg despite no error in log') |
295 if res is None: |
300 if errors.ok(): |
296 self.logger.error('No result from borg despite no error in log') |
301 errors=Errors.ERRORS |
297 if errors.ok(): |
302 |
298 self.errors=self.errors.combine(Errors.ERRORS) |
303 self.logger.debug('Waiting for borg subprocess to terminate in result thread') |
299 |
304 |
|
305 if not self.borg_instance.wait(): |
|
306 self.logger.error('Borg subprocess did not terminate') |
|
307 if errors.ok(): |
|
308 errors=Errors.ERRORS |
|
309 |
|
310 self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors)) |
|
311 |
|
312 with self._cond: |
|
313 if self.current_operation.operation=='create': |
|
314 self.lastrun_when=self.current_operation.when_monotonic |
|
315 self.thread_res=None |
|
316 self.thread_log=None |
|
317 self.borg_instance=None |
|
318 self.current_operation=None |
|
319 self.state=State.INACTIVE |
|
320 self.errors=errors |
|
321 self.__update_status() |
|
322 self._cond.notify() |
|
323 |
300 |
324 def __do_launch(self, op, archive_or_repository, *args): |
301 def __do_launch(self, op, archive_or_repository, *args): |
325 passphrase=self.extract_passphrase() |
302 passphrase=self.extract_passphrase() |
326 |
303 |
327 inst=BorgInstance(op.operation, archive_or_repository, *args) |
304 inst=BorgInstance(op.operation, archive_or_repository, *args) |
336 t_res.daemon=True |
313 t_res.daemon=True |
337 |
314 |
338 self.thread_log=t_log |
315 self.thread_log=t_log |
339 self.thread_res=t_res |
316 self.thread_res=t_res |
340 self.borg_instance=inst |
317 self.borg_instance=inst |
|
318 self.current_operation=op |
|
319 # Update scheduled time to real starting time to schedule |
|
320 # next run relative to this |
|
321 self.current_operation.when_monotonic=time.monotonic() |
|
322 self.state=State.ACTIVE |
|
323 # Reset error status when starting a new operation |
|
324 self.errors=Errors.OK |
|
325 self.__update_status() |
341 |
326 |
342 t_log.start() |
327 t_log.start() |
343 t_res.start() |
328 t_res.start() |
344 |
329 |
345 def __launch(self, op): |
330 def __launch(self, op): |
360 self.prune_parameters)) |
345 self.prune_parameters)) |
361 else: |
346 else: |
362 raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) |
347 raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) |
363 |
348 |
364 # This must be called with self._cond held. |
349 # This must be called with self._cond held. |
365 def __launch_check(self): |
350 def __launch_and_wait(self): |
366 op=self.scheduled_operation |
351 op=self.scheduled_operation |
367 if not op: |
352 if not op: |
368 self.logger.debug("Queued operation aborted") |
353 self.logger.debug("Queued operation aborted") |
369 else: |
354 else: |
370 self.scheduled_operation=None |
355 self.scheduled_operation=None |
371 |
356 |
372 self.__launch(op) |
357 self.__launch(op) |
373 |
358 |
374 self.current_operation=op |
359 self.__wait_finish() |
375 # Update scheduled time to real starting time to schedule |
360 |
376 # next run relative to this |
361 def __wait_finish(self): |
377 self.current_operation.when_monotonic=time.monotonic() |
362 # Wait for main logger thread to terminate, or for us to be terminated |
378 self.state=State.ACTIVE |
363 while not self.terminate and self.thread_res.is_alive(): |
379 # Reset error status when starting a new operation |
364 self._cond.release() |
380 self.errors=Errors.OK |
365 self.thread_res.join(JOIN_TIMEOUT) |
381 self.__update_status() |
366 self._cond.acquire() |
382 |
367 |
|
368 # If terminate has been signalled, let outer termination handler |
|
369 # take care of things (Within this Backup class, it would be cleanest |
|
370 # to raise an exception instead, but in most other places it's better |
|
371 # to just check self._terminate, so we don't complicate things with |
|
372 # an extra exception.) |
|
373 if self._terminate: |
|
374 return |
|
375 |
|
376 self.logger.debug('Waiting for borg and log subprocesses to terminate') |
|
377 |
|
378 self._cond.release() |
|
379 self.thread_log.join() |
|
380 self._cond.acquire() |
|
381 |
|
382 if not self.borg_instance.wait(): |
|
383 self.logger.error('Borg subprocess did not terminate') |
|
384 self.errors=self.errors.combine(Errors.ERRORS) |
|
385 |
|
386 if self.current_operation.operation=='create': |
|
387 self.lastrun_when=self.current_operation.when_monotonic |
|
388 self.thread_res=None |
|
389 self.thread_log=None |
|
390 self.borg_instance=None |
|
391 self.current_operation=None |
|
392 self.state=State.INACTIVE |
|
393 self.__update_status() |
383 |
394 |
384 def __main_thread(self): |
395 def __main_thread(self): |
385 with self._cond: |
396 with self._cond: |
386 try: |
397 try: |
387 while not self._terminate: |
398 while not self._terminate: |
388 self.__main_thread_wait_finish() |
399 assert(not self.current_operation) |
|
400 self.__main_thread_wait_schedule() |
389 if not self._terminate: |
401 if not self._terminate: |
390 self.__main_thread_wait_schedule() |
402 self.__main_thread_queue_and_launch() |
391 if not self._terminate: |
|
392 self.__main_thread_queue_and_launch() |
|
393 except Exception as err: |
403 except Exception as err: |
394 self.logger.exception("Error with backup '%s'" % self._name) |
404 self.logger.exception("Error with backup '%s'" % self._name) |
395 self.errors=Errors.ERRORS |
405 self.errors=Errors.ERRORS |
396 |
406 |
397 self.state=State.INACTIVE |
407 self.state=State.INACTIVE |
457 def __main_thread_queue_and_launch(self): |
460 def __main_thread_queue_and_launch(self): |
458 if self.scheduled_operation: |
461 if self.scheduled_operation: |
459 self.logger.debug("Queuing") |
462 self.logger.debug("Queuing") |
460 self.state=State.QUEUED |
463 self.state=State.QUEUED |
461 self.__update_status() |
464 self.__update_status() |
462 self.repository.queue_action(self._cond, |
465 res=self.repository.queue_action(self._cond, |
463 action=self.__launch_check, |
466 action=self.__launch_and_wait, |
464 name=self._name) |
467 name=self._name) |
|
468 if not res and not self._terminate: |
|
469 self.logger.debug("Queueing aborted") |
|
470 self.scheduled_operation=None |
|
471 self.state=State.INACTIVE |
|
472 self.__update_status() |
465 |
473 |
466 def __next_operation_unlocked(self): |
474 def __next_operation_unlocked(self): |
467 # TODO: pruning as well |
475 # TODO: pruning as well |
468 now=time.monotonic() |
476 now=time.monotonic() |
469 if not self.lastrun_when: |
477 if not self.lastrun_when: |