borgend/repository.py

changeset 80
a409242121d5
parent 79
b075b3db3044
child 86
2fe66644c50d
equal deleted inserted replaced
79:b075b3db3044 80:a409242121d5
1 #
2 # Repository abstraction for queuing
3 #
4
5 import weakref
6 import keyring
7
8 from . import loggers
9 from . import config
10 from .scheduler import QueueThread, QueuedEvent
11
# Module-level logger, obtained from the package's ``loggers`` helper and
# keyed by this module's name.
logger = loggers.get(__name__)
13
class FIFOEvent(QueuedEvent):
    """Queue entry used by :class:`FIFO`.

    Besides what ``QueuedEvent`` stores, each entry carries a ``_goodtogo``
    flag. It starts out false and is switched on by the FIFO thread once the
    entry is released for execution.
    """

    def __init__(self, cond, name=None):
        """Create an entry that is not yet released.

        ``cond`` and ``name`` are passed through to ``QueuedEvent``.
        """
        # The flag is assigned before the base initialiser runs.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Report that this entry never orders before ``other``."""
        # Presumably this keeps an ordered insert from moving a new entry
        # ahead of existing ones -- verify against QueueThread._insert.
        return False
21
class FIFO(QueueThread):
    """Queue thread that releases the entry at the head of its list.

    Callers enqueue work with :meth:`queue_action` and block on their own
    condition variable. The thread body, :meth:`_fifo_thread`, marks the
    entry at the head of the list as good to go and wakes its waiters.
    The caller then runs its action and unlinks the entry, which lets the
    thread move on to the next one.
    """

    def __init__(self, **kwargs):
        """Set up the queue thread with ``_fifo_thread`` as its target.

        All keyword arguments are forwarded to ``QueueThread``.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Thread body: release the head entry, then wait to be notified.

        Runs with ``self._cond`` held (it is released only inside
        ``self._cond.wait()``) until ``self._terminate`` becomes true. On
        termination every entry still in the list has its waiters woken
        *without* ``_goodtogo`` being set, so those callers see a false
        result from ``queue_action``.
        """
        # NOTE(review): this thread takes self._cond and then ev.cond, while
        # a queue_action caller holds its cond and then takes self._cond.
        # Confirm the opposite acquisition order cannot deadlock if this
        # thread is woken while a released action is still running.
        with self._cond:
            while not self._terminate:
                head = self._list
                if head:
                    # The entry's flag may only be changed while holding
                    # the entry's own condition.
                    with head.cond:
                        if not head._goodtogo:
                            head._goodtogo = True
                            head.cond.notify_all()
                self._cond.wait()

            # Termination cleanup: wake everybody still queued.
            pending = self._list
            while pending:
                with pending.cond:
                    pending.cond.notify_all()
                pending = pending.next

    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue ``action`` and run it once this queue releases it.

        ``cond`` has to be acquired by the caller on entry! It is waited on
        here, which releases it for the duration of the wait.

        Parameters:
            cond: condition variable the caller waits on.
            action: zero-argument callable, run only if the entry was
                released (``_goodtogo`` set) by the time the wait returns.
            name: optional label used in debug log messages.

        Returns:
            The entry's ``_goodtogo`` flag: true if the action was run,
            false if the wait ended without the entry being released.

        An exception raised by ``action`` propagates to the caller after
        the entry has been unlinked from the queue.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        # This will release the lock on cond, allowing the queue manager
        # (scheduler) thread to notify us if we are already to be released.
        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')
        ev.cond.wait()

        try:
            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                #
                # TODO: action() has to unlink on finish; so should maybe
                # use weak references to event.
                # Or we have to make action take all the time, so make the
                # stdout thread.
                # OR: Easiest to just move finish-waiting into __launch_check
                # instead of at the outer level of the main loop.
                #
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to the next action.
                self._cond.notify()

        return ev._goodtogo
77
# Registry of live Repository objects, keyed by repository name. Values are
# held weakly, so an entry does not keep its repository alive.
repositories = weakref.WeakValueDictionary()
79
class Repository(FIFO):
    """A configured borg repository with its own FIFO action queue.

    Each instance is built from a configuration mapping, registers itself
    in the module-level ``repositories`` registry under its name, and can
    supply a passphrase from the system keyring when launching a borg
    instance.
    """

    def __decode_config(self, cfg):
        """Read this repository's settings from ``cfg``.

        Sets ``repository_name``, ``logger``, ``location`` and
        ``borg_parameters``, plus the private keychain account and cached
        passphrase. When the ``extract_passphrases_at_startup`` setting is
        enabled, the passphrase is fetched right away on a best-effort
        basis.
        """
        # Until the name is known, errors are reported against the index.
        loc0 = 'Repository %d' % self.identifier

        self.repository_name = config.check_string(cfg, 'name', 'Name', loc0)

        logger.debug("Configuring repository '%s'", self.repository_name)

        # Location label for the remaining checks. The name has to be
        # interpolated here, otherwise the label contains a literal '%s'.
        loc = 'Repository "%s"' % self.repository_name

        self.logger = logger.getChild(self.repository_name)

        self.location = config.check_string(cfg, 'location',
                                            'Target repository location', loc)

        self.borg_parameters = config.BorgParameters.from_config(cfg, loc)

        self.__keychain_account = config.check_string(cfg, 'keychain_account',
                                                      'Keychain account', loc,
                                                      default='')

        self.__passphrase = None

        if config.settings['extract_passphrases_at_startup']:
            try:
                self.__extract_passphrase()
            except Exception:
                # Best effort only: the lookup is attempted again by
                # launch_borg_instance, so a failure here is not fatal.
                self.logger.warning('Could not extract passphrase at '
                                    'startup; will retry on launch')

    def __init__(self, identifier, cfg):
        """Configure the repository and register it by name.

        Parameters:
            identifier: index of the repository, used with ``%d`` in
                configuration messages before the name is known.
            cfg: configuration mapping for this repository.
        """
        self.identifier = identifier
        # The configuration is decoded first because the thread name passed
        # to the base class depends on the repository name.
        self.__decode_config(cfg)
        super().__init__(name='RepositoryThread %s' % self.repository_name)
        repositories[self.repository_name] = self

    def __extract_passphrase(self):
        """Return the passphrase, fetching it from the keyring if needed.

        A previously obtained passphrase is reused. Without a configured
        keychain account the result is ``None``. Any exception raised by
        the keyring lookup is logged and re-raised.
        """
        if not self.__passphrase:
            acc = self.__keychain_account
            if acc:
                self.logger.debug('Requesting passphrase')
                try:
                    pw = keyring.get_password("borg-backup", acc)
                except Exception:
                    self.logger.error('Failed to retrieve passphrase')
                    raise
                self.logger.debug('Received passphrase')
                self.__passphrase = pw
            else:
                self.__passphrase = None
        return self.__passphrase

    def launch_borg_instance(self, inst):
        """Launch ``inst`` with this repository's passphrase.

        The passphrase is looked up while holding ``self._cond``; the
        result is then handed to ``inst.launch`` as the ``passphrase``
        keyword argument.
        """
        with self._cond:
            passphrase = self.__extract_passphrase()
        inst.launch(passphrase=passphrase)
136
def find_repository(name):
    """Return the registered repository called ``name``, or ``None``.

    The registry holds its values weakly, so an entry can disappear at any
    moment. A single ``get`` call avoids the ``KeyError`` that a separate
    membership test followed by indexing could raise if the repository is
    collected in between.
    """
    return repositories.get(name)
142
143

mercurial