17 super().__init__(cond, name=name) |
17 super().__init__(cond, name=name) |
18 |
18 |
19 def __lt__(self, other): |
19 def __lt__(self, other): |
20 return False |
20 return False |
21 |
21 |
|
22 # This FIFO is essentially a fancy semaphore: Each thread waits on its own |
|
23 # Condition, so can also be woken up from other executing threads. |
|
24 # If they are woken up by the FIFO, then a "good to go" flag is set; |
|
25 # and a specified action executed. Otherwise this is not done. |
22 class FIFO(QueueThread): |
26 class FIFO(QueueThread): |
23 def __init__(self, **kwargs): |
27 def __init__(self, **kwargs): |
24 super().__init__(target = self._fifo_thread, **kwargs) |
28 super().__init__(target = self._fifo_thread, **kwargs) |
25 |
29 |
26 def _fifo_thread(self): |
30 def _fifo_thread(self): |
27 with self._cond: |
31 with self._cond: |
28 while not self._terminate: |
32 while not self._terminate: |
29 ev=self._list |
33 ev=self._list |
30 if ev: |
34 if ev: |
31 # We can only remove ev from the list when ev.cond allows |
35 # We have to release lock on self._cond before obtaining |
|
36 # one on ev.cond to avoid race conditions with |
|
37 # self.queue_action |
|
38 self._cond.release() |
32 with ev.cond: |
39 with ev.cond: |
|
40 # Just set "good to go" flag and notify the queued |
|
41 # thread. To keep blocking other threads, it is the |

42 # job of the queued thread to remove itself. |
33 if not ev._goodtogo: |
43 if not ev._goodtogo: |
34 ev._goodtogo=True |
44 ev._goodtogo=True |
35 ev.cond.notifyAll() |
45 ev.cond.notify_all() |
|
46 self._cond.acquire() |
36 self._cond.wait() |
47 self._cond.wait() |
|
48 |
|
49 self.logger.debug('Terminating') |
37 |
50 |
38 # Termination cleanup |
51 # Termination cleanup |
39 ev=self._list |
52 ev=self._list |
40 while ev: |
53 while ev: |
41 # We can only remove ev from the list when ev.cond allows |
54 # We can only remove ev from the list when ev.cond allows |
47 def queue_action(self, cond, action=lambda: (), name=None): |
60 def queue_action(self, cond, action=lambda: (), name=None): |
48 ev=FIFOEvent(cond, name=name) |
61 ev=FIFOEvent(cond, name=name) |
49 |
62 |
50 with self._cond: |
63 with self._cond: |
51 self._insert(ev) |
64 self._insert(ev) |
|
65 self._cond.notify() |
52 |
66 |
53 # This will release the lock on cond, allowing queue manager (scheduler) |
67 # This will release the lock on cond, allowing queue manager (scheduler) |
54 # thread to notify us if we are already to be released |
68 # thread to notify us if we are ready to be released |
55 logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') |
69 logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') |
56 ev.cond.wait() |
70 ev.cond.wait() |
57 |
71 |
58 try: |
72 try: |
59 if ev._goodtogo: |
73 if ev._goodtogo: |
60 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') |
74 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') |
61 # |
|
62 # TODO: action() has to unlink on finish; so should maybe |
|
63 # use weak references to event. |
|
64 # Or we have to make action take all the time, so make the |
|
65 # stdout thread. |
|
66 # OR: Easiest to just move finish-waiting into __launch_check |
|
67 # instead of at the outer level of the main loop. |
|
68 # |
|
69 action() |
75 action() |
70 finally: |
76 finally: |
71 with self._cond: |
77 with self._cond: |
72 self._unlink(ev) |
78 self._unlink(ev) |
73 # Let _fifo_thread proceed to next action |
79 # Let _fifo_thread proceed to next action |
100 |
106 |
101 self.__passphrase=None |
107 self.__passphrase=None |
102 |
108 |
103 if config.settings['extract_passphrases_at_startup']: |
109 if config.settings['extract_passphrases_at_startup']: |
104 try: |
110 try: |
105 self.extract_passphrase() |
111 self.__extract_passphrase() |
106 except Exception: |
112 except Exception: |
107 pass |
113 pass |
108 |
114 |
109 def __init__(self, identifier, cfg): |
115 def __init__(self, identifier, cfg): |
110 self.identifier=identifier |
116 self.identifier=identifier |
111 self.__decode_config(cfg) |
117 self.__decode_config(cfg) |
112 super().__init__(name = 'RepositoryThread %s' % self.repository_name) |
118 super().__init__(name = 'RepositoryThread %s' % self.repository_name) |
113 repositories[self.repository_name]=self |
119 repositories[self.repository_name]=self |
114 |
120 |
115 def __extract_passphrase(self): |
121 def __extract_passphrase(self): |
116 acc=self.__keychain_account |
|
117 if not self.__passphrase: |
122 if not self.__passphrase: |
|
123 acc=self.__keychain_account |
118 if acc and acc!='': |
124 if acc and acc!='': |
119 self.logger.debug('Requesting passphrase') |
125 self.logger.debug('Requesting passphrase') |
120 try: |
126 try: |
121 pw=keyring.get_password("borg-backup", acc) |
127 pw=keyring.get_password("borg-backup", acc) |
122 except Exception as err: |
128 except Exception as err: |
128 else: |
134 else: |
129 self.__passphrase=None |
135 self.__passphrase=None |
130 return self.__passphrase |
136 return self.__passphrase |
131 |
137 |
132 def launch_borg_instance(self, inst): |
138 def launch_borg_instance(self, inst): |
133 with self._cond: |
139 try: |
134 passphrase=self.__extract_passphrase() |
140 self.logger.debug('launch_borg_instance: entering _cond') |
135 inst.launch(passphrase=passphrase) |
141 with self._cond: |
|
142 self.logger.debug('launch_borg_instance: entering __extract_passphrase') |
|
143 passphrase=self.__extract_passphrase() |
|
144 except Exception as err: |
|
145 self.logger.error('Aborting operation due to failure to obtain passphrase') |
|
146 raise err |
|
147 else: |
|
148 inst.launch(passphrase=passphrase) |
136 |
149 |
137 def find_repository(name): |
150 def find_repository(name): |
138 if name in repositories: |
151 if name in repositories: |
139 return repositories[name] |
152 return repositories[name] |
140 else: |
153 else: |