comparison env/lib/python3.9/site-packages/cwltool/utils.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Shared functions and other definitions."""
2
3 import collections
4 import os
5 import platform
6 import random
7 import shutil
8 import stat
9 import string
10 import subprocess # nosec
11 import sys
12 import tempfile
13 import urllib
14 import uuid
15 from functools import partial
16 from itertools import zip_longest
17 from pathlib import Path, PurePosixPath
18 from tempfile import NamedTemporaryFile
19 from types import ModuleType
20 from typing import (
21 IO,
22 Any,
23 Callable,
24 Dict,
25 Generator,
26 Iterable,
27 List,
28 MutableMapping,
29 MutableSequence,
30 NamedTuple,
31 Optional,
32 Set,
33 Union,
34 cast,
35 )
36
37 import pkg_resources
38 import requests
39 from cachecontrol import CacheControl
40 from cachecontrol.caches import FileCache
41 from mypy_extensions import TypedDict
42 from schema_salad.exceptions import ValidationException
43 from schema_salad.ref_resolver import Loader
44 from typing_extensions import TYPE_CHECKING, Deque
45
46 if TYPE_CHECKING:
47 from .command_line_tool import CallbackJob, ExpressionJob
48 from .job import CommandLineJob, JobBase
49 from .stdfsaccess import StdFsAccess
50 from .workflow_job import WorkflowJob
51
# Cache for random_outdir(): stays None until that function first picks a name.
__random_outdir = None  # type: Optional[str]

# 64 KiB.  NOTE(review): presumably a cap on how much file content is read
# inline -- confirm against the callers, none are visible in this module.
CONTENT_LIMIT = 64 * 1024

# NOTE(review): presumably the Docker image used by default on Windows -- confirm.
windows_default_container_id = "frolvlad/alpine-bash"

# The system temporary directory followed by a path separator, so that a
# name prefix can be appended directly.
DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep

# NOTE(review): presumably child processes to terminate on shutdown; nothing in
# this module adds to or drains the deque -- confirm against callers.
processes_to_kill = collections.deque()  # type: Deque[subprocess.Popen[str]]
61
# A value that may appear inside a CWL output: a scalar, None, or a
# sequence / string-keyed mapping whose members are typed one level deep
# (anything nested further is ``Any``).
CWLOutputAtomType = Union[
    None,
    bool,
    str,
    int,
    float,
    MutableSequence[
        Union[
            None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any]
        ]
    ],
    MutableMapping[
        str,
        Union[
            None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any]
        ],
    ],
]
# A non-None value: a scalar, or a sequence / mapping of CWLOutputAtomType.
CWLOutputType = Union[
    bool,
    str,
    int,
    float,
    MutableSequence[CWLOutputAtomType],
    MutableMapping[str, CWLOutputAtomType],
]
# A string-keyed mapping whose values are CWLOutputType or None.
CWLObjectType = MutableMapping[str, Optional[CWLOutputType]]
# Forward references to the job classes imported under TYPE_CHECKING.
JobsType = Union[
    "CommandLineJob", "JobBase", "WorkflowJob", "ExpressionJob", "CallbackJob"
]
# A generator yielding jobs or None.
JobsGeneratorType = Generator[Optional[JobsType], None, None]
# Callback taking an optional output object and a string.
# NOTE(review): the string is presumably a process status -- confirm.
OutputCallbackType = Callable[[Optional[CWLObjectType], str], None]
ResolverType = Callable[["Loader", str], Optional[str]]
DestinationsType = MutableMapping[str, Optional[CWLOutputType]]
# Like DestinationsType, but each key maps to a list of values.
ScatterDestinationsType = MutableMapping[str, List[Optional[CWLOutputType]]]
ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None]
SinkType = Union[CWLOutputType, CWLObjectType]
# Functional TypedDict syntax is required because "class" is a Python keyword.
DirectoryType = TypedDict(
    "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str}
)
# JSON values: JSONAtomType is untyped below the first level of nesting,
# JSONType is typed one level deeper.
JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
JSONType = Union[
    Dict[str, JSONAtomType], List[JSONAtomType], str, int, float, bool, None
]
# Record with the fields (parameter, value, success).
WorkflowStateItem = NamedTuple(
    "WorkflowStateItem",
    [
        ("parameter", CWLObjectType),
        ("value", Optional[CWLOutputType]),
        ("success", str),
    ],
)

ParametersType = List[CWLObjectType]
StepType = CWLObjectType  # WorkflowStep
117
118
def versionstring() -> str:
    """
    Describe the running program and the installed cwltool version.

    The result is ``sys.argv[0]`` followed by a space and the version of the
    first distribution returned by ``pkg_resources.require("cwltool")``, or
    the text ``unknown version`` when that call returns nothing.
    """
    distributions = pkg_resources.require("cwltool")
    version = distributions[0].version if distributions else "unknown version"
    return "{} {}".format(sys.argv[0], version)
125
126
def aslist(thing: Any) -> MutableSequence[Any]:
    """
    Return ``thing`` as a mutable sequence.

    A MutableSequence is returned as-is (same object); any other value,
    including tuples, strings and None, is wrapped in a new one-element list.
    """
    return thing if isinstance(thing, MutableSequence) else [thing]
132
133
def copytree_with_merge(src: str, dst: str) -> None:
    """
    Recursively copy the directory ``src`` into ``dst``, merging contents.

    ``dst`` is created (and given ``src``'s stat information) only if it does
    not exist yet; entries already present in ``dst`` that have no counterpart
    in ``src`` are left alone.  Files are copied with ``shutil.copy2`` and
    overwrite same-named files in ``dst``.
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    for name in os.listdir(src):
        source_entry = os.path.join(src, name)
        target_entry = os.path.join(dst, name)
        if not os.path.isdir(source_entry):
            shutil.copy2(source_entry, target_entry)
            continue
        # Sub-directories are merged the same way, one level at a time.
        copytree_with_merge(source_entry, target_entry)
146
147
def docker_windows_path_adjust(path: str) -> str:
    r"""
    Rewrite a Windows path into the unix-style form passed to ``docker run``.

    On any other operating system the path is returned untouched.

    Example: C:\Users\foo becomes /C/Users/foo (Docker for Windows) or
    /c/Users/foo (Docker toolbox, selected when ``platform.win32_ver()``
    reports Windows 7 or 8).
    """
    if not onWindows():
        return path
    parts = path.split(":")
    if len(parts) == 2:
        # Exactly one colon: the text before it is treated as the drive letter.
        if platform.win32_ver()[0] in ("7", "8"):
            # Docker toolbox uses lowercase Windows drive letters
            parts[0] = parts[0].lower()
        else:
            # Docker for Windows uses uppercase Windows drive letters
            parts[0] = parts[0].capitalize()
        path = ":".join(parts)
    path = path.replace(":", "").replace("\\", "/")
    if path[0] == "/":
        return path
    return "/" + path
170
171
def docker_windows_reverse_path_adjust(path: str) -> str:
    r"""
    Turn a Docker-style path back into a Windows path (on Windows only).

    Example: /C/Users/foo becomes C:\Users\foo

    ``None`` and, on other operating systems, any path are returned unchanged.

    Raises ValueError on Windows when the path does not start with ``/``.
    """
    if path is None or not onWindows():
        return path
    if path[0] != "/":
        raise ValueError("not a docker path")
    segments = path[1:].split("/")
    # The first segment is the drive letter; give it its colon back.
    segments[0] += ":"
    return "\\".join(segments)
187
188
def docker_windows_reverse_fileuri_adjust(fileuri: str) -> str:
    r"""
    Make a file URI compatible with MS Windows, if needed.

    With Docker on Windows the drive letter in a file URI carries no colon.
    This adds the colon back after the drive letter, so file:///E/var becomes
    file:///E:/var.  A URI whose drive segment already ends with a colon is
    returned as-is, as is ``None`` or any value on other operating systems.

    Raises ValueError on Windows when the URI scheme is not ``file``.
    """
    if fileuri is None or not onWindows():
        return fileuri
    if urllib.parse.urlsplit(fileuri).scheme != "file":
        raise ValueError("not a file URI")
    pieces = fileuri.split("/")
    # For "file:///E/var" the pieces are ["file:", "", "", "E", "var"],
    # which puts the drive letter at index 3.
    if pieces[3][-1] == ":":
        return fileuri
    pieces[3] = pieces[3] + ":"
    return "/".join(pieces)
206
207
def onWindows() -> bool:
    """Return True when ``os.name`` identifies the platform as Windows ("nt")."""
    running_on_nt = os.name == "nt"
    return running_on_nt
211
212
def convert_pathsep_to_unix(path: str) -> str:
    """
    Convert path separators to unix style (on Windows only).

    On Windows os.path.join joins with backslashes; paths handed to Docker
    need forward slashes instead.  ``None`` and, on other operating systems,
    any path are returned unchanged.
    """
    if path is None or not onWindows():
        return path
    return path.replace("\\", "/")
223
224
def cmp_like_py2(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> int:
    """
    Three-way compare the "position" lists of two dicts, Python 2 style.

    Python 3 refuses to order values of different types such as str and int.
    This comparison function (returning -1, 0 or 1) walks both lists in
    lockstep and decides on the first differing pair:

    * a list that runs out first sorts before the longer one;
    * if either element is a str, both are compared as strings;
    * otherwise the elements are compared directly.
    """
    left, right = dict1["position"], dict2["position"]
    # zip_longest pads the shorter list with None.
    for a, b in zip_longest(left, right):
        if a == b:
            continue
        if a is None:
            # first list is shorter: it comes first
            return -1
        if b is None:
            # first list is longer: it comes later
            return 1
        if isinstance(a, str) or isinstance(b, str):
            a, b = str(a), str(b)
        return 1 if a > b else -1
    # no differing pair found
    return 0
257
258
def bytes2str_in_dicts(
    inp: Union[MutableMapping[str, Any], MutableSequence[Any], Any],
):
    # type: (...) -> Union[str, MutableSequence[Any], MutableMapping[str, Any]]
    """
    Decode every byte string inside a nested structure as UTF-8, in place.

    Mappings and sequences are walked recursively and their members replaced;
    the container object that was passed in is the one returned.  A bare
    ``bytes`` value yields the decoded ``str``; anything else is returned
    unchanged.
    """
    if isinstance(inp, MutableMapping):
        for key in inp:
            inp[key] = bytes2str_in_dicts(inp[key])
    elif isinstance(inp, MutableSequence):
        for position, item in enumerate(inp):
            inp[position] = bytes2str_in_dicts(item)
    elif isinstance(inp, bytes):
        return inp.decode("utf-8")
    return inp
287
288
def visit_class(rec: Any, cls: Iterable[Any], op: Callable[..., Any]) -> None:
    """
    Call ``op`` on every mapping in ``rec`` whose "class" value is in ``cls``.

    ``rec`` is searched recursively through nested mappings and sequences.
    A matching mapping is passed to ``op`` before its own values are searched;
    the return value of ``op`` is ignored.
    """
    if isinstance(rec, MutableMapping):
        matches = "class" in rec and rec.get("class") in cls
        if matches:
            op(rec)
        for key in rec:
            visit_class(rec[key], cls, op)
    if isinstance(rec, MutableSequence):
        for element in rec:
            visit_class(element, cls, op)
299
300
def visit_field(rec: Any, field: str, op: Callable[..., Any]) -> None:
    """
    Replace the value of ``field`` with ``op(value)`` in every mapping of ``rec``.

    ``rec`` is searched recursively through nested mappings and sequences.
    A mapping's own field is rewritten first, and the search then continues
    into its (possibly just replaced) values.
    """
    if isinstance(rec, MutableMapping):
        if field in rec:
            rec[field] = op(rec[field])
        for key in rec:
            visit_field(rec[key], field, op)
    if isinstance(rec, MutableSequence):
        for element in rec:
            visit_field(element, field, op)
311
312
def random_outdir() -> str:
    """
    Return the random directory name chosen to use for tool / workflow output.

    The name ("/" followed by six random ASCII letters) is generated on the
    first call and stored in the module-level ``__random_outdir``; every later
    call returns that same stored value.
    """
    global __random_outdir
    if __random_outdir:
        return __random_outdir
    letters = [random.choice(string.ascii_letters) for _ in range(6)]  # nosec
    __random_outdir = "/" + "".join(letters)
    return __random_outdir
322
323
324 #
325 # Simple multi-platform (fcntl/msvcrt) file locking wrapper
326 #
# Both names start out as None so the locking helpers can test them for
# truthiness; the import below then rebinds one of them to a real module.
fcntl = None  # type: Optional[ModuleType]
msvcrt = None  # type: Optional[ModuleType]
try:
    import fcntl  # type: ignore
except ImportError:
    # fcntl could not be imported: fall back to msvcrt.  If that import fails
    # too, the ImportError propagates.
    import msvcrt  # type: ignore
333
334
def shared_file_lock(fd: IO[Any]) -> None:
    """
    Lock the open file ``fd`` using whichever locking module was imported.

    With ``fcntl`` a shared lock (``LOCK_SH``) is taken on the whole file.
    With ``msvcrt`` ``locking`` is called with ``LK_LOCK`` and a length of
    1024 bytes.  If neither module is available nothing happens.
    """
    if fcntl:
        fcntl.flock(fd.fileno(), fcntl.LOCK_SH)  # type: ignore
        return
    if msvcrt:
        msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024)  # type: ignore
340
341
def upgrade_lock(fd: IO[Any]) -> None:
    """
    Take an exclusive ``fcntl`` lock (``LOCK_EX``) on the open file ``fd``.

    When ``fcntl`` is not available (the ``msvcrt`` case included) this is a
    no-op.
    """
    if fcntl:
        fcntl.flock(fd.fileno(), fcntl.LOCK_EX)  # type: ignore
347
348
def adjustFileObjs(
    rec, op
):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Call ``op`` on every mapping nested in ``rec`` whose "class" is "File"."""
    file_classes = ("File",)
    visit_class(rec, file_classes, op)
354
355
def adjustDirObjs(rec, op):
    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Call ``op`` on every mapping nested in ``rec`` whose "class" is "Directory"."""
    directory_classes = ("Directory",)
    visit_class(rec, directory_classes, op)
360
361
def dedup(listing: List[CWLObjectType]) -> List[CWLObjectType]:
    """
    Return ``listing`` without duplicated or already-nested entries.

    An entry is dropped when its "location" also appears somewhere inside the
    "listing" of a Directory entry, or when an earlier entry with the same
    "location" was already kept.  The order of the kept entries is preserved
    and the input list itself is not modified.
    """
    nested_locations = set()

    def remember(obj: Dict[str, str]) -> None:
        nested_locations.add(obj["location"])

    # Pass 1: collect the locations of everything contained in a Directory.
    for entry in listing:
        if entry["class"] != "Directory":
            continue
        for child in cast(List[CWLObjectType], entry.get("listing", [])):
            adjustFileObjs(child, remember)
            adjustDirObjs(child, remember)

    # Pass 2: keep the first occurrence of every location not seen in pass 1.
    unique = []
    emitted = set()  # type: Set[str]
    for entry in listing:
        location = entry["location"]
        if location in nested_locations or location in emitted:
            continue
        unique.append(entry)
        emitted.add(cast(str, location))

    return unique
382
383
def get_listing(
    fs_access: "StdFsAccess", rec: CWLObjectType, recursive: bool = True
) -> None:
    """
    Fill in the "listing" of Directory objects, in place.

    If ``rec`` is not itself a Directory, every Directory object nested inside
    it is processed instead.  A Directory that already has a "listing" is left
    untouched.  Otherwise ``fs_access.listdir`` is called on its "location"
    and one File or Directory entry (class, location, basename) is recorded
    per child; with ``recursive`` true, child directories are listed as well.
    """
    if rec.get("class") != "Directory":
        nested_dirs = []  # type: List[CWLObjectType]
        visit_class(rec, ("Directory",), nested_dirs.append)
        for directory in nested_dirs:
            get_listing(fs_access, directory, recursive=recursive)
        return
    if "listing" in rec:
        return

    entries = []  # type: List[CWLOutputAtomType]
    for child_uri in fs_access.listdir(cast(str, rec["location"])):
        child_path = urllib.parse.urlparse(child_uri).path
        name = os.path.basename(urllib.request.url2pathname(child_path))
        if not fs_access.isdir(child_uri):
            entries.append({"class": "File", "location": child_uri, "basename": name})
            continue
        subdir = {
            "class": "Directory",
            "location": child_uri,
            "basename": name,
        }  # type: MutableMapping[str, Any]
        if recursive:
            get_listing(fs_access, subdir, recursive)
        entries.append(subdir)
    rec["listing"] = entries
412
413
def trim_listing(obj):  # type: (Dict[str, Any]) -> None
    """
    Remove the 'listing' field from Directory objects that are file references.

    It is redundant and potentially expensive to pass fully enumerated
    Directory objects around if not explicitly needed, so the 'listing' field
    is deleted when the object's 'location' starts with ``file://``.
    Objects with any other (or no) location are left unchanged.
    """
    is_file_reference = obj.get("location", "").startswith("file://")
    if is_file_reference and "listing" in obj:
        del obj["listing"]
424
425
def downloadHttpFile(httpurl):
    # type: (str) -> str
    """
    Download ``httpurl`` into a temporary file and return that file's path.

    The request goes through a CacheControl session whose file cache lives in
    ``<base>/.cache/cwltool``, where ``<base>`` is ``$XDG_CACHE_HOME`` if set,
    else ``$HOME`` if set, else the expansion of ``~``.

    The temporary file is created with ``delete=False``, so it is not removed
    automatically; the caller owns it.

    The response is closed even when streaming or writing fails, so the
    underlying connection is not leaked.
    """
    if "XDG_CACHE_HOME" in os.environ:
        directory = os.environ["XDG_CACHE_HOME"]
    elif "HOME" in os.environ:
        directory = os.environ["HOME"]
    else:
        directory = os.path.expanduser("~")

    cache_session = CacheControl(
        requests.Session(),
        cache=FileCache(os.path.join(directory, ".cache", "cwltool")),
    )

    r = cache_session.get(httpurl, stream=True)
    try:
        with NamedTemporaryFile(mode="wb", delete=False) as f:
            for chunk in r.iter_content(chunk_size=16384):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
    finally:
        # Always release the streamed connection, including on error.
        r.close()
    return str(f.name)
448
449
def ensure_writable(path, include_root=False):  # type: (str, bool) -> None
    """
    Add the owner-write permission bit to ``path``.

    If ``path`` is a directory, every file and directory below it is made
    owner-writable, recursively.  The directory ``path`` itself is changed
    only when ``include_root`` is true; the default keeps the previous
    behaviour.  If ``path`` is not a directory, ``path`` itself is made
    owner-writable.

    All other permission bits are preserved.
    """

    def add_writable_flag(target):  # type: (str) -> None
        mode = stat.S_IMODE(os.stat(target).st_mode)
        os.chmod(target, mode | stat.S_IWUSR)

    if not os.path.isdir(path):
        add_writable_flag(path)
        return

    if include_root:
        add_writable_flag(path)
    # os.walk never yields the top directory as an entry, only its contents.
    for root, dirs, files in os.walk(path):
        for name in files + dirs:
            add_writable_flag(os.path.join(root, name))
467
468
def ensure_non_writable(path):  # type: (str) -> None
    """
    Remove the write permission bits (user, group and other) from ``path``.

    If ``path`` is a directory, every file and directory below it is changed,
    recursively, but the directory ``path`` itself is not.  If ``path`` is
    not a directory, ``path`` itself is changed.

    All other permission bits are preserved.
    """
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

    def drop_write_bits(target):  # type: (str) -> None
        mode = stat.S_IMODE(os.stat(target).st_mode)
        os.chmod(target, mode & ~write_bits)

    if not os.path.isdir(path):
        drop_write_bits(path)
        return

    for root, dirs, files in os.walk(path):
        for name in files + dirs:
            drop_write_bits(os.path.join(root, name))
486
487
def normalizeFilesDirs(
    job: Optional[
        Union[
            MutableSequence[MutableMapping[str, Any]],
            MutableMapping[str, Any],
            DirectoryType,
        ]
    ]
) -> None:
    """
    Normalize every File and Directory object found inside ``job``, in place.

    For each such object:

    * a missing "location" is replaced by a generated ``_:<uuid>`` identifier
      (a File must then carry "contents"; a Directory must carry both
      "listing" and "basename");
    * trailing slashes are stripped from the location's path (only a
      Directory may have them);
    * an empty or missing "basename" is derived from the location;
    * for a File, "nameroot" and "nameext" are set from the basename.

    Raises ValidationException when one of the requirements above is not met.
    """

    def addLocation(d):  # type: (Dict[str, Any]) -> None
        if "location" not in d:
            kind = d["class"]
            if kind == "File" and "contents" not in d:
                raise ValidationException(
                    "Anonymous file object must have 'contents' and 'basename' fields."
                )
            if kind == "Directory" and not ("listing" in d and "basename" in d):
                raise ValidationException(
                    "Anonymous directory object must have 'listing' and 'basename' fields."
                )
            # Anonymous object: give it a synthetic identifier.
            d["location"] = "_:" + str(uuid.uuid4())
            if "basename" not in d:
                d["basename"] = d["location"][2:]

        parsed = urllib.parse.urlparse(d["location"])
        path = parsed.path
        if path.endswith("/"):
            if d["class"] != "Directory":
                raise ValidationException(
                    "location '%s' ends with '/' but is not a Directory" % d["location"]
                )
            # Strip the trailing slash(es) and store the rebuilt location.
            path = path.rstrip("/")
            d["location"] = urllib.parse.urlunparse(parsed._replace(path=path))

        if not d.get("basename"):
            if path.startswith("_:"):
                derived = path[2:]
            else:
                derived = os.path.basename(urllib.request.url2pathname(path))
            d["basename"] = str(derived)

        if d["class"] == "File":
            root, ext = os.path.splitext(d["basename"])
            for key, part in (("nameroot", root), ("nameext", ext)):
                if d.get(key) != part:
                    d[key] = str(part)

    visit_class(job, ("File", "Directory"), addLocation)
547
548
def posix_path(local_path: str) -> str:
    """Return ``local_path``, parsed as a native path, rendered as a POSIX path string."""
    native = Path(local_path)
    return str(PurePosixPath(native))
551
552
def local_path(posix_path: str) -> str:
    """Return ``posix_path`` rendered as a path string in the native flavour of this platform."""
    native = Path(posix_path)
    return str(native)
555
556
def create_tmp_dir(tmpdir_prefix: str) -> str:
    """
    Create a fresh temporary directory honouring ``tmpdir_prefix``.

    The part of ``tmpdir_prefix`` before its last path separator is the parent
    directory; the remainder is the prefix of the new directory's name.
    Returns the path of the directory that ``tempfile.mkdtemp`` created.
    """
    return tempfile.mkdtemp(
        dir=os.path.dirname(tmpdir_prefix),
        prefix=os.path.basename(tmpdir_prefix),
    )