view env/lib/python3.9/site-packages/pip/_internal/models/ @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
line wrap: on
line source

""" PEP 610 """
import json
import re
import urllib.parse

from pip._internal.utils.typing import MYPY_CHECK_RUNNING

    from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union

    T = TypeVar("T")

DIRECT_URL_METADATA_NAME = "direct_url.json"
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")

__all__ = [

class DirectUrlValidationError(Exception):

def _get(d, expected_type, key, default=None):
    # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
    """Get value from dictionary and verify expected type."""
    if key not in d:
        return default
    value = d[key]
    if not isinstance(value, expected_type):
        raise DirectUrlValidationError(
            "{!r} has unexpected type for {} (expected {})".format(
                value, key, expected_type
    return value

def _get_required(d, expected_type, key, default=None):
    # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
    value = _get(d, expected_type, key, default)
    if value is None:
        raise DirectUrlValidationError(f"{key} must have a value")
    return value

def _exactly_one_of(infos):
    # type: (Iterable[Optional[InfoType]]) -> InfoType
    infos = [info for info in infos if info is not None]
    if not infos:
        raise DirectUrlValidationError(
            "missing one of archive_info, dir_info, vcs_info"
    if len(infos) > 1:
        raise DirectUrlValidationError(
            "more than one of archive_info, dir_info, vcs_info"
    assert infos[0] is not None
    return infos[0]

def _filter_none(**kwargs):
    # type: (Any) -> Dict[str, Any]
    """Make dict excluding None values."""
    return {k: v for k, v in kwargs.items() if v is not None}

class VcsInfo:
    name = "vcs_info"

    def __init__(
        vcs,  # type: str
        commit_id,  # type: str
        requested_revision=None,  # type: Optional[str]
        resolved_revision=None,  # type: Optional[str]
        resolved_revision_type=None,  # type: Optional[str]
        self.vcs = vcs
        self.requested_revision = requested_revision
        self.commit_id = commit_id
        self.resolved_revision = resolved_revision
        self.resolved_revision_type = resolved_revision_type

    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
        if d is None:
            return None
        return cls(
            vcs=_get_required(d, str, "vcs"),
            commit_id=_get_required(d, str, "commit_id"),
            requested_revision=_get(d, str, "requested_revision"),
            resolved_revision=_get(d, str, "resolved_revision"),
            resolved_revision_type=_get(d, str, "resolved_revision_type"),

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        return _filter_none(

class ArchiveInfo:
    name = "archive_info"

    def __init__(
        hash=None,  # type: Optional[str]
        self.hash = hash

    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
        if d is None:
            return None
        return cls(hash=_get(d, str, "hash"))

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        return _filter_none(hash=self.hash)

class DirInfo:
    name = "dir_info"

    def __init__(
        editable=False,  # type: bool
        self.editable = editable

    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
        if d is None:
            return None
        return cls(
            editable=_get_required(d, bool, "editable", default=False)

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        return _filter_none(editable=self.editable or None)

    InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]

class DirectUrl:

    def __init__(
        url,  # type: str
        info,  # type: InfoType
        subdirectory=None,  # type: Optional[str]
        self.url = url = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc):
        # type: (str) -> str
        if "@" not in netloc:
            return netloc
        user_pass, netloc_no_user_pass = netloc.split("@", 1)
        if (
            isinstance(, VcsInfo) and
   == "git" and
            user_pass == "git"
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    def redacted_url(self):
        # type: () -> str
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        purl = urllib.parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        surl = urllib.parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        return surl

    def validate(self):
        # type: () -> None

    def from_dict(cls, d):
        # type: (Dict[str, Any]) -> DirectUrl
        return DirectUrl(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),

    def to_dict(self):
        # type: () -> Dict[str, Any]
        res = _filter_none(
        res[] =
        return res

    def from_json(cls, s):
        # type: (str) -> DirectUrl
        return cls.from_dict(json.loads(s))

    def to_json(self):
        # type: () -> str
        return json.dumps(self.to_dict(), sort_keys=True)