Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/virtualenv/discovery/cached_py_info.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """ | |
2 | |
3 We acquire the Python information by running an interrogation script in a subprocess. This operation is not | |
4 cheap, especially on Windows. To avoid paying this hefty cost every time, we apply multiple levels of | |
5 caching. | |
6 """ | |
7 from __future__ import absolute_import, unicode_literals | |
8 | |
9 import logging | |
10 import os | |
11 import pipes | |
12 import sys | |
13 from collections import OrderedDict | |
14 | |
15 from virtualenv.app_data import AppDataDisabled | |
16 from virtualenv.discovery.py_info import PythonInfo | |
17 from virtualenv.info import PY2 | |
18 from virtualenv.util.path import Path | |
19 from virtualenv.util.six import ensure_text | |
20 from virtualenv.util.subprocess import Popen, subprocess | |
21 | |
# In-memory cache of interpreter information, keyed by the Path of the executable.
_CACHE = OrderedDict()
# Seed the cache with an entry for sys.executable so it is never looked up through the slower paths;
# presumably a no-argument PythonInfo() describes the running interpreter - confirm in py_info.
_CACHE[Path(sys.executable)] = PythonInfo()
24 | |
25 | |
def from_exe(cls, app_data, exe, env=None, raise_on_error=True, ignore_cache=False):
    """Return the interpreter information for ``exe``, going through the caches.

    :param cls: class used by the cache helpers to build the information object
    :param app_data: application data store handed to the cache helpers
    :param exe: path of the interpreter to look up
    :param env: environment mapping for the lookup; ``os.environ`` is used when ``None``
    :param raise_on_error: when true a failed lookup is raised, otherwise it is logged at info level
    :param ignore_cache: when true the in-memory cache is not consulted
    :return: the information object, or ``None`` if the lookup failed and ``raise_on_error`` is false
    """
    if env is None:
        env = os.environ
    outcome = _get_from_cache(cls, app_data, exe, env, ignore_cache=ignore_cache)
    # the cache layer hands failures back as exception instances rather than raising them
    if not isinstance(outcome, Exception):
        return outcome
    if raise_on_error:
        raise outcome
    logging.info("%s", str(outcome))
    return None
36 | |
37 | |
def _get_from_cache(cls, app_data, exe, env, ignore_cache=True):
    """Look ``exe`` up in the in-memory cache, falling back to the app-data cache.

    The value obtained from the fallback (an information object or a failure) is stored in the
    in-memory cache before being returned. Note that ``ignore_cache`` defaults to true here.
    """
    # Symlinks are deliberately left unresolved: a symlink may yield different prefix information
    # when a pyvenv.cfg sits next to it (python3.4+).
    key = Path(exe)
    if ignore_cache or key not in _CACHE:
        # miss (or bypass): consult the app-data cache and remember whatever it produced
        cached = _get_via_file_cache(cls, app_data, key, exe, env)
        _CACHE[key] = cached
    else:
        cached = _CACHE[key]
    # whichever cache answered, report the executable exactly as the caller spelled it
    if isinstance(cached, PythonInfo):
        cached.executable = exe
    return cached
51 | |
52 | |
def _get_via_file_cache(cls, app_data, path, exe, env):
    """Return the information for ``path`` from the app-data store, querying the interpreter on a miss.

    A stored record is reused only when its recorded path and modification time match the
    executable and its ``system_executable`` (if any) still exists; otherwise the record is removed
    and the interpreter is queried again. A successful query is written back to the store.

    :return: the information object, or the failure produced by the subprocess query
    """
    path_text = ensure_text(str(path))
    try:
        path_modified = path.stat().st_mtime
    except OSError:
        path_modified = -1  # executable cannot be stat-ed
    if app_data is None:
        app_data = AppDataDisabled()
    store = app_data.py_info(path)
    info = None
    with store.locked():
        if store.exists():
            record = store.read()
            stored_path, stored_mtime, stored_content = record["path"], record["st_mtime"], record["content"]
            if stored_path != path_text or stored_mtime != path_modified:
                # record belongs to another path or to an older version of the executable
                store.remove()
            else:
                info = cls._from_dict(dict(stored_content.items()))
                system_exe = info.system_executable
                if system_exe is not None and not os.path.exists(system_exe):
                    # the recorded system interpreter is gone, so the record is stale
                    store.remove()
                    info = None
        if info is None:
            # nothing usable was stored: interrogate the interpreter and persist a success
            failure, info = _run_subprocess(cls, exe, app_data, env)
            if failure is not None:
                info = failure
            else:
                record = {"st_mtime": path_modified, "path": path_text, "content": info._to_dict()}
                store.write(record)
    return info
82 | |
83 | |
def _run_subprocess(cls, exe, app_data, env):
    """Run the ``py_info.py`` script with ``exe`` and parse what it prints.

    :return: a ``(failure, result)`` pair; exactly one of the two is ``None``. ``failure`` is a
        ``RuntimeError`` describing the exit code and any captured output, ``result`` is the object
        built by ``cls._from_json`` from the script's standard output.
    """
    script_source = Path(os.path.abspath(__file__)).parent / "py_info.py"
    with app_data.ensure_extracted(script_source) as script:
        command = [exe, str(script)]
        # prevent sys.prefix from leaking into the child process - see https://bugs.python.org/issue22490
        child_env = env.copy()
        child_env.pop("__PYVENV_LAUNCHER__", None)
        logging.debug("get interpreter info via cmd: %s", LogCmd(command))
        try:
            process = Popen(
                command,
                universal_newlines=True,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                env=child_env,
            )
            out, err = process.communicate()
            code = process.returncode
        except OSError as os_error:
            # the interpreter could not be started at all; report the OS error like an exit status
            out, err, code = "", os_error.strerror, os_error.errno
    if code != 0:
        out_part = " out: {!r}".format(out) if out else ""
        err_part = " err: {!r}".format(err) if err else ""
        msg = "failed to query {} with code {}{}{}".format(exe, code, out_part, err_part)
        return RuntimeError(msg), None
    result = cls._from_json(out)
    result.executable = exe  # keep original executable as this may contain initialization code
    return None, result
118 | |
119 | |
class LogCmd(object):
    """Wrap a command (and optionally its environment) so it renders as a shell-quoted string."""

    def __init__(self, cmd, env=None):
        self.cmd = cmd  # sequence of command line parts (text or bytes)
        self.env = env  # optional environment appended to the rendering

    def __repr__(self):
        """Return the quoted command line; UTF-8 encoded bytes when ``PY2`` is true."""

        def as_text(value):
            if isinstance(value, bytes):
                return value.decode("utf-8")
            return value

        separator = as_text(" ")
        rendered = separator.join(pipes.quote(as_text(part)) for part in self.cmd)
        if self.env is not None:
            rendered += as_text(" env of {!r}").format(self.env)
        return rendered.encode("utf-8") if PY2 else rendered

    def __unicode__(self):
        """Return the same rendering as text, decoding the ``repr`` when ``PY2`` is true."""
        rendered = repr(self)
        return rendered.decode("utf-8") if PY2 else rendered
141 | |
142 | |
def clear(app_data):
    """Discard all cached interpreter information.

    :param app_data: application data store whose ``py_info_clear`` is invoked
    """
    # first the entries held by the app-data store ...
    app_data.py_info_clear()
    # ... then everything kept in this module's in-memory cache
    _CACHE.clear()
146 | |
147 | |
# Public API of this module. The name must be exactly ``__all__`` (two underscores on each side);
# the previous spelling with three underscores was an ordinary variable that Python ignored.
__all__ = (
    "from_exe",
    "clear",
    "LogCmd",
)