backup.py

changeset 7
e189d4a6cb8c
parent 6
46c89e5a219f
child 8
7b2d2eac6a48
equal deleted inserted replaced
6:46c89e5a219f 7:e189d4a6cb8c
72 self.config=config 72 self.config=config
73 self.lastrun=None 73 self.lastrun=None
74 self.lastrun_success=None 74 self.lastrun_success=None
75 self.borg_instance=None 75 self.borg_instance=None
76 self.current_operation=None 76 self.current_operation=None
77 self.thread=None 77 self.thread_log=None
78 self.thread_err=None
78 self.lock=Lock() 79 self.lock=Lock()
79 80
def is_running(self):
    """Report whether a borg operation is still in progress.

    An operation counts as running while the borg instance is set or
    either listener thread (log or result) is still registered.

    Returns:
        bool: True if anything is still registered, False otherwise.
    """
    with self.lock:
        # The result listener is stored in ``thread_res`` by the launcher;
        # getattr() keeps this safe before the first launch, when the
        # attribute may not have been created yet.
        return bool(self.borg_instance
                    or self.thread_log
                    or getattr(self, 'thread_res', None))
85
def __block_when_running(self):
    """Refuse to continue while a borg operation is in progress.

    Raises:
        AssertionError: if ``is_running()`` reports an active operation.
    """
    # Raised explicitly rather than via ``assert`` so the guard is still
    # enforced when Python runs with -O (which strips assert statements).
    if self.is_running():
        raise AssertionError('borg operation already in progress')
84 90 def __log_listener(self):
85 def __listener(self): 91 logging.debug('Log listener thread waiting for entries')
86 success=False 92 success=True
87 for status in iter(self.borg_instance.read, None): 93 for status in iter(self.borg_instance.read_log, None):
88 logging.info(str(status)) 94 logging.debug(str(status))
89 t=status['type'] 95 t=status['type']
96 #may_indicate_finished=False
90 if t=='progress_percent': 97 if t=='progress_percent':
91 pass 98 #may_indicate_finished=True
99 # Temporary output
100 if 'current' not in status:
101 status['current']=0
102 if 'total' not in status:
103 status['total']=0
104 print('%d / %d' % (status['current'], status['total']))
92 elif t=='archive_progress': 105 elif t=='archive_progress':
93 pass 106 pass
94 elif t=='progress_message': 107 elif t=='progress_message':
95 if 'finished' in status: 108 #may_indicate_finished=True
96 logging.info('Borg subprocess finished succesfully')
97 success=status['finished']
98 elif t=='progress_percent':
99 # Temporary output
100 print('%d / %d', status['current'], status['total'])
101 pass 109 pass
102 elif t=='file_status': 110 elif t=='file_status':
103 pass 111 pass
104 elif t=='log_message': 112 elif t=='log_message':
105 if 'levelname' not in status: 113 if 'levelname' not in status:
108 status['message']='UNKNOWN' 116 status['message']='UNKNOWN'
109 if 'name' not in status: 117 if 'name' not in status:
110 status['name']='borg' 118 status['name']='borg'
111 logging.log(translate_loglevel(status['levelname']), 119 logging.log(translate_loglevel(status['levelname']),
112 status['name'] + ': ' + status['message']) 120 status['name'] + ': ' + status['message'])
121 # set success=False?
113 elif t=='exception': 122 elif t=='exception':
114 pass 123 success=False
115 elif t=='unparsed_error': 124 elif t=='unparsed_error':
116 pass 125 success=False
117 126
118 logging.info('Waiting for borg subprocess to terminate') 127 #if (may_indicate_finished and 'finished' in status and
128 # status['finished']):
129 # logging.info('Borg subprocess finished succesfully')
130 # success=status['finished']
131
132 logging.debug('Waiting for borg subprocess to terminate in log thread')
119 133
120 self.borg_instance.wait() 134 self.borg_instance.wait()
121 135
122 logging.info('Borg subprocess terminated; terminating listener thread') 136 logging.debug('Borg subprocess terminated; terminating log listener thread')
137
138 with self.lock:
139 self.thread_log=None
140 self.__cleanup_if_both_listeners_terminated()
141
142
def __result_listener(self):
    """Thread body: read borg's result and record the outcome.

    Reads the result from the borg instance, waits for the subprocess to
    terminate, then, under the lock, records the run for 'create'
    operations, unregisters this thread and runs the shared cleanup.
    A result of ``None`` is recorded as a failed run.
    """
    logging.debug('Result listener thread waiting for result')

    # Pessimistic default: if reading the result raises, the run is
    # recorded as failed instead of leaving stale state behind.
    success = False
    try:
        res = self.borg_instance.read_result()
        logging.debug('Borg result: %s', res)

        # Identity test: a result object with a custom __eq__ must not be
        # mistaken for "no result".
        success = res is not None

        logging.debug('Waiting for borg subprocess to terminate in result thread')
        self.borg_instance.wait()
        logging.debug('Borg subprocess terminated; terminating result listener thread')
    finally:
        # Always unregister, even on error; otherwise the object would
        # look busy forever.
        with self.lock:
            if self.current_operation == 'create':
                self.lastrun = self.time_started
                self.lastrun_success = success
            self.thread_res = None
            self.__cleanup_if_both_listeners_terminated()
167
def __cleanup_if_both_listeners_terminated(self):
    """Reset per-operation state once neither listener is registered.

    Does nothing while either ``thread_res`` or ``thread_log`` is still
    set. It does not take ``self.lock`` itself; the visible callers appear
    to invoke it while holding the lock (NOTE(review): confirm, since the
    diff view has lost the indentation).
    """
    # Identity comparison: only a genuine None means "thread unregistered".
    if self.thread_res is None and self.thread_log is None:
        logging.debug('Both threads terminated')
        self.borg_instance = None
        self.current_operation = None
        self.time_started = None
132 174
def __launch(self, queue, operation, archive_or_repository, *args):
    """Start a borg instance and the two threads that listen to it.

    Args:
        queue: stored on the instance as ``self.queue``.
        operation: operation name, passed to ``BorgInstance`` and kept as
            ``self.current_operation``.
        archive_or_repository: target passed to ``BorgInstance``.
        *args: extra arguments forwarded to ``BorgInstance``.
    """
    borg = BorgInstance(operation, archive_or_repository, *args)
    borg.launch()

    # Both listeners are daemon threads.
    log_thread = Thread(target=self.__log_listener, daemon=True)
    result_thread = Thread(target=self.__result_listener, daemon=True)

    # Register everything before starting the threads; the listeners read
    # and clear these attributes when they finish.
    self.thread_log = log_thread
    self.thread_res = result_thread
    self.borg_instance = borg
    self.queue = queue
    self.current_operation = operation
    self.time_started = time.monotonic()

    for worker in (log_thread, result_thread):
        worker.start()
148 195
149 def create(self, queue): 196 def create(self, queue):
150 self.__block_when_running() 197 self.__block_when_running()
151 198
152 archive="%s::%s%s" % (self.repository, 199 archive="%s::%s%s" % (self.repository,
167 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages 214 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages
def abort(self):
    """Ask the running borg subprocess, if any, to terminate.

    Only the subprocess is signalled. Python threads cannot be terminated
    from outside (``threading.Thread`` has no ``terminate()``), so the
    listener threads are left to finish by themselves; they wait on the
    subprocess and unregister afterwards. Use ``join()`` to wait for them.
    """
    with self.lock:
        if self.borg_instance:
            self.borg_instance.terminate()
227
174 228
def join(self):
    """Wait until both listener threads have finished.

    Returns immediately when no listener is registered, including before
    any operation has been launched.
    """
    logging.debug('Waiting for borg listener thread to terminate')

    with self.lock:
        thread_log = self.thread_log
        # ``thread_res`` may not exist before the first launch.
        thread_res = getattr(self, 'thread_res', None)

    # Join outside the lock: the listeners acquire it when they finish,
    # so holding it here would deadlock.
    if thread_log:
        thread_log.join()

    if thread_res:
        thread_res.join()

    # Sanity check of the post-condition, read under the lock like every
    # other access to these attributes.
    with self.lock:
        assert (self.thread_log is None
                and getattr(self, 'thread_res', None) is None)
178 243
179 def next_action(): 244 def next_action():
245 __block_when_running()
180 # TODO pruning as well 246 # TODO pruning as well
181 now=time.monotonic() 247 now=time.monotonic()
182 if not self.lastrun: 248 if not self.lastrun:
183 return 'create', now+self.retry_interval 249 return 'create', now+self.retry_interval
184 elif not self.lastrun_success: 250 elif not self.lastrun_success:

mercurial