@@ -44 +44 @@
     if isinstance(tmp, int):
         return tmp
     return None


-class Backup:
+class Backup(TerminableThread):

     def __decode_config(self, cfg):
         loc0='backup target %d' % self.identifier

-        self.name=config.check_string(cfg, 'name', 'Name', loc0)
+        self._name=config.check_string(cfg, 'name', 'Name', loc0)

-        self.loc='backup target "%s"' % self.name
+        self.loc='backup target "%s"' % self._name

         self.repository=config.check_string(cfg, 'repository',
                                             'Target repository', self.loc)

         self.archive_prefix=config.check_string(cfg, 'archive_prefix',
@@ -113 +113 @@
             self.__passphrase=pw
         else:
             self.__passphrase=None
         return self.__passphrase

-    def __init__(self, identifier, cfg):
+    def __init__(self, identifier, cfg, scheduler):
         self.identifier=identifier
-
-        self.__decode_config(cfg)
-
         self.config=config
         self.lastrun_when=None
         self.borg_instance=None
         self.current_operation=None
         self.thread_log=None
         self.thread_res=None
-        self.timer=None
         self.scheduled_operation=None
-        self.lock=Lock()
         self.__status_update_callback=None
         self.state=INACTIVE
+        self.scheduler=scheduler
+
+        self.__decode_config(cfg)
+
+        super().__init__(target = self.__main_thread, name = self._name)
+        self.daemon=True

     def is_running(self):
-        with self.lock:
+        with self._cond:
             running=self.__is_running_unlocked()
         return running

     def __is_running_unlocked(self):
         running=self.current_operation
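Note: the hunk above makes Backup subclass TerminableThread and rely on a self._cond condition variable and a self._terminate flag that the base class is assumed to provide; the base class itself does not appear in this diff. A minimal sketch of what such a base class might look like (hypothetical; the project's actual implementation may differ):

import threading

class TerminableThread(threading.Thread):
    # Hypothetical sketch: wraps threading.Thread with a condition variable and a
    # terminate flag, matching the attributes the diff above depends on.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cond = threading.Condition()
        self._terminate = False

    def terminate(self):
        # Ask the main loop to exit and wake it if it is waiting on the condition.
        with self._cond:
            self._terminate = True
            self._cond.notify()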
@@ -161 +162 @@

         if t=='progress_percent':
             current=safe_get_int(status, 'current')
             total=safe_get_int(status, 'total')
             if current is not None and total is not None:
-                with self.lock:
+                with self._cond:
                     self.current_operation['progress_current']=current
                     self.current_operation['progress_total']=total
                     status, callback=self.__status_unlocked()

         elif t=='archive_progress':
             original_size=safe_get_int(status, 'original_size')
             compressed_size=safe_get_int(status, 'compressed_size')
             deduplicated_size=safe_get_int(status, 'deduplicated_size')
             if original_size is not None and compressed_size is not None and deduplicated_size is not None:
-                with self.lock:
+                with self._cond:
                     self.current_operation['original_size']=original_size
                     self.current_operation['compressed_size']=compressed_size
                     self.current_operation['deduplicated_size']=deduplicated_size
                     status, callback=self.__status_unlocked()

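The progress handlers above use the safe_get_int helper whose tail is visible at the top of this section. A plausible reconstruction, shown only for context (the actual helper may differ in detail):

def safe_get_int(status, key):
    # Hypothetical sketch: return status[key] if it is present and an int, otherwise None.
    tmp = status.get(key)
    if isinstance(tmp, int):
        return tmp
    return None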
@@ -249 +250 @@
             logger.critical('Borg subprocess did not terminate')
             state=combine_state(state, ERRORS)

         logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state))

-        with self.lock:
+        with self._cond:
             if self.current_operation['operation']=='create':
                 self.lastrun_when=self.current_operation['when_monotonic']
             self.thread_res=None
             self.thread_log=None
             self.borg_instance=None
             self.current_operation=None
             self.state=state
-            self.__schedule_unlocked()
-            status, callback=self.__status_unlocked()
-            if callback:
-                callback(self, status)
+            self._cond.notify()

-    def __do_launch(self, queue, op, archive_or_repository, *args):
+    def __do_launch(self, op, archive_or_repository, *args):
         passphrase=self.extract_passphrase()

         inst=BorgInstance(op['operation'], archive_or_repository, *args)
         inst.launch(passphrase=passphrase)

@@ -279 +277 @@
         t_res.daemon=True

         self.thread_log=t_log
         self.thread_res=t_res
         self.borg_instance=inst
-        self.queue=queue
         self.current_operation=op
         self.current_operation['when_monotonic']=time.monotonic()
         self.state=ACTIVE

         t_log.start()
         t_res.start()

-    def __launch(self, op, queue):
+    def __launch(self, op):
         if self.__is_running_unlocked():
             logging.info('Cannot start %s: already running %s'
                          % (op['operation'], self.current_operation))
             return False
         else:
-            if self.timer:
-                logger.debug('Unscheduling timed operation due to launch of operation')
-                self.timer=None
-                self.scheduled_operation=None
-
             try:
-                logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name))
+                logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name))

                 if op['operation']=='create':
                     archive="%s::%s%s" % (self.repository,
                                           self.archive_prefix,
                                           self.archive_template)

-                    self.__do_launch(queue, op, archive,
+                    self.__do_launch(op, archive,
                                      self.common_parameters+self.create_parameters,
                                      self.paths)
                 elif op['operation']=='prune':
-                    self.__do_launch(queue, op, self.repository,
+                    self.__do_launch(op, self.repository,
                                      ([{'prefix': self.archive_prefix}] +
                                       self.common_parameters +
                                       self.prune_parameters))
                 else:
                     raise NotImplementedError("Invalid operation '%s'" % op['operation'])
             except Exception as err:
                 logger.debug('Rescheduling after failure')
                 self.lastrun_when=time.monotonic()
                 self.state=ERRORS
-                self.__schedule_unlocked()
                 raise err

         return True

-    def create(self, queue):
+    def create(self):
         op={'operation': 'create', 'detail': 'manual'}
-        with self.lock:
-            res=self.__launch(op, queue)
-        return res
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()

-    def prune(self, queue):
+    def prune(self):
         op={'operation': 'prune', 'detail': 'manual'}
-        with self.lock:
-            res=self.__launch(op, queue)
-        return res
+        with self._cond:
+            self.scheduled_operation=op
+            self._cond.notify()

     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
     def abort(self):
-        with self.lock:
+        with self._cond:
             if self.borg_instance:
                 self.borg_instance.terminate()
-            #thread_log=self.thread_log
-            #thread_res=self.thread_res
-
-            #if thread_log:
-            #    thread_log.terminate()
-
-            #if thread_res:
-            #    thread_res.terminate()
-
-
-    def join(self):
-        logger.debug('Waiting for borg listener threads to terminate')
-
-        with self.lock:
-            thread_log=self.thread_log
-            thread_res=self.thread_res
-
-        if thread_log:
-            thread_log.join()
-
-        if thread_res:
-            thread_res.join()
-
-        assert(self.thread_log==None and self.thread_res==None)
-
-    def __queue_timed_operation(self):
-        with self.lock:
-            op=self.scheduled_operation
-            self.scheduled_operation=None
-            self.timer=None
-
-            if self.__is_running_unlocked():
-                logger.info('Aborted queue operation, as an operation is already running')
-            else:
-                # TODO: Queue on 'repository' and online status for SSH, etc.
-
-                # TODO: UI comms. queue?
-                self.__launch(op, None)

     def __next_operation_unlocked(self):
         # TODO: pruning as well
         now=time.monotonic()
         if not self.lastrun_when:
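With the changes in the hunk above, create() and prune() no longer launch borg directly: they record the operation in self.scheduled_operation and notify the condition variable, and the __main_thread loop introduced in the following hunk performs the actual launch. A hypothetical usage sketch under that assumption (names taken from this diff; construction of cfg and scheduler is not shown here):

# Hypothetical usage of the new threading model sketched by this diff.
backup = Backup(0, cfg, scheduler)  # cfg and scheduler assumed to come from the caller
backup.start()                      # TerminableThread runs __main_thread
backup.create()                     # queue a manual 'create'; __main_thread launches it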
@@ -409 +362 @@
         else:
             return {'operation': 'create',
                     'detail': 'normal',
                     'when_monotonic': self.lastrun_when+self.backup_interval}

-    def __schedule_unlocked(self):
-        if self.current_operation:
-            return self.current_operation
-        else:
-            op=self.__next_operation_unlocked()
-
-            if op:
-                now=time.monotonic()
-                delay=max(0, op['when_monotonic']-now)
-                logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
-                            (op['operation'], op['detail'], self.name, delay))
-                tmr=Timer(delay, self.__queue_timed_operation)
-                self.scheduled_operation=op
-                self.timer=tmr
-                self.state=combine_state(self.state, SCHEDULED)
-                tmr.start()
-
-            return op
-
-    def schedule(self):
-        with self.lock:
-            return self.__schedule_unlocked()
-
-    def status(self):
-        with self.lock:
-            res=self.__status_unlocked()
-        return res[0]
+    def __main_thread(self):
+        with self._cond:
+            while not self._terminate:
+                op=None
+                if not self.current_operation:
+                    op=self.__next_operation_unlocked()
+                if not op:
+                    self.__update_status()
+                    self._cond.wait()
+                else:
+                    now=time.monotonic()
+                    delay=max(0, op['when_monotonic']-now)
+                    logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
+                                (op['operation'], op['detail'], self._name, delay))
+
+                    self.scheduled_operation=op
+                    self.state=combine_state(self.state, SCHEDULED)
+
+                    self.__update_status()
+                    self.scheduler.wait_until(now+delay, self._cond, self._name)
+
+                if self.scheduled_operation:
+                    op=self.scheduled_operation
+                    self.scheduled_operation=None
+                    self.__launch(op)
+
+            # Kill a running borg to cause log and result threads to terminate
+            if self.borg_instance:
+                logger.debug("Terminating a borg instance")
+                self.borg_instance.terminate()
+
+            # Store threads to use outside lock
+            thread_log=self.thread_log
+            thread_res=self.thread_res
+
+        logger.debug("Waiting for log and result threads to terminate")
+
+        if thread_log:
+            thread_log.join()
+
+        if thread_res:
+            thread_res.join()
+

     def __status_unlocked(self):
         callback=self.__status_update_callback

         if self.current_operation: