 import logging
 import time
 import keyring
 import borgend
 import repository
+from enum import IntEnum
 from instance import BorgInstance
 from threading import Thread, Lock, Condition
 from scheduler import TerminableThread

 logger=borgend.logger.getChild(__name__)

-# State
-INACTIVE=0
-SCHEDULED=1
-QUEUED=2
-ACTIVE=3
-BUSY=4
-OFFLINE=5
-ERRORS=6
-
-def combine_state(state1, state2):
-    return max(state1, state2)
+class State(IntEnum):
+    # State
+    INACTIVE=0
+    SCHEDULED=1
+    QUEUED=2
+    ACTIVE=3
+
+
+class Errors(IntEnum):
+    OK=0
+    BUSY=1
+    OFFLINE=2
+    ERRORS=3
+
+    def combine(self, other):
+        return max(self, other)
+
+    def ok(self):
+        return self==self.OK
+
+    def __str__(self):
+        return _errorstring[self]
+
+_errorstring={
+    Errors.OK: 'ok',
+    Errors.BUSY: 'busy',
+    Errors.OFFLINE: 'offline',
+    Errors.ERRORS: 'errors'
+}
+
+def translate_loglevel(x):
+    if x in loglevel_translation:
+        return loglevel_translation[x]
+    else:
+        return logging.ERROR
+
+def safe_get_int(t, x):
+    if x in t:
+        tmp=t[x]
+        if isinstance(tmp, int):
+            return tmp
+    return None

 loglevel_translation={
     'CRITICAL': logging.CRITICAL,
     'ERROR': logging.ERROR,
     'WARNING': logging.WARNING,
     'DEBUG': logging.DEBUG,
     'INFO': logging.INFO
 }
-
-def translate_loglevel(x):
-    if x in loglevel_translation:
-        return loglevel_translation[x]
-    else:
-        return logging.ERROR
-
-def safe_get_int(t, x):
-    if x in t:
-        tmp=t[x]
-        if isinstance(tmp, int):
-            return tmp
-    return None
-

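Both enums derive from IntEnum, so members order by value: State by lifecycle stage, Errors by severity, which is what lets combine() (a plain max) keep the worst error seen so far. A short interactive sketch of the semantics defined above (illustrative only, not part of the change):

>>> Errors.OK.combine(Errors.BUSY).name
'BUSY'
>>> Errors.BUSY.combine(Errors.ERRORS).name
'ERRORS'
>>> Errors.OK.ok()
True
>>> str(Errors.BUSY)
'busy'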
 class Backup(TerminableThread):

     def __decode_config(self, cfg):
         loc0='backup target %d' % self.identifier
[...]
         assert(not running)

     def __log_listener(self):
         self.logger.debug('Log listener thread waiting for entries')
         success=True
-        for status in iter(self.borg_instance.read_log, None):
-            self.logger.debug(str(status))
-            t=status['type']
+        for msg in iter(self.borg_instance.read_log, None):
+            self.logger.debug(str(msg))
+            t=msg['type']

-            errors_this_message=None
+            errormsg=None
             callback=None

             if t=='progress_percent':
-                current=safe_get_int(status, 'current')
-                total=safe_get_int(status, 'total')
+                current=safe_get_int(msg, 'current')
+                total=safe_get_int(msg, 'total')
                 if current is not None and total is not None:
                     with self._cond:
                         self.current_operation['progress_current']=current
                         self.current_operation['progress_total']=total
                         status, callback=self.__status_unlocked()

             elif t=='archive_progress':
-                original_size=safe_get_int(status, 'original_size')
-                compressed_size=safe_get_int(status, 'compressed_size')
-                deduplicated_size=safe_get_int(status, 'deduplicated_size')
+                original_size=safe_get_int(msg, 'original_size')
+                compressed_size=safe_get_int(msg, 'compressed_size')
+                deduplicated_size=safe_get_int(msg, 'deduplicated_size')
                 if original_size is not None and compressed_size is not None and deduplicated_size is not None:
                     with self._cond:
                         self.current_operation['original_size']=original_size
                         self.current_operation['compressed_size']=compressed_size
                         self.current_operation['deduplicated_size']=deduplicated_size
[...]

             elif t=='file_status':
                 pass

             elif t=='log_message':
-                if 'levelname' not in status:
-                    status['levelname']='ERROR'
-                if 'message' not in status:
-                    status['message']='UNKNOWN'
-                if 'name' not in status:
-                    status['name']='borg'
-                lvl=translate_loglevel(status['levelname'])
-                self.logger.log(lvl, status['name'] + ': ' + status['message'])
+                if 'levelname' not in msg:
+                    msg['levelname']='ERROR'
+                if 'message' not in msg:
+                    msg['message']='UNKNOWN'
+                if 'name' not in msg:
+                    msg['name']='borg'
+                lvl=translate_loglevel(msg['levelname'])
+                self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
                 if lvl>=logging.WARNING:
-                    errors_this_message=status
-                    state=ERRORS
-                    if ('msgid' in status and
-                        (status['msgid']=='LockTimeout' or # observed in reality
-                         status['msgid']=='LockErrorT' or # in docs
-                         status['msgid']=='LockErrorT')): # in docs
-                        state=BUSY
+                    errormsg=msg
+                    errors=Errors.ERRORS
+                    if ('msgid' in msg and
+                        (msg['msgid']=='LockTimeout' or # observed in reality
+                         msg['msgid']=='LockErrorT' or  # in docs
+                         msg['msgid']=='LockError')):   # in docs
+                        errors=Errors.BUSY
                     with self._cond:
-                        self.state=combine_state(self.state, state)
+                        self.errors=self.errors.combine(errors)
                         status, callback=self.__status_unlocked()
             else:
-                self.logger.debug('Unrecognised log entry %s' % str(status))
+                self.logger.debug('Unrecognised log entry %s' % str(msg))

             if callback:
-                callback(status, errors=errors_this_message)
+                callback(status, errors=errormsg)

         self.logger.debug('Waiting for borg subprocess to terminate in log thread')

         self.borg_instance.wait()

         self.logger.debug('Borg subprocess terminated; terminating log listener thread')

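For reference, this is how the two module-level helpers behave on a log_message-shaped entry; the dict below is made up for illustration and is not captured borg output:

>>> entry={'type': 'log_message', 'levelname': 'WARNING', 'message': 'could not acquire lock'}
>>> translate_loglevel(entry['levelname'])==logging.WARNING
True
>>> translate_loglevel('VERBOSE')==logging.ERROR   # unknown level names fall back to ERROR
True
>>> safe_get_int({'current': 10}, 'current')
10
>>> safe_get_int({'current': 'ten'}, 'current') is None
True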
     def __result_listener(self):
-        # self.state=ACTIVE
-        # with self._cond:
-        #     status, callback=self.__status_unlocked()
-        # if callback:
-        #     callback(status)
-
         self.logger.debug('Result listener thread waiting for result')

         res=self.borg_instance.read_result()

         # Finish processing remaining errors
         self.thread_log.join()

         with self._cond:
-            state=self.state
-
-            # If there were no errors, reset back to INACTIVE state
-            if state==ACTIVE:
-                state=INACTIVE
+            errors=self.errors

         self.logger.debug('Borg result: %s' % str(res))

-        if res is None and state==INACTIVE:
+        if res is None:
             self.logger.error('No result from borg despite no error in log')
-            state=ERRORS
+            if errors.ok():
+                errors=Errors.ERRORS

         self.logger.debug('Waiting for borg subprocess to terminate in result thread')

         if not self.borg_instance.wait():
             self.logger.error('Borg subprocess did not terminate')
-            state=combine_state(state, ERRORS)
+            if errors.ok():
+                errors=Errors.ERRORS

-        self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
+        self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))

         with self._cond:
             if self.current_operation['operation']=='create':
                 self.lastrun_when=self.current_operation['when_monotonic']
             self.thread_res=None
             self.thread_log=None
             self.borg_instance=None
             self.current_operation=None
-            self.state=state
+            self.state=State.INACTIVE
+            self.errors=errors
             self.__update_status()
             self._cond.notify()

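The listener threads publish their findings into shared attributes while holding self._cond, and __result_listener above uses notify() to wake the main thread, which waits on the same condition elsewhere in this file. If the pattern is unfamiliar, here is a self-contained toy of the same idea, using only the threading module plus the Errors enum defined above; the Toy class and its names are illustrative, not borgend API:

import threading

class Toy:
    def __init__(self):
        self._cond=threading.Condition()
        self.errors=Errors.OK
        self.done=False

    def _listener(self):
        # a listener raises the shared error level under the lock, then notifies
        with self._cond:
            self.errors=self.errors.combine(Errors.BUSY)
            self.done=True
            self._cond.notify()

    def wait_result(self):
        threading.Thread(target=self._listener).start()
        with self._cond:
            while not self.done:
                self._cond.wait()
            return self.errors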
     def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()
[...]
                 self.__main_thread_wait_schedule()
                 if not self._terminate:
                     self.__main_thread_queue_and_launch()
         except Exception as err:
             self.logger.exception("Error with backup '%s'" % self._name)
-            self.lastrun_when=time.monotonic()
-            self.state=ERRORS
-            self.scheduled_operation=None
+            self.errors=Errors.ERRORS
+
+        self.state=State.INACTIVE
+        self.scheduled_operation=None

         # Clean up to terminate: kill borg instance and communication threads
         if self.borg_instance:
             self.logger.debug("Terminating a borg instance")
             self.borg_instance.terminate()

         # Store threads to use outside lock
         thread_log=self.thread_log
         thread_res=self.thread_res
+        self.thread_log=None
+        self.thread_res=None

         self.logger.debug("Waiting for log and result threads to terminate")

         if thread_log:
             thread_log.join()
[...]
             delay=max(0, op['when_monotonic']-now)
             self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
                              (op['operation'], op['detail'], delay))

             self.scheduled_operation=op
-            self.state=combine_state(self.state, SCHEDULED)
+            self.state=State.SCHEDULED
             self.__update_status()

             # Wait under scheduled wait
             self.scheduler.wait_until(now+delay, self._cond, self._name)
         else:
             # Nothing scheduled - just wait
-            self.logger.debug("Waiting for manual scheduling")
+            self.logger.info("Waiting for manual scheduling")
+
+            self.state=State.INACTIVE
+            self.__update_status()
+
             self._cond.wait()

     # Main thread/3. If there is a scheduled operation (it might have been
     # changed manually from 'op' created in __main_thread_wait_schedule above),
     # queue it on the repository, and launch the operation once repository
     # available
     def __main_thread_queue_and_launch(self):
         if self.scheduled_operation:
             self.logger.debug("Queuing")
-            self.state=combine_state(self.state, QUEUED)
+            self.state=State.QUEUED
             self.__update_status()
             self.repository.queue_action(self._cond,
                                          action=self.__launch_check,
                                          name=self._name)

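With the direct assignments above replacing the old combine_state() calls, self.state now simply walks the lifecycle of a run, roughly INACTIVE -> SCHEDULED -> QUEUED -> ACTIVE -> INACTIVE (ACTIVE is presumably set when the borg process is actually launched, outside this excerpt), while self.errors carries any problems separately. The IntEnum ordering matches that progression:

>>> State.INACTIVE < State.SCHEDULED < State.QUEUED < State.ACTIVE
True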
[...]
     def __status_unlocked(self):
         callback=self.__status_update_callback

         if self.current_operation:
             status=self.current_operation
-            status['type']='current'
-            # Errors should be set by listeners
+        elif self.scheduled_operation:
+            status=self.scheduled_operation
         else:
-            if self.scheduled_operation:
-                status=self.scheduled_operation
-                if self.state==QUEUED:
-                    status['type']='queued'
-                else:
-                    status['type']='scheduled'
-            else:
-                status={'type': 'nothing'}
+            status={'type': 'nothing'}

         status['name']=self._name
         status['state']=self.state
+        status['errors']=self.errors

         if 'detail' not in status:
             status['detail']='NONE'

         if 'when_monotonic' in status: