| 10 from instance import BorgInstance |
10 from instance import BorgInstance |
| 11 from queue import Queue |
11 from queue import Queue |
| 12 from threading import Thread, Lock, Timer |
12 from threading import Thread, Lock, Timer |
| 13 |
13 |
| 14 logger=borgend.logger.getChild(__name__) |
14 logger=borgend.logger.getChild(__name__) |
| |
15 |
| |
16 # State |
| |
17 INACTIVE=0 |
| |
18 SCHEDULED=1 |
| |
19 ACTIVE=2 |
| |
20 BUSY=3 |
| |
21 OFFLINE=4 |
| |
22 ERRORS=5 |
| |
23 |
| |
24 def combine_state(state1, state2): |
| |
25 return max(state1, state2) |
| 15 |
26 |
| 16 loglevel_translation={ |
27 loglevel_translation={ |
| 17 'CRITICAL': logging.CRITICAL, |
28 'CRITICAL': logging.CRITICAL, |
| 18 'ERROR': logging.ERROR, |
29 'ERROR': logging.ERROR, |
| 19 'WARNING': logging.WARNING, |
30 'WARNING': logging.WARNING, |
| 109 |
120 |
| 110 self.__decode_config(cfg) |
121 self.__decode_config(cfg) |
| 111 |
122 |
| 112 self.config=config |
123 self.config=config |
| 113 self.lastrun_when=None |
124 self.lastrun_when=None |
| 114 self.lastrun_errors=None |
|
| 115 self.borg_instance=None |
125 self.borg_instance=None |
| 116 self.current_operation=None |
126 self.current_operation=None |
| 117 self.thread_log=None |
127 self.thread_log=None |
| 118 self.thread_res=None |
128 self.thread_res=None |
| 119 self.timer=None |
129 self.timer=None |
| 120 self.scheduled_operation=None |
130 self.scheduled_operation=None |
| 121 self.lock=Lock() |
131 self.lock=Lock() |
| 122 self.__status_update_callback=None |
132 self.__status_update_callback=None |
| |
133 self.state=INACTIVE |
| 123 |
134 |
| 124 def is_running(self): |
135 def is_running(self): |
| 125 with self.lock: |
136 with self.lock: |
| 126 running=self.__is_running_unlocked() |
137 running=self.__is_running_unlocked() |
| 127 return running |
138 return running |
| 183 status['name']='borg' |
194 status['name']='borg' |
| 184 lvl=translate_loglevel(status['levelname']) |
195 lvl=translate_loglevel(status['levelname']) |
| 185 logger.log(lvl, status['name'] + ': ' + status['message']) |
196 logger.log(lvl, status['name'] + ': ' + status['message']) |
| 186 if lvl>=logging.WARNING: |
197 if lvl>=logging.WARNING: |
| 187 errors_this_message=status |
198 errors_this_message=status |
| |
199 state=ERRORS |
| |
200 if ('msgid' in status and |
| |
201 (status['msgid']=='LockTimeout' or # observed in reality |
| |
202 status['msgid']=='LockErrorT' or # in docs |
| |
203 status['msgid']=='LockErrorT')): # in docs |
| |
204 state=BUSY |
| 188 with self.lock: |
205 with self.lock: |
| 189 self.current_operation['errors']=True |
206 self.state=combine_state(self.state, state) |
| 190 status, callback=self.__status_unlocked() |
207 status, callback=self.__status_unlocked() |
| 191 else: |
208 else: |
| 192 logger.debug('Unrecognised log entry %s' % str(status)) |
209 logger.debug('Unrecognised log entry %s' % str(status)) |
| 193 |
210 |
| 194 if callback: |
211 if callback: |
| 208 |
225 |
| 209 logger.debug('Result listener thread waiting for result') |
226 logger.debug('Result listener thread waiting for result') |
| 210 |
227 |
| 211 res=self.borg_instance.read_result() |
228 res=self.borg_instance.read_result() |
| 212 |
229 |
| 213 errors=False |
230 # Finish processing remaining errors |
| |
231 self.thread_log.join() |
| |
232 |
| |
233 with self.lock: |
| |
234 state=self.state |
| |
235 |
| |
236 # If there were no errors, reset back to INACTIVE state |
| |
237 if state==ACTIVE: |
| |
238 state=INACTIVE |
| 214 |
239 |
| 215 logger.debug('Borg result: %s' % str(res)) |
240 logger.debug('Borg result: %s' % str(res)) |
| 216 |
241 |
| 217 if res is None: |
242 if res is None and state==INACTIVE: |
| 218 errors=True |
243 logger.error('No result from borg despite no error in log') |
| |
244 state=ERRORS |
| 219 |
245 |
| 220 logger.debug('Waiting for borg subprocess to terminate in result thread') |
246 logger.debug('Waiting for borg subprocess to terminate in result thread') |
| 221 |
247 |
| 222 errors=errors or not self.borg_instance.wait() |
248 if not self.borg_instance.wait(): |
| 223 |
249 logger.critical('Borg subprocess did not terminate') |
| 224 logger.debug('Borg subprocess terminated (errors: %s); terminating result listener thread' % str(errors)) |
250 state=combine_state(state, ERRORS) |
| 225 |
251 |
| 226 self.thread_log.join() |
252 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
| 227 |
253 |
| 228 with self.lock: |
254 with self.lock: |
| 229 if self.current_operation['operation']=='create': |
255 if self.current_operation['operation']=='create': |
| 230 self.lastrun_when=self.current_operation['when_monotonic'] |
256 self.lastrun_when=self.current_operation['when_monotonic'] |
| 231 self.lastrun_errors=errors |
|
| 232 self.thread_res=None |
257 self.thread_res=None |
| 233 self.thread_log=None |
258 self.thread_log=None |
| 234 self.borg_instance=None |
259 self.borg_instance=None |
| 235 self.current_operation=None |
260 self.current_operation=None |
| 236 self.__schedule_unlocked() |
261 self.__schedule_unlocked() |
| |
262 self.state=state |
| 237 status, callback=self.__status_unlocked() |
263 status, callback=self.__status_unlocked() |
| 238 if callback: |
264 if callback: |
| 239 callback(self, status) |
265 callback(self, status) |
| 240 |
266 |
| 241 def __do_launch(self, queue, op, archive_or_repository, *args): |
267 def __do_launch(self, queue, op, archive_or_repository, *args): |
| 256 self.thread_res=t_res |
282 self.thread_res=t_res |
| 257 self.borg_instance=inst |
283 self.borg_instance=inst |
| 258 self.queue=queue |
284 self.queue=queue |
| 259 self.current_operation=op |
285 self.current_operation=op |
| 260 self.current_operation['when_monotonic']=time.monotonic() |
286 self.current_operation['when_monotonic']=time.monotonic() |
| |
287 self.state=ACTIVE |
| 261 |
288 |
| 262 t_log.start() |
289 t_log.start() |
| 263 t_res.start() |
290 t_res.start() |
| 264 |
291 |
| 265 def __launch(self, op, queue): |
292 def __launch(self, op, queue): |
| 292 else: |
319 else: |
| 293 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
320 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
| 294 except Exception as err: |
321 except Exception as err: |
| 295 logger.debug('Rescheduling after failure') |
322 logger.debug('Rescheduling after failure') |
| 296 self.lastrun_when=time.monotonic() |
323 self.lastrun_when=time.monotonic() |
| 297 self.lastrun_errors=True |
324 self.state=ERRORS |
| 298 self.__schedule_unlocked() |
325 self.__schedule_unlocked() |
| 299 raise err |
326 raise err |
| 300 |
327 |
| 301 return True |
328 return True |
| 302 |
329 |
| 367 return None |
394 return None |
| 368 else: |
395 else: |
| 369 return {'operation': 'create', |
396 return {'operation': 'create', |
| 370 'detail': 'initial', |
397 'detail': 'initial', |
| 371 'when_monotonic': now+initial_interval} |
398 'when_monotonic': now+initial_interval} |
| 372 elif self.lastrun_errors: |
399 elif self.state>=BUSY: |
| 373 if self.retry_interval==0: |
400 if self.retry_interval==0: |
| 374 return None |
401 return None |
| 375 else: |
402 else: |
| 376 return {'operation': 'create', |
403 return {'operation': 'create', |
| 377 'detail': 'retry', |
404 'detail': 'retry', |
| 396 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
423 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
| 397 (op['operation'], op['detail'], self.name, delay)) |
424 (op['operation'], op['detail'], self.name, delay)) |
| 398 tmr=Timer(delay, self.__queue_timed_operation) |
425 tmr=Timer(delay, self.__queue_timed_operation) |
| 399 self.scheduled_operation=op |
426 self.scheduled_operation=op |
| 400 self.timer=tmr |
427 self.timer=tmr |
| |
428 self.state=combine_state(self.state, SCHEDULED) |
| 401 tmr.start() |
429 tmr.start() |
| 402 |
430 |
| 403 return op |
431 return op |
| 404 |
432 |
| 405 def schedule(self): |
433 def schedule(self): |
| 423 status=self.scheduled_operation |
451 status=self.scheduled_operation |
| 424 status['type']='scheduled' |
452 status['type']='scheduled' |
| 425 else: |
453 else: |
| 426 status={'type': 'nothing'} |
454 status={'type': 'nothing'} |
| 427 |
455 |
| 428 if self.lastrun_errors is not None: |
456 status['name']=self.name |
| 429 status['errors']=self.lastrun_errors |
457 status['state']=self.state |
| 430 |
458 |
| 431 if 'detail' not in status: |
459 if 'detail' not in status: |
| 432 status['detail']='NONE' |
460 status['detail']='NONE' |
| 433 |
|
| 434 if 'errors' not in status: |
|
| 435 status['errors']=False |
|
| 436 |
|
| 437 status['name']=self.name |
|
| 438 |
461 |
| 439 if 'when_monotonic' in status: |
462 if 'when_monotonic' in status: |
| 440 status['when']=(status['when_monotonic'] |
463 status['when']=(status['when_monotonic'] |
| 441 -time.monotonic()+time.time()) |
464 -time.monotonic()+time.time()) |
| 442 |
465 |