backup.py

changeset 49
db33dfa64ad6
parent 48
be3ed25df789
child 50
2d8947351727
equal deleted inserted replaced
48:be3ed25df789 49:db33dfa64ad6
6 import logging 6 import logging
7 import time 7 import time
8 import keyring 8 import keyring
9 import borgend 9 import borgend
10 from instance import BorgInstance 10 from instance import BorgInstance
11 from queue import Queue 11 from threading import Thread, Lock, Condition
12 from threading import Thread, Lock, Timer 12 from scheduler import TerminableThread
13 13
14 logger=borgend.logger.getChild(__name__) 14 logger=borgend.logger.getChild(__name__)
15 15
16 # State 16 # State
17 INACTIVE=0 17 INACTIVE=0
44 if isinstance(tmp, int): 44 if isinstance(tmp, int):
45 return tmp 45 return tmp
46 return None 46 return None
47 47
48 48
49 class Backup: 49 class Backup(TerminableThread):
50 50
51 def __decode_config(self, cfg): 51 def __decode_config(self, cfg):
52 loc0='backup target %d' % self.identifier 52 loc0='backup target %d' % self.identifier
53 53
54 self.name=config.check_string(cfg, 'name', 'Name', loc0) 54 self._name=config.check_string(cfg, 'name', 'Name', loc0)
55 55
56 self.loc='backup target "%s"' % self.name 56 self.loc='backup target "%s"' % self._name
57 57
58 self.repository=config.check_string(cfg, 'repository', 58 self.repository=config.check_string(cfg, 'repository',
59 'Target repository', self.loc) 59 'Target repository', self.loc)
60 60
61 self.archive_prefix=config.check_string(cfg, 'archive_prefix', 61 self.archive_prefix=config.check_string(cfg, 'archive_prefix',
113 self.__passphrase=pw 113 self.__passphrase=pw
114 else: 114 else:
115 self.__passphrase=None 115 self.__passphrase=None
116 return self.__passphrase 116 return self.__passphrase
117 117
118 def __init__(self, identifier, cfg): 118 def __init__(self, identifier, cfg, scheduler):
119 self.identifier=identifier 119 self.identifier=identifier
120
121 self.__decode_config(cfg)
122
123 self.config=config 120 self.config=config
124 self.lastrun_when=None 121 self.lastrun_when=None
125 self.borg_instance=None 122 self.borg_instance=None
126 self.current_operation=None 123 self.current_operation=None
127 self.thread_log=None 124 self.thread_log=None
128 self.thread_res=None 125 self.thread_res=None
129 self.timer=None
130 self.scheduled_operation=None 126 self.scheduled_operation=None
131 self.lock=Lock()
132 self.__status_update_callback=None 127 self.__status_update_callback=None
133 self.state=INACTIVE 128 self.state=INACTIVE
129 self.scheduler=scheduler
130
131 self.__decode_config(cfg)
132
133 super().__init__(target = self.__main_thread, name = self._name)
134 self.daemon=True
134 135
135 def is_running(self): 136 def is_running(self):
136 with self.lock: 137 with self._cond:
137 running=self.__is_running_unlocked() 138 running=self.__is_running_unlocked()
138 return running 139 return running
139 140
140 def __is_running_unlocked(self): 141 def __is_running_unlocked(self):
141 running=self.current_operation 142 running=self.current_operation
161 162
162 if t=='progress_percent': 163 if t=='progress_percent':
163 current=safe_get_int(status, 'current') 164 current=safe_get_int(status, 'current')
164 total=safe_get_int(status, 'total') 165 total=safe_get_int(status, 'total')
165 if current is not None and total is not None: 166 if current is not None and total is not None:
166 with self.lock: 167 with self._cond:
167 self.current_operation['progress_current']=current 168 self.current_operation['progress_current']=current
168 self.current_operation['progress_total']=total 169 self.current_operation['progress_total']=total
169 status, callback=self.__status_unlocked() 170 status, callback=self.__status_unlocked()
170 171
171 elif t=='archive_progress': 172 elif t=='archive_progress':
172 original_size=safe_get_int(status, 'original_size') 173 original_size=safe_get_int(status, 'original_size')
173 compressed_size=safe_get_int(status, 'compressed_size') 174 compressed_size=safe_get_int(status, 'compressed_size')
174 deduplicated_size=safe_get_int(status, 'deduplicated_size') 175 deduplicated_size=safe_get_int(status, 'deduplicated_size')
175 if original_size is not None and original_size is not None and deduplicated_size is not None: 176 if original_size is not None and original_size is not None and deduplicated_size is not None:
176 with self.lock: 177 with self._cond:
177 self.current_operation['original_size']=original_size 178 self.current_operation['original_size']=original_size
178 self.current_operation['compressed_size']=compressed_size 179 self.current_operation['compressed_size']=compressed_size
179 self.current_operation['deduplicated_size']=deduplicated_size 180 self.current_operation['deduplicated_size']=deduplicated_size
180 status, callback=self.__status_unlocked() 181 status, callback=self.__status_unlocked()
181 182
200 if ('msgid' in status and 201 if ('msgid' in status and
201 (status['msgid']=='LockTimeout' or # observed in reality 202 (status['msgid']=='LockTimeout' or # observed in reality
202 status['msgid']=='LockErrorT' or # in docs 203 status['msgid']=='LockErrorT' or # in docs
203 status['msgid']=='LockErrorT')): # in docs 204 status['msgid']=='LockErrorT')): # in docs
204 state=BUSY 205 state=BUSY
205 with self.lock: 206 with self._cond:
206 self.state=combine_state(self.state, state) 207 self.state=combine_state(self.state, state)
207 status, callback=self.__status_unlocked() 208 status, callback=self.__status_unlocked()
208 else: 209 else:
209 logger.debug('Unrecognised log entry %s' % str(status)) 210 logger.debug('Unrecognised log entry %s' % str(status))
210 211
216 self.borg_instance.wait() 217 self.borg_instance.wait()
217 218
218 logger.debug('Borg subprocess terminated; terminating log listener thread') 219 logger.debug('Borg subprocess terminated; terminating log listener thread')
219 220
220 def __result_listener(self): 221 def __result_listener(self):
221 with self.lock: 222 with self._cond:
222 status, callback=self.__status_unlocked() 223 status, callback=self.__status_unlocked()
223 if callback: 224 if callback:
224 callback(self, status) 225 callback(self, status)
225 226
226 logger.debug('Result listener thread waiting for result') 227 logger.debug('Result listener thread waiting for result')
228 res=self.borg_instance.read_result() 229 res=self.borg_instance.read_result()
229 230
230 # Finish processing remaining errors 231 # Finish processing remaining errors
231 self.thread_log.join() 232 self.thread_log.join()
232 233
233 with self.lock: 234 with self._cond:
234 state=self.state 235 state=self.state
235 236
236 # If there were no errors, reset back to INACTIVE state 237 # If there were no errors, reset back to INACTIVE state
237 if state==ACTIVE: 238 if state==ACTIVE:
238 state=INACTIVE 239 state=INACTIVE
249 logger.critical('Borg subprocess did not terminate') 250 logger.critical('Borg subprocess did not terminate')
250 state=combine_state(state, ERRORS) 251 state=combine_state(state, ERRORS)
251 252
252 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) 253 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))
253 254
254 with self.lock: 255 with self._cond:
255 if self.current_operation['operation']=='create': 256 if self.current_operation['operation']=='create':
256 self.lastrun_when=self.current_operation['when_monotonic'] 257 self.lastrun_when=self.current_operation['when_monotonic']
257 self.thread_res=None 258 self.thread_res=None
258 self.thread_log=None 259 self.thread_log=None
259 self.borg_instance=None 260 self.borg_instance=None
260 self.current_operation=None 261 self.current_operation=None
261 self.state=state 262 self.state=state
262 self.__schedule_unlocked() 263 self._cond.notify()
263 status, callback=self.__status_unlocked() 264
264 if callback: 265 def __do_launch(self, op, archive_or_repository, *args):
265 callback(self, status)
266
267 def __do_launch(self, queue, op, archive_or_repository, *args):
268 passphrase=self.extract_passphrase() 266 passphrase=self.extract_passphrase()
269 267
270 inst=BorgInstance(op['operation'], archive_or_repository, *args) 268 inst=BorgInstance(op['operation'], archive_or_repository, *args)
271 inst.launch(passphrase=passphrase) 269 inst.launch(passphrase=passphrase)
272 270
279 t_res.daemon=True 277 t_res.daemon=True
280 278
281 self.thread_log=t_log 279 self.thread_log=t_log
282 self.thread_res=t_res 280 self.thread_res=t_res
283 self.borg_instance=inst 281 self.borg_instance=inst
284 self.queue=queue
285 self.current_operation=op 282 self.current_operation=op
286 self.current_operation['when_monotonic']=time.monotonic() 283 self.current_operation['when_monotonic']=time.monotonic()
287 self.state=ACTIVE 284 self.state=ACTIVE
288 285
289 t_log.start() 286 t_log.start()
290 t_res.start() 287 t_res.start()
291 288
292 def __launch(self, op, queue): 289 def __launch(self, op):
293 if self.__is_running_unlocked(): 290 if self.__is_running_unlocked():
294 logging.info('Cannot start %s: already running %s' 291 logging.info('Cannot start %s: already running %s'
295 % (operation, self.current_operation)) 292 % (operation, self.current_operation))
296 return False 293 return False
297 else: 294 else:
298 if self.timer:
299 logger.debug('Unscheduling timed operation due to launch of operation')
300 self.timer=None
301 self.scheduled_operation=None
302
303 try: 295 try:
304 logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) 296 logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))
305 297
306 if op['operation']=='create': 298 if op['operation']=='create':
307 archive="%s::%s%s" % (self.repository, 299 archive="%s::%s%s" % (self.repository,
308 self.archive_prefix, 300 self.archive_prefix,
309 self.archive_template) 301 self.archive_template)
310 302
311 self.__do_launch(queue, op, archive, 303 self.__do_launch(op, archive,
312 self.common_parameters+self.create_parameters, 304 self.common_parameters+self.create_parameters,
313 self.paths) 305 self.paths)
314 elif op['operation']=='prune': 306 elif op['operation']=='prune':
315 self.__do_launch(queue, op, self.repository, 307 self.__do_launch(op, self.repository,
316 ([{'prefix': self.archive_prefix}] + 308 ([{'prefix': self.archive_prefix}] +
317 self.common_parameters + 309 self.common_parameters +
318 self.prune_parameters)) 310 self.prune_parameters))
319 else: 311 else:
320 raise NotImplementedError("Invalid operation '%s'" % op['operation']) 312 raise NotImplementedError("Invalid operation '%s'" % op['operation'])
321 except Exception as err: 313 except Exception as err:
322 logger.debug('Rescheduling after failure') 314 logger.debug('Rescheduling after failure')
323 self.lastrun_when=time.monotonic() 315 self.lastrun_when=time.monotonic()
324 self.state=ERRORS 316 self.state=ERRORS
325 self.__schedule_unlocked()
326 raise err 317 raise err
327 318
328 return True 319 return True
329 320
330 def create(self, queue): 321 def create(self):
331 op={'operation': 'create', 'detail': 'manual'} 322 op={'operation': 'create', 'detail': 'manual'}
332 with self.lock: 323 with self._cond:
333 res=self.__launch(op, queue) 324 self.scheduled_operation=op
334 return res 325 self._cond.notify()
335 326
336 def prune(self, queue): 327 def prune(self):
337 op={'operation': 'prune', 'detail': 'manual'} 328 op={'operation': 'prune', 'detail': 'manual'}
338 with self.lock: 329 with self._cond:
339 res=self.__launch(op, queue) 330 self.scheduled_operation=op
340 return res 331 self._cond.notify()
341 332
342 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 333 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
343 def abort(self): 334 def abort(self):
344 with self.lock: 335 with self._cond:
345 if self.borg_instance: 336 if self.borg_instance:
346 self.borg_instance.terminate() 337 self.borg_instance.terminate()
347 #thread_log=self.thread_log
348 #thread_res=self.thread_res
349
350 #if thread_log:
351 # thread_log.terminate()
352
353 #if thread_res:
354 # thread_res.terminate()
355
356
357 def join(self):
358 logger.debug('Waiting for borg listener threads to terminate')
359
360 with self.lock:
361 thread_log=self.thread_log
362 thread_res=self.thread_res
363
364 if thread_log:
365 thread_log.join()
366
367 if thread_res:
368 thread_res.join()
369
370 assert(self.thread_log==None and self.thread_res==None)
371
372 def __queue_timed_operation(self):
373 with self.lock:
374 op=self.scheduled_operation
375 self.scheduled_operation=None
376 self.timer=None
377
378 if self.__is_running_unlocked():
379 logger.info('Aborted queue operation, as an operation is already running')
380 else:
381 # TODO: Queue on 'repository' and online status for SSH, etc.
382
383 # TODO: UI comms. queue?
384 self.__launch(op, None)
385 338
386 def __next_operation_unlocked(self): 339 def __next_operation_unlocked(self):
387 # TODO: pruning as well 340 # TODO: pruning as well
388 now=time.monotonic() 341 now=time.monotonic()
389 if not self.lastrun_when: 342 if not self.lastrun_when:
409 else: 362 else:
410 return {'operation': 'create', 363 return {'operation': 'create',
411 'detail': 'normal', 364 'detail': 'normal',
412 'when_monotonic': self.lastrun_when+self.backup_interval} 365 'when_monotonic': self.lastrun_when+self.backup_interval}
413 366
414 def __schedule_unlocked(self): 367 def __main_thread(self):
415 if self.current_operation: 368 with self._cond:
416 return self.current_operation 369 while not self._terminate:
417 else: 370 op=None
418 op=self.__next_operation_unlocked() 371 if not self.current_operation:
419 372 op=self.__next_operation_unlocked()
420 if op: 373 if not op:
421 now=time.monotonic() 374 self.__update_status()
422 delay=max(0, op['when_monotonic']-now) 375 self._cond.wait()
423 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % 376 else:
424 (op['operation'], op['detail'], self.name, delay)) 377 now=time.monotonic()
425 tmr=Timer(delay, self.__queue_timed_operation) 378 delay=max(0, op['when_monotonic']-now)
426 self.scheduled_operation=op 379 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
427 self.timer=tmr 380 (op['operation'], op['detail'], self._name, delay))
428 self.state=combine_state(self.state, SCHEDULED) 381
429 tmr.start() 382 self.scheduled_operation=op
430 383 self.state=combine_state(self.state, SCHEDULED)
431 return op 384
432 385 self.__update_status()
433 def schedule(self): 386 self.scheduler.wait_until(now+delay, self._cond, self._name)
434 with self.lock: 387
435 return self.__schedule_unlocked() 388 if self.scheduled_operation:
436 389 op=self.scheduled_operation
437 def status(self): 390 self.scheduled_operation=None
438 with self.lock: 391 self.__launch(op)
439 res=self.__status_unlocked() 392
440 return res[0] 393 # Kill a running borg to cause log and result threads to terminate
394 if self.borg_instance:
395 logger.debug("Terminating a borg instance")
396 self.borg_instance.terminate()
397
398 # Store threads to use outside lock
399 thread_log=self.thread_log
400 thread_err=self.thread_err
401
402 logger.debug("Waiting for log and result threads to terminate")
403
404 if thread_log:
405 thread_log.join()
406
407 if thread_res:
408 thread_res.join()
409
441 410
442 def __status_unlocked(self): 411 def __status_unlocked(self):
443 callback=self.__status_update_callback 412 callback=self.__status_update_callback
444 413
445 if self.current_operation: 414 if self.current_operation:
451 status=self.scheduled_operation 420 status=self.scheduled_operation
452 status['type']='scheduled' 421 status['type']='scheduled'
453 else: 422 else:
454 status={'type': 'nothing'} 423 status={'type': 'nothing'}
455 424
456 status['name']=self.name 425 status['name']=self._name
457 status['state']=self.state 426 status['state']=self.state
458 427
459 if 'detail' not in status: 428 if 'detail' not in status:
460 status['detail']='NONE' 429 status['detail']='NONE'
461 430
463 status['when']=(status['when_monotonic'] 432 status['when']=(status['when_monotonic']
464 -time.monotonic()+time.time()) 433 -time.monotonic()+time.time())
465 434
466 return status, callback 435 return status, callback
467 436
437 def __update_status(self):
438 status, callback = self.__status_unlocked()
439 if callback:
440 self._cond.release()
441 try:
442 callback(self, status)
443 finally:
444 self._cond.acquire()
445
468 def set_status_update_callback(self, callback): 446 def set_status_update_callback(self, callback):
469 with self.lock: 447 with self._cond:
470 self.__status_update_callback=callback 448 self.__status_update_callback=callback
471 449
472 450 def status(self):
451 with self._cond:
452 res=self.__status_unlocked()
453 return res[0]
454

mercurial