| 44 if isinstance(tmp, int): |
44 if isinstance(tmp, int): |
| 45 return tmp |
45 return tmp |
| 46 return None |
46 return None |
| 47 |
47 |
| 48 |
48 |
| 49 class Backup: |
49 class Backup(TerminableThread): |
| 50 |
50 |
| 51 def __decode_config(self, cfg): |
51 def __decode_config(self, cfg): |
| 52 loc0='backup target %d' % self.identifier |
52 loc0='backup target %d' % self.identifier |
| 53 |
53 |
| 54 self.name=config.check_string(cfg, 'name', 'Name', loc0) |
54 self._name=config.check_string(cfg, 'name', 'Name', loc0) |
| 55 |
55 |
| 56 self.loc='backup target "%s"' % self.name |
56 self.loc='backup target "%s"' % self._name |
| 57 |
57 |
| 58 self.repository=config.check_string(cfg, 'repository', |
58 self.repository=config.check_string(cfg, 'repository', |
| 59 'Target repository', self.loc) |
59 'Target repository', self.loc) |
| 60 |
60 |
| 61 self.archive_prefix=config.check_string(cfg, 'archive_prefix', |
61 self.archive_prefix=config.check_string(cfg, 'archive_prefix', |
| 113 self.__passphrase=pw |
113 self.__passphrase=pw |
| 114 else: |
114 else: |
| 115 self.__passphrase=None |
115 self.__passphrase=None |
| 116 return self.__passphrase |
116 return self.__passphrase |
| 117 |
117 |
| 118 def __init__(self, identifier, cfg): |
118 def __init__(self, identifier, cfg, scheduler): |
| 119 self.identifier=identifier |
119 self.identifier=identifier |
| 120 |
|
| 121 self.__decode_config(cfg) |
|
| 122 |
|
| 123 self.config=config |
120 self.config=config |
| 124 self.lastrun_when=None |
121 self.lastrun_when=None |
| 125 self.borg_instance=None |
122 self.borg_instance=None |
| 126 self.current_operation=None |
123 self.current_operation=None |
| 127 self.thread_log=None |
124 self.thread_log=None |
| 128 self.thread_res=None |
125 self.thread_res=None |
| 129 self.timer=None |
|
| 130 self.scheduled_operation=None |
126 self.scheduled_operation=None |
| 131 self.lock=Lock() |
|
| 132 self.__status_update_callback=None |
127 self.__status_update_callback=None |
| 133 self.state=INACTIVE |
128 self.state=INACTIVE |
| |
129 self.scheduler=scheduler |
| |
130 |
| |
131 self.__decode_config(cfg) |
| |
132 |
| |
133 super().__init__(target = self.__main_thread, name = self._name) |
| |
134 self.daemon=True |
| 134 |
135 |
| 135 def is_running(self): |
136 def is_running(self): |
| 136 with self.lock: |
137 with self._cond: |
| 137 running=self.__is_running_unlocked() |
138 running=self.__is_running_unlocked() |
| 138 return running |
139 return running |
| 139 |
140 |
| 140 def __is_running_unlocked(self): |
141 def __is_running_unlocked(self): |
| 141 running=self.current_operation |
142 running=self.current_operation |
| 161 |
162 |
| 162 if t=='progress_percent': |
163 if t=='progress_percent': |
| 163 current=safe_get_int(status, 'current') |
164 current=safe_get_int(status, 'current') |
| 164 total=safe_get_int(status, 'total') |
165 total=safe_get_int(status, 'total') |
| 165 if current is not None and total is not None: |
166 if current is not None and total is not None: |
| 166 with self.lock: |
167 with self._cond: |
| 167 self.current_operation['progress_current']=current |
168 self.current_operation['progress_current']=current |
| 168 self.current_operation['progress_total']=total |
169 self.current_operation['progress_total']=total |
| 169 status, callback=self.__status_unlocked() |
170 status, callback=self.__status_unlocked() |
| 170 |
171 |
| 171 elif t=='archive_progress': |
172 elif t=='archive_progress': |
| 172 original_size=safe_get_int(status, 'original_size') |
173 original_size=safe_get_int(status, 'original_size') |
| 173 compressed_size=safe_get_int(status, 'compressed_size') |
174 compressed_size=safe_get_int(status, 'compressed_size') |
| 174 deduplicated_size=safe_get_int(status, 'deduplicated_size') |
175 deduplicated_size=safe_get_int(status, 'deduplicated_size') |
| 175 if original_size is not None and original_size is not None and deduplicated_size is not None: |
176 if original_size is not None and original_size is not None and deduplicated_size is not None: |
| 176 with self.lock: |
177 with self._cond: |
| 177 self.current_operation['original_size']=original_size |
178 self.current_operation['original_size']=original_size |
| 178 self.current_operation['compressed_size']=compressed_size |
179 self.current_operation['compressed_size']=compressed_size |
| 179 self.current_operation['deduplicated_size']=deduplicated_size |
180 self.current_operation['deduplicated_size']=deduplicated_size |
| 180 status, callback=self.__status_unlocked() |
181 status, callback=self.__status_unlocked() |
| 181 |
182 |
| 249 logger.critical('Borg subprocess did not terminate') |
250 logger.critical('Borg subprocess did not terminate') |
| 250 state=combine_state(state, ERRORS) |
251 state=combine_state(state, ERRORS) |
| 251 |
252 |
| 252 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
253 logger.debug('Borg subprocess terminated (end state %s); terminating result listener thread' % str(state)) |
| 253 |
254 |
| 254 with self.lock: |
255 with self._cond: |
| 255 if self.current_operation['operation']=='create': |
256 if self.current_operation['operation']=='create': |
| 256 self.lastrun_when=self.current_operation['when_monotonic'] |
257 self.lastrun_when=self.current_operation['when_monotonic'] |
| 257 self.thread_res=None |
258 self.thread_res=None |
| 258 self.thread_log=None |
259 self.thread_log=None |
| 259 self.borg_instance=None |
260 self.borg_instance=None |
| 260 self.current_operation=None |
261 self.current_operation=None |
| 261 self.state=state |
262 self.state=state |
| 262 self.__schedule_unlocked() |
263 self._cond.notify() |
| 263 status, callback=self.__status_unlocked() |
264 |
| 264 if callback: |
265 def __do_launch(self, op, archive_or_repository, *args): |
| 265 callback(self, status) |
|
| 266 |
|
| 267 def __do_launch(self, queue, op, archive_or_repository, *args): |
|
| 268 passphrase=self.extract_passphrase() |
266 passphrase=self.extract_passphrase() |
| 269 |
267 |
| 270 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
268 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
| 271 inst.launch(passphrase=passphrase) |
269 inst.launch(passphrase=passphrase) |
| 272 |
270 |
| 279 t_res.daemon=True |
277 t_res.daemon=True |
| 280 |
278 |
| 281 self.thread_log=t_log |
279 self.thread_log=t_log |
| 282 self.thread_res=t_res |
280 self.thread_res=t_res |
| 283 self.borg_instance=inst |
281 self.borg_instance=inst |
| 284 self.queue=queue |
|
| 285 self.current_operation=op |
282 self.current_operation=op |
| 286 self.current_operation['when_monotonic']=time.monotonic() |
283 self.current_operation['when_monotonic']=time.monotonic() |
| 287 self.state=ACTIVE |
284 self.state=ACTIVE |
| 288 |
285 |
| 289 t_log.start() |
286 t_log.start() |
| 290 t_res.start() |
287 t_res.start() |
| 291 |
288 |
| 292 def __launch(self, op, queue): |
289 def __launch(self, op): |
| 293 if self.__is_running_unlocked(): |
290 if self.__is_running_unlocked(): |
| 294 logging.info('Cannot start %s: already running %s' |
291 logging.info('Cannot start %s: already running %s' |
| 295 % (operation, self.current_operation)) |
292 % (operation, self.current_operation)) |
| 296 return False |
293 return False |
| 297 else: |
294 else: |
| 298 if self.timer: |
|
| 299 logger.debug('Unscheduling timed operation due to launch of operation') |
|
| 300 self.timer=None |
|
| 301 self.scheduled_operation=None |
|
| 302 |
|
| 303 try: |
295 try: |
| 304 logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
296 logger.debug("Launching '%s' on '%s'" % (op['operation'], self._name)) |
| 305 |
297 |
| 306 if op['operation']=='create': |
298 if op['operation']=='create': |
| 307 archive="%s::%s%s" % (self.repository, |
299 archive="%s::%s%s" % (self.repository, |
| 308 self.archive_prefix, |
300 self.archive_prefix, |
| 309 self.archive_template) |
301 self.archive_template) |
| 310 |
302 |
| 311 self.__do_launch(queue, op, archive, |
303 self.__do_launch(op, archive, |
| 312 self.common_parameters+self.create_parameters, |
304 self.common_parameters+self.create_parameters, |
| 313 self.paths) |
305 self.paths) |
| 314 elif op['operation']=='prune': |
306 elif op['operation']=='prune': |
| 315 self.__do_launch(queue, op, self.repository, |
307 self.__do_launch(op, self.repository, |
| 316 ([{'prefix': self.archive_prefix}] + |
308 ([{'prefix': self.archive_prefix}] + |
| 317 self.common_parameters + |
309 self.common_parameters + |
| 318 self.prune_parameters)) |
310 self.prune_parameters)) |
| 319 else: |
311 else: |
| 320 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
312 raise NotImplementedError("Invalid operation '%s'" % op['operation']) |
| 321 except Exception as err: |
313 except Exception as err: |
| 322 logger.debug('Rescheduling after failure') |
314 logger.debug('Rescheduling after failure') |
| 323 self.lastrun_when=time.monotonic() |
315 self.lastrun_when=time.monotonic() |
| 324 self.state=ERRORS |
316 self.state=ERRORS |
| 325 self.__schedule_unlocked() |
|
| 326 raise err |
317 raise err |
| 327 |
318 |
| 328 return True |
319 return True |
| 329 |
320 |
| 330 def create(self, queue): |
321 def create(self): |
| 331 op={'operation': 'create', 'detail': 'manual'} |
322 op={'operation': 'create', 'detail': 'manual'} |
| 332 with self.lock: |
323 with self._cond: |
| 333 res=self.__launch(op, queue) |
324 self.scheduled_operation=op |
| 334 return res |
325 self._cond.notify() |
| 335 |
326 |
| 336 def prune(self, queue): |
327 def prune(self): |
| 337 op={'operation': 'prune', 'detail': 'manual'} |
328 op={'operation': 'prune', 'detail': 'manual'} |
| 338 with self.lock: |
329 with self._cond: |
| 339 res=self.__launch(op, queue) |
330 self.scheduled_operation=op |
| 340 return res |
331 self._cond.notify() |
| 341 |
332 |
| 342 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
333 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
| 343 def abort(self): |
334 def abort(self): |
| 344 with self.lock: |
335 with self._cond: |
| 345 if self.borg_instance: |
336 if self.borg_instance: |
| 346 self.borg_instance.terminate() |
337 self.borg_instance.terminate() |
| 347 #thread_log=self.thread_log |
|
| 348 #thread_res=self.thread_res |
|
| 349 |
|
| 350 #if thread_log: |
|
| 351 # thread_log.terminate() |
|
| 352 |
|
| 353 #if thread_res: |
|
| 354 # thread_res.terminate() |
|
| 355 |
|
| 356 |
|
| 357 def join(self): |
|
| 358 logger.debug('Waiting for borg listener threads to terminate') |
|
| 359 |
|
| 360 with self.lock: |
|
| 361 thread_log=self.thread_log |
|
| 362 thread_res=self.thread_res |
|
| 363 |
|
| 364 if thread_log: |
|
| 365 thread_log.join() |
|
| 366 |
|
| 367 if thread_res: |
|
| 368 thread_res.join() |
|
| 369 |
|
| 370 assert(self.thread_log==None and self.thread_res==None) |
|
| 371 |
|
| 372 def __queue_timed_operation(self): |
|
| 373 with self.lock: |
|
| 374 op=self.scheduled_operation |
|
| 375 self.scheduled_operation=None |
|
| 376 self.timer=None |
|
| 377 |
|
| 378 if self.__is_running_unlocked(): |
|
| 379 logger.info('Aborted queue operation, as an operation is already running') |
|
| 380 else: |
|
| 381 # TODO: Queue on 'repository' and online status for SSH, etc. |
|
| 382 |
|
| 383 # TODO: UI comms. queue? |
|
| 384 self.__launch(op, None) |
|
| 385 |
338 |
| 386 def __next_operation_unlocked(self): |
339 def __next_operation_unlocked(self): |
| 387 # TODO: pruning as well |
340 # TODO: pruning as well |
| 388 now=time.monotonic() |
341 now=time.monotonic() |
| 389 if not self.lastrun_when: |
342 if not self.lastrun_when: |
| 409 else: |
362 else: |
| 410 return {'operation': 'create', |
363 return {'operation': 'create', |
| 411 'detail': 'normal', |
364 'detail': 'normal', |
| 412 'when_monotonic': self.lastrun_when+self.backup_interval} |
365 'when_monotonic': self.lastrun_when+self.backup_interval} |
| 413 |
366 |
| 414 def __schedule_unlocked(self): |
367 def __main_thread(self): |
| 415 if self.current_operation: |
368 with self._cond: |
| 416 return self.current_operation |
369 while not self._terminate: |
| 417 else: |
370 op=None |
| 418 op=self.__next_operation_unlocked() |
371 if not self.current_operation: |
| 419 |
372 op=self.__next_operation_unlocked() |
| 420 if op: |
373 if not op: |
| 421 now=time.monotonic() |
374 self.__update_status() |
| 422 delay=max(0, op['when_monotonic']-now) |
375 self._cond.wait() |
| 423 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
376 else: |
| 424 (op['operation'], op['detail'], self.name, delay)) |
377 now=time.monotonic() |
| 425 tmr=Timer(delay, self.__queue_timed_operation) |
378 delay=max(0, op['when_monotonic']-now) |
| 426 self.scheduled_operation=op |
379 logger.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
| 427 self.timer=tmr |
380 (op['operation'], op['detail'], self._name, delay)) |
| 428 self.state=combine_state(self.state, SCHEDULED) |
381 |
| 429 tmr.start() |
382 self.scheduled_operation=op |
| 430 |
383 self.state=combine_state(self.state, SCHEDULED) |
| 431 return op |
384 |
| 432 |
385 self.__update_status() |
| 433 def schedule(self): |
386 self.scheduler.wait_until(now+delay, self._cond, self._name) |
| 434 with self.lock: |
387 |
| 435 return self.__schedule_unlocked() |
388 if self.scheduled_operation: |
| 436 |
389 op=self.scheduled_operation |
| 437 def status(self): |
390 self.scheduled_operation=None |
| 438 with self.lock: |
391 self.__launch(op) |
| 439 res=self.__status_unlocked() |
392 |
| 440 return res[0] |
393 # Kill a running borg to cause log and result threads to terminate |
| |
394 if self.borg_instance: |
| |
395 logger.debug("Terminating a borg instance") |
| |
396 self.borg_instance.terminate() |
| |
397 |
| |
398 # Store threads to use outside lock |
| |
399 thread_log=self.thread_log |
| |
400 thread_err=self.thread_err |
| |
401 |
| |
402 logger.debug("Waiting for log and result threads to terminate") |
| |
403 |
| |
404 if thread_log: |
| |
405 thread_log.join() |
| |
406 |
| |
407 if thread_res: |
| |
408 thread_res.join() |
| |
409 |
| 441 |
410 |
| 442 def __status_unlocked(self): |
411 def __status_unlocked(self): |
| 443 callback=self.__status_update_callback |
412 callback=self.__status_update_callback |
| 444 |
413 |
| 445 if self.current_operation: |
414 if self.current_operation: |