backup.py

changeset 31
b4b4bb7a2ec5
parent 30
3dd525652dc8
child 32
06fc14211ba9
equal deleted inserted replaced
30:3dd525652dc8 31:b4b4bb7a2ec5
7 import time 7 import time
8 import keyring 8 import keyring
9 from instance import BorgInstance 9 from instance import BorgInstance
10 from queue import Queue 10 from queue import Queue
11 from threading import Thread, Lock, Timer 11 from threading import Thread, Lock, Timer
12
13 logger=logging.getLogger(__name__)
12 14
13 loglevel_translation={ 15 loglevel_translation={
14 'CRITICAL': logging.CRITICAL, 16 'CRITICAL': logging.CRITICAL,
15 'ERROR': logging.ERROR, 17 'ERROR': logging.ERROR,
16 'WARNING': logging.WARNING, 18 'WARNING': logging.WARNING,
120 def __block_when_running(self): 122 def __block_when_running(self):
121 running=self.is_running() 123 running=self.is_running()
122 assert(not running) 124 assert(not running)
123 125
124 def __log_listener(self): 126 def __log_listener(self):
125 logging.debug('Log listener thread waiting for entries') 127 logger.debug('Log listener thread waiting for entries')
126 success=True 128 success=True
127 for status in iter(self.borg_instance.read_log, None): 129 for status in iter(self.borg_instance.read_log, None):
128 logging.debug(str(status)) 130 logger.debug(str(status))
129 t=status['type'] 131 t=status['type']
130 132
131 errors_this_message=None 133 errors_this_message=None
132 callback=None 134 callback=None
133 135
163 if 'message' not in status: 165 if 'message' not in status:
164 status['message']='UNKNOWN' 166 status['message']='UNKNOWN'
165 if 'name' not in status: 167 if 'name' not in status:
166 status['name']='borg' 168 status['name']='borg'
167 lvl=translate_loglevel(status['levelname']) 169 lvl=translate_loglevel(status['levelname'])
168 logging.log(lvl, status['name'] + ': ' + status['message']) 170 logger.log(lvl, status['name'] + ': ' + status['message'])
169 if lvl>=logging.WARNING: 171 if lvl>=logging.WARNING:
170 errors_this_message=status 172 errors_this_message=status
171 with self.lock: 173 with self.lock:
172 self.current_operation['errors']=True 174 self.current_operation['errors']=True
173 status, callback=self.__status_unlocked() 175 status, callback=self.__status_unlocked()
174 else: 176 else:
175 logging.debug('Unrecognised log entry %s' % str(status)) 177 logger.debug('Unrecognised log entry %s' % str(status))
176 178
177 if callback: 179 if callback:
178 callback(self, status, errors=errors_this_message) 180 callback(self, status, errors=errors_this_message)
179 181
180 logging.debug('Waiting for borg subprocess to terminate in log thread') 182 logger.debug('Waiting for borg subprocess to terminate in log thread')
181 183
182 self.borg_instance.wait() 184 self.borg_instance.wait()
183 185
184 logging.debug('Borg subprocess terminated; terminating log listener thread') 186 logger.debug('Borg subprocess terminated; terminating log listener thread')
185 187
186 def __result_listener(self): 188 def __result_listener(self):
187 with self.lock: 189 with self.lock:
188 status, callback=self.__status_unlocked() 190 status, callback=self.__status_unlocked()
189 if callback: 191 if callback:
190 callback(self, status) 192 callback(self, status)
191 193
192 logging.debug('Result listener thread waiting for result') 194 logger.debug('Result listener thread waiting for result')
193 195
194 res=self.borg_instance.read_result() 196 res=self.borg_instance.read_result()
195 197
196 success=True 198 success=True
197 199
198 logging.debug('Borg result: %s' % str(res)) 200 logger.debug('Borg result: %s' % str(res))
199 201
200 if res==None: 202 if res==None:
201 success=False 203 success=False
202 204
203 logging.debug('Waiting for borg subprocess to terminate in result thread') 205 logger.debug('Waiting for borg subprocess to terminate in result thread')
204 206
205 success=success and self.borg_instance.wait() 207 success=success and self.borg_instance.wait()
206 208
207 logging.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success)) 209 logger.debug('Borg subprocess terminated (success: %s); terminating result listener thread' % str(success))
208 210
209 self.thread_log.join() 211 self.thread_log.join()
210 212
211 with self.lock: 213 with self.lock:
212 if self.current_operation['operation']=='create': 214 if self.current_operation['operation']=='create':
225 self.extract_passphrase() 227 self.extract_passphrase()
226 228
227 inst=BorgInstance(op['operation'], archive_or_repository, *args) 229 inst=BorgInstance(op['operation'], archive_or_repository, *args)
228 inst.launch(passphrase=self.__passphrase) 230 inst.launch(passphrase=self.__passphrase)
229 231
232 logger.debug('Creating listener threads')
233
230 t_log=Thread(target=self.__log_listener) 234 t_log=Thread(target=self.__log_listener)
231 t_log.daemon=True 235 t_log.daemon=True
232 236
233 t_res=Thread(target=self.__result_listener) 237 t_res=Thread(target=self.__result_listener)
234 t_res.daemon=True 238 t_res.daemon=True
248 logging.info('Cannot start %s: already running %s' 252 logging.info('Cannot start %s: already running %s'
249 % (operation, self.current_operation)) 253 % (operation, self.current_operation))
250 return False 254 return False
251 else: 255 else:
252 if self.timer: 256 if self.timer:
253 logging.debug('Unscheduling timed operation due to launch of operation') 257 logger.debug('Unscheduling timed operation due to launch of operation')
254 self.timer=None 258 self.timer=None
255 self.scheduled_operation=None 259 self.scheduled_operation=None
256 260
257 logging.debug("Launching '%s' on '%s'" % (op['operation'], self.name)) 261 logger.debug("Launching '%s' on '%s'" % (op['operation'], self.name))
258 262
259 if op['operation']=='create': 263 if op['operation']=='create':
260 archive="%s::%s%s" % (self.repository, 264 archive="%s::%s%s" % (self.repository,
261 self.archive_prefix, 265 self.archive_prefix,
262 self.archive_template) 266 self.archive_template)
268 self.__do_launch(queue, op, self.repository, 272 self.__do_launch(queue, op, self.repository,
269 ([{'prefix': self.archive_prefix}] + 273 ([{'prefix': self.archive_prefix}] +
270 self.common_parameters + 274 self.common_parameters +
271 self.prune_parameters)) 275 self.prune_parameters))
272 else: 276 else:
273 logging.error("Invalid operaton '%s'" % op['operation']) 277 logger.error("Invalid operaton '%s'" % op['operation'])
274 self.__schedule_unlocked() 278 self.__schedule_unlocked()
275 279
276 return True 280 return True
277 281
278 def create(self, queue): 282 def create(self, queue):
301 if thread_res: 305 if thread_res:
302 thread_res.terminate() 306 thread_res.terminate()
303 307
304 308
305 def join(self): 309 def join(self):
306 logging.debug('Waiting for borg listener threads to terminate') 310 logger.debug('Waiting for borg listener threads to terminate')
307 311
308 with self.lock: 312 with self.lock:
309 thread_log=self.thread_log 313 thread_log=self.thread_log
310 thread_res=self.thread_res 314 thread_res=self.thread_res
311 315

mercurial