| 72 self.config=config |
72 self.config=config |
| 73 self.lastrun=None |
73 self.lastrun=None |
| 74 self.lastrun_success=None |
74 self.lastrun_success=None |
| 75 self.borg_instance=None |
75 self.borg_instance=None |
| 76 self.current_operation=None |
76 self.current_operation=None |
| 77 self.thread=None |
77 self.thread_log=None |
| |
78 self.thread_err=None |
| 78 self.lock=Lock() |
79 self.lock=Lock() |
| 79 |
80 |
| |
81 def is_running(self): |
| |
82 with self.lock: |
| |
83 running=self.borg_instance or self.thread_log or self.thread_err |
| |
84 return running |
| |
85 |
| 80 def __block_when_running(self): |
86 def __block_when_running(self): |
| 81 with self.lock: |
87 running=self.is_running() |
| 82 not_running=self.borg_instance is None and self.thread is None |
88 assert(not running) |
| 83 assert(not_running) |
89 |
| 84 |
90 def __log_listener(self): |
| 85 def __listener(self): |
91 logging.debug('Log listener thread waiting for entries') |
| 86 success=False |
92 success=True |
| 87 for status in iter(self.borg_instance.read, None): |
93 for status in iter(self.borg_instance.read_log, None): |
| 88 logging.info(str(status)) |
94 logging.debug(str(status)) |
| 89 t=status['type'] |
95 t=status['type'] |
| |
96 #may_indicate_finished=False |
| 90 if t=='progress_percent': |
97 if t=='progress_percent': |
| 91 pass |
98 #may_indicate_finished=True |
| |
99 # Temporary output |
| |
100 if 'current' not in status: |
| |
101 status['current']=0 |
| |
102 if 'total' not in status: |
| |
103 status['total']=0 |
| |
104 print('%d / %d' % (status['current'], status['total'])) |
| 92 elif t=='archive_progress': |
105 elif t=='archive_progress': |
| 93 pass |
106 pass |
| 94 elif t=='progress_message': |
107 elif t=='progress_message': |
| 95 if 'finished' in status: |
108 #may_indicate_finished=True |
| 96 logging.info('Borg subprocess finished succesfully') |
|
| 97 success=status['finished'] |
|
| 98 elif t=='progress_percent': |
|
| 99 # Temporary output |
|
| 100 print('%d / %d', status['current'], status['total']) |
|
| 101 pass |
109 pass |
| 102 elif t=='file_status': |
110 elif t=='file_status': |
| 103 pass |
111 pass |
| 104 elif t=='log_message': |
112 elif t=='log_message': |
| 105 if 'levelname' not in status: |
113 if 'levelname' not in status: |
| 108 status['message']='UNKNOWN' |
116 status['message']='UNKNOWN' |
| 109 if 'name' not in status: |
117 if 'name' not in status: |
| 110 status['name']='borg' |
118 status['name']='borg' |
| 111 logging.log(translate_loglevel(status['levelname']), |
119 logging.log(translate_loglevel(status['levelname']), |
| 112 status['name'] + ': ' + status['message']) |
120 status['name'] + ': ' + status['message']) |
| |
121 # set success=False? |
| 113 elif t=='exception': |
122 elif t=='exception': |
| 114 pass |
123 success=False |
| 115 elif t=='unparsed_error': |
124 elif t=='unparsed_error': |
| 116 pass |
125 success=False |
| 117 |
126 |
| 118 logging.info('Waiting for borg subprocess to terminate') |
127 #if (may_indicate_finished and 'finished' in status and |
| |
128 # status['finished']): |
| |
129 # logging.info('Borg subprocess finished succesfully') |
| |
130 # success=status['finished'] |
| |
131 |
| |
132 logging.debug('Waiting for borg subprocess to terminate in log thread') |
| 119 |
133 |
| 120 self.borg_instance.wait() |
134 self.borg_instance.wait() |
| 121 |
135 |
| 122 logging.info('Borg subprocess terminated; terminating listener thread') |
136 logging.debug('Borg subprocess terminated; terminating log listener thread') |
| |
137 |
| |
138 with self.lock: |
| |
139 self.thread_log=None |
| |
140 self.__cleanup_if_both_listeners_terminated() |
| |
141 |
| |
142 |
| |
143 def __result_listener(self): |
| |
144 logging.debug('Result listener thread waiting for result') |
| |
145 |
| |
146 res=self.borg_instance.read_result() |
| |
147 |
| |
148 success=True |
| |
149 |
| |
150 logging.debug('Borg result: %s' % str(res)) |
| |
151 |
| |
152 if res==None: |
| |
153 success=False |
| |
154 |
| |
155 logging.debug('Waiting for borg subprocess to terminate in result thread') |
| |
156 |
| |
157 self.borg_instance.wait() |
| |
158 |
| |
159 logging.debug('Borg subprocess terminated; terminating result listener thread') |
| 123 |
160 |
| 124 with self.lock: |
161 with self.lock: |
| 125 if self.current_operation=='create': |
162 if self.current_operation=='create': |
| 126 self.lastrun=self.time_started |
163 self.lastrun=self.time_started |
| 127 self.lastrun_success=success |
164 self.lastrun_success=success |
| |
165 self.thread_res=None |
| |
166 self.__cleanup_if_both_listeners_terminated() |
| |
167 |
| |
168 def __cleanup_if_both_listeners_terminated(self): |
| |
169 if self.thread_res==None and self.thread_log==None: |
| |
170 logging.debug('Both threads terminated') |
| 128 self.borg_instance=None |
171 self.borg_instance=None |
| 129 self.thread=None |
|
| 130 self.current_operation=None |
172 self.current_operation=None |
| 131 self.time_started=None |
173 self.time_started=None |
| 132 |
174 |
| 133 def __launch(self, queue, operation, archive_or_repository, *args): |
175 def __launch(self, queue, operation, archive_or_repository, *args): |
| 134 |
176 |
| 135 inst=BorgInstance(operation, archive_or_repository, *args) |
177 inst=BorgInstance(operation, archive_or_repository, *args) |
| 136 inst.launch() |
178 inst.launch() |
| 137 |
179 |
| 138 t=Thread(target=self.__listener) |
180 t_log=Thread(target=self.__log_listener) |
| 139 t.daemon=True |
181 t_log.daemon=True |
| 140 |
182 |
| 141 self.thread=t |
183 t_res=Thread(target=self.__result_listener) |
| |
184 t_res.daemon=True |
| |
185 |
| |
186 self.thread_log=t_log |
| |
187 self.thread_res=t_res |
| 142 self.borg_instance=inst |
188 self.borg_instance=inst |
| 143 self.queue=queue |
189 self.queue=queue |
| 144 self.current_operation=operation |
190 self.current_operation=operation |
| 145 self.time_started=time.monotonic() |
191 self.time_started=time.monotonic() |
| 146 |
192 |
| 147 t.start() |
193 t_log.start() |
| |
194 t_res.start() |
| 148 |
195 |
| 149 def create(self, queue): |
196 def create(self, queue): |
| 150 self.__block_when_running() |
197 self.__block_when_running() |
| 151 |
198 |
| 152 archive="%s::%s%s" % (self.repository, |
199 archive="%s::%s%s" % (self.repository, |
| 167 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
214 # TODO: Decide exact (manual) abort mechanism. Perhaps two stages |
| 168 def abort(self): |
215 def abort(self): |
| 169 with self.lock: |
216 with self.lock: |
| 170 if self.borg_instance: |
217 if self.borg_instance: |
| 171 self.borg_instance.terminate() |
218 self.borg_instance.terminate() |
| 172 if self.thread: |
219 thread_log=self.thread_log |
| 173 self.thread.terminate() |
220 thread_res=self.thread_res |
| |
221 |
| |
222 if thread_log: |
| |
223 thread_log.terminate() |
| |
224 |
| |
225 if thread_res: |
| |
226 thread_res.terminate() |
| |
227 |
| 174 |
228 |
| 175 def join(self): |
229 def join(self): |
| 176 if self.thread: |
230 logging.debug('Waiting for borg listener thread to terminate') |
| 177 self.thread.join() |
231 |
| |
232 with self.lock: |
| |
233 thread_log=self.thread_log |
| |
234 thread_res=self.thread_res |
| |
235 |
| |
236 if thread_log: |
| |
237 thread_log.join() |
| |
238 |
| |
239 if thread_res: |
| |
240 thread_res.join() |
| |
241 |
| |
242 assert(self.thread_log==None and self.thread_res==None) |
| 178 |
243 |
| 179 def next_action(): |
244 def next_action(): |
| |
245 __block_when_running() |
| 180 # TODO pruning as well |
246 # TODO pruning as well |
| 181 now=time.monotonic() |
247 now=time.monotonic() |
| 182 if not self.lastrun: |
248 if not self.lastrun: |
| 183 return 'create', now+self.retry_interval |
249 return 'create', now+self.retry_interval |
| 184 elif not self.lastrun_success: |
250 elif not self.lastrun_success: |