Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/stdfsaccess.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """Abstracted IO access.""" | |
| 2 | |
| 3 import glob | |
| 4 import os | |
| 5 import urllib | |
| 6 from typing import IO, Any, List | |
| 7 | |
| 8 from schema_salad.ref_resolver import file_uri, uri_file_path | |
| 9 | |
| 10 from .utils import onWindows | |
| 11 | |
| 12 | |
| 13 def abspath(src: str, basedir: str) -> str: | |
| 14 if src.startswith("file://"): | |
| 15 abpath = uri_file_path(src) | |
| 16 elif urllib.parse.urlsplit(src).scheme in ["http", "https"]: | |
| 17 return src | |
| 18 else: | |
| 19 if basedir.startswith("file://"): | |
| 20 abpath = src if os.path.isabs(src) else basedir + "/" + src | |
| 21 else: | |
| 22 abpath = src if os.path.isabs(src) else os.path.join(basedir, src) | |
| 23 return abpath | |
| 24 | |
| 25 | |
| 26 class StdFsAccess: | |
| 27 """Local filesystem implementation.""" | |
| 28 | |
| 29 def __init__(self, basedir: str) -> None: | |
| 30 """Perform operations with respect to a base directory.""" | |
| 31 self.basedir = basedir | |
| 32 | |
| 33 def _abs(self, p: str) -> str: | |
| 34 return abspath(p, self.basedir) | |
| 35 | |
| 36 def glob(self, pattern: str) -> List[str]: | |
| 37 return [ | |
| 38 file_uri(str(self._abs(line))) for line in glob.glob(self._abs(pattern)) | |
| 39 ] | |
| 40 | |
| 41 def open(self, fn: str, mode: str) -> IO[Any]: | |
| 42 return open(self._abs(fn), mode) | |
| 43 | |
| 44 def exists(self, fn: str) -> bool: | |
| 45 return os.path.exists(self._abs(fn)) | |
| 46 | |
| 47 def size(self, fn: str) -> int: | |
| 48 return os.stat(self._abs(fn)).st_size | |
| 49 | |
| 50 def isfile(self, fn: str) -> bool: | |
| 51 return os.path.isfile(self._abs(fn)) | |
| 52 | |
| 53 def isdir(self, fn: str) -> bool: | |
| 54 return os.path.isdir(self._abs(fn)) | |
| 55 | |
| 56 def listdir(self, fn: str) -> List[str]: | |
| 57 return [ | |
| 58 abspath(urllib.parse.quote(entry), fn) | |
| 59 for entry in os.listdir(self._abs(fn)) | |
| 60 ] | |
| 61 | |
| 62 def join(self, path, *paths): # type: (str, *str) -> str | |
| 63 return os.path.join(path, *paths) | |
| 64 | |
| 65 def realpath(self, path: str) -> str: | |
| 66 return os.path.realpath(path) | |
| 67 | |
| 68 # On windows os.path.realpath appends unecessary Drive, here we would avoid that | |
| 69 def docker_compatible_realpath(self, path: str) -> str: | |
| 70 if onWindows(): | |
| 71 if path.startswith("/"): | |
| 72 return path | |
| 73 return "/" + path | |
| 74 return self.realpath(path) |
