def __result_listener(self):
    """Wait for borg's result and record an error if none arrives.

    Reads the result from ``self.borg_instance`` and logs it.  When the
    result is ``None``, an error is logged and, while holding
    ``self._cond``, ``self.errors`` is escalated with ``Errors.ERRORS``
    if no error had been recorded so far.
    """
    self.logger.debug('Result listener thread waiting for result')

    res = self.borg_instance.read_result()

    self.logger.debug('Borg result: %s' % str(res))

    with self._cond:
        if res is None:
            self.logger.error('No result from borg despite no error in log')
            # Consult the shared error state: this method has no local
            # `errors` variable, so a bare `errors.ok()` would raise
            # NameError exactly on this failure path.
            if self.errors.ok():
                self.errors = self.errors.combine(Errors.ERRORS)
| 323 |
300 |
| 324 def __do_launch(self, op, archive_or_repository, *args): |
301 def __do_launch(self, op, archive_or_repository, *args): |
| 325 passphrase=self.extract_passphrase() |
302 passphrase=self.extract_passphrase() |
| 326 |
303 |
| 327 inst=BorgInstance(op.operation, archive_or_repository, *args) |
304 inst=BorgInstance(op.operation, archive_or_repository, *args) |
| 336 t_res.daemon=True |
313 t_res.daemon=True |
| 337 |
314 |
| 338 self.thread_log=t_log |
315 self.thread_log=t_log |
| 339 self.thread_res=t_res |
316 self.thread_res=t_res |
| 340 self.borg_instance=inst |
317 self.borg_instance=inst |
| |
318 self.current_operation=op |
| |
319 # Update scheduled time to real starting time to schedule |
| |
320 # next run relative to this |
| |
321 self.current_operation.when_monotonic=time.monotonic() |
| |
322 self.state=State.ACTIVE |
| |
323 # Reset error status when starting a new operation |
| |
324 self.errors=Errors.OK |
| |
325 self.__update_status() |
| 341 |
326 |
| 342 t_log.start() |
327 t_log.start() |
| 343 t_res.start() |
328 t_res.start() |
| 344 |
329 |
| 345 def __launch(self, op): |
330 def __launch(self, op): |
| 360 self.prune_parameters)) |
345 self.prune_parameters)) |
| 361 else: |
346 else: |
| 362 raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) |
347 raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) |
| 363 |
348 |
# This must be called with self._cond held.
def __launch_and_wait(self):
    """Launch the scheduled operation, if any, and wait for it to finish.

    Takes the pending operation from ``self.scheduled_operation``.  If
    there is none, this only logs that the queued operation was aborted.
    Otherwise the slot is cleared before the operation is launched and
    waited on.
    """
    pending = self.scheduled_operation
    if not pending:
        self.logger.debug("Queued operation aborted")
        return

    # Clear the slot first so the operation is not picked up again.
    self.scheduled_operation = None
    self.__launch(pending)
    self.__wait_finish()
def __wait_finish(self):
    """Wait for the running operation to end, then reset to inactive.

    Expects ``self._cond`` to be held on entry: the lock is released
    around each blocking ``join`` and re-acquired afterwards.  If
    termination has been signalled, returns early and leaves all state
    untouched for the outer termination handler.
    """
    # Wait for main logger thread to terminate, or for us to be terminated.
    # The flag is `self._terminate`, as used further down in this method.
    while not self._terminate and self.thread_res.is_alive():
        self._cond.release()
        try:
            self.thread_res.join(JOIN_TIMEOUT)
        finally:
            # Re-acquire even if join() raises, so the caller still holds
            # the lock it entered with.
            self._cond.acquire()

    # If terminate has been signalled, let outer termination handler
    # take care of things (Within this Backup class, it would be cleanest
    # to raise an exception instead, but in most other places it's better
    # to just check self._terminate, so we don't complicate things with
    # an extra exception.)
    if self._terminate:
        return

    self.logger.debug('Waiting for borg and log subprocesses to terminate')

    self._cond.release()
    try:
        self.thread_log.join()
    finally:
        self._cond.acquire()

    if not self.borg_instance.wait():
        self.logger.error('Borg subprocess did not terminate')
        self.errors = self.errors.combine(Errors.ERRORS)

    # Only 'create' operations update the last-run timestamp.
    if self.current_operation.operation == 'create':
        self.lastrun_when = self.current_operation.when_monotonic

    # Drop all per-operation state and publish the inactive status.
    self.thread_res = None
    self.thread_log = None
    self.borg_instance = None
    self.current_operation = None
    self.state = State.INACTIVE
    self.__update_status()
| 383 |
394 |
| 384 def __main_thread(self): |
395 def __main_thread(self): |
| 385 with self._cond: |
396 with self._cond: |
| 386 try: |
397 try: |
| 387 while not self._terminate: |
398 while not self._terminate: |
| 388 self.__main_thread_wait_finish() |
399 assert(not self.current_operation) |
| |
400 self.__main_thread_wait_schedule() |
| 389 if not self._terminate: |
401 if not self._terminate: |
| 390 self.__main_thread_wait_schedule() |
402 self.__main_thread_queue_and_launch() |
| 391 if not self._terminate: |
|
| 392 self.__main_thread_queue_and_launch() |
|
| 393 except Exception as err: |
403 except Exception as err: |
| 394 self.logger.exception("Error with backup '%s'" % self._name) |
404 self.logger.exception("Error with backup '%s'" % self._name) |
| 395 self.errors=Errors.ERRORS |
405 self.errors=Errors.ERRORS |
| 396 |
406 |
| 397 self.state=State.INACTIVE |
407 self.state=State.INACTIVE |
def __main_thread_queue_and_launch(self):
    """Queue the scheduled operation with the repository and launch it.

    Does nothing when no operation is scheduled.  Otherwise the state is
    set to QUEUED and ``__launch_and_wait`` is handed to
    ``self.repository.queue_action`` together with ``self._cond``.  A
    falsy return from ``queue_action`` without a pending termination is
    treated as an aborted queueing: the scheduled operation is dropped
    and the state goes back to INACTIVE.
    """
    if not self.scheduled_operation:
        return

    self.logger.debug("Queuing")
    self.state = State.QUEUED
    self.__update_status()

    # NOTE(review): presumably queue_action runs `action` once the
    # repository is available -- confirm against its implementation.
    queued_ok = self.repository.queue_action(self._cond,
                                             action=self.__launch_and_wait,
                                             name=self._name)
    if queued_ok or self._terminate:
        return

    self.logger.debug("Queueing aborted")
    self.scheduled_operation = None
    self.state = State.INACTIVE
    self.__update_status()
| 465 |
473 |
| 466 def __next_operation_unlocked(self): |
474 def __next_operation_unlocked(self): |
| 467 # TODO: pruning as well |
475 # TODO: pruning as well |
| 468 now=time.monotonic() |
476 now=time.monotonic() |
| 469 if not self.lastrun_when: |
477 if not self.lastrun_when: |