repository.py

changeset 80
a409242121d5
parent 79
b075b3db3044
child 81
7bcd715f19e3
equal deleted inserted replaced
79:b075b3db3044 80:a409242121d5
1 #
2 # Repository abstraction for queuing
3 #
4
5 import weakref
6 import keyring
7 import loggers
8 import config
9 from scheduler import QueueThread, QueuedEvent
10
# Module-level logger; Repository derives per-repository child loggers from it.
logger = loggers.get(__name__)
12
class FIFOEvent(QueuedEvent):
    """Queue entry used by FIFO; carries a "good to go" flag for its owner."""

    def __init__(self, cond, name=None):
        # The flag starts cleared and is set before the base initialiser runs.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        # An event never compares as less than another one.
        # NOTE(review): presumably this makes the queue keep arrival order;
        # confirm against the insertion logic in QueueThread.
        return False
20
class FIFO(QueueThread):
    """Queue thread that releases queued actions from the head of its list.

    The worker thread flags the event at ``self._list`` as good to go and
    notifies that event's condition. ``queue_action`` runs the action, unlinks
    the event and then wakes the worker so it can look at the list again.
    """

    def __init__(self, **kwargs):
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Worker loop: release the head event, then sleep until notified."""
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We can only remove ev from the list when ev.cond allows
                    with ev.cond:
                        if not ev._goodtogo:
                            ev._goodtogo = True
                            # notify_all() is the supported spelling; the
                            # notifyAll() alias is deprecated.
                            ev.cond.notify_all()
                self._cond.wait()

            # Termination cleanup: wake every remaining waiter without
            # setting its flag, so queue_action returns False for it.
            ev = self._list
            while ev:
                # We can only remove ev from the list when ev.cond allows
                with ev.cond:
                    ev.cond.notify_all()
                ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue `action` and run it once the worker thread releases it.

        Args:
            cond: condition the caller already holds on entry; the event
                created here is built around it and waited on.
            action: zero-argument callable, called if the event was flagged
                good to go.
            name: optional label used in the log messages.

        Returns:
            True if the event was flagged good to go (and `action` was
            called), False if the wait ended without the flag being set.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        # Waiting releases the lock on cond, which allows the queue manager
        # (scheduler) thread to take it and notify us once we may proceed.
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                #
                # TODO: action() has to unlink on finish; so should maybe
                # use weak references to event.
                # Or we have to make action take all the time, so make the
                # stdout thread.
                # OR: Easiest to just move finish-waiting into __launch_check
                # instead of at the outer level of the main loop.
                #
                action()
        finally:
            # Always unlink, even if action() raised, so the queue does not
            # stay blocked behind this event.
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo
76
# Registry of repositories keyed by name. Values are held weakly, so an entry
# disappears once nothing else references the repository object.
repositories = weakref.WeakValueDictionary()
78
class Repository(FIFO):
    """A configured repository with its own FIFO action queue and passphrase."""

    def __decode_config(self, cfg):
        """Read this repository's settings from the configuration entry `cfg`.

        Sets repository_name, logger, location, borg_parameters and the
        private keychain account, and optionally pre-fetches the passphrase.
        """
        loc0 = 'Repository %d' % self.identifier

        self.repository_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Interpolate the name so that later config checks are given the
        # actual repository name rather than a literal '%s'.
        loc = 'Repository "%s"' % self.repository_name

        self.logger = logger.getChild(self.repository_name)

        self.location = config.check_string(cfg, 'location',
                                            'Target repository location', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.__extract_passphrase()
            except Exception:
                # Best effort only: the failure has already been logged by
                # __extract_passphrase, and launch_borg_instance tries again.
                self.logger.debug('Passphrase not available at startup; '
                                  'will retry when launching borg')

    def __init__(self, identifier, cfg):
        """Configure the repository, start its queue thread and register it.

        Args:
            identifier: number used in messages before the name is known.
            cfg: configuration entry describing this repository.
        """
        self.identifier = identifier
        self.__decode_config(cfg)
        super().__init__(name='RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name] = self

    def __extract_passphrase(self):
        """Return the passphrase, asking the keyring if none is stored yet.

        Returns None when no keychain account is configured. An exception
        from keyring.get_password is logged and re-raised.
        """
        if not self.__passphrase:
            acc = self.__keychain_account
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                else:
                    self.logger.debug('Received passphrase')
                    self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Launch `inst`, handing it this repository's passphrase."""
        # The passphrase is looked up under the queue lock; the launch itself
        # happens after the lock has been released.
        with self._cond:
            passphrase = self.__extract_passphrase()
        inst.launch(passphrase=passphrase)
135
def find_repository(name):
    """Return the registered repository called `name`, or None if there is none.

    A single get() is used instead of a membership test followed by indexing:
    the registry holds its values weakly, so an entry could vanish between
    the two steps and the indexing would then raise KeyError.
    """
    return repositories.get(name)
141
142

mercurial