backup.py

changeset 61
bc6c3d74e6ea
parent 58
170d69da51bb
child 62
b7d13b2ad67e
equal deleted inserted replaced
60:10bd7e3906d9 61:bc6c3d74e6ea
6 import logging 6 import logging
7 import time 7 import time
8 import keyring 8 import keyring
9 import borgend 9 import borgend
10 import repository 10 import repository
11 from enum import IntEnum
11 from instance import BorgInstance 12 from instance import BorgInstance
12 from threading import Thread, Lock, Condition 13 from threading import Thread, Lock, Condition
13 from scheduler import TerminableThread 14 from scheduler import TerminableThread
14 15
15 logger=borgend.logger.getChild(__name__) 16 logger=borgend.logger.getChild(__name__)
16 17
17 # State 18 class State(IntEnum):
18 INACTIVE=0 19 # State
19 SCHEDULED=1 20 INACTIVE=0
20 QUEUED=2 21 SCHEDULED=1
21 ACTIVE=3 22 QUEUED=2
22 BUSY=4 23 ACTIVE=3
23 OFFLINE=5 24
24 ERRORS=6 25
25 26 class Errors(IntEnum):
26 def combine_state(state1, state2): 27 OK=0
27 return max(state1, state2) 28 BUSY=1
29 OFFLINE=2
30 ERRORS=3
31
32 def combine(self, other):
33 return max(self, other)
34
35 def ok(self):
36 return self==self.OK
37
38 def __str__(self):
39 return _errorstring[self]
40
41 _errorstring={
42 Errors.OK: 'ok',
43 Errors.BUSY: 'busy',
44 Errors.OFFLINE: 'offline',
45 Errors.ERRORS: 'errors'
46 }
47
48 def translate_loglevel(x):
49 if x in loglevel_translation:
50 return loglevel_translation[x]
51 else:
52 return logging.ERROR
53
54 def safe_get_int(t, x):
55 if x in t:
56 tmp=t[x]
57 if isinstance(tmp, int):
58 return tmp
59 return None
28 60
29 loglevel_translation={ 61 loglevel_translation={
30 'CRITICAL': logging.CRITICAL, 62 'CRITICAL': logging.CRITICAL,
31 'ERROR': logging.ERROR, 63 'ERROR': logging.ERROR,
32 'WARNING': logging.WARNING, 64 'WARNING': logging.WARNING,
33 'DEBUG': logging.DEBUG, 65 'DEBUG': logging.DEBUG,
34 'INFO': logging.INFO 66 'INFO': logging.INFO
35 } 67 }
36
37 def translate_loglevel(x):
38 if x in loglevel_translation:
39 return loglevel_translation[x]
40 else:
41 return logging.ERROR
42
43 def safe_get_int(t, x):
44 if x in t:
45 tmp=t[x]
46 if isinstance(tmp, int):
47 return tmp
48 return None
49
50 68
51 class Backup(TerminableThread): 69 class Backup(TerminableThread):
52 70
53 def __decode_config(self, cfg): 71 def __decode_config(self, cfg):
54 loc0='backup target %d' % self.identifier 72 loc0='backup target %d' % self.identifier
133 self.thread_res=None 151 self.thread_res=None
134 152
135 self.current_operation=None 153 self.current_operation=None
136 self.scheduled_operation=None 154 self.scheduled_operation=None
137 self.lastrun_when=None 155 self.lastrun_when=None
138 self.state=INACTIVE 156 self.state=State.INACTIVE
157 self.errors=Errors.OK
139 158
140 self.__decode_config(cfg) 159 self.__decode_config(cfg)
141 160
142 super().__init__(target = self.__main_thread, name = self._name) 161 super().__init__(target = self.__main_thread, name = self._name)
143 self.daemon=True 162 self.daemon=True
160 assert(not running) 179 assert(not running)
161 180
162 def __log_listener(self): 181 def __log_listener(self):
163 self.logger.debug('Log listener thread waiting for entries') 182 self.logger.debug('Log listener thread waiting for entries')
164 success=True 183 success=True
165 for status in iter(self.borg_instance.read_log, None): 184 for msg in iter(self.borg_instance.read_log, None):
166 self.logger.debug(str(status)) 185 self.logger.debug(str(msg))
167 t=status['type'] 186 t=msg['type']
168 187
169 errors_this_message=None 188 errormsg=None
170 callback=None 189 callback=None
171 190
172 if t=='progress_percent': 191 if t=='progress_percent':
173 current=safe_get_int(status, 'current') 192 current=safe_get_int(msg, 'current')
174 total=safe_get_int(status, 'total') 193 total=safe_get_int(msg, 'total')
175 if current is not None and total is not None: 194 if current is not None and total is not None:
176 with self._cond: 195 with self._cond:
177 self.current_operation['progress_current']=current 196 self.current_operation['progress_current']=current
178 self.current_operation['progress_total']=total 197 self.current_operation['progress_total']=total
179 status, callback=self.__status_unlocked() 198 status, callback=self.__status_unlocked()
180 199
181 elif t=='archive_progress': 200 elif t=='archive_progress':
182 original_size=safe_get_int(status, 'original_size') 201 original_size=safe_get_int(msg, 'original_size')
183 compressed_size=safe_get_int(status, 'compressed_size') 202 compressed_size=safe_get_int(msg, 'compressed_size')
184 deduplicated_size=safe_get_int(status, 'deduplicated_size') 203 deduplicated_size=safe_get_int(msg, 'deduplicated_size')
185 if original_size is not None and original_size is not None and deduplicated_size is not None: 204 if original_size is not None and original_size is not None and deduplicated_size is not None:
186 with self._cond: 205 with self._cond:
187 self.current_operation['original_size']=original_size 206 self.current_operation['original_size']=original_size
188 self.current_operation['compressed_size']=compressed_size 207 self.current_operation['compressed_size']=compressed_size
189 self.current_operation['deduplicated_size']=deduplicated_size 208 self.current_operation['deduplicated_size']=deduplicated_size
194 213
195 elif t=='file_status': 214 elif t=='file_status':
196 pass 215 pass
197 216
198 elif t=='log_message': 217 elif t=='log_message':
199 if 'levelname' not in status: 218 if 'levelname' not in msg:
200 status['levelname']='ERROR' 219 msg['levelname']='ERROR'
201 if 'message' not in status: 220 if 'message' not in msg:
202 status['message']='UNKNOWN' 221 msg['message']='UNKNOWN'
203 if 'name' not in status: 222 if 'name' not in msg:
204 status['name']='borg' 223 msg['name']='borg'
205 lvl=translate_loglevel(status['levelname']) 224 lvl=translate_loglevel(msg['levelname'])
206 self.logger.log(lvl, status['name'] + ': ' + status['message']) 225 self.logger.log(lvl, msg['name'] + ': ' + msg['message'])
207 if lvl>=logging.WARNING: 226 if lvl>=logging.WARNING:
208 errors_this_message=status 227 errormsg=msg
209 state=ERRORS 228 errors=Errors.ERRORS
210 if ('msgid' in status and 229 if ('msgid' in msg and
211 (status['msgid']=='LockTimeout' or # observed in reality 230 (msg['msgid']=='LockTimeout' or # observed in reality
212 status['msgid']=='LockErrorT' or # in docs 231 msg['msgid']=='LockErrorT' or # in docs
213 status['msgid']=='LockErrorT')): # in docs 232 msg['msgid']=='LockErrorT')): # in docs
214 state=BUSY 233 errors=Errors.BUSY
215 with self._cond: 234 with self._cond:
216 self.state=combine_state(self.state, state) 235 self.errors=self.errors.combine(errors)
217 status, callback=self.__status_unlocked() 236 status, callback=self.__status_unlocked()
218 else: 237 else:
219 self.logger.debug('Unrecognised log entry %s' % str(status)) 238 self.logger.debug('Unrecognised log entry %s' % str(status))
220 239
221 if callback: 240 if callback:
222 callback(status, errors=errors_this_message) 241 callback(status, errors=errormsg)
223 242
224 self.logger.debug('Waiting for borg subprocess to terminate in log thread') 243 self.logger.debug('Waiting for borg subprocess to terminate in log thread')
225 244
226 self.borg_instance.wait() 245 self.borg_instance.wait()
227 246
228 self.logger.debug('Borg subprocess terminated; terminating log listener thread') 247 self.logger.debug('Borg subprocess terminated; terminating log listener thread')
229 248
230 def __result_listener(self): 249 def __result_listener(self):
231 # self.state=ACTIVE
232 # with self._cond:
233 # status, callback=self.__status_unlocked()
234 # if callback:
235 # callback(status)
236
237 self.logger.debug('Result listener thread waiting for result') 250 self.logger.debug('Result listener thread waiting for result')
238 251
239 res=self.borg_instance.read_result() 252 res=self.borg_instance.read_result()
240 253
241 # Finish processing remaining errors 254 # Finish processing remaining errors
242 self.thread_log.join() 255 self.thread_log.join()
243 256
244 with self._cond: 257 with self._cond:
245 state=self.state 258 errors=self.errors
246
247 # If there were no errors, reset back to INACTIVE state
248 if state==ACTIVE:
249 state=INACTIVE
250 259
251 self.logger.debug('Borg result: %s' % str(res)) 260 self.logger.debug('Borg result: %s' % str(res))
252 261
253 if res is None and state==INACTIVE: 262 if res is None:
254 self.logger.error('No result from borg despite no error in log') 263 self.logger.error('No result from borg despite no error in log')
255 state=ERRORS 264 if errors.ok():
265 errors=Errors.ERRORS
256 266
257 self.logger.debug('Waiting for borg subprocess to terminate in result thread') 267 self.logger.debug('Waiting for borg subprocess to terminate in result thread')
258 268
259 if not self.borg_instance.wait(): 269 if not self.borg_instance.wait():
260 self.logger.error('Borg subprocess did not terminate') 270 self.logger.error('Borg subprocess did not terminate')
261 state=combine_state(state, ERRORS) 271 if errors.ok():
262 272 errors=Errors.ERRORS
263 self.logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) 273
274 self.logger.debug('Borg subprocess terminated (errors state: %s); terminating result listener thread' % str(errors))
264 275
265 with self._cond: 276 with self._cond:
266 if self.current_operation['operation']=='create': 277 if self.current_operation['operation']=='create':
267 self.lastrun_when=self.current_operation['when_monotonic'] 278 self.lastrun_when=self.current_operation['when_monotonic']
268 self.thread_res=None 279 self.thread_res=None
269 self.thread_log=None 280 self.thread_log=None
270 self.borg_instance=None 281 self.borg_instance=None
271 self.current_operation=None 282 self.current_operation=None
272 self.state=state 283 self.state=State.INACTIVE
284 self.errors=errors
273 self.__update_status() 285 self.__update_status()
274 self._cond.notify() 286 self._cond.notify()
275 287
276 def __do_launch(self, op, archive_or_repository, *args): 288 def __do_launch(self, op, archive_or_repository, *args):
277 passphrase=self.extract_passphrase() 289 passphrase=self.extract_passphrase()
311 self.common_parameters + 323 self.common_parameters +
312 self.prune_parameters)) 324 self.prune_parameters))
313 else: 325 else:
314 raise NotImplementedError("Invalid operation '%s'" % op['operation']) 326 raise NotImplementedError("Invalid operation '%s'" % op['operation'])
315 327
328 # This must be called with self._cond held.
316 def __launch_check(self): 329 def __launch_check(self):
317 op=self.scheduled_operation 330 op=self.scheduled_operation
318 if not op: 331 if not op:
319 self.logger.debug("Queued operation aborted") 332 self.logger.debug("Queued operation aborted")
320 else: 333 else:
322 335
323 self.__launch(op) 336 self.__launch(op)
324 337
325 self.current_operation=op 338 self.current_operation=op
326 self.current_operation['when_monotonic']=time.monotonic() 339 self.current_operation['when_monotonic']=time.monotonic()
327 self.state=ACTIVE 340 self.state=State.ACTIVE
341 # Reset error status when starting a new operation
342 self.errors=Errors.OK
328 self.__update_status() 343 self.__update_status()
329 344
330 345
331 def __main_thread(self): 346 def __main_thread(self):
332 with self._cond: 347 with self._cond:
337 self.__main_thread_wait_schedule() 352 self.__main_thread_wait_schedule()
338 if not self._terminate: 353 if not self._terminate:
339 self.__main_thread_queue_and_launch() 354 self.__main_thread_queue_and_launch()
340 except Exception as err: 355 except Exception as err:
341 self.logger.exception("Error with backup '%s'" % self._name) 356 self.logger.exception("Error with backup '%s'" % self._name)
342 self.lastrun_when=time.monotonic() 357 self.errors=Errors.ERRORS
343 self.state=ERRORS 358
344 self.scheduled_operation=None 359 self.state=State.INACTIVE
360 self.scheduled_operation=None
345 361
346 # Clean up to terminate: kill borg instance and communication threads 362 # Clean up to terminate: kill borg instance and communication threads
347 if self.borg_instance: 363 if self.borg_instance:
348 self.logger.debug("Terminating a borg instance") 364 self.logger.debug("Terminating a borg instance")
349 self.borg_instance.terminate() 365 self.borg_instance.terminate()
350 366
351 # Store threads to use outside lock 367 # Store threads to use outside lock
352 thread_log=self.thread_log 368 thread_log=self.thread_log
353 thread_res=self.thread_res 369 thread_res=self.thread_res
370 self.thread_log=None
371 self.thread_res=None
354 372
355 self.logger.debug("Waiting for log and result threads to terminate") 373 self.logger.debug("Waiting for log and result threads to terminate")
356 374
357 if thread_log: 375 if thread_log:
358 thread_log.join() 376 thread_log.join()
378 delay=max(0, op['when_monotonic']-now) 396 delay=max(0, op['when_monotonic']-now)
379 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % 397 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" %
380 (op['operation'], op['detail'], delay)) 398 (op['operation'], op['detail'], delay))
381 399
382 self.scheduled_operation=op 400 self.scheduled_operation=op
383 self.state=combine_state(self.state, SCHEDULED) 401 self.state=State.SCHEDULED
384 self.__update_status() 402 self.__update_status()
385 403
386 # Wait under scheduled wait 404 # Wait under scheduled wait
387 self.scheduler.wait_until(now+delay, self._cond, self._name) 405 self.scheduler.wait_until(now+delay, self._cond, self._name)
388 else: 406 else:
389 # Nothing scheduled - just wait 407 # Nothing scheduled - just wait
390 self.logger.debug("Waiting for manual scheduling") 408 self.logger.info("Waiting for manual scheduling")
409
410 self.state=State.INACTIVE
411 self.__update_status()
412
391 self._cond.wait() 413 self._cond.wait()
392 414
393 # Main thread/3. If there is a scheduled operation (it might have been 415 # Main thread/3. If there is a scheduled operation (it might have been
394 # changed manually from 'op' created in __main_thread_wait_schedule above), 416 # changed manually from 'op' created in __main_thread_wait_schedule above),
395 # queue it on the repository, and launch the operation once repository 417 # queue it on the repository, and launch the operation once repository
396 # available 418 # available
397 def __main_thread_queue_and_launch(self): 419 def __main_thread_queue_and_launch(self):
398 if self.scheduled_operation: 420 if self.scheduled_operation:
399 self.logger.debug("Queuing") 421 self.logger.debug("Queuing")
400 self.state=combine_state(self.state, QUEUED) 422 self.state=State.QUEUED
401 self.__update_status() 423 self.__update_status()
402 self.repository.queue_action(self._cond, 424 self.repository.queue_action(self._cond,
403 action=self.__launch_check, 425 action=self.__launch_check,
404 name=self._name) 426 name=self._name)
405 427
414 return None 436 return None
415 else: 437 else:
416 return {'operation': 'create', 438 return {'operation': 'create',
417 'detail': 'initial', 439 'detail': 'initial',
418 'when_monotonic': now+initial_interval} 440 'when_monotonic': now+initial_interval}
419 elif self.state>=BUSY: 441 elif not self.errors.ok():
420 if self.retry_interval==0: 442 if self.retry_interval==0:
421 return None 443 return None
422 else: 444 else:
423 return {'operation': 'create', 445 return {'operation': 'create',
424 'detail': 'retry', 446 'detail': 'retry',
434 def __status_unlocked(self): 456 def __status_unlocked(self):
435 callback=self.__status_update_callback 457 callback=self.__status_update_callback
436 458
437 if self.current_operation: 459 if self.current_operation:
438 status=self.current_operation 460 status=self.current_operation
439 status['type']='current' 461 elif self.scheduled_operation:
440 # Errors should be set by listeners 462 status=self.scheduled_operation
441 else: 463 else:
442 if self.scheduled_operation: 464 status={'type': 'nothing'}
443 status=self.scheduled_operation
444 if self.state==QUEUED:
445 status['type']='queued'
446 else:
447 status['type']='scheduled'
448 else:
449 status={'type': 'nothing'}
450 465
451 status['name']=self._name 466 status['name']=self._name
452 status['state']=self.state 467 status['state']=self.state
468 status['errors']=self.errors
453 469
454 if 'detail' not in status: 470 if 'detail' not in status:
455 status['detail']='NONE' 471 status['detail']='NONE'
456 472
457 if 'when_monotonic' in status: 473 if 'when_monotonic' in status:

mercurial