Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/pip/_internal/network/cache.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """HTTP cache implementation. | |
2 """ | |
3 | |
4 import os | |
5 from contextlib import contextmanager | |
6 | |
7 from pip._vendor.cachecontrol.cache import BaseCache | |
8 from pip._vendor.cachecontrol.caches import FileCache | |
9 from pip._vendor.requests.models import Response | |
10 | |
11 from pip._internal.utils.filesystem import adjacent_tmp_file, replace | |
12 from pip._internal.utils.misc import ensure_dir | |
13 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
14 | |
15 if MYPY_CHECK_RUNNING: | |
16 from typing import Iterator, Optional | |
17 | |
18 | |
def is_from_cache(response):
    # type: (Response) -> bool
    """Return the response's ``from_cache`` attribute, or False if absent."""
    try:
        return response.from_cache
    except AttributeError:
        # The attribute is missing on this object: report "not cached".
        return False
22 | |
23 | |
@contextmanager
def suppressed_cache_errors():
    # type: () -> Iterator[None]
    """Swallow any ``OSError`` raised inside the managed block.

    When the cache cannot be accessed, the caller simply carries on as if
    caching were not enabled. Exceptions of any other type propagate
    unchanged.
    """
    try:
        yield
    except OSError:
        # Cache access failed; skipping caching is the intended fallback.
        return
34 | |
35 | |
class SafeFileCache(BaseCache):
    """
    File-backed cache that keeps working even when the target directory
    cannot be read or written: every filesystem operation runs inside
    ``suppressed_cache_errors()``, so an ``OSError`` is swallowed.
    """

    def __init__(self, directory):
        # type: (str) -> None
        """Remember *directory* as the root under which entries are stored."""
        assert directory is not None, "Cache directory must not be None."
        super().__init__()
        self.directory = directory

    def _get_cache_path(self, name):
        # type: (str) -> str
        """Return the on-disk path used for the cache entry called *name*."""
        # Mirrors cachecontrol.caches.file_cache.FileCache._fn; it is copied
        # here for backwards-compatibility and so that we do not have to
        # call a non-public method.
        digest = FileCache.encode(name)
        # The first five characters each become one directory level and the
        # full digest is the file name.
        segments = [*digest[:5], digest]
        return os.path.join(self.directory, *segments)

    def get(self, key):
        # type: (str) -> Optional[bytes]
        """Return the bytes stored for *key*, or None if reading fails."""
        target = self._get_cache_path(key)
        with suppressed_cache_errors(), open(target, "rb") as handle:
            return handle.read()
        # Only reached when an OSError was suppressed above.
        return None

    def set(self, key, value):
        # type: (str, bytes) -> None
        """Store *value* under *key*; an OSError leaves the cache unchanged."""
        target = self._get_cache_path(key)
        with suppressed_cache_errors():
            parent = os.path.dirname(target)
            ensure_dir(parent)

            # Write to a temporary file next to the destination first ...
            with adjacent_tmp_file(target) as tmp:
                tmp.write(value)

            # ... and move it into place only once the temporary-file
            # context has exited.
            replace(tmp.name, target)

    def delete(self, key):
        # type: (str) -> None
        """Remove the entry stored for *key*, ignoring any OSError."""
        target = self._get_cache_path(key)
        with suppressed_cache_errors():
            os.remove(target)