6 import logging |
6 import logging |
7 import time |
7 import time |
8 import keyring |
8 import keyring |
9 import borgend |
9 import borgend |
10 import repository |
10 import repository |
|
11 from enum import IntEnum |
11 from instance import BorgInstance |
12 from instance import BorgInstance |
12 from threading import Thread, Lock, Condition |
13 from threading import Thread, Lock, Condition |
13 from scheduler import TerminableThread |
14 from scheduler import TerminableThread |
14 |
15 |
15 logger=borgend.logger.getChild(__name__) |
16 logger=borgend.logger.getChild(__name__) |
16 |
17 |
17 # State |
18 class State(IntEnum): |
18 INACTIVE=0 |
19 # State |
19 SCHEDULED=1 |
20 INACTIVE=0 |
20 QUEUED=2 |
21 SCHEDULED=1 |
21 ACTIVE=3 |
22 QUEUED=2 |
22 BUSY=4 |
23 ACTIVE=3 |
23 OFFLINE=5 |
24 |
24 ERRORS=6 |
25 |
25 |
26 class Errors(IntEnum): |
26 def combine_state(state1, state2): |
27 OK=0 |
27 return max(state1, state2) |
28 BUSY=1 |
|
29 OFFLINE=2 |
|
30 ERRORS=3 |
|
31 |
|
32 def combine(self, other): |
|
33 return max(self, other) |
|
34 |
|
35 def ok(self): |
|
36 return self==self.OK |
|
37 |
|
38 def __str__(self): |
|
39 return _errorstring[self] |
|
40 |
|
41 _errorstring={ |
|
42 Errors.OK: 'ok', |
|
43 Errors.BUSY: 'busy', |
|
44 Errors.OFFLINE: 'offline', |
|
45 Errors.ERRORS: 'errors' |
|
46 } |
|
47 |
|
48 def translate_loglevel(x): |
|
49 if x in loglevel_translation: |
|
50 return loglevel_translation[x] |
|
51 else: |
|
52 return logging.ERROR |
|
53 |
|
54 def safe_get_int(t, x): |
|
55 if x in t: |
|
56 tmp=t[x] |
|
57 if isinstance(tmp, int): |
|
58 return tmp |
|
59 return None |
28 |
60 |
29 loglevel_translation={ |
61 loglevel_translation={ |
30 'CRITICAL': logging.CRITICAL, |
62 'CRITICAL': logging.CRITICAL, |
31 'ERROR': logging.ERROR, |
63 'ERROR': logging.ERROR, |
32 'WARNING': logging.WARNING, |
64 'WARNING': logging.WARNING, |
33 'DEBUG': logging.DEBUG, |
65 'DEBUG': logging.DEBUG, |
34 'INFO': logging.INFO |
66 'INFO': logging.INFO |
35 } |
67 } |
36 |
|
37 def translate_loglevel(x): |
|
38 if x in loglevel_translation: |
|
39 return loglevel_translation[x] |
|
40 else: |
|
41 return logging.ERROR |
|
42 |
|
43 def safe_get_int(t, x): |
|
44 if x in t: |
|
45 tmp=t[x] |
|
46 if isinstance(tmp, int): |
|
47 return tmp |
|
48 return None |
|
49 |
|
50 |
68 |
51 class Backup(TerminableThread): |
69 class Backup(TerminableThread): |
52 |
70 |
53 def __decode_config(self, cfg): |
71 def __decode_config(self, cfg): |
54 loc0='backup target %d' % self.identifier |
72 loc0='backup target %d' % self.identifier |
160 assert(not running) |
179 assert(not running) |
161 |
180 |
162 def __log_listener(self): |
181 def __log_listener(self): |
163 self.logger.debug('Log listener thread waiting for entries') |
182 self.logger.debug('Log listener thread waiting for entries') |
164 success=True |
183 success=True |
165 for status in iter(self.borg_instance.read_log, None): |
184 for msg in iter(self.borg_instance.read_log, None): |
166 self.logger.debug(str(status)) |
185 self.logger.debug(str(msg)) |
167 t=status['type'] |
186 t=msg['type'] |
168 |
187 |
169 errors_this_message=None |
188 errormsg=None |
170 callback=None |
189 callback=None |
171 |
190 |
172 if t=='progress_percent': |
191 if t=='progress_percent': |
173 current=safe_get_int(status, 'current') |
192 current=safe_get_int(msg, 'current') |
174 total=safe_get_int(status, 'total') |
193 total=safe_get_int(msg, 'total') |
175 if current is not None and total is not None: |
194 if current is not None and total is not None: |
176 with self._cond: |
195 with self._cond: |
177 self.current_operation['progress_current']=current |
196 self.current_operation['progress_current']=current |
178 self.current_operation['progress_total']=total |
197 self.current_operation['progress_total']=total |
179 status, callback=self.__status_unlocked() |
198 status, callback=self.__status_unlocked() |
180 |
199 |
181 elif t=='archive_progress': |
200 elif t=='archive_progress': |
182 original_size=safe_get_int(status, 'original_size') |
201 original_size=safe_get_int(msg, 'original_size') |
183 compressed_size=safe_get_int(status, 'compressed_size') |
202 compressed_size=safe_get_int(msg, 'compressed_size') |
184 deduplicated_size=safe_get_int(status, 'deduplicated_size') |
203 deduplicated_size=safe_get_int(msg, 'deduplicated_size') |
185 if original_size is not None and original_size is not None and deduplicated_size is not None: |
204 if original_size is not None and original_size is not None and deduplicated_size is not None: |
186 with self._cond: |
205 with self._cond: |
187 self.current_operation['original_size']=original_size |
206 self.current_operation['original_size']=original_size |
188 self.current_operation['compressed_size']=compressed_size |
207 self.current_operation['compressed_size']=compressed_size |
189 self.current_operation['deduplicated_size']=deduplicated_size |
208 self.current_operation['deduplicated_size']=deduplicated_size |
194 |
213 |
195 elif t=='file_status': |
214 elif t=='file_status': |
196 pass |
215 pass |
197 |
216 |
198 elif t=='log_message': |
217 elif t=='log_message': |
199 if 'levelname' not in status: |
218 if 'levelname' not in msg: |
200 status['levelname']='ERROR' |
219 msg['levelname']='ERROR' |
201 if 'message' not in status: |
220 if 'message' not in msg: |
202 status['message']='UNKNOWN' |
221 msg['message']='UNKNOWN' |
203 if 'name' not in status: |
222 if 'name' not in msg: |
204 status['name']='borg' |
223 msg['name']='borg' |
205 lvl=translate_loglevel(status['levelname']) |
224 lvl=translate_loglevel(msg['levelname']) |
206 self.logger.log(lvl, status['name'] + ': ' + status['message']) |
225 self.logger.log(lvl, msg['name'] + ': ' + msg['message']) |
207 if lvl>=logging.WARNING: |
226 if lvl>=logging.WARNING: |
208 errors_this_message=status |
227 errormsg=msg |
209 state=ERRORS |
228 errors=Errors.ERRORS |
210 if ('msgid' in status and |
229 if ('msgid' in msg and |
211 (status['msgid']=='LockTimeout' or # observed in reality |
230 (msg['msgid']=='LockTimeout' or # observed in reality |
212 status['msgid']=='LockErrorT' or # in docs |
231 msg['msgid']=='LockErrorT' or # in docs |
213 status['msgid']=='LockErrorT')): # in docs |
232 msg['msgid']=='LockErrorT')): # in docs |
214 state=BUSY |
233 errors=Errors.BUSY |
215 with self._cond: |
234 with self._cond: |
216 self.state=combine_state(self.state, state) |
235 self.errors=self.errors.combine(errors) |
217 status, callback=self.__status_unlocked() |
236 status, callback=self.__status_unlocked() |
218 else: |
237 else: |
219 self.logger.debug('Unrecognised log entry %s' % str(status)) |
238 self.logger.debug('Unrecognised log entry %s' % str(status)) |
220 |
239 |
221 if callback: |
240 if callback: |
222 callback(status, errors=errors_this_message) |
241 callback(status, errors=errormsg) |
223 |
242 |
224 self.logger.debug('Waiting for borg subprocess to terminate in log thread') |
243 self.logger.debug('Waiting for borg subprocess to terminate in log thread') |
225 |
244 |
226 self.borg_instance.wait() |
245 self.borg_instance.wait() |
227 |
246 |
228 self.logger.debug('Borg subprocess terminated; terminating log listener thread') |
247 self.logger.debug('Borg subprocess terminated; terminating log listener thread') |
229 |
248 |
230 def __result_listener(self): |
249 def __result_listener(self): |
231 # self.state=ACTIVE |
|
232 # with self._cond: |
|
233 # status, callback=self.__status_unlocked() |
|
234 # if callback: |
|
235 # callback(status) |
|
236 |
|
237 self.logger.debug('Result listener thread waiting for result') |
250 self.logger.debug('Result listener thread waiting for result') |
238 |
251 |
239 res=self.borg_instance.read_result() |
252 res=self.borg_instance.read_result() |
240 |
253 |
241 # Finish processing remaining errors |
254 # Finish processing remaining errors |
242 self.thread_log.join() |
255 self.thread_log.join() |
243 |
256 |
244 with self._cond: |
257 with self._cond: |
245 state=self.state |
258 errors=self.errors |
246 |
|
247 # If there were no errors, reset back to INACTIVE state |
|
248 if state==ACTIVE: |
|
249 state=INACTIVE |
|
250 |
259 |
251 self.logger.debug('Borg result: %s' % str(res)) |
260 self.logger.debug('Borg result: %s' % str(res)) |
252 |
261 |
253 if res is None and state==INACTIVE: |
262 if res is None: |
254 self.logger.error('No result from borg despite no error in log') |
263 self.logger.error('No result from borg despite no error in log') |
255 state=ERRORS |
264 if errors.ok(): |
|
265 errors=Errors.ERRORS |
256 |
266 |
257 self.logger.debug('Waiting for borg subprocess to terminate in result thread') |
267 self.logger.debug('Waiting for borg subprocess to terminate in result thread') |
258 |
268 |
259 if not self.borg_instance.wait(): |
269 if not self.borg_instance.wait(): |
260 self.logger.error('Borg subprocess did not terminate') |
270 self.logger.error('Borg subprocess did not terminate') |
261 state=combine_state(state, ERRORS) |
271 if errors.ok(): |
262 |
272 errors=Errors.ERRORS |
263 self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
273 |
|
274 self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors)) |
264 |
275 |
265 with self._cond: |
276 with self._cond: |
266 if self.current_operation['operation']=='create': |
277 if self.current_operation['operation']=='create': |
267 self.lastrun_when=self.current_operation['when_monotonic'] |
278 self.lastrun_when=self.current_operation['when_monotonic'] |
268 self.thread_res=None |
279 self.thread_res=None |
269 self.thread_log=None |
280 self.thread_log=None |
270 self.borg_instance=None |
281 self.borg_instance=None |
271 self.current_operation=None |
282 self.current_operation=None |
272 self.state=state |
283 self.state=State.INACTIVE |
|
284 self.errors=errors |
273 self.__update_status() |
285 self.__update_status() |
274 self._cond.notify() |
286 self._cond.notify() |
275 |
287 |
276 def __do_launch(self, op, archive_or_repository, *args): |
288 def __do_launch(self, op, archive_or_repository, *args): |
277 passphrase=self.extract_passphrase() |
289 passphrase=self.extract_passphrase() |
337 self.__main_thread_wait_schedule() |
352 self.__main_thread_wait_schedule() |
338 if not self._terminate: |
353 if not self._terminate: |
339 self.__main_thread_queue_and_launch() |
354 self.__main_thread_queue_and_launch() |
340 except Exception as err: |
355 except Exception as err: |
341 self.logger.exception("Error with backup '%s'" % self._name) |
356 self.logger.exception("Error with backup '%s'" % self._name) |
342 self.lastrun_when=time.monotonic() |
357 self.errors=Errors.ERRORS |
343 self.state=ERRORS |
358 |
344 self.scheduled_operation=None |
359 self.state=State.INACTIVE |
|
360 self.scheduled_operation=None |
345 |
361 |
346 # Clean up to terminate: kill borg instance and communication threads |
362 # Clean up to terminate: kill borg instance and communication threads |
347 if self.borg_instance: |
363 if self.borg_instance: |
348 self.logger.debug("Terminating a borg instance") |
364 self.logger.debug("Terminating a borg instance") |
349 self.borg_instance.terminate() |
365 self.borg_instance.terminate() |
350 |
366 |
351 # Store threads to use outside lock |
367 # Store threads to use outside lock |
352 thread_log=self.thread_log |
368 thread_log=self.thread_log |
353 thread_res=self.thread_res |
369 thread_res=self.thread_res |
|
370 self.thread_log=None |
|
371 self.thread_res=None |
354 |
372 |
355 self.logger.debug("Waiting for log and result threads to terminate") |
373 self.logger.debug("Waiting for log and result threads to terminate") |
356 |
374 |
357 if thread_log: |
375 if thread_log: |
358 thread_log.join() |
376 thread_log.join() |
378 delay=max(0, op['when_monotonic']-now) |
396 delay=max(0, op['when_monotonic']-now) |
379 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % |
397 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % |
380 (op['operation'], op['detail'], delay)) |
398 (op['operation'], op['detail'], delay)) |
381 |
399 |
382 self.scheduled_operation=op |
400 self.scheduled_operation=op |
383 self.state=combine_state(self.state, SCHEDULED) |
401 self.state=State.SCHEDULED |
384 self.__update_status() |
402 self.__update_status() |
385 |
403 |
386 # Wait under scheduled wait |
404 # Wait under scheduled wait |
387 self.scheduler.wait_until(now+delay, self._cond, self._name) |
405 self.scheduler.wait_until(now+delay, self._cond, self._name) |
388 else: |
406 else: |
389 # Nothing scheduled - just wait |
407 # Nothing scheduled - just wait |
390 self.logger.debug("Waiting for manual scheduling") |
408 self.logger.info("Waiting for manual scheduling") |
|
409 |
|
410 self.state=State.INACTIVE |
|
411 self.__update_status() |
|
412 |
391 self._cond.wait() |
413 self._cond.wait() |
392 |
414 |
393 # Main thread/3. If there is a scheduled operation (it might have been |
415 # Main thread/3. If there is a scheduled operation (it might have been |
394 # changed manually from 'op' created in __main_thread_wait_schedule above), |
416 # changed manually from 'op' created in __main_thread_wait_schedule above), |
395 # queue it on the repository, and launch the operation once repository |
417 # queue it on the repository, and launch the operation once repository |
396 # available |
418 # available |
397 def __main_thread_queue_and_launch(self): |
419 def __main_thread_queue_and_launch(self): |
398 if self.scheduled_operation: |
420 if self.scheduled_operation: |
399 self.logger.debug("Queuing") |
421 self.logger.debug("Queuing") |
400 self.state=combine_state(self.state, QUEUED) |
422 self.state=State.QUEUED |
401 self.__update_status() |
423 self.__update_status() |
402 self.repository.queue_action(self._cond, |
424 self.repository.queue_action(self._cond, |
403 action=self.__launch_check, |
425 action=self.__launch_check, |
404 name=self._name) |
426 name=self._name) |
405 |
427 |
434 def __status_unlocked(self): |
456 def __status_unlocked(self): |
435 callback=self.__status_update_callback |
457 callback=self.__status_update_callback |
436 |
458 |
437 if self.current_operation: |
459 if self.current_operation: |
438 status=self.current_operation |
460 status=self.current_operation |
439 status['type']='current' |
461 elif self.scheduled_operation: |
440 # Errors should be set by listeners |
462 status=self.scheduled_operation |
441 else: |
463 else: |
442 if self.scheduled_operation: |
464 status={'type': 'nothing'} |
443 status=self.scheduled_operation |
|
444 if self.state==QUEUED: |
|
445 status['type']='queued' |
|
446 else: |
|
447 status['type']='scheduled' |
|
448 else: |
|
449 status={'type': 'nothing'} |
|
450 |
465 |
451 status['name']=self._name |
466 status['name']=self._name |
452 status['state']=self.state |
467 status['state']=self.state |
|
468 status['errors']=self.errors |
453 |
469 |
454 if 'detail' not in status: |
470 if 'detail' not in status: |
455 status['detail']='NONE' |
471 status['detail']='NONE' |
456 |
472 |
457 if 'when_monotonic' in status: |
473 if 'when_monotonic' in status: |