Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/pip/_internal/req/req_set.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import logging | |
2 from collections import OrderedDict | |
3 | |
4 from pip._vendor.packaging.utils import canonicalize_name | |
5 | |
6 from pip._internal.exceptions import InstallationError | |
7 from pip._internal.models.wheel import Wheel | |
8 from pip._internal.utils import compatibility_tags | |
9 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
10 | |
11 if MYPY_CHECK_RUNNING: | |
12 from typing import Dict, Iterable, List, Optional, Tuple | |
13 | |
14 from pip._internal.req.req_install import InstallRequirement | |
15 | |
16 | |
17 logger = logging.getLogger(__name__) | |
18 | |
19 | |
20 class RequirementSet: | |
21 | |
22 def __init__(self, check_supported_wheels=True): | |
23 # type: (bool) -> None | |
24 """Create a RequirementSet. | |
25 """ | |
26 | |
27 self.requirements = OrderedDict() # type: Dict[str, InstallRequirement] | |
28 self.check_supported_wheels = check_supported_wheels | |
29 | |
30 self.unnamed_requirements = [] # type: List[InstallRequirement] | |
31 | |
32 def __str__(self): | |
33 # type: () -> str | |
34 requirements = sorted( | |
35 (req for req in self.requirements.values() if not req.comes_from), | |
36 key=lambda req: canonicalize_name(req.name), | |
37 ) | |
38 return ' '.join(str(req.req) for req in requirements) | |
39 | |
40 def __repr__(self): | |
41 # type: () -> str | |
42 requirements = sorted( | |
43 self.requirements.values(), | |
44 key=lambda req: canonicalize_name(req.name), | |
45 ) | |
46 | |
47 format_string = '<{classname} object; {count} requirement(s): {reqs}>' | |
48 return format_string.format( | |
49 classname=self.__class__.__name__, | |
50 count=len(requirements), | |
51 reqs=', '.join(str(req.req) for req in requirements), | |
52 ) | |
53 | |
54 def add_unnamed_requirement(self, install_req): | |
55 # type: (InstallRequirement) -> None | |
56 assert not install_req.name | |
57 self.unnamed_requirements.append(install_req) | |
58 | |
59 def add_named_requirement(self, install_req): | |
60 # type: (InstallRequirement) -> None | |
61 assert install_req.name | |
62 | |
63 project_name = canonicalize_name(install_req.name) | |
64 self.requirements[project_name] = install_req | |
65 | |
66 def add_requirement( | |
67 self, | |
68 install_req, # type: InstallRequirement | |
69 parent_req_name=None, # type: Optional[str] | |
70 extras_requested=None # type: Optional[Iterable[str]] | |
71 ): | |
72 # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]] | |
73 """Add install_req as a requirement to install. | |
74 | |
75 :param parent_req_name: The name of the requirement that needed this | |
76 added. The name is used because when multiple unnamed requirements | |
77 resolve to the same name, we could otherwise end up with dependency | |
78 links that point outside the Requirements set. parent_req must | |
79 already be added. Note that None implies that this is a user | |
80 supplied requirement, vs an inferred one. | |
81 :param extras_requested: an iterable of extras used to evaluate the | |
82 environment markers. | |
83 :return: Additional requirements to scan. That is either [] if | |
84 the requirement is not applicable, or [install_req] if the | |
85 requirement is applicable and has just been added. | |
86 """ | |
87 # If the markers do not match, ignore this requirement. | |
88 if not install_req.match_markers(extras_requested): | |
89 logger.info( | |
90 "Ignoring %s: markers '%s' don't match your environment", | |
91 install_req.name, install_req.markers, | |
92 ) | |
93 return [], None | |
94 | |
95 # If the wheel is not supported, raise an error. | |
96 # Should check this after filtering out based on environment markers to | |
97 # allow specifying different wheels based on the environment/OS, in a | |
98 # single requirements file. | |
99 if install_req.link and install_req.link.is_wheel: | |
100 wheel = Wheel(install_req.link.filename) | |
101 tags = compatibility_tags.get_supported() | |
102 if (self.check_supported_wheels and not wheel.supported(tags)): | |
103 raise InstallationError( | |
104 "{} is not a supported wheel on this platform.".format( | |
105 wheel.filename) | |
106 ) | |
107 | |
108 # This next bit is really a sanity check. | |
109 assert not install_req.user_supplied or parent_req_name is None, ( | |
110 "a user supplied req shouldn't have a parent" | |
111 ) | |
112 | |
113 # Unnamed requirements are scanned again and the requirement won't be | |
114 # added as a dependency until after scanning. | |
115 if not install_req.name: | |
116 self.add_unnamed_requirement(install_req) | |
117 return [install_req], None | |
118 | |
119 try: | |
120 existing_req = self.get_requirement( | |
121 install_req.name) # type: Optional[InstallRequirement] | |
122 except KeyError: | |
123 existing_req = None | |
124 | |
125 has_conflicting_requirement = ( | |
126 parent_req_name is None and | |
127 existing_req and | |
128 not existing_req.constraint and | |
129 existing_req.extras == install_req.extras and | |
130 existing_req.req.specifier != install_req.req.specifier | |
131 ) | |
132 if has_conflicting_requirement: | |
133 raise InstallationError( | |
134 "Double requirement given: {} (already in {}, name={!r})" | |
135 .format(install_req, existing_req, install_req.name) | |
136 ) | |
137 | |
138 # When no existing requirement exists, add the requirement as a | |
139 # dependency and it will be scanned again after. | |
140 if not existing_req: | |
141 self.add_named_requirement(install_req) | |
142 # We'd want to rescan this requirement later | |
143 return [install_req], install_req | |
144 | |
145 # Assume there's no need to scan, and that we've already | |
146 # encountered this for scanning. | |
147 if install_req.constraint or not existing_req.constraint: | |
148 return [], existing_req | |
149 | |
150 does_not_satisfy_constraint = ( | |
151 install_req.link and | |
152 not ( | |
153 existing_req.link and | |
154 install_req.link.path == existing_req.link.path | |
155 ) | |
156 ) | |
157 if does_not_satisfy_constraint: | |
158 raise InstallationError( | |
159 "Could not satisfy constraints for '{}': " | |
160 "installation from path or url cannot be " | |
161 "constrained to a version".format(install_req.name) | |
162 ) | |
163 # If we're now installing a constraint, mark the existing | |
164 # object for real installation. | |
165 existing_req.constraint = False | |
166 # If we're now installing a user supplied requirement, | |
167 # mark the existing object as such. | |
168 if install_req.user_supplied: | |
169 existing_req.user_supplied = True | |
170 existing_req.extras = tuple(sorted( | |
171 set(existing_req.extras) | set(install_req.extras) | |
172 )) | |
173 logger.debug( | |
174 "Setting %s extras to: %s", | |
175 existing_req, existing_req.extras, | |
176 ) | |
177 # Return the existing requirement for addition to the parent and | |
178 # scanning again. | |
179 return [existing_req], existing_req | |
180 | |
181 def has_requirement(self, name): | |
182 # type: (str) -> bool | |
183 project_name = canonicalize_name(name) | |
184 | |
185 return ( | |
186 project_name in self.requirements and | |
187 not self.requirements[project_name].constraint | |
188 ) | |
189 | |
190 def get_requirement(self, name): | |
191 # type: (str) -> InstallRequirement | |
192 project_name = canonicalize_name(name) | |
193 | |
194 if project_name in self.requirements: | |
195 return self.requirements[project_name] | |
196 | |
197 raise KeyError("No project with the name {name!r}".format(**locals())) | |
198 | |
199 @property | |
200 def all_requirements(self): | |
201 # type: () -> List[InstallRequirement] | |
202 return self.unnamed_requirements + list(self.requirements.values()) |