| 7 import time |
7 import time |
| 8 import keyring |
8 import keyring |
| 9 from instance import BorgInstance |
9 from instance import BorgInstance |
| 10 from queue import Queue |
10 from queue import Queue |
| 11 from threading import Thread, Lock, Timer |
11 from threading import Thread, Lock, Timer |
| |
12 |
| |
13 logger=logging.getLogger(__name__) |
| 12 |
14 |
| 13 loglevel_translation={ |
15 loglevel_translation={ |
| 14 'CRITICAL': logging.CRITICAL, |
16 'CRITICAL': logging.CRITICAL, |
| 15 'ERROR': logging.ERROR, |
17 'ERROR': logging.ERROR, |
| 16 'WARNING': logging.WARNING, |
18 'WARNING': logging.WARNING, |
| 120 def __block_when_running(self): |
122 def __block_when_running(self): |
| 121 running=self.is_running() |
123 running=self.is_running() |
| 122 assert(not running) |
124 assert(not running) |
| 123 |
125 |
| 124 def __log_listener(self): |
126 def __log_listener(self): |
| 125 logging.debug('Log listener thread waiting for entries') |
127 logger.debug('Log listener thread waiting for entries') |
| 126 success=True |
128 success=True |
| 127 for status in iter(self.borg_instance.read_log, None): |
129 for status in iter(self.borg_instance.read_log, None): |
| 128 logging.debug(str(status)) |
130 logger.debug(str(status)) |
| 129 t=status['type'] |
131 t=status['type'] |
| 130 |
132 |
| 131 errors_this_message=None |
133 errors_this_message=None |
| 132 callback=None |
134 callback=None |
| 133 |
135 |
| 163 if 'message' not in status: |
165 if 'message' not in status: |
| 164 status['message']='UNKNOWN' |
166 status['message']='UNKNOWN' |
| 165 if 'name' not in status: |
167 if 'name' not in status: |
| 166 status['name']='borg' |
168 status['name']='borg' |
| 167 lvl=translate_loglevel(status['levelname']) |
169 lvl=translate_loglevel(status['levelname']) |
| 168 logging.log(lvl, status['name'] + ': ' + status['message']) |
170 logger.log(lvl, status['name'] + ': ' + status['message']) |
| 169 if lvl>=logging.WARNING: |
171 if lvl>=logging.WARNING: |
| 170 errors_this_message=status |
172 errors_this_message=status |
| 171 with self.lock: |
173 with self.lock: |
| 172 self.current_operation['errors']=True |
174 self.current_operation['errors']=True |
| 173 status, callback=self.__status_unlocked() |
175 status, callback=self.__status_unlocked() |
| 174 else: |
176 else: |
| 175 logging.debug('Unrecognised log entry %s' % str(status)) |
177 logger.debug('Unrecognised log entry %s' % str(status)) |
| 176 |
178 |
| 177 if callback: |
179 if callback: |
| 178 callback(self, status, errors=errors_this_message) |
180 callback(self, status, errors=errors_this_message) |
| 179 |
181 |
| 180 logging.debug('Waiting for borg subprocess to terminate in log thread') |
182 logger.debug('Waiting for borg subprocess to terminate in log thread') |
| 181 |
183 |
| 182 self.borg_instance.wait() |
184 self.borg_instance.wait() |
| 183 |
185 |
| 184 logging.debug('Borg subprocess terminated; terminating log listener thread') |
186 logger.debug('Borg subprocess terminated; terminating log listener thread') |
| 185 |
187 |
| 186 def __result_listener(self): |
188 def __result_listener(self): |
| 187 with self.lock: |
189 with self.lock: |
| 188 status, callback=self.__status_unlocked() |
190 status, callback=self.__status_unlocked() |
| 189 if callback: |
191 if callback: |
| 190 callback(self, status) |
192 callback(self, status) |
| 191 |
193 |
| 192 logging.debug('Result listener thread waiting for result') |
194 logger.debug('Result listener thread waiting for result') |
| 193 |
195 |
| 194 res=self.borg_instance.read_result() |
196 res=self.borg_instance.read_result() |
| 195 |
197 |
| 196 success=True |
198 success=True |
| 197 |
199 |
| 198 logging.debug('Borg result: %s' % str(res)) |
200 logger.debug('Borg result: %s' % str(res)) |
| 199 |
201 |
| 200 if res==None: |
202 if res==None: |
| 201 success=False |
203 success=False |
| 202 |
204 |
| 203 logging.debug('Waiting for borg subprocess to terminate in result thread') |
205 logger.debug('Waiting for borg subprocess to terminate in result thread') |
| 204 |
206 |
| 205 success=success and self.borg_instance.wait() |
207 success=success and self.borg_instance.wait() |
| 206 |
208 |
| 207 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
209 logger.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) |
| 208 |
210 |
| 209 self.thread_log.join() |
211 self.thread_log.join() |
| 210 |
212 |
| 211 with self.lock: |
213 with self.lock: |
| 212 if self.current_operation['operation']=='create': |
214 if self.current_operation['operation']=='create': |
| 225 self.extract_passphrase() |
227 self.extract_passphrase() |
| 226 |
228 |
| 227 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
229 inst=BorgInstance(op['operation'], archive_or_repository, *args) |
| 228 inst.launch(passphrase=self.__passphrase) |
230 inst.launch(passphrase=self.__passphrase) |
| 229 |
231 |
| |
232 logger.debug('Creating listener threads') |
| |
233 |
| 230 t_log=Thread(target=self.__log_listener) |
234 t_log=Thread(target=self.__log_listener) |
| 231 t_log.daemon=True |
235 t_log.daemon=True |
| 232 |
236 |
| 233 t_res=Thread(target=self.__result_listener) |
237 t_res=Thread(target=self.__result_listener) |
| 234 t_res.daemon=True |
238 t_res.daemon=True |
| 248 logging.info('Cannot start %s: already running %s' |
252 logging.info('Cannot start %s: already running %s' |
| 249 % (operation, self.current_operation)) |
253 % (operation, self.current_operation)) |
| 250 return False |
254 return False |
| 251 else: |
255 else: |
| 252 if self.timer: |
256 if self.timer: |
| 253 logging.debug('Unscheduling timed operation due to launch of operation') |
257 logger.debug('Unscheduling timed operation due to launch of operation') |
| 254 self.timer=None |
258 self.timer=None |
| 255 self.scheduled_operation=None |
259 self.scheduled_operation=None |
| 256 |
260 |
| 257 logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
261 logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) |
| 258 |
262 |
| 259 if op['operation']=='create': |
263 if op['operation']=='create': |
| 260 archive="%s::%s%s" % (self.repository, |
264 archive="%s::%s%s" % (self.repository, |
| 261 self.archive_prefix, |
265 self.archive_prefix, |
| 262 self.archive_template) |
266 self.archive_template) |
| 268 self.__do_launch(queue, op, self.repository, |
272 self.__do_launch(queue, op, self.repository, |
| 269 ([{'prefix': self.archive_prefix}] + |
273 ([{'prefix': self.archive_prefix}] + |
| 270 self.common_parameters + |
274 self.common_parameters + |
| 271 self.prune_parameters)) |
275 self.prune_parameters)) |
| 272 else: |
276 else: |
| 273 logging.error("Invalid operaton '%s'" % op['operation']) |
277 logger.error("Invalid operaton '%s'" % op['operation']) |
| 274 self.__schedule_unlocked() |
278 self.__schedule_unlocked() |
| 275 |
279 |
| 276 return True |
280 return True |
| 277 |
281 |
| 278 def create(self, queue): |
282 def create(self, queue): |