comparison: env/lib/python3.9/site-packages/cwltool/utils.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"

author     shellac
date       Mon, 22 Mar 2021 18:12:50 +0000
parents    (none)
children   (none)
comparing -1:000000000000 (null revision) with 0:4f3585e2f14b

"""Shared functions and other definitions."""

import collections
import os
import platform
import random
import shutil
import stat
import string
import subprocess  # nosec
import sys
import tempfile
import urllib
import uuid
from functools import partial
from itertools import zip_longest
from pathlib import Path, PurePosixPath
from tempfile import NamedTemporaryFile
from types import ModuleType
from typing import (
    IO,
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    MutableMapping,
    MutableSequence,
    NamedTuple,
    Optional,
    Set,
    Union,
    cast,
)

import pkg_resources
import requests
from cachecontrol import CacheControl
from cachecontrol.caches import FileCache
from mypy_extensions import TypedDict
from schema_salad.exceptions import ValidationException
from schema_salad.ref_resolver import Loader
from typing_extensions import TYPE_CHECKING, Deque

if TYPE_CHECKING:
    from .command_line_tool import CallbackJob, ExpressionJob
    from .job import CommandLineJob, JobBase
    from .stdfsaccess import StdFsAccess
    from .workflow_job import WorkflowJob

__random_outdir = None  # type: Optional[str]

CONTENT_LIMIT = 64 * 1024

windows_default_container_id = "frolvlad/alpine-bash"

DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep

processes_to_kill = collections.deque()  # type: Deque[subprocess.Popen[str]]

CWLOutputAtomType = Union[
    None,
    bool,
    str,
    int,
    float,
    MutableSequence[
        Union[
            None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any]
        ]
    ],
    MutableMapping[
        str,
        Union[
            None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any]
        ],
    ],
]
CWLOutputType = Union[
    bool,
    str,
    int,
    float,
    MutableSequence[CWLOutputAtomType],
    MutableMapping[str, CWLOutputAtomType],
]
CWLObjectType = MutableMapping[str, Optional[CWLOutputType]]
JobsType = Union[
    "CommandLineJob", "JobBase", "WorkflowJob", "ExpressionJob", "CallbackJob"
]
JobsGeneratorType = Generator[Optional[JobsType], None, None]
OutputCallbackType = Callable[[Optional[CWLObjectType], str], None]
ResolverType = Callable[["Loader", str], Optional[str]]
DestinationsType = MutableMapping[str, Optional[CWLOutputType]]
ScatterDestinationsType = MutableMapping[str, List[Optional[CWLOutputType]]]
ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None]
SinkType = Union[CWLOutputType, CWLObjectType]
DirectoryType = TypedDict(
    "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str}
)
JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
JSONType = Union[
    Dict[str, JSONAtomType], List[JSONAtomType], str, int, float, bool, None
]
WorkflowStateItem = NamedTuple(
    "WorkflowStateItem",
    [
        ("parameter", CWLObjectType),
        ("value", Optional[CWLOutputType]),
        ("success", str),
    ],
)

ParametersType = List[CWLObjectType]
StepType = CWLObjectType  # WorkflowStep

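# Editor's illustration (not part of the upstream module): CWLObjectType and
# CWLOutputType describe the arbitrarily nested dicts and lists that CWL inputs
# and outputs are made of; e.g. a CWL File object expressed as a CWLObjectType
# value typically looks like
#     {"class": "File", "location": "file:///tmp/a.txt", "basename": "a.txt"}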
def versionstring() -> str:
    """Version of CWLtool used to execute the workflow."""
    pkg = pkg_resources.require("cwltool")
    if pkg:
        return "{} {}".format(sys.argv[0], pkg[0].version)
    return "{} {}".format(sys.argv[0], "unknown version")


def aslist(thing: Any) -> MutableSequence[Any]:
    """Wrap any non-MutableSequence/list in a list."""
    if isinstance(thing, MutableSequence):
        return thing
    return [thing]

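# Editor's illustration for aslist() (not part of the upstream module); the
# values are made up:
#     aslist("a.txt")             -> ["a.txt"]
#     aslist(["a.txt", "b.txt"])  -> ["a.txt", "b.txt"]   (returned unchanged)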
def copytree_with_merge(src: str, dst: str) -> None:
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    lst = os.listdir(src)
    for item in lst:
        spath = os.path.join(src, item)
        dpath = os.path.join(dst, item)
        if os.path.isdir(spath):
            copytree_with_merge(spath, dpath)
        else:
            shutil.copy2(spath, dpath)

def docker_windows_path_adjust(path: str) -> str:
    r"""
    Adjust only Windows paths for Docker.

    The docker run command treats them as unix paths.

    Example: 'C:\Users\foo' to '/C/Users/foo' (Docker for Windows) or
    '/c/Users/foo' (Docker toolbox).
    """
    if onWindows():
        split = path.split(":")
        if len(split) == 2:
            if platform.win32_ver()[0] in ("7", "8"):
                # Docker toolbox uses lowercase Windows drive letters
                split[0] = split[0].lower()
            else:
                split[0] = split[0].capitalize()
                # Docker for Windows uses uppercase Windows drive letters
            path = ":".join(split)
        path = path.replace(":", "").replace("\\", "/")
        return path if path[0] == "/" else "/" + path
    return path

def docker_windows_reverse_path_adjust(path: str) -> str:
    r"""
    Change a Docker path (only on Windows) back to a Windows path.

    Example: /C/Users/foo to C:\Users\foo
    """
    if path is not None and onWindows():
        if path[0] == "/":
            path = path[1:]
        else:
            raise ValueError("not a docker path")
        splitpath = path.split("/")
        splitpath[0] = splitpath[0] + ":"
        return "\\".join(splitpath)
    return path

def docker_windows_reverse_fileuri_adjust(fileuri: str) -> str:
    r"""
    Convert a file URI to be MS Windows compatible, if needed.

    On Docker on Windows, file URIs do not contain ':' in the path.
    To make such a file URI Windows compatible, add ':' after the drive letter,
    so file:///E/var becomes file:///E:/var
    """
    if fileuri is not None and onWindows():
        if urllib.parse.urlsplit(fileuri).scheme == "file":
            filesplit = fileuri.split("/")
            if filesplit[3][-1] != ":":
                filesplit[3] = filesplit[3] + ":"
                return "/".join(filesplit)
            return fileuri
        raise ValueError("not a file URI")
    return fileuri

def onWindows() -> bool:
    """Check if we are on Windows OS."""
    return os.name == "nt"


def convert_pathsep_to_unix(path: str) -> str:
    """
    Convert path separators to Unix style.

    On Windows, os.path.join uses backslashes to join paths; since these paths
    are used inside Docker, convert them to use forward slashes ('/').
    """
    if path is not None and onWindows():
        return path.replace("\\", "/")
    return path

def cmp_like_py2(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> int:
    """
    Compare in the same manner as Python 2.

    Comparison function to be used in sorting, as Python 3 doesn't allow
    sorting of mixed types like str() and int().
    This function re-creates Python 2's sort order for heterogeneous lists
    of `int` and `str`.
    """
    # extract lists from both dicts
    first, second = dict1["position"], dict2["position"]
    # iterate through both lists up to the length of the longer one
    for i, j in zip_longest(first, second):
        if i == j:
            continue
        # if the first list is shorter, it should come first in the sort
        if i is None:
            return -1
        # if the first list is longer, it should come later in the sort
        elif j is None:
            return 1

        # if either list contains a str element at any index,
        # compare both as str
        if isinstance(i, str) or isinstance(j, str):
            return 1 if str(i) > str(j) else -1
        # int comparison otherwise
        return 1 if i > j else -1
    # if both lists are equal
    return 0

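# Editor's illustration for cmp_like_py2() (not part of the upstream module); a
# minimal sketch of sorting bindings whose "position" lists mix ints and strs:
#     from functools import cmp_to_key
#     bindings = [{"position": [2, "b"]}, {"position": [1]}, {"position": [1, 0]}]
#     sorted(bindings, key=cmp_to_key(cmp_like_py2))
#     # -> [{"position": [1]}, {"position": [1, 0]}, {"position": [2, "b"]}]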
def bytes2str_in_dicts(
    inp: Union[MutableMapping[str, Any], MutableSequence[Any], Any],
):
    # type: (...) -> Union[str, MutableSequence[Any], MutableMapping[str, Any]]
    """
    Convert any byte strings present to unicode strings, in place.

    The input is a dict of nested dicts and lists.
    """
    # if the input is a dict, recursively call this function on each value
    if isinstance(inp, MutableMapping):
        for k in inp:
            inp[k] = bytes2str_in_dicts(inp[k])
        return inp

    # if it is a list, iterate through it and call this
    # function on each of its elements
    if isinstance(inp, MutableSequence):
        for idx, value in enumerate(inp):
            inp[idx] = bytes2str_in_dicts(value)
        return inp

    # if the value is bytes, return the decoded string
    elif isinstance(inp, bytes):
        return inp.decode("utf-8")

    # otherwise return the element itself
    return inp

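# Editor's illustration for bytes2str_in_dicts() (not part of the upstream
# module); the dictionary is made up:
#     bytes2str_in_dicts({"name": b"sample", "files": [b"a.txt", "b.txt"]})
#     # -> {"name": "sample", "files": ["a.txt", "b.txt"]}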
def visit_class(rec: Any, cls: Iterable[Any], op: Callable[..., Any]) -> None:
    """Apply a function to any mapping whose "class" value is in cls."""
    if isinstance(rec, MutableMapping):
        if "class" in rec and rec.get("class") in cls:
            op(rec)
        for d in rec:
            visit_class(rec[d], cls, op)
    if isinstance(rec, MutableSequence):
        for d in rec:
            visit_class(d, cls, op)


def visit_field(rec: Any, field: str, op: Callable[..., Any]) -> None:
    """Apply a function to the value of 'field' in any mapping that contains it."""
    if isinstance(rec, MutableMapping):
        if field in rec:
            rec[field] = op(rec[field])
        for d in rec:
            visit_field(rec[d], field, op)
    if isinstance(rec, MutableSequence):
        for d in rec:
            visit_field(d, field, op)

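# Editor's illustration for visit_class()/visit_field() (not part of the
# upstream module); the document fragment is made up:
#     doc = {"inputs": [{"class": "File", "location": "file:///tmp/a.txt"}]}
#     visit_class(doc, ("File",), lambda f: print(f["location"]))  # prints the location
#     visit_field(doc, "location", lambda loc: loc.replace("file://", ""))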
def random_outdir() -> str:
    """Return the random directory name chosen to use for tool / workflow output."""
    global __random_outdir
    if not __random_outdir:
        __random_outdir = "/" + "".join(
            [random.choice(string.ascii_letters) for _ in range(6)]  # nosec
        )
        return __random_outdir
    return __random_outdir

#
# Simple multi-platform (fcntl/msvcrt) file locking wrapper
#
fcntl = None  # type: Optional[ModuleType]
msvcrt = None  # type: Optional[ModuleType]
try:
    import fcntl  # type: ignore
except ImportError:
    import msvcrt  # type: ignore


def shared_file_lock(fd: IO[Any]) -> None:
    """Acquire a shared lock on the given open file."""
    if fcntl:
        fcntl.flock(fd.fileno(), fcntl.LOCK_SH)  # type: ignore
    elif msvcrt:
        msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024)  # type: ignore


def upgrade_lock(fd: IO[Any]) -> None:
    """Upgrade a shared lock to an exclusive lock (no-op when using msvcrt)."""
    if fcntl:
        fcntl.flock(fd.fileno(), fcntl.LOCK_EX)  # type: ignore
    elif msvcrt:
        pass

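# Editor's illustration for the locking wrappers (not part of the upstream
# module); a minimal sketch assuming a hypothetical "cache.json" file: take a
# shared lock while reading, then upgrade to an exclusive lock before writing
# (with flock, closing the file releases the lock).
#     with open("cache.json", "r+") as fd:
#         shared_file_lock(fd)
#         data = fd.read()
#         upgrade_lock(fd)
#         fd.seek(0)
#         fd.write(data)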
def adjustFileObjs(
    rec, op
):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Apply an update function to each File object in the object `rec`."""
    visit_class(rec, ("File",), op)


def adjustDirObjs(rec, op):
    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Apply an update function to each Directory object in the object `rec`."""
    visit_class(rec, ("Directory",), op)

def dedup(listing: List[CWLObjectType]) -> List[CWLObjectType]:
    """Remove duplicate entries and entries already contained in a listed Directory."""
    marksub = set()

    def mark(d: Dict[str, str]) -> None:
        marksub.add(d["location"])

    for entry in listing:
        if entry["class"] == "Directory":
            for e in cast(List[CWLObjectType], entry.get("listing", [])):
                adjustFileObjs(e, mark)
                adjustDirObjs(e, mark)

    dd = []
    markdup = set()  # type: Set[str]
    for r in listing:
        if r["location"] not in marksub and r["location"] not in markdup:
            dd.append(r)
            markdup.add(cast(str, r["location"]))

    return dd

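# Editor's illustration for dedup() (not part of the upstream module); the
# listing is made up: files already covered by a listed Directory are dropped,
# as are entries with a duplicate location.
#     listing = [
#         {"class": "Directory", "location": "file:///data",
#          "listing": [{"class": "File", "location": "file:///data/a.txt"}]},
#         {"class": "File", "location": "file:///data/a.txt"},
#     ]
#     dedup(listing)  # -> only the Directory entry remains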
def get_listing(
    fs_access: "StdFsAccess", rec: CWLObjectType, recursive: bool = True
) -> None:
    """Expand, recursively, any 'listing' fields in a Directory."""
    if rec.get("class") != "Directory":
        finddirs = []  # type: List[CWLObjectType]
        visit_class(rec, ("Directory",), finddirs.append)
        for f in finddirs:
            get_listing(fs_access, f, recursive=recursive)
        return
    if "listing" in rec:
        return
    listing = []  # type: List[CWLOutputAtomType]
    loc = cast(str, rec["location"])
    for ld in fs_access.listdir(loc):
        parse = urllib.parse.urlparse(ld)
        bn = os.path.basename(urllib.request.url2pathname(parse.path))
        if fs_access.isdir(ld):
            ent = {
                "class": "Directory",
                "location": ld,
                "basename": bn,
            }  # type: MutableMapping[str, Any]
            if recursive:
                get_listing(fs_access, ent, recursive)
            listing.append(ent)
        else:
            listing.append({"class": "File", "location": ld, "basename": bn})
    rec["listing"] = listing

def trim_listing(obj):  # type: (Dict[str, Any]) -> None
    """
    Remove the 'listing' field from Directory objects that are file references.

    It is redundant and potentially expensive to pass fully enumerated Directory
    objects around if not explicitly needed, so delete the 'listing' field when
    it is safe to do so.
    """
    if obj.get("location", "").startswith("file://") and "listing" in obj:
        del obj["listing"]

def downloadHttpFile(httpurl):
    # type: (str) -> str
    """Download an HTTP URL to a local temporary file, using a local cache, and return its path."""
    cache_session = None
    if "XDG_CACHE_HOME" in os.environ:
        directory = os.environ["XDG_CACHE_HOME"]
    elif "HOME" in os.environ:
        directory = os.environ["HOME"]
    else:
        directory = os.path.expanduser("~")

    cache_session = CacheControl(
        requests.Session(),
        cache=FileCache(os.path.join(directory, ".cache", "cwltool")),
    )

    r = cache_session.get(httpurl, stream=True)
    with NamedTemporaryFile(mode="wb", delete=False) as f:
        for chunk in r.iter_content(chunk_size=16384):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
    r.close()
    return str(f.name)

def ensure_writable(path):  # type: (str) -> None
    """Ensure that 'path' is writable by its owner (recursing into directories)."""
    if os.path.isdir(path):
        for root, dirs, files in os.walk(path):
            for name in files:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j, mode | stat.S_IWUSR)
            for name in dirs:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j, mode | stat.S_IWUSR)
    else:
        st = os.stat(path)
        mode = stat.S_IMODE(st.st_mode)
        os.chmod(path, mode | stat.S_IWUSR)


def ensure_non_writable(path):  # type: (str) -> None
    """Attempt to change permissions so that 'path' is not writable (recursing into directories)."""
    if os.path.isdir(path):
        for root, dirs, files in os.walk(path):
            for name in files:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
            for name in dirs:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
    else:
        st = os.stat(path)
        mode = stat.S_IMODE(st.st_mode)
        os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)

def normalizeFilesDirs(
    job: Optional[
        Union[
            MutableSequence[MutableMapping[str, Any]],
            MutableMapping[str, Any],
            DirectoryType,
        ]
    ]
) -> None:
    """Ensure that File and Directory objects have 'location' and 'basename' set (plus 'nameroot'/'nameext' for Files)."""

    def addLocation(d):  # type: (Dict[str, Any]) -> None
        if "location" not in d:
            if d["class"] == "File" and ("contents" not in d):
                raise ValidationException(
                    "Anonymous file object must have 'contents' and 'basename' fields."
                )
            if d["class"] == "Directory" and (
                "listing" not in d or "basename" not in d
            ):
                raise ValidationException(
                    "Anonymous directory object must have 'listing' and 'basename' fields."
                )
            d["location"] = "_:" + str(uuid.uuid4())
            if "basename" not in d:
                d["basename"] = d["location"][2:]

        parse = urllib.parse.urlparse(d["location"])
        path = parse.path
        # strip trailing slash
        if path.endswith("/"):
            if d["class"] != "Directory":
                raise ValidationException(
                    "location '%s' ends with '/' but is not a Directory" % d["location"]
                )
            path = path.rstrip("/")
            d["location"] = urllib.parse.urlunparse(
                (
                    parse.scheme,
                    parse.netloc,
                    path,
                    parse.params,
                    parse.query,
                    parse.fragment,
                )
            )

        if not d.get("basename"):
            if path.startswith("_:"):
                d["basename"] = str(path[2:])
            else:
                d["basename"] = str(os.path.basename(urllib.request.url2pathname(path)))

        if d["class"] == "File":
            nr, ne = os.path.splitext(d["basename"])
            if d.get("nameroot") != nr:
                d["nameroot"] = str(nr)
            if d.get("nameext") != ne:
                d["nameext"] = str(ne)

    visit_class(job, ("File", "Directory"), addLocation)

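# Editor's illustration for normalizeFilesDirs() (not part of the upstream
# module); the object is made up:
#     f = {"class": "File", "contents": "hello", "basename": "greeting.txt"}
#     normalizeFilesDirs(f)
#     # f now also carries "location": "_:<uuid>", "nameroot": "greeting",
#     # and "nameext": ".txt"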
def posix_path(local_path: str) -> str:
    return str(PurePosixPath(Path(local_path)))


def local_path(posix_path: str) -> str:
    return str(Path(posix_path))


def create_tmp_dir(tmpdir_prefix: str) -> str:
    """Create a temporary directory that respects the given tmpdir_prefix."""
    tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
    return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
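

# Editor's illustration for create_tmp_dir() (not part of the upstream module);
# the prefix is made up: everything after the last path separator in
# tmpdir_prefix becomes the mkdtemp() name prefix.
#     create_tmp_dir("/tmp/cwl-run-")   # -> e.g. "/tmp/cwl-run-8a04zx1f"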