Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/pip/_internal/req/req_file.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/pip/_internal/req/req_file.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,560 @@ +""" +Requirements file parsing +""" + +import optparse +import os +import re +import shlex +import urllib.parse + +from pip._internal.cli import cmdoptions +from pip._internal.exceptions import InstallationError, RequirementsFileParseError +from pip._internal.models.search_scope import SearchScope +from pip._internal.network.utils import raise_for_status +from pip._internal.utils.encoding import auto_decode +from pip._internal.utils.typing import MYPY_CHECK_RUNNING +from pip._internal.utils.urls import get_url_scheme, url_to_path + +if MYPY_CHECK_RUNNING: + from optparse import Values + from typing import ( + Any, + Callable, + Dict, + Iterator, + List, + NoReturn, + Optional, + Text, + Tuple, + ) + + from pip._internal.index.package_finder import PackageFinder + from pip._internal.network.session import PipSession + + ReqFileLines = Iterator[Tuple[int, Text]] + + LineParser = Callable[[Text], Tuple[str, Values]] + + +__all__ = ['parse_requirements'] + +SCHEME_RE = re.compile(r'^(http|https|file):', re.I) +COMMENT_RE = re.compile(r'(^|\s+)#.*$') + +# Matches environment variable-style values in '${MY_VARIABLE_1}' with the +# variable name consisting of only uppercase letters, digits or the '_' +# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1, +# 2013 Edition. +ENV_VAR_RE = re.compile(r'(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})') + +SUPPORTED_OPTIONS = [ + cmdoptions.index_url, + cmdoptions.extra_index_url, + cmdoptions.no_index, + cmdoptions.constraints, + cmdoptions.requirements, + cmdoptions.editable, + cmdoptions.find_links, + cmdoptions.no_binary, + cmdoptions.only_binary, + cmdoptions.prefer_binary, + cmdoptions.require_hashes, + cmdoptions.pre, + cmdoptions.trusted_host, + cmdoptions.use_new_feature, +] # type: List[Callable[..., optparse.Option]] + +# options to be passed to requirements +SUPPORTED_OPTIONS_REQ = [ + cmdoptions.install_options, + cmdoptions.global_options, + cmdoptions.hash, +] # type: List[Callable[..., optparse.Option]] + +# the 'dest' string values +SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ] + + +class ParsedRequirement: + def __init__( + self, + requirement, # type:str + is_editable, # type: bool + comes_from, # type: str + constraint, # type: bool + options=None, # type: Optional[Dict[str, Any]] + line_source=None, # type: Optional[str] + ): + # type: (...) -> None + self.requirement = requirement + self.is_editable = is_editable + self.comes_from = comes_from + self.options = options + self.constraint = constraint + self.line_source = line_source + + +class ParsedLine: + def __init__( + self, + filename, # type: str + lineno, # type: int + args, # type: str + opts, # type: Values + constraint, # type: bool + ): + # type: (...) -> None + self.filename = filename + self.lineno = lineno + self.opts = opts + self.constraint = constraint + + if args: + self.is_requirement = True + self.is_editable = False + self.requirement = args + elif opts.editables: + self.is_requirement = True + self.is_editable = True + # We don't support multiple -e on one line + self.requirement = opts.editables[0] + else: + self.is_requirement = False + + +def parse_requirements( + filename, # type: str + session, # type: PipSession + finder=None, # type: Optional[PackageFinder] + options=None, # type: Optional[optparse.Values] + constraint=False, # type: bool +): + # type: (...) -> Iterator[ParsedRequirement] + """Parse a requirements file and yield ParsedRequirement instances. + + :param filename: Path or url of requirements file. + :param session: PipSession instance. + :param finder: Instance of pip.index.PackageFinder. + :param options: cli options. + :param constraint: If true, parsing a constraint file rather than + requirements file. + """ + line_parser = get_line_parser(finder) + parser = RequirementsFileParser(session, line_parser) + + for parsed_line in parser.parse(filename, constraint): + parsed_req = handle_line( + parsed_line, + options=options, + finder=finder, + session=session + ) + if parsed_req is not None: + yield parsed_req + + +def preprocess(content): + # type: (str) -> ReqFileLines + """Split, filter, and join lines, and return a line iterator + + :param content: the content of the requirements file + """ + lines_enum = enumerate(content.splitlines(), start=1) # type: ReqFileLines + lines_enum = join_lines(lines_enum) + lines_enum = ignore_comments(lines_enum) + lines_enum = expand_env_variables(lines_enum) + return lines_enum + + +def handle_requirement_line( + line, # type: ParsedLine + options=None, # type: Optional[optparse.Values] +): + # type: (...) -> ParsedRequirement + + # preserve for the nested code path + line_comes_from = '{} {} (line {})'.format( + '-c' if line.constraint else '-r', line.filename, line.lineno, + ) + + assert line.is_requirement + + if line.is_editable: + # For editable requirements, we don't support per-requirement + # options, so just return the parsed requirement. + return ParsedRequirement( + requirement=line.requirement, + is_editable=line.is_editable, + comes_from=line_comes_from, + constraint=line.constraint, + ) + else: + if options: + # Disable wheels if the user has specified build options + cmdoptions.check_install_build_global(options, line.opts) + + # get the options that apply to requirements + req_options = {} + for dest in SUPPORTED_OPTIONS_REQ_DEST: + if dest in line.opts.__dict__ and line.opts.__dict__[dest]: + req_options[dest] = line.opts.__dict__[dest] + + line_source = f'line {line.lineno} of {line.filename}' + return ParsedRequirement( + requirement=line.requirement, + is_editable=line.is_editable, + comes_from=line_comes_from, + constraint=line.constraint, + options=req_options, + line_source=line_source, + ) + + +def handle_option_line( + opts, # type: Values + filename, # type: str + lineno, # type: int + finder=None, # type: Optional[PackageFinder] + options=None, # type: Optional[optparse.Values] + session=None, # type: Optional[PipSession] +): + # type: (...) -> None + + if options: + # percolate options upward + if opts.require_hashes: + options.require_hashes = opts.require_hashes + if opts.features_enabled: + options.features_enabled.extend( + f for f in opts.features_enabled + if f not in options.features_enabled + ) + + # set finder options + if finder: + find_links = finder.find_links + index_urls = finder.index_urls + if opts.index_url: + index_urls = [opts.index_url] + if opts.no_index is True: + index_urls = [] + if opts.extra_index_urls: + index_urls.extend(opts.extra_index_urls) + if opts.find_links: + # FIXME: it would be nice to keep track of the source + # of the find_links: support a find-links local path + # relative to a requirements file. + value = opts.find_links[0] + req_dir = os.path.dirname(os.path.abspath(filename)) + relative_to_reqs_file = os.path.join(req_dir, value) + if os.path.exists(relative_to_reqs_file): + value = relative_to_reqs_file + find_links.append(value) + + if session: + # We need to update the auth urls in session + session.update_index_urls(index_urls) + + search_scope = SearchScope( + find_links=find_links, + index_urls=index_urls, + ) + finder.search_scope = search_scope + + if opts.pre: + finder.set_allow_all_prereleases() + + if opts.prefer_binary: + finder.set_prefer_binary() + + if session: + for host in opts.trusted_hosts or []: + source = f'line {lineno} of {filename}' + session.add_trusted_host(host, source=source) + + +def handle_line( + line, # type: ParsedLine + options=None, # type: Optional[optparse.Values] + finder=None, # type: Optional[PackageFinder] + session=None, # type: Optional[PipSession] +): + # type: (...) -> Optional[ParsedRequirement] + """Handle a single parsed requirements line; This can result in + creating/yielding requirements, or updating the finder. + + :param line: The parsed line to be processed. + :param options: CLI options. + :param finder: The finder - updated by non-requirement lines. + :param session: The session - updated by non-requirement lines. + + Returns a ParsedRequirement object if the line is a requirement line, + otherwise returns None. + + For lines that contain requirements, the only options that have an effect + are from SUPPORTED_OPTIONS_REQ, and they are scoped to the + requirement. Other options from SUPPORTED_OPTIONS may be present, but are + ignored. + + For lines that do not contain requirements, the only options that have an + effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may + be present, but are ignored. These lines may contain multiple options + (although our docs imply only one is supported), and all our parsed and + affect the finder. + """ + + if line.is_requirement: + parsed_req = handle_requirement_line(line, options) + return parsed_req + else: + handle_option_line( + line.opts, + line.filename, + line.lineno, + finder, + options, + session, + ) + return None + + +class RequirementsFileParser: + def __init__( + self, + session, # type: PipSession + line_parser, # type: LineParser + ): + # type: (...) -> None + self._session = session + self._line_parser = line_parser + + def parse(self, filename, constraint): + # type: (str, bool) -> Iterator[ParsedLine] + """Parse a given file, yielding parsed lines. + """ + yield from self._parse_and_recurse(filename, constraint) + + def _parse_and_recurse(self, filename, constraint): + # type: (str, bool) -> Iterator[ParsedLine] + for line in self._parse_file(filename, constraint): + if ( + not line.is_requirement and + (line.opts.requirements or line.opts.constraints) + ): + # parse a nested requirements file + if line.opts.requirements: + req_path = line.opts.requirements[0] + nested_constraint = False + else: + req_path = line.opts.constraints[0] + nested_constraint = True + + # original file is over http + if SCHEME_RE.search(filename): + # do a url join so relative paths work + req_path = urllib.parse.urljoin(filename, req_path) + # original file and nested file are paths + elif not SCHEME_RE.search(req_path): + # do a join so relative paths work + req_path = os.path.join( + os.path.dirname(filename), req_path, + ) + + yield from self._parse_and_recurse(req_path, nested_constraint) + else: + yield line + + def _parse_file(self, filename, constraint): + # type: (str, bool) -> Iterator[ParsedLine] + _, content = get_file_content(filename, self._session) + + lines_enum = preprocess(content) + + for line_number, line in lines_enum: + try: + args_str, opts = self._line_parser(line) + except OptionParsingError as e: + # add offending line + msg = f'Invalid requirement: {line}\n{e.msg}' + raise RequirementsFileParseError(msg) + + yield ParsedLine( + filename, + line_number, + args_str, + opts, + constraint, + ) + + +def get_line_parser(finder): + # type: (Optional[PackageFinder]) -> LineParser + def parse_line(line): + # type: (str) -> Tuple[str, Values] + # Build new parser for each line since it accumulates appendable + # options. + parser = build_parser() + defaults = parser.get_default_values() + defaults.index_url = None + if finder: + defaults.format_control = finder.format_control + + args_str, options_str = break_args_options(line) + + opts, _ = parser.parse_args(shlex.split(options_str), defaults) + + return args_str, opts + + return parse_line + + +def break_args_options(line): + # type: (str) -> Tuple[str, str] + """Break up the line into an args and options string. We only want to shlex + (and then optparse) the options, not the args. args can contain markers + which are corrupted by shlex. + """ + tokens = line.split(' ') + args = [] + options = tokens[:] + for token in tokens: + if token.startswith('-') or token.startswith('--'): + break + else: + args.append(token) + options.pop(0) + return ' '.join(args), ' '.join(options) + + +class OptionParsingError(Exception): + def __init__(self, msg): + # type: (str) -> None + self.msg = msg + + +def build_parser(): + # type: () -> optparse.OptionParser + """ + Return a parser for parsing requirement lines + """ + parser = optparse.OptionParser(add_help_option=False) + + option_factories = SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ + for option_factory in option_factories: + option = option_factory() + parser.add_option(option) + + # By default optparse sys.exits on parsing errors. We want to wrap + # that in our own exception. + def parser_exit(self, msg): + # type: (Any, str) -> NoReturn + raise OptionParsingError(msg) + # NOTE: mypy disallows assigning to a method + # https://github.com/python/mypy/issues/2427 + parser.exit = parser_exit # type: ignore + + return parser + + +def join_lines(lines_enum): + # type: (ReqFileLines) -> ReqFileLines + """Joins a line ending in '\' with the previous line (except when following + comments). The joined line takes on the index of the first line. + """ + primary_line_number = None + new_line = [] # type: List[str] + for line_number, line in lines_enum: + if not line.endswith('\\') or COMMENT_RE.match(line): + if COMMENT_RE.match(line): + # this ensures comments are always matched later + line = ' ' + line + if new_line: + new_line.append(line) + assert primary_line_number is not None + yield primary_line_number, ''.join(new_line) + new_line = [] + else: + yield line_number, line + else: + if not new_line: + primary_line_number = line_number + new_line.append(line.strip('\\')) + + # last line contains \ + if new_line: + assert primary_line_number is not None + yield primary_line_number, ''.join(new_line) + + # TODO: handle space after '\'. + + +def ignore_comments(lines_enum): + # type: (ReqFileLines) -> ReqFileLines + """ + Strips comments and filter empty lines. + """ + for line_number, line in lines_enum: + line = COMMENT_RE.sub('', line) + line = line.strip() + if line: + yield line_number, line + + +def expand_env_variables(lines_enum): + # type: (ReqFileLines) -> ReqFileLines + """Replace all environment variables that can be retrieved via `os.getenv`. + + The only allowed format for environment variables defined in the + requirement file is `${MY_VARIABLE_1}` to ensure two things: + + 1. Strings that contain a `$` aren't accidentally (partially) expanded. + 2. Ensure consistency across platforms for requirement files. + + These points are the result of a discussion on the `github pull + request #3514 <https://github.com/pypa/pip/pull/3514>`_. + + Valid characters in variable names follow the `POSIX standard + <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited + to uppercase letter, digits and the `_` (underscore). + """ + for line_number, line in lines_enum: + for env_var, var_name in ENV_VAR_RE.findall(line): + value = os.getenv(var_name) + if not value: + continue + + line = line.replace(env_var, value) + + yield line_number, line + + +def get_file_content(url, session): + # type: (str, PipSession) -> Tuple[str, str] + """Gets the content of a file; it may be a filename, file: URL, or + http: URL. Returns (location, content). Content is unicode. + Respects # -*- coding: declarations on the retrieved files. + + :param url: File path or url. + :param session: PipSession instance. + """ + scheme = get_url_scheme(url) + + if scheme in ['http', 'https']: + # FIXME: catch some errors + resp = session.get(url) + raise_for_status(resp) + return resp.url, resp.text + + elif scheme == 'file': + url = url_to_path(url) + + try: + with open(url, 'rb') as f: + content = auto_decode(f.read()) + except OSError as exc: + raise InstallationError( + f'Could not open requirements file: {exc}' + ) + return url, content