comparison env/lib/python3.9/site-packages/pip/_internal/vcs/git.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # The following comment should be removed at some point in the future.
2 # mypy: disallow-untyped-defs=False
3
4 import logging
5 import os.path
6 import re
7 import urllib.parse
8 import urllib.request
9
10 from pip._vendor.packaging.version import parse as parse_version
11
12 from pip._internal.exceptions import BadCommand, InstallationError
13 from pip._internal.utils.misc import display_path, hide_url
14 from pip._internal.utils.subprocess import make_command
15 from pip._internal.utils.temp_dir import TempDirectory
16 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
17 from pip._internal.vcs.versioncontrol import (
18 RemoteNotFoundError,
19 VersionControl,
20 find_path_to_setup_from_repo_root,
21 vcs,
22 )
23
24 if MYPY_CHECK_RUNNING:
25 from typing import Optional, Tuple
26
27 from pip._internal.utils.misc import HiddenText
28 from pip._internal.vcs.versioncontrol import AuthInfo, RevOptions
29
30
31 urlsplit = urllib.parse.urlsplit
32 urlunsplit = urllib.parse.urlunsplit
33
34
35 logger = logging.getLogger(__name__)
36
37
38 HASH_REGEX = re.compile('^[a-fA-F0-9]{40}$')
39
40
41 def looks_like_hash(sha):
42 return bool(HASH_REGEX.match(sha))
43
44
45 class Git(VersionControl):
46 name = 'git'
47 dirname = '.git'
48 repo_name = 'clone'
49 schemes = (
50 'git+http', 'git+https', 'git+ssh', 'git+git', 'git+file',
51 )
52 # Prevent the user's environment variables from interfering with pip:
53 # https://github.com/pypa/pip/issues/1130
54 unset_environ = ('GIT_DIR', 'GIT_WORK_TREE')
55 default_arg_rev = 'HEAD'
56
57 @staticmethod
58 def get_base_rev_args(rev):
59 return [rev]
60
61 def is_immutable_rev_checkout(self, url, dest):
62 # type: (str, str) -> bool
63 _, rev_options = self.get_url_rev_options(hide_url(url))
64 if not rev_options.rev:
65 return False
66 if not self.is_commit_id_equal(dest, rev_options.rev):
67 # the current commit is different from rev,
68 # which means rev was something else than a commit hash
69 return False
70 # return False in the rare case rev is both a commit hash
71 # and a tag or a branch; we don't want to cache in that case
72 # because that branch/tag could point to something else in the future
73 is_tag_or_branch = bool(
74 self.get_revision_sha(dest, rev_options.rev)[0]
75 )
76 return not is_tag_or_branch
77
78 def get_git_version(self):
79 VERSION_PFX = 'git version '
80 version = self.run_command(
81 ['version'], show_stdout=False, stdout_only=True
82 )
83 if version.startswith(VERSION_PFX):
84 version = version[len(VERSION_PFX):].split()[0]
85 else:
86 version = ''
87 # get first 3 positions of the git version because
88 # on windows it is x.y.z.windows.t, and this parses as
89 # LegacyVersion which always smaller than a Version.
90 version = '.'.join(version.split('.')[:3])
91 return parse_version(version)
92
93 @classmethod
94 def get_current_branch(cls, location):
95 """
96 Return the current branch, or None if HEAD isn't at a branch
97 (e.g. detached HEAD).
98 """
99 # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
100 # HEAD rather than a symbolic ref. In addition, the -q causes the
101 # command to exit with status code 1 instead of 128 in this case
102 # and to suppress the message to stderr.
103 args = ['symbolic-ref', '-q', 'HEAD']
104 output = cls.run_command(
105 args,
106 extra_ok_returncodes=(1, ),
107 show_stdout=False,
108 stdout_only=True,
109 cwd=location,
110 )
111 ref = output.strip()
112
113 if ref.startswith('refs/heads/'):
114 return ref[len('refs/heads/'):]
115
116 return None
117
118 def export(self, location, url):
119 # type: (str, HiddenText) -> None
120 """Export the Git repository at the url to the destination location"""
121 if not location.endswith('/'):
122 location = location + '/'
123
124 with TempDirectory(kind="export") as temp_dir:
125 self.unpack(temp_dir.path, url=url)
126 self.run_command(
127 ['checkout-index', '-a', '-f', '--prefix', location],
128 show_stdout=False, cwd=temp_dir.path
129 )
130
131 @classmethod
132 def get_revision_sha(cls, dest, rev):
133 """
134 Return (sha_or_none, is_branch), where sha_or_none is a commit hash
135 if the revision names a remote branch or tag, otherwise None.
136
137 Args:
138 dest: the repository directory.
139 rev: the revision name.
140 """
141 # Pass rev to pre-filter the list.
142 output = cls.run_command(
143 ['show-ref', rev],
144 cwd=dest,
145 show_stdout=False,
146 stdout_only=True,
147 on_returncode='ignore',
148 )
149 refs = {}
150 for line in output.strip().splitlines():
151 try:
152 sha, ref = line.split()
153 except ValueError:
154 # Include the offending line to simplify troubleshooting if
155 # this error ever occurs.
156 raise ValueError(f'unexpected show-ref line: {line!r}')
157
158 refs[ref] = sha
159
160 branch_ref = f'refs/remotes/origin/{rev}'
161 tag_ref = f'refs/tags/{rev}'
162
163 sha = refs.get(branch_ref)
164 if sha is not None:
165 return (sha, True)
166
167 sha = refs.get(tag_ref)
168
169 return (sha, False)
170
171 @classmethod
172 def _should_fetch(cls, dest, rev):
173 """
174 Return true if rev is a ref or is a commit that we don't have locally.
175
176 Branches and tags are not considered in this method because they are
177 assumed to be always available locally (which is a normal outcome of
178 ``git clone`` and ``git fetch --tags``).
179 """
180 if rev.startswith("refs/"):
181 # Always fetch remote refs.
182 return True
183
184 if not looks_like_hash(rev):
185 # Git fetch would fail with abbreviated commits.
186 return False
187
188 if cls.has_commit(dest, rev):
189 # Don't fetch if we have the commit locally.
190 return False
191
192 return True
193
194 @classmethod
195 def resolve_revision(cls, dest, url, rev_options):
196 # type: (str, HiddenText, RevOptions) -> RevOptions
197 """
198 Resolve a revision to a new RevOptions object with the SHA1 of the
199 branch, tag, or ref if found.
200
201 Args:
202 rev_options: a RevOptions object.
203 """
204 rev = rev_options.arg_rev
205 # The arg_rev property's implementation for Git ensures that the
206 # rev return value is always non-None.
207 assert rev is not None
208
209 sha, is_branch = cls.get_revision_sha(dest, rev)
210
211 if sha is not None:
212 rev_options = rev_options.make_new(sha)
213 rev_options.branch_name = rev if is_branch else None
214
215 return rev_options
216
217 # Do not show a warning for the common case of something that has
218 # the form of a Git commit hash.
219 if not looks_like_hash(rev):
220 logger.warning(
221 "Did not find branch or tag '%s', assuming revision or ref.",
222 rev,
223 )
224
225 if not cls._should_fetch(dest, rev):
226 return rev_options
227
228 # fetch the requested revision
229 cls.run_command(
230 make_command('fetch', '-q', url, rev_options.to_args()),
231 cwd=dest,
232 )
233 # Change the revision to the SHA of the ref we fetched
234 sha = cls.get_revision(dest, rev='FETCH_HEAD')
235 rev_options = rev_options.make_new(sha)
236
237 return rev_options
238
239 @classmethod
240 def is_commit_id_equal(cls, dest, name):
241 """
242 Return whether the current commit hash equals the given name.
243
244 Args:
245 dest: the repository directory.
246 name: a string name.
247 """
248 if not name:
249 # Then avoid an unnecessary subprocess call.
250 return False
251
252 return cls.get_revision(dest) == name
253
254 def fetch_new(self, dest, url, rev_options):
255 # type: (str, HiddenText, RevOptions) -> None
256 rev_display = rev_options.to_display()
257 logger.info('Cloning %s%s to %s', url, rev_display, display_path(dest))
258 self.run_command(make_command('clone', '-q', url, dest))
259
260 if rev_options.rev:
261 # Then a specific revision was requested.
262 rev_options = self.resolve_revision(dest, url, rev_options)
263 branch_name = getattr(rev_options, 'branch_name', None)
264 if branch_name is None:
265 # Only do a checkout if the current commit id doesn't match
266 # the requested revision.
267 if not self.is_commit_id_equal(dest, rev_options.rev):
268 cmd_args = make_command(
269 'checkout', '-q', rev_options.to_args(),
270 )
271 self.run_command(cmd_args, cwd=dest)
272 elif self.get_current_branch(dest) != branch_name:
273 # Then a specific branch was requested, and that branch
274 # is not yet checked out.
275 track_branch = f'origin/{branch_name}'
276 cmd_args = [
277 'checkout', '-b', branch_name, '--track', track_branch,
278 ]
279 self.run_command(cmd_args, cwd=dest)
280
281 #: repo may contain submodules
282 self.update_submodules(dest)
283
284 def switch(self, dest, url, rev_options):
285 # type: (str, HiddenText, RevOptions) -> None
286 self.run_command(
287 make_command('config', 'remote.origin.url', url),
288 cwd=dest,
289 )
290 cmd_args = make_command('checkout', '-q', rev_options.to_args())
291 self.run_command(cmd_args, cwd=dest)
292
293 self.update_submodules(dest)
294
295 def update(self, dest, url, rev_options):
296 # type: (str, HiddenText, RevOptions) -> None
297 # First fetch changes from the default remote
298 if self.get_git_version() >= parse_version('1.9.0'):
299 # fetch tags in addition to everything else
300 self.run_command(['fetch', '-q', '--tags'], cwd=dest)
301 else:
302 self.run_command(['fetch', '-q'], cwd=dest)
303 # Then reset to wanted revision (maybe even origin/master)
304 rev_options = self.resolve_revision(dest, url, rev_options)
305 cmd_args = make_command('reset', '--hard', '-q', rev_options.to_args())
306 self.run_command(cmd_args, cwd=dest)
307 #: update submodules
308 self.update_submodules(dest)
309
310 @classmethod
311 def get_remote_url(cls, location):
312 # type: (str) -> str
313 """
314 Return URL of the first remote encountered.
315
316 Raises RemoteNotFoundError if the repository does not have a remote
317 url configured.
318 """
319 # We need to pass 1 for extra_ok_returncodes since the command
320 # exits with return code 1 if there are no matching lines.
321 stdout = cls.run_command(
322 ['config', '--get-regexp', r'remote\..*\.url'],
323 extra_ok_returncodes=(1, ),
324 show_stdout=False,
325 stdout_only=True,
326 cwd=location,
327 )
328 remotes = stdout.splitlines()
329 try:
330 found_remote = remotes[0]
331 except IndexError:
332 raise RemoteNotFoundError
333
334 for remote in remotes:
335 if remote.startswith('remote.origin.url '):
336 found_remote = remote
337 break
338 url = found_remote.split(' ')[1]
339 return url.strip()
340
341 @classmethod
342 def has_commit(cls, location, rev):
343 """
344 Check if rev is a commit that is available in the local repository.
345 """
346 try:
347 cls.run_command(
348 ['rev-parse', '-q', '--verify', "sha^" + rev],
349 cwd=location,
350 log_failed_cmd=False,
351 )
352 except InstallationError:
353 return False
354 else:
355 return True
356
357 @classmethod
358 def get_revision(cls, location, rev=None):
359 # type: (str, Optional[str]) -> str
360 if rev is None:
361 rev = 'HEAD'
362 current_rev = cls.run_command(
363 ['rev-parse', rev],
364 show_stdout=False,
365 stdout_only=True,
366 cwd=location,
367 )
368 return current_rev.strip()
369
370 @classmethod
371 def get_subdirectory(cls, location):
372 """
373 Return the path to setup.py, relative to the repo root.
374 Return None if setup.py is in the repo root.
375 """
376 # find the repo root
377 git_dir = cls.run_command(
378 ['rev-parse', '--git-dir'],
379 show_stdout=False,
380 stdout_only=True,
381 cwd=location,
382 ).strip()
383 if not os.path.isabs(git_dir):
384 git_dir = os.path.join(location, git_dir)
385 repo_root = os.path.abspath(os.path.join(git_dir, '..'))
386 return find_path_to_setup_from_repo_root(location, repo_root)
387
388 @classmethod
389 def get_url_rev_and_auth(cls, url):
390 # type: (str) -> Tuple[str, Optional[str], AuthInfo]
391 """
392 Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
393 That's required because although they use SSH they sometimes don't
394 work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
395 parsing. Hence we remove it again afterwards and return it as a stub.
396 """
397 # Works around an apparent Git bug
398 # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
399 scheme, netloc, path, query, fragment = urlsplit(url)
400 if scheme.endswith('file'):
401 initial_slashes = path[:-len(path.lstrip('/'))]
402 newpath = (
403 initial_slashes +
404 urllib.request.url2pathname(path)
405 .replace('\\', '/').lstrip('/')
406 )
407 after_plus = scheme.find('+') + 1
408 url = scheme[:after_plus] + urlunsplit(
409 (scheme[after_plus:], netloc, newpath, query, fragment),
410 )
411
412 if '://' not in url:
413 assert 'file:' not in url
414 url = url.replace('git+', 'git+ssh://')
415 url, rev, user_pass = super().get_url_rev_and_auth(url)
416 url = url.replace('ssh://', '')
417 else:
418 url, rev, user_pass = super().get_url_rev_and_auth(url)
419
420 return url, rev, user_pass
421
422 @classmethod
423 def update_submodules(cls, location):
424 if not os.path.exists(os.path.join(location, '.gitmodules')):
425 return
426 cls.run_command(
427 ['submodule', 'update', '--init', '--recursive', '-q'],
428 cwd=location,
429 )
430
431 @classmethod
432 def get_repository_root(cls, location):
433 loc = super().get_repository_root(location)
434 if loc:
435 return loc
436 try:
437 r = cls.run_command(
438 ['rev-parse', '--show-toplevel'],
439 cwd=location,
440 show_stdout=False,
441 stdout_only=True,
442 on_returncode='raise',
443 log_failed_cmd=False,
444 )
445 except BadCommand:
446 logger.debug("could not determine if %s is under git control "
447 "because git is not available", location)
448 return None
449 except InstallationError:
450 return None
451 return os.path.normpath(r.rstrip('\r\n'))
452
453
454 vcs.register(Git)