72 self.config=config |
72 self.config=config |
73 self.lastrun=None |
73 self.lastrun=None |
74 self.lastrun_success=None |
74 self.lastrun_success=None |
75 self.borg_instance=None |
75 self.borg_instance=None |
76 self.current_operation=None |
76 self.current_operation=None |
77 self.thread=None |
77 self.thread_log=None |
|
78 self.thread_err=None |
78 self.lock=Lock() |
79 self.lock=Lock() |
79 |
80 |
|
81 def is_running(self): |
|
82 with self.lock: |
|
83 running=self.borg_instance or self.thread_log or self.thread_err |
|
84 return running |
|
85 |
80 def __block_when_running(self): |
86 def __block_when_running(self): |
81 with self.lock: |
87 running=self.is_running() |
82 not_running=self.borg_instance is None and self.thread is None |
88 assert(not running) |
83 assert(not_running) |
89 |
84 |
90 def __log_listener(self): |
85 def __listener(self): |
91 logging.debug('Log listener thread waiting for entries') |
86 success=False |
92 success=True |
87 for status in iter(self.borg_instance.read, None): |
93 for status in iter(self.borg_instance.read_log, None): |
88 logging.info(str(status)) |
94 logging.debug(str(status)) |
89 t=status['type'] |
95 t=status['type'] |
|
96 #may_indicate_finished=False |
90 if t=='progress_percent': |
97 if t=='progress_percent': |
91 pass |
98 #may_indicate_finished=True |
|
99 # Temporary output |
|
100 if 'current' not in status: |
|
101 status['current']=0 |
|
102 if 'total' not in status: |
|
103 status['total']=0 |
|
104 print('%d / %d' % (status['current'], status['total'])) |
92 elif t=='archive_progress': |
105 elif t=='archive_progress': |
93 pass |
106 pass |
94 elif t=='progress_message': |
107 elif t=='progress_message': |
95 if 'finished' in status: |
108 #may_indicate_finished=True |
96 logging.info('Borg subprocess finished succesfully') |
|
97 success=status['finished'] |
|
98 elif t=='progress_percent': |
|
99 # Temporary output |
|
100 print('%d / %d', status['current'], status['total']) |
|
101 pass |
109 pass |
102 elif t=='file_status': |
110 elif t=='file_status': |
103 pass |
111 pass |
104 elif t=='log_message': |
112 elif t=='log_message': |
105 if 'levelname' not in status: |
113 if 'levelname' not in status: |
108 status['message']='UNKNOWN' |
116 status['message']='UNKNOWN' |
109 if 'name' not in status: |
117 if 'name' not in status: |
110 status['name']='borg' |
118 status['name']='borg' |
111 logging.log(translate_loglevel(status['levelname']), |
119 logging.log(translate_loglevel(status['levelname']), |
112 status['name'] + ': ' + status['message']) |
120 status['name'] + ': ' + status['message']) |
|
121 # set success=False? |
113 elif t=='exception': |
122 elif t=='exception': |
114 pass |
123 success=False |
115 elif t=='unparsed_error': |
124 elif t=='unparsed_error': |
116 pass |
125 success=False |
117 |
126 |
118 logging.info('Waiting for borg subprocess to terminate') |
127 #if (may_indicate_finished and 'finished' in status and |
|
128 # status['finished']): |
|
129 # logging.info('Borg subprocess finished succesfully') |
|
130 # success=status['finished'] |
|
131 |
|
132 logging.debug('Waiting for borg subprocess to terminate in log thread') |
119 |
133 |
120 self.borg_instance.wait() |
134 self.borg_instance.wait() |
121 |
135 |
122 logging.info('Borg subprocess terminated; terminating listener thread') |
136 logging.debug('Borg subprocess terminated; terminating log listener thread') |
|
137 |
|
138 with self.lock: |
|
139 self.thread_log=None |
|
140 self.__cleanup_if_both_listeners_terminated() |
|
141 |
|
142 |
|
143 def __result_listener(self): |
|
144 logging.debug('Result listener thread waiting for result') |
|
145 |
|
146 res=self.borg_instance.read_result() |
|
147 |
|
148 success=True |
|
149 |
|
150 logging.debug('Borg result: %s' % str(res)) |
|
151 |
|
152 if res==None: |
|
153 success=False |
|
154 |
|
155 logging.debug('Waiting for borg subprocess to terminate in result thread') |
|
156 |
|
157 self.borg_instance.wait() |
|
158 |
|
159 logging.debug('Borg subprocess terminated; terminating result listener thread') |
123 |
160 |
124 with self.lock: |
161 with self.lock: |
125 if self.current_operation=='create': |
162 if self.current_operation=='create': |
126 self.lastrun=self.time_started |
163 self.lastrun=self.time_started |
127 self.lastrun_success=success |
164 self.lastrun_success=success |
|
165 self.thread_res=None |
|
166 self.__cleanup_if_both_listeners_terminated() |
|
167 |
|
168 def __cleanup_if_both_listeners_terminated(self): |
|
169 if self.thread_res==None and self.thread_log==None: |
|
170 logging.debug('Both threads terminated') |
128 self.borg_instance=None |
171 self.borg_instance=None |
129 self.thread=None |
|
130 self.current_operation=None |
172 self.current_operation=None |
131 self.time_started=None |
173 self.time_started=None |
132 |
174 |
133 def __launch(self, queue, operation, archive_or_repository, *args): |
175 def __launch(self, queue, operation, archive_or_repository, *args): |
134 |
176 |
135 inst=BorgInstance(operation, archive_or_repository, *args) |
177 inst=BorgInstance(operation, archive_or_repository, *args) |
136 inst.launch() |
178 inst.launch() |
137 |
179 |
138 t=Thread(target=self.__listener) |
180 t_log=Thread(target=self.__log_listener) |
139 t.daemon=True |
181 t_log.daemon=True |
140 |
182 |
141 self.thread=t |
183 t_res=Thread(target=self.__result_listener) |
|
184 t_res.daemon=True |
|
185 |
|
186 self.thread_log=t_log |
|
187 self.thread_res=t_res |
142 self.borg_instance=inst |
188 self.borg_instance=inst |
143 self.queue=queue |
189 self.queue=queue |
144 self.current_operation=operation |
190 self.current_operation=operation |
145 self.time_started=time.monotonic() |
191 self.time_started=time.monotonic() |
146 |
192 |
147 t.start() |
193 t_log.start() |
|
194 t_res.start() |
148 |
195 |
149 def create(self, queue): |
196 def create(self, queue): |
150 self.__block_when_running() |
197 self.__block_when_running() |
151 |
198 |
152 archive="%s::%s%s" % (self.repository, |
199 archive="%s::%s%s" % (self.repository, |
167 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
214 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
168 def abort(self): |
215 def abort(self): |
169 with self.lock: |
216 with self.lock: |
170 if self.borg_instance: |
217 if self.borg_instance: |
171 self.borg_instance.terminate() |
218 self.borg_instance.terminate() |
172 if self.thread: |
219 thread_log=self.thread_log |
173 self.thread.terminate() |
220 thread_res=self.thread_res |
|
221 |
|
222 if thread_log: |
|
223 thread_log.terminate() |
|
224 |
|
225 if thread_res: |
|
226 thread_res.terminate() |
|
227 |
174 |
228 |
175 def join(self): |
229 def join(self): |
176 if self.thread: |
230 logging.debug('Waiting for borg listener thread to terminate') |
177 self.thread.join() |
231 |
|
232 with self.lock: |
|
233 thread_log=self.thread_log |
|
234 thread_res=self.thread_res |
|
235 |
|
236 if thread_log: |
|
237 thread_log.join() |
|
238 |
|
239 if thread_res: |
|
240 thread_res.join() |
|
241 |
|
242 assert(self.thread_log==None and self.thread_res==None) |
178 |
243 |
179 def next_action(): |
244 def next_action(): |
|
245 __block_when_running() |
180 # TODO pruning as well |
246 # TODO pruning as well |
181 now=time.monotonic() |
247 now=time.monotonic() |
182 if not self.lastrun: |
248 if not self.lastrun: |
183 return 'create', now+self.retry_interval |
249 return 'create', now+self.retry_interval |
184 elif not self.lastrun_success: |
250 elif not self.lastrun_success: |