7 import time |
7 import time |
8 import keyring |
8 import keyring |
9 from instance import BorgInstance |
9 from instance import BorgInstance |
10 from queue import Queue |
10 from queue import Queue |
11 from threading import Thread, Lock, Timer |
11 from threading import Thread, Lock, Timer |
|
12 |
|
13 logger=logging.getLogger(__name__) |
12 |
14 |
13 loglevel_translation={ |
15 loglevel_translation={ |
14 'CRITICAL': logging.CRITICAL, |
16 'CRITICAL': logging.CRITICAL, |
15 'ERROR': logging.ERROR, |
17 'ERROR': logging.ERROR, |
16 'WARNING': logging.WARNING, |
18 'WARNING': logging.WARNING, |
def __block_when_running(self):
    """Refuse to continue while an operation is running.

    Queries ``self.is_running()`` and raises if it reports a truthy value;
    otherwise returns ``None``.

    Raises:
        AssertionError: if ``self.is_running()`` returns a truthy value.
    """
    # Raised explicitly instead of via an ``assert`` statement so the guard
    # is still enforced when the interpreter runs with -O, which strips
    # assert statements. The exception type is unchanged for callers.
    if self.is_running():
        raise AssertionError('operation is running')
123 |
125 |
124 def __log_listener(self): |
126 def __log_listener(self): |
125 logging.debug('Log listener thread waiting for entries') |
127 logger.debug('Log listener thread waiting for entries') |
126 success=True |
128 success=True |
127 for status in iter(self.borg_instance.read_log, None): |
129 for status in iter(self.borg_instance.read_log, None): |
128 logging.debug(str(status)) |
130 logger.debug(str(status)) |
129 t=status['type'] |
131 t=status['type'] |
130 |
132 |
131 errors_this_message=None |
133 errors_this_message=None |
132 callback=None |
134 callback=None |
133 |
135 |
163 if 'message' not in status: |
165 if 'message' not in status: |
164 status['message']='UNKNOWN' |
166 status['message']='UNKNOWN' |
165 if 'name' not in status: |
167 if 'name' not in status: |
166 status['name']='borg' |
168 status['name']='borg' |
167 lvl=translate_loglevel(status['levelname']) |
169 lvl=translate_loglevel(status['levelname']) |
168 logging.log(lvl, status['name'] + ': ' + status['message']) |
170 logger.log(lvl, status['name'] + ': ' + status['message']) |
169 if lvl>=logging.WARNING: |
171 if lvl>=logging.WARNING: |
170 errors_this_message=status |
172 errors_this_message=status |
171 with self.lock: |
173 with self.lock: |
172 self.current_operation['errors']=True |
174 self.current_operation['errors']=True |
173 status, callback=self.__status_unlocked() |
175 status, callback=self.__status_unlocked() |
174 else: |
176 else: |
175 logging.debug('Unrecognised log entry %s' % str(status)) |
177 logger.debug('Unrecognised log entry %s' % str(status)) |
176 |
178 |
177 if callback: |
179 if callback: |
178 callback(self, status, errors=errors_this_message) |
180 callback(self, status, errors=errors_this_message) |
179 |
181 |
180 logging.debug('Waiting for borg subprocess to terminate in log thread') |
182 logger.debug('Waiting for borg subprocess to terminate in log thread') |
181 |
183 |
182 self.borg_instance.wait() |
184 self.borg_instance.wait() |
183 |
185 |
184 logging.debug('Borg subprocess terminated; terminating log listener thread') |
186 logger.debug('Borg subprocess terminated; terminating log listener thread') |
185 |
187 |
186 def __result_listener(self): |
188 def __result_listener(self): |
187 with self.lock: |
189 with self.lock: |
188 status, callback=self.__status_unlocked() |
190 status, callback=self.__status_unlocked() |
189 if callback: |
191 if callback: |
190 callback(self, status) |
192 callback(self, status) |
191 |
193 |
192 logging.debug('Result listener thread waiting for result') |
194 logger.debug('Result listener thread waiting for result') |
193 |
195 |
194 res=self.borg_instance.read_result() |
196 res=self.borg_instance.read_result() |
195 |
197 |
196 success=True |
198 success=True |
197 |
199 |
198 logging.debug('Borg result: %s' % str(res)) |
200 logger.debug('Borg result: %s' % str(res)) |
199 |
201 |
200 if res==None: |
202 if res==None: |
201 success=False |
203 success=False |
202 |
204 |
203 logging.debug('Waiting for borg subprocess to terminate in result thread') |
205 logger.debug('Waiting for borg subprocess to terminate in result thread') |
204 |
206 |
205 success=success and self.borg_instance.wait() |
207 success=success and self.borg_instance.wait() |
206 |
208 |
207 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
209 logger.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
208 |
210 |
209 self.thread_log.join() |
211 self.thread_log.join() |
210 |
212 |
211 with self.lock: |
213 with self.lock: |
212 if self.current_operation['operation']=='create': |
214 if self.current_operation['operation']=='create': |
225 self.extract_passphrase() |
227 self.extract_passphrase() |
226 |
228 |
227 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
229 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
228 inst.launch(passphrase=self.__passphrase) |
230 inst.launch(passphrase=self.__passphrase) |
229 |
231 |
|
232 logger.debug('Creating listener threads') |
|
233 |
230 t_log=Thread(target=self.__log_listener) |
234 t_log=Thread(target=self.__log_listener) |
231 t_log.daemon=True |
235 t_log.daemon=True |
232 |
236 |
233 t_res=Thread(target=self.__result_listener) |
237 t_res=Thread(target=self.__result_listener) |
234 t_res.daemon=True |
238 t_res.daemon=True |
248 logging.info('Cannot start %s: already running %s' |
252 logging.info('Cannot start %s: already running %s' |
249 % (operation, self.current_operation)) |
253 % (operation, self.current_operation)) |
250 return False |
254 return False |
251 else: |
255 else: |
252 if self.timer: |
256 if self.timer: |
253 logging.debug('Unscheduling timed operation due to launch of operation') |
257 logger.debug('Unscheduling timed operation due to launch of operation') |
254 self.timer=None |
258 self.timer=None |
255 self.scheduled_operation=None |
259 self.scheduled_operation=None |
256 |
260 |
257 logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
261 logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
258 |
262 |
259 if op['operation']=='create': |
263 if op['operation']=='create': |
260 archive="%s::%s%s" % (self.repository, |
264 archive="%s::%s%s" % (self.repository, |
261 self.archive_prefix, |
265 self.archive_prefix, |
262 self.archive_template) |
266 self.archive_template) |
268 self.__do_launch(queue, op, self.repository, |
272 self.__do_launch(queue, op, self.repository, |
269 ([{'prefix': self.archive_prefix}] + |
273 ([{'prefix': self.archive_prefix}] + |
270 self.common_parameters + |
274 self.common_parameters + |
271 self.prune_parameters)) |
275 self.prune_parameters)) |
272 else: |
276 else: |
273 logging.error("Invalid operaton '%s'" % op['operation']) |
277 logger.error("Invalid operaton '%s'" % op['operation']) |
274 self.__schedule_unlocked() |
278 self.__schedule_unlocked() |
275 |
279 |
276 return True |
280 return True |
277 |
281 |
278 def create(self, queue): |
282 def create(self, queue): |