repository.py

Thu, 25 Jan 2018 22:34:55 +0000

author
Tuomo Valkonen <tuomov@iki.fi>
date
Thu, 25 Jan 2018 22:34:55 +0000
changeset 64
6cfe6a89e810
parent 54
cfcaa5f6ba33
child 74
4f56142e7497
permissions
-rw-r--r--

Repository queuing fixes

#
# Repository abstraction for queuing
#

import weakref
import borgend
from scheduler import QueueThread, QueuedEvent

# Module-level logger: a child of borgend.logger named after this module.
logger=borgend.logger.getChild(__name__)

class FIFOEvent(QueuedEvent):
    """Queued event that carries a ``_goodtogo`` flag, initially ``False``."""

    def __init__(self, cond, name=None):
        """Initialise the flag, then hand ``cond`` and ``name`` to the base class.

        Args:
            cond: Forwarded unchanged to ``QueuedEvent.__init__``.
            name: Optional event name, forwarded as a keyword argument.
        """
        # The flag is assigned before the base initialiser runs.
        self._goodtogo = False
        super().__init__(cond, name=name)

    def __lt__(self, other):
        """Always return ``False``: this event never compares as less than ``other``."""
        return False

class FIFO(QueueThread):
    """Queue thread that releases queued actions one at a time.

    Callers enqueue themselves with ``queue_action()`` and block on their own
    condition; the thread body (``_fifo_thread``) flags the event at the head
    of ``self._list`` as good to go and wakes its waiters. The head is only
    removed by ``queue_action()`` once its action has finished.

    NOTE(review): the order in which events are released is decided by
    ``QueueThread._insert`` (not shown here) -- presumably insertion order.
    """

    def __init__(self, **kwargs):
        """Create the thread with ``_fifo_thread`` as its target.

        All keyword arguments are forwarded to ``QueueThread.__init__``.
        """
        super().__init__(target=self._fifo_thread, **kwargs)

    def _fifo_thread(self):
        """Thread body: release the head event, then sleep until notified.

        Loops with ``self._cond`` held until ``self._terminate`` is set. After
        that, the waiters of every event still in the list are woken without
        ``_goodtogo`` being set on them.
        """
        with self._cond:
            while not self._terminate:
                ev = self._list
                if ev:
                    # We can only remove ev from the list when ev.cond allows
                    with ev.cond:
                        if not ev._goodtogo:
                            ev._goodtogo = True
                            # notify_all() is the non-deprecated spelling of
                            # notifyAll().
                            ev.cond.notify_all()
                self._cond.wait()

            # Termination cleanup: wake everybody still queued. _goodtogo is
            # left untouched, so events that were never released stay that way.
            ev = self._list
            while ev:
                # We can only remove ev from the list when ev.cond allows
                with ev.cond:
                    ev.cond.notify_all()
                    ev = ev.next

    # cond has to be acquired on entry!
    def queue_action(self, cond, action=lambda: (), name=None):
        """Queue up, block until woken, and run ``action`` if released.

        Args:
            cond: Condition the caller waits on. It has to be acquired by the
                caller before this method is entered.
            action: Zero-argument callable invoked once the event has been
                flagged as good to go. Defaults to a no-op.
            name: Optional event name, used in the debug log messages.

        Returns:
            The event's ``_goodtogo`` flag: true when the queue thread
            released the event (in which case ``action`` was called), false
            when the caller was woken without being released.

        NOTE(review): this method takes ``self._cond`` while the caller holds
        ``cond``, whereas ``_fifo_thread`` takes ``ev.cond`` while holding
        ``self._cond`` -- confirm this opposite lock order cannot deadlock.
        """
        ev = FIFOEvent(cond, name=name)

        with self._cond:
            self._insert(ev)

        logger.debug("%s:Queuing %s", self.name, ev.name or 'UNKNOWN')

        try:
            # This will release the lock on cond, allowing the queue manager
            # (scheduler) thread to notify us if we are already due to be
            # released. The wait is inside the try block so that the event is
            # unlinked even if waiting itself raises.
            ev.cond.wait()

            if ev._goodtogo:
                logger.debug("%s:Executing %s", self.name, ev.name or 'UNKNOWN')
                #
                # TODO: action() has to unlink on finish; so should maybe
                # use weak references to event.
                # Or we have to make action take all the time, so make the
                # stdout thread.
                # OR: Easiest to just move finish-waiting into __launch_check
                # instead of at the outer level of the main loop.
                #
                action()
        finally:
            with self._cond:
                self._unlink(ev)
                # Let _fifo_thread proceed to next action
                self._cond.notify()

        return ev._goodtogo

class Repository(FIFO):
    """FIFO queue thread associated with one named repository."""

    def __init__(self, name):
        """Create the queue thread for repository ``name``.

        The thread itself is named ``'RepositoryThread <name>'`` and the plain
        repository name is kept in ``self.repository_name``.
        """
        thread_name = 'RepositoryThread %s' % name
        super().__init__(name=thread_name)
        self.repository_name = name


# Registry of Repository controllers, keyed by repository name. The values
# are held through weak references (WeakValueDictionary).
# TODO: Should use weak references but they give KeyError
# NOTE(review): the TODO above looks stale now that a WeakValueDictionary is
# in use -- confirm whether the KeyError it mentions still occurs.
repositories=weakref.WeakValueDictionary()

def get_controller(name):
    """Return the Repository controller for ``name``, creating it if needed.

    A newly created controller is started and registered in ``repositories``
    before being returned.

    Args:
        name: Repository name used as the registry key.

    Returns:
        The registered (or newly created and started) ``Repository``.
    """
    # One get() instead of "in" followed by indexing: ``repositories`` holds
    # its values weakly, so an entry could vanish between the two steps and
    # the indexing would then raise KeyError.
    repo = repositories.get(name)
    if repo is None:
        repo = Repository(name)
        repo.start()
        repositories[name] = repo
    return repo

mercurial