diff env/lib/python3.9/site-packages/pip/_internal/req/req_set.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/pip/_internal/req/req_set.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,202 @@
+import logging
+from collections import OrderedDict
+
+from pip._vendor.packaging.utils import canonicalize_name
+
+from pip._internal.exceptions import InstallationError
+from pip._internal.models.wheel import Wheel
+from pip._internal.utils import compatibility_tags
+from pip._internal.utils.typing import MYPY_CHECK_RUNNING
+
+if MYPY_CHECK_RUNNING:
+    from typing import Dict, Iterable, List, Optional, Tuple
+
+    from pip._internal.req.req_install import InstallRequirement
+
+
+logger = logging.getLogger(__name__)
+
+
+class RequirementSet:
+
+    def __init__(self, check_supported_wheels=True):
+        # type: (bool) -> None
+        """Create a RequirementSet.
+        """
+
+        self.requirements = OrderedDict()  # type: Dict[str, InstallRequirement]
+        self.check_supported_wheels = check_supported_wheels
+
+        self.unnamed_requirements = []  # type: List[InstallRequirement]
+
+    def __str__(self):
+        # type: () -> str
+        requirements = sorted(
+            (req for req in self.requirements.values() if not req.comes_from),
+            key=lambda req: canonicalize_name(req.name),
+        )
+        return ' '.join(str(req.req) for req in requirements)
+
+    def __repr__(self):
+        # type: () -> str
+        requirements = sorted(
+            self.requirements.values(),
+            key=lambda req: canonicalize_name(req.name),
+        )
+
+        format_string = '<{classname} object; {count} requirement(s): {reqs}>'
+        return format_string.format(
+            classname=self.__class__.__name__,
+            count=len(requirements),
+            reqs=', '.join(str(req.req) for req in requirements),
+        )
+
+    def add_unnamed_requirement(self, install_req):
+        # type: (InstallRequirement) -> None
+        assert not install_req.name
+        self.unnamed_requirements.append(install_req)
+
+    def add_named_requirement(self, install_req):
+        # type: (InstallRequirement) -> None
+        assert install_req.name
+
+        project_name = canonicalize_name(install_req.name)
+        self.requirements[project_name] = install_req
+
+    def add_requirement(
+        self,
+        install_req,  # type: InstallRequirement
+        parent_req_name=None,  # type: Optional[str]
+        extras_requested=None  # type: Optional[Iterable[str]]
+    ):
+        # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]
+        """Add install_req as a requirement to install.
+
+        :param parent_req_name: The name of the requirement that needed this
+            added. The name is used because when multiple unnamed requirements
+            resolve to the same name, we could otherwise end up with dependency
+            links that point outside the Requirements set. parent_req must
+            already be added. Note that None implies that this is a user
+            supplied requirement, vs an inferred one.
+        :param extras_requested: an iterable of extras used to evaluate the
+            environment markers.
+        :return: Additional requirements to scan. That is either [] if
+            the requirement is not applicable, or [install_req] if the
+            requirement is applicable and has just been added.
+        """
+        # If the markers do not match, ignore this requirement.
+        if not install_req.match_markers(extras_requested):
+            logger.info(
+                "Ignoring %s: markers '%s' don't match your environment",
+                install_req.name, install_req.markers,
+            )
+            return [], None
+
+        # If the wheel is not supported, raise an error.
+        # Should check this after filtering out based on environment markers to
+        # allow specifying different wheels based on the environment/OS, in a
+        # single requirements file.
+        if install_req.link and install_req.link.is_wheel:
+            wheel = Wheel(install_req.link.filename)
+            tags = compatibility_tags.get_supported()
+            if (self.check_supported_wheels and not wheel.supported(tags)):
+                raise InstallationError(
+                    "{} is not a supported wheel on this platform.".format(
+                        wheel.filename)
+                )
+
+        # This next bit is really a sanity check.
+        assert not install_req.user_supplied or parent_req_name is None, (
+            "a user supplied req shouldn't have a parent"
+        )
+
+        # Unnamed requirements are scanned again and the requirement won't be
+        # added as a dependency until after scanning.
+        if not install_req.name:
+            self.add_unnamed_requirement(install_req)
+            return [install_req], None
+
+        try:
+            existing_req = self.get_requirement(
+                install_req.name)  # type: Optional[InstallRequirement]
+        except KeyError:
+            existing_req = None
+
+        has_conflicting_requirement = (
+            parent_req_name is None and
+            existing_req and
+            not existing_req.constraint and
+            existing_req.extras == install_req.extras and
+            existing_req.req.specifier != install_req.req.specifier
+        )
+        if has_conflicting_requirement:
+            raise InstallationError(
+                "Double requirement given: {} (already in {}, name={!r})"
+                .format(install_req, existing_req, install_req.name)
+            )
+
+        # When no existing requirement exists, add the requirement as a
+        # dependency and it will be scanned again after.
+        if not existing_req:
+            self.add_named_requirement(install_req)
+            # We'd want to rescan this requirement later
+            return [install_req], install_req
+
+        # Assume there's no need to scan, and that we've already
+        # encountered this for scanning.
+        if install_req.constraint or not existing_req.constraint:
+            return [], existing_req
+
+        does_not_satisfy_constraint = (
+            install_req.link and
+            not (
+                existing_req.link and
+                install_req.link.path == existing_req.link.path
+            )
+        )
+        if does_not_satisfy_constraint:
+            raise InstallationError(
+                "Could not satisfy constraints for '{}': "
+                "installation from path or url cannot be "
+                "constrained to a version".format(install_req.name)
+            )
+        # If we're now installing a constraint, mark the existing
+        # object for real installation.
+        existing_req.constraint = False
+        # If we're now installing a user supplied requirement,
+        # mark the existing object as such.
+        if install_req.user_supplied:
+            existing_req.user_supplied = True
+        existing_req.extras = tuple(sorted(
+            set(existing_req.extras) | set(install_req.extras)
+        ))
+        logger.debug(
+            "Setting %s extras to: %s",
+            existing_req, existing_req.extras,
+        )
+        # Return the existing requirement for addition to the parent and
+        # scanning again.
+        return [existing_req], existing_req
+
+    def has_requirement(self, name):
+        # type: (str) -> bool
+        project_name = canonicalize_name(name)
+
+        return (
+            project_name in self.requirements and
+            not self.requirements[project_name].constraint
+        )
+
+    def get_requirement(self, name):
+        # type: (str) -> InstallRequirement
+        project_name = canonicalize_name(name)
+
+        if project_name in self.requirements:
+            return self.requirements[project_name]
+
+        raise KeyError("No project with the name {name!r}".format(**locals()))
+
+    @property
+    def all_requirements(self):
+        # type: () -> List[InstallRequirement]
+        return self.unnamed_requirements + list(self.requirements.values())