view env/lib/python3.9/site-packages/cwltool/mpi.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Experimental support for MPI."""
import inspect
import os
import re
from typing import List, Mapping, MutableMapping, Optional, Type, TypeVar, Union

from ruamel import yaml

MpiConfigT = TypeVar("MpiConfigT", bound="MpiConfig")

MPIRequirementName = "http://commonwl.org/cwltool#MPIRequirement"


class MpiConfig:
    def __init__(
        self,
        runner: str = "mpirun",
        nproc_flag: str = "-n",
        default_nproc: Union[int, str] = 1,
        extra_flags: Optional[List[str]] = None,
        env_pass: Optional[List[str]] = None,
        env_pass_regex: Optional[List[str]] = None,
        env_set: Optional[Mapping[str, str]] = None,
    ) -> None:
        """
        Initialize from the argument mapping.

        Defaults are:
        runner: "mpirun"
        nproc_flag: "-n"
        default_nproc: 1
        extra_flags: []
        env_pass: []
        env_pass_regex: []
        env_set: {}

        Any unknown keys will result in an exception.
        """
        self.runner = runner
        self.nproc_flag = nproc_flag
        self.default_nproc = int(default_nproc)
        self.extra_flags = extra_flags or []
        self.env_pass = env_pass or []
        self.env_pass_regex = env_pass_regex or []
        self.env_set = env_set or {}

    @classmethod
    def load(cls: Type[MpiConfigT], config_file_name: str) -> MpiConfigT:
        """Create the MpiConfig object from the contents of a YAML file.

        The file must contain exactly one object, whose attributes must
        be in the list allowed in the class initialiser (all are
        optional).
        """
        with open(config_file_name) as cf:
            data = yaml.main.round_trip_load(cf)
        try:
            return cls(**data)
        except TypeError as e:
            unknown = set(data.keys()) - set(inspect.signature(cls).parameters)
            raise ValueError(f"Unknown key(s) in MPI configuration: {unknown}")

    def pass_through_env_vars(self, env: MutableMapping[str, str]) -> None:
        """Take the configured list of environment variables and pass them to the executed process."""
        for var in self.env_pass:
            if var in os.environ:
                env[var] = os.environ[var]

        for var_re in self.env_pass_regex:
            r = re.compile(var_re)
            for k in os.environ:
                if r.match(k):
                    env[k] = os.environ[k]

    def set_env_vars(self, env: MutableMapping[str, str]) -> None:
        """Set some variables to the value configured."""
        env.update(self.env_set)