| 10 from instance import BorgInstance | 10 from instance import BorgInstance | 
| 11 from queue import Queue | 11 from queue import Queue | 
| 12 from threading import Thread, Lock, Timer | 12 from threading import Thread, Lock, Timer | 
| 13 | 13 | 
| 14 logger=borgend.logger.getChild(__name__) | 14 logger=borgend.logger.getChild(__name__) | 
|  | 15 | 
|  | 16 # State: numeric codes, ordered so that combine_state() (max) keeps the higher one | 
|  | 17 INACTIVE=0  # initial state; also restored when a run finishes while still ACTIVE | 
|  | 18 SCHEDULED=1  # merged into the state when a schedule timer is started | 
|  | 19 ACTIVE=2  # set when an operation is launched | 
|  | 20 BUSY=3  # set on a lock-related borg msgid; state>=BUSY selects the retry schedule | 
|  | 21 OFFLINE=4  # NOTE(review): not assigned anywhere in the visible code - meaning to be confirmed | 
|  | 22 ERRORS=5  # set on log entries at WARNING or above, and when a launch raises | 
|  | 23 | 
|  | 24 def combine_state(state1, state2):  # merge two state codes into one | 
|  | 25     return max(state1, state2)  # the numerically larger code wins | 
| 15 | 26 | 
| 16 loglevel_translation={ | 27 loglevel_translation={ | 
| 17     'CRITICAL': logging.CRITICAL, | 28     'CRITICAL': logging.CRITICAL, | 
| 18     'ERROR': logging.ERROR, | 29     'ERROR': logging.ERROR, | 
| 19     'WARNING': logging.WARNING, | 30     'WARNING': logging.WARNING, | 
| 109 | 120 | 
| 110         self.__decode_config(cfg) | 121         self.__decode_config(cfg) | 
| 111 | 122 | 
| 112         self.config=config | 123         self.config=config | 
| 113         self.lastrun_when=None | 124         self.lastrun_when=None | 
| 114         self.lastrun_errors=None |  | 
| 115         self.borg_instance=None | 125         self.borg_instance=None | 
| 116         self.current_operation=None | 126         self.current_operation=None | 
| 117         self.thread_log=None | 127         self.thread_log=None | 
| 118         self.thread_res=None | 128         self.thread_res=None | 
| 119         self.timer=None | 129         self.timer=None | 
| 120         self.scheduled_operation=None | 130         self.scheduled_operation=None | 
| 121         self.lock=Lock() | 131         self.lock=Lock() | 
| 122         self.__status_update_callback=None | 132         self.__status_update_callback=None | 
|  | 133         self.state=INACTIVE | 
| 123 | 134 | 
| 124     def is_running(self):  # returns __is_running_unlocked(), evaluated while holding self.lock | 135     def is_running(self):  # returns __is_running_unlocked(), evaluated while holding self.lock | 
| 125         with self.lock: | 136         with self.lock: | 
| 126             running=self.__is_running_unlocked() | 137             running=self.__is_running_unlocked() | 
| 127         return running | 138         return running | 
| 183                     status['name']='borg' | 194                     status['name']='borg' | 
| 184                 lvl=translate_loglevel(status['levelname']) | 195                 lvl=translate_loglevel(status['levelname']) | 
| 185                 logger.log(lvl, status['name'] + ': ' + status['message']) | 196                 logger.log(lvl, status['name'] + ': ' + status['message']) | 
| 186                 if lvl>=logging.WARNING: | 197                 if lvl>=logging.WARNING: | 
| 187                     errors_this_message=status | 198                     errors_this_message=status | 
|  | 199                     state=ERRORS | 
|  | 200                     if ('msgid' in status and | 
|  | 201                         (status['msgid']=='LockTimeout' or # observed in reality | 
|  | 202                          status['msgid']=='LockErrorT' or # in docs | 
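|  | 203                          status['msgid']=='LockErrorT')): # in docs; NOTE(review): repeats the 'LockErrorT' check from the previous line - possibly meant 'LockError', confirm against borg's msgid list | 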
|  | 204                         state=BUSY | 
| 188                     with self.lock: | 205                     with self.lock: | 
| 189                         self.current_operation['errors']=True | 206                         self.state=combine_state(self.state, state) | 
| 190                         status, callback=self.__status_unlocked() | 207                         status, callback=self.__status_unlocked() | 
| 191             else: | 208             else: | 
| 192                 logger.debug('Unrecognised log entry %s' % str(status)) | 209                 logger.debug('Unrecognised log entry %s' % str(status)) | 
| 193 | 210 | 
| 194             if callback: | 211             if callback: | 
| 208 | 225 | 
| 209         logger.debug('Result listener thread waiting for result') | 226         logger.debug('Result listener thread waiting for result') | 
| 210 | 227 | 
| 211         res=self.borg_instance.read_result() | 228         res=self.borg_instance.read_result() | 
| 212 | 229 | 
| 213         errors=False | 230         # Finish processing remaining errors | 
|  | 231         self.thread_log.join() | 
|  | 232 | 
|  | 233         with self.lock: | 
|  | 234             state=self.state | 
|  | 235 | 
|  | 236         # If there were no errors, reset back to INACTIVE state | 
|  | 237         if state==ACTIVE: | 
|  | 238             state=INACTIVE | 
| 214 | 239 | 
| 215         logger.debug('Borg result: %s' % str(res)) | 240         logger.debug('Borg result: %s' % str(res)) | 
| 216 | 241 | 
| 217         if res is None: | 242         if res is None and state==INACTIVE: | 
| 218             errors=True | 243             logger.error('No result from borg despite no error in log') | 
|  | 244             state=ERRORS | 
| 219 | 245 | 
| 220         logger.debug('Waiting for borg subprocess to terminate in result thread') | 246         logger.debug('Waiting for borg subprocess to terminate in result thread') | 
| 221 | 247 | 
| 222         errors=errors or not self.borg_instance.wait() | 248         if not self.borg_instance.wait(): | 
| 223 | 249             logger.critical('Borg subprocess did not terminate') | 
| 224         logger.debug('Borg subprocess terminated (errors: %s); terminating result listener thread' % str(errors)) | 250             state=combine_state(state, ERRORS) | 
| 225 | 251 | 
| 226         self.thread_log.join() | 252         logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) | 
| 227 | 253 | 
| 228         with self.lock: | 254         with self.lock: | 
| 229             if self.current_operation['operation']=='create': | 255             if self.current_operation['operation']=='create': | 
| 230                 self.lastrun_when=self.current_operation['when_monotonic'] | 256                 self.lastrun_when=self.current_operation['when_monotonic'] | 
| 231                 self.lastrun_errors=errors |  | 
| 232             self.thread_res=None | 257             self.thread_res=None | 
| 233             self.thread_log=None | 258             self.thread_log=None | 
| 234             self.borg_instance=None | 259             self.borg_instance=None | 
| 235             self.current_operation=None | 260             self.current_operation=None | 
| 236             self.__schedule_unlocked() | 261             self.__schedule_unlocked() | 
|  | 262             self.state=state | 
| 237             status, callback=self.__status_unlocked() | 263             status, callback=self.__status_unlocked() | 
| 238         if callback: | 264         if callback: | 
| 239             callback(self, status) | 265             callback(self, status) | 
| 240 | 266 | 
| 241     def __do_launch(self, queue, op, archive_or_repository, *args): | 267     def __do_launch(self, queue, op, archive_or_repository, *args): | 
| 256         self.thread_res=t_res | 282         self.thread_res=t_res | 
| 257         self.borg_instance=inst | 283         self.borg_instance=inst | 
| 258         self.queue=queue | 284         self.queue=queue | 
| 259         self.current_operation=op | 285         self.current_operation=op | 
| 260         self.current_operation['when_monotonic']=time.monotonic() | 286         self.current_operation['when_monotonic']=time.monotonic() | 
|  | 287         self.state=ACTIVE | 
| 261 | 288 | 
| 262         t_log.start() | 289         t_log.start() | 
| 263         t_res.start() | 290         t_res.start() | 
| 264 | 291 | 
| 265     def __launch(self, op, queue): | 292     def __launch(self, op, queue): | 
| 292                 else: | 319                 else: | 
| 293                     raise NotImplementedError("Invalid operation '%s'" % op['operation']) | 320                     raise NotImplementedError("Invalid operation '%s'" % op['operation']) | 
| 294             except Exception as err: | 321             except Exception as err: | 
| 295                 logger.debug('Rescheduling after failure') | 322                 logger.debug('Rescheduling after failure') | 
| 296                 self.lastrun_when=time.monotonic() | 323                 self.lastrun_when=time.monotonic() | 
| 297                 self.lastrun_errors=True | 324                 self.state=ERRORS | 
| 298                 self.__schedule_unlocked() | 325                 self.__schedule_unlocked() | 
| 299                 raise err | 326                 raise err | 
| 300 | 327 | 
| 301             return True | 328             return True | 
| 302 | 329 | 
| 367                 return None | 394                 return None | 
| 368             else: | 395             else: | 
| 369                 return {'operation': 'create', | 396                 return {'operation': 'create', | 
| 370                         'detail': 'initial', | 397                         'detail': 'initial', | 
| 371                         'when_monotonic': now+initial_interval} | 398                         'when_monotonic': now+initial_interval} | 
| 372         elif self.lastrun_errors: | 399         elif self.state>=BUSY: | 
| 373             if self.retry_interval==0: | 400             if self.retry_interval==0: | 
| 374                 return None | 401                 return None | 
| 375             else: | 402             else: | 
| 376                 return {'operation': 'create', | 403                 return {'operation': 'create', | 
| 377                         'detail': 'retry', | 404                         'detail': 'retry', | 
| 396                 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % | 423                 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % | 
| 397                             (op['operation'], op['detail'], self.name, delay)) | 424                             (op['operation'], op['detail'], self.name, delay)) | 
| 398                 tmr=Timer(delay, self.__queue_timed_operation) | 425                 tmr=Timer(delay, self.__queue_timed_operation) | 
| 399                 self.scheduled_operation=op | 426                 self.scheduled_operation=op | 
| 400                 self.timer=tmr | 427                 self.timer=tmr | 
|  | 428                 self.state=combine_state(self.state, SCHEDULED) | 
| 401                 tmr.start() | 429                 tmr.start() | 
| 402 | 430 | 
| 403             return op | 431             return op | 
| 404 | 432 | 
| 405     def schedule(self): | 433     def schedule(self): | 
| 423                 status=self.scheduled_operation | 451                 status=self.scheduled_operation | 
| 424                 status['type']='scheduled' | 452                 status['type']='scheduled' | 
| 425             else: | 453             else: | 
| 426                 status={'type': 'nothing'} | 454                 status={'type': 'nothing'} | 
| 427 | 455 | 
| 428             if self.lastrun_errors is not None: | 456         status['name']=self.name | 
| 429                 status['errors']=self.lastrun_errors | 457         status['state']=self.state | 
| 430 | 458 | 
| 431         if 'detail' not in status: | 459         if 'detail' not in status: | 
| 432             status['detail']='NONE' | 460             status['detail']='NONE' | 
| 433 |  | 
| 434         if 'errors' not in status: |  | 
| 435             status['errors']=False |  | 
| 436 |  | 
| 437         status['name']=self.name |  | 
| 438 | 461 | 
| 439         if 'when_monotonic' in status: | 462         if 'when_monotonic' in status: | 
| 440             status['when']=(status['when_monotonic'] | 463             status['when']=(status['when_monotonic'] | 
| 441                             -time.monotonic()+time.time()) | 464                             -time.monotonic()+time.time()) | 
| 442 | 465 |