Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/tests/test_pack.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/tests/test_pack.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,251 @@ +import json +import os +import tempfile +from collections.abc import Sized +from functools import partial +from io import StringIO +from pathlib import Path +from typing import Dict + +import pytest +from ruamel import yaml + +import cwltool.pack +import cwltool.workflow +from cwltool.context import LoadingContext +from cwltool.load_tool import fetch_document, resolve_and_validate_document +from cwltool.main import main, make_relative, print_pack +from cwltool.resolver import tool_resolver +from cwltool.utils import adjustDirObjs, adjustFileObjs + +from .util import get_data, needs_docker + + +def test_pack() -> None: + loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/revsort.cwl")) + + with open(get_data("tests/wf/expect_packed.cwl")) as packed_file: + expect_packed = yaml.main.safe_load(packed_file) + + packed = cwltool.pack.pack(loadingContext, uri) + adjustFileObjs( + packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) + ) + adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) + + assert "$schemas" in packed + packed_schemas = packed["$schemas"] + assert isinstance(packed_schemas, Sized) + assert len(packed_schemas) == len(expect_packed["$schemas"]) + del packed["$schemas"] + del expect_packed["$schemas"] + + assert packed == expect_packed + + +def test_pack_input_named_name() -> None: + loadingContext, workflowobj, uri = fetch_document( + get_data("tests/wf/trick_revsort.cwl") + ) + loadingContext.do_update = False + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + loader = loadingContext.loader + assert loader + loader.resolve_ref(uri)[0] + + with open(get_data("tests/wf/expect_trick_packed.cwl")) as packed_file: + expect_packed = yaml.main.round_trip_load(packed_file) + + packed = cwltool.pack.pack(loadingContext, uri) + adjustFileObjs( + packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) + ) + adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) + + assert "$schemas" in packed + packed_schemas = packed["$schemas"] + assert isinstance(packed_schemas, Sized) + assert len(packed_schemas) == len(expect_packed["$schemas"]) + del packed["$schemas"] + del expect_packed["$schemas"] + + assert packed == expect_packed + + +def test_pack_single_tool() -> None: + loadingContext, workflowobj, uri = fetch_document( + get_data("tests/wf/formattest.cwl") + ) + loadingContext.do_update = False + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + loader = loadingContext.loader + assert loader + loader.resolve_ref(uri)[0] + + packed = cwltool.pack.pack(loadingContext, uri) + assert "$schemas" in packed + + +def test_pack_fragment() -> None: + with open(get_data("tests/wf/scatter2_subwf.cwl")) as packed_file: + expect_packed = yaml.main.safe_load(packed_file) + + loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/scatter2.cwl")) + packed = cwltool.pack.pack(loadingContext, uri + "#scatterstep/mysub") + adjustFileObjs( + packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) + ) + adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) + + assert json.dumps(packed, sort_keys=True, indent=2) == json.dumps( + expect_packed, sort_keys=True, indent=2 + ) + + +def test_pack_rewrites() -> None: + rewrites = {} # type: Dict[str, str] + + loadingContext, workflowobj, uri = fetch_document( + get_data("tests/wf/default-wf5.cwl") + ) + loadingContext.do_update = False + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + loader = loadingContext.loader + assert loader + loader.resolve_ref(uri)[0] + + cwltool.pack.pack( + loadingContext, + uri, + rewrite_out=rewrites, + ) + + assert len(rewrites) == 6 + + +cwl_missing_version_paths = [ + "tests/wf/hello_single_tool.cwl", + "tests/wf/hello-workflow.cwl", +] + + +@pytest.mark.parametrize("cwl_path", cwl_missing_version_paths) +def test_pack_missing_cwlVersion(cwl_path: str) -> None: + """Ensure the generated pack output is not missing the `cwlVersion` in case of single tool workflow and single step workflow.""" + # Testing single tool workflow + loadingContext, workflowobj, uri = fetch_document(get_data(cwl_path)) + loadingContext.do_update = False + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + loader = loadingContext.loader + assert loader + loader.resolve_ref(uri)[0] + + # generate pack output dict + packed = json.loads(print_pack(loadingContext, uri)) + + assert packed["cwlVersion"] == "v1.0" + + +def test_pack_idempotence_tool(tmp_path: Path) -> None: + """Ensure that pack produces exactly the same document for an already packed CommandLineTool.""" + _pack_idempotently("tests/wf/hello_single_tool.cwl", tmp_path) + + +def test_pack_idempotence_workflow(tmp_path: Path) -> None: + """Ensure that pack produces exactly the same document for an already packed workflow.""" + _pack_idempotently("tests/wf/count-lines1-wf.cwl", tmp_path) + + +def _pack_idempotently(document: str, tmp_path: Path) -> None: + loadingContext, workflowobj, uri = fetch_document(get_data(document)) + loadingContext.do_update = False + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + loader = loadingContext.loader + assert loader + loader.resolve_ref(uri)[0] + + # generate pack output dict + packed_text = print_pack(loadingContext, uri) + packed = json.loads(packed_text) + + tmp_name = tmp_path / "packed.cwl" + tmp = tmp_name.open(mode="w") + tmp.write(packed_text) + tmp.flush() + tmp.close() + + loadingContext, workflowobj, uri2 = fetch_document(tmp.name) + loadingContext.do_update = False + loadingContext, uri2 = resolve_and_validate_document( + loadingContext, workflowobj, uri2 + ) + loader2 = loadingContext.loader + assert loader2 + loader2.resolve_ref(uri2)[0] + + # generate pack output dict + packed_text = print_pack(loadingContext, uri2) + double_packed = json.loads(packed_text) + + assert uri != uri2 + assert packed == double_packed + + +cwl_to_run = [ + ("tests/wf/count-lines1-wf.cwl", "tests/wf/wc-job.json", False), + ("tests/wf/formattest.cwl", "tests/wf/formattest-job.json", True), +] + + +@needs_docker +@pytest.mark.parametrize("wf_path,job_path,namespaced", cwl_to_run) +def test_packed_workflow_execution( + wf_path: str, job_path: str, namespaced: bool, tmp_path: Path +) -> None: + loadingContext = LoadingContext() + loadingContext.resolver = tool_resolver + loadingContext, workflowobj, uri = fetch_document(get_data(wf_path), loadingContext) + loadingContext.do_update = False + loadingContext, uri = resolve_and_validate_document( + loadingContext, workflowobj, uri + ) + loader = loadingContext.loader + assert loader + loader.resolve_ref(uri)[0] + packed = json.loads(print_pack(loadingContext, uri)) + + assert not namespaced or "$namespaces" in packed + + wf_packed_handle, wf_packed_path = tempfile.mkstemp() + with open(wf_packed_path, "w") as temp_file: + json.dump(packed, temp_file) + + normal_output = StringIO() + packed_output = StringIO() + + normal_params = ["--outdir", str(tmp_path), get_data(wf_path), get_data(job_path)] + packed_params = [ + "--outdir", + str(tmp_path), + "--debug", + wf_packed_path, + get_data(job_path), + ] + + assert main(normal_params, stdout=normal_output) == 0 + assert main(packed_params, stdout=packed_output) == 0 + + assert json.loads(packed_output.getvalue()) == json.loads(normal_output.getvalue()) + + os.close(wf_packed_handle) + os.remove(wf_packed_path)