backup.py

changeset 8
7b2d2eac6a48
parent 7
e189d4a6cb8c
child 10
76dbfb06eba0
equal deleted inserted replaced
7:e189d4a6cb8c 8:7b2d2eac6a48
5 import config 5 import config
6 import logging 6 import logging
7 import time 7 import time
8 from instance import BorgInstance 8 from instance import BorgInstance
9 from queue import Queue 9 from queue import Queue
10 from threading import Thread, Lock 10 from threading import Thread, Lock, Timer
11 11
12 loglevel_translation={ 12 loglevel_translation={
13 'CRITICAL': logging.CRITICAL, 13 'CRITICAL': logging.CRITICAL,
14 'ERROR': logging.ERROR, 14 'ERROR': logging.ERROR,
15 'WARNING': logging.WARNING, 15 'WARNING': logging.WARNING,
73 self.lastrun=None 73 self.lastrun=None
74 self.lastrun_success=None 74 self.lastrun_success=None
75 self.borg_instance=None 75 self.borg_instance=None
76 self.current_operation=None 76 self.current_operation=None
77 self.thread_log=None 77 self.thread_log=None
78 self.thread_err=None 78 self.thread_res=None
79 self.timer=None
80 self.timer_operation=None
81 self.timer_time=None
79 self.lock=Lock() 82 self.lock=Lock()
80 83
81 def is_running(self): 84 def is_running(self):
82 with self.lock: 85 with self.lock:
83 running=self.borg_instance or self.thread_log or self.thread_err 86 running=self.__is_running_unlocked()
87 return running
88
89 def __is_running_unlocked(self):
90 running=self.current_operation
91 if not running:
92 # Consistency check
93 assert((not self.borg_instance and not self.thread_log and
94 not self.thread_res))
84 return running 95 return running
85 96
86 def __block_when_running(self): 97 def __block_when_running(self):
87 running=self.is_running() 98 running=self.is_running()
88 assert(not running) 99 assert(not running)
122 elif t=='exception': 133 elif t=='exception':
123 success=False 134 success=False
124 elif t=='unparsed_error': 135 elif t=='unparsed_error':
125 success=False 136 success=False
126 137
127 #if (may_indicate_finished and 'finished' in status and
128 # status['finished']):
129 # logging.info('Borg subprocess finished succesfully')
130 # success=status['finished']
131
132 logging.debug('Waiting for borg subprocess to terminate in log thread') 138 logging.debug('Waiting for borg subprocess to terminate in log thread')
133 139
134 self.borg_instance.wait() 140 self.borg_instance.wait()
135 141
136 logging.debug('Borg subprocess terminated; terminating log listener thread') 142 logging.debug('Borg subprocess terminated; terminating log listener thread')
137 143
138 with self.lock: 144 with self.lock:
139 self.thread_log=None 145 self.thread_log=None
140 self.__cleanup_if_both_listeners_terminated() 146 self.__finish_and_reschedule_if_both_listeners_terminated()
141 147
142 148
143 def __result_listener(self): 149 def __result_listener(self):
144 logging.debug('Result listener thread waiting for result') 150 logging.debug('Result listener thread waiting for result')
145 151
152 if res==None: 158 if res==None:
153 success=False 159 success=False
154 160
155 logging.debug('Waiting for borg subprocess to terminate in result thread') 161 logging.debug('Waiting for borg subprocess to terminate in result thread')
156 162
157 self.borg_instance.wait() 163 success=success and self.borg_instance.wait()
158 164
159 logging.debug('Borg subprocess terminated; terminating result listener thread') 165 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))
160 166
161 with self.lock: 167 with self.lock:
162 if self.current_operation=='create': 168 if self.current_operation=='create':
163 self.lastrun=self.time_started 169 self.lastrun=self.time_started
164 self.lastrun_success=success 170 self.lastrun_success=success
165 self.thread_res=None 171 self.thread_res=None
166 self.__cleanup_if_both_listeners_terminated() 172 self.__finish_and_reschedule_if_both_listeners_terminated()
167 173
168 def __cleanup_if_both_listeners_terminated(self): 174 def __finish_and_reschedule_if_both_listeners_terminated(self):
169 if self.thread_res==None and self.thread_log==None: 175 if self.thread_res==None and self.thread_log==None:
170 logging.debug('Both threads terminated') 176 logging.debug('Both threads terminated')
171 self.borg_instance=None 177 self.borg_instance=None
178 self.time_started=None
172 self.current_operation=None 179 self.current_operation=None
173 self.time_started=None 180 self.__schedule_unlocked()
174 181
175 def __launch(self, queue, operation, archive_or_repository, *args): 182 def __do_launch(self, queue, operation, archive_or_repository, *args):
176
177 inst=BorgInstance(operation, archive_or_repository, *args) 183 inst=BorgInstance(operation, archive_or_repository, *args)
178 inst.launch() 184 inst.launch()
179 185
180 t_log=Thread(target=self.__log_listener) 186 t_log=Thread(target=self.__log_listener)
181 t_log.daemon=True 187 t_log.daemon=True
191 self.time_started=time.monotonic() 197 self.time_started=time.monotonic()
192 198
193 t_log.start() 199 t_log.start()
194 t_res.start() 200 t_res.start()
195 201
202 def __launch(self, operation, queue):
203 if self.__is_running_unlocked():
204 logging.info('Cannot start %s: already running %s'
205 % (operation, self.current_operation))
206 return False
207 else:
208 if self.timer:
209 logging.debug('Unscheduling timed operation due to launch of operation')
210 self.timer=None
211 self.timer_operation=None
212 self.timer_time=None
213
214 logging.debug("Launching '%s' on '%s'" % (operation, self.name))
215
216 if operation=='create':
217 archive="%s::%s%s" % (self.repository,
218 self.archive_prefix,
219 self.archive_template)
220
221 self.__do_launch(queue, operation, archive,
222 self.common_parameters+self.create_parameters,
223 self.paths)
224 elif operation=='prune':
225 self.__do_launch(queue, 'prune', self.repository,
226 ([{'prefix': self.archive_prefix}] +
227 self.common_parameters +
228 self.prune_parameters))
229 else:
230 logging.error("Invalid operaton '%s'" % operation)
231 self.__schedule_unlocked()
232
233 return True
234
196 def create(self, queue): 235 def create(self, queue):
197 self.__block_when_running() 236 with self.lock:
198 237 res=self.__launch('create', queue)
199 archive="%s::%s%s" % (self.repository, 238 return res
200 self.archive_prefix,
201 self.archive_template)
202
203 self.__launch(queue, 'create', archive,
204 self.common_parameters+self.create_parameters,
205 self.paths)
206 239
207 def prune(self, queue): 240 def prune(self, queue):
208 self.__block_when_running() 241 with self.lock:
209 self.__launch(queue, 'prune', self.repository, 242 res=self.__launch('prune', queue)
210 ([{'prefix': self.archive_prefix}] + 243 return res
211 self.common_parameters +
212 self.prune_parameters))
213 244
214 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 245 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
215 def abort(self): 246 def abort(self):
216 with self.lock: 247 with self.lock:
217 if self.borg_instance: 248 if self.borg_instance:
225 if thread_res: 256 if thread_res:
226 thread_res.terminate() 257 thread_res.terminate()
227 258
228 259
229 def join(self): 260 def join(self):
230 logging.debug('Waiting for borg listener thread to terminate') 261 logging.debug('Waiting for borg listener threads to terminate')
231 262
232 with self.lock: 263 with self.lock:
233 thread_log=self.thread_log 264 thread_log=self.thread_log
234 thread_res=self.thread_res 265 thread_res=self.thread_res
235 266
239 if thread_res: 270 if thread_res:
240 thread_res.join() 271 thread_res.join()
241 272
242 assert(self.thread_log==None and self.thread_res==None) 273 assert(self.thread_log==None and self.thread_res==None)
243 274
244 def next_action(): 275 def __queue_timed_operation(self):
245 __block_when_running() 276 with self.lock:
246 # TODO pruning as well 277 operation=self.timer_operation
278 self.timer_operation=None
279 self.timer_time=None
280 self.timer=None
281
282 if self.__is_running_unlocked():
283 logging.info('Aborted queue operation, as an operation is already running')
284 else:
285 # TODO: Queue on 'repository' and online status for SSH, etc.
286
287 # TODO: UI comms. queue?
288 self.__launch(operation, None)
289
290 def __schedule_unlocked(self):
291 if self.current_operation:
292 return self.current_operation, None
293 else:
294 operation, when=self.__next_operation_unlocked()
295
296 if operation:
297 now=time.monotonic()
298 delay=max(0, when-now)
299 logging.info("Scheduling '%s' of '%s' in %d seconds" %
300 (operation, self.name, delay))
301 tmr=Timer(delay, self.__queue_timed_operation)
302 self.timer_operation=operation
303 self.timer_time=when
304 self.timer=tmr
305 tmr.start()
306
307 return operation, time
308
309 def __next_operation_unlocked(self):
310 # TODO: pruning as well
247 now=time.monotonic() 311 now=time.monotonic()
248 if not self.lastrun: 312 if not self.lastrun:
249 return 'create', now+self.retry_interval 313 return 'create', now+self.retry_interval
250 elif not self.lastrun_success: 314 elif not self.lastrun_success:
251 return 'create', self.lastrun+self.retry_interval 315 return 'create', self.lastrun+self.retry_interval
253 if self.backup_interval==0: 317 if self.backup_interval==0:
254 return 'none', 0 318 return 'none', 0
255 else: 319 else:
256 return 'create', self.lastrun+self.backup_interval 320 return 'create', self.lastrun+self.backup_interval
257 321
258 322 def schedule(self):
323 with self.lock:
324 return self.__schedule_unlocked()
325
326

mercurial