backup.py

changeset 45
aa2a95dc6093
parent 38
085a635f23f5
child 46
ecb41072a1b7
equal deleted inserted replaced
44:ac449729a896 45:aa2a95dc6093
10 from instance import BorgInstance 10 from instance import BorgInstance
11 from queue import Queue 11 from queue import Queue
12 from threading import Thread, Lock, Timer 12 from threading import Thread, Lock, Timer
13 13
14 logger=borgend.logger.getChild(__name__) 14 logger=borgend.logger.getChild(__name__)
15
16 # State
17 INACTIVE=0
18 SCHEDULED=1
19 ACTIVE=2
20 BUSY=3
21 OFFLINE=4
22 ERRORS=5
23
24 def combine_state(state1, state2):
25 return max(state1, state2)
15 26
16 loglevel_translation={ 27 loglevel_translation={
17 'CRITICAL': logging.CRITICAL, 28 'CRITICAL': logging.CRITICAL,
18 'ERROR': logging.ERROR, 29 'ERROR': logging.ERROR,
19 'WARNING': logging.WARNING, 30 'WARNING': logging.WARNING,
109 120
110 self.__decode_config(cfg) 121 self.__decode_config(cfg)
111 122
112 self.config=config 123 self.config=config
113 self.lastrun_when=None 124 self.lastrun_when=None
114 self.lastrun_errors=None
115 self.borg_instance=None 125 self.borg_instance=None
116 self.current_operation=None 126 self.current_operation=None
117 self.thread_log=None 127 self.thread_log=None
118 self.thread_res=None 128 self.thread_res=None
119 self.timer=None 129 self.timer=None
120 self.scheduled_operation=None 130 self.scheduled_operation=None
121 self.lock=Lock() 131 self.lock=Lock()
122 self.__status_update_callback=None 132 self.__status_update_callback=None
133 self.state=INACTIVE
123 134
124 def is_running(self): 135 def is_running(self):
125 with self.lock: 136 with self.lock:
126 running=self.__is_running_unlocked() 137 running=self.__is_running_unlocked()
127 return running 138 return running
183 status['name']='borg' 194 status['name']='borg'
184 lvl=translate_loglevel(status['levelname']) 195 lvl=translate_loglevel(status['levelname'])
185 logger.log(lvl, status['name'] + ': ' + status['message']) 196 logger.log(lvl, status['name'] + ': ' + status['message'])
186 if lvl>=logging.WARNING: 197 if lvl>=logging.WARNING:
187 errors_this_message=status 198 errors_this_message=status
199 state=ERRORS
200 if ('msgid' in status and
201 (status['msgid']=='LockTimeout' or # observed in reality
202 status['msgid']=='LockErrorT' or # in docs
203 status['msgid']=='LockErrorT')): # in docs
204 state=BUSY
188 with self.lock: 205 with self.lock:
189 self.current_operation['errors']=True 206 self.state=combine_state(self.state, state)
190 status, callback=self.__status_unlocked() 207 status, callback=self.__status_unlocked()
191 else: 208 else:
192 logger.debug('Unrecognised log entry %s' % str(status)) 209 logger.debug('Unrecognised log entry %s' % str(status))
193 210
194 if callback: 211 if callback:
208 225
209 logger.debug('Result listener thread waiting for result') 226 logger.debug('Result listener thread waiting for result')
210 227
211 res=self.borg_instance.read_result() 228 res=self.borg_instance.read_result()
212 229
213 errors=False 230 # Finish processing remaining errors
231 self.thread_log.join()
232
233 with self.lock:
234 state=self.state
235
236 # If there were no errors, reset back to INACTIVE state
237 if state==ACTIVE:
238 state=INACTIVE
214 239
215 logger.debug('Borg result: %s' % str(res)) 240 logger.debug('Borg result: %s' % str(res))
216 241
217 if res is None: 242 if res is None and state==INACTIVE:
218 errors=True 243 logger.error('No result from borg despite no error in log')
244 state=ERRORS
219 245
220 logger.debug('Waiting for borg subprocess to terminate in result thread') 246 logger.debug('Waiting for borg subprocess to terminate in result thread')
221 247
222 errors=errors or not self.borg_instance.wait() 248 if not self.borg_instance.wait():
223 249 logger.critical('Borg subprocess did not terminate')
224 logger.debug('Borg subprocess terminated (errors: %s); terminating result listener thread' % str(errors)) 250 state=combine_state(state, ERRORS)
225 251
226 self.thread_log.join() 252 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
227 253
228 with self.lock: 254 with self.lock:
229 if self.current_operation['operation']=='create': 255 if self.current_operation['operation']=='create':
230 self.lastrun_when=self.current_operation['when_monotonic'] 256 self.lastrun_when=self.current_operation['when_monotonic']
231 self.lastrun_errors=errors
232 self.thread_res=None 257 self.thread_res=None
233 self.thread_log=None 258 self.thread_log=None
234 self.borg_instance=None 259 self.borg_instance=None
235 self.current_operation=None 260 self.current_operation=None
236 self.__schedule_unlocked() 261 self.__schedule_unlocked()
262 self.state=state
237 status, callback=self.__status_unlocked() 263 status, callback=self.__status_unlocked()
238 if callback: 264 if callback:
239 callback(self, status) 265 callback(self, status)
240 266
241 def __do_launch(self, queue, op, archive_or_repository, *args): 267 def __do_launch(self, queue, op, archive_or_repository, *args):
256 self.thread_res=t_res 282 self.thread_res=t_res
257 self.borg_instance=inst 283 self.borg_instance=inst
258 self.queue=queue 284 self.queue=queue
259 self.current_operation=op 285 self.current_operation=op
260 self.current_operation['when_monotonic']=time.monotonic() 286 self.current_operation['when_monotonic']=time.monotonic()
287 self.state=ACTIVE
261 288
262 t_log.start() 289 t_log.start()
263 t_res.start() 290 t_res.start()
264 291
265 def __launch(self, op, queue): 292 def __launch(self, op, queue):
292 else: 319 else:
293 raise NotImplementedError("Invalid operation '%s'" % op['operation']) 320 raise NotImplementedError("Invalid operation '%s'" % op['operation'])
294 except Exception as err: 321 except Exception as err:
295 logger.debug('Rescheduling after failure') 322 logger.debug('Rescheduling after failure')
296 self.lastrun_when=time.monotonic() 323 self.lastrun_when=time.monotonic()
297 self.lastrun_errors=True 324 self.state=ERRORS
298 self.__schedule_unlocked() 325 self.__schedule_unlocked()
299 raise err 326 raise err
300 327
301 return True 328 return True
302 329
367 return None 394 return None
368 else: 395 else:
369 return {'operation': 'create', 396 return {'operation': 'create',
370 'detail': 'initial', 397 'detail': 'initial',
371 'when_monotonic': now+initial_interval} 398 'when_monotonic': now+initial_interval}
372 elif self.lastrun_errors: 399 elif self.state>=BUSY:
373 if self.retry_interval==0: 400 if self.retry_interval==0:
374 return None 401 return None
375 else: 402 else:
376 return {'operation': 'create', 403 return {'operation': 'create',
377 'detail': 'retry', 404 'detail': 'retry',
396 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % 423 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
397 (op['operation'], op['detail'], self.name, delay)) 424 (op['operation'], op['detail'], self.name, delay))
398 tmr=Timer(delay, self.__queue_timed_operation) 425 tmr=Timer(delay, self.__queue_timed_operation)
399 self.scheduled_operation=op 426 self.scheduled_operation=op
400 self.timer=tmr 427 self.timer=tmr
428 self.state=combine_state(self.state, SCHEDULED)
401 tmr.start() 429 tmr.start()
402 430
403 return op 431 return op
404 432
405 def schedule(self): 433 def schedule(self):
423 status=self.scheduled_operation 451 status=self.scheduled_operation
424 status['type']='scheduled' 452 status['type']='scheduled'
425 else: 453 else:
426 status={'type': 'nothing'} 454 status={'type': 'nothing'}
427 455
428 if self.lastrun_errors is not None: 456 status['name']=self.name
429 status['errors']=self.lastrun_errors 457 status['state']=self.state
430 458
431 if 'detail' not in status: 459 if 'detail' not in status:
432 status['detail']='NONE' 460 status['detail']='NONE'
433
434 if 'errors' not in status:
435 status['errors']=False
436
437 status['name']=self.name
438 461
439 if 'when_monotonic' in status: 462 if 'when_monotonic' in status:
440 status['when']=(status['when_monotonic'] 463 status['when']=(status['when_monotonic']
441 -time.monotonic()+time.time()) 464 -time.monotonic()+time.time())
442 465

mercurial