5 import config |
5 import config |
6 import logging |
6 import logging |
7 import time |
7 import time |
8 from instance import BorgInstance |
8 from instance import BorgInstance |
9 from queue import Queue |
9 from queue import Queue |
10 from threading import Thread, Lock |
10 from threading import Thread, Lock, Timer |
11 |
11 |
12 loglevel_translation={ |
12 loglevel_translation={ |
13 'CRITICAL': logging.CRITICAL, |
13 'CRITICAL': logging.CRITICAL, |
14 'ERROR': logging.ERROR, |
14 'ERROR': logging.ERROR, |
15 'WARNING': logging.WARNING, |
15 'WARNING': logging.WARNING, |
73 self.lastrun=None |
73 self.lastrun=None |
74 self.lastrun_success=None |
74 self.lastrun_success=None |
75 self.borg_instance=None |
75 self.borg_instance=None |
76 self.current_operation=None |
76 self.current_operation=None |
77 self.thread_log=None |
77 self.thread_log=None |
78 self.thread_err=None |
78 self.thread_res=None |
|
79 self.timer=None |
|
80 self.timer_operation=None |
|
81 self.timer_time=None |
79 self.lock=Lock() |
82 self.lock=Lock() |
80 |
83 |
def is_running(self):
    """Return the operation in progress, if any, while holding ``self.lock``.

    The result is whatever ``__is_running_unlocked`` reports: the value of
    ``self.current_operation``, which is falsy when nothing is running.
    """
    with self.lock:
        return self.__is_running_unlocked()


def __is_running_unlocked(self):
    """Return ``self.current_operation`` without touching ``self.lock``.

    The caller is expected to hold the lock already. When no operation is
    recorded, the borg instance and both listener threads must be cleared
    too; this is asserted as a consistency check.
    """
    active = self.current_operation
    if active:
        return active

    # Consistency check: an idle object must not retain a subprocess
    # handle or a listener thread.
    assert not (self.borg_instance or self.thread_log or self.thread_res)
    return active
85 |
96 |
86 def __block_when_running(self): |
97 def __block_when_running(self): |
87 running=self.is_running() |
98 running=self.is_running() |
88 assert(not running) |
99 assert(not running) |
122 elif t=='exception': |
133 elif t=='exception': |
123 success=False |
134 success=False |
124 elif t=='unparsed_error': |
135 elif t=='unparsed_error': |
125 success=False |
136 success=False |
126 |
137 |
127 #if (may_indicate_finished and 'finished' in status and |
|
128 # status['finished']): |
|
129 # logging.info('Borg subprocess finished succesfully') |
|
130 # success=status['finished'] |
|
131 |
|
132 logging.debug('Waiting for borg subprocess to terminate in log thread') |
138 logging.debug('Waiting for borg subprocess to terminate in log thread') |
133 |
139 |
134 self.borg_instance.wait() |
140 self.borg_instance.wait() |
135 |
141 |
136 logging.debug('Borg subprocess terminated; terminating log listener thread') |
142 logging.debug('Borg subprocess terminated; terminating log listener thread') |
137 |
143 |
138 with self.lock: |
144 with self.lock: |
139 self.thread_log=None |
145 self.thread_log=None |
140 self.__cleanup_if_both_listeners_terminated() |
146 self.__finish_and_reschedule_if_both_listeners_terminated() |
141 |
147 |
142 |
148 |
143 def __result_listener(self): |
149 def __result_listener(self): |
144 logging.debug('Result listener thread waiting for result') |
150 logging.debug('Result listener thread waiting for result') |
145 |
151 |
152 if res==None: |
158 if res==None: |
153 success=False |
159 success=False |
154 |
160 |
155 logging.debug('Waiting for borg subprocess to terminate in result thread') |
161 logging.debug('Waiting for borg subprocess to terminate in result thread') |
156 |
162 |
157 self.borg_instance.wait() |
163 success=success and self.borg_instance.wait() |
158 |
164 |
159 logging.debug('Borg subprocess terminated; terminating result listener thread') |
165 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
160 |
166 |
161 with self.lock: |
167 with self.lock: |
162 if self.current_operation=='create': |
168 if self.current_operation=='create': |
163 self.lastrun=self.time_started |
169 self.lastrun=self.time_started |
164 self.lastrun_success=success |
170 self.lastrun_success=success |
165 self.thread_res=None |
171 self.thread_res=None |
166 self.__cleanup_if_both_listeners_terminated() |
172 self.__finish_and_reschedule_if_both_listeners_terminated() |
167 |
173 |
def __finish_and_reschedule_if_both_listeners_terminated(self):
    """Reset the run state once both listener threads are gone, then reschedule.

    Does nothing while either ``self.thread_res`` or ``self.thread_log`` is
    still set. The listener threads in this file call it while holding
    ``self.lock``, which is why the unlocked scheduler is used here.
    """
    # Identity comparison is the correct test for the None singleton.
    if self.thread_res is None and self.thread_log is None:
        logging.debug('Both threads terminated')
        self.borg_instance = None
        self.time_started = None
        # current_operation must be cleared before scheduling:
        # __schedule_unlocked refuses to schedule while it is set.
        self.current_operation = None
        self.__schedule_unlocked()
174 |
181 |
175 def __launch(self, queue, operation, archive_or_repository, *args): |
182 def __do_launch(self, queue, operation, archive_or_repository, *args): |
176 |
|
177 inst=BorgInstance(operation, archive_or_repository, *args) |
183 inst=BorgInstance(operation, archive_or_repository, *args) |
178 inst.launch() |
184 inst.launch() |
179 |
185 |
180 t_log=Thread(target=self.__log_listener) |
186 t_log=Thread(target=self.__log_listener) |
181 t_log.daemon=True |
187 t_log.daemon=True |
191 self.time_started=time.monotonic() |
197 self.time_started=time.monotonic() |
192 |
198 |
193 t_log.start() |
199 t_log.start() |
194 t_res.start() |
200 t_res.start() |
195 |
201 |
|
def __launch(self, operation, queue):
    """Start ``operation`` ('create' or 'prune') unless one is already running.

    Callers in this file invoke this while holding ``self.lock``; the
    method itself only uses the ``*_unlocked`` helpers.
    NOTE(review): this implies ``__do_launch`` must not acquire
    ``self.lock`` itself (it is a non-reentrant ``Lock``) -- confirm.

    Any pending timed operation is cancelled before launching.

    Returns:
        True if a borg operation was launched; False if another operation
        is already running or ``operation`` is not recognised.
    """
    if self.__is_running_unlocked():
        logging.info('Cannot start %s: already running %s',
                     operation, self.current_operation)
        return False

    if self.timer:
        logging.debug('Unscheduling timed operation due to launch of operation')
        # Dropping the reference alone leaves the Timer thread armed, so it
        # would still fire __queue_timed_operation later. Cancel it.
        self.timer.cancel()
        self.timer = None
        self.timer_operation = None
        self.timer_time = None

    logging.debug("Launching '%s' on '%s'", operation, self.name)

    if operation == 'create':
        archive = "%s::%s%s" % (self.repository,
                                self.archive_prefix,
                                self.archive_template)

        self.__do_launch(queue, operation, archive,
                         self.common_parameters + self.create_parameters,
                         self.paths)
    elif operation == 'prune':
        self.__do_launch(queue, 'prune', self.repository,
                         ([{'prefix': self.archive_prefix}] +
                          self.common_parameters +
                          self.prune_parameters))
    else:
        logging.error("Invalid operation '%s'", operation)
        # The pending timer (if any) was cancelled above, so put the
        # regular schedule back in place before reporting failure.
        self.__schedule_unlocked()
        return False

    return True
|
234 |
def create(self, queue):
    """Request a 'create' operation, serialised on ``self.lock``.

    ``queue`` is passed through to ``__launch`` unchanged. Returns whatever
    ``__launch`` returns.
    """
    with self.lock:
        return self.__launch('create', queue)
239 |
def prune(self, queue):
    """Request a 'prune' operation, serialised on ``self.lock``.

    ``queue`` is passed through to ``__launch`` unchanged. Returns whatever
    ``__launch`` returns.
    """
    with self.lock:
        return self.__launch('prune', queue)
244 |
214 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
245 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
215 def abort(self): |
246 def abort(self): |
216 with self.lock: |
247 with self.lock: |
217 if self.borg_instance: |
248 if self.borg_instance: |
239 if thread_res: |
270 if thread_res: |
240 thread_res.join() |
271 thread_res.join() |
241 |
272 |
242 assert(self.thread_log==None and self.thread_res==None) |
273 assert(self.thread_log==None and self.thread_res==None) |
243 |
274 |
def __queue_timed_operation(self):
    """Timer callback: launch the operation that was scheduled, if idle.

    Runs entirely under ``self.lock``. The stored timer state is consumed
    first; the operation is then launched with no queue unless another
    operation is already in progress.
    """
    with self.lock:
        pending = self.timer_operation

        # The timer state is consumed whether or not the launch goes ahead.
        self.timer_operation = None
        self.timer_time = None
        self.timer = None

        if self.__is_running_unlocked():
            logging.info('Aborted queue operation, as an operation is already running')
            return

        # TODO: Queue on 'repository' and online status for SSH, etc.

        # TODO: UI comms. queue?
        self.__launch(pending, None)
|
289 |
|
def __schedule_unlocked(self):
    """Arm a timer for the next operation; the caller holds ``self.lock``.

    If an operation is currently running, nothing is scheduled.

    Returns:
        ``(operation, when)``. While an operation is running this is
        ``(self.current_operation, None)``. Otherwise it is the pair
        produced by ``__next_operation_unlocked``, where ``when`` is
        compared against ``time.monotonic()`` to compute the delay.
    """
    if self.current_operation:
        return self.current_operation, None

    operation, when = self.__next_operation_unlocked()

    if operation:
        now = time.monotonic()
        # A deadline already in the past means "run as soon as possible".
        delay = max(0, when - now)
        logging.info("Scheduling '%s' of '%s' in %d seconds" %
                     (operation, self.name, delay))
        tmr = Timer(delay, self.__queue_timed_operation)
        # Record the timer state before starting the timer.
        self.timer_operation = operation
        self.timer_time = when
        self.timer = tmr
        tmr.start()

    # Return the scheduled time, not the `time` module.
    return operation, when
|
308 |
|
309 def __next_operation_unlocked(self): |
|
310 # TODO: pruning as well |
247 now=time.monotonic() |
311 now=time.monotonic() |
248 if not self.lastrun: |
312 if not self.lastrun: |
249 return 'create', now+self.retry_interval |
313 return 'create', now+self.retry_interval |
250 elif not self.lastrun_success: |
314 elif not self.lastrun_success: |
251 return 'create', self.lastrun+self.retry_interval |
315 return 'create', self.lastrun+self.retry_interval |