| 103 | 106 | 
| 104     def extract_passphrase(self): | 107     def extract_passphrase(self): | 
| 105         acc=self.__keychain_account | 108         acc=self.__keychain_account | 
| 106         if not self.__passphrase: | 109         if not self.__passphrase: | 
| 107             if acc and acc!='': | 110             if acc and acc!='': | 
| 108                 logger.debug('Requesting passphrase') | 111                 self.logger.debug('Requesting passphrase') | 
| 109                 try: | 112                 try: | 
| 110                     pw=keyring.get_password("borg-backup", acc) | 113                     pw=keyring.get_password("borg-backup", acc) | 
| 111                 except Exception as err: | 114                 except Exception as err: | 
| 112                     logger.error('Failed to retrieve passphrase') | 115                     self.logger.error('Failed to retrieve passphrase') | 
| 113                     raise err | 116                     raise err | 
| 114                 else: | 117                 else: | 
| 115                     logger.debug('Received passphrase') | 118                     self.logger.debug('Received passphrase') | 
| 116                 self.__passphrase=pw | 119                 self.__passphrase=pw | 
| 117             else: | 120             else: | 
| 118                 self.__passphrase=None | 121                 self.__passphrase=None | 
| 119         return self.__passphrase | 122         return self.__passphrase | 
| 120 | 123 | 
| 121     def __init__(self, identifier, cfg, scheduler): | 124     def __init__(self, identifier, cfg, scheduler): | 
| 122         self.identifier=identifier | 125         self.identifier=identifier | 
| 123         self.config=config | 126         self.config=config | 
| 124         self.lastrun_when=None | 127         self.__status_update_callback=None | 
|  | 128         self.scheduler=scheduler | 
|  | 129         self.logger=None # set up in __decode_config once backup name is known | 
|  | 130 | 
| 125         self.borg_instance=None | 131         self.borg_instance=None | 
| 126         self.current_operation=None |  | 
| 127         self.thread_log=None | 132         self.thread_log=None | 
| 128         self.thread_res=None | 133         self.thread_res=None | 
|  | 134 | 
|  | 135         self.current_operation=None | 
| 129         self.scheduled_operation=None | 136         self.scheduled_operation=None | 
| 130         self.__status_update_callback=None | 137         self.lastrun_when=None | 
| 131         self.state=INACTIVE | 138         self.state=INACTIVE | 
| 132         self.scheduler=scheduler |  | 
| 133 | 139 | 
| 134         self.__decode_config(cfg) | 140         self.__decode_config(cfg) | 
| 135 | 141 | 
| 136         super().__init__(target = self.__main_thread, name = self._name) | 142         super().__init__(target = self.__main_thread, name = self._name) | 
| 137         self.daemon=True | 143         self.daemon=True | 
| 239 | 246 | 
| 240         # If there were no errors, reset back to INACTIVE state | 247         # If there were no errors, reset back to INACTIVE state | 
| 241         if state==ACTIVE: | 248         if state==ACTIVE: | 
| 242             state=INACTIVE | 249             state=INACTIVE | 
| 243 | 250 | 
| 244         logger.debug('Borg result: %s' % str(res)) | 251         self.logger.debug('Borg result: %s' % str(res)) | 
| 245 | 252 | 
| 246         if res is None and state==INACTIVE: | 253         if res is None and state==INACTIVE: | 
| 247             logger.error('No result from borg despite no error in log') | 254             self.logger.error('No result from borg despite no error in log') | 
| 248             state=ERRORS | 255             state=ERRORS | 
| 249 | 256 | 
| 250         logger.debug('Waiting for borg subprocess to terminate in result thread') | 257         self.logger.debug('Waiting for borg subprocess to terminate in result thread') | 
| 251 | 258 | 
| 252         if not self.borg_instance.wait(): | 259         if not self.borg_instance.wait(): | 
| 253             logger.critical('Borg subprocess did not terminate') | 260             self.logger.error('Borg subprocess did not terminate') | 
| 254             state=combine_state(state, ERRORS) | 261             state=combine_state(state, ERRORS) | 
| 255 | 262 | 
| 256         logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) | 263         self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) | 
| 257 | 264 | 
| 258         with self._cond: | 265         with self._cond: | 
| 259             if self.current_operation['operation']=='create': | 266             if self.current_operation['operation']=='create': | 
| 260                 self.lastrun_when=self.current_operation['when_monotonic'] | 267                 self.lastrun_when=self.current_operation['when_monotonic'] | 
| 261             self.thread_res=None | 268             self.thread_res=None | 
| 262             self.thread_log=None | 269             self.thread_log=None | 
| 263             self.borg_instance=None | 270             self.borg_instance=None | 
| 264             self.current_operation=None | 271             self.current_operation=None | 
| 265             self.state=state | 272             self.state=state | 
|  | 273             self.__update_status() | 
| 266             self._cond.notify() | 274             self._cond.notify() | 
| 267 | 275 | 
| 268     def __do_launch(self, op, archive_or_repository, *args): | 276     def __do_launch(self, op, archive_or_repository, *args): | 
| 269         passphrase=self.extract_passphrase() | 277         passphrase=self.extract_passphrase() | 
| 270 | 278 | 
| 271         inst=BorgInstance(op['operation'], archive_or_repository, *args) | 279         inst=BorgInstance(op['operation'], archive_or_repository, *args) | 
| 272         inst.launch(passphrase=passphrase) | 280         inst.launch(passphrase=passphrase) | 
| 273 | 281 | 
| 274         logger.debug('Creating listener threads') | 282         self.logger.debug('Creating listener threads') | 
| 275 | 283 | 
| 276         t_log=Thread(target=self.__log_listener) | 284         t_log=Thread(target=self.__log_listener) | 
| 277         t_log.daemon=True | 285         t_log.daemon=True | 
| 278 | 286 | 
| 279         t_res=Thread(target=self.__result_listener) | 287         t_res=Thread(target=self.__result_listener) | 
| 280         t_res.daemon=True | 288         t_res.daemon=True | 
| 281 | 289 | 
| 282         self.thread_log=t_log | 290         self.thread_log=t_log | 
| 283         self.thread_res=t_res | 291         self.thread_res=t_res | 
| 284         self.borg_instance=inst | 292         self.borg_instance=inst | 
| 285         self.current_operation=op |  | 
| 286         self.current_operation['when_monotonic']=time.monotonic() |  | 
| 287         self.state=ACTIVE |  | 
| 288 | 293 | 
| 289         t_log.start() | 294         t_log.start() | 
| 290         t_res.start() | 295         t_res.start() | 
| 291 | 296 | 
| 292     def __launch(self, op): | 297     def __launch(self, op): | 
| 293         if self.__is_running_unlocked(): | 298         self.logger.debug("Launching '%s'" % op['operation']) | 
| 294             logging.info('Cannot start %s: already running %s' | 299 | 
| 295                          % (operation, self.current_operation)) | 300         if op['operation']=='create': | 
| 296             return False | 301             archive="%s::%s%s" % (self.repository.repository_name, | 
|  | 302                                   self.archive_prefix, | 
|  | 303                                   self.archive_template) | 
|  | 304 | 
|  | 305             self.__do_launch(op, archive, | 
|  | 306                              self.common_parameters+self.create_parameters, | 
|  | 307                              self.paths) | 
|  | 308         elif op['operation']=='prune': | 
|  | 309             self.__do_launch(op, self.repository.repository_name, | 
|  | 310                              ([{'prefix': self.archive_prefix}] + | 
|  | 311                               self.common_parameters + | 
|  | 312                               self.prune_parameters)) | 
| 297         else: | 313         else: | 
|  | 314             raise NotImplementedError("Invalid operation '%s'" % op['operation']) | 
|  | 315 | 
|  | 316     def __launch_check(self): | 
|  | 317         op=self.scheduled_operation | 
|  | 318         if not op: | 
|  | 319             self.logger.debug("Queued operation aborted") | 
|  | 320         else: | 
|  | 321             self.scheduled_operation=None | 
|  | 322             self.current_operation=op | 
|  | 323             self.current_operation['when_monotonic']=time.monotonic() | 
|  | 324 | 
|  | 325             self.__launch(op) | 
|  | 326 | 
|  | 327             self.state=ACTIVE | 
|  | 328             self.__update_status() | 
|  | 329 | 
|  | 330 | 
|  | 331     def __main_thread(self): | 
|  | 332         with self._cond: | 
| 298             try: | 333             try: | 
| 299                 logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) | 334                 while not self._terminate: | 
| 300 | 335                     self.__main_thread_wait_finish() | 
| 301                 if op['operation']=='create': | 336                     if not self._terminate: | 
| 302                     archive="%s::%s%s" % (self.repository.repository_name, | 337                         self.__main_thread_wait_schedule() | 
| 303                                           self.archive_prefix, | 338                         if not self._terminate: | 
| 304                                           self.archive_template) | 339                             self.__main_thread_queue_and_launch() | 
| 305 |  | 
| 306                     self.__do_launch(op, archive, |  | 
| 307                                      self.common_parameters+self.create_parameters, |  | 
| 308                                      self.paths) |  | 
| 309                 elif op['operation']=='prune': |  | 
| 310                     self.__do_launch(op, self.repository.repository_name, |  | 
| 311                                      ([{'prefix': self.archive_prefix}] + |  | 
| 312                                       self.common_parameters + |  | 
| 313                                       self.prune_parameters)) |  | 
| 314                 else: |  | 
| 315                     raise NotImplementedError("Invalid operation '%s'" % op['operation']) |  | 
| 316             except Exception as err: | 340             except Exception as err: | 
| 317                 logger.debug('Rescheduling after failure') | 341                 self.logger.exception("Error with backup '%s'" % self._name) | 
| 318                 self.lastrun_when=time.monotonic() | 342                 self.lastrun_when=time.monotonic() | 
| 319                 self.state=ERRORS | 343                 self.state=ERRORS | 
| 320                 raise err | 344                 self.scheduled_operation=None | 
| 321 | 345 | 
| 322             return True | 346             # Clean up to terminate: kill borg instance and communication threads | 
| 323 | 347             if self.borg_instance: | 
| 324     def create(self): | 348                 self.logger.debug("Terminating a borg instance") | 
| 325         op={'operation': 'create', 'detail': 'manual'} | 349                 self.borg_instance.terminate() | 
| 326         with self._cond: | 350 | 
|  | 351             # Store threads to use outside lock | 
|  | 352             thread_log=self.thread_log | 
|  | 353             thread_res=self.thread_res | 
|  | 354 | 
|  | 355         self.logger.debug("Waiting for log and result threads to terminate") | 
|  | 356 | 
|  | 357         if thread_log: | 
|  | 358             thread_log.join() | 
|  | 359 | 
|  | 360         if thread_res: | 
|  | 361             thread_res.join() | 
|  | 362 | 
|  | 363 | 
|  | 364     # Main thread/1. Wait while a current operation is running | 
|  | 365     def __main_thread_wait_finish(self): | 
|  | 366         while self.current_operation and not self._terminate: | 
|  | 367             self.logger.debug("Waiting for current operation to finish") | 
|  | 368             self._cond.wait() | 
|  | 369 | 
|  | 370     # Main thread/2. Schedule next operation if there is no manually | 
|  | 371     # requested one | 
|  | 372     def __main_thread_wait_schedule(self): | 
|  | 373         op=None | 
|  | 374         if not self.scheduled_operation: | 
|  | 375             op=self.__next_operation_unlocked() | 
|  | 376         if op: | 
|  | 377             now=time.monotonic() | 
|  | 378             delay=max(0, op['when_monotonic']-now) | 
|  | 379             self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % | 
|  | 380                              (op['operation'], op['detail'],  delay)) | 
|  | 381 | 
| 327             self.scheduled_operation=op | 382             self.scheduled_operation=op | 
| 328             self._cond.notify() | 383             self.state=combine_state(self.state, SCHEDULED) | 
| 329 | 384             self.__update_status() | 
| 330     def prune(self): | 385 | 
| 331         op={'operation': 'prune', 'detail': 'manual'} | 386             # Wait via the scheduler until the scheduled time | 
| 332         with self._cond: | 387             self.scheduler.wait_until(now+delay, self._cond, self._name) | 
| 333             self.scheduled_operation=op | 388         else: | 
| 334             self._cond.notify() | 389             # Nothing scheduled - just wait | 
| 335 | 390             self.logger.debug("Waiting for manual scheduling") | 
| 336     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages | 391             self._cond.wait() | 
| 337     def abort(self): | 392 | 
| 338         with self._cond: | 393     # Main thread/3. If there is a scheduled operation (it might have been | 
| 339             if self.borg_instance: | 394     # changed manually from 'op' created in __main_thread_wait_schedule above), | 
| 340                 self.borg_instance.terminate() | 395     # queue it on the repository, and launch the operation once repository | 
|  | 396     # available | 
|  | 397     def __main_thread_queue_and_launch(self): | 
|  | 398         if self.scheduled_operation: | 
|  | 399             self.logger.debug("Queuing") | 
|  | 400             self.state=combine_state(self.state, QUEUED) | 
|  | 401             self.__update_status() | 
|  | 402             self.repository.queue_action(self._cond, | 
|  | 403                                          action=self.__launch_check, | 
|  | 404                                          name=self._name) | 
| 341 | 405 | 
| 342     def __next_operation_unlocked(self): | 406     def __next_operation_unlocked(self): | 
| 343         # TODO: pruning as well | 407         # TODO: pruning as well | 
| 344         now=time.monotonic() | 408         now=time.monotonic() | 
| 345         if not self.lastrun_when: | 409         if not self.lastrun_when: | 
| 365             else: | 429             else: | 
| 366                 return {'operation': 'create', | 430                 return {'operation': 'create', | 
| 367                         'detail': 'normal', | 431                         'detail': 'normal', | 
| 368                         'when_monotonic': self.lastrun_when+self.backup_interval} | 432                         'when_monotonic': self.lastrun_when+self.backup_interval} | 
| 369 | 433 | 
| 370     def __main_thread(self): |  | 
| 371         with self._cond: |  | 
| 372             while not self._terminate: |  | 
| 373                 op=None |  | 
| 374                 if not self.current_operation: |  | 
| 375                     op=self.__next_operation_unlocked() |  | 
| 376                 if not op: |  | 
| 377                     self.__update_status() |  | 
| 378                     self._cond.wait() |  | 
| 379                 else: |  | 
| 380                     now=time.monotonic() |  | 
| 381                     delay=max(0, op['when_monotonic']-now) |  | 
| 382                     logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |  | 
| 383                                 (op['operation'], op['detail'], self._name, delay)) |  | 
| 384 |  | 
| 385                     self.scheduled_operation=op |  | 
| 386                     self.state=combine_state(self.state, SCHEDULED) |  | 
| 387 |  | 
| 388                     self.__update_status() |  | 
| 389                     self.scheduler.wait_until(now+delay, self._cond, self._name) |  | 
| 390 |  | 
| 391                 if self.scheduled_operation: |  | 
| 392                     op=self.scheduled_operation |  | 
| 393                     self.scheduled_operation=None |  | 
| 394                     self.repository.queue_action(self._cond, name=self._name, |  | 
| 395                                                  action=lambda: self.__launch(op)) |  | 
| 396             # Kill a running borg to cause log and result threads to terminate |  | 
| 397             if self.borg_instance: |  | 
| 398                 logger.debug("Terminating a borg instance") |  | 
| 399                 self.borg_instance.terminate() |  | 
| 400 |  | 
| 401             # Store threads to use outside lock |  | 
| 402             thread_log=self.thread_log |  | 
| 403             thread_err=self.thread_err |  | 
| 404 |  | 
| 405         logger.debug("Waiting for log and result threads to terminate") |  | 
| 406 |  | 
| 407         if thread_log: |  | 
| 408             thread_log.join() |  | 
| 409 |  | 
| 410         if thread_res: |  | 
| 411             thread_res.join() |  | 
| 412 |  | 
| 413 |  | 
| 414     def __status_unlocked(self): | 434     def __status_unlocked(self): | 
| 415         callback=self.__status_update_callback | 435         callback=self.__status_update_callback | 
| 416 | 436 | 
| 417         if self.current_operation: | 437         if self.current_operation: | 
| 418             status=self.current_operation | 438             status=self.current_operation | 
| 419             status['type']='current' | 439             status['type']='current' | 
| 420             # Errors should be set by listeners | 440             # Errors should be set by listeners | 
| 421         else: | 441         else: | 
| 422             if self.scheduled_operation: | 442             if self.scheduled_operation: | 
| 423                 status=self.scheduled_operation | 443                 status=self.scheduled_operation | 
| 424                 status['type']='scheduled' | 444                 if self.state==QUEUED: | 
|  | 445                     status['type']='queued' | 
|  | 446                 else: | 
|  | 447                     status['type']='scheduled' | 
| 425             else: | 448             else: | 
| 426                 status={'type': 'nothing'} | 449                 status={'type': 'nothing'} | 
| 427 | 450 | 
| 428         status['name']=self._name | 451         status['name']=self._name | 
| 429         status['state']=self.state | 452         status['state']=self.state |