diff env/lib/python3.9/site-packages/galaxy/tool_util/cwl/util.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/galaxy/tool_util/cwl/util.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,581 @@
+"""Client-centric CWL-related utilities.
+
+Used to share code between the Galaxy test framework
+and other Galaxy CWL clients (e.g. Planemo)."""
+import hashlib
+import io
+import json
+import os
+import tarfile
+import tempfile
+from collections import namedtuple
+
+import yaml
+
+from galaxy.util import unicodify
+
+STORE_SECONDARY_FILES_WITH_BASENAME = True
+SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
+SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json"
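+# These constants describe the tar layout (assumed here, inferred from the code
+# below) used to ship CWL secondaryFiles alongside a primary dataset: each
+# secondary file is stored as __secondary_files__/<basename> and
+# __secondary_files_index.json records their order, e.g.
+# {"order": ["input.bam.bai"]} (hypothetical example).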
+
+
+def set_basename_and_derived_properties(properties, basename):
+    properties["basename"] = basename
+    properties["nameroot"], properties["nameext"] = os.path.splitext(basename)
+    return properties
+
+
+def output_properties(path=None, content=None, basename=None, pseduo_location=False):
+    checksum = hashlib.sha1()
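+    # CWL File objects express checksums as "sha1$<hex digest>", hence SHA-1 here;
+    # the file is read in 1 MiB chunks to compute the digest and size together.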
+    properties = {
+        "class": "File",
+    }
+    if path is not None:
+        properties["path"] = path
+        f = open(path, "rb")
+    else:
+        f = io.BytesIO(content)
+
+    try:
+        contents = f.read(1024 * 1024)
+        filesize = 0
+        while contents:
+            checksum.update(contents)
+            filesize += len(contents)
+            contents = f.read(1024 * 1024)
+    finally:
+        f.close()
+    properties["checksum"] = "sha1$%s" % checksum.hexdigest()
+    properties["size"] = filesize
+    set_basename_and_derived_properties(properties, basename)
+    _handle_pseudo_location(properties, pseduo_location)
+    return properties
+
+
+def _handle_pseudo_location(properties, pseduo_location):
+    if pseduo_location:
+        properties["location"] = properties["basename"]
+
+
+def abs_path_or_uri(path_or_uri, relative_to):
+    """Return an absolute path if this isn't a URI, otherwise keep the URI the same.
+    """
+    is_uri = "://" in path_or_uri
+    if not is_uri and not os.path.isabs(path_or_uri):
+        path_or_uri = os.path.join(relative_to, path_or_uri)
+    if not is_uri:
+        _ensure_file_exists(path_or_uri)
+    return path_or_uri
+
+
+def abs_path(path_or_uri, relative_to):
+    path_or_uri = abs_path_or_uri(path_or_uri, relative_to)
+    if path_or_uri.startswith("file://"):
+        path_or_uri = path_or_uri[len("file://"):]
+
+    return path_or_uri
+
+
+def path_or_uri_to_uri(path_or_uri):
+    if "://" not in path_or_uri:
+        return "file://%s" % path_or_uri
+    else:
+        return path_or_uri
+
+
+def galactic_job_json(
+    job, test_data_directory, upload_func, collection_create_func, tool_or_workflow="workflow"
+):
+    """Adapt a CWL job object to the Galaxy API.
+
+    CWL-derived tools in Galaxy can consume a job description much like a CWL
+    job object via the API, but file paths need to be replaced with dataset
+    references, and records and arrays with collection references. This
+    function stages the required files and rewrites the job description
+    accordingly for Galaxy.
+    """
+
+    datasets = []
+    dataset_collections = []
+
+    def response_to_hda(target, upload_response):
+        assert isinstance(upload_response, dict), upload_response
+        assert "outputs" in upload_response, upload_response
+        assert len(upload_response["outputs"]) > 0, upload_response
+        dataset = upload_response["outputs"][0]
+        datasets.append((dataset, target))
+        dataset_id = dataset["id"]
+        return {"src": "hda", "id": dataset_id}
+
+    def upload_file(file_path, secondary_files, **kwargs):
+        file_path = abs_path_or_uri(file_path, test_data_directory)
+        target = FileUploadTarget(file_path, secondary_files, **kwargs)
+        upload_response = upload_func(target)
+        return response_to_hda(target, upload_response)
+
+    def upload_file_literal(contents, **kwd):
+        target = FileLiteralTarget(contents, **kwd)
+        upload_response = upload_func(target)
+        return response_to_hda(target, upload_response)
+
+    def upload_tar(file_path):
+        file_path = abs_path_or_uri(file_path, test_data_directory)
+        target = DirectoryUploadTarget(file_path)
+        upload_response = upload_func(target)
+        return response_to_hda(target, upload_response)
+
+    def upload_file_with_composite_data(file_path, composite_data, **kwargs):
+        if file_path is not None:
+            file_path = abs_path_or_uri(file_path, test_data_directory)
+        composite_data_resolved = []
+        for cd in composite_data:
+            composite_data_resolved.append(abs_path_or_uri(cd, test_data_directory))
+        target = FileUploadTarget(file_path, composite_data=composite_data_resolved, **kwargs)
+        upload_response = upload_func(target)
+        return response_to_hda(target, upload_response)
+
+    def upload_object(the_object):
+        target = ObjectUploadTarget(the_object)
+        upload_response = upload_func(target)
+        return response_to_hda(target, upload_response)
+
+    def replacement_item(value, force_to_file=False):
+        is_dict = isinstance(value, dict)
+        item_class = None if not is_dict else value.get("class", None)
+        is_file = item_class == "File"
+        is_directory = item_class == "Directory"
+        is_collection = item_class == "Collection"  # Galaxy extension.
+
+        if force_to_file:
+            if is_file:
+                return replacement_file(value)
+            else:
+                return upload_object(value)
+
+        if isinstance(value, list):
+            return replacement_list(value)
+        elif not isinstance(value, dict):
+            if tool_or_workflow == "workflow":
+                # All inputs represented as dataset or collection parameters
+                return upload_object(value)
+            else:
+                return value
+
+        if is_file:
+            return replacement_file(value)
+        elif is_directory:
+            return replacement_directory(value)
+        elif is_collection:
+            return replacement_collection(value)
+        else:
+            return replacement_record(value)
+
+    def replacement_file(value):
+        if value.get('galaxy_id'):
+            return {"src": "hda", "id": value['galaxy_id']}
+        file_path = value.get("location", None) or value.get("path", None)
+        # format to match output definitions in tool, where did filetype come from?
+        filetype = value.get("filetype", None) or value.get("format", None)
+        composite_data_raw = value.get("composite_data", None)
+        kwd = {}
+        if "tags" in value:
+            kwd["tags"] = value.get("tags")
+        if "dbkey" in value:
+            kwd["dbkey"] = value.get("dbkey")
+        if composite_data_raw:
+            composite_data = []
+            for entry in composite_data_raw:
+                path = None
+                if isinstance(entry, dict):
+                    path = entry.get("location", None) or entry.get("path", None)
+                else:
+                    path = entry
+                composite_data.append(path)
+            rval_c = upload_file_with_composite_data(None, composite_data, filetype=filetype, **kwd)
+            return rval_c
+
+        if file_path is None:
+            contents = value.get("contents", None)
+            if contents is not None:
+                return upload_file_literal(contents, **kwd)
+
+            return value
+
+        secondary_files = value.get("secondaryFiles", [])
+        secondary_files_tar_path = None
+        if secondary_files:
+            tmp = tempfile.NamedTemporaryFile(delete=False)
+            tf = tarfile.open(fileobj=tmp, mode='w:')
+            order = []
+            index_contents = {
+                "order": order
+            }
+            for secondary_file in secondary_files:
+                secondary_file_path = secondary_file.get("location", None) or secondary_file.get("path", None)
+                assert secondary_file_path, "Invalid secondaryFile entry found [%s]" % secondary_file
+                full_secondary_file_path = os.path.join(test_data_directory, secondary_file_path)
+                basename = secondary_file.get("basename") or os.path.basename(secondary_file_path)
+                order.append(unicodify(basename))
+                tf.add(full_secondary_file_path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
+            tmp_index = tempfile.NamedTemporaryFile(delete=False, mode="w")
+            json.dump(index_contents, tmp_index)
+            tmp_index.close()
+            tf.add(tmp_index.name, SECONDARY_FILES_INDEX_PATH)
+            tf.close()
+            secondary_files_tar_path = tmp.name
+
+        return upload_file(file_path, secondary_files_tar_path, filetype=filetype, **kwd)
+
+    def replacement_directory(value):
+        file_path = value.get("location", None) or value.get("path", None)
+        if file_path is None:
+            return value
+
+        if not os.path.isabs(file_path):
+            file_path = os.path.join(test_data_directory, file_path)
+
+        tmp = tempfile.NamedTemporaryFile(delete=False)
+        tf = tarfile.open(fileobj=tmp, mode='w:')
+        tf.add(file_path, '.')
+        tf.close()
+
+        return upload_tar(tmp.name)
+
+    def replacement_list(value):
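+        # A hypothetical example: a CWL array of two File entries becomes element
+        # identifiers such as {"src": "hda", "id": "<hda id>", "name": "0"} and
+        # {"src": "hda", "id": "<hda id>", "name": "1"}, which are assembled into
+        # a "list" collection referenced from the job as {"src": "hdca", "id": ...}.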
+        collection_element_identifiers = []
+        for i, item in enumerate(value):
+            dataset = replacement_item(item, force_to_file=True)
+            collection_element = dataset.copy()
+            collection_element["name"] = str(i)
+            collection_element_identifiers.append(collection_element)
+
+        # TODO: handle nested lists/arrays
+        collection = collection_create_func(collection_element_identifiers, "list")
+        dataset_collections.append(collection)
+        hdca_id = collection["id"]
+        return {"src": "hdca", "id": hdca_id}
+
+    def to_elements(value, rank_collection_type):
+        collection_element_identifiers = []
+        assert "elements" in value
+        elements = value["elements"]
+
+        is_nested_collection = ":" in rank_collection_type
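+        # For a nested type such as "list:paired" (hypothetical example), each outer
+        # element becomes a "new_collection" identifier whose element_identifiers are
+        # built recursively using the remaining type ("paired").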
+        for element in elements:
+            if not is_nested_collection:
+                # flat collection
+                dataset = replacement_item(element, force_to_file=True)
+                collection_element = dataset.copy()
+                collection_element["name"] = element["identifier"]
+                collection_element_identifiers.append(collection_element)
+            else:
+                # nested collection
+                sub_collection_type = rank_collection_type[rank_collection_type.find(":") + 1:]
+                collection_element = {
+                    "name": element["identifier"],
+                    "src": "new_collection",
+                    "collection_type": sub_collection_type,
+                    "element_identifiers": to_elements(element, sub_collection_type)
+                }
+                collection_element_identifiers.append(collection_element)
+
+        return collection_element_identifiers
+
+    def replacement_collection(value):
+        if value.get('galaxy_id'):
+            return {"src": "hdca", "id": value['galaxy_id']}
+        assert "collection_type" in value
+        collection_type = value["collection_type"]
+        elements = to_elements(value, collection_type)
+
+        collection = collection_create_func(elements, collection_type)
+        dataset_collections.append(collection)
+        hdca_id = collection["id"]
+        return {"src": "hdca", "id": hdca_id}
+
+    def replacement_record(value):
+        collection_element_identifiers = []
+        for record_key, record_value in value.items():
+            if not isinstance(record_value, dict) or record_value.get("class") != "File":
+                dataset = replacement_item(record_value, force_to_file=True)
+                collection_element = dataset.copy()
+            else:
+                dataset = upload_file(record_value["location"], [])
+                collection_element = dataset.copy()
+
+            collection_element["name"] = record_key
+            collection_element_identifiers.append(collection_element)
+
+        collection = collection_create_func(collection_element_identifiers, "record")
+        dataset_collections.append(collection)
+        hdca_id = collection["id"]
+        return {"src": "hdca", "id": hdca_id}
+
+    replace_keys = {}
+    for key, value in job.items():
+        replace_keys[key] = replacement_item(value)
+
+    job.update(replace_keys)
+    return job, datasets
+
+
+def _ensure_file_exists(file_path):
+    if not os.path.exists(file_path):
+        template = "File [%s] does not exist - parent directory [%s] does %sexist, cwd is [%s]"
+        parent_directory = os.path.dirname(file_path)
+        message = template % (
+            file_path,
+            parent_directory,
+            "" if os.path.exists(parent_directory) else "not ",
+            os.getcwd(),
+        )
+        raise Exception(message)
+
+
+class FileLiteralTarget:
+
+    def __init__(self, contents, path=None, **kwargs):
+        self.contents = contents
+        self.properties = kwargs
+        self.path = path
+
+    def __str__(self):
+        return f"FileLiteralTarget[contents={self.contents}] with {self.properties}"
+
+
+class FileUploadTarget:
+
+    def __init__(self, path, secondary_files=None, **kwargs):
+        self.path = path
+        self.secondary_files = secondary_files
+        self.composite_data = kwargs.get("composite_data", [])
+        self.properties = kwargs
+
+    def __str__(self):
+        return f"FileUploadTarget[path={self.path}] with {self.properties}"
+
+
+class ObjectUploadTarget:
+
+    def __init__(self, the_object):
+        self.object = the_object
+        self.properties = {}
+
+    def __str__(self):
+        return f"ObjectUploadTarget[object={self.object} with {self.properties}]"
+
+
+class DirectoryUploadTarget:
+
+    def __init__(self, tar_path):
+        self.tar_path = tar_path
+
+    def __str__(self):
+        return "DirectoryUploadTarget[tar_path=%s]" % self.tar_path
+
+
+GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id", "metadata"])
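+# history_content_type is "dataset", "dataset_collection", or "raw_value"; metadata
+# is either a pre-fetched metadata dict or None, in which case output_to_cwl_json()
+# fetches it via get_metadata().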
+
+
+def tool_response_to_output(tool_response, history_id, output_id):
+    for output in tool_response["outputs"]:
+        if output["output_name"] == output_id:
+            return GalaxyOutput(history_id, "dataset", output["id"], None)
+
+    for output_collection in tool_response["output_collections"]:
+        if output_collection["output_name"] == output_id:
+            return GalaxyOutput(history_id, "dataset_collection", output_collection["id"], None)
+
+    raise Exception("Failed to find output with label [%s]" % output_id)
+
+
+def invocation_to_output(invocation, history_id, output_id):
+    if output_id in invocation["outputs"]:
+        dataset = invocation["outputs"][output_id]
+        galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"], None)
+    elif output_id in invocation["output_collections"]:
+        collection = invocation["output_collections"][output_id]
+        galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"], None)
+    elif output_id in invocation["output_values"]:
+        output_value = invocation["output_values"][output_id]
+        galaxy_output = GalaxyOutput(None, "raw_value", output_value, None)
+    else:
+        raise Exception(f"Failed to find output with label [{output_id}] in [{invocation}]")
+
+    return galaxy_output
+
+
+def output_to_cwl_json(
+    galaxy_output, get_metadata, get_dataset, get_extra_files, pseduo_location=False,
+):
+    """Convert objects in a Galaxy history into a CWL object.
+
+    Useful in running conformance tests and implementing the cwl-runner
+    interface via Galaxy.
+    """
+    def element_to_cwl_json(element):
+        object = element["object"]
+        content_type = object.get("history_content_type")
+        metadata = None
+        if content_type is None:
+            content_type = "dataset_collection"
+            metadata = element["object"]
+            metadata["history_content_type"] = content_type
+        element_output = GalaxyOutput(
+            galaxy_output.history_id,
+            content_type,
+            object["id"],
+            metadata,
+        )
+        return output_to_cwl_json(element_output, get_metadata, get_dataset, get_extra_files, pseduo_location=pseduo_location)
+
+    output_metadata = galaxy_output.metadata
+    if output_metadata is None:
+        output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
+
+    def dataset_dict_to_json_content(dataset_dict):
+        if "content" in dataset_dict:
+            return json.loads(dataset_dict["content"])
+        else:
+            with open(dataset_dict["path"]) as f:
+                return json.load(f)
+
+    if galaxy_output.history_content_type == "raw_value":
+        return galaxy_output.history_content_id
+    elif output_metadata["history_content_type"] == "dataset":
+        ext = output_metadata["file_ext"]
+        assert output_metadata["state"] == "ok"
+        if ext == "expression.json":
+            dataset_dict = get_dataset(output_metadata)
+            return dataset_dict_to_json_content(dataset_dict)
+        else:
+            file_or_directory = "Directory" if ext == "directory" else "File"
+            secondary_files = []
+
+            if file_or_directory == "File":
+                dataset_dict = get_dataset(output_metadata)
+                properties = output_properties(pseduo_location=pseduo_location, **dataset_dict)
+                basename = properties["basename"]
+                extra_files = get_extra_files(output_metadata)
+                found_index = False
+                for extra_file in extra_files:
+                    if extra_file["class"] == "File":
+                        path = extra_file["path"]
+                        if path == SECONDARY_FILES_INDEX_PATH:
+                            found_index = True
+
+                if found_index:
+                    ec = get_dataset(output_metadata, filename=SECONDARY_FILES_INDEX_PATH)
+                    index = dataset_dict_to_json_content(ec)
+
+                    def dir_listing(dir_path):
+                        listing = []
+                        for extra_file in extra_files:
+                            path = extra_file["path"]
+                            extra_file_class = extra_file["class"]
+                            extra_file_basename = os.path.basename(path)
+                            if os.path.join(dir_path, extra_file_basename) != path:
+                                continue
+
+                            if extra_file_class == "File":
+                                ec = get_dataset(output_metadata, filename=path)
+                                ec["basename"] = extra_file_basename
+                                ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
+                            elif extra_file_class == "Directory":
+                                ec_properties = {}
+                                ec_properties["class"] = "Directory"
+                                ec_properties["location"] = ec_basename
+                                ec_properties["listing"] = dir_listing(path)
+                            else:
+                                raise Exception("Unknown output type encountered....")
+                            listing.append(ec_properties)
+                        return listing
+
+                    for basename in index["order"]:
+                        for extra_file in extra_files:
+                            path = extra_file["path"]
+                            if path != os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename):
+                                continue
+
+                            extra_file_class = extra_file["class"]
+
+                            # This is wrong...
+                            if not STORE_SECONDARY_FILES_WITH_BASENAME:
+                                ec_basename = basename + os.path.basename(path)
+                            else:
+                                ec_basename = os.path.basename(path)
+
+                            if extra_file_class == "File":
+                                ec = get_dataset(output_metadata, filename=path)
+                                ec["basename"] = ec_basename
+                                ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
+                            elif extra_file_class == "Directory":
+                                ec_properties = {}
+                                ec_properties["class"] = "Directory"
+                                ec_properties["location"] = ec_basename
+                                ec_properties["listing"] = dir_listing(path)
+                            else:
+                                raise Exception("Unknown output type encountered....")
+                            secondary_files.append(ec_properties)
+
+            else:
+                basename = output_metadata.get("created_from_basename")
+                if not basename:
+                    basename = output_metadata.get("name")
+
+                listing = []
+                properties = {
+                    "class": "Directory",
+                    "basename": basename,
+                    "listing": listing,
+                }
+
+                extra_files = get_extra_files(output_metadata)
+                for extra_file in extra_files:
+                    if extra_file["class"] == "File":
+                        path = extra_file["path"]
+                        ec = get_dataset(output_metadata, filename=path)
+                        ec["basename"] = os.path.basename(path)
+                        ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
+                        listing.append(ec_properties)
+
+            if secondary_files:
+                properties["secondaryFiles"] = secondary_files
+            return properties
+
+    elif output_metadata["history_content_type"] == "dataset_collection":
+        rval = None
+        collection_type = output_metadata["collection_type"].split(":", 1)[0]
+        if collection_type in ["list", "paired"]:
+            rval = []
+            for element in output_metadata["elements"]:
+                rval.append(element_to_cwl_json(element))
+        elif collection_type == "record":
+            rval = {}
+            for element in output_metadata["elements"]:
+                rval[element["element_identifier"]] = element_to_cwl_json(element)
+        return rval
+    else:
+        raise NotImplementedError("Unknown history content type encountered")
+
+
+def download_output(galaxy_output, get_metadata, get_dataset, get_extra_files, output_path):
+    output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
+    dataset_dict = get_dataset(output_metadata)
+    with open(output_path, 'wb') as fh:
+        fh.write(dataset_dict['content'])
+
+
+def guess_artifact_type(path):
+    # TODO: Handle IDs within files.
+    tool_or_workflow = "workflow"
+    try:
+        with open(path) as f:
+            artifact = yaml.safe_load(f)
+
+        tool_or_workflow = "tool" if artifact["class"] != "Workflow" else "workflow"
+
+    except Exception as e:
+        print(e)
+
+    return tool_or_workflow