view env/lib/python3.9/site-packages/cwltool/context.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Shared context objects that replace use of kwargs."""
import copy
import os
import tempfile
import threading
from typing import IO, Any, Callable, Dict, Iterable, List, Optional, TextIO, Union

# move to a regular typing import when Python 3.3-3.6 is no longer supported
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType
from typing_extensions import TYPE_CHECKING

from .builder import Builder, HasReqsHints
from .mpi import MpiConfig
from .mutation import MutationManager
from .pathmapper import PathMapper
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, ResolverType

if TYPE_CHECKING:
    from .process import Process
    from .provenance import ResearchObject  # pylint: disable=unused-import
    from .provenance_profile import ProvenanceProfile


class ContextBase:
    """Shared kwargs based initilizer for {Runtime,Loading}Context."""

    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize."""
        if kwargs:
            for k, v in kwargs.items():
                if hasattr(self, k):
                    setattr(self, k, v)


def make_tool_notimpl(
    toolpath_object: CommentedMap, loadingContext: "LoadingContext"
) -> "Process":
    raise NotImplementedError()


default_make_tool = make_tool_notimpl


class LoadingContext(ContextBase):
    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize the LoadingContext from the kwargs."""
        self.debug = False  # type: bool
        self.metadata = {}  # type: CWLObjectType
        self.requirements = None  # type: Optional[List[CWLObjectType]]
        self.hints = None  # type: Optional[List[CWLObjectType]]
        self.overrides_list = []  # type: List[CWLObjectType]
        self.loader = None  # type: Optional[Loader]
        self.avsc_names = None  # type: Optional[Names]
        self.disable_js_validation = False  # type: bool
        self.js_hint_options_file = None
        self.do_validate = True  # type: bool
        self.enable_dev = False  # type: bool
        self.strict = True  # type: bool
        self.resolver = None  # type: Optional[ResolverType]
        self.fetcher_constructor = None  # type: Optional[FetcherCallableType]
        self.construct_tool_object = default_make_tool
        self.research_obj = None  # type: Optional[ResearchObject]
        self.orcid = ""  # type: str
        self.cwl_full_name = ""  # type: str
        self.host_provenance = False  # type: bool
        self.user_provenance = False  # type: bool
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.do_update = None  # type: Optional[bool]
        self.jobdefaults = None  # type: Optional[CommentedMap]
        self.doc_cache = True  # type: bool
        self.relax_path_checks = False  # type: bool

        super().__init__(kwargs)

    def copy(self):
        # type: () -> LoadingContext
        return copy.copy(self)


class RuntimeContext(ContextBase):
    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize the RuntimeContext from the kwargs."""
        select_resources_callable = Callable[  # pylint: disable=unused-variable
            [Dict[str, Union[int, float, str]], RuntimeContext],
            Dict[str, Union[int, float, str]],
        ]
        self.user_space_docker_cmd = ""  # type: Optional[str]
        self.secret_store = None  # type: Optional[SecretStore]
        self.no_read_only = False  # type: bool
        self.custom_net = ""  # type: Optional[str]
        self.no_match_user = False  # type: bool
        self.preserve_environment = ""  # type: Optional[Iterable[str]]
        self.preserve_entire_environment = False  # type: bool
        self.use_container = True  # type: bool
        self.force_docker_pull = False  # type: bool

        self.tmp_outdir_prefix = ""  # type: str
        self.tmpdir_prefix = DEFAULT_TMP_PREFIX  # type: str
        self.tmpdir = ""  # type: str
        self.rm_tmpdir = True  # type: bool
        self.pull_image = True  # type: bool
        self.rm_container = True  # type: bool
        self.move_outputs = "move"  # type: str

        self.singularity = False  # type: bool
        self.disable_net = False  # type: bool
        self.debug = False  # type: bool
        self.compute_checksum = True  # type: bool
        self.name = ""  # type: str
        self.default_container = ""  # type: Optional[str]
        self.find_default_container = (
            None
        )  # type: Optional[Callable[[HasReqsHints], Optional[str]]]
        self.cachedir = None  # type: Optional[str]
        self.outdir = None  # type: Optional[str]
        self.stagedir = ""  # type: str
        self.part_of = ""  # type: str
        self.basedir = ""  # type: str
        self.toplevel = False  # type: bool
        self.mutation_manager = None  # type: Optional[MutationManager]
        self.make_fs_access = StdFsAccess  # type: Callable[[str], StdFsAccess]
        self.path_mapper = PathMapper
        self.builder = None  # type: Optional[Builder]
        self.docker_outdir = ""  # type: str
        self.docker_tmpdir = ""  # type: str
        self.docker_stagedir = ""  # type: str
        self.js_console = False  # type: bool
        self.job_script_provider = None  # type: Optional[DependenciesConfiguration]
        self.select_resources = None  # type: Optional[select_resources_callable]
        self.eval_timeout = 20  # type: float
        self.postScatterEval = (
            None
        )  # type: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]]
        self.on_error = "stop"  # type: str
        self.strict_memory_limit = False  # type: bool

        self.cidfile_dir = None  # type: Optional[str]
        self.cidfile_prefix = None  # type: Optional[str]

        self.workflow_eval_lock = None  # type: Optional[threading.Condition]
        self.research_obj = None  # type: Optional[ResearchObject]
        self.orcid = ""  # type: str
        self.cwl_full_name = ""  # type: str
        self.process_run_id = None  # type: Optional[str]
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.mpi_config = MpiConfig()  # type: MpiConfig
        self.default_stdout = None  # type: Optional[Union[IO[bytes], TextIO]]
        self.default_stderr = None  # type: Optional[Union[IO[bytes], TextIO]]
        super().__init__(kwargs)
        if self.tmp_outdir_prefix == "":
            self.tmp_outdir_prefix = self.tmpdir_prefix

    def get_outdir(self) -> str:
        """Return self.outdir or create one with self.tmp_outdir_prefix."""
        if self.outdir:
            return self.outdir
        return self.create_outdir()

    def get_tmpdir(self) -> str:
        """Return self.tmpdir or create one with self.tmpdir_prefix."""
        if self.tmpdir:
            return self.tmpdir
        return self.create_tmpdir()

    def get_stagedir(self) -> str:
        """Return self.stagedir or create one with self.tmpdir_prefix."""
        if self.stagedir:
            return self.stagedir
        tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
        return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)

    def create_tmpdir(self) -> str:
        """Create a temporary directory that respects self.tmpdir_prefix."""
        tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
        return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)

    def create_outdir(self) -> str:
        """Create a temporary directory that respects self.tmp_outdir_prefix."""
        out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix)
        return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)

    def copy(self):
        # type: () -> RuntimeContext
        return copy.copy(self)


def getdefault(val, default):
    # type: (Any, Any) -> Any
    if val is None:
        return default
    else:
        return val