backup.py

changeset 64
6cfe6a89e810
parent 63
1fd6814a29fc
child 70
3f794760d52e
equal deleted inserted replaced
63:1fd6814a29fc 64:6cfe6a89e810
12 from instance import BorgInstance 12 from instance import BorgInstance
13 from threading import Thread, Lock, Condition 13 from threading import Thread, Lock, Condition
14 from scheduler import TerminableThread 14 from scheduler import TerminableThread
15 15
16 logger=borgend.logger.getChild(__name__) 16 logger=borgend.logger.getChild(__name__)
17
18 JOIN_TIMEOUT=60
17 19
18 # 20 #
19 # State and operation related helper classes 21 # State and operation related helper classes
20 # 22 #
21 23
216 218
217 def __log_listener(self): 219 def __log_listener(self):
218 self.logger.debug('Log listener thread waiting for entries') 220 self.logger.debug('Log listener thread waiting for entries')
219 success=True 221 success=True
220 for msg in iter(self.borg_instance.read_log, None): 222 for msg in iter(self.borg_instance.read_log, None):
221 self.logger.debug(str(msg)) 223 self.logger.info(str(msg))
222 t=msg['type'] 224 t=msg['type']
223 225
224 errormsg=None 226 errormsg=None
225 callback=None 227 callback=None
226 228
285 def __result_listener(self): 287 def __result_listener(self):
286 self.logger.debug('Result listener thread waiting for result') 288 self.logger.debug('Result listener thread waiting for result')
287 289
288 res=self.borg_instance.read_result() 290 res=self.borg_instance.read_result()
289 291
290 # Finish processing remaining errors
291 self.thread_log.join()
292
293 with self._cond:
294 errors=self.errors
295
296 self.logger.debug('Borg result: %s' % str(res)) 292 self.logger.debug('Borg result: %s' % str(res))
297 293
298 if res is None: 294 with self._cond:
299 self.logger.error('No result from borg despite no error in log') 295 if res is None:
300 if errors.ok(): 296 self.logger.error('No result from borg despite no error in log')
301 errors=Errors.ERRORS 297 if errors.ok():
302 298 self.errors=self.errors.combine(Errors.ERRORS)
303 self.logger.debug('Waiting for borg subprocess to terminate in result thread') 299
304
305 if not self.borg_instance.wait():
306 self.logger.error('Borg subprocess did not terminate')
307 if errors.ok():
308 errors=Errors.ERRORS
309
310 self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
311
312 with self._cond:
313 if self.current_operation.operation=='create':
314 self.lastrun_when=self.current_operation.when_monotonic
315 self.thread_res=None
316 self.thread_log=None
317 self.borg_instance=None
318 self.current_operation=None
319 self.state=State.INACTIVE
320 self.errors=errors
321 self.__update_status()
322 self._cond.notify()
323 300
324 def __do_launch(self, op, archive_or_repository, *args): 301 def __do_launch(self, op, archive_or_repository, *args):
325 passphrase=self.extract_passphrase() 302 passphrase=self.extract_passphrase()
326 303
327 inst=BorgInstance(op.operation, archive_or_repository, *args) 304 inst=BorgInstance(op.operation, archive_or_repository, *args)
336 t_res.daemon=True 313 t_res.daemon=True
337 314
338 self.thread_log=t_log 315 self.thread_log=t_log
339 self.thread_res=t_res 316 self.thread_res=t_res
340 self.borg_instance=inst 317 self.borg_instance=inst
318 self.current_operation=op
319 # Update scheduled time to real starting time to schedule
320 # next run relative to this
321 self.current_operation.when_monotonic=time.monotonic()
322 self.state=State.ACTIVE
323 # Reset error status when starting a new operation
324 self.errors=Errors.OK
325 self.__update_status()
341 326
342 t_log.start() 327 t_log.start()
343 t_res.start() 328 t_res.start()
344 329
345 def __launch(self, op): 330 def __launch(self, op):
360 self.prune_parameters)) 345 self.prune_parameters))
361 else: 346 else:
362 raise NotImplementedError("Invalid operation '%s'" % str(op.operation)) 347 raise NotImplementedError("Invalid operation '%s'" % str(op.operation))
363 348
364 # This must be called with self._cond held. 349 # This must be called with self._cond held.
365 def __launch_check(self): 350 def __launch_and_wait(self):
366 op=self.scheduled_operation 351 op=self.scheduled_operation
367 if not op: 352 if not op:
368 self.logger.debug("Queued operation aborted") 353 self.logger.debug("Queued operation aborted")
369 else: 354 else:
370 self.scheduled_operation=None 355 self.scheduled_operation=None
371 356
372 self.__launch(op) 357 self.__launch(op)
373 358
374 self.current_operation=op 359 self.__wait_finish()
375 # Update scheduled time to real starting time to schedule 360
376 # next run relative to this 361 def __wait_finish(self):
377 self.current_operation.when_monotonic=time.monotonic() 362 # Wait for main logger thread to terminate, or for us to be terminated
378 self.state=State.ACTIVE 363 while not self.terminate and self.thread_res.is_alive():
379 # Reset error status when starting a new operation 364 self._cond.release()
380 self.errors=Errors.OK 365 self.thread_res.join(JOIN_TIMEOUT)
381 self.__update_status() 366 self._cond.acquire()
382 367
368 # If terminate has been signalled, let outer termination handler
369 # take care of things (Within this Backup class, it would be cleanest
370 # to raise an exception instead, but in most other places it's better
371 # to just check self._terminate, so we don't complicate things with
372 # an extra exception.)
373 if self._terminate:
374 return
375
376 self.logger.debug('Waiting for borg and log subprocesses to terminate')
377
378 self._cond.release()
379 self.thread_log.join()
380 self._cond.acquire()
381
382 if not self.borg_instance.wait():
383 self.logger.error('Borg subprocess did not terminate')
384 self.errors=self.errors.combine(Errors.ERRORS)
385
386 if self.current_operation.operation=='create':
387 self.lastrun_when=self.current_operation.when_monotonic
388 self.thread_res=None
389 self.thread_log=None
390 self.borg_instance=None
391 self.current_operation=None
392 self.state=State.INACTIVE
393 self.__update_status()
383 394
384 def __main_thread(self): 395 def __main_thread(self):
385 with self._cond: 396 with self._cond:
386 try: 397 try:
387 while not self._terminate: 398 while not self._terminate:
388 self.__main_thread_wait_finish() 399 assert(not self.current_operation)
400 self.__main_thread_wait_schedule()
389 if not self._terminate: 401 if not self._terminate:
390 self.__main_thread_wait_schedule() 402 self.__main_thread_queue_and_launch()
391 if not self._terminate:
392 self.__main_thread_queue_and_launch()
393 except Exception as err: 403 except Exception as err:
394 self.logger.exception("Error with backup '%s'" % self._name) 404 self.logger.exception("Error with backup '%s'" % self._name)
395 self.errors=Errors.ERRORS 405 self.errors=Errors.ERRORS
396 406
397 self.state=State.INACTIVE 407 self.state=State.INACTIVE
413 if thread_log: 423 if thread_log:
414 thread_log.join() 424 thread_log.join()
415 425
416 if thread_res: 426 if thread_res:
417 thread_res.join() 427 thread_res.join()
418
419
420 # Main thread/1. Wait while a current operation is running
421 def __main_thread_wait_finish(self):
422 while self.current_operation and not self._terminate:
423 self.logger.debug("Waiting for current operation to finish")
424 self._cond.wait()
425 428
426 # Main thread/2. Schedule next operation if there is no manually 429 # Main thread/2. Schedule next operation if there is no manually
427 # requested one 430 # requested one
428 def __main_thread_wait_schedule(self): 431 def __main_thread_wait_schedule(self):
429 op=None 432 op=None
457 def __main_thread_queue_and_launch(self): 460 def __main_thread_queue_and_launch(self):
458 if self.scheduled_operation: 461 if self.scheduled_operation:
459 self.logger.debug("Queuing") 462 self.logger.debug("Queuing")
460 self.state=State.QUEUED 463 self.state=State.QUEUED
461 self.__update_status() 464 self.__update_status()
462 self.repository.queue_action(self._cond, 465 res=self.repository.queue_action(self._cond,
463 action=self.__launch_check, 466 action=self.__launch_and_wait,
464 name=self._name) 467 name=self._name)
468 if not res and not self._terminate:
469 self.logger.debug("Queueing aborted")
470 self.scheduled_operation=None
471 self.state=State.INACTIVE
472 self.__update_status()
465 473
466 def __next_operation_unlocked(self): 474 def __next_operation_unlocked(self):
467 # TODO: pruning as well 475 # TODO: pruning as well
468 now=time.monotonic() 476 now=time.monotonic()
469 if not self.lastrun_when: 477 if not self.lastrun_when:

mercurial