Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/pip/_internal/req/req_tracker.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import contextlib | |
2 import hashlib | |
3 import logging | |
4 import os | |
5 | |
6 from pip._vendor import contextlib2 | |
7 | |
8 from pip._internal.utils.temp_dir import TempDirectory | |
9 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
10 | |
11 if MYPY_CHECK_RUNNING: | |
12 from types import TracebackType | |
13 from typing import Dict, Iterator, Optional, Set, Type, Union | |
14 | |
15 from pip._internal.models.link import Link | |
16 from pip._internal.req.req_install import InstallRequirement | |
17 | |
18 logger = logging.getLogger(__name__) | |
19 | |
20 | |
@contextlib.contextmanager
def update_env_context_manager(**changes):
    # type: (str) -> Iterator[None]
    """Temporarily override variables in ``os.environ``.

    Each keyword argument names an environment variable and gives the
    string value it should have while the ``with`` block runs.  On exit
    (normal or via an exception) every touched variable is put back to the
    value it had before, or removed if it did not exist before.

    Restoration also runs when applying the overrides fails part-way
    (for example because a value is not a ``str``), so a failed entry does
    not leave earlier overrides behind.
    """
    target = os.environ

    # Sentinel distinguishing "variable was absent" from any real value.
    non_existent_marker = object()
    saved_values = {}  # type: Dict[str, Union[object, str]]

    try:
        # Save values from the target and change them.  This happens inside
        # the ``try`` so that a failing assignment still triggers the
        # restoration of everything recorded so far.
        for name, new_value in changes.items():
            saved_values[name] = target.get(name, non_existent_marker)
            target[name] = new_value

        yield
    finally:
        # Restore original values in the target.
        for name, original_value in saved_values.items():
            if original_value is non_existent_marker:
                # The with-body may already have removed the variable;
                # ``pop`` with a default tolerates that instead of raising
                # KeyError and skipping the remaining restorations.
                target.pop(name, None)
            else:
                assert isinstance(original_value, str)  # for mypy
                target[name] = original_value
46 | |
47 | |
@contextlib.contextmanager
def get_requirement_tracker():
    # type: () -> Iterator[RequirementTracker]
    """Yield a ``RequirementTracker`` for the current build-tracking root.

    The root directory is read from the ``PIP_REQ_TRACKER`` environment
    variable.  When the variable is unset, a ``TempDirectory`` of kind
    ``'req-tracker'`` is entered, its path becomes the root, and the
    variable is set to that path for the lifetime of this context.
    """
    tracker_root = os.environ.get('PIP_REQ_TRACKER')

    with contextlib2.ExitStack() as stack:
        if tracker_root is None:
            # No root configured yet: create one and advertise it through
            # the environment.  Both are registered on the stack so they
            # are unwound together when this context exits.
            temp_dir = TempDirectory(kind='req-tracker')
            tracker_root = stack.enter_context(temp_dir).path

            env_override = update_env_context_manager(
                PIP_REQ_TRACKER=tracker_root
            )
            stack.enter_context(env_override)

            logger.debug("Initialized build tracking at %s", tracker_root)

        with RequirementTracker(tracker_root) as tracker:
            yield tracker
62 | |
63 | |
class RequirementTracker:
    """Record in-progress builds as marker files under a root directory.

    Every tracked requirement owns one file inside ``root``.  The file name
    is a hash of the requirement's link URL and the file content is
    ``str(req)``.  A marker file that already exists for a link means a
    build of that link is considered in progress, and adding it again
    fails with ``LookupError``.

    The tracker is a context manager; leaving the ``with`` block removes
    every entry this instance added.
    """

    def __init__(self, root):
        # type: (str) -> None
        """Create a tracker that stores its marker files in ``root``."""
        self._root = root
        # Requirements added through this instance and not yet removed.
        self._entries = set()  # type: Set[InstallRequirement]
        logger.debug("Created build tracker: %s", self._root)

    def __enter__(self):
        # type: () -> RequirementTracker
        """Return the tracker itself for use in a ``with`` statement."""
        logger.debug("Entered build tracker: %s", self._root)
        return self

    def __exit__(
        self,
        exc_type,  # type: Optional[Type[BaseException]]
        exc_val,  # type: Optional[BaseException]
        exc_tb  # type: Optional[TracebackType]
    ):
        # type: (...) -> None
        """Remove all remaining entries; exceptions are not suppressed."""
        self.cleanup()

    def _entry_path(self, link):
        # type: (Link) -> str
        """Return the marker-file path for ``link``.

        The file name is the SHA-224 hex digest of the link's URL without
        its fragment, placed directly inside the tracker root.
        """
        hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
        return os.path.join(self._root, hashed)

    def add(self, req):
        # type: (InstallRequirement) -> None
        """Add an InstallRequirement to build tracking.

        :raises LookupError: if a marker file for ``req.link`` already
            exists, i.e. the link is already being built.
        """

        assert req.link
        # Get the file to write information about this requirement.
        entry_path = self._entry_path(req.link)

        # Try reading from the file. If it exists and can be read from, a build
        # is already in progress, so a LookupError is raised.
        try:
            with open(entry_path) as fp:
                contents = fp.read()
        except FileNotFoundError:
            pass
        else:
            message = '{} is already being built: {}'.format(
                req.link, contents)
            raise LookupError(message)

        # If we're here, req should really not be building already.
        assert req not in self._entries

        # Start tracking this requirement.
        with open(entry_path, 'w') as fp:
            fp.write(str(req))
        self._entries.add(req)

        logger.debug('Added %s to build tracker %r', req, self._root)

    def remove(self, req):
        # type: (InstallRequirement) -> None
        """Remove an InstallRequirement from build tracking.

        Deletes the marker file for ``req.link`` and forgets ``req``.
        """

        assert req.link
        # Delete the created file and the corresponding entries.
        os.unlink(self._entry_path(req.link))
        self._entries.remove(req)

        logger.debug('Removed %s from build tracker %r', req, self._root)

    def cleanup(self):
        # type: () -> None
        """Remove every requirement still tracked by this instance."""
        # Iterate over a copy: remove() mutates self._entries.
        for req in set(self._entries):
            self.remove(req)

        logger.debug("Removed build tracker: %r", self._root)

    @contextlib.contextmanager
    def track(self, req):
        # type: (InstallRequirement) -> Iterator[None]
        """Track ``req`` for the duration of the ``with`` block.

        The entry is removed when the block exits, including when the
        block raises, so a failed build does not leave a stale marker that
        would make a later attempt fail with ``LookupError``.

        :raises LookupError: propagated from ``add`` if the link is already
            being built.
        """
        # add() stays outside the try: if it fails there is nothing to
        # remove.
        self.add(req)
        try:
            yield
        finally:
            self.remove(req)