diff env/lib/python3.9/site-packages/galaxy/tool_util/cwl/parser.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/galaxy/tool_util/cwl/parser.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,1308 @@
+""" This module provides proxy objects around objects from the common
+workflow language reference implementation library cwltool. These proxies
+adapt cwltool to Galaxy features and abstract the library away from the rest
+of the framework.
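+
+A minimal usage sketch (the tool file name here is illustrative)::
+
+    proxy = tool_proxy(tool_path="cat1-tool.cwl")
+    inputs = proxy.input_instances()
+    outputs = proxy.output_instances()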
+"""
+
+import base64
+import copy
+import json
+import logging
+import os
+import pickle
+from abc import ABCMeta, abstractmethod
+from uuid import uuid4
+
+from galaxy.exceptions import MessageException
+from galaxy.util import (
+    listify,
+    safe_makedirs,
+    unicodify,
+)
+from galaxy.util.bunch import Bunch
+from .cwltool_deps import (
+    beta_relaxed_fmt_check,
+    ensure_cwltool_available,
+    getdefault,
+    pathmapper,
+    process,
+    ref_resolver,
+    relink_initialworkdir,
+    RuntimeContext,
+    sourceline,
+    StdFsAccess,
+)
+from .representation import (
+    field_to_field_type,
+    FIELD_TYPE_REPRESENTATION,
+    INPUT_TYPE,
+    type_descriptions_for_field_types,
+    USE_FIELD_TYPES,
+    USE_STEP_PARAMETERS,
+)
+from .schema import (
+    non_strict_non_validating_schema_loader,
+    schema_loader,
+)
+from .util import SECONDARY_FILES_EXTRA_PREFIX
+
+log = logging.getLogger(__name__)
+
+JOB_JSON_FILE = ".cwl_job.json"
+
+DOCKER_REQUIREMENT = "DockerRequirement"
+SUPPORTED_TOOL_REQUIREMENTS = [
+    "CreateFileRequirement",
+    "DockerRequirement",
+    "EnvVarRequirement",
+    "InitialWorkDirRequirement",
+    "InlineJavascriptRequirement",
+    "ResourceRequirement",
+    "ShellCommandRequirement",
+    "ScatterFeatureRequirement",
+    "SchemaDefRequirement",
+    "SubworkflowFeatureRequirement",
+    "StepInputExpressionRequirement",
+    "MultipleInputFeatureRequirement",
+]
+
+
+SUPPORTED_WORKFLOW_REQUIREMENTS = SUPPORTED_TOOL_REQUIREMENTS + [
+]
+
+PERSISTED_REPRESENTATION = "cwl_tool_object"
+
+
+def tool_proxy(tool_path=None, tool_object=None, strict_cwl_validation=True, tool_directory=None, uuid=None):
+    """ Provide a proxy object to cwltool data structures to just
+    grab relevant data.
+    """
+    ensure_cwltool_available()
+    tool = _to_cwl_tool_object(
+        tool_path=tool_path,
+        tool_object=tool_object,
+        strict_cwl_validation=strict_cwl_validation,
+        tool_directory=tool_directory,
+        uuid=uuid
+    )
+    return tool
+
+
+def tool_proxy_from_persistent_representation(persisted_tool, strict_cwl_validation=True, tool_directory=None):
+    """Load a ToolProxy from a previously persisted representation."""
+    ensure_cwltool_available()
+    if PERSISTED_REPRESENTATION == "cwl_tool_object":
+        kwds = {"cwl_tool_object": ToolProxy.from_persistent_representation(persisted_tool)}
+    else:
+        raw_process_reference = persisted_tool  # ???
+        kwds = {"raw_process_reference": ToolProxy.from_persistent_representation(raw_process_reference)}
+    uuid = persisted_tool["uuid"]
+    tool = _to_cwl_tool_object(uuid=uuid, strict_cwl_validation=strict_cwl_validation, tool_directory=tool_directory, **kwds)
+    return tool
+
+
+def workflow_proxy(workflow_path, strict_cwl_validation=True):
+    ensure_cwltool_available()
+    workflow = _to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation)
+    return workflow
+
+
+def load_job_proxy(job_directory, strict_cwl_validation=True):
+    ensure_cwltool_available()
+    job_objects_path = os.path.join(job_directory, JOB_JSON_FILE)
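+    # The job JSON written by JobProxy.save_job looks roughly like
+    # (illustrative):
+    #     {"job_inputs": {...}, "output_dict": {...},
+    #      "tool_representation": {"class": ..., "pickle": ..., "uuid": ...}}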
+    with open(job_objects_path) as f:
+        job_objects = json.load(f)
+    job_inputs = job_objects["job_inputs"]
+    output_dict = job_objects["output_dict"]
+    # Any reason to retain older tool_path variant of this? Probably not?
+    if "tool_path" in job_objects:
+        tool_path = job_objects["tool_path"]
+        cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation)
+    else:
+        persisted_tool = job_objects["tool_representation"]
+        cwl_tool = tool_proxy_from_persistent_representation(persisted_tool=persisted_tool, strict_cwl_validation=strict_cwl_validation)
+    cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory)
+    return cwl_job
+
+
+def _to_cwl_tool_object(tool_path=None, tool_object=None, cwl_tool_object=None, raw_process_reference=None, strict_cwl_validation=False, tool_directory=None, uuid=None):
+    if uuid is None:
+        uuid = str(uuid4())
+    schema_loader = _schema_loader(strict_cwl_validation)
+    if raw_process_reference is None and tool_path is not None:
+        assert cwl_tool_object is None
+        assert tool_object is None
+
+        raw_process_reference = schema_loader.raw_process_reference(tool_path)
+        cwl_tool = schema_loader.tool(
+            raw_process_reference=raw_process_reference,
+        )
+    elif tool_object is not None:
+        assert raw_process_reference is None
+        assert cwl_tool_object is None
+
+        # Allow loading tools from YAML...
+        from ruamel import yaml as ryaml
+        as_str = json.dumps(tool_object)
+        tool_object = ryaml.round_trip_load(as_str)
+        path = tool_directory
+        if path is None:
+            path = os.getcwd()
+        uri = ref_resolver.file_uri(path) + "/"
+        sourceline.add_lc_filename(tool_object, uri)
+        raw_process_reference = schema_loader.raw_process_reference_for_object(
+            tool_object,
+            uri=uri
+        )
+        cwl_tool = schema_loader.tool(
+            raw_process_reference=raw_process_reference,
+        )
+    else:
+        cwl_tool = cwl_tool_object
+
+    if isinstance(cwl_tool, int):
+        raise Exception("Failed to load tool.")
+
+    raw_tool = cwl_tool.tool
+    # Apply Galaxy hacks to CWL tool representation to bridge semantic differences
+    # between Galaxy and cwltool.
+    _hack_cwl_requirements(cwl_tool)
+    check_requirements(raw_tool)
+    return _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=raw_process_reference, tool_path=tool_path)
+
+
+def _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=None, tool_path=None):
+    raw_tool = cwl_tool.tool
+    if "class" not in raw_tool:
+        raise Exception("File does not declare a class, not a valid Draft 3+ CWL tool.")
+
+    process_class = raw_tool["class"]
+    if process_class == "CommandLineTool":
+        proxy_class = CommandLineToolProxy
+    elif process_class == "ExpressionTool":
+        proxy_class = ExpressionToolProxy
+    else:
+        raise Exception("File not a CWL CommandLineTool.")
+    top_level_object = tool_path is not None
+    if top_level_object and ("cwlVersion" not in raw_tool):
+        raise Exception("File does not declare a CWL version, pre-draft 3 CWL tools are not supported.")
+
+    proxy = proxy_class(cwl_tool, uuid, raw_process_reference, tool_path)
+    return proxy
+
+
+def _to_cwl_workflow_object(workflow_path, strict_cwl_validation=None):
+    proxy_class = WorkflowProxy
+    cwl_workflow = _schema_loader(strict_cwl_validation).tool(path=workflow_path)
+    raw_workflow = cwl_workflow.tool
+    check_requirements(raw_workflow, tool=False)
+
+    proxy = proxy_class(cwl_workflow, workflow_path)
+    return proxy
+
+
+def _schema_loader(strict_cwl_validation):
+    target_schema_loader = schema_loader if strict_cwl_validation else non_strict_non_validating_schema_loader
+    return target_schema_loader
+
+
+def _hack_cwl_requirements(cwl_tool):
+    move_to_hints = []
+    for i, requirement in enumerate(cwl_tool.requirements):
+        if requirement["class"] == DOCKER_REQUIREMENT:
+            move_to_hints.insert(0, i)
+
+    for i in move_to_hints:
+        requirement = cwl_tool.requirements[i]
+        del cwl_tool.requirements[i]
+        cwl_tool.hints.append(requirement)
+
+
+def check_requirements(rec, tool=True):
+    if isinstance(rec, dict):
+        if "requirements" in rec:
+            for r in rec["requirements"]:
+                if tool:
+                    possible = SUPPORTED_TOOL_REQUIREMENTS
+                else:
+                    possible = SUPPORTED_WORKFLOW_REQUIREMENTS
+                if r["class"] not in possible:
+                    raise Exception("Unsupported requirement %s" % r["class"])
+        for d in rec:
+            check_requirements(rec[d], tool=tool)
+    if isinstance(rec, list):
+        for d in rec:
+            check_requirements(d, tool=tool)
+
+
+class ToolProxy(metaclass=ABCMeta):
+
+    def __init__(self, tool, uuid, raw_process_reference=None, tool_path=None):
+        self._tool = tool
+        self._uuid = uuid
+        self._tool_path = tool_path
+        self._raw_process_reference = raw_process_reference
+        # remove input parameter formats from CWL files so that cwltool
+        # does not complain they are missing in the input data
+        for input_field in self._tool.inputs_record_schema["fields"]:
+            if 'format' in input_field:
+                del input_field['format']
+
+    def job_proxy(self, input_dict, output_dict, job_directory="."):
+        """ Build a cwltool.job.Job describing computation using a input_json
+        Galaxy will generate mapping the Galaxy description of the inputs into
+        a cwltool compatible variant.
+        """
+        return JobProxy(self, input_dict, output_dict, job_directory=job_directory)
+
+    @property
+    def id(self):
+        raw_id = self._tool.tool.get("id", None)
+        return raw_id
+
+    def galaxy_id(self):
+        raw_id = self.id
+        tool_id = None
+        # don't reduce "search.cwl#index" to search
+        if raw_id:
+            tool_id = os.path.basename(raw_id)
+            # tool_id = os.path.splitext(os.path.basename(raw_id))[0]
+        if not tool_id:
+            return self._uuid
+        assert tool_id
+        if tool_id.startswith("#"):
+            tool_id = tool_id[1:]
+        return tool_id
+
+    @abstractmethod
+    def input_instances(self):
+        """ Return InputInstance objects describing mapping to Galaxy inputs. """
+
+    @abstractmethod
+    def output_instances(self):
+        """ Return OutputInstance objects describing mapping to Galaxy inputs. """
+
+    @abstractmethod
+    def docker_identifier(self):
+        """ Return docker identifier for embedding in tool description. """
+
+    @abstractmethod
+    def description(self):
+        """ Return description to tool. """
+
+    @abstractmethod
+    def label(self):
+        """ Return label for tool. """
+
+    def to_persistent_representation(self):
+        """Return a JSON representation of this tool. Not for serialization
+        over the wire, but serialization in a database."""
+        # TODO: Replace this with some more readable serialization,
+        # I really don't like using pickle here.
+        if PERSISTED_REPRESENTATION == "cwl_tool_object":
+            persisted_obj = remove_pickle_problems(self._tool)
+        else:
+            persisted_obj = self._raw_process_reference
+        return {
+            "class": self._class,
+            "pickle": unicodify(base64.b64encode(pickle.dumps(persisted_obj, pickle.HIGHEST_PROTOCOL))),
+            "uuid": self._uuid,
+        }
+
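+    # Illustrative round trip, assuming ``proxy`` is a ToolProxy instance:
+    #     as_object = proxy.to_persistent_representation()
+    #     tool = ToolProxy.from_persistent_representation(as_object)
+    # Note that from_persistent_representation returns the unpickled cwltool
+    # object, not a ToolProxy; _to_cwl_tool_object rewraps it in a proxy.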
+    @staticmethod
+    def from_persistent_representation(as_object):
+        """Recover an object serialized with to_persistent_representation."""
+        if "class" not in as_object:
+            raise Exception("Failed to deserialize tool proxy from JSON object - no class found.")
+        if "pickle" not in as_object:
+            raise Exception("Failed to deserialize tool proxy from JSON object - no pickle representation found.")
+        if "uuid" not in as_object:
+            raise Exception("Failed to deserialize tool proxy from JSON object - no uuid found.")
+        to_unpickle = base64.b64decode(as_object["pickle"])
+        loaded_object = pickle.loads(to_unpickle)
+        return loaded_object
+
+
+class CommandLineToolProxy(ToolProxy):
+    _class = "CommandLineTool"
+
+    def description(self):
+        # Don't use description - typically too verbose.
+        return ''
+
+    def doc(self):
+        # TODO: parse multiple lines and merge - valid in cwl-1.1
+        doc = self._tool.tool.get('doc')
+        return doc
+
+    def label(self):
+        label = self._tool.tool.get('label')
+
+        if label is not None:
+            return label.partition(":")[0]  # return substring before ':'
+        else:
+            return ''
+
+    def input_fields(self):
+        input_records_schema = self._eval_schema(self._tool.inputs_record_schema)
+        if input_records_schema["type"] != "record":
+            raise Exception("Unhandled CWL tool input structure")
+
+        # TODO: handle this somewhere else?
+        # schemadef_req_tool_param
+        rval = []
+        for input in input_records_schema["fields"]:
+            input_copy = copy.deepcopy(input)
+            input_type = input.get("type")
+            if isinstance(input_type, (list, dict)):
+                rval.append(input_copy)
+                continue
+
+            if input_type in self._tool.schemaDefs:
+                input_copy["type"] = self._tool.schemaDefs[input_type]
+
+            rval.append(input_copy)
+        return rval
+
+    def _eval_schema(self, io_schema):
+        schema_type = io_schema.get("type")
+        if schema_type in self._tool.schemaDefs:
+            io_schema = self._tool.schemaDefs[schema_type]
+        return io_schema
+
+    def input_instances(self):
+        return [_outer_field_to_input_instance(_) for _ in self.input_fields()]
+
+    def output_instances(self):
+        outputs_schema = self._eval_schema(self._tool.outputs_record_schema)
+        if outputs_schema["type"] != "record":
+            raise Exception("Unhandled CWL tool output structure")
+
+        rval = []
+        for output in outputs_schema["fields"]:
+            rval.append(_simple_field_to_output(output))
+
+        return rval
+
+    def docker_identifier(self):
+        for hint in self.hints_or_requirements_of_class("DockerRequirement"):
+            if "dockerImageId" in hint:
+                return hint["dockerImageId"]
+            else:
+                return hint["dockerPull"]
+
+        return None
+
+    def hints_or_requirements_of_class(self, class_name):
+        tool = self._tool.tool
+        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
+        for hint in reqs_and_hints:
+            if hint["class"] == class_name:
+                yield hint
+
+    def software_requirements(self):
+        # Roughest imaginable pass at parsing requirements, really need to take in specs, handle
+        # multiple versions, etc...
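+        # For example, a hint of the form (illustrative values):
+        #     {"class": "SoftwareRequirement",
+        #      "packages": [{"package": "seqtk", "version": ["r93"]}]}
+        # yields [("seqtk", "r93")].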
+        tool = self._tool.tool
+        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
+        requirements = []
+        for hint in reqs_and_hints:
+            if hint["class"] == "SoftwareRequirement":
+                packages = hint.get("packages", [])
+                for package in packages:
+                    versions = package.get("version", [])
+                    first_version = None if not versions else versions[0]
+                    requirements.append((package["package"], first_version))
+        return requirements
+
+    @property
+    def requirements(self):
+        return getattr(self._tool, "requirements", [])
+
+
+class ExpressionToolProxy(CommandLineToolProxy):
+    _class = "ExpressionTool"
+
+
+class JobProxy:
+
+    def __init__(self, tool_proxy, input_dict, output_dict, job_directory):
+        self._tool_proxy = tool_proxy
+        self._input_dict = input_dict
+        self._output_dict = output_dict
+        self._job_directory = job_directory
+
+        self._final_output = None
+        self._ok = True
+        self._cwl_job = None
+        self._is_command_line_job = None
+
+        self._normalize_job()
+
+    def cwl_job(self):
+        self._ensure_cwl_job_initialized()
+        return self._cwl_job
+
+    @property
+    def is_command_line_job(self):
+        self._ensure_cwl_job_initialized()
+        assert self._is_command_line_job is not None
+        return self._is_command_line_job
+
+    def _ensure_cwl_job_initialized(self):
+        if self._cwl_job is None:
+            job_args = dict(
+                basedir=self._job_directory,
+                select_resources=self._select_resources,
+                outdir=os.path.join(self._job_directory, "working"),
+                tmpdir=os.path.join(self._job_directory, "cwltmp"),
+                stagedir=os.path.join(self._job_directory, "cwlstagedir"),
+                use_container=False,
+                beta_relaxed_fmt_check=beta_relaxed_fmt_check,
+            )
+
+            args = []
+            kwargs = {}
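+            # RuntimeContext is None for older cwltool releases, which take
+            # these options as keyword arguments instead.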
+            if RuntimeContext is not None:
+                args.append(RuntimeContext(job_args))
+            else:
+                kwargs = job_args
+            self._cwl_job = next(self._tool_proxy._tool.job(
+                self._input_dict,
+                self._output_callback,
+                *args, **kwargs
+            ))
+            self._is_command_line_job = hasattr(self._cwl_job, "command_line")
+
+    def _normalize_job(self):
+        # Somehow reuse whatever causes validate in cwltool... maybe?
+        def pathToLoc(p):
+            if "location" not in p and "path" in p:
+                p["location"] = p["path"]
+                del p["path"]
+
+        runtime_context = RuntimeContext({})
+        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
+        fs_access = make_fs_access(runtime_context.basedir)
+        process.fill_in_defaults(self._tool_proxy._tool.tool["inputs"], self._input_dict, fs_access)
+        process.visit_class(self._input_dict, ("File", "Directory"), pathToLoc)
+        # TODO: Why doesn't fillInDefault fill in locations instead of paths?
+        process.normalizeFilesDirs(self._input_dict)
+        # TODO: validate like cwltool process _init_job.
+        #    validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job,
+        #                         strict=False, logger=_logger_validation_warnings)
+
+    def rewrite_inputs_for_staging(self):
+        if hasattr(self._cwl_job, "pathmapper"):
+            pass
+            # DO SOMETHING LIKE THE FOLLOWING?
+            # path_rewrites = {}
+            # for f, p in self._cwl_job.pathmapper.items():
+            #     if not p.staged:
+            #         continue
+            #     if p.type in ("File", "Directory"):
+            #         path_rewrites[p.resolved] = p.target
+            # for key, value in self._input_dict.items():
+            #     if key in path_rewrites:
+            #         self._input_dict[key]["location"] = path_rewrites[value]
+        else:
+            stagedir = os.path.join(self._job_directory, "cwlstagedir")
+            safe_makedirs(stagedir)
+
+            def stage_recursive(value):
+                is_list = isinstance(value, list)
+                is_dict = isinstance(value, dict)
+                log.info(f"handling value {value}, is_list {is_list}, is_dict {is_dict}")
+                if is_list:
+                    for val in value:
+                        stage_recursive(val)
+                elif is_dict:
+                    if "location" in value and "basename" in value:
+                        location = value["location"]
+                        basename = value["basename"]
+                        if not location.endswith(basename):  # TODO: sep()[-1]
+                            staged_loc = os.path.join(stagedir, basename)
+                            if not os.path.exists(staged_loc):
+                                os.symlink(location, staged_loc)
+                            value["location"] = staged_loc
+                    for dict_value in value.values():
+                        stage_recursive(dict_value)
+                else:
+                    log.info("skipping simple value...")
+            stage_recursive(self._input_dict)
+
+    def _select_resources(self, request, runtime_context=None):
+        new_request = request.copy()
+        new_request["cores"] = "$GALAXY_SLOTS"
+        return new_request
+
+    @property
+    def command_line(self):
+        if self.is_command_line_job:
+            return self.cwl_job().command_line
+        else:
+            return ["true"]
+
+    @property
+    def stdin(self):
+        if self.is_command_line_job:
+            return self.cwl_job().stdin
+        else:
+            return None
+
+    @property
+    def stdout(self):
+        if self.is_command_line_job:
+            return self.cwl_job().stdout
+        else:
+            return None
+
+    @property
+    def stderr(self):
+        if self.is_command_line_job:
+            return self.cwl_job().stderr
+        else:
+            return None
+
+    @property
+    def environment(self):
+        if self.is_command_line_job:
+            return self.cwl_job().environment
+        else:
+            return {}
+
+    @property
+    def generate_files(self):
+        if self.is_command_line_job:
+            return self.cwl_job().generatefiles
+        else:
+            return {}
+
+    def _output_callback(self, out, process_status):
+        self._process_status = process_status
+        if process_status == "success":
+            self._final_output = out
+        else:
+            self._ok = False
+
+        log.info(f"Output are {out}, status is {process_status}")
+
+    def collect_outputs(self, tool_working_directory, rcode):
+        if not self.is_command_line_job:
+            cwl_job = self.cwl_job()
+            if RuntimeContext is not None:
+                cwl_job.run(
+                    RuntimeContext({})
+                )
+            else:
+                cwl_job.run()
+            if not self._ok:
+                raise Exception("Final process state not ok, [%s]" % self._process_status)
+            return self._final_output
+        else:
+            return self.cwl_job().collect_outputs(tool_working_directory, rcode)
+
+    def save_job(self):
+        job_file = JobProxy._job_file(self._job_directory)
+        job_objects = {
+            # "tool_path": os.path.abspath(self._tool_proxy._tool_path),
+            "tool_representation": self._tool_proxy.to_persistent_representation(),
+            "job_inputs": self._input_dict,
+            "output_dict": self._output_dict,
+        }
+        with open(job_file, "w") as f:
+            json.dump(job_objects, f)
+
+    def _output_extra_files_dir(self, output_name):
+        output_id = self.output_id(output_name)
+        return os.path.join(self._job_directory, "outputs", "dataset_%s_files" % output_id)
+
+    def output_id(self, output_name):
+        output_id = self._output_dict[output_name]["id"]
+        return output_id
+
+    def output_path(self, output_name):
+        output_id = self._output_dict[output_name]["path"]
+        return output_id
+
+    def output_directory_contents_dir(self, output_name, create=False):
+        extra_files_dir = self._output_extra_files_dir(output_name)
+        return extra_files_dir
+
+    def output_secondary_files_dir(self, output_name, create=False):
+        extra_files_dir = self._output_extra_files_dir(output_name)
+        secondary_files_dir = os.path.join(extra_files_dir, SECONDARY_FILES_EXTRA_PREFIX)
+        if create and not os.path.exists(secondary_files_dir):
+            safe_makedirs(secondary_files_dir)
+        return secondary_files_dir
+
+    def stage_files(self):
+        cwl_job = self.cwl_job()
+
+        def stageFunc(resolved_path, target_path):
+            log.info(f"resolving {resolved_path} to {target_path}")
+            try:
+                os.symlink(resolved_path, target_path)
+            except OSError:
+                pass
+
+        if hasattr(cwl_job, "pathmapper"):
+            process.stage_files(cwl_job.pathmapper, stageFunc, ignore_writable=True, symlink=False)
+
+        if hasattr(cwl_job, "generatefiles"):
+            outdir = os.path.join(self._job_directory, "working")
+            # TODO: Why doesn't cwl_job.generatemapper work?
+            generate_mapper = pathmapper.PathMapper(cwl_job.generatefiles["listing"],
+                                                    outdir, outdir, separateDirs=False)
+            # TODO: figure out what inplace_update should be.
+            inplace_update = cwl_job.inplace_update
+            process.stage_files(generate_mapper, stageFunc, ignore_writable=inplace_update, symlink=False)
+            relink_initialworkdir(generate_mapper, outdir, outdir, inplace_update=inplace_update)
+        # else: expression tools do not have a path mapper.
+
+    @staticmethod
+    def _job_file(job_directory):
+        return os.path.join(job_directory, JOB_JSON_FILE)
+
+
+class WorkflowProxy:
+
+    def __init__(self, workflow, workflow_path=None):
+        self._workflow = workflow
+        self._workflow_path = workflow_path
+        self._step_proxies = None
+
+    @property
+    def cwl_id(self):
+        return self._workflow.tool["id"]
+
+    def get_outputs_for_label(self, label):
+        outputs = []
+        for output in self._workflow.tool['outputs']:
+            step, output_name = split_step_references(
+                output["outputSource"],
+                multiple=False,
+                workflow_id=self.cwl_id,
+            )
+            if step == label:
+                output_id = output["id"]
+                if "#" not in self.cwl_id:
+                    _, output_label = output_id.rsplit("#", 1)
+                else:
+                    _, output_label = output_id.rsplit("/", 1)
+
+                outputs.append({
+                    "output_name": output_name,
+                    "label": output_label,
+                })
+        return outputs
+
+    def tool_reference_proxies(self):
+        """Fetch tool source definitions for all referenced tools."""
+        references = []
+        for step in self.step_proxies():
+            references.extend(step.tool_reference_proxies())
+        return references
+
+    def step_proxies(self):
+        if self._step_proxies is None:
+            proxies = []
+            num_input_steps = len(self._workflow.tool['inputs'])
+            for i, step in enumerate(self._workflow.steps):
+                proxies.append(build_step_proxy(self, step, i + num_input_steps))
+            self._step_proxies = proxies
+        return self._step_proxies
+
+    @property
+    def runnables(self):
+        runnables = []
+        for step in self._workflow.steps:
+            if "run" in step.tool:
+                runnables.append(step.tool["run"])
+        return runnables
+
+    def cwl_ids_to_index(self, step_proxies):
+        index = 0
+        cwl_ids_to_index = {}
+        for input_dict in self._workflow.tool['inputs']:
+            cwl_ids_to_index[input_dict["id"]] = index
+            index += 1
+
+        for step_proxy in step_proxies:
+            cwl_ids_to_index[step_proxy.cwl_id] = index
+            index += 1
+
+        return cwl_ids_to_index
+
+    @property
+    def output_labels(self):
+        return [self.jsonld_id_to_label(o['id']) for o in self._workflow.tool['outputs']]
+
+    def input_connections_by_step(self, step_proxies):
+        cwl_ids_to_index = self.cwl_ids_to_index(step_proxies)
+        input_connections_by_step = []
+        for step_proxy in step_proxies:
+            input_connections_step = {}
+            for input_proxy in step_proxy.input_proxies:
+                cwl_source_id = input_proxy.cwl_source_id
+                input_name = input_proxy.input_name
+                # Consider only allowing multiple inputs if MultipleInputFeatureRequirement is enabled.
+                for (output_step_name, output_name) in split_step_references(cwl_source_id, workflow_id=self.cwl_id):
+                    if "#" in self.cwl_id:
+                        sep_on = "/"
+                    else:
+                        sep_on = "#"
+                    output_step_id = self.cwl_id + sep_on + output_step_name
+
+                    if output_step_id not in cwl_ids_to_index:
+                        template = "Output [%s] does not appear in ID-to-index map [%s]."
+                        msg = template % (output_step_id, cwl_ids_to_index.keys())
+                        raise AssertionError(msg)
+
+                    if input_name not in input_connections_step:
+                        input_connections_step[input_name] = []
+
+                    input_connections_step[input_name].append({
+                        "id": cwl_ids_to_index[output_step_id],
+                        "output_name": output_name,
+                        "input_type": "dataset"
+                    })
+
+            input_connections_by_step.append(input_connections_step)
+
+        return input_connections_by_step
+
+    def to_dict(self):
+        name = os.path.basename(self._workflow.tool.get('label') or self._workflow_path or 'TODO - derive a name from ID')
+        steps = {}
+
+        step_proxies = self.step_proxies()
+        input_connections_by_step = self.input_connections_by_step(step_proxies)
+        index = 0
+        for i, input_dict in enumerate(self._workflow.tool['inputs']):
+            steps[index] = self.cwl_input_to_galaxy_step(input_dict, i)
+            index += 1
+
+        for i, step_proxy in enumerate(step_proxies):
+            input_connections = input_connections_by_step[i]
+            steps[index] = step_proxy.to_dict(input_connections)
+            index += 1
+
+        return {
+            'name': name,
+            'steps': steps,
+            'annotation': self.cwl_object_to_annotation(self._workflow.tool),
+        }
+
+    def find_inputs_step_index(self, label):
+        for i, input in enumerate(self._workflow.tool['inputs']):
+            if self.jsonld_id_to_label(input["id"]) == label:
+                return i
+
+        raise Exception("Failed to find index for label %s" % label)
+
+    def jsonld_id_to_label(self, id):
+        if "#" in self.cwl_id:
+            return id.rsplit("/", 1)[-1]
+        else:
+            return id.rsplit("#", 1)[-1]
+
+    def cwl_input_to_galaxy_step(self, input, i):
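+        # For example (illustrative): a plain CWL "File" input maps to a Galaxy
+        # "data_input" step, while array and record inputs map to
+        # "data_collection_input" steps with collection types "list" and
+        # "record" respectively.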
+        input_type = input["type"]
+        label = self.jsonld_id_to_label(input["id"])
+        input_as_dict = {
+            "id": i,
+            "label": label,
+            "position": {"left": 0, "top": 0},
+            "annotation": self.cwl_object_to_annotation(input),
+            "input_connections": {},  # Should the Galaxy API really require this? - Seems to.
+            "workflow_outputs": self.get_outputs_for_label(label),
+        }
+
+        if input_type == "File" and "default" not in input:
+            input_as_dict["type"] = "data_input"
+        elif isinstance(input_type, dict) and input_type.get("type") == "array":
+            input_as_dict["type"] = "data_collection_input"
+            input_as_dict["collection_type"] = "list"
+        elif isinstance(input_type, dict) and input_type.get("type") == "record":
+            input_as_dict["type"] = "data_collection_input"
+            input_as_dict["collection_type"] = "record"
+        else:
+            if USE_STEP_PARAMETERS:
+                input_as_dict["type"] = "parameter_input"
+                # TODO: dispatch on actual type so this doesn't always need
+                # to be field - simpler types could be simpler inputs.
+                tool_state = {}
+                tool_state["parameter_type"] = "field"
+                default_set = "default" in input
+                default_value = input.get("default")
+                optional = default_set
+                if isinstance(input_type, list) and "null" in input_type:
+                    optional = True
+                if not optional and isinstance(input_type, dict) and "type" in input_type:
+                    raise ValueError("'type' detected in non-optional input dictionary.")
+                if default_set:
+                    tool_state["default"] = {"src": "json", "value": default_value}
+                tool_state["optional"] = optional
+                input_as_dict["tool_state"] = tool_state
+            else:
+                input_as_dict["type"] = "data_input"
+                # TODO: format = expression.json
+
+        return input_as_dict
+
+    def cwl_object_to_annotation(self, cwl_obj):
+        return cwl_obj.get("doc", None)
+
+
+def split_step_references(step_references, workflow_id=None, multiple=True):
+    """Split a CWL step input or output reference into step id and name."""
+    # Trim off the workflow id part of the reference.
+    step_references = listify(step_references)
+    split_references = []
+
+    for step_reference in step_references:
+        if workflow_id is None:
+            # This path works fine for some simple workflows - but not so much
+            # for subworkflows (maybe same for $graph workflows?)
+            assert "#" in step_reference
+            _, step_reference = step_reference.split("#", 1)
+        else:
+            if "#" in workflow_id:
+                sep_on = "/"
+            else:
+                sep_on = "#"
+            expected_prefix = workflow_id + sep_on
+            if not step_reference.startswith(expected_prefix):
+                raise AssertionError(f"step_reference [{step_reference}] doesn't start with {expected_prefix}")
+            step_reference = step_reference[len(expected_prefix):]
+
+        # Now just grab the step name and input/output name.
+        assert "#" not in step_reference
+        if "/" in step_reference:
+            step_name, io_name = step_reference.split("/", 1)
+        else:
+            # Referencing an input, not a step.
+            # In Galaxy workflows input steps have an implicit output named
+            # "output" for consistency with tools - in cwl land
+            # just the input name is referenced.
+            step_name = step_reference
+            io_name = "output"
+        split_references.append((step_name, io_name))
+
+    if multiple:
+        return split_references
+    else:
+        assert len(split_references) == 1
+        return split_references[0]
+
+
+def build_step_proxy(workflow_proxy, step, index):
+    step_type = step.embedded_tool.tool["class"]
+    if step_type == "Workflow":
+        return SubworkflowStepProxy(workflow_proxy, step, index)
+    else:
+        return ToolStepProxy(workflow_proxy, step, index)
+
+
+class BaseStepProxy:
+
+    def __init__(self, workflow_proxy, step, index):
+        self._workflow_proxy = workflow_proxy
+        self._step = step
+        self._index = index
+        self._uuid = str(uuid4())
+        self._input_proxies = None
+
+    @property
+    def step_class(self):
+        return self.cwl_tool_object.tool["class"]
+
+    @property
+    def cwl_id(self):
+        return self._step.id
+
+    @property
+    def cwl_workflow_id(self):
+        return self._workflow_proxy.cwl_id
+
+    @property
+    def requirements(self):
+        return self._step.requirements
+
+    @property
+    def hints(self):
+        return self._step.hints
+
+    @property
+    def label(self):
+        label = self._workflow_proxy.jsonld_id_to_label(self._step.id)
+        return label
+
+    def galaxy_workflow_outputs_list(self):
+        return self._workflow_proxy.get_outputs_for_label(self.label)
+
+    @property
+    def cwl_tool_object(self):
+        return self._step.embedded_tool
+
+    @property
+    def input_proxies(self):
+        if self._input_proxies is None:
+            input_proxies = []
+            cwl_inputs = self._step.tool["inputs"]
+            for cwl_input in cwl_inputs:
+                input_proxies.append(InputProxy(self, cwl_input))
+            self._input_proxies = input_proxies
+        return self._input_proxies
+
+    def inputs_to_dicts(self):
+        inputs_as_dicts = []
+        for input_proxy in self.input_proxies:
+            inputs_as_dicts.append(input_proxy.to_dict())
+        return inputs_as_dicts
+
+
+class InputProxy:
+
+    def __init__(self, step_proxy, cwl_input):
+        self._cwl_input = cwl_input
+        self.step_proxy = step_proxy
+        self.workflow_proxy = step_proxy._workflow_proxy
+
+        cwl_input_id = cwl_input["id"]
+        cwl_source_id = cwl_input.get("source", None)
+        if cwl_source_id is None:
+            if "valueFrom" not in cwl_input and "default" not in cwl_input:
+                msg = "Workflow step input must define a source, a valueFrom, or a default value. Obtained [%s]." % cwl_input
+                raise MessageException(msg)
+
+        assert cwl_input_id
+        step_name, input_name = split_step_references(
+            cwl_input_id,
+            multiple=False,
+            workflow_id=step_proxy.cwl_workflow_id
+        )
+        self.step_name = step_name
+        self.input_name = input_name
+
+        self.cwl_input_id = cwl_input_id
+        self.cwl_source_id = cwl_source_id
+
+        scatter_inputs = [split_step_references(
+            i, multiple=False, workflow_id=step_proxy.cwl_workflow_id
+        )[1] for i in listify(step_proxy._step.tool.get("scatter", []))]
+        scatter = self.input_name in scatter_inputs
+        self.scatter = scatter
+
+    def to_dict(self):
+        as_dict = {
+            "name": self.input_name,
+        }
+        if "linkMerge" in self._cwl_input:
+            as_dict["merge_type"] = self._cwl_input["linkMerge"]
+        if "scatterMethod" in self.step_proxy._step.tool:
+            as_dict["scatter_type"] = self.step_proxy._step.tool.get("scatterMethod", "dotproduct")
+        else:
+            as_dict["scatter_type"] = "dotproduct" if self.scatter else "disabled"
+        if "valueFrom" in self._cwl_input:
+            # TODO: Add a table for expressions - mark the type as CWL 1.0 JavaScript.
+            as_dict["value_from"] = self._cwl_input["valueFrom"]
+        if "default" in self._cwl_input:
+            as_dict["default"] = self._cwl_input["default"]
+        return as_dict
+
+
+class ToolStepProxy(BaseStepProxy):
+
+    def __init__(self, workflow_proxy, step, index):
+        super().__init__(workflow_proxy, step, index)
+        self._tool_proxy = None
+
+    @property
+    def tool_proxy(self):
+        # Needs to be cached so the UUID that is loaded matches the UUID generated with to_dict.
+        if self._tool_proxy is None:
+            self._tool_proxy = _cwl_tool_object_to_proxy(self.cwl_tool_object, uuid=str(uuid4()))
+        return self._tool_proxy
+
+    def tool_reference_proxies(self):
+        # Return a list so we can handle subworkflows recursively.
+        return [self.tool_proxy]
+
+    def to_dict(self, input_connections):
+        # We need to stub out null entries for things getting replaced by
+        # connections. This doesn't seem ideal - consider just making Galaxy
+        # handle this.
+        tool_state = {}
+        for input_name in input_connections.keys():
+            tool_state[input_name] = None
+
+        outputs = self.galaxy_workflow_outputs_list()
+        return {
+            "id": self._index,
+            "tool_uuid": self.tool_proxy._uuid,  # TODO: make sure this is respected...
+            "label": self.label,
+            "position": {"left": 0, "top": 0},
+            "type": "tool",
+            "annotation": self._workflow_proxy.cwl_object_to_annotation(self._step.tool),
+            "input_connections": input_connections,
+            "inputs": self.inputs_to_dicts(),
+            "workflow_outputs": outputs,
+        }
+
+
+class SubworkflowStepProxy(BaseStepProxy):
+
+    def __init__(self, workflow_proxy, step, index):
+        super().__init__(workflow_proxy, step, index)
+        self._subworkflow_proxy = None
+
+    def to_dict(self, input_connections):
+        outputs = self.galaxy_workflow_outputs_list()
+        for key, input_connection_list in input_connections.items():
+            input_subworkflow_step_id = self.subworkflow_proxy.find_inputs_step_index(
+                key
+            )
+            for input_connection in input_connection_list:
+                input_connection["input_subworkflow_step_id"] = input_subworkflow_step_id
+
+        return {
+            "id": self._index,
+            "label": self.label,
+            "position": {"left": 0, "top": 0},
+            "type": "subworkflow",
+            "subworkflow": self.subworkflow_proxy.to_dict(),
+            "annotation": self.subworkflow_proxy.cwl_object_to_annotation(self._step.tool),
+            "input_connections": input_connections,
+            "inputs": self.inputs_to_dicts(),
+            "workflow_outputs": outputs,
+        }
+
+    def tool_reference_proxies(self):
+        return self.subworkflow_proxy.tool_reference_proxies()
+
+    @property
+    def subworkflow_proxy(self):
+        if self._subworkflow_proxy is None:
+            self._subworkflow_proxy = WorkflowProxy(self.cwl_tool_object)
+        return self._subworkflow_proxy
+
+
+def remove_pickle_problems(obj):
+    """doc_loader does not pickle correctly"""
+    if hasattr(obj, "doc_loader"):
+        obj.doc_loader = None
+    if hasattr(obj, "embedded_tool"):
+        obj.embedded_tool = remove_pickle_problems(obj.embedded_tool)
+    if hasattr(obj, "steps"):
+        obj.steps = [remove_pickle_problems(s) for s in obj.steps]
+    return obj
+
+
+class WorkflowToolReference(metaclass=ABCMeta):
+    pass
+
+
+class EmbeddedWorkflowToolReference(WorkflowToolReference):
+    pass
+
+
+class ExternalWorkflowToolReference(WorkflowToolReference):
+    pass
+
+
+def _outer_field_to_input_instance(field):
+    field_type = field_to_field_type(field)  # Must be a list if in here?
+    if not isinstance(field_type, list):
+        field_type = [field_type]
+
+    name, label, description = _field_metadata(field)
+
+    case_name = "_cwl__type_"
+    case_label = "Specify Parameter %s As" % label
+
+    def value_input(type_description):
+        value_name = "_cwl__value_"
+        value_label = label
+        value_description = description
+        return InputInstance(
+            value_name,
+            value_label,
+            value_description,
+            input_type=type_description.galaxy_param_type,
+            collection_type=type_description.collection_type,
+        )
+
+    select_options = []
+    case_options = []
+    type_descriptions = type_descriptions_for_field_types(field_type)
+    for type_description in type_descriptions:
+        select_options.append({"value": type_description.name, "label": type_description.label})
+        input_instances = []
+        if type_description.uses_param:
+            input_instances.append(value_input(type_description))
+        case_options.append((type_description.name, input_instances))
+
+    # If there is more than one way to represent this parameter, produce a
+    # conditional asking the user what form they want to submit the data in;
+    # otherwise just map to a simple Galaxy parameter.
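+    # For example (illustrative), a CWL union type such as ["null", "File"]
+    # can surface as a conditional asking which form of the value to supply.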
+    if len(case_options) > 1 and not USE_FIELD_TYPES:
+        case_input = SelectInputInstance(
+            name=case_name,
+            label=case_label,
+            description=False,
+            options=select_options,
+        )
+
+        return ConditionalInstance(name, case_input, case_options)
+    else:
+        if len(case_options) > 1:
+            only_type_description = FIELD_TYPE_REPRESENTATION
+        else:
+            only_type_description = type_descriptions[0]
+
+        return InputInstance(
+            name, label, description, input_type=only_type_description.galaxy_param_type, collection_type=only_type_description.collection_type
+        )
+
+    # Older array to repeat handling, now we are just representing arrays as
+    # dataset collections - we should offer a blended approach in the future.
+    # if field_type in simple_map_type_map.keys():
+    #     input_type = simple_map_type_map[field_type]
+    #     return {"input_type": input_type, "array": False}
+    # elif field_type == "array":
+    #     if isinstance(field["type"], dict):
+    #         array_type = field["type"]["items"]
+    #     else:
+    #         array_type = field["items"]
+    #     if array_type in simple_map_type_map.keys():
+    #         input_type = simple_map_type_map[array_type]
+    #     return {"input_type": input_type, "array": True}
+    # else:
+    #     raise Exception("Unhandled simple field type encountered - [%s]." % field_type)
+
+
+def _field_metadata(field):
+    name = field["name"]
+    label = field.get("label", None)
+    description = field.get("doc", None)
+    return name, label, description
+
+
+def _simple_field_to_output(field):
+    name = field["name"]
+    output_data_class = field["type"]
+    output_instance = OutputInstance(
+        name,
+        output_data_type=output_data_class,
+        output_type=OUTPUT_TYPE.GLOB
+    )
+    return output_instance
+
+
+class ConditionalInstance:
+
+    def __init__(self, name, case, whens):
+        self.input_type = INPUT_TYPE.CONDITIONAL
+        self.name = name
+        self.case = case
+        self.whens = whens
+
+    def to_dict(self):
+        as_dict = dict(
+            name=self.name,
+            type=INPUT_TYPE.CONDITIONAL,
+            test=self.case.to_dict(),
+            when={},
+        )
+        for value, block in self.whens:
+            as_dict["when"][value] = [i.to_dict() for i in block]
+
+        return as_dict
+
+
+class SelectInputInstance:
+
+    def __init__(self, name, label, description, options):
+        self.input_type = INPUT_TYPE.SELECT
+        self.name = name
+        self.label = label
+        self.description = description
+        self.options = options
+
+    def to_dict(self):
+        # TODO: serialize options...
+        as_dict = dict(
+            name=self.name,
+            label=self.label or self.name,
+            help=self.description,
+            type=self.input_type,
+            options=self.options,
+        )
+        return as_dict
+
+
+class InputInstance:
+
+    def __init__(self, name, label, description, input_type, array=False, area=False, collection_type=None):
+        self.input_type = input_type
+        self.collection_type = collection_type
+        self.name = name
+        self.label = label
+        self.description = description
+        self.required = True
+        self.array = array
+        self.area = area
+
+    def to_dict(self, itemwise=True):
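+        # When itemwise is True and this instance represents an array, wrap
+        # the parameter in a Galaxy "repeat" block named "<name>_repeat" that
+        # contains the scalar form of this input.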
+        if itemwise and self.array:
+            as_dict = dict(
+                type="repeat",
+                name="%s_repeat" % self.name,
+                title="%s" % self.name,
+                blocks=[
+                    self.to_dict(itemwise=False)
+                ]
+            )
+        else:
+            as_dict = dict(
+                name=self.name,
+                label=self.label or self.name,
+                help=self.description,
+                type=self.input_type,
+                optional=not self.required,
+            )
+            if self.area:
+                as_dict["area"] = True
+
+            if self.input_type == INPUT_TYPE.INTEGER:
+                as_dict["value"] = "0"
+            elif self.input_type == INPUT_TYPE.FLOAT:
+                as_dict["value"] = "0.0"
+            elif self.input_type == INPUT_TYPE.DATA_COLLECTON:
+                as_dict["collection_type"] = self.collection_type
+
+        return as_dict
+
+
+OUTPUT_TYPE = Bunch(
+    GLOB="glob",
+    STDOUT="stdout",
+)
+
+
+# TODO: Different subclasses - this is representing different types of things.
+class OutputInstance:
+
+    def __init__(self, name, output_data_type, output_type, path=None, fields=None):
+        self.name = name
+        self.output_data_type = output_data_type
+        self.output_type = output_type
+        self.path = path
+        self.fields = fields
+
+
+__all__ = (
+    'tool_proxy',
+    'load_job_proxy',
+)