| 163 success=success and self.borg_instance.wait() |
168 success=success and self.borg_instance.wait() |
| 164 |
169 |
| 165 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
170 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
| 166 |
171 |
| 167 with self.lock: |
172 with self.lock: |
| 168 if self.current_operation=='create': |
173 if self.current_operation['operation']=='create': |
| 169 self.lastrun=self.time_started |
174 self.lastrun=self.current_operation['when_monotonic'] |
| 170 self.lastrun_success=success |
175 self.lastrun_success=success |
| 171 self.thread_res=None |
176 self.thread_res=None |
| 172 self.__finish_and_reschedule_if_both_listeners_terminated() |
177 self.__finish_and_reschedule_if_both_listeners_terminated() |
| |
178 status, callback=self.__status_unlocked() |
| |
179 if callback: |
| |
180 callback(self, status) |
| 173 |
181 |
| 174 def __finish_and_reschedule_if_both_listeners_terminated(self): |
182 def __finish_and_reschedule_if_both_listeners_terminated(self): |
| 175 if self.thread_res==None and self.thread_log==None: |
183 if self.thread_res==None and self.thread_log==None: |
| 176 logging.debug('Both threads terminated') |
184 logging.debug('Both threads terminated') |
| 177 self.borg_instance=None |
185 self.borg_instance=None |
| 178 self.time_started=None |
|
| 179 self.current_operation=None |
186 self.current_operation=None |
| 180 self.__schedule_unlocked() |
187 self.__schedule_unlocked() |
| 181 |
188 |
| 182 def __do_launch(self, queue, operation, archive_or_repository, *args): |
189 def __do_launch(self, queue, op, archive_or_repository, *args): |
| 183 inst=BorgInstance(operation, archive_or_repository, *args) |
190 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
| 184 inst.launch() |
191 inst.launch() |
| 185 |
192 |
| 186 t_log=Thread(target=self.__log_listener) |
193 t_log=Thread(target=self.__log_listener) |
| 187 t_log.daemon=True |
194 t_log.daemon=True |
| 188 |
195 |
| 191 |
198 |
| 192 self.thread_log=t_log |
199 self.thread_log=t_log |
| 193 self.thread_res=t_res |
200 self.thread_res=t_res |
| 194 self.borg_instance=inst |
201 self.borg_instance=inst |
| 195 self.queue=queue |
202 self.queue=queue |
| 196 self.current_operation=operation |
203 self.current_operation=op |
| 197 self.time_started=time.monotonic() |
204 self.current_operation['when_monotonic']=time.monotonic() |
| 198 |
205 |
| 199 t_log.start() |
206 t_log.start() |
| 200 t_res.start() |
207 t_res.start() |
| 201 |
208 |
| 202 def __launch(self, operation, queue): |
209 def __launch(self, op, queue): |
| 203 if self.__is_running_unlocked(): |
210 if self.__is_running_unlocked(): |
| 204 logging.info('Cannot start %s: already running %s' |
211 logging.info('Cannot start %s: already running %s' |
| 205 % (operation, self.current_operation)) |
212 % (operation, self.current_operation)) |
| 206 return False |
213 return False |
| 207 else: |
214 else: |
| 208 if self.timer: |
215 if self.timer: |
| 209 logging.debug('Unscheduling timed operation due to launch of operation') |
216 logging.debug('Unscheduling timed operation due to launch of operation') |
| 210 self.timer=None |
217 self.timer=None |
| 211 self.timer_operation=None |
218 self.scheduled_operation=None |
| 212 self.timer_time=None |
219 |
| 213 |
220 logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
| 214 logging.debug("Launching '%s' on '%s'" % (operation, self.name)) |
221 |
| 215 |
222 if op['operation']=='create': |
| 216 if operation=='create': |
|
| 217 archive="%s::%s%s" % (self.repository, |
223 archive="%s::%s%s" % (self.repository, |
| 218 self.archive_prefix, |
224 self.archive_prefix, |
| 219 self.archive_template) |
225 self.archive_template) |
| 220 |
226 |
| 221 self.__do_launch(queue, operation, archive, |
227 self.__do_launch(queue, op, archive, |
| 222 self.common_parameters+self.create_parameters, |
228 self.common_parameters+self.create_parameters, |
| 223 self.paths) |
229 self.paths) |
| 224 elif operation=='prune': |
230 elif op['operation']=='prune': |
| 225 self.__do_launch(queue, 'prune', self.repository, |
231 self.__do_launch(queue, op, self.repository, |
| 226 ([{'prefix': self.archive_prefix}] + |
232 ([{'prefix': self.archive_prefix}] + |
| 227 self.common_parameters + |
233 self.common_parameters + |
| 228 self.prune_parameters)) |
234 self.prune_parameters)) |
| 229 else: |
235 else: |
| 230 logging.error("Invalid operaton '%s'" % operation) |
236 logging.error("Invalid operaton '%s'" % op['operation']) |
| 231 self.__schedule_unlocked() |
237 self.__schedule_unlocked() |
| 232 |
238 |
| 233 return True |
239 return True |
| 234 |
240 |
| 235 def create(self, queue): |
241 def create(self, queue): |
| 236 with self.lock: |
242 op={'operation': 'create', 'detail': 'manual'} |
| 237 res=self.__launch('create', queue) |
243 with self.lock: |
| |
244 res=self.__launch(op, queue) |
| 238 return res |
245 return res |
| 239 |
246 |
| 240 def prune(self, queue): |
247 def prune(self, queue): |
| 241 with self.lock: |
248 op={'operation': 'prune', 'detail': 'manual'} |
| 242 res=self.__launch('prune', queue) |
249 with self.lock: |
| |
250 res=self.__launch(op, queue) |
| 243 return res |
251 return res |
| 244 |
252 |
| 245 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
253 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
| 246 def abort(self): |
254 def abort(self): |
| 247 with self.lock: |
255 with self.lock: |
| 272 |
280 |
| 273 assert(self.thread_log==None and self.thread_res==None) |
281 assert(self.thread_log==None and self.thread_res==None) |
| 274 |
282 |
| 275 def __queue_timed_operation(self): |
283 def __queue_timed_operation(self): |
| 276 with self.lock: |
284 with self.lock: |
| 277 operation=self.timer_operation |
285 op=self.scheduled_operation |
| 278 self.timer_operation=None |
286 self.scheduled_operation=None |
| 279 self.timer_time=None |
|
| 280 self.timer=None |
287 self.timer=None |
| 281 |
288 |
| 282 if self.__is_running_unlocked(): |
289 if self.__is_running_unlocked(): |
| 283 logging.info('Aborted queue operation, as an operation is already running') |
290 logging.info('Aborted queue operation, as an operation is already running') |
| 284 else: |
291 else: |
| 285 # TODO: Queue on 'repository' and online status for SSH, etc. |
292 # TODO: Queue on 'repository' and online status for SSH, etc. |
| 286 |
293 |
| 287 # TODO: UI comms. queue? |
294 # TODO: UI comms. queue? |
| 288 self.__launch(operation, None) |
295 self.__launch(op, None) |
| 289 |
|
| 290 def __schedule_unlocked(self): |
|
| 291 if self.current_operation: |
|
| 292 return self.current_operation, None |
|
| 293 else: |
|
| 294 operation, when=self.__next_operation_unlocked() |
|
| 295 |
|
| 296 if operation: |
|
| 297 now=time.monotonic() |
|
| 298 delay=max(0, when-now) |
|
| 299 logging.info("Scheduling '%s' of '%s' in %d seconds" % |
|
| 300 (operation, self.name, delay)) |
|
| 301 tmr=Timer(delay, self.__queue_timed_operation) |
|
| 302 self.timer_operation=operation |
|
| 303 self.timer_time=when |
|
| 304 self.timer=tmr |
|
| 305 tmr.start() |
|
| 306 |
|
| 307 return operation, time |
|
| 308 |
296 |
| 309 def __next_operation_unlocked(self): |
297 def __next_operation_unlocked(self): |
| 310 # TODO: pruning as well |
298 # TODO: pruning as well |
| 311 now=time.monotonic() |
299 now=time.monotonic() |
| 312 if not self.lastrun: |
300 if not self.lastrun: |
| 313 return 'create', now+self.retry_interval |
301 initial_interval=self.retry_interval |
| |
302 if initial_interval==0: |
| |
303 initial_interval=self.backup_interval |
| |
304 if initial_interval==0: |
| |
305 return None |
| |
306 else: |
| |
307 return {'operation': 'create', |
| |
308 'detail': 'initial', |
| |
309 'when_monotonic': now+initial_interval} |
| 314 elif not self.lastrun_success: |
310 elif not self.lastrun_success: |
| 315 return 'create', self.lastrun+self.retry_interval |
311 if self.retry_interval==0: |
| |
312 return None |
| |
313 else: |
| |
314 return {'operation': 'create', |
| |
315 'detail': 'retry', |
| |
316 'when_monotonic': self.lastrun+self.retry_interval} |
| 316 else: |
317 else: |
| 317 if self.backup_interval==0: |
318 if self.backup_interval==0: |
| 318 return 'none', 0 |
319 return None |
| 319 else: |
320 else: |
| 320 return 'create', self.lastrun+self.backup_interval |
321 return {'operation': 'create', |
| |
322 'detail': None, |
| |
323 'when_monotonic': self.lastrun+self.backup_interval} |
| |
324 |
| |
325 def __schedule_unlocked(self): |
| |
326 if self.current_operation: |
| |
327 return self.current_operation |
| |
328 else: |
| |
329 op=self.__next_operation_unlocked() |
| |
330 |
| |
331 if op: |
| |
332 now=time.monotonic() |
| |
333 delay=max(0, op['when_monotonic']-now) |
| |
334 logging.info("Scheduling '%s' (detail: %s) of '%s' in %d seconds" % |
| |
335 (op['operation'], op['detail'], self.name, delay)) |
| |
336 tmr=Timer(delay, self.__queue_timed_operation) |
| |
337 self.scheduled_operation=op |
| |
338 self.timer=tmr |
| |
339 tmr.start() |
| |
340 |
| |
341 return op |
| 321 |
342 |
| 322 def schedule(self): |
343 def schedule(self): |
| 323 with self.lock: |
344 with self.lock: |
| 324 return self.__schedule_unlocked() |
345 return self.__schedule_unlocked() |
| 325 |
346 |
| 326 |
347 def set_status_update_callback(self, callback): |
| |
348 with self.lock: |
| |
349 self.status_update_callback=callback |
| |
350 |
| |
351 def status(self): |
| |
352 with self.lock: |
| |
353 res=self.__status_unlocked() |
| |
354 return res[0] |
| |
355 |
| |
356 def __status_unlocked(self): |
| |
357 callback=self.status_update_callback |
| |
358 if self.current_operation: |
| |
359 status=self.current_operation |
| |
360 status['type']='current' |
| |
361 elif self.scheduled_operation: |
| |
362 status=self.scheduled_operation |
| |
363 status['type']='scheduled' |
| |
364 else: |
| |
365 status={'type': 'nothing'} |
| |
366 |
| |
367 status['name']=self.name |
| |
368 |
| |
369 if 'when_monotonic' in status: |
| |
370 status['when']=(status['when_monotonic'] |
| |
371 -time.monotonic()+time.time()) |
| |
372 |
| |
373 return status, callback |
| |
374 |
| |
375 |
| |
376 |