view env/lib/python3.9/site-packages/galaxy/tool_util/cwl/representation.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

""" This module is responsible for converting between Galaxy's tool
input description and the CWL description for a job json. """

import json
import logging
import os
from enum import Enum
from typing import Any, NamedTuple, Optional

from galaxy.exceptions import RequestParameterInvalidException
from galaxy.util import safe_makedirs, string_as_bool
from .util import set_basename_and_derived_properties


log = logging.getLogger(__name__)

NOT_PRESENT = object()

NO_GALAXY_INPUT = object()


class INPUT_TYPE(str, Enum):
    DATA = "data"
    INTEGER = "integer"
    FLOAT = "float"
    TEXT = "text"
    BOOLEAN = "boolean"
    SELECT = "select"
    FIELD = "field"
    CONDITIONAL = "conditional"
    DATA_COLLECTON = "data_collection"


# There are two approaches to mapping CWL tool state to Galaxy tool state
# one is to map CWL types to compound Galaxy tool parameters combinations
# with conditionals and the other is to use a new Galaxy parameter type that
# allows unions, optional specifications, etc.... The problem with the former
# is that it doesn't work with the workflow parameters for instance and is
# very complex on the backend. The problem with the latter is that the GUI
# for this parameter type is undefined curently.
USE_FIELD_TYPES = True

# There are two approaches to mapping CWL workflow inputs to Galaxy workflow
# steps. The first is to simply map everything to expressions and stick them into
# files and use data inputs - the second is to use parameter_input steps with
# fields types. We are dispatching on USE_FIELD_TYPES for now - to choose but
# may diverge later?
# There are open issues with each approach:
#  - Mapping everything to files makes the GUI harder to imagine but the backend
#     easier to manage in someways.
USE_STEP_PARAMETERS = USE_FIELD_TYPES


class TypeRepresentation(NamedTuple):
    name: str
    galaxy_param_type: Any
    label: str
    collection_type: Optional[str]

    @property
    def uses_param(self):
        return self.galaxy_param_type is not NO_GALAXY_INPUT


TYPE_REPRESENTATIONS = [
    TypeRepresentation("null", NO_GALAXY_INPUT, "no input", None),
    TypeRepresentation("integer", INPUT_TYPE.INTEGER, "an integer", None),
    TypeRepresentation("float", INPUT_TYPE.FLOAT, "a decimal number", None),
    TypeRepresentation("double", INPUT_TYPE.FLOAT, "a decimal number", None),
    TypeRepresentation("file", INPUT_TYPE.DATA, "a dataset", None),
    TypeRepresentation("directory", INPUT_TYPE.DATA, "a directory", None),
    TypeRepresentation("boolean", INPUT_TYPE.BOOLEAN, "a boolean", None),
    TypeRepresentation("text", INPUT_TYPE.TEXT, "a simple text field", None),
    TypeRepresentation("record", INPUT_TYPE.DATA_COLLECTON, "record as a dataset collection", "record"),
    TypeRepresentation("json", INPUT_TYPE.TEXT, "arbitrary JSON structure", None),
    TypeRepresentation("array", INPUT_TYPE.DATA_COLLECTON, "as a dataset list", "list"),
    TypeRepresentation("enum", INPUT_TYPE.TEXT, "enum value", None),  # TODO: make this a select...
    TypeRepresentation("field", INPUT_TYPE.FIELD, "arbitrary JSON structure", None),
]
FIELD_TYPE_REPRESENTATION = TYPE_REPRESENTATIONS[-1]

if not USE_FIELD_TYPES:
    CWL_TYPE_TO_REPRESENTATIONS = {
        "Any": ["integer", "float", "file", "boolean", "text", "record", "json"],
        "array": ["array"],
        "string": ["text"],
        "boolean": ["boolean"],
        "int": ["integer"],
        "float": ["float"],
        "File": ["file"],
        "Directory": ["directory"],
        "null": ["null"],
        "record": ["record"],
    }
else:
    CWL_TYPE_TO_REPRESENTATIONS = {
        "Any": ["field"],
        "array": ["array"],
        "string": ["text"],
        "boolean": ["boolean"],
        "int": ["integer"],
        "float": ["float"],
        "File": ["file"],
        "Directory": ["directory"],
        "null": ["null"],
        "record": ["record"],
        "enum": ["enum"],
        "double": ["double"],
    }


def type_representation_from_name(type_representation_name):
    for type_representation in TYPE_REPRESENTATIONS:
        if type_representation.name == type_representation_name:
            return type_representation

    assert False


def type_descriptions_for_field_types(field_types):
    type_representation_names = set()
    for field_type in field_types:
        if isinstance(field_type, dict) and field_type.get("type"):
            field_type = field_type.get("type")

        try:
            type_representation_names_for_field_type = CWL_TYPE_TO_REPRESENTATIONS.get(field_type)
        except TypeError:
            raise Exception("Failed to convert field_type %s" % field_type)
        if type_representation_names_for_field_type is None:
            raise Exception("Failed to convert type %s" % field_type)
        type_representation_names.update(type_representation_names_for_field_type)
    type_representations = []
    for type_representation in TYPE_REPRESENTATIONS:
        if type_representation.name in type_representation_names:
            type_representations.append(type_representation)
    return type_representations


def dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper):
    if dataset_wrapper.ext == "expression.json":
        with open(dataset_wrapper.file_name) as f:
            return json.load(f)

    if dataset_wrapper.ext == "directory":
        return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)

    extra_files_path = dataset_wrapper.extra_files_path
    secondary_files_path = os.path.join(extra_files_path, "__secondary_files__")
    path = str(dataset_wrapper)
    raw_file_object = {"class": "File"}

    if os.path.exists(secondary_files_path):
        safe_makedirs(inputs_dir)
        name = os.path.basename(path)
        new_input_path = os.path.join(inputs_dir, name)
        os.symlink(path, new_input_path)
        secondary_files = []
        for secondary_file_name in os.listdir(secondary_files_path):
            secondary_file_path = os.path.join(secondary_files_path, secondary_file_name)
            target = os.path.join(inputs_dir, secondary_file_name)
            log.info(f"linking [{secondary_file_path}] to [{target}]")
            os.symlink(secondary_file_path, target)
            is_dir = os.path.isdir(os.path.realpath(secondary_file_path))
            secondary_files.append({"class": "File" if not is_dir else "Directory", "location": target})

        raw_file_object["secondaryFiles"] = secondary_files
        path = new_input_path

    raw_file_object["location"] = path

    # Verify it isn't a NoneDataset
    if dataset_wrapper.unsanitized:
        raw_file_object["size"] = int(dataset_wrapper.get_size())

    set_basename_and_derived_properties(raw_file_object, str(dataset_wrapper.created_from_basename or dataset_wrapper.name))
    return raw_file_object


def dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper):
    assert dataset_wrapper.ext == "directory"

    # get directory name
    archive_name = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
    nameroot, nameext = os.path.splitext(archive_name)
    directory_name = nameroot  # assume archive file name contains the directory name

    # get archive location
    try:
        archive_location = dataset_wrapper.unsanitized.file_name
    except Exception:
        archive_location = None

    directory_json = {"location": dataset_wrapper.extra_files_path,
                      "class": "Directory",
                      "name": directory_name,
                      "archive_location": archive_location,
                      "archive_nameext": nameext,
                      "archive_nameroot": nameroot}

    return directory_json


def collection_wrapper_to_array(inputs_dir, wrapped_value):
    rval = []
    for value in wrapped_value:
        rval.append(dataset_wrapper_to_file_json(inputs_dir, value))
    return rval


def collection_wrapper_to_record(inputs_dir, wrapped_value):
    rval = {}
    for key, value in wrapped_value.items():
        rval[key] = dataset_wrapper_to_file_json(inputs_dir, value)
    return rval


def to_cwl_job(tool, param_dict, local_working_directory):
    """ tool is Galaxy's representation of the tool and param_dict is the
    parameter dictionary with wrapped values.
    """
    tool_proxy = tool._cwl_tool_proxy
    input_fields = tool_proxy.input_fields()
    inputs = tool.inputs
    input_json = {}

    inputs_dir = os.path.join(local_working_directory, "_inputs")

    def simple_value(input, param_dict_value, type_representation_name=None):
        type_representation = type_representation_from_name(type_representation_name)
        # Hmm... cwl_type isn't really the cwl type in every case,
        # like in the case of json for instance.

        if type_representation.galaxy_param_type == NO_GALAXY_INPUT:
            assert param_dict_value is None
            return None

        if type_representation.name == "file":
            dataset_wrapper = param_dict_value
            return dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper)
        elif type_representation.name == "directory":
            dataset_wrapper = param_dict_value
            return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)
        elif type_representation.name == "integer":
            return int(str(param_dict_value))
        elif type_representation.name == "long":
            return int(str(param_dict_value))
        elif type_representation.name in ["float", "double"]:
            return float(str(param_dict_value))
        elif type_representation.name == "boolean":
            return string_as_bool(param_dict_value)
        elif type_representation.name == "text":
            return str(param_dict_value)
        elif type_representation.name == "enum":
            return str(param_dict_value)
        elif type_representation.name == "json":
            raw_value = param_dict_value.value
            return json.loads(raw_value)
        elif type_representation.name == "field":
            if param_dict_value is None:
                return None
            if hasattr(param_dict_value, "value"):
                # Is InputValueWrapper
                rval = param_dict_value.value
                if isinstance(rval, dict) and "src" in rval and rval["src"] == "json":
                    # needed for wf_step_connect_undeclared_param, so non-file defaults?
                    return rval["value"]
                return rval
            elif not param_dict_value.is_collection:
                # Is DatasetFilenameWrapper
                return dataset_wrapper_to_file_json(inputs_dir, param_dict_value)
            else:
                # Is DatasetCollectionWrapper
                hdca_wrapper = param_dict_value
                if hdca_wrapper.collection_type == "list":
                    # TODO: generalize to lists of lists and lists of non-files...
                    return collection_wrapper_to_array(inputs_dir, hdca_wrapper)
                elif hdca_wrapper.collection_type.collection_type == "record":
                    return collection_wrapper_to_record(inputs_dir, hdca_wrapper)

        elif type_representation.name == "array":
            # TODO: generalize to lists of lists and lists of non-files...
            return collection_wrapper_to_array(inputs_dir, param_dict_value)
        elif type_representation.name == "record":
            return collection_wrapper_to_record(inputs_dir, param_dict_value)
        else:
            return str(param_dict_value)

    for input_name, input in inputs.items():
        if input.type == "repeat":
            only_input = next(iter(input.inputs.values()))
            array_value = []
            for instance in param_dict[input_name]:
                array_value.append(simple_value(only_input, instance[input_name[:-len("_repeat")]]))
            input_json[input_name[:-len("_repeat")]] = array_value
        elif input.type == "conditional":
            assert input_name in param_dict, f"No value for {input_name} in {param_dict}"
            current_case = param_dict[input_name]["_cwl__type_"]
            if str(current_case) != "null":  # str because it is a wrapped...
                case_index = input.get_current_case(current_case)
                case_input = input.cases[case_index].inputs["_cwl__value_"]
                case_value = param_dict[input_name]["_cwl__value_"]
                input_json[input_name] = simple_value(case_input, case_value, current_case)
        else:
            matched_field = None
            for field in input_fields:
                if field["name"] == input_name:
                    matched_field = field
            field_type = field_to_field_type(matched_field)
            if isinstance(field_type, list):
                assert USE_FIELD_TYPES
                type_descriptions = [FIELD_TYPE_REPRESENTATION]
            else:
                type_descriptions = type_descriptions_for_field_types([field_type])
            assert len(type_descriptions) == 1
            type_description_name = type_descriptions[0].name
            input_json[input_name] = simple_value(input, param_dict[input_name], type_description_name)

    log.debug("Galaxy Tool State is CWL State is %s" % input_json)
    return input_json


def to_galaxy_parameters(tool, as_dict):
    """ Tool is Galaxy's representation of the tool and as_dict is a Galaxified
    representation of the input json (no paths, HDA references for instance).
    """
    inputs = tool.inputs
    galaxy_request = {}

    def from_simple_value(input, param_dict_value, type_representation_name=None):
        if type_representation_name == "json":
            return json.dumps(param_dict_value)
        else:
            return param_dict_value

    for input_name, input in inputs.items():
        as_dict_value = as_dict.get(input_name, NOT_PRESENT)
        galaxy_input_type = input.type

        if galaxy_input_type == "repeat":
            if input_name not in as_dict:
                continue

            only_input = next(iter(input.inputs.values()))
            for index, value in enumerate(as_dict_value):
                key = f"{input_name}_repeat_0|{only_input.name}"
                galaxy_value = from_simple_value(only_input, value)
                galaxy_request[key] = galaxy_value
        elif galaxy_input_type == "conditional":
            case_strings = input.case_strings
            # TODO: less crazy handling of defaults...
            if (as_dict_value is NOT_PRESENT or as_dict_value is None) and "null" in case_strings:
                type_representation_name = "null"
            elif (as_dict_value is NOT_PRESENT or as_dict_value is None):
                raise RequestParameterInvalidException(
                    "Cannot translate CWL datatype - value [{}] of type [{}] with case_strings [{}]. Non-null property must be set.".format(
                        as_dict_value, type(as_dict_value), case_strings
                    )
                )
            elif isinstance(as_dict_value, bool) and "boolean" in case_strings:
                type_representation_name = "boolean"
            elif isinstance(as_dict_value, int) and "integer" in case_strings:
                type_representation_name = "integer"
            elif isinstance(as_dict_value, int) and "long" in case_strings:
                type_representation_name = "long"
            elif isinstance(as_dict_value, (int, float)) and "float" in case_strings:
                type_representation_name = "float"
            elif isinstance(as_dict_value, (int, float)) and "double" in case_strings:
                type_representation_name = "double"
            elif isinstance(as_dict_value, str) and "string" in case_strings:
                type_representation_name = "string"
            elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "file" in case_strings:
                type_representation_name = "file"
            elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "directory" in case_strings:
                # TODO: can't disambiuate with above if both are available...
                type_representation_name = "directory"
            elif "field" in case_strings:
                type_representation_name = "field"
            elif "json" in case_strings and as_dict_value is not None:
                type_representation_name = "json"
            else:
                raise RequestParameterInvalidException(
                    "Cannot translate CWL datatype - value [{}] of type [{}] with case_strings [{}].".format(
                        as_dict_value, type(as_dict_value), case_strings
                    )
                )
            galaxy_request["%s|_cwl__type_" % input_name] = type_representation_name
            if type_representation_name != "null":
                current_case_index = input.get_current_case(type_representation_name)
                current_case_inputs = input.cases[current_case_index].inputs
                current_case_input = current_case_inputs["_cwl__value_"]
                galaxy_value = from_simple_value(current_case_input, as_dict_value, type_representation_name)
                galaxy_request["%s|_cwl__value_" % input_name] = galaxy_value
        elif as_dict_value is NOT_PRESENT:
            continue
        else:
            galaxy_value = from_simple_value(input, as_dict_value)
            galaxy_request[input_name] = galaxy_value

    log.info("Converted galaxy_request is %s" % galaxy_request)
    return galaxy_request


def field_to_field_type(field):
    field_type = field["type"]
    if isinstance(field_type, dict):
        field_type = field_type["type"]
    if isinstance(field_type, list):
        field_type_length = len(field_type)
        if field_type_length == 0:
            raise Exception("Zero-length type list encountered, invalid CWL?")
        elif len(field_type) == 1:
            field_type = field_type[0]

    return field_type