comparison env/lib/python3.9/site-packages/pip/_internal/req/req_tracker.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import contextlib
2 import hashlib
3 import logging
4 import os
5
6 from pip._vendor import contextlib2
7
8 from pip._internal.utils.temp_dir import TempDirectory
9 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
10
11 if MYPY_CHECK_RUNNING:
12 from types import TracebackType
13 from typing import Dict, Iterator, Optional, Set, Type, Union
14
15 from pip._internal.models.link import Link
16 from pip._internal.req.req_install import InstallRequirement
17
18 logger = logging.getLogger(__name__)
19
20
21 @contextlib.contextmanager
22 def update_env_context_manager(**changes):
23 # type: (str) -> Iterator[None]
24 target = os.environ
25
26 # Save values from the target and change them.
27 non_existent_marker = object()
28 saved_values = {} # type: Dict[str, Union[object, str]]
29 for name, new_value in changes.items():
30 try:
31 saved_values[name] = target[name]
32 except KeyError:
33 saved_values[name] = non_existent_marker
34 target[name] = new_value
35
36 try:
37 yield
38 finally:
39 # Restore original values in the target.
40 for name, original_value in saved_values.items():
41 if original_value is non_existent_marker:
42 del target[name]
43 else:
44 assert isinstance(original_value, str) # for mypy
45 target[name] = original_value
46
47
48 @contextlib.contextmanager
49 def get_requirement_tracker():
50 # type: () -> Iterator[RequirementTracker]
51 root = os.environ.get('PIP_REQ_TRACKER')
52 with contextlib2.ExitStack() as ctx:
53 if root is None:
54 root = ctx.enter_context(
55 TempDirectory(kind='req-tracker')
56 ).path
57 ctx.enter_context(update_env_context_manager(PIP_REQ_TRACKER=root))
58 logger.debug("Initialized build tracking at %s", root)
59
60 with RequirementTracker(root) as tracker:
61 yield tracker
62
63
64 class RequirementTracker:
65
66 def __init__(self, root):
67 # type: (str) -> None
68 self._root = root
69 self._entries = set() # type: Set[InstallRequirement]
70 logger.debug("Created build tracker: %s", self._root)
71
72 def __enter__(self):
73 # type: () -> RequirementTracker
74 logger.debug("Entered build tracker: %s", self._root)
75 return self
76
77 def __exit__(
78 self,
79 exc_type, # type: Optional[Type[BaseException]]
80 exc_val, # type: Optional[BaseException]
81 exc_tb # type: Optional[TracebackType]
82 ):
83 # type: (...) -> None
84 self.cleanup()
85
86 def _entry_path(self, link):
87 # type: (Link) -> str
88 hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
89 return os.path.join(self._root, hashed)
90
91 def add(self, req):
92 # type: (InstallRequirement) -> None
93 """Add an InstallRequirement to build tracking.
94 """
95
96 assert req.link
97 # Get the file to write information about this requirement.
98 entry_path = self._entry_path(req.link)
99
100 # Try reading from the file. If it exists and can be read from, a build
101 # is already in progress, so a LookupError is raised.
102 try:
103 with open(entry_path) as fp:
104 contents = fp.read()
105 except FileNotFoundError:
106 pass
107 else:
108 message = '{} is already being built: {}'.format(
109 req.link, contents)
110 raise LookupError(message)
111
112 # If we're here, req should really not be building already.
113 assert req not in self._entries
114
115 # Start tracking this requirement.
116 with open(entry_path, 'w') as fp:
117 fp.write(str(req))
118 self._entries.add(req)
119
120 logger.debug('Added %s to build tracker %r', req, self._root)
121
122 def remove(self, req):
123 # type: (InstallRequirement) -> None
124 """Remove an InstallRequirement from build tracking.
125 """
126
127 assert req.link
128 # Delete the created file and the corresponding entries.
129 os.unlink(self._entry_path(req.link))
130 self._entries.remove(req)
131
132 logger.debug('Removed %s from build tracker %r', req, self._root)
133
134 def cleanup(self):
135 # type: () -> None
136 for req in set(self._entries):
137 self.remove(req)
138
139 logger.debug("Removed build tracker: %r", self._root)
140
141 @contextlib.contextmanager
142 def track(self, req):
143 # type: (InstallRequirement) -> Iterator[None]
144 self.add(req)
145 yield
146 self.remove(req)