Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/pip/_internal/models/direct_url.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """ PEP 610 """ | |
2 import json | |
3 import re | |
4 import urllib.parse | |
5 | |
6 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
7 | |
8 if MYPY_CHECK_RUNNING: | |
9 from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union | |
10 | |
11 T = TypeVar("T") | |
12 | |
13 | |
14 DIRECT_URL_METADATA_NAME = "direct_url.json" | |
15 ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$") | |
16 | |
17 __all__ = [ | |
18 "DirectUrl", | |
19 "DirectUrlValidationError", | |
20 "DirInfo", | |
21 "ArchiveInfo", | |
22 "VcsInfo", | |
23 ] | |
24 | |
25 | |
26 class DirectUrlValidationError(Exception): | |
27 pass | |
28 | |
29 | |
30 def _get(d, expected_type, key, default=None): | |
31 # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T] | |
32 """Get value from dictionary and verify expected type.""" | |
33 if key not in d: | |
34 return default | |
35 value = d[key] | |
36 if not isinstance(value, expected_type): | |
37 raise DirectUrlValidationError( | |
38 "{!r} has unexpected type for {} (expected {})".format( | |
39 value, key, expected_type | |
40 ) | |
41 ) | |
42 return value | |
43 | |
44 | |
45 def _get_required(d, expected_type, key, default=None): | |
46 # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T | |
47 value = _get(d, expected_type, key, default) | |
48 if value is None: | |
49 raise DirectUrlValidationError(f"{key} must have a value") | |
50 return value | |
51 | |
52 | |
53 def _exactly_one_of(infos): | |
54 # type: (Iterable[Optional[InfoType]]) -> InfoType | |
55 infos = [info for info in infos if info is not None] | |
56 if not infos: | |
57 raise DirectUrlValidationError( | |
58 "missing one of archive_info, dir_info, vcs_info" | |
59 ) | |
60 if len(infos) > 1: | |
61 raise DirectUrlValidationError( | |
62 "more than one of archive_info, dir_info, vcs_info" | |
63 ) | |
64 assert infos[0] is not None | |
65 return infos[0] | |
66 | |
67 | |
68 def _filter_none(**kwargs): | |
69 # type: (Any) -> Dict[str, Any] | |
70 """Make dict excluding None values.""" | |
71 return {k: v for k, v in kwargs.items() if v is not None} | |
72 | |
73 | |
74 class VcsInfo: | |
75 name = "vcs_info" | |
76 | |
77 def __init__( | |
78 self, | |
79 vcs, # type: str | |
80 commit_id, # type: str | |
81 requested_revision=None, # type: Optional[str] | |
82 resolved_revision=None, # type: Optional[str] | |
83 resolved_revision_type=None, # type: Optional[str] | |
84 ): | |
85 self.vcs = vcs | |
86 self.requested_revision = requested_revision | |
87 self.commit_id = commit_id | |
88 self.resolved_revision = resolved_revision | |
89 self.resolved_revision_type = resolved_revision_type | |
90 | |
91 @classmethod | |
92 def _from_dict(cls, d): | |
93 # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo] | |
94 if d is None: | |
95 return None | |
96 return cls( | |
97 vcs=_get_required(d, str, "vcs"), | |
98 commit_id=_get_required(d, str, "commit_id"), | |
99 requested_revision=_get(d, str, "requested_revision"), | |
100 resolved_revision=_get(d, str, "resolved_revision"), | |
101 resolved_revision_type=_get(d, str, "resolved_revision_type"), | |
102 ) | |
103 | |
104 def _to_dict(self): | |
105 # type: () -> Dict[str, Any] | |
106 return _filter_none( | |
107 vcs=self.vcs, | |
108 requested_revision=self.requested_revision, | |
109 commit_id=self.commit_id, | |
110 resolved_revision=self.resolved_revision, | |
111 resolved_revision_type=self.resolved_revision_type, | |
112 ) | |
113 | |
114 | |
115 class ArchiveInfo: | |
116 name = "archive_info" | |
117 | |
118 def __init__( | |
119 self, | |
120 hash=None, # type: Optional[str] | |
121 ): | |
122 self.hash = hash | |
123 | |
124 @classmethod | |
125 def _from_dict(cls, d): | |
126 # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo] | |
127 if d is None: | |
128 return None | |
129 return cls(hash=_get(d, str, "hash")) | |
130 | |
131 def _to_dict(self): | |
132 # type: () -> Dict[str, Any] | |
133 return _filter_none(hash=self.hash) | |
134 | |
135 | |
136 class DirInfo: | |
137 name = "dir_info" | |
138 | |
139 def __init__( | |
140 self, | |
141 editable=False, # type: bool | |
142 ): | |
143 self.editable = editable | |
144 | |
145 @classmethod | |
146 def _from_dict(cls, d): | |
147 # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo] | |
148 if d is None: | |
149 return None | |
150 return cls( | |
151 editable=_get_required(d, bool, "editable", default=False) | |
152 ) | |
153 | |
154 def _to_dict(self): | |
155 # type: () -> Dict[str, Any] | |
156 return _filter_none(editable=self.editable or None) | |
157 | |
158 | |
159 if MYPY_CHECK_RUNNING: | |
160 InfoType = Union[ArchiveInfo, DirInfo, VcsInfo] | |
161 | |
162 | |
163 class DirectUrl: | |
164 | |
165 def __init__( | |
166 self, | |
167 url, # type: str | |
168 info, # type: InfoType | |
169 subdirectory=None, # type: Optional[str] | |
170 ): | |
171 self.url = url | |
172 self.info = info | |
173 self.subdirectory = subdirectory | |
174 | |
175 def _remove_auth_from_netloc(self, netloc): | |
176 # type: (str) -> str | |
177 if "@" not in netloc: | |
178 return netloc | |
179 user_pass, netloc_no_user_pass = netloc.split("@", 1) | |
180 if ( | |
181 isinstance(self.info, VcsInfo) and | |
182 self.info.vcs == "git" and | |
183 user_pass == "git" | |
184 ): | |
185 return netloc | |
186 if ENV_VAR_RE.match(user_pass): | |
187 return netloc | |
188 return netloc_no_user_pass | |
189 | |
190 @property | |
191 def redacted_url(self): | |
192 # type: () -> str | |
193 """url with user:password part removed unless it is formed with | |
194 environment variables as specified in PEP 610, or it is ``git`` | |
195 in the case of a git URL. | |
196 """ | |
197 purl = urllib.parse.urlsplit(self.url) | |
198 netloc = self._remove_auth_from_netloc(purl.netloc) | |
199 surl = urllib.parse.urlunsplit( | |
200 (purl.scheme, netloc, purl.path, purl.query, purl.fragment) | |
201 ) | |
202 return surl | |
203 | |
204 def validate(self): | |
205 # type: () -> None | |
206 self.from_dict(self.to_dict()) | |
207 | |
208 @classmethod | |
209 def from_dict(cls, d): | |
210 # type: (Dict[str, Any]) -> DirectUrl | |
211 return DirectUrl( | |
212 url=_get_required(d, str, "url"), | |
213 subdirectory=_get(d, str, "subdirectory"), | |
214 info=_exactly_one_of( | |
215 [ | |
216 ArchiveInfo._from_dict(_get(d, dict, "archive_info")), | |
217 DirInfo._from_dict(_get(d, dict, "dir_info")), | |
218 VcsInfo._from_dict(_get(d, dict, "vcs_info")), | |
219 ] | |
220 ), | |
221 ) | |
222 | |
223 def to_dict(self): | |
224 # type: () -> Dict[str, Any] | |
225 res = _filter_none( | |
226 url=self.redacted_url, | |
227 subdirectory=self.subdirectory, | |
228 ) | |
229 res[self.info.name] = self.info._to_dict() | |
230 return res | |
231 | |
232 @classmethod | |
233 def from_json(cls, s): | |
234 # type: (str) -> DirectUrl | |
235 return cls.from_dict(json.loads(s)) | |
236 | |
237 def to_json(self): | |
238 # type: () -> str | |
239 return json.dumps(self.to_dict(), sort_keys=True) |