diff env/lib/python3.9/site-packages/pip/_internal/network/download.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/pip/_internal/network/download.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,202 @@
+"""Download files with progress indicators.
+"""
+import cgi
+import logging
+import mimetypes
+import os
+
+from pip._vendor.requests.models import CONTENT_CHUNK_SIZE
+
+from pip._internal.cli.progress_bars import DownloadProgressProvider
+from pip._internal.exceptions import NetworkConnectionError
+from pip._internal.models.index import PyPI
+from pip._internal.network.cache import is_from_cache
+from pip._internal.network.utils import HEADERS, raise_for_status, response_chunks
+from pip._internal.utils.misc import format_size, redact_auth_from_url, splitext
+from pip._internal.utils.typing import MYPY_CHECK_RUNNING
+
+if MYPY_CHECK_RUNNING:
+    from typing import Iterable, Optional, Tuple
+
+    from pip._vendor.requests.models import Response
+
+    from pip._internal.models.link import Link
+    from pip._internal.network.session import PipSession
+
+logger = logging.getLogger(__name__)
+
+
+def _get_http_response_size(resp):
+    # type: (Response) -> Optional[int]
+    try:
+        return int(resp.headers['content-length'])
+    except (ValueError, KeyError, TypeError):
+        return None
+
+
+def _prepare_download(
+    resp,  # type: Response
+    link,  # type: Link
+    progress_bar  # type: str
+):
+    # type: (...) -> Iterable[bytes]
+    total_length = _get_http_response_size(resp)
+
+    if link.netloc == PyPI.file_storage_domain:
+        url = link.show_url
+    else:
+        url = link.url_without_fragment
+
+    logged_url = redact_auth_from_url(url)
+
+    if total_length:
+        logged_url = '{} ({})'.format(logged_url, format_size(total_length))
+
+    if is_from_cache(resp):
+        logger.info("Using cached %s", logged_url)
+    else:
+        logger.info("Downloading %s", logged_url)
+
+    if logger.getEffectiveLevel() > logging.INFO:
+        show_progress = False
+    elif is_from_cache(resp):
+        show_progress = False
+    elif not total_length:
+        show_progress = True
+    elif total_length > (40 * 1000):
+        show_progress = True
+    else:
+        show_progress = False
+
+    chunks = response_chunks(resp, CONTENT_CHUNK_SIZE)
+
+    if not show_progress:
+        return chunks
+
+    return DownloadProgressProvider(
+        progress_bar, max=total_length
+    )(chunks)
+
+
+def sanitize_content_filename(filename):
+    # type: (str) -> str
+    """
+    Sanitize the "filename" value from a Content-Disposition header.
+    """
+    return os.path.basename(filename)
+
+
+def parse_content_disposition(content_disposition, default_filename):
+    # type: (str, str) -> str
+    """
+    Parse the "filename" value from a Content-Disposition header, and
+    return the default filename if the result is empty.
+    """
+    _type, params = cgi.parse_header(content_disposition)
+    filename = params.get('filename')
+    if filename:
+        # We need to sanitize the filename to prevent directory traversal
+        # in case the filename contains ".." path parts.
+        filename = sanitize_content_filename(filename)
+    return filename or default_filename
+
+
+def _get_http_response_filename(resp, link):
+    # type: (Response, Link) -> str
+    """Get an ideal filename from the given HTTP response, falling back to
+    the link filename if not provided.
+    """
+    filename = link.filename  # fallback
+    # Have a look at the Content-Disposition header for a better guess
+    content_disposition = resp.headers.get('content-disposition')
+    if content_disposition:
+        filename = parse_content_disposition(content_disposition, filename)
+    ext = splitext(filename)[1]  # type: Optional[str]
+    if not ext:
+        ext = mimetypes.guess_extension(
+            resp.headers.get('content-type', '')
+        )
+        if ext:
+            filename += ext
+    if not ext and link.url != resp.url:
+        ext = os.path.splitext(resp.url)[1]
+        if ext:
+            filename += ext
+    return filename
+
+
+def _http_get_download(session, link):
+    # type: (PipSession, Link) -> Response
+    target_url = link.url.split('#', 1)[0]
+    resp = session.get(target_url, headers=HEADERS, stream=True)
+    raise_for_status(resp)
+    return resp
+
+
+class Downloader:
+    def __init__(
+        self,
+        session,  # type: PipSession
+        progress_bar,  # type: str
+    ):
+        # type: (...) -> None
+        self._session = session
+        self._progress_bar = progress_bar
+
+    def __call__(self, link, location):
+        # type: (Link, str) -> Tuple[str, str]
+        """Download the file given by link into location."""
+        try:
+            resp = _http_get_download(self._session, link)
+        except NetworkConnectionError as e:
+            assert e.response is not None
+            logger.critical(
+                "HTTP error %s while getting %s", e.response.status_code, link
+            )
+            raise
+
+        filename = _get_http_response_filename(resp, link)
+        filepath = os.path.join(location, filename)
+
+        chunks = _prepare_download(resp, link, self._progress_bar)
+        with open(filepath, 'wb') as content_file:
+            for chunk in chunks:
+                content_file.write(chunk)
+        content_type = resp.headers.get('Content-Type', '')
+        return filepath, content_type
+
+
+class BatchDownloader:
+
+    def __init__(
+        self,
+        session,  # type: PipSession
+        progress_bar,  # type: str
+    ):
+        # type: (...) -> None
+        self._session = session
+        self._progress_bar = progress_bar
+
+    def __call__(self, links, location):
+        # type: (Iterable[Link], str) -> Iterable[Tuple[str, Tuple[str, str]]]
+        """Download the files given by links into location."""
+        for link in links:
+            try:
+                resp = _http_get_download(self._session, link)
+            except NetworkConnectionError as e:
+                assert e.response is not None
+                logger.critical(
+                    "HTTP error %s while getting %s",
+                    e.response.status_code, link,
+                )
+                raise
+
+            filename = _get_http_response_filename(resp, link)
+            filepath = os.path.join(location, filename)
+
+            chunks = _prepare_download(resp, link, self._progress_bar)
+            with open(filepath, 'wb') as content_file:
+                for chunk in chunks:
+                    content_file.write(chunk)
+            content_type = resp.headers.get('Content-Type', '')
+            yield link.url, (filepath, content_type)