5 import config |
5 import config |
6 import logging |
6 import logging |
7 import time |
7 import time |
8 import borgend |
8 import borgend |
9 import repository |
9 import repository |
|
10 import sleep |
10 from enum import IntEnum |
11 from enum import IntEnum |
11 from instance import BorgInstance |
12 from instance import BorgInstance |
12 from threading import Thread, Lock, Condition |
13 from threading import Thread, Lock, Condition |
13 from scheduler import TerminableThread |
14 from scheduler import TerminableThread |
14 |
15 |
51 } |
52 } |
52 |
53 |
53 class Operation: |
54 class Operation: |
54 CREATE='create' |
55 CREATE='create' |
55 PRUNE='prune' |
56 PRUNE='prune' |
56 def __init__(self, operation, when_monotonic, **kwargs): |
57 def __init__(self, operation, time, **kwargs): |
57 self.operation=operation |
58 self.operation=operation |
58 self.when_monotonic=when_monotonic |
59 self.time=time |
59 self.detail=kwargs |
60 self.detail=kwargs |
60 |
61 |
61 def when(self): |
62 def when(self): |
62 return self.when_monotonic-time.monotonic()+time.time() |
63 return self.time.realtime() |
63 |
64 |
64 |
65 |
65 class Status(Operation): |
66 class Status(Operation): |
66 def __init__(self, backup, op=None): |
67 def __init__(self, backup, op=None): |
67 if op: |
68 if op: |
68 super().__init__(op.operation, op.when_monotonic, |
69 super().__init__(op.operation, op.time, **op.detail) |
69 **op.detail) |
|
70 else: |
70 else: |
71 super().__init__(None, None) |
71 super().__init__(None, None) |
72 |
72 |
73 self.name=backup.name |
73 self.name=backup.name |
74 self.state=backup.state |
74 self.state=backup.state |
135 |
135 |
136 self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', |
136 self.retry_interval=config.check_nonneg_int(cfg, 'retry_interval', |
137 'Retry interval', loc, |
137 'Retry interval', loc, |
138 config.defaults['retry_interval']) |
138 config.defaults['retry_interval']) |
139 |
139 |
|
140 |
|
141 scheduling=config.check_string(cfg, 'scheduling', |
|
142 'Scheduling mode', loc, |
|
143 default="dreamtime") |
|
144 |
|
145 if scheduling=="dreamtime": |
|
146 self.timeclass=sleep.DreamTime |
|
147 elif scheduling=="realtime": |
|
148 self.timeclass=sleep.MonotonicTime |
|
149 elif scheduling=="manual": |
|
150 self.backup_interval=0 |
|
151 else: |
|
152 logging.error("Invalid time class '%s' for %s" % (scheduling, loc)) |
|
153 |
140 self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc) |
154 self.paths=config.check_nonempty_list_of_strings(cfg, 'paths', 'Paths', loc) |
141 |
155 |
142 self.borg_parameters=config.BorgParameters.from_config(cfg, loc) |
156 self.borg_parameters=config.BorgParameters.from_config(cfg, loc) |
143 |
157 |
144 |
158 |
156 self.scheduled_operation=None |
170 self.scheduled_operation=None |
157 self.lastrun_when=None |
171 self.lastrun_when=None |
158 self.lastrun_finished=None |
172 self.lastrun_finished=None |
159 self.state=State.INACTIVE |
173 self.state=State.INACTIVE |
160 self.errors=Errors.OK |
174 self.errors=Errors.OK |
|
175 self.timeclass=sleep.DreamTime |
161 |
176 |
162 self.__decode_config(cfg) |
177 self.__decode_config(cfg) |
163 |
178 |
164 super().__init__(target = self.__main_thread, name = self.backup_name) |
179 super().__init__(target = self.__main_thread, name = self.backup_name) |
165 self.daemon=True |
180 self.daemon=True |
283 self.thread_res=t_res |
298 self.thread_res=t_res |
284 self.borg_instance=inst |
299 self.borg_instance=inst |
285 self.current_operation=op |
300 self.current_operation=op |
286 # Update scheduled time to real starting time to schedule |
301 # Update scheduled time to real starting time to schedule |
287 # next run relative to this |
302 # next run relative to this |
288 self.current_operation.when_monotonic=time.monotonic() |
303 self.current_operation.time=sleep.MonotonicTime.now() |
289 self.state=State.ACTIVE |
304 self.state=State.ACTIVE |
290 # Reset error status when starting a new operation |
305 # Reset error status when starting a new operation |
291 self.errors=Errors.OK |
306 self.errors=Errors.OK |
292 self.__update_status() |
307 self.__update_status() |
293 |
308 |
352 if not self.borg_instance.wait(): |
367 if not self.borg_instance.wait(): |
353 self.logger.error('Borg subprocess did not terminate') |
368 self.logger.error('Borg subprocess did not terminate') |
354 self.errors=self.errors.combine(Errors.ERRORS) |
369 self.errors=self.errors.combine(Errors.ERRORS) |
355 |
370 |
356 if self.current_operation.operation=='create': |
371 if self.current_operation.operation=='create': |
357 self.lastrun_when=self.current_operation.when_monotonic |
372 self.lastrun_when=self.current_operation.time.monotonic() |
358 self.lastrun_finished=time.monotonic() |
373 self.lastrun_finished=time.monotonic() |
359 self.thread_res=None |
374 self.thread_res=None |
360 self.thread_log=None |
375 self.thread_log=None |
361 self.borg_instance=None |
376 self.borg_instance=None |
362 self.current_operation=None |
377 self.current_operation=None |
402 def __main_thread_wait_schedule(self): |
417 def __main_thread_wait_schedule(self): |
403 op=None |
418 op=None |
404 if not self.scheduled_operation: |
419 if not self.scheduled_operation: |
405 op=self.__next_operation_unlocked() |
420 op=self.__next_operation_unlocked() |
406 if op: |
421 if op: |
407 now=time.monotonic() |
422 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds [%s]" % |
408 delay=max(0, op.when_monotonic-now) |
423 (str(op.operation), op.detail or 'none', |
409 self.logger.info("Scheduling '%s' (detail: %s) in %d seconds" % |
424 op.time.seconds_to(), |
410 (str(op.operation), op.detail or 'none', delay)) |
425 op.time.__class__.__name__)) |
411 |
426 |
412 self.scheduled_operation=op |
427 self.scheduled_operation=op |
413 self.state=State.SCHEDULED |
428 self.state=State.SCHEDULED |
414 self.__update_status() |
429 self.__update_status() |
415 |
430 |
416 # Wait under scheduled wait |
431 # Wait under scheduled wait |
417 self.scheduler.wait_until(now+delay, self._cond, self.backup_name) |
432 self.scheduler.wait_until(op.time, self._cond, self.backup_name) |
418 else: |
433 else: |
419 # Nothing scheduled - just wait |
434 # Nothing scheduled - just wait |
420 self.logger.info("Waiting for manual scheduling") |
435 self.logger.info("Waiting for manual scheduling") |
421 |
436 |
422 self.state=State.INACTIVE |
437 self.state=State.INACTIVE |
442 self.state=State.INACTIVE |
457 self.state=State.INACTIVE |
443 self.__update_status() |
458 self.__update_status() |
444 |
459 |
445 def __next_operation_unlocked(self): |
460 def __next_operation_unlocked(self): |
446 # TODO: pruning as well |
461 # TODO: pruning as well |
447 now=time.monotonic() |
|
448 if not self.lastrun_finished: |
462 if not self.lastrun_finished: |
449 initial_interval=self.retry_interval |
463 initial_interval=self.retry_interval |
450 if initial_interval==0: |
464 if initial_interval==0: |
451 initial_interval=self.backup_interval |
465 initial_interval=self.backup_interval |
452 if initial_interval==0: |
466 if initial_interval==0: |
453 return None |
467 return None |
454 else: |
468 else: |
455 return Operation(Operation.CREATE, now+initial_interval, |
469 tm=self.timeclass.after(initial_interval) |
456 reason='initial') |
470 return Operation(Operation.CREATE, tm, reason='initial') |
457 elif not self.errors.ok(): |
471 elif not self.errors.ok(): |
458 if self.retry_interval==0: |
472 if self.retry_interval==0: |
459 return None |
473 return None |
460 else: |
474 else: |
461 return Operation(Operation.CREATE, |
475 tm=sleep.MonotonicTime(self.lastrun_finished+self.retry_interval) |
462 self.lastrun_finished+self.retry_interval, |
476 return Operation(Operation.CREATE, tm, reason='retry') |
463 reason='retry') |
|
464 else: |
477 else: |
465 if self.backup_interval==0: |
478 if self.backup_interval==0: |
466 return None |
479 return None |
467 else: |
480 else: |
468 return Operation(Operation.CREATE, |
481 tm=self.timeclass.from_monotonic(self.lastrun_when+self.backup_interval) |
469 self.lastrun_when+self.backup_interval) |
482 return Operation(Operation.CREATE, tm) |
470 |
483 |
471 def __status_unlocked(self): |
484 def __status_unlocked(self): |
472 callback=self.__status_update_callback |
485 callback=self.__status_update_callback |
473 |
486 |
474 if self.current_operation: |
487 if self.current_operation: |
503 with self._cond: |
516 with self._cond: |
504 res=self.__status_unlocked() |
517 res=self.__status_unlocked() |
505 return res[0] |
518 return res[0] |
506 |
519 |
507 def create(self): |
520 def create(self): |
508 op=Operation(Operation.CREATE, time.monotonic(), reason='manual') |
521 op=Operation(Operation.CREATE, sleep.MonotonicTime.now(), reason='manual') |
509 with self._cond: |
522 with self._cond: |
510 self.scheduled_operation=op |
523 self.scheduled_operation=op |
511 self._cond.notify() |
524 self._cond.notify() |
512 |
525 |
513 def prune(self): |
526 def prune(self): |
514 op=Operation(Operation.PRUNE, time.monotonic(), reason='manual') |
527 op=Operation(Operation.PRUNE, sleep.MonotonicTime.now(), reason='manual') |
515 with self._cond: |
528 with self._cond: |
516 self.scheduled_operation=op |
529 self.scheduled_operation=op |
517 self._cond.notify() |
530 self._cond.notify() |
518 |
531 |
519 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
532 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |