|
#
# Repository abstraction for queuing
#
|
4 |
|
5 import weakref |
|
6 import keyring |
|
7 |
|
8 from . import loggers |
|
9 from . import config |
|
10 from .scheduler import QueueThread, QueuedEvent |
|
11 |
|
# Module-level logger, obtained from the project's `loggers` helper and
# named after this module.
logger = loggers.get(__name__)
|
13 |
|
class FIFOEvent(QueuedEvent):
    """Queued event that records whether it has been released to run.

    ``_goodtogo`` starts out ``False``; the FIFO scheduler thread flips it
    to ``True`` when this event is allowed to proceed.
    """

    def __init__(self, cond, name=None):
        """Create an unreleased event.

        Args:
            cond: Condition object handed on to ``QueuedEvent``.
            name: Optional label handed on to ``QueuedEvent``.
        """
        # The flag is initialised before delegating to the base class.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Never compare as less than another event.

        NOTE(review): presumably this makes the base class's ordered insert
        keep arrival order -- confirm against the scheduler module.
        """
        return False
|
21 |
|
class FIFO(QueueThread):
    """Queue thread that releases queued actions one at a time.

    The thread body (`_fifo_thread`) marks the event at the head of the
    queue as good to go and wakes its waiter; `queue_action` is the
    caller-side half that enqueues an event, waits to be released, runs the
    action and unlinks the event again.

    NOTE(review): `_cond`, `_list`, `_terminate`, `_insert` and `_unlink`
    are inherited from `QueueThread` (not visible here); `ev.cond` and
    `self._cond` are assumed to be `threading.Condition` objects.
    """

    def __init__(self, **kwargs):
        """Create the thread, using `_fifo_thread` as its target.

        Args:
            **kwargs: Passed through unchanged to `QueueThread.__init__`.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Thread body: release the head event, then sleep until notified."""
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # The head event may only be released while holding its
                    # own condition, so that the waiter in queue_action()
                    # observes the flag and the notification together.
                    with ev.cond:
                        if not ev._goodtogo:
                            ev._goodtogo = True
                            # notify_all() replaces the deprecated
                            # notifyAll() alias.
                            ev.cond.notify_all()
                # Sleep until queue_action() (or whoever else signals
                # self._cond) reports a change.
                self._cond.wait()

            # Termination cleanup: wake every remaining waiter WITHOUT
            # setting _goodtogo, so queue_action() returns False for them.
            ev = self._list
            while ev:
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue `action` and run it once this event has been released.

        The caller must already hold `cond`; it is released while waiting.

        Args:
            cond: Condition (held by the caller) on which to wait for release.
            action: Zero-argument callable run if the event was released.
                Defaults to a no-op.
            name: Optional label for the event, used in debug logging.

        Returns:
            True if the event had been marked good to go when the wait
            ended (so `action` was called), False otherwise.

        Raises:
            Whatever `action` raises; the event is unlinked first.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        # This will release the lock on cond, allowing the queue manager
        # (scheduler) thread to notify us if we are already due for release.
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                #
                # TODO: action() has to unlink on finish; so should maybe
                # use weak references to event.
                # Or we have to make action take all the time, so make the
                # stdout thread.
                # OR: Easiest to just move finish-waiting into __launch_check
                # instead of at the outer level of the main loop.
                #
                action()
        finally:
            # Always unlink, even if action() raised or we were woken
            # without being released.
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo
|
77 |
|
# Registry of repositories keyed by repository name.  Values are held
# weakly, so an entry disappears once its repository is no longer
# referenced elsewhere.
repositories = weakref.WeakValueDictionary()
|
79 |
|
class Repository(FIFO):
    """A configured repository with its own FIFO queue thread.

    Holds the repository's name, location, borg parameters and (lazily
    fetched) passphrase, and registers itself in the module-level
    `repositories` mapping under its name.
    """

    def __decode_config(self, cfg):
        """Read this repository's settings from `cfg`.

        Sets `repository_name`, `logger`, `location`, `borg_parameters` and
        the private keychain account / passphrase fields.  If the
        'extract_passphrases_at_startup' setting is truthy, a best-effort
        attempt to fetch the passphrase is made immediately.

        Args:
            cfg: Configuration object understood by `config.check_string`
                and `config.BorgParameters.from_config`.
        """
        # Until the name is known, refer to the repository by its index.
        loc0 = 'Repository %d' % self.identifier

        self.repository_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Fix: the name was never substituted into this context string, so
        # it was passed on containing a literal '%s'.
        loc = 'Repository "%s"' % self.repository_name

        self.logger = logger.getChild(self.repository_name)

        self.location = config.check_string(cfg, 'location',
                                            'Target repository location', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                # Fix: this used to call the non-existent
                # self.extract_passphrase(); the AttributeError was silently
                # swallowed, so startup extraction never happened.
                self.__extract_passphrase()
            except Exception:
                # Best effort only: the passphrase is requested again by
                # launch_borg_instance() because it is still unset.
                self.logger.debug('Passphrase extraction at startup failed')

    def __init__(self, identifier, cfg):
        """Configure the repository, create its thread and register it.

        Args:
            identifier: Integer index of this repository (used in messages
                before the name is known).
            cfg: Configuration object for this repository.
        """
        self.identifier = identifier
        # Configuration is decoded first because the thread name below
        # needs repository_name.
        self.__decode_config(cfg)
        super().__init__(name='RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name] = self

    def __extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if not cached.

        Returns:
            The cached or freshly fetched passphrase; None when no keychain
            account is configured (or the keyring returned nothing).

        Raises:
            Exception: Whatever `keyring.get_password` raises, re-raised
                after logging an error.
        """
        acc = self.__keychain_account
        if not self.__passphrase:
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    # Bare raise keeps the original traceback intact.
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Launch `inst`, supplying this repository's passphrase.

        Args:
            inst: Object with a `launch(passphrase=...)` method.
        """
        # The lock guards the passphrase cache.
        # NOTE(review): the launch is placed outside the lock here; the
        # original nesting could not be recovered from the source -- confirm.
        with self._cond:
            passphrase = self.__extract_passphrase()
        inst.launch(passphrase=passphrase)
|
136 |
|
def find_repository(name):
    """Look up a registered repository by name.

    Args:
        name: Repository name used as the key in `repositories`.

    Returns:
        The repository registered under `name`, or None if there is none.
    """
    # A single .get() avoids the check-then-index race: `repositories`
    # holds its values weakly, so an entry can vanish between an `in` test
    # and the subsequent subscript, which would raise KeyError.
    return repositories.get(name)
|
142 |
|
143 |