comparison env/lib/python3.9/site-packages/cwltool/stdfsaccess.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Abstracted IO access."""
2
3 import glob
4 import os
5 import urllib
6 from typing import IO, Any, List
7
8 from schema_salad.ref_resolver import file_uri, uri_file_path
9
10 from .utils import onWindows
11
12
def abspath(src: str, basedir: str) -> str:
    """Resolve ``src`` to a location, relative to ``basedir`` when needed.

    * A ``file://`` URI is converted with ``uri_file_path``.
    * An ``http``/``https`` URL is returned unchanged.
    * Anything else is treated as a path: it is returned as-is when
      ``os.path.isabs`` accepts it, otherwise it is appended to ``basedir``.

    :param src: a ``file://`` URI, an ``http(s)`` URL, or a path.
    :param basedir: base location used for relative ``src`` values; either
        a plain directory path or a ``file://`` URI.
    :return: the resolved location as a string.
    """
    # ``import urllib`` alone does not load the ``urllib.parse`` submodule;
    # import the needed function explicitly so this does not depend on some
    # other module having imported it first.
    from urllib.parse import urlsplit

    if src.startswith("file://"):
        return uri_file_path(src)
    if urlsplit(src).scheme in ["http", "https"]:
        # Remote resources are never rewritten.
        return src
    if os.path.isabs(src):
        return src
    if basedir.startswith("file://"):
        # URIs always use "/" as separator, so do not go through
        # os.path.join here; the result keeps the ``file://`` prefix.
        return basedir + "/" + src
    return os.path.join(basedir, src)
24
25
class StdFsAccess:
    """Local filesystem implementation."""

    def __init__(self, basedir: str) -> None:
        """Perform operations with respect to a base directory."""
        self.basedir = basedir

    def _abs(self, p: str) -> str:
        """Resolve ``p`` against ``self.basedir`` via ``abspath``."""
        return abspath(p, self.basedir)

    def glob(self, pattern: str) -> List[str]:
        """Return the ``file_uri`` form of every match for ``pattern``.

        The pattern is resolved with ``_abs`` before being handed to
        ``glob.glob``.
        """
        return [
            file_uri(str(self._abs(line))) for line in glob.glob(self._abs(pattern))
        ]

    def open(self, fn: str, mode: str) -> IO[Any]:
        """Open the resolved ``fn`` with the given ``mode``.

        The caller is responsible for closing the returned file object.
        """
        return open(self._abs(fn), mode)

    def exists(self, fn: str) -> bool:
        """Return whether the resolved ``fn`` exists."""
        return os.path.exists(self._abs(fn))

    def size(self, fn: str) -> int:
        """Return ``st_size`` of the resolved ``fn`` as reported by ``os.stat``."""
        return os.stat(self._abs(fn)).st_size

    def isfile(self, fn: str) -> bool:
        """Return whether the resolved ``fn`` is a regular file."""
        return os.path.isfile(self._abs(fn))

    def isdir(self, fn: str) -> bool:
        """Return whether the resolved ``fn`` is a directory."""
        return os.path.isdir(self._abs(fn))

    def listdir(self, fn: str) -> List[str]:
        """List the entries of directory ``fn``.

        Each entry name is percent-quoted and then resolved with ``abspath``
        using ``fn`` itself (not ``self.basedir``) as the base.
        """
        # ``import urllib`` alone does not load ``urllib.parse``; import the
        # function explicitly instead of relying on another module's import.
        from urllib.parse import quote

        return [abspath(quote(entry), fn) for entry in os.listdir(self._abs(fn))]

    def join(self, path: str, *paths: str) -> str:
        """Join path components with ``os.path.join``."""
        return os.path.join(path, *paths)

    def realpath(self, path: str) -> str:
        """Return ``os.path.realpath`` of ``path`` (not resolved against basedir)."""
        return os.path.realpath(path)

    # On Windows os.path.realpath appends an unnecessary drive; avoid that here.
    def docker_compatible_realpath(self, path: str) -> str:
        """Return a real path in a form usable inside Docker.

        When ``onWindows()`` is true the path is returned with a leading
        ``/`` ensured and ``realpath`` is not consulted; otherwise this is
        the same as ``realpath``.
        """
        if onWindows():
            if path.startswith("/"):
                return path
            return "/" + path
        return self.realpath(path)