10 from instance import BorgInstance |
10 from instance import BorgInstance |
11 from queue import Queue |
11 from queue import Queue |
12 from threading import Thread, Lock, Timer |
12 from threading import Thread, Lock, Timer |
13 |
13 |
14 logger=borgend.logger.getChild(__name__) |
14 logger=borgend.logger.getChild(__name__) |
|
15 |
|
16 # State |
|
17 INACTIVE=0 |
|
18 SCHEDULED=1 |
|
19 ACTIVE=2 |
|
20 BUSY=3 |
|
21 OFFLINE=4 |
|
22 ERRORS=5 |
|
23 |
|
24 def combine_state(state1, state2): |
|
25 return max(state1, state2) |
15 |
26 |
16 loglevel_translation={ |
27 loglevel_translation={ |
17 'CRITICAL': logging.CRITICAL, |
28 'CRITICAL': logging.CRITICAL, |
18 'ERROR': logging.ERROR, |
29 'ERROR': logging.ERROR, |
19 'WARNING': logging.WARNING, |
30 'WARNING': logging.WARNING, |
109 |
120 |
110 self.__decode_config(cfg) |
121 self.__decode_config(cfg) |
111 |
122 |
112 self.config=config |
123 self.config=config |
113 self.lastrun_when=None |
124 self.lastrun_when=None |
114 self.lastrun_errors=None |
|
115 self.borg_instance=None |
125 self.borg_instance=None |
116 self.current_operation=None |
126 self.current_operation=None |
117 self.thread_log=None |
127 self.thread_log=None |
118 self.thread_res=None |
128 self.thread_res=None |
119 self.timer=None |
129 self.timer=None |
120 self.scheduled_operation=None |
130 self.scheduled_operation=None |
121 self.lock=Lock() |
131 self.lock=Lock() |
122 self.__status_update_callback=None |
132 self.__status_update_callback=None |
|
133 self.state=INACTIVE |
123 |
134 |
124 def is_running(self): |
135 def is_running(self): |
125 with self.lock: |
136 with self.lock: |
126 running=self.__is_running_unlocked() |
137 running=self.__is_running_unlocked() |
127 return running |
138 return running |
183 status['name']='borg' |
194 status['name']='borg' |
184 lvl=translate_loglevel(status['levelname']) |
195 lvl=translate_loglevel(status['levelname']) |
185 logger.log(lvl, status['name'] + ': ' + status['message']) |
196 logger.log(lvl, status['name'] + ': ' + status['message']) |
186 if lvl>=logging.WARNING: |
197 if lvl>=logging.WARNING: |
187 errors_this_message=status |
198 errors_this_message=status |
|
199 state=ERRORS |
|
200 if ('msgid' in status and |
|
201 (status['msgid']=='LockTimeout' or # observed in reality |
|
202 status['msgid']=='LockErrorT' or # in docs |
|
203 status['msgid']=='LockErrorT')): # in docs |
|
204 state=BUSY |
188 with self.lock: |
205 with self.lock: |
189 self.current_operation['errors']=True |
206 self.state=combine_state(self.state, state) |
190 status, callback=self.__status_unlocked() |
207 status, callback=self.__status_unlocked() |
191 else: |
208 else: |
192 logger.debug('Unrecognised log entry %s' % str(status)) |
209 logger.debug('Unrecognised log entry %s' % str(status)) |
193 |
210 |
194 if callback: |
211 if callback: |
208 |
225 |
209 logger.debug('Result listener thread waiting for result') |
226 logger.debug('Result listener thread waiting for result') |
210 |
227 |
211 res=self.borg_instance.read_result() |
228 res=self.borg_instance.read_result() |
212 |
229 |
213 errors=False |
230 # Finish processing remaining errors |
|
231 self.thread_log.join() |
|
232 |
|
233 with self.lock: |
|
234 state=self.state |
|
235 |
|
236 # If there were no errors, reset back to INACTIVE state |
|
237 if state==ACTIVE: |
|
238 state=INACTIVE |
214 |
239 |
215 logger.debug('Borg result: %s' % str(res)) |
240 logger.debug('Borg result: %s' % str(res)) |
216 |
241 |
217 if res is None: |
242 if res is None and state==INACTIVE: |
218 errors=True |
243 logger.error('No result from borg despite no error in log') |
|
244 state=ERRORS |
219 |
245 |
220 logger.debug('Waiting for borg subprocess to terminate in result thread') |
246 logger.debug('Waiting for borg subprocess to terminate in result thread') |
221 |
247 |
222 errors=errors or not self.borg_instance.wait() |
248 if not self.borg_instance.wait(): |
223 |
249 logger.critical('Borg subprocess did not terminate') |
224 logger.debug('Borg subprocess terminated (errors: %s); terminating result listener thread' % str(errors)) |
250 state=combine_state(state, ERRORS) |
225 |
251 |
226 self.thread_log.join() |
252 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
227 |
253 |
228 with self.lock: |
254 with self.lock: |
229 if self.current_operation['operation']=='create': |
255 if self.current_operation['operation']=='create': |
230 self.lastrun_when=self.current_operation['when_monotonic'] |
256 self.lastrun_when=self.current_operation['when_monotonic'] |
231 self.lastrun_errors=errors |
|
232 self.thread_res=None |
257 self.thread_res=None |
233 self.thread_log=None |
258 self.thread_log=None |
234 self.borg_instance=None |
259 self.borg_instance=None |
235 self.current_operation=None |
260 self.current_operation=None |
236 self.__schedule_unlocked() |
261 self.__schedule_unlocked() |
|
262 self.state=state |
237 status, callback=self.__status_unlocked() |
263 status, callback=self.__status_unlocked() |
238 if callback: |
264 if callback: |
239 callback(self, status) |
265 callback(self, status) |
240 |
266 |
241 def __do_launch(self, queue, op, archive_or_repository, *args): |
267 def __do_launch(self, queue, op, archive_or_repository, *args): |
256 self.thread_res=t_res |
282 self.thread_res=t_res |
257 self.borg_instance=inst |
283 self.borg_instance=inst |
258 self.queue=queue |
284 self.queue=queue |
259 self.current_operation=op |
285 self.current_operation=op |
260 self.current_operation['when_monotonic']=time.monotonic() |
286 self.current_operation['when_monotonic']=time.monotonic() |
|
287 self.state=ACTIVE |
261 |
288 |
262 t_log.start() |
289 t_log.start() |
263 t_res.start() |
290 t_res.start() |
264 |
291 |
265 def __launch(self, op, queue): |
292 def __launch(self, op, queue): |
292 else: |
319 else: |
293 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
320 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
294 except Exception as err: |
321 except Exception as err: |
295 logger.debug('Rescheduling after failure') |
322 logger.debug('Rescheduling after failure') |
296 self.lastrun_when=time.monotonic() |
323 self.lastrun_when=time.monotonic() |
297 self.lastrun_errors=True |
324 self.state=ERRORS |
298 self.__schedule_unlocked() |
325 self.__schedule_unlocked() |
299 raise err |
326 raise err |
300 |
327 |
301 return True |
328 return True |
302 |
329 |
367 return None |
394 return None |
368 else: |
395 else: |
369 return {'operation': 'create', |
396 return {'operation': 'create', |
370 'detail': 'initial', |
397 'detail': 'initial', |
371 'when_monotonic': now+initial_interval} |
398 'when_monotonic': now+initial_interval} |
372 elif self.lastrun_errors: |
399 elif self.state>=BUSY: |
373 if self.retry_interval==0: |
400 if self.retry_interval==0: |
374 return None |
401 return None |
375 else: |
402 else: |
376 return {'operation': 'create', |
403 return {'operation': 'create', |
377 'detail': 'retry', |
404 'detail': 'retry', |
396 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
423 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
397 (op['operation'], op['detail'], self.name, delay)) |
424 (op['operation'], op['detail'], self.name, delay)) |
398 tmr=Timer(delay, self.__queue_timed_operation) |
425 tmr=Timer(delay, self.__queue_timed_operation) |
399 self.scheduled_operation=op |
426 self.scheduled_operation=op |
400 self.timer=tmr |
427 self.timer=tmr |
|
428 self.state=combine_state(self.state, SCHEDULED) |
401 tmr.start() |
429 tmr.start() |
402 |
430 |
403 return op |
431 return op |
404 |
432 |
405 def schedule(self): |
433 def schedule(self): |
423 status=self.scheduled_operation |
451 status=self.scheduled_operation |
424 status['type']='scheduled' |
452 status['type']='scheduled' |
425 else: |
453 else: |
426 status={'type': 'nothing'} |
454 status={'type': 'nothing'} |
427 |
455 |
428 if self.lastrun_errors is not None: |
456 status['name']=self.name |
429 status['errors']=self.lastrun_errors |
457 status['state']=self.state |
430 |
458 |
431 if 'detail' not in status: |
459 if 'detail' not in status: |
432 status['detail']='NONE' |
460 status['detail']='NONE' |
433 |
|
434 if 'errors' not in status: |
|
435 status['errors']=False |
|
436 |
|
437 status['name']=self.name |
|
438 |
461 |
439 if 'when_monotonic' in status: |
462 if 'when_monotonic' in status: |
440 status['when']=(status['when_monotonic'] |
463 status['when']=(status['when_monotonic'] |
441 -time.monotonic()+time.time()) |
464 -time.monotonic()+time.time()) |
442 |
465 |