diff env/lib/python3.9/site-packages/pip/_internal/index/collector.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/pip/_internal/index/collector.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,666 @@
+"""
+The main purpose of this module is to expose LinkCollector.collect_links().
+"""
+
+import cgi
+import functools
+import itertools
+import logging
+import mimetypes
+import os
+import re
+import urllib.parse
+import urllib.request
+from collections import OrderedDict
+
+from pip._vendor import html5lib, requests
+from pip._vendor.distlib.compat import unescape
+from pip._vendor.requests.exceptions import RetryError, SSLError
+
+from pip._internal.exceptions import NetworkConnectionError
+from pip._internal.models.link import Link
+from pip._internal.models.search_scope import SearchScope
+from pip._internal.network.utils import raise_for_status
+from pip._internal.utils.filetypes import is_archive_file
+from pip._internal.utils.misc import pairwise, redact_auth_from_url
+from pip._internal.utils.typing import MYPY_CHECK_RUNNING
+from pip._internal.utils.urls import path_to_url, url_to_path
+from pip._internal.vcs import is_url, vcs
+
+if MYPY_CHECK_RUNNING:
+    import xml.etree.ElementTree
+    from optparse import Values
+    from typing import (
+        Callable,
+        Iterable,
+        List,
+        MutableMapping,
+        Optional,
+        Sequence,
+        Tuple,
+        Union,
+    )
+
+    from pip._vendor.requests import Response
+
+    from pip._internal.network.session import PipSession
+
+    HTMLElement = xml.etree.ElementTree.Element
+    ResponseHeaders = MutableMapping[str, str]
+
+
+logger = logging.getLogger(__name__)
+
+
+def _match_vcs_scheme(url):
+    # type: (str) -> Optional[str]
+    """Look for VCS schemes in the URL.
+
+    Returns the matched VCS scheme, or None if there's no match.
+    """
+    for scheme in vcs.schemes:
+        if url.lower().startswith(scheme) and url[len(scheme)] in '+:':
+            return scheme
+    return None
+
+
+class _NotHTML(Exception):
+    def __init__(self, content_type, request_desc):
+        # type: (str, str) -> None
+        super().__init__(content_type, request_desc)
+        self.content_type = content_type
+        self.request_desc = request_desc
+
+
+def _ensure_html_header(response):
+    # type: (Response) -> None
+    """Check the Content-Type header to ensure the response contains HTML.
+
+    Raises `_NotHTML` if the content type is not text/html.
+    """
+    content_type = response.headers.get("Content-Type", "")
+    if not content_type.lower().startswith("text/html"):
+        raise _NotHTML(content_type, response.request.method)
+
+
+class _NotHTTP(Exception):
+    pass
+
+
+def _ensure_html_response(url, session):
+    # type: (str, PipSession) -> None
+    """Send a HEAD request to the URL, and ensure the response contains HTML.
+
+    Raises `_NotHTTP` if the URL is not available for a HEAD request, or
+    `_NotHTML` if the content type is not text/html.
+    """
+    scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
+    if scheme not in {'http', 'https'}:
+        raise _NotHTTP()
+
+    resp = session.head(url, allow_redirects=True)
+    raise_for_status(resp)
+
+    _ensure_html_header(resp)
+
+
+def _get_html_response(url, session):
+    # type: (str, PipSession) -> Response
+    """Access an HTML page with GET, and return the response.
+
+    This consists of three parts:
+
+    1. If the URL looks suspiciously like an archive, send a HEAD first to
+       check the Content-Type is HTML, to avoid downloading a large file.
+       Raise `_NotHTTP` if the content type cannot be determined, or
+       `_NotHTML` if it is not HTML.
+    2. Actually perform the request. Raise HTTP exceptions on network failures.
+    3. Check the Content-Type header to make sure we got HTML, and raise
+       `_NotHTML` otherwise.
+    """
+    if is_archive_file(Link(url).filename):
+        _ensure_html_response(url, session=session)
+
+    logger.debug('Getting page %s', redact_auth_from_url(url))
+
+    resp = session.get(
+        url,
+        headers={
+            "Accept": "text/html",
+            # We don't want to blindly returned cached data for
+            # /simple/, because authors generally expecting that
+            # twine upload && pip install will function, but if
+            # they've done a pip install in the last ~10 minutes
+            # it won't. Thus by setting this to zero we will not
+            # blindly use any cached data, however the benefit of
+            # using max-age=0 instead of no-cache, is that we will
+            # still support conditional requests, so we will still
+            # minimize traffic sent in cases where the page hasn't
+            # changed at all, we will just always incur the round
+            # trip for the conditional GET now instead of only
+            # once per 10 minutes.
+            # For more information, please see pypa/pip#5670.
+            "Cache-Control": "max-age=0",
+        },
+    )
+    raise_for_status(resp)
+
+    # The check for archives above only works if the url ends with
+    # something that looks like an archive. However that is not a
+    # requirement of an url. Unless we issue a HEAD request on every
+    # url we cannot know ahead of time for sure if something is HTML
+    # or not. However we can check after we've downloaded it.
+    _ensure_html_header(resp)
+
+    return resp
+
+
+def _get_encoding_from_headers(headers):
+    # type: (ResponseHeaders) -> Optional[str]
+    """Determine if we have any encoding information in our headers.
+    """
+    if headers and "Content-Type" in headers:
+        content_type, params = cgi.parse_header(headers["Content-Type"])
+        if "charset" in params:
+            return params['charset']
+    return None
+
+
+def _determine_base_url(document, page_url):
+    # type: (HTMLElement, str) -> str
+    """Determine the HTML document's base URL.
+
+    This looks for a ``<base>`` tag in the HTML document. If present, its href
+    attribute denotes the base URL of anchor tags in the document. If there is
+    no such tag (or if it does not have a valid href attribute), the HTML
+    file's URL is used as the base URL.
+
+    :param document: An HTML document representation. The current
+        implementation expects the result of ``html5lib.parse()``.
+    :param page_url: The URL of the HTML document.
+    """
+    for base in document.findall(".//base"):
+        href = base.get("href")
+        if href is not None:
+            return href
+    return page_url
+
+
+def _clean_url_path_part(part):
+    # type: (str) -> str
+    """
+    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
+    """
+    # We unquote prior to quoting to make sure nothing is double quoted.
+    return urllib.parse.quote(urllib.parse.unquote(part))
+
+
+def _clean_file_url_path(part):
+    # type: (str) -> str
+    """
+    Clean the first part of a URL path that corresponds to a local
+    filesystem path (i.e. the first part after splitting on "@" characters).
+    """
+    # We unquote prior to quoting to make sure nothing is double quoted.
+    # Also, on Windows the path part might contain a drive letter which
+    # should not be quoted. On Linux where drive letters do not
+    # exist, the colon should be quoted. We rely on urllib.request
+    # to do the right thing here.
+    return urllib.request.pathname2url(urllib.request.url2pathname(part))
+
+
+# percent-encoded:                   /
+_reserved_chars_re = re.compile('(@|%2F)', re.IGNORECASE)
+
+
+def _clean_url_path(path, is_local_path):
+    # type: (str, bool) -> str
+    """
+    Clean the path portion of a URL.
+    """
+    if is_local_path:
+        clean_func = _clean_file_url_path
+    else:
+        clean_func = _clean_url_path_part
+
+    # Split on the reserved characters prior to cleaning so that
+    # revision strings in VCS URLs are properly preserved.
+    parts = _reserved_chars_re.split(path)
+
+    cleaned_parts = []
+    for to_clean, reserved in pairwise(itertools.chain(parts, [''])):
+        cleaned_parts.append(clean_func(to_clean))
+        # Normalize %xx escapes (e.g. %2f -> %2F)
+        cleaned_parts.append(reserved.upper())
+
+    return ''.join(cleaned_parts)
+
+
+def _clean_link(url):
+    # type: (str) -> str
+    """
+    Make sure a link is fully quoted.
+    For example, if ' ' occurs in the URL, it will be replaced with "%20",
+    and without double-quoting other characters.
+    """
+    # Split the URL into parts according to the general structure
+    # `scheme://netloc/path;parameters?query#fragment`.
+    result = urllib.parse.urlparse(url)
+    # If the netloc is empty, then the URL refers to a local filesystem path.
+    is_local_path = not result.netloc
+    path = _clean_url_path(result.path, is_local_path=is_local_path)
+    return urllib.parse.urlunparse(result._replace(path=path))
+
+
+def _create_link_from_element(
+    anchor,    # type: HTMLElement
+    page_url,  # type: str
+    base_url,  # type: str
+):
+    # type: (...) -> Optional[Link]
+    """
+    Convert an anchor element in a simple repository page to a Link.
+    """
+    href = anchor.get("href")
+    if not href:
+        return None
+
+    url = _clean_link(urllib.parse.urljoin(base_url, href))
+    pyrequire = anchor.get('data-requires-python')
+    pyrequire = unescape(pyrequire) if pyrequire else None
+
+    yanked_reason = anchor.get('data-yanked')
+    if yanked_reason:
+        # This is a unicode string in Python 2 (and 3).
+        yanked_reason = unescape(yanked_reason)
+
+    link = Link(
+        url,
+        comes_from=page_url,
+        requires_python=pyrequire,
+        yanked_reason=yanked_reason,
+    )
+
+    return link
+
+
+class CacheablePageContent:
+    def __init__(self, page):
+        # type: (HTMLPage) -> None
+        assert page.cache_link_parsing
+        self.page = page
+
+    def __eq__(self, other):
+        # type: (object) -> bool
+        return (isinstance(other, type(self)) and
+                self.page.url == other.page.url)
+
+    def __hash__(self):
+        # type: () -> int
+        return hash(self.page.url)
+
+
+def with_cached_html_pages(
+    fn,    # type: Callable[[HTMLPage], Iterable[Link]]
+):
+    # type: (...) -> Callable[[HTMLPage], List[Link]]
+    """
+    Given a function that parses an Iterable[Link] from an HTMLPage, cache the
+    function's result (keyed by CacheablePageContent), unless the HTMLPage
+    `page` has `page.cache_link_parsing == False`.
+    """
+
+    @functools.lru_cache(maxsize=None)
+    def wrapper(cacheable_page):
+        # type: (CacheablePageContent) -> List[Link]
+        return list(fn(cacheable_page.page))
+
+    @functools.wraps(fn)
+    def wrapper_wrapper(page):
+        # type: (HTMLPage) -> List[Link]
+        if page.cache_link_parsing:
+            return wrapper(CacheablePageContent(page))
+        return list(fn(page))
+
+    return wrapper_wrapper
+
+
+@with_cached_html_pages
+def parse_links(page):
+    # type: (HTMLPage) -> Iterable[Link]
+    """
+    Parse an HTML document, and yield its anchor elements as Link objects.
+    """
+    document = html5lib.parse(
+        page.content,
+        transport_encoding=page.encoding,
+        namespaceHTMLElements=False,
+    )
+
+    url = page.url
+    base_url = _determine_base_url(document, url)
+    for anchor in document.findall(".//a"):
+        link = _create_link_from_element(
+            anchor,
+            page_url=url,
+            base_url=base_url,
+        )
+        if link is None:
+            continue
+        yield link
+
+
+class HTMLPage:
+    """Represents one page, along with its URL"""
+
+    def __init__(
+        self,
+        content,                  # type: bytes
+        encoding,                 # type: Optional[str]
+        url,                      # type: str
+        cache_link_parsing=True,  # type: bool
+    ):
+        # type: (...) -> None
+        """
+        :param encoding: the encoding to decode the given content.
+        :param url: the URL from which the HTML was downloaded.
+        :param cache_link_parsing: whether links parsed from this page's url
+                                   should be cached. PyPI index urls should
+                                   have this set to False, for example.
+        """
+        self.content = content
+        self.encoding = encoding
+        self.url = url
+        self.cache_link_parsing = cache_link_parsing
+
+    def __str__(self):
+        # type: () -> str
+        return redact_auth_from_url(self.url)
+
+
+def _handle_get_page_fail(
+    link,  # type: Link
+    reason,  # type: Union[str, Exception]
+    meth=None  # type: Optional[Callable[..., None]]
+):
+    # type: (...) -> None
+    if meth is None:
+        meth = logger.debug
+    meth("Could not fetch URL %s: %s - skipping", link, reason)
+
+
+def _make_html_page(response, cache_link_parsing=True):
+    # type: (Response, bool) -> HTMLPage
+    encoding = _get_encoding_from_headers(response.headers)
+    return HTMLPage(
+        response.content,
+        encoding=encoding,
+        url=response.url,
+        cache_link_parsing=cache_link_parsing)
+
+
+def _get_html_page(link, session=None):
+    # type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
+    if session is None:
+        raise TypeError(
+            "_get_html_page() missing 1 required keyword argument: 'session'"
+        )
+
+    url = link.url.split('#', 1)[0]
+
+    # Check for VCS schemes that do not support lookup as web pages.
+    vcs_scheme = _match_vcs_scheme(url)
+    if vcs_scheme:
+        logger.warning('Cannot look at %s URL %s because it does not support '
+                       'lookup as web pages.', vcs_scheme, link)
+        return None
+
+    # Tack index.html onto file:// URLs that point to directories
+    scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
+    if (scheme == 'file' and os.path.isdir(urllib.request.url2pathname(path))):
+        # add trailing slash if not present so urljoin doesn't trim
+        # final segment
+        if not url.endswith('/'):
+            url += '/'
+        url = urllib.parse.urljoin(url, 'index.html')
+        logger.debug(' file: URL is directory, getting %s', url)
+
+    try:
+        resp = _get_html_response(url, session=session)
+    except _NotHTTP:
+        logger.warning(
+            'Skipping page %s because it looks like an archive, and cannot '
+            'be checked by a HTTP HEAD request.', link,
+        )
+    except _NotHTML as exc:
+        logger.warning(
+            'Skipping page %s because the %s request got Content-Type: %s.'
+            'The only supported Content-Type is text/html',
+            link, exc.request_desc, exc.content_type,
+        )
+    except NetworkConnectionError as exc:
+        _handle_get_page_fail(link, exc)
+    except RetryError as exc:
+        _handle_get_page_fail(link, exc)
+    except SSLError as exc:
+        reason = "There was a problem confirming the ssl certificate: "
+        reason += str(exc)
+        _handle_get_page_fail(link, reason, meth=logger.info)
+    except requests.ConnectionError as exc:
+        _handle_get_page_fail(link, f"connection error: {exc}")
+    except requests.Timeout:
+        _handle_get_page_fail(link, "timed out")
+    else:
+        return _make_html_page(resp,
+                               cache_link_parsing=link.cache_link_parsing)
+    return None
+
+
+def _remove_duplicate_links(links):
+    # type: (Iterable[Link]) -> List[Link]
+    """
+    Return a list of links, with duplicates removed and ordering preserved.
+    """
+    # We preserve the ordering when removing duplicates because we can.
+    return list(OrderedDict.fromkeys(links))
+
+
+def group_locations(locations, expand_dir=False):
+    # type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
+    """
+    Divide a list of locations into two groups: "files" (archives) and "urls."
+
+    :return: A pair of lists (files, urls).
+    """
+    files = []
+    urls = []
+
+    # puts the url for the given file path into the appropriate list
+    def sort_path(path):
+        # type: (str) -> None
+        url = path_to_url(path)
+        if mimetypes.guess_type(url, strict=False)[0] == 'text/html':
+            urls.append(url)
+        else:
+            files.append(url)
+
+    for url in locations:
+
+        is_local_path = os.path.exists(url)
+        is_file_url = url.startswith('file:')
+
+        if is_local_path or is_file_url:
+            if is_local_path:
+                path = url
+            else:
+                path = url_to_path(url)
+            if os.path.isdir(path):
+                if expand_dir:
+                    path = os.path.realpath(path)
+                    for item in os.listdir(path):
+                        sort_path(os.path.join(path, item))
+                elif is_file_url:
+                    urls.append(url)
+                else:
+                    logger.warning(
+                        "Path '%s' is ignored: it is a directory.", path,
+                    )
+            elif os.path.isfile(path):
+                sort_path(path)
+            else:
+                logger.warning(
+                    "Url '%s' is ignored: it is neither a file "
+                    "nor a directory.", url,
+                )
+        elif is_url(url):
+            # Only add url with clear scheme
+            urls.append(url)
+        else:
+            logger.warning(
+                "Url '%s' is ignored. It is either a non-existing "
+                "path or lacks a specific scheme.", url,
+            )
+
+    return files, urls
+
+
+class CollectedLinks:
+
+    """
+    Encapsulates the return value of a call to LinkCollector.collect_links().
+
+    The return value includes both URLs to project pages containing package
+    links, as well as individual package Link objects collected from other
+    sources.
+
+    This info is stored separately as:
+
+    (1) links from the configured file locations,
+    (2) links from the configured find_links, and
+    (3) urls to HTML project pages, as described by the PEP 503 simple
+        repository API.
+    """
+
+    def __init__(
+        self,
+        files,         # type: List[Link]
+        find_links,    # type: List[Link]
+        project_urls,  # type: List[Link]
+    ):
+        # type: (...) -> None
+        """
+        :param files: Links from file locations.
+        :param find_links: Links from find_links.
+        :param project_urls: URLs to HTML project pages, as described by
+            the PEP 503 simple repository API.
+        """
+        self.files = files
+        self.find_links = find_links
+        self.project_urls = project_urls
+
+
+class LinkCollector:
+
+    """
+    Responsible for collecting Link objects from all configured locations,
+    making network requests as needed.
+
+    The class's main method is its collect_links() method.
+    """
+
+    def __init__(
+        self,
+        session,       # type: PipSession
+        search_scope,  # type: SearchScope
+    ):
+        # type: (...) -> None
+        self.search_scope = search_scope
+        self.session = session
+
+    @classmethod
+    def create(cls, session, options, suppress_no_index=False):
+        # type: (PipSession, Values, bool) -> LinkCollector
+        """
+        :param session: The Session to use to make requests.
+        :param suppress_no_index: Whether to ignore the --no-index option
+            when constructing the SearchScope object.
+        """
+        index_urls = [options.index_url] + options.extra_index_urls
+        if options.no_index and not suppress_no_index:
+            logger.debug(
+                'Ignoring indexes: %s',
+                ','.join(redact_auth_from_url(url) for url in index_urls),
+            )
+            index_urls = []
+
+        # Make sure find_links is a list before passing to create().
+        find_links = options.find_links or []
+
+        search_scope = SearchScope.create(
+            find_links=find_links, index_urls=index_urls,
+        )
+        link_collector = LinkCollector(
+            session=session, search_scope=search_scope,
+        )
+        return link_collector
+
+    @property
+    def find_links(self):
+        # type: () -> List[str]
+        return self.search_scope.find_links
+
+    def fetch_page(self, location):
+        # type: (Link) -> Optional[HTMLPage]
+        """
+        Fetch an HTML page containing package links.
+        """
+        return _get_html_page(location, session=self.session)
+
+    def collect_links(self, project_name):
+        # type: (str) -> CollectedLinks
+        """Find all available links for the given project name.
+
+        :return: All the Link objects (unfiltered), as a CollectedLinks object.
+        """
+        search_scope = self.search_scope
+        index_locations = search_scope.get_index_urls_locations(project_name)
+        index_file_loc, index_url_loc = group_locations(index_locations)
+        fl_file_loc, fl_url_loc = group_locations(
+            self.find_links, expand_dir=True,
+        )
+
+        file_links = [
+            Link(url) for url in itertools.chain(index_file_loc, fl_file_loc)
+        ]
+
+        # We trust every directly linked archive in find_links
+        find_link_links = [Link(url, '-f') for url in self.find_links]
+
+        # We trust every url that the user has given us whether it was given
+        # via --index-url or --find-links.
+        # We want to filter out anything that does not have a secure origin.
+        url_locations = [
+            link for link in itertools.chain(
+                # Mark PyPI indices as "cache_link_parsing == False" -- this
+                # will avoid caching the result of parsing the page for links.
+                (Link(url, cache_link_parsing=False) for url in index_url_loc),
+                (Link(url) for url in fl_url_loc),
+            )
+            if self.session.is_secure_origin(link)
+        ]
+
+        url_locations = _remove_duplicate_links(url_locations)
+        lines = [
+            '{} location(s) to search for versions of {}:'.format(
+                len(url_locations), project_name,
+            ),
+        ]
+        for link in url_locations:
+            lines.append(f'* {link}')
+        logger.debug('\n'.join(lines))
+
+        return CollectedLinks(
+            files=file_links,
+            find_links=find_link_links,
+            project_urls=url_locations,
+        )