diff env/lib/python3.9/site-packages/cwltool/context.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/context.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,198 @@
+"""Shared context objects that replace use of kwargs."""
+import copy
+import os
+import tempfile
+import threading
+from typing import IO, Any, Callable, Dict, Iterable, List, Optional, TextIO, Union
+
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+from ruamel.yaml.comments import CommentedMap
+from schema_salad.avro.schema import Names
+from schema_salad.ref_resolver import Loader
+from schema_salad.utils import FetcherCallableType
+from typing_extensions import TYPE_CHECKING
+
+from .builder import Builder, HasReqsHints
+from .mpi import MpiConfig
+from .mutation import MutationManager
+from .pathmapper import PathMapper
+from .secrets import SecretStore
+from .software_requirements import DependenciesConfiguration
+from .stdfsaccess import StdFsAccess
+from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, ResolverType
+
+if TYPE_CHECKING:
+    from .process import Process
+    from .provenance import ResearchObject  # pylint: disable=unused-import
+    from .provenance_profile import ProvenanceProfile
+
+
+class ContextBase:
+    """Shared kwargs based initilizer for {Runtime,Loading}Context."""
+
+    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize."""
+        if kwargs:
+            for k, v in kwargs.items():
+                if hasattr(self, k):
+                    setattr(self, k, v)
+
+
+def make_tool_notimpl(
+    toolpath_object: CommentedMap, loadingContext: "LoadingContext"
+) -> "Process":
+    raise NotImplementedError()
+
+
+default_make_tool = make_tool_notimpl
+
+
+class LoadingContext(ContextBase):
+    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize the LoadingContext from the kwargs."""
+        self.debug = False  # type: bool
+        self.metadata = {}  # type: CWLObjectType
+        self.requirements = None  # type: Optional[List[CWLObjectType]]
+        self.hints = None  # type: Optional[List[CWLObjectType]]
+        self.overrides_list = []  # type: List[CWLObjectType]
+        self.loader = None  # type: Optional[Loader]
+        self.avsc_names = None  # type: Optional[Names]
+        self.disable_js_validation = False  # type: bool
+        self.js_hint_options_file = None
+        self.do_validate = True  # type: bool
+        self.enable_dev = False  # type: bool
+        self.strict = True  # type: bool
+        self.resolver = None  # type: Optional[ResolverType]
+        self.fetcher_constructor = None  # type: Optional[FetcherCallableType]
+        self.construct_tool_object = default_make_tool
+        self.research_obj = None  # type: Optional[ResearchObject]
+        self.orcid = ""  # type: str
+        self.cwl_full_name = ""  # type: str
+        self.host_provenance = False  # type: bool
+        self.user_provenance = False  # type: bool
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.do_update = None  # type: Optional[bool]
+        self.jobdefaults = None  # type: Optional[CommentedMap]
+        self.doc_cache = True  # type: bool
+        self.relax_path_checks = False  # type: bool
+
+        super().__init__(kwargs)
+
+    def copy(self):
+        # type: () -> LoadingContext
+        return copy.copy(self)
+
+
+class RuntimeContext(ContextBase):
+    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize the RuntimeContext from the kwargs."""
+        select_resources_callable = Callable[  # pylint: disable=unused-variable
+            [Dict[str, Union[int, float, str]], RuntimeContext],
+            Dict[str, Union[int, float, str]],
+        ]
+        self.user_space_docker_cmd = ""  # type: Optional[str]
+        self.secret_store = None  # type: Optional[SecretStore]
+        self.no_read_only = False  # type: bool
+        self.custom_net = ""  # type: Optional[str]
+        self.no_match_user = False  # type: bool
+        self.preserve_environment = ""  # type: Optional[Iterable[str]]
+        self.preserve_entire_environment = False  # type: bool
+        self.use_container = True  # type: bool
+        self.force_docker_pull = False  # type: bool
+
+        self.tmp_outdir_prefix = ""  # type: str
+        self.tmpdir_prefix = DEFAULT_TMP_PREFIX  # type: str
+        self.tmpdir = ""  # type: str
+        self.rm_tmpdir = True  # type: bool
+        self.pull_image = True  # type: bool
+        self.rm_container = True  # type: bool
+        self.move_outputs = "move"  # type: str
+
+        self.singularity = False  # type: bool
+        self.disable_net = False  # type: bool
+        self.debug = False  # type: bool
+        self.compute_checksum = True  # type: bool
+        self.name = ""  # type: str
+        self.default_container = ""  # type: Optional[str]
+        self.find_default_container = (
+            None
+        )  # type: Optional[Callable[[HasReqsHints], Optional[str]]]
+        self.cachedir = None  # type: Optional[str]
+        self.outdir = None  # type: Optional[str]
+        self.stagedir = ""  # type: str
+        self.part_of = ""  # type: str
+        self.basedir = ""  # type: str
+        self.toplevel = False  # type: bool
+        self.mutation_manager = None  # type: Optional[MutationManager]
+        self.make_fs_access = StdFsAccess  # type: Callable[[str], StdFsAccess]
+        self.path_mapper = PathMapper
+        self.builder = None  # type: Optional[Builder]
+        self.docker_outdir = ""  # type: str
+        self.docker_tmpdir = ""  # type: str
+        self.docker_stagedir = ""  # type: str
+        self.js_console = False  # type: bool
+        self.job_script_provider = None  # type: Optional[DependenciesConfiguration]
+        self.select_resources = None  # type: Optional[select_resources_callable]
+        self.eval_timeout = 20  # type: float
+        self.postScatterEval = (
+            None
+        )  # type: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]]
+        self.on_error = "stop"  # type: str
+        self.strict_memory_limit = False  # type: bool
+
+        self.cidfile_dir = None  # type: Optional[str]
+        self.cidfile_prefix = None  # type: Optional[str]
+
+        self.workflow_eval_lock = None  # type: Optional[threading.Condition]
+        self.research_obj = None  # type: Optional[ResearchObject]
+        self.orcid = ""  # type: str
+        self.cwl_full_name = ""  # type: str
+        self.process_run_id = None  # type: Optional[str]
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.mpi_config = MpiConfig()  # type: MpiConfig
+        self.default_stdout = None  # type: Optional[Union[IO[bytes], TextIO]]
+        self.default_stderr = None  # type: Optional[Union[IO[bytes], TextIO]]
+        super().__init__(kwargs)
+        if self.tmp_outdir_prefix == "":
+            self.tmp_outdir_prefix = self.tmpdir_prefix
+
+    def get_outdir(self) -> str:
+        """Return self.outdir or create one with self.tmp_outdir_prefix."""
+        if self.outdir:
+            return self.outdir
+        return self.create_outdir()
+
+    def get_tmpdir(self) -> str:
+        """Return self.tmpdir or create one with self.tmpdir_prefix."""
+        if self.tmpdir:
+            return self.tmpdir
+        return self.create_tmpdir()
+
+    def get_stagedir(self) -> str:
+        """Return self.stagedir or create one with self.tmpdir_prefix."""
+        if self.stagedir:
+            return self.stagedir
+        tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
+        return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+
+    def create_tmpdir(self) -> str:
+        """Create a temporary directory that respects self.tmpdir_prefix."""
+        tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
+        return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+
+    def create_outdir(self) -> str:
+        """Create a temporary directory that respects self.tmp_outdir_prefix."""
+        out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix)
+        return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
+
+    def copy(self):
+        # type: () -> RuntimeContext
+        return copy.copy(self)
+
+
+def getdefault(val, default):
+    # type: (Any, Any) -> Any
+    if val is None:
+        return default
+    else:
+        return val