comparison env/lib/python3.9/site-packages/pip/_internal/vcs/versioncontrol.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Handles all VCS (version control) support"""
2
3 import logging
4 import os
5 import shutil
6 import sys
7 import urllib.parse
8
9 from pip._vendor import pkg_resources
10
11 from pip._internal.exceptions import BadCommand, InstallationError
12 from pip._internal.utils.misc import (
13 ask_path_exists,
14 backup_dir,
15 display_path,
16 hide_url,
17 hide_value,
18 rmtree,
19 )
20 from pip._internal.utils.subprocess import call_subprocess, make_command
21 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
22 from pip._internal.utils.urls import get_url_scheme
23
24 if MYPY_CHECK_RUNNING:
25 from typing import (
26 Any,
27 Dict,
28 Iterable,
29 Iterator,
30 List,
31 Mapping,
32 Optional,
33 Tuple,
34 Type,
35 Union,
36 )
37
38 from pip._internal.cli.spinners import SpinnerInterface
39 from pip._internal.utils.misc import HiddenText
40 from pip._internal.utils.subprocess import CommandArgs
41
42 AuthInfo = Tuple[Optional[str], Optional[str]]
43
44
45 __all__ = ['vcs']
46
47
48 logger = logging.getLogger(__name__)
49
50
51 def is_url(name):
52 # type: (str) -> bool
53 """
54 Return true if the name looks like a URL.
55 """
56 scheme = get_url_scheme(name)
57 if scheme is None:
58 return False
59 return scheme in ['http', 'https', 'file', 'ftp'] + vcs.all_schemes
60
61
62 def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):
63 # type: (str, str, str, Optional[str]) -> str
64 """
65 Return the URL for a VCS requirement.
66
67 Args:
68 repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
69 project_name: the (unescaped) project name.
70 """
71 egg_project_name = pkg_resources.to_filename(project_name)
72 req = f'{repo_url}@{rev}#egg={egg_project_name}'
73 if subdir:
74 req += f'&subdirectory={subdir}'
75
76 return req
77
78
79 def find_path_to_setup_from_repo_root(location, repo_root):
80 # type: (str, str) -> Optional[str]
81 """
82 Find the path to `setup.py` by searching up the filesystem from `location`.
83 Return the path to `setup.py` relative to `repo_root`.
84 Return None if `setup.py` is in `repo_root` or cannot be found.
85 """
86 # find setup.py
87 orig_location = location
88 while not os.path.exists(os.path.join(location, 'setup.py')):
89 last_location = location
90 location = os.path.dirname(location)
91 if location == last_location:
92 # We've traversed up to the root of the filesystem without
93 # finding setup.py
94 logger.warning(
95 "Could not find setup.py for directory %s (tried all "
96 "parent directories)",
97 orig_location,
98 )
99 return None
100
101 if os.path.samefile(repo_root, location):
102 return None
103
104 return os.path.relpath(location, repo_root)
105
106
107 class RemoteNotFoundError(Exception):
108 pass
109
110
111 class RevOptions:
112
113 """
114 Encapsulates a VCS-specific revision to install, along with any VCS
115 install options.
116
117 Instances of this class should be treated as if immutable.
118 """
119
120 def __init__(
121 self,
122 vc_class, # type: Type[VersionControl]
123 rev=None, # type: Optional[str]
124 extra_args=None, # type: Optional[CommandArgs]
125 ):
126 # type: (...) -> None
127 """
128 Args:
129 vc_class: a VersionControl subclass.
130 rev: the name of the revision to install.
131 extra_args: a list of extra options.
132 """
133 if extra_args is None:
134 extra_args = []
135
136 self.extra_args = extra_args
137 self.rev = rev
138 self.vc_class = vc_class
139 self.branch_name = None # type: Optional[str]
140
141 def __repr__(self):
142 # type: () -> str
143 return f'<RevOptions {self.vc_class.name}: rev={self.rev!r}>'
144
145 @property
146 def arg_rev(self):
147 # type: () -> Optional[str]
148 if self.rev is None:
149 return self.vc_class.default_arg_rev
150
151 return self.rev
152
153 def to_args(self):
154 # type: () -> CommandArgs
155 """
156 Return the VCS-specific command arguments.
157 """
158 args = [] # type: CommandArgs
159 rev = self.arg_rev
160 if rev is not None:
161 args += self.vc_class.get_base_rev_args(rev)
162 args += self.extra_args
163
164 return args
165
166 def to_display(self):
167 # type: () -> str
168 if not self.rev:
169 return ''
170
171 return f' (to revision {self.rev})'
172
173 def make_new(self, rev):
174 # type: (str) -> RevOptions
175 """
176 Make a copy of the current instance, but with a new rev.
177
178 Args:
179 rev: the name of the revision for the new object.
180 """
181 return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
182
183
184 class VcsSupport:
185 _registry = {} # type: Dict[str, VersionControl]
186 schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']
187
188 def __init__(self):
189 # type: () -> None
190 # Register more schemes with urlparse for various version control
191 # systems
192 urllib.parse.uses_netloc.extend(self.schemes)
193 super().__init__()
194
195 def __iter__(self):
196 # type: () -> Iterator[str]
197 return self._registry.__iter__()
198
199 @property
200 def backends(self):
201 # type: () -> List[VersionControl]
202 return list(self._registry.values())
203
204 @property
205 def dirnames(self):
206 # type: () -> List[str]
207 return [backend.dirname for backend in self.backends]
208
209 @property
210 def all_schemes(self):
211 # type: () -> List[str]
212 schemes = [] # type: List[str]
213 for backend in self.backends:
214 schemes.extend(backend.schemes)
215 return schemes
216
217 def register(self, cls):
218 # type: (Type[VersionControl]) -> None
219 if not hasattr(cls, 'name'):
220 logger.warning('Cannot register VCS %s', cls.__name__)
221 return
222 if cls.name not in self._registry:
223 self._registry[cls.name] = cls()
224 logger.debug('Registered VCS backend: %s', cls.name)
225
226 def unregister(self, name):
227 # type: (str) -> None
228 if name in self._registry:
229 del self._registry[name]
230
231 def get_backend_for_dir(self, location):
232 # type: (str) -> Optional[VersionControl]
233 """
234 Return a VersionControl object if a repository of that type is found
235 at the given directory.
236 """
237 vcs_backends = {}
238 for vcs_backend in self._registry.values():
239 repo_path = vcs_backend.get_repository_root(location)
240 if not repo_path:
241 continue
242 logger.debug('Determine that %s uses VCS: %s',
243 location, vcs_backend.name)
244 vcs_backends[repo_path] = vcs_backend
245
246 if not vcs_backends:
247 return None
248
249 # Choose the VCS in the inner-most directory. Since all repository
250 # roots found here would be either `location` or one of its
251 # parents, the longest path should have the most path components,
252 # i.e. the backend representing the inner-most repository.
253 inner_most_repo_path = max(vcs_backends, key=len)
254 return vcs_backends[inner_most_repo_path]
255
256 def get_backend_for_scheme(self, scheme):
257 # type: (str) -> Optional[VersionControl]
258 """
259 Return a VersionControl object or None.
260 """
261 for vcs_backend in self._registry.values():
262 if scheme in vcs_backend.schemes:
263 return vcs_backend
264 return None
265
266 def get_backend(self, name):
267 # type: (str) -> Optional[VersionControl]
268 """
269 Return a VersionControl object or None.
270 """
271 name = name.lower()
272 return self._registry.get(name)
273
274
275 vcs = VcsSupport()
276
277
278 class VersionControl:
279 name = ''
280 dirname = ''
281 repo_name = ''
282 # List of supported schemes for this Version Control
283 schemes = () # type: Tuple[str, ...]
284 # Iterable of environment variable names to pass to call_subprocess().
285 unset_environ = () # type: Tuple[str, ...]
286 default_arg_rev = None # type: Optional[str]
287
288 @classmethod
289 def should_add_vcs_url_prefix(cls, remote_url):
290 # type: (str) -> bool
291 """
292 Return whether the vcs prefix (e.g. "git+") should be added to a
293 repository's remote url when used in a requirement.
294 """
295 return not remote_url.lower().startswith(f'{cls.name}:')
296
297 @classmethod
298 def get_subdirectory(cls, location):
299 # type: (str) -> Optional[str]
300 """
301 Return the path to setup.py, relative to the repo root.
302 Return None if setup.py is in the repo root.
303 """
304 return None
305
306 @classmethod
307 def get_requirement_revision(cls, repo_dir):
308 # type: (str) -> str
309 """
310 Return the revision string that should be used in a requirement.
311 """
312 return cls.get_revision(repo_dir)
313
314 @classmethod
315 def get_src_requirement(cls, repo_dir, project_name):
316 # type: (str, str) -> str
317 """
318 Return the requirement string to use to redownload the files
319 currently at the given repository directory.
320
321 Args:
322 project_name: the (unescaped) project name.
323
324 The return value has a form similar to the following:
325
326 {repository_url}@{revision}#egg={project_name}
327 """
328 repo_url = cls.get_remote_url(repo_dir)
329
330 if cls.should_add_vcs_url_prefix(repo_url):
331 repo_url = f'{cls.name}+{repo_url}'
332
333 revision = cls.get_requirement_revision(repo_dir)
334 subdir = cls.get_subdirectory(repo_dir)
335 req = make_vcs_requirement_url(repo_url, revision, project_name,
336 subdir=subdir)
337
338 return req
339
340 @staticmethod
341 def get_base_rev_args(rev):
342 # type: (str) -> List[str]
343 """
344 Return the base revision arguments for a vcs command.
345
346 Args:
347 rev: the name of a revision to install. Cannot be None.
348 """
349 raise NotImplementedError
350
351 def is_immutable_rev_checkout(self, url, dest):
352 # type: (str, str) -> bool
353 """
354 Return true if the commit hash checked out at dest matches
355 the revision in url.
356
357 Always return False, if the VCS does not support immutable commit
358 hashes.
359
360 This method does not check if there are local uncommitted changes
361 in dest after checkout, as pip currently has no use case for that.
362 """
363 return False
364
365 @classmethod
366 def make_rev_options(cls, rev=None, extra_args=None):
367 # type: (Optional[str], Optional[CommandArgs]) -> RevOptions
368 """
369 Return a RevOptions object.
370
371 Args:
372 rev: the name of a revision to install.
373 extra_args: a list of extra options.
374 """
375 return RevOptions(cls, rev, extra_args=extra_args)
376
377 @classmethod
378 def _is_local_repository(cls, repo):
379 # type: (str) -> bool
380 """
381 posix absolute paths start with os.path.sep,
382 win32 ones start with drive (like c:\\folder)
383 """
384 drive, tail = os.path.splitdrive(repo)
385 return repo.startswith(os.path.sep) or bool(drive)
386
387 def export(self, location, url):
388 # type: (str, HiddenText) -> None
389 """
390 Export the repository at the url to the destination location
391 i.e. only download the files, without vcs informations
392
393 :param url: the repository URL starting with a vcs prefix.
394 """
395 raise NotImplementedError
396
397 @classmethod
398 def get_netloc_and_auth(cls, netloc, scheme):
399 # type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]]
400 """
401 Parse the repository URL's netloc, and return the new netloc to use
402 along with auth information.
403
404 Args:
405 netloc: the original repository URL netloc.
406 scheme: the repository URL's scheme without the vcs prefix.
407
408 This is mainly for the Subversion class to override, so that auth
409 information can be provided via the --username and --password options
410 instead of through the URL. For other subclasses like Git without
411 such an option, auth information must stay in the URL.
412
413 Returns: (netloc, (username, password)).
414 """
415 return netloc, (None, None)
416
417 @classmethod
418 def get_url_rev_and_auth(cls, url):
419 # type: (str) -> Tuple[str, Optional[str], AuthInfo]
420 """
421 Parse the repository URL to use, and return the URL, revision,
422 and auth info to use.
423
424 Returns: (url, rev, (username, password)).
425 """
426 scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
427 if '+' not in scheme:
428 raise ValueError(
429 "Sorry, {!r} is a malformed VCS url. "
430 "The format is <vcs>+<protocol>://<url>, "
431 "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
432 )
433 # Remove the vcs prefix.
434 scheme = scheme.split('+', 1)[1]
435 netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
436 rev = None
437 if '@' in path:
438 path, rev = path.rsplit('@', 1)
439 if not rev:
440 raise InstallationError(
441 "The URL {!r} has an empty revision (after @) "
442 "which is not supported. Include a revision after @ "
443 "or remove @ from the URL.".format(url)
444 )
445 url = urllib.parse.urlunsplit((scheme, netloc, path, query, ''))
446 return url, rev, user_pass
447
448 @staticmethod
449 def make_rev_args(username, password):
450 # type: (Optional[str], Optional[HiddenText]) -> CommandArgs
451 """
452 Return the RevOptions "extra arguments" to use in obtain().
453 """
454 return []
455
456 def get_url_rev_options(self, url):
457 # type: (HiddenText) -> Tuple[HiddenText, RevOptions]
458 """
459 Return the URL and RevOptions object to use in obtain() and in
460 some cases export(), as a tuple (url, rev_options).
461 """
462 secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
463 username, secret_password = user_pass
464 password = None # type: Optional[HiddenText]
465 if secret_password is not None:
466 password = hide_value(secret_password)
467 extra_args = self.make_rev_args(username, password)
468 rev_options = self.make_rev_options(rev, extra_args=extra_args)
469
470 return hide_url(secret_url), rev_options
471
472 @staticmethod
473 def normalize_url(url):
474 # type: (str) -> str
475 """
476 Normalize a URL for comparison by unquoting it and removing any
477 trailing slash.
478 """
479 return urllib.parse.unquote(url).rstrip('/')
480
481 @classmethod
482 def compare_urls(cls, url1, url2):
483 # type: (str, str) -> bool
484 """
485 Compare two repo URLs for identity, ignoring incidental differences.
486 """
487 return (cls.normalize_url(url1) == cls.normalize_url(url2))
488
489 def fetch_new(self, dest, url, rev_options):
490 # type: (str, HiddenText, RevOptions) -> None
491 """
492 Fetch a revision from a repository, in the case that this is the
493 first fetch from the repository.
494
495 Args:
496 dest: the directory to fetch the repository to.
497 rev_options: a RevOptions object.
498 """
499 raise NotImplementedError
500
501 def switch(self, dest, url, rev_options):
502 # type: (str, HiddenText, RevOptions) -> None
503 """
504 Switch the repo at ``dest`` to point to ``URL``.
505
506 Args:
507 rev_options: a RevOptions object.
508 """
509 raise NotImplementedError
510
511 def update(self, dest, url, rev_options):
512 # type: (str, HiddenText, RevOptions) -> None
513 """
514 Update an already-existing repo to the given ``rev_options``.
515
516 Args:
517 rev_options: a RevOptions object.
518 """
519 raise NotImplementedError
520
521 @classmethod
522 def is_commit_id_equal(cls, dest, name):
523 # type: (str, Optional[str]) -> bool
524 """
525 Return whether the id of the current commit equals the given name.
526
527 Args:
528 dest: the repository directory.
529 name: a string name.
530 """
531 raise NotImplementedError
532
533 def obtain(self, dest, url):
534 # type: (str, HiddenText) -> None
535 """
536 Install or update in editable mode the package represented by this
537 VersionControl object.
538
539 :param dest: the repository directory in which to install or update.
540 :param url: the repository URL starting with a vcs prefix.
541 """
542 url, rev_options = self.get_url_rev_options(url)
543
544 if not os.path.exists(dest):
545 self.fetch_new(dest, url, rev_options)
546 return
547
548 rev_display = rev_options.to_display()
549 if self.is_repository_directory(dest):
550 existing_url = self.get_remote_url(dest)
551 if self.compare_urls(existing_url, url.secret):
552 logger.debug(
553 '%s in %s exists, and has correct URL (%s)',
554 self.repo_name.title(),
555 display_path(dest),
556 url,
557 )
558 if not self.is_commit_id_equal(dest, rev_options.rev):
559 logger.info(
560 'Updating %s %s%s',
561 display_path(dest),
562 self.repo_name,
563 rev_display,
564 )
565 self.update(dest, url, rev_options)
566 else:
567 logger.info('Skipping because already up-to-date.')
568 return
569
570 logger.warning(
571 '%s %s in %s exists with URL %s',
572 self.name,
573 self.repo_name,
574 display_path(dest),
575 existing_url,
576 )
577 prompt = ('(s)witch, (i)gnore, (w)ipe, (b)ackup ',
578 ('s', 'i', 'w', 'b'))
579 else:
580 logger.warning(
581 'Directory %s already exists, and is not a %s %s.',
582 dest,
583 self.name,
584 self.repo_name,
585 )
586 # https://github.com/python/mypy/issues/1174
587 prompt = ('(i)gnore, (w)ipe, (b)ackup ', # type: ignore
588 ('i', 'w', 'b'))
589
590 logger.warning(
591 'The plan is to install the %s repository %s',
592 self.name,
593 url,
594 )
595 response = ask_path_exists('What to do? {}'.format(
596 prompt[0]), prompt[1])
597
598 if response == 'a':
599 sys.exit(-1)
600
601 if response == 'w':
602 logger.warning('Deleting %s', display_path(dest))
603 rmtree(dest)
604 self.fetch_new(dest, url, rev_options)
605 return
606
607 if response == 'b':
608 dest_dir = backup_dir(dest)
609 logger.warning(
610 'Backing up %s to %s', display_path(dest), dest_dir,
611 )
612 shutil.move(dest, dest_dir)
613 self.fetch_new(dest, url, rev_options)
614 return
615
616 # Do nothing if the response is "i".
617 if response == 's':
618 logger.info(
619 'Switching %s %s to %s%s',
620 self.repo_name,
621 display_path(dest),
622 url,
623 rev_display,
624 )
625 self.switch(dest, url, rev_options)
626
627 def unpack(self, location, url):
628 # type: (str, HiddenText) -> None
629 """
630 Clean up current location and download the url repository
631 (and vcs infos) into location
632
633 :param url: the repository URL starting with a vcs prefix.
634 """
635 if os.path.exists(location):
636 rmtree(location)
637 self.obtain(location, url=url)
638
639 @classmethod
640 def get_remote_url(cls, location):
641 # type: (str) -> str
642 """
643 Return the url used at location
644
645 Raises RemoteNotFoundError if the repository does not have a remote
646 url configured.
647 """
648 raise NotImplementedError
649
650 @classmethod
651 def get_revision(cls, location):
652 # type: (str) -> str
653 """
654 Return the current commit id of the files at the given location.
655 """
656 raise NotImplementedError
657
658 @classmethod
659 def run_command(
660 cls,
661 cmd, # type: Union[List[str], CommandArgs]
662 show_stdout=True, # type: bool
663 cwd=None, # type: Optional[str]
664 on_returncode='raise', # type: str
665 extra_ok_returncodes=None, # type: Optional[Iterable[int]]
666 command_desc=None, # type: Optional[str]
667 extra_environ=None, # type: Optional[Mapping[str, Any]]
668 spinner=None, # type: Optional[SpinnerInterface]
669 log_failed_cmd=True, # type: bool
670 stdout_only=False, # type: bool
671 ):
672 # type: (...) -> str
673 """
674 Run a VCS subcommand
675 This is simply a wrapper around call_subprocess that adds the VCS
676 command name, and checks that the VCS is available
677 """
678 cmd = make_command(cls.name, *cmd)
679 try:
680 return call_subprocess(cmd, show_stdout, cwd,
681 on_returncode=on_returncode,
682 extra_ok_returncodes=extra_ok_returncodes,
683 command_desc=command_desc,
684 extra_environ=extra_environ,
685 unset_environ=cls.unset_environ,
686 spinner=spinner,
687 log_failed_cmd=log_failed_cmd,
688 stdout_only=stdout_only)
689 except FileNotFoundError:
690 # errno.ENOENT = no such file or directory
691 # In other words, the VCS executable isn't available
692 raise BadCommand(
693 'Cannot find command {cls.name!r} - do you have '
694 '{cls.name!r} installed and in your '
695 'PATH?'.format(**locals()))
696
697 @classmethod
698 def is_repository_directory(cls, path):
699 # type: (str) -> bool
700 """
701 Return whether a directory path is a repository directory.
702 """
703 logger.debug('Checking in %s for %s (%s)...',
704 path, cls.dirname, cls.name)
705 return os.path.exists(os.path.join(path, cls.dirname))
706
707 @classmethod
708 def get_repository_root(cls, location):
709 # type: (str) -> Optional[str]
710 """
711 Return the "root" (top-level) directory controlled by the vcs,
712 or `None` if the directory is not in any.
713
714 It is meant to be overridden to implement smarter detection
715 mechanisms for specific vcs.
716
717 This can do more than is_repository_directory() alone. For
718 example, the Git override checks that Git is actually available.
719 """
720 if cls.is_repository_directory(location):
721 return location
722 return None