Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/pip/_internal/models/direct_url.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """ PEP 610 """ | |
| 2 import json | |
| 3 import re | |
| 4 import urllib.parse | |
| 5 | |
| 6 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
| 7 | |
| 8 if MYPY_CHECK_RUNNING: | |
| 9 from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union | |
| 10 | |
| 11 T = TypeVar("T") | |
| 12 | |
| 13 | |
# Name of the direct URL metadata file (see PEP 610).
DIRECT_URL_METADATA_NAME = "direct_url.json"

# Matches a user-info string made only of environment variable
# placeholders: "${NAME}" optionally followed by ":${OTHER_NAME}".
ENV_VAR_RE = re.compile(
    r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$"
)
| 16 | |
# Public names of this module; everything else is an internal helper.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]
| 24 | |
| 25 | |
class DirectUrlValidationError(Exception):
    """Raised when direct URL data fails validation."""
| 28 | |
| 29 | |
| 30 def _get(d, expected_type, key, default=None): | |
| 31 # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T] | |
| 32 """Get value from dictionary and verify expected type.""" | |
| 33 if key not in d: | |
| 34 return default | |
| 35 value = d[key] | |
| 36 if not isinstance(value, expected_type): | |
| 37 raise DirectUrlValidationError( | |
| 38 "{!r} has unexpected type for {} (expected {})".format( | |
| 39 value, key, expected_type | |
| 40 ) | |
| 41 ) | |
| 42 return value | |
| 43 | |
| 44 | |
def _get_required(d, expected_type, key, default=None):
    # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
    """Same lookup as ``_get``, except that ending up with ``None`` is an error.

    Raises DirectUrlValidationError when neither the dictionary nor
    ``default`` supplies a value, or when the stored value has the wrong type.
    """
    found = _get(d, expected_type, key, default)
    if found is not None:
        return found
    raise DirectUrlValidationError(f"{key} must have a value")
| 51 | |
| 52 | |
| 53 def _exactly_one_of(infos): | |
| 54 # type: (Iterable[Optional[InfoType]]) -> InfoType | |
| 55 infos = [info for info in infos if info is not None] | |
| 56 if not infos: | |
| 57 raise DirectUrlValidationError( | |
| 58 "missing one of archive_info, dir_info, vcs_info" | |
| 59 ) | |
| 60 if len(infos) > 1: | |
| 61 raise DirectUrlValidationError( | |
| 62 "more than one of archive_info, dir_info, vcs_info" | |
| 63 ) | |
| 64 assert infos[0] is not None | |
| 65 return infos[0] | |
| 66 | |
| 67 | |
| 68 def _filter_none(**kwargs): | |
| 69 # type: (Any) -> Dict[str, Any] | |
| 70 """Make dict excluding None values.""" | |
| 71 return {k: v for k, v in kwargs.items() if v is not None} | |
| 72 | |
| 73 | |
class VcsInfo:
    """Data carried under the ``vcs_info`` key of a direct URL."""

    name = "vcs_info"

    def __init__(
        self,
        vcs,  # type: str
        commit_id,  # type: str
        requested_revision=None,  # type: Optional[str]
        resolved_revision=None,  # type: Optional[str]
        resolved_revision_type=None,  # type: Optional[str]
    ):
        self.vcs = vcs
        self.requested_revision = requested_revision
        self.commit_id = commit_id
        self.resolved_revision = resolved_revision
        self.resolved_revision_type = resolved_revision_type

    @classmethod
    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
        """Build an instance from a ``vcs_info`` mapping.

        ``None`` is passed through unchanged. ``vcs`` and ``commit_id`` are
        required; the remaining keys are optional. All values must be
        strings, otherwise DirectUrlValidationError is raised.
        """
        if d is None:
            return None
        # Required keys are read first so their errors are reported first.
        vcs = _get_required(d, str, "vcs")
        commit_id = _get_required(d, str, "commit_id")
        requested_revision = _get(d, str, "requested_revision")
        resolved_revision = _get(d, str, "resolved_revision")
        resolved_revision_type = _get(d, str, "resolved_revision_type")
        return cls(
            vcs=vcs,
            commit_id=commit_id,
            requested_revision=requested_revision,
            resolved_revision=resolved_revision,
            resolved_revision_type=resolved_revision_type,
        )

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        """Return the attributes as a dict, omitting those that are ``None``."""
        fields = dict(
            vcs=self.vcs,
            requested_revision=self.requested_revision,
            commit_id=self.commit_id,
            resolved_revision=self.resolved_revision,
            resolved_revision_type=self.resolved_revision_type,
        )
        return _filter_none(**fields)
| 113 | |
| 114 | |
class ArchiveInfo:
    """Data carried under the ``archive_info`` key of a direct URL."""

    name = "archive_info"

    def __init__(
        self,
        hash=None,  # type: Optional[str]
    ):
        self.hash = hash

    @classmethod
    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
        """Build an instance from an ``archive_info`` mapping.

        ``None`` is passed through unchanged. The optional ``hash`` entry
        must be a string, otherwise DirectUrlValidationError is raised.
        """
        if d is None:
            return None
        archive_hash = _get(d, str, "hash")
        return cls(hash=archive_hash)

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        """Return ``{"hash": ...}``, or an empty dict when no hash is set."""
        return _filter_none(hash=self.hash)
| 134 | |
| 135 | |
class DirInfo:
    """Data carried under the ``dir_info`` key of a direct URL."""

    name = "dir_info"

    def __init__(
        self,
        editable=False,  # type: bool
    ):
        self.editable = editable

    @classmethod
    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
        """Build an instance from a ``dir_info`` mapping.

        ``None`` is passed through unchanged. A missing ``editable`` entry
        counts as ``False``; a non-bool entry raises
        DirectUrlValidationError.
        """
        if d is None:
            return None
        is_editable = _get_required(d, bool, "editable", default=False)
        return cls(editable=is_editable)

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        """Return ``{"editable": ...}`` when editable is truthy, else ``{}``."""
        # A falsy flag is mapped to None so that it is left out of the dict.
        editable_or_none = self.editable or None
        return _filter_none(editable=editable_or_none)
| 157 | |
| 158 | |
if MYPY_CHECK_RUNNING:
    # Type-checking-only alias: any one of the three info classes.
    InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
| 161 | |
| 162 | |
class DirectUrl:
    """A direct URL record (PEP 610): a URL, one info object, and an
    optional subdirectory.

    ``info`` is an ArchiveInfo, DirInfo or VcsInfo instance; its ``name``
    attribute is the key it is stored under when serializing.
    """

    def __init__(
        self,
        url,  # type: str
        info,  # type: InfoType
        subdirectory=None,  # type: Optional[str]
    ):
        self.url = url
        self.info = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc):
        # type: (str) -> str
        """Return ``netloc`` without its ``user:password@`` prefix.

        The prefix is kept in two cases: it is exactly ``git`` and the info
        object is a VcsInfo for git, or it consists only of ``${ENV_VAR}``
        placeholders as matched by ENV_VAR_RE.
        """
        if "@" not in netloc:
            return netloc
        # Split on the LAST "@", which is how urllib.parse separates user
        # info from the host. Splitting on the first one would leave the
        # tail of a password containing "@" in the redacted result.
        user_pass, netloc_no_user_pass = netloc.rsplit("@", 1)
        if (
            isinstance(self.info, VcsInfo) and
            self.info.vcs == "git" and
            user_pass == "git"
        ):
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    @property
    def redacted_url(self):
        # type: () -> str
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        """
        purl = urllib.parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        surl = urllib.parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        )
        return surl

    def validate(self):
        # type: () -> None
        """Check this object by round-tripping it through to_dict/from_dict.

        Raises DirectUrlValidationError when the serialized form does not
        pass the checks performed by ``from_dict``.
        """
        self.from_dict(self.to_dict())

    @classmethod
    def from_dict(cls, d):
        # type: (Dict[str, Any]) -> DirectUrl
        """Build an instance from a direct URL mapping.

        ``url`` is required, ``subdirectory`` is optional, and exactly one
        of ``archive_info``, ``dir_info`` and ``vcs_info`` must be present.
        Raises DirectUrlValidationError otherwise.
        """
        # Instantiate via ``cls`` so that subclasses get instances of
        # themselves rather than of the base class.
        return cls(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
            info=_exactly_one_of(
                [
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),
                ]
            ),
        )

    def to_dict(self):
        # type: () -> Dict[str, Any]
        """Return the dict form of this object.

        The URL is written in its redacted form, ``subdirectory`` is left
        out when it is ``None``, and the info object is stored under its
        own ``name``.
        """
        res = _filter_none(
            url=self.redacted_url,
            subdirectory=self.subdirectory,
        )
        res[self.info.name] = self.info._to_dict()
        return res

    @classmethod
    def from_json(cls, s):
        # type: (str) -> DirectUrl
        """Parse the JSON text ``s`` and build an instance via ``from_dict``."""
        return cls.from_dict(json.loads(s))

    def to_json(self):
        # type: () -> str
        """Serialize ``to_dict()`` as JSON with sorted keys."""
        return json.dumps(self.to_dict(), sort_keys=True)
