|      5 import config | 
     5 import config | 
|      6 import logging | 
     6 import logging | 
|      7 import time | 
     7 import time | 
|      8 from instance import BorgInstance | 
     8 from instance import BorgInstance | 
|      9 from queue import Queue | 
     9 from queue import Queue | 
|     10 from threading import Thread, Lock | 
    10 from threading import Thread, Lock, Timer | 
|     11  | 
    11  | 
|     12 loglevel_translation={ | 
    12 loglevel_translation={ | 
|     13     'CRITICAL': logging.CRITICAL, | 
    13     'CRITICAL': logging.CRITICAL, | 
|     14     'ERROR': logging.ERROR, | 
    14     'ERROR': logging.ERROR, | 
|     15     'WARNING': logging.WARNING, | 
    15     'WARNING': logging.WARNING, | 
|     73         self.lastrun=None | 
    73         self.lastrun=None | 
|     74         self.lastrun_success=None | 
    74         self.lastrun_success=None | 
|     75         self.borg_instance=None | 
    75         self.borg_instance=None | 
|     76         self.current_operation=None | 
    76         self.current_operation=None | 
|     77         self.thread_log=None | 
    77         self.thread_log=None | 
|     78         self.thread_err=None | 
    78         self.thread_res=None | 
|         | 
    79         self.timer=None | 
|         | 
    80         self.timer_operation=None | 
|         | 
    81         self.timer_time=None | 
|     79         self.lock=Lock() | 
    82         self.lock=Lock() | 
|     80  | 
    83  | 
def is_running(self):
    """Report whether an operation is currently in progress.

    Acquires ``self.lock`` and delegates to ``__is_running_unlocked``.

    Returns:
        The value produced by ``__is_running_unlocked`` (truthy while an
        operation is running, falsy otherwise).
    """
    with self.lock:
        # Leaving the ``with`` block releases the lock before the value
        # reaches the caller.
        return self.__is_running_unlocked()
|         | 
    88  | 
def __is_running_unlocked(self):
    """Return the current operation without taking ``self.lock``.

    Returns:
        ``self.current_operation``; a truthy value means an operation is
        in progress.

    Raises:
        AssertionError: if no operation is recorded but a borg instance
            or a listener thread is still registered.
    """
    operation=self.current_operation
    if operation:
        return operation

    # Consistency check: with no operation recorded, nothing belonging to
    # an earlier run may be left behind.
    leftovers=self.borg_instance or self.thread_log or self.thread_res
    assert(not leftovers)
    return operation
|     85  | 
    96  | 
|     86     def __block_when_running(self): | 
    97     def __block_when_running(self): | 
|     87         running=self.is_running() | 
    98         running=self.is_running() | 
|     88         assert(not running) | 
    99         assert(not running) | 
|    122             elif t=='exception': | 
   133             elif t=='exception': | 
|    123                 success=False | 
   134                 success=False | 
|    124             elif t=='unparsed_error': | 
   135             elif t=='unparsed_error': | 
|    125                 success=False | 
   136                 success=False | 
|    126  | 
   137  | 
|    127             #if (may_indicate_finished and 'finished' in status and | 
        | 
|    128             #    status['finished']): | 
        | 
|    129             #    logging.info('Borg subprocess finished succesfully') | 
        | 
|    130             #    success=status['finished'] | 
        | 
|    131  | 
        | 
|    132         logging.debug('Waiting for borg subprocess to terminate in log thread') | 
   138         logging.debug('Waiting for borg subprocess to terminate in log thread') | 
|    133  | 
   139  | 
|    134         self.borg_instance.wait() | 
   140         self.borg_instance.wait() | 
|    135  | 
   141  | 
|    136         logging.debug('Borg subprocess terminated; terminating log listener thread') | 
   142         logging.debug('Borg subprocess terminated; terminating log listener thread') | 
|    137  | 
   143  | 
|    138         with self.lock: | 
   144         with self.lock: | 
|    139             self.thread_log=None | 
   145             self.thread_log=None | 
|    140             self.__cleanup_if_both_listeners_terminated() | 
   146             self.__finish_and_reschedule_if_both_listeners_terminated() | 
|    141  | 
   147  | 
|    142  | 
   148  | 
|    143     def __result_listener(self): | 
   149     def __result_listener(self): | 
|    144         logging.debug('Result listener thread waiting for result') | 
   150         logging.debug('Result listener thread waiting for result') | 
|    145  | 
   151  | 
|    152         if res==None: | 
   158         if res==None: | 
|    153             success=False | 
   159             success=False | 
|    154  | 
   160  | 
|    155         logging.debug('Waiting for borg subprocess to terminate in result thread') | 
   161         logging.debug('Waiting for borg subprocess to terminate in result thread') | 
|    156  | 
   162  | 
|    157         self.borg_instance.wait() | 
   163         success=success and self.borg_instance.wait() | 
|    158  | 
   164  | 
|    159         logging.debug('Borg subprocess terminated; terminating result listener thread') | 
   165         logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) | 
|    160  | 
   166  | 
|    161         with self.lock: | 
   167         with self.lock: | 
|    162             if self.current_operation=='create': | 
   168             if self.current_operation=='create': | 
|    163                 self.lastrun=self.time_started | 
   169                 self.lastrun=self.time_started | 
|    164                 self.lastrun_success=success | 
   170                 self.lastrun_success=success | 
|    165             self.thread_res=None | 
   171             self.thread_res=None | 
|    166             self.__cleanup_if_both_listeners_terminated() | 
   172             self.__finish_and_reschedule_if_both_listeners_terminated() | 
|    167  | 
   173  | 
def __finish_and_reschedule_if_both_listeners_terminated(self):
    """Reset the run state and schedule the next operation.

    Does nothing while either listener thread is still registered. The
    listener threads call this with ``self.lock`` held, right after
    clearing their own ``thread_log`` / ``thread_res`` entry.
    """
    if self.thread_res is not None or self.thread_log is not None:
        # The other listener has not finished yet; it will do the cleanup.
        return

    logging.debug('Both threads terminated')
    self.current_operation=None
    self.time_started=None
    self.borg_instance=None
    # Scheduling runs last so that it sees the cleared state.
    self.__schedule_unlocked()
|    174  | 
   181  | 
|    175     def __launch(self, queue, operation, archive_or_repository, *args): | 
   182     def __do_launch(self, queue, operation, archive_or_repository, *args): | 
|    176  | 
        | 
|    177         inst=BorgInstance(operation, archive_or_repository, *args) | 
   183         inst=BorgInstance(operation, archive_or_repository, *args) | 
|    178         inst.launch() | 
   184         inst.launch() | 
|    179  | 
   185  | 
|    180         t_log=Thread(target=self.__log_listener) | 
   186         t_log=Thread(target=self.__log_listener) | 
|    181         t_log.daemon=True | 
   187         t_log.daemon=True | 
|    191         self.time_started=time.monotonic() | 
   197         self.time_started=time.monotonic() | 
|    192  | 
   198  | 
|    193         t_log.start() | 
   199         t_log.start() | 
|    194         t_res.start() | 
   200         t_res.start() | 
|    195  | 
   201  | 
def __launch(self, operation, queue):
    """Start `operation` unless another operation is already running.

    Called by ``create``, ``prune`` and ``__queue_timed_operation`` with
    ``self.lock`` held.

    Args:
        operation: 'create' or 'prune'. Any other value is logged as an
            error and only triggers rescheduling.
        queue: passed through unchanged to ``__do_launch``.

    Returns:
        True if ``__do_launch`` was invoked, False if an operation was
        already running or `operation` was not recognised.
    """
    if self.__is_running_unlocked():
        logging.info('Cannot start %s: already running %s'
                     % (operation, self.current_operation))
        return False

    if self.timer:
        logging.debug('Unscheduling timed operation due to launch of operation')
        # Dropping the reference alone leaves the Timer armed, and it
        # would still fire its callback later; stop it explicitly.
        self.timer.cancel()
        self.timer=None
        self.timer_operation=None
        self.timer_time=None

    logging.debug("Launching '%s' on '%s'" % (operation, self.name))

    if operation=='create':
        archive="%s::%s%s" % (self.repository,
                              self.archive_prefix,
                              self.archive_template)

        self.__do_launch(queue, operation, archive,
                         self.common_parameters+self.create_parameters,
                         self.paths)
    elif operation=='prune':
        self.__do_launch(queue, 'prune', self.repository,
                         ([{'prefix': self.archive_prefix}] +
                          self.common_parameters +
                          self.prune_parameters))
    else:
        logging.error("Invalid operaton '%s'" % operation)
        self.__schedule_unlocked()
        # Nothing was started, so do not report a successful launch.
        return False

    return True
|         | 
   234  | 
def create(self, queue):
    """Request a 'create' operation.

    Args:
        queue: handed on unchanged to ``__launch``.

    Returns:
        The result of ``__launch('create', queue)``, which is called with
        ``self.lock`` held.
    """
    with self.lock:
        return self.__launch('create', queue)
|    206  | 
   239  | 
def prune(self, queue):
    """Request a 'prune' operation.

    Args:
        queue: handed on unchanged to ``__launch``.

    Returns:
        The result of ``__launch('prune', queue)``, which is called with
        ``self.lock`` held.
    """
    with self.lock:
        return self.__launch('prune', queue)
|    213  | 
   244  | 
|    214     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages | 
   245     # TODO: Decide exact (manual) abort mechanism. Perhaps two stages | 
|    215     def abort(self): | 
   246     def abort(self): | 
|    216         with self.lock: | 
   247         with self.lock: | 
|    217             if self.borg_instance: | 
   248             if self.borg_instance: | 
|    239         if thread_res: | 
   270         if thread_res: | 
|    240             thread_res.join() | 
   271             thread_res.join() | 
|    241  | 
   272  | 
|    242         assert(self.thread_log==None and self.thread_res==None) | 
   273         assert(self.thread_log==None and self.thread_res==None) | 
|    243  | 
   274  | 
def __queue_timed_operation(self):
    """Timer callback: launch the operation recorded in ``timer_operation``.

    Takes ``self.lock``, clears the timer bookkeeping (``timer``,
    ``timer_operation``, ``timer_time``) and then either launches the
    recorded operation with a ``None`` queue or logs why it did not.
    """
    with self.lock:
        operation=self.timer_operation
        self.timer_operation=None
        self.timer_time=None
        self.timer=None

        if self.__is_running_unlocked():
            logging.info('Aborted queue operation, as an operation is already running')
        elif operation is None:
            # ``__launch`` resets ``timer_operation`` to None when it
            # unschedules a timed operation. A callback that still fires
            # afterwards has nothing to start and must not pass None on
            # as an operation name.
            logging.debug('Timer fired without a scheduled operation; ignoring')
        else:
            # TODO: Queue on 'repository' and online status for SSH, etc.

            # TODO: UI comms. queue?
            self.__launch(operation, None)
|         | 
   289  | 
def __schedule_unlocked(self):
    """Arm a timer for the next operation, if there is one.

    Does not take ``self.lock`` itself.

    Returns:
        ``(self.current_operation, None)`` while an operation is in
        progress; otherwise the ``(operation, when)`` pair obtained from
        ``__next_operation_unlocked``. When `operation` is truthy, a
        ``Timer`` calling ``__queue_timed_operation`` has been started
        and recorded in ``timer``, ``timer_operation`` and ``timer_time``.
    """
    if self.current_operation:
        return self.current_operation, None

    operation, when=self.__next_operation_unlocked()

    if operation:
        now=time.monotonic()
        # A target time already in the past means "run immediately".
        delay=max(0, when-now)
        logging.info("Scheduling '%s' of '%s' in %d seconds" %
                     (operation, self.name, delay))
        tmr=Timer(delay, self.__queue_timed_operation)
        self.timer_operation=operation
        self.timer_time=when
        self.timer=tmr
        tmr.start()

    # Hand back the scheduled time, not the ``time`` module.
    return operation, when
|         | 
   308  | 
|         | 
   309     def __next_operation_unlocked(self): | 
|         | 
   310         # TODO: pruning as well | 
|    247         now=time.monotonic() | 
   311         now=time.monotonic() | 
|    248         if not self.lastrun: | 
   312         if not self.lastrun: | 
|    249             return 'create', now+self.retry_interval | 
   313             return 'create', now+self.retry_interval | 
|    250         elif not self.lastrun_success: | 
   314         elif not self.lastrun_success: | 
|    251             return 'create', self.lastrun+self.retry_interval | 
   315             return 'create', self.lastrun+self.retry_interval |