view env/lib/python3.9/site-packages/pip/_vendor/resolvelib/resolvers.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import collections

from .providers import AbstractResolver
from .structs import DirectedGraph, build_iter_view


RequirementInformation = collections.namedtuple(
    "RequirementInformation", ["requirement", "parent"]
)


class ResolverException(Exception):
    """A base class for all exceptions raised by this module.

    Exceptions derived by this class should all be handled in this module. Any
    bubbling pass the resolver should be treated as a bug.
    """


class RequirementsConflicted(ResolverException):
    def __init__(self, criterion):
        super(RequirementsConflicted, self).__init__(criterion)
        self.criterion = criterion

    def __str__(self):
        return "Requirements conflict: {}".format(
            ", ".join(repr(r) for r in self.criterion.iter_requirement()),
        )


class InconsistentCandidate(ResolverException):
    def __init__(self, candidate, criterion):
        super(InconsistentCandidate, self).__init__(candidate, criterion)
        self.candidate = candidate
        self.criterion = criterion

    def __str__(self):
        return "Provided candidate {!r} does not satisfy {}".format(
            self.candidate,
            ", ".join(repr(r) for r in self.criterion.iter_requirement()),
        )


class Criterion(object):
    """Representation of possible resolution results of a package.

    This holds three attributes:

    * `information` is a collection of `RequirementInformation` pairs.
      Each pair is a requirement contributing to this criterion, and the
      candidate that provides the requirement.
    * `incompatibilities` is a collection of all known not-to-work candidates
      to exclude from consideration.
    * `candidates` is a collection containing all possible candidates deducted
      from the union of contributing requirements and known incompatibilities.
      It should never be empty, except when the criterion is an attribute of a
      raised `RequirementsConflicted` (in which case it is always empty).

    .. note::
        This class is intended to be externally immutable. **Do not** mutate
        any of its attribute containers.
    """

    def __init__(self, candidates, information, incompatibilities):
        self.candidates = candidates
        self.information = information
        self.incompatibilities = incompatibilities

    def __repr__(self):
        requirements = ", ".join(
            "({!r}, via={!r})".format(req, parent)
            for req, parent in self.information
        )
        return "Criterion({})".format(requirements)

    @classmethod
    def from_requirement(cls, provider, requirement, parent):
        """Build an instance from a requirement."""
        cands = build_iter_view(provider.find_matches([requirement]))
        infos = [RequirementInformation(requirement, parent)]
        criterion = cls(cands, infos, incompatibilities=[])
        if not cands:
            raise RequirementsConflicted(criterion)
        return criterion

    def iter_requirement(self):
        return (i.requirement for i in self.information)

    def iter_parent(self):
        return (i.parent for i in self.information)

    def merged_with(self, provider, requirement, parent):
        """Build a new instance from this and a new requirement."""
        infos = list(self.information)
        infos.append(RequirementInformation(requirement, parent))
        cands = build_iter_view(provider.find_matches([r for r, _ in infos]))
        criterion = type(self)(cands, infos, list(self.incompatibilities))
        if not cands:
            raise RequirementsConflicted(criterion)
        return criterion

    def excluded_of(self, candidates):
        """Build a new instance from this, but excluding specified candidates.

        Returns the new instance, or None if we still have no valid candidates.
        """
        cands = self.candidates.excluding(candidates)
        if not cands:
            return None
        incompats = self.incompatibilities + candidates
        return type(self)(cands, list(self.information), incompats)


class ResolutionError(ResolverException):
    pass


class ResolutionImpossible(ResolutionError):
    def __init__(self, causes):
        super(ResolutionImpossible, self).__init__(causes)
        # causes is a list of RequirementInformation objects
        self.causes = causes


class ResolutionTooDeep(ResolutionError):
    def __init__(self, round_count):
        super(ResolutionTooDeep, self).__init__(round_count)
        self.round_count = round_count


# Resolution state in a round.
State = collections.namedtuple("State", "mapping criteria")


class Resolution(object):
    """Stateful resolution object.

    This is designed as a one-off object that holds information to kick start
    the resolution process, and holds the results afterwards.
    """

    def __init__(self, provider, reporter):
        self._p = provider
        self._r = reporter
        self._states = []

    @property
    def state(self):
        try:
            return self._states[-1]
        except IndexError:
            raise AttributeError("state")

    def _push_new_state(self):
        """Push a new state into history.

        This new state will be used to hold resolution results of the next
        coming round.
        """
        base = self._states[-1]
        state = State(
            mapping=base.mapping.copy(),
            criteria=base.criteria.copy(),
        )
        self._states.append(state)

    def _merge_into_criterion(self, requirement, parent):
        self._r.adding_requirement(requirement, parent)
        name = self._p.identify(requirement)
        try:
            crit = self.state.criteria[name]
        except KeyError:
            crit = Criterion.from_requirement(self._p, requirement, parent)
        else:
            crit = crit.merged_with(self._p, requirement, parent)
        return name, crit

    def _get_criterion_item_preference(self, item):
        name, criterion = item
        return self._p.get_preference(
            self.state.mapping.get(name),
            criterion.candidates.for_preference(),
            criterion.information,
        )

    def _is_current_pin_satisfying(self, name, criterion):
        try:
            current_pin = self.state.mapping[name]
        except KeyError:
            return False
        return all(
            self._p.is_satisfied_by(r, current_pin)
            for r in criterion.iter_requirement()
        )

    def _get_criteria_to_update(self, candidate):
        criteria = {}
        for r in self._p.get_dependencies(candidate):
            name, crit = self._merge_into_criterion(r, parent=candidate)
            criteria[name] = crit
        return criteria

    def _attempt_to_pin_criterion(self, name, criterion):
        causes = []
        for candidate in criterion.candidates:
            try:
                criteria = self._get_criteria_to_update(candidate)
            except RequirementsConflicted as e:
                causes.append(e.criterion)
                continue

            # Check the newly-pinned candidate actually works. This should
            # always pass under normal circumstances, but in the case of a
            # faulty provider, we will raise an error to notify the implementer
            # to fix find_matches() and/or is_satisfied_by().
            satisfied = all(
                self._p.is_satisfied_by(r, candidate)
                for r in criterion.iter_requirement()
            )
            if not satisfied:
                raise InconsistentCandidate(candidate, criterion)

            # Put newly-pinned candidate at the end. This is essential because
            # backtracking looks at this mapping to get the last pin.
            self._r.pinning(candidate)
            self.state.mapping.pop(name, None)
            self.state.mapping[name] = candidate
            self.state.criteria.update(criteria)

            return []

        # All candidates tried, nothing works. This criterion is a dead
        # end, signal for backtracking.
        return causes

    def _backtrack(self):
        """Perform backtracking.

        When we enter here, the stack is like this::

            [ state Z ]
            [ state Y ]
            [ state X ]
            .... earlier states are irrelevant.

        1. No pins worked for Z, so it does not have a pin.
        2. We want to reset state Y to unpinned, and pin another candidate.
        3. State X holds what state Y was before the pin, but does not
           have the incompatibility information gathered in state Y.

        Each iteration of the loop will:

        1.  Discard Z.
        2.  Discard Y but remember its incompatibility information gathered
            previously, and the failure we're dealing with right now.
        3.  Push a new state Y' based on X, and apply the incompatibility
            information from Y to Y'.
        4a. If this causes Y' to conflict, we need to backtrack again. Make Y'
            the new Z and go back to step 2.
        4b. If the incompatibilities apply cleanly, end backtracking.
        """
        while len(self._states) >= 3:
            # Remove the state that triggered backtracking.
            del self._states[-1]

            # Retrieve the last candidate pin and known incompatibilities.
            broken_state = self._states.pop()
            name, candidate = broken_state.mapping.popitem()
            incompatibilities_from_broken = [
                (k, v.incompatibilities)
                for k, v in broken_state.criteria.items()
            ]

            # Also mark the newly known incompatibility.
            incompatibilities_from_broken.append((name, [candidate]))

            self._r.backtracking(candidate)

            # Create a new state from the last known-to-work one, and apply
            # the previously gathered incompatibility information.
            def _patch_criteria():
                for k, incompatibilities in incompatibilities_from_broken:
                    if not incompatibilities:
                        continue
                    try:
                        criterion = self.state.criteria[k]
                    except KeyError:
                        continue
                    criterion = criterion.excluded_of(incompatibilities)
                    if criterion is None:
                        return False
                    self.state.criteria[k] = criterion
                return True

            self._push_new_state()
            success = _patch_criteria()

            # It works! Let's work on this new state.
            if success:
                return True

            # State does not work after applying known incompatibilities.
            # Try the still previous state.

        # No way to backtrack anymore.
        return False

    def resolve(self, requirements, max_rounds):
        if self._states:
            raise RuntimeError("already resolved")

        self._r.starting()

        # Initialize the root state.
        self._states = [State(mapping=collections.OrderedDict(), criteria={})]
        for r in requirements:
            try:
                name, crit = self._merge_into_criterion(r, parent=None)
            except RequirementsConflicted as e:
                raise ResolutionImpossible(e.criterion.information)
            self.state.criteria[name] = crit

        # The root state is saved as a sentinel so the first ever pin can have
        # something to backtrack to if it fails. The root state is basically
        # pinning the virtual "root" package in the graph.
        self._push_new_state()

        for round_index in range(max_rounds):
            self._r.starting_round(round_index)

            unsatisfied_criterion_items = [
                item
                for item in self.state.criteria.items()
                if not self._is_current_pin_satisfying(*item)
            ]

            # All criteria are accounted for. Nothing more to pin, we are done!
            if not unsatisfied_criterion_items:
                self._r.ending(self.state)
                return self.state

            # Choose the most preferred unpinned criterion to try.
            name, criterion = min(
                unsatisfied_criterion_items,
                key=self._get_criterion_item_preference,
            )
            failure_causes = self._attempt_to_pin_criterion(name, criterion)

            if failure_causes:
                # Backtrack if pinning fails. The backtrack process puts us in
                # an unpinned state, so we can work on it in the next round.
                success = self._backtrack()

                # Dead ends everywhere. Give up.
                if not success:
                    causes = [i for c in failure_causes for i in c.information]
                    raise ResolutionImpossible(causes)
            else:
                # Pinning was successful. Push a new state to do another pin.
                self._push_new_state()

            self._r.ending_round(round_index, self.state)

        raise ResolutionTooDeep(max_rounds)


def _has_route_to_root(criteria, key, all_keys, connected):
    if key in connected:
        return True
    if key not in criteria:
        return False
    for p in criteria[key].iter_parent():
        try:
            pkey = all_keys[id(p)]
        except KeyError:
            continue
        if pkey in connected:
            connected.add(key)
            return True
        if _has_route_to_root(criteria, pkey, all_keys, connected):
            connected.add(key)
            return True
    return False


Result = collections.namedtuple("Result", "mapping graph criteria")


def _build_result(state):
    mapping = state.mapping
    all_keys = {id(v): k for k, v in mapping.items()}
    all_keys[id(None)] = None

    graph = DirectedGraph()
    graph.add(None)  # Sentinel as root dependencies' parent.

    connected = {None}
    for key, criterion in state.criteria.items():
        if not _has_route_to_root(state.criteria, key, all_keys, connected):
            continue
        if key not in graph:
            graph.add(key)
        for p in criterion.iter_parent():
            try:
                pkey = all_keys[id(p)]
            except KeyError:
                continue
            if pkey not in graph:
                graph.add(pkey)
            graph.connect(pkey, key)

    return Result(
        mapping={k: v for k, v in mapping.items() if k in connected},
        graph=graph,
        criteria=state.criteria,
    )


class Resolver(AbstractResolver):
    """The thing that performs the actual resolution work."""

    base_exception = ResolverException

    def resolve(self, requirements, max_rounds=100):
        """Take a collection of constraints, spit out the resolution result.

        The return value is a representation to the final resolution result. It
        is a tuple subclass with three public members:

        * `mapping`: A dict of resolved candidates. Each key is an identifier
            of a requirement (as returned by the provider's `identify` method),
            and the value is the resolved candidate.
        * `graph`: A `DirectedGraph` instance representing the dependency tree.
            The vertices are keys of `mapping`, and each edge represents *why*
            a particular package is included. A special vertex `None` is
            included to represent parents of user-supplied requirements.
        * `criteria`: A dict of "criteria" that hold detailed information on
            how edges in the graph are derived. Each key is an identifier of a
            requirement, and the value is a `Criterion` instance.

        The following exceptions may be raised if a resolution cannot be found:

        * `ResolutionImpossible`: A resolution cannot be found for the given
            combination of requirements. The `causes` attribute of the
            exception is a list of (requirement, parent), giving the
            requirements that could not be satisfied.
        * `ResolutionTooDeep`: The dependency tree is too deeply nested and
            the resolver gave up. This is usually caused by a circular
            dependency, but you can try to resolve this by increasing the
            `max_rounds` argument.
        """
        resolution = Resolution(self.provider, self.reporter)
        state = resolution.resolve(requirements, max_rounds=max_rounds)
        return _build_result(state)