1 # |
|
2 # Repository abstraction for queuing |
|
3 # |
|
4 |
|
5 import weakref |
|
6 import keyring |
|
7 import loggers |
|
8 import config |
|
9 from scheduler import QueueThread, QueuedEvent |
|
10 |
|
# Module-level logger. Repository derives a per-repository child logger
# from it via logger.getChild().
logger = loggers.get(__name__)
|
12 |
|
class FIFOEvent(QueuedEvent):
    """Queue entry used by FIFO.

    Adds a ``_goodtogo`` flag to the base event. FIFO's queue thread sets
    the flag when the event may proceed, and ``FIFO.queue_action`` reads it
    after being woken to decide whether to run the action.
    """

    def __init__(self, cond, name=None):
        """Create an event that is not yet cleared to run.

        Args:
            cond: Condition object handed on to QueuedEvent.
            name: Optional label handed on to QueuedEvent.
        """
        # Initialised before the base-class constructor runs, as in the
        # original ordering.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Never compare as smaller than another event.

        NOTE(review): presumably this makes QueueThread._insert keep
        arrival order - confirm against the scheduler module.
        """
        return False
|
20 |
|
class FIFO(QueueThread):
    """Queue thread that clears queued actions to run one at a time.

    Callers enqueue work with queue_action(). The background thread
    (_fifo_thread) flags the event at the head of ``self._list`` as good to
    go and wakes its owner; the owner runs its action, unlinks the event
    and notifies the thread so it can look at the next head.

    NOTE(review): _fifo_thread takes ``self._cond`` and then ``ev.cond``,
    while queue_action is entered with ``cond`` held and then takes
    ``self._cond``. That looks like a lock-order inversion - confirm it
    cannot deadlock with the callers in use.
    """

    def __init__(self, **kwargs):
        """Set up the thread with _fifo_thread as its target.

        Args:
            **kwargs: Passed through unchanged to QueueThread.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Thread body: release the head event, then sleep until notified."""
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # Hold the waiter's own condition so that the flag
                    # change and the wake-up are observed together.
                    with ev.cond:
                        if not ev._goodtogo:
                            ev._goodtogo = True
                            # notify_all() is the non-deprecated spelling
                            # of notifyAll().
                            ev.cond.notify_all()
                # Sleep until queue_action (or whoever sets _terminate)
                # notifies self._cond.
                self._cond.wait()

            # Termination cleanup: wake every remaining waiter. Their
            # _goodtogo flag is left untouched, so waiters that were never
            # released see False and skip their action.
            ev = self._list
            while ev:
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue *action* and block until released or otherwise woken.

        Args:
            cond: Condition the caller already holds; it is waited on here,
                which releases it for the duration of the wait.
            action: Zero-argument callable run once this event has been
                flagged good to go. Defaults to a no-op.
            name: Optional label used in debug log messages.

        Returns:
            True if the event was released by the queue thread (and the
            action was therefore invoked), False if the wait ended for any
            other reason.

        Raises:
            Whatever ``cond.wait()`` or ``action()`` raises; the event is
            unlinked from the queue in either case.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            # NOTE(review): nothing here wakes _fifo_thread after the
            # insert; presumably QueueThread._insert or another notify
            # takes care of that - confirm.
            self._insert(ev)

        try:
            # This will release the lock on cond, allowing queue manager
            # (scheduler) thread to notify us if we are already to be
            # released. The wait is inside the try block so that a failing
            # or interrupted wait still unlinks the event below instead of
            # leaving it in the queue.
            logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
            ev.cond.wait()

            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                #
                # TODO: action() has to unlink on finish; so should maybe
                # use weak references to event.
                # Or we have to make action take all the time, so make the
                # stdout thread.
                # OR: Easiest to just move finish-waiting into __launch_check
                # instead of at the outer level of the main loop.
                #
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo
|
76 |
|
# Registry of Repository objects keyed by repository name. Values are held
# weakly, so the registry by itself does not keep a repository alive.
repositories = weakref.WeakValueDictionary()
|
78 |
|
class Repository(FIFO):
    """A configured borg repository together with its FIFO action queue.

    Each instance decodes its settings from a configuration mapping,
    registers itself in the module-level ``repositories`` registry under
    its name, and caches the repository passphrase fetched from the system
    keyring.
    """

    def __decode_config(self, cfg):
        """Populate this repository's attributes from *cfg*.

        Sets ``repository_name``, ``logger``, ``location`` and
        ``borg_parameters``, plus the private keychain account and
        passphrase cache. If the ``extract_passphrases_at_startup`` setting
        is enabled, the passphrase is fetched right away on a best-effort
        basis.

        Args:
            cfg: Configuration section for this repository, in whatever
                form the ``config`` helpers accept.
        """
        # Until the name is known, refer to the repository by its index.
        loc0 = 'Repository %d' % self.identifier

        self.repository_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Location label passed to the config helpers. The name has to be
        # interpolated here; previously the raw format string was passed on.
        loc = 'Repository "%s"' % self.repository_name

        self.logger = logger.getChild(self.repository_name)

        self.location = config.check_string(cfg, 'location',
                                            'Target repository location', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                # The method is private (name-mangled); calling it without
                # the double underscore raised AttributeError, which the
                # handler below used to swallow silently.
                self.__extract_passphrase()
            except Exception:
                # Deliberately best-effort: launch_borg_instance() asks for
                # the passphrase again when it is actually needed.
                self.logger.warning('Could not extract passphrase at '
                                    'startup; will retry when launching')

    def __init__(self, identifier, cfg):
        """Create the repository, start-up its queue thread object and register it.

        Args:
            identifier: Index of this repository, used in messages until
                the configured name is known.
            cfg: Configuration section for this repository.
        """
        self.identifier = identifier
        # The configuration must be decoded first: the thread name passed
        # to the base class is built from repository_name.
        self.__decode_config(cfg)
        super().__init__(name='RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name] = self

    def __extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if not cached.

        Returns:
            The cached or freshly fetched passphrase, or None when no
            keychain account is configured.

        Raises:
            Any exception raised by ``keyring.get_password`` (after logging
            an error).
        """
        acc = self.__keychain_account
        if not self.__passphrase:
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Launch *inst* with this repository's passphrase.

        Args:
            inst: Object exposing ``launch(passphrase=...)``.
        """
        # The lock guards only the passphrase cache; the launch itself runs
        # without holding it.
        with self._cond:
            passphrase = self.__extract_passphrase()
        inst.launch(passphrase=passphrase)
|
135 |
|
def find_repository(name):
    """Return the registered repository called *name*, or None if absent.

    Args:
        name: Repository name used as the key in ``repositories``.

    Returns:
        The matching repository object, or None when no live entry exists.
    """
    # One lookup instead of "in" followed by a subscript: the registry holds
    # its values weakly, so an entry can disappear between the two steps
    # and the subscript would then raise KeyError.
    return repositories.get(name)
|
141 |
|
142 |
|