borgend/repository.py

changeset 87
a214d475aa28
parent 86
2fe66644c50d
child 89
51cc2e25af38
equal deleted inserted replaced
86:2fe66644c50d 87:a214d475aa28
17 super().__init__(cond, name=name) 17 super().__init__(cond, name=name)
18 18
19 def __lt__(self, other): 19 def __lt__(self, other):
20 return False 20 return False
21 21
22 # This FIFO is essentially a fancy semaphore: Each thread waits on its own
23 # Condition, so can also be woken up from other executing threads.
24 # If they are woken up by the FIFO, then a "good to go" flag is set;
25 # and a specified action executed. Otherwise this is not done.
22 class FIFO(QueueThread): 26 class FIFO(QueueThread):
23 def __init__(self, **kwargs): 27 def __init__(self, **kwargs):
24 super().__init__(target = self._fifo_thread, **kwargs) 28 super().__init__(target = self._fifo_thread, **kwargs)
25 29
26 def _fifo_thread(self): 30 def _fifo_thread(self):
27 with self._cond: 31 with self._cond:
28 while not self._terminate: 32 while not self._terminate:
29 ev=self._list 33 ev=self._list
30 if ev: 34 if ev:
31 # We can only remove ev from the list when ev.cond allows 35 # We have to release lock on self._cond before obtaining
36 # one on ev.cond to avoid race conditions with
37 # self.queue_action
38 self._cond.release()
32 with ev.cond: 39 with ev.cond:
40 # Just set "good to go" flag and notify the queued
41 # thread. To keep blocking other threads, it is the
42 # job of the queued thread to remove itself.
33 if not ev._goodtogo: 43 if not ev._goodtogo:
34 ev._goodtogo=True 44 ev._goodtogo=True
35 ev.cond.notifyAll() 45 ev.cond.notify_all()
46 self._cond.acquire()
36 self._cond.wait() 47 self._cond.wait()
48
49 self.logger.debug('Terminating')
37 50
38 # Termination cleanup 51 # Termination cleanup
39 ev=self._list 52 ev=self._list
40 while ev: 53 while ev:
41 # We can only remove ev from the list when ev.cond allows 54 # We can only remove ev from the list when ev.cond allows
47 def queue_action(self, cond, action=lambda: (), name=None): 60 def queue_action(self, cond, action=lambda: (), name=None):
48 ev=FIFOEvent(cond, name=name) 61 ev=FIFOEvent(cond, name=name)
49 62
50 with self._cond: 63 with self._cond:
51 self._insert(ev) 64 self._insert(ev)
65 self._cond.notify()
52 66
53 # This will release the lock on cond, allowing queue manager (scheduler) 67 # This will release the lock on cond, allowing queue manager (scheduler)
54 # thread to notify us if we are already to be released 68 # thread to notify us if we are ready to be released
55 logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN') 69 logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
56 ev.cond.wait() 70 ev.cond.wait()
57 71
58 try: 72 try:
59 if ev._goodtogo: 73 if ev._goodtogo:
60 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN') 74 logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
61 #
62 # TODO: action() has to unlink on finish; so should maybe
63 # use weak references to event.
64 # Or we have to make action take all the time, so make the
65 # stdout thread.
66 # OR: Easiest to just move finish-waiting into __launch_check
67 # instead of at the outer level of the main loop.
68 #
69 action() 75 action()
70 finally: 76 finally:
71 with self._cond: 77 with self._cond:
72 self._unlink(ev) 78 self._unlink(ev)
73 # Let _fifo_thread proceed to next action 79 # Let _fifo_thread proceed to next action
100 106
101 self.__passphrase=None 107 self.__passphrase=None
102 108
103 if config.settings['extract_passphrases_at_startup']: 109 if config.settings['extract_passphrases_at_startup']:
104 try: 110 try:
105 self.extract_passphrase() 111 self.__extract_passphrase()
106 except Exception: 112 except Exception:
107 pass 113 pass
108 114
109 def __init__(self, identifier, cfg): 115 def __init__(self, identifier, cfg):
110 self.identifier=identifier 116 self.identifier=identifier
111 self.__decode_config(cfg) 117 self.__decode_config(cfg)
112 super().__init__(name = 'RepositoryThread %s' % self.repository_name) 118 super().__init__(name = 'RepositoryThread %s' % self.repository_name)
113 repositories[self.repository_name]=self 119 repositories[self.repository_name]=self
114 120
115 def __extract_passphrase(self): 121 def __extract_passphrase(self):
116 acc=self.__keychain_account
117 if not self.__passphrase: 122 if not self.__passphrase:
123 acc=self.__keychain_account
118 if acc and acc!='': 124 if acc and acc!='':
119 self.logger.debug('Requesting passphrase') 125 self.logger.debug('Requesting passphrase')
120 try: 126 try:
121 pw=keyring.get_password("borg-backup", acc) 127 pw=keyring.get_password("borg-backup", acc)
122 except Exception as err: 128 except Exception as err:
128 else: 134 else:
129 self.__passphrase=None 135 self.__passphrase=None
130 return self.__passphrase 136 return self.__passphrase
131 137
132 def launch_borg_instance(self, inst): 138 def launch_borg_instance(self, inst):
133 with self._cond: 139 try:
134 passphrase=self.__extract_passphrase() 140 self.logger.debug('launch_borg_instance: entering _cond')
135 inst.launch(passphrase=passphrase) 141 with self._cond:
142 self.logger.debug('launch_borg_instance: entering __extract_passphrase')
143 passphrase=self.__extract_passphrase()
144 except Exception as err:
145 self.logger.error('Aborting operation due to failure to obtain passphrase')
146 raise err
147 else:
148 inst.launch(passphrase=passphrase)
136 149
137 def find_repository(name): 150 def find_repository(name):
138 if name in repositories: 151 if name in repositories:
139 return repositories[name] 152 return repositories[name]
140 else: 153 else:

mercurial