backup.py

changeset 10
76dbfb06eba0
parent 8
7b2d2eac6a48
child 12
16a8c63344c0
equal deleted inserted replaced
9:aa121291eb0e 10:76dbfb06eba0
75 self.borg_instance=None 75 self.borg_instance=None
76 self.current_operation=None 76 self.current_operation=None
77 self.thread_log=None 77 self.thread_log=None
78 self.thread_res=None 78 self.thread_res=None
79 self.timer=None 79 self.timer=None
80 self.timer_operation=None 80 self.scheduled_operation=None
81 self.timer_time=None
82 self.lock=Lock() 81 self.lock=Lock()
82 self.status_update_callback=None
83 83
84 def is_running(self): 84 def is_running(self):
85 with self.lock: 85 with self.lock:
86 running=self.__is_running_unlocked() 86 running=self.__is_running_unlocked()
87 return running 87 return running
145 self.thread_log=None 145 self.thread_log=None
146 self.__finish_and_reschedule_if_both_listeners_terminated() 146 self.__finish_and_reschedule_if_both_listeners_terminated()
147 147
148 148
149 def __result_listener(self): 149 def __result_listener(self):
150 with self.lock:
151 status, callback=self.__status_unlocked()
152 if callback:
153 callback(self, status)
154
150 logging.debug('Result listener thread waiting for result') 155 logging.debug('Result listener thread waiting for result')
151 156
152 res=self.borg_instance.read_result() 157 res=self.borg_instance.read_result()
153 158
154 success=True 159 success=True
163 success=success and self.borg_instance.wait() 168 success=success and self.borg_instance.wait()
164 169
165 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) 170 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))
166 171
167 with self.lock: 172 with self.lock:
168 if self.current_operation=='create': 173 if self.current_operation['operation']=='create':
169 self.lastrun=self.time_started 174 self.lastrun=self.current_operation['when_monotonic']
170 self.lastrun_success=success 175 self.lastrun_success=success
171 self.thread_res=None 176 self.thread_res=None
172 self.__finish_and_reschedule_if_both_listeners_terminated() 177 self.__finish_and_reschedule_if_both_listeners_terminated()
178 status, callback=self.__status_unlocked()
179 if callback:
180 callback(self, status)
173 181
174 def __finish_and_reschedule_if_both_listeners_terminated(self): 182 def __finish_and_reschedule_if_both_listeners_terminated(self):
175 if self.thread_res==None and self.thread_log==None: 183 if self.thread_res==None and self.thread_log==None:
176 logging.debug('Both threads terminated') 184 logging.debug('Both threads terminated')
177 self.borg_instance=None 185 self.borg_instance=None
178 self.time_started=None
179 self.current_operation=None 186 self.current_operation=None
180 self.__schedule_unlocked() 187 self.__schedule_unlocked()
181 188
182 def __do_launch(self, queue, operation, archive_or_repository, *args): 189 def __do_launch(self, queue, op, archive_or_repository, *args):
183 inst=BorgInstance(operation, archive_or_repository, *args) 190 inst=BorgInstance(op['operation'], archive_or_repository, *args)
184 inst.launch() 191 inst.launch()
185 192
186 t_log=Thread(target=self.__log_listener) 193 t_log=Thread(target=self.__log_listener)
187 t_log.daemon=True 194 t_log.daemon=True
188 195
191 198
192 self.thread_log=t_log 199 self.thread_log=t_log
193 self.thread_res=t_res 200 self.thread_res=t_res
194 self.borg_instance=inst 201 self.borg_instance=inst
195 self.queue=queue 202 self.queue=queue
196 self.current_operation=operation 203 self.current_operation=op
197 self.time_started=time.monotonic() 204 self.current_operation['when_monotonic']=time.monotonic()
198 205
199 t_log.start() 206 t_log.start()
200 t_res.start() 207 t_res.start()
201 208
202 def __launch(self, operation, queue): 209 def __launch(self, op, queue):
203 if self.__is_running_unlocked(): 210 if self.__is_running_unlocked():
204 logging.info('Cannot start %s: already running %s' 211 logging.info('Cannot start %s: already running %s'
205 % (operation, self.current_operation)) 212 % (operation, self.current_operation))
206 return False 213 return False
207 else: 214 else:
208 if self.timer: 215 if self.timer:
209 logging.debug('Unscheduling timed operation due to launch of operation') 216 logging.debug('Unscheduling timed operation due to launch of operation')
210 self.timer=None 217 self.timer=None
211 self.timer_operation=None 218 self.scheduled_operation=None
212 self.timer_time=None 219
213 220 logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name))
214 logging.debug("Launching '%s' on '%s'" % (operation, self.name)) 221
215 222 if op['operation']=='create':
216 if operation=='create':
217 archive="%s::%s%s" % (self.repository, 223 archive="%s::%s%s" % (self.repository,
218 self.archive_prefix, 224 self.archive_prefix,
219 self.archive_template) 225 self.archive_template)
220 226
221 self.__do_launch(queue, operation, archive, 227 self.__do_launch(queue, op, archive,
222 self.common_parameters+self.create_parameters, 228 self.common_parameters+self.create_parameters,
223 self.paths) 229 self.paths)
224 elif operation=='prune': 230 elif op['operation']=='prune':
225 self.__do_launch(queue, 'prune', self.repository, 231 self.__do_launch(queue, op, self.repository,
226 ([{'prefix': self.archive_prefix}] + 232 ([{'prefix': self.archive_prefix}] +
227 self.common_parameters + 233 self.common_parameters +
228 self.prune_parameters)) 234 self.prune_parameters))
229 else: 235 else:
230 logging.error("Invalid operaton '%s'" % operation) 236 logging.error("Invalid operaton '%s'" % op['operation'])
231 self.__schedule_unlocked() 237 self.__schedule_unlocked()
232 238
233 return True 239 return True
234 240
235 def create(self, queue): 241 def create(self, queue):
236 with self.lock: 242 op={'operation': 'create', 'detail': 'manual'}
237 res=self.__launch('create', queue) 243 with self.lock:
244 res=self.__launch(op, queue)
238 return res 245 return res
239 246
240 def prune(self, queue): 247 def prune(self, queue):
241 with self.lock: 248 op={'operation': 'prune', 'detail': 'manual'}
242 res=self.__launch('prune', queue) 249 with self.lock:
250 res=self.__launch(op, queue)
243 return res 251 return res
244 252
245 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 253 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
246 def abort(self): 254 def abort(self):
247 with self.lock: 255 with self.lock:
272 280
273 assert(self.thread_log==None and self.thread_res==None) 281 assert(self.thread_log==None and self.thread_res==None)
274 282
275 def __queue_timed_operation(self): 283 def __queue_timed_operation(self):
276 with self.lock: 284 with self.lock:
277 operation=self.timer_operation 285 op=self.scheduled_operation
278 self.timer_operation=None 286 self.scheduled_operation=None
279 self.timer_time=None
280 self.timer=None 287 self.timer=None
281 288
282 if self.__is_running_unlocked(): 289 if self.__is_running_unlocked():
283 logging.info('Aborted queue operation, as an operation is already running') 290 logging.info('Aborted queue operation, as an operation is already running')
284 else: 291 else:
285 # TODO: Queue on 'repository' and online status for SSH, etc. 292 # TODO: Queue on 'repository' and online status for SSH, etc.
286 293
287 # TODO: UI comms. queue? 294 # TODO: UI comms. queue?
288 self.__launch(operation, None) 295 self.__launch(op, None)
289
290 def __schedule_unlocked(self):
291 if self.current_operation:
292 return self.current_operation, None
293 else:
294 operation, when=self.__next_operation_unlocked()
295
296 if operation:
297 now=time.monotonic()
298 delay=max(0, when-now)
299 logging.info("Scheduling '%s' of '%s' in %d seconds" %
300 (operation, self.name, delay))
301 tmr=Timer(delay, self.__queue_timed_operation)
302 self.timer_operation=operation
303 self.timer_time=when
304 self.timer=tmr
305 tmr.start()
306
307 return operation, time
308 296
309 def __next_operation_unlocked(self): 297 def __next_operation_unlocked(self):
310 # TODO: pruning as well 298 # TODO: pruning as well
311 now=time.monotonic() 299 now=time.monotonic()
312 if not self.lastrun: 300 if not self.lastrun:
313 return 'create', now+self.retry_interval 301 initial_interval=self.retry_interval
302 if initial_interval==0:
303 initial_interval=self.backup_interval
304 if initial_interval==0:
305 return None
306 else:
307 return {'operation': 'create',
308 'detail': 'initial',
309 'when_monotonic': now+initial_interval}
314 elif not self.lastrun_success: 310 elif not self.lastrun_success:
315 return 'create', self.lastrun+self.retry_interval 311 if self.retry_interval==0:
312 return None
313 else:
314 return {'operation': 'create',
315 'detail': 'retry',
316 'when_monotonic': self.lastrun+self.retry_interval}
316 else: 317 else:
317 if self.backup_interval==0: 318 if self.backup_interval==0:
318 return 'none', 0 319 return None
319 else: 320 else:
320 return 'create', self.lastrun+self.backup_interval 321 return {'operation': 'create',
322 'detail': None,
323 'when_monotonic': self.lastrun+self.backup_interval}
324
325 def __schedule_unlocked(self):
326 if self.current_operation:
327 return self.current_operation
328 else:
329 op=self.__next_operation_unlocked()
330
331 if op:
332 now=time.monotonic()
333 delay=max(0, op['when_monotonic']-now)
334 logging.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" %
335 (op['operation'], op['detail'], self.name, delay))
336 tmr=Timer(delay, self.__queue_timed_operation)
337 self.scheduled_operation=op
338 self.timer=tmr
339 tmr.start()
340
341 return op
321 342
322 def schedule(self): 343 def schedule(self):
323 with self.lock: 344 with self.lock:
324 return self.__schedule_unlocked() 345 return self.__schedule_unlocked()
325 346
326 347 def set_status_update_callback(self, callback):
348 with self.lock:
349 self.status_update_callback=callback
350
351 def status(self):
352 with self.lock:
353 res=self.__status_unlocked()
354 return res[0]
355
356 def __status_unlocked(self):
357 callback=self.status_update_callback
358 if self.current_operation:
359 status=self.current_operation
360 status['type']='current'
361 elif self.scheduled_operation:
362 status=self.scheduled_operation
363 status['type']='scheduled'
364 else:
365 status={'type': 'nothing'}
366
367 status['name']=self.name
368
369 if 'when_monotonic' in status:
370 status['when']=(status['when_monotonic']
371 -time.monotonic()+time.time())
372
373 return status, callback
374
375
376

mercurial