Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/stdfsaccess.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Abstracted IO access.""" | |
2 | |
3 import glob | |
4 import os | |
5 import urllib | |
6 from typing import IO, Any, List | |
7 | |
8 from schema_salad.ref_resolver import file_uri, uri_file_path | |
9 | |
10 from .utils import onWindows | |
11 | |
12 | |
def abspath(src: str, basedir: str) -> str:
    """Resolve *src* against *basedir*.

    The cases are tried in this order:

    * ``file://`` sources are converted with ``uri_file_path``.
    * Sources whose URL scheme is ``http`` or ``https`` are returned as-is.
    * Sources that ``os.path.isabs`` accepts are returned as-is.
    * Otherwise *src* is appended to *basedir*: with a literal ``"/"`` when
      *basedir* starts with ``file://``, with ``os.path.join`` when it does
      not.
    """
    if src.startswith("file://"):
        return uri_file_path(src)
    if urllib.parse.urlsplit(src).scheme in ["http", "https"]:
        return src
    if os.path.isabs(src):
        return src
    if basedir.startswith("file://"):
        # The base is a URI, so join with a forward slash rather than the
        # platform separator.
        return basedir + "/" + src
    return os.path.join(basedir, src)
24 | |
25 | |
class StdFsAccess:
    """Local filesystem implementation.

    Methods that take a file name or pattern resolve it with ``abspath``
    relative to ``basedir`` before touching the filesystem. ``join``,
    ``realpath`` and ``docker_compatible_realpath`` operate on their
    arguments directly.
    """

    def __init__(self, basedir: str) -> None:
        """Perform operations with respect to a base directory."""
        self.basedir = basedir

    def _abs(self, p: str) -> str:
        """Resolve *p* against ``self.basedir`` using ``abspath``."""
        return abspath(p, self.basedir)

    def glob(self, pattern: str) -> List[str]:
        """Return the ``file_uri`` form of every path matching *pattern*."""
        return [file_uri(self._abs(match)) for match in glob.glob(self._abs(pattern))]

    def open(self, fn: str, mode: str) -> IO[Any]:
        """Open the resolved *fn* with the given *mode*; the caller closes it."""
        return open(self._abs(fn), mode)

    def exists(self, fn: str) -> bool:
        """Return whether the resolved *fn* exists."""
        return os.path.exists(self._abs(fn))

    def size(self, fn: str) -> int:
        """Return the ``st_size`` reported by ``os.stat`` for the resolved *fn*."""
        return os.stat(self._abs(fn)).st_size

    def isfile(self, fn: str) -> bool:
        """Return whether the resolved *fn* is a regular file."""
        return os.path.isfile(self._abs(fn))

    def isdir(self, fn: str) -> bool:
        """Return whether the resolved *fn* is a directory."""
        return os.path.isdir(self._abs(fn))

    def listdir(self, fn: str) -> List[str]:
        """List the entries of directory *fn*.

        Each entry name is percent-quoted and then resolved with ``abspath``
        against *fn* itself (not against ``self.basedir``).
        """
        return [
            abspath(urllib.parse.quote(entry), fn)
            for entry in os.listdir(self._abs(fn))
        ]

    def join(self, path: str, *paths: str) -> str:
        """Join path components with ``os.path.join``."""
        return os.path.join(path, *paths)

    def realpath(self, path: str) -> str:
        """Return ``os.path.realpath`` of *path*."""
        return os.path.realpath(path)

    # On Windows os.path.realpath adds an unnecessary drive prefix; avoid
    # that here.
    def docker_compatible_realpath(self, path: str) -> str:
        """Return a form of *path* intended for use with Docker.

        When ``onWindows()`` is true, *path* is returned with a leading
        ``"/"`` ensured and ``realpath`` is not called. Otherwise this is
        ``self.realpath(path)``.
        """
        if onWindows():
            if path.startswith("/"):
                return path
            return "/" + path
        return self.realpath(path)