repository.py

changeset 54
cfcaa5f6ba33
parent 53
442c558bd632
child 64
6cfe6a89e810
equal deleted inserted replaced
53:442c558bd632 54:cfcaa5f6ba33
1 # 1 #
2 # Repository abstraction for queuing 2 # Repository abstraction for queuing
3 # 3 #
4 4
5 from scheduler import TerminableThread, QueuedEvent 5 import weakref
6 from scheduler import QueueThread, QueuedEvent
6 7
7 class Repository(TerminableThread): 8 class FIFOEvent(QueuedEvent):
8 def __init__(self) 9 def __init__(self, cond, name=None):
9 self.__next_event_time = None 10 self._goodtogo=False
10 self.__list = None 11 super().__init__(cond, name=name)
11 self._cond = Condition() 12
12 self._terminate = False 13 def __lt__(self, other):
13 super().__init__(target = self.__scheduler_thread, name = 'Scheduler') 14 return True
14 self.daemon=True 15
16 class FIFO(QueueThread):
17 def __init__(self, **kwargs):
18 super().__init__(target = self._fifo_thread, **kwargs)
19
20 def _fifo_thread(self):
21 with self._cond:
22 while not self._terminate:
23 ev=self._list
24 if ev:
25 # We can only remove ev from the list when ev.cond allows
26 with ev.cond:
27 ev._goodtogo=True
28 ev.cond.notifyAll()
29 self._cond.wait()
30
31 # Termination cleanup
32 ev=self._list
33 while ev:
34 # We can only remove ev from the list when ev.cond allows
35 with ev.cond:
36 ev.cond.notifyAll()
37 ev=ev.next
38
39 # cond has to be acquired on entry!
40 def queue_action(self, cond, action=lambda: (), name=None):
41 ev=FIFOEvent(cond, name=name)
42
43 with self._cond:
44 self._insert(ev)
45
46 goodtogo=False
47 terminate_=False
48 while not goodtogo and not terminate_:
49 # This will release the lock on cond, allowing queue manager (scheduler)
50 # thread to notify us if we are already to be released
51 ev.cond.wait()
52 with ev.cond:
53 goodtogo=ev._goodtogo
54 with self._cond:
55 terminate_=self._terminate
56
57 try:
58 if not terminate_:
59 action()
60 finally:
61 with self._cond:
62 self._unlink(ev)
63 # Let _fifo_thread proceed to next action
64 self._cond.notify()
65
66 class Repository(FIFO):
67 def __init__(self, name):
68 super().__init__(name = 'RepositoryThread %s' % name)
69 self.repository_name=name
70
71
72 # TODO: Should use weak references but they give KeyError
73 repositories=weakref.WeakValueDictionary()
74
75 def get_controller(name):
76 if name in repositories:
77 repo = repositories[name]
78 else:
79 repo = Repository(name)
80 repo.start()
81 repositories[name] = repo
82 return repo
83

mercurial