Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/pip/_internal/req/req_set.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/pip/_internal/req/req_set.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,202 @@ +import logging +from collections import OrderedDict + +from pip._vendor.packaging.utils import canonicalize_name + +from pip._internal.exceptions import InstallationError +from pip._internal.models.wheel import Wheel +from pip._internal.utils import compatibility_tags +from pip._internal.utils.typing import MYPY_CHECK_RUNNING + +if MYPY_CHECK_RUNNING: + from typing import Dict, Iterable, List, Optional, Tuple + + from pip._internal.req.req_install import InstallRequirement + + +logger = logging.getLogger(__name__) + + +class RequirementSet: + + def __init__(self, check_supported_wheels=True): + # type: (bool) -> None + """Create a RequirementSet. + """ + + self.requirements = OrderedDict() # type: Dict[str, InstallRequirement] + self.check_supported_wheels = check_supported_wheels + + self.unnamed_requirements = [] # type: List[InstallRequirement] + + def __str__(self): + # type: () -> str + requirements = sorted( + (req for req in self.requirements.values() if not req.comes_from), + key=lambda req: canonicalize_name(req.name), + ) + return ' '.join(str(req.req) for req in requirements) + + def __repr__(self): + # type: () -> str + requirements = sorted( + self.requirements.values(), + key=lambda req: canonicalize_name(req.name), + ) + + format_string = '<{classname} object; {count} requirement(s): {reqs}>' + return format_string.format( + classname=self.__class__.__name__, + count=len(requirements), + reqs=', '.join(str(req.req) for req in requirements), + ) + + def add_unnamed_requirement(self, install_req): + # type: (InstallRequirement) -> None + assert not install_req.name + self.unnamed_requirements.append(install_req) + + def add_named_requirement(self, install_req): + # type: (InstallRequirement) -> None + assert install_req.name + + project_name = canonicalize_name(install_req.name) + self.requirements[project_name] = install_req + + def add_requirement( + self, + install_req, # type: InstallRequirement + parent_req_name=None, # type: Optional[str] + extras_requested=None # type: Optional[Iterable[str]] + ): + # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]] + """Add install_req as a requirement to install. + + :param parent_req_name: The name of the requirement that needed this + added. The name is used because when multiple unnamed requirements + resolve to the same name, we could otherwise end up with dependency + links that point outside the Requirements set. parent_req must + already be added. Note that None implies that this is a user + supplied requirement, vs an inferred one. + :param extras_requested: an iterable of extras used to evaluate the + environment markers. + :return: Additional requirements to scan. That is either [] if + the requirement is not applicable, or [install_req] if the + requirement is applicable and has just been added. + """ + # If the markers do not match, ignore this requirement. + if not install_req.match_markers(extras_requested): + logger.info( + "Ignoring %s: markers '%s' don't match your environment", + install_req.name, install_req.markers, + ) + return [], None + + # If the wheel is not supported, raise an error. + # Should check this after filtering out based on environment markers to + # allow specifying different wheels based on the environment/OS, in a + # single requirements file. + if install_req.link and install_req.link.is_wheel: + wheel = Wheel(install_req.link.filename) + tags = compatibility_tags.get_supported() + if (self.check_supported_wheels and not wheel.supported(tags)): + raise InstallationError( + "{} is not a supported wheel on this platform.".format( + wheel.filename) + ) + + # This next bit is really a sanity check. + assert not install_req.user_supplied or parent_req_name is None, ( + "a user supplied req shouldn't have a parent" + ) + + # Unnamed requirements are scanned again and the requirement won't be + # added as a dependency until after scanning. + if not install_req.name: + self.add_unnamed_requirement(install_req) + return [install_req], None + + try: + existing_req = self.get_requirement( + install_req.name) # type: Optional[InstallRequirement] + except KeyError: + existing_req = None + + has_conflicting_requirement = ( + parent_req_name is None and + existing_req and + not existing_req.constraint and + existing_req.extras == install_req.extras and + existing_req.req.specifier != install_req.req.specifier + ) + if has_conflicting_requirement: + raise InstallationError( + "Double requirement given: {} (already in {}, name={!r})" + .format(install_req, existing_req, install_req.name) + ) + + # When no existing requirement exists, add the requirement as a + # dependency and it will be scanned again after. + if not existing_req: + self.add_named_requirement(install_req) + # We'd want to rescan this requirement later + return [install_req], install_req + + # Assume there's no need to scan, and that we've already + # encountered this for scanning. + if install_req.constraint or not existing_req.constraint: + return [], existing_req + + does_not_satisfy_constraint = ( + install_req.link and + not ( + existing_req.link and + install_req.link.path == existing_req.link.path + ) + ) + if does_not_satisfy_constraint: + raise InstallationError( + "Could not satisfy constraints for '{}': " + "installation from path or url cannot be " + "constrained to a version".format(install_req.name) + ) + # If we're now installing a constraint, mark the existing + # object for real installation. + existing_req.constraint = False + # If we're now installing a user supplied requirement, + # mark the existing object as such. + if install_req.user_supplied: + existing_req.user_supplied = True + existing_req.extras = tuple(sorted( + set(existing_req.extras) | set(install_req.extras) + )) + logger.debug( + "Setting %s extras to: %s", + existing_req, existing_req.extras, + ) + # Return the existing requirement for addition to the parent and + # scanning again. + return [existing_req], existing_req + + def has_requirement(self, name): + # type: (str) -> bool + project_name = canonicalize_name(name) + + return ( + project_name in self.requirements and + not self.requirements[project_name].constraint + ) + + def get_requirement(self, name): + # type: (str) -> InstallRequirement + project_name = canonicalize_name(name) + + if project_name in self.requirements: + return self.requirements[project_name] + + raise KeyError("No project with the name {name!r}".format(**locals())) + + @property + def all_requirements(self): + # type: () -> List[InstallRequirement] + return self.unnamed_requirements + list(self.requirements.values())