diff env/lib/python3.9/site-packages/cwltool/tests/test_pack.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/tests/test_pack.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,251 @@
+import json
+import os
+import tempfile
+from collections.abc import Sized
+from functools import partial
+from io import StringIO
+from pathlib import Path
+from typing import Dict
+
+import pytest
+from ruamel import yaml
+
+import cwltool.pack
+import cwltool.workflow
+from cwltool.context import LoadingContext
+from cwltool.load_tool import fetch_document, resolve_and_validate_document
+from cwltool.main import main, make_relative, print_pack
+from cwltool.resolver import tool_resolver
+from cwltool.utils import adjustDirObjs, adjustFileObjs
+
+from .util import get_data, needs_docker
+
+
+def test_pack() -> None:
+    loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/revsort.cwl"))
+
+    with open(get_data("tests/wf/expect_packed.cwl")) as packed_file:
+        expect_packed = yaml.main.safe_load(packed_file)
+
+    packed = cwltool.pack.pack(loadingContext, uri)
+    adjustFileObjs(
+        packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
+    )
+    adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))))
+
+    assert "$schemas" in packed
+    packed_schemas = packed["$schemas"]
+    assert isinstance(packed_schemas, Sized)
+    assert len(packed_schemas) == len(expect_packed["$schemas"])
+    del packed["$schemas"]
+    del expect_packed["$schemas"]
+
+    assert packed == expect_packed
+
+
+def test_pack_input_named_name() -> None:
+    loadingContext, workflowobj, uri = fetch_document(
+        get_data("tests/wf/trick_revsort.cwl")
+    )
+    loadingContext.do_update = False
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+    loader = loadingContext.loader
+    assert loader
+    loader.resolve_ref(uri)[0]
+
+    with open(get_data("tests/wf/expect_trick_packed.cwl")) as packed_file:
+        expect_packed = yaml.main.round_trip_load(packed_file)
+
+    packed = cwltool.pack.pack(loadingContext, uri)
+    adjustFileObjs(
+        packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
+    )
+    adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))))
+
+    assert "$schemas" in packed
+    packed_schemas = packed["$schemas"]
+    assert isinstance(packed_schemas, Sized)
+    assert len(packed_schemas) == len(expect_packed["$schemas"])
+    del packed["$schemas"]
+    del expect_packed["$schemas"]
+
+    assert packed == expect_packed
+
+
+def test_pack_single_tool() -> None:
+    loadingContext, workflowobj, uri = fetch_document(
+        get_data("tests/wf/formattest.cwl")
+    )
+    loadingContext.do_update = False
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+    loader = loadingContext.loader
+    assert loader
+    loader.resolve_ref(uri)[0]
+
+    packed = cwltool.pack.pack(loadingContext, uri)
+    assert "$schemas" in packed
+
+
+def test_pack_fragment() -> None:
+    with open(get_data("tests/wf/scatter2_subwf.cwl")) as packed_file:
+        expect_packed = yaml.main.safe_load(packed_file)
+
+    loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/scatter2.cwl"))
+    packed = cwltool.pack.pack(loadingContext, uri + "#scatterstep/mysub")
+    adjustFileObjs(
+        packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
+    )
+    adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))))
+
+    assert json.dumps(packed, sort_keys=True, indent=2) == json.dumps(
+        expect_packed, sort_keys=True, indent=2
+    )
+
+
+def test_pack_rewrites() -> None:
+    rewrites = {}  # type: Dict[str, str]
+
+    loadingContext, workflowobj, uri = fetch_document(
+        get_data("tests/wf/default-wf5.cwl")
+    )
+    loadingContext.do_update = False
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+    loader = loadingContext.loader
+    assert loader
+    loader.resolve_ref(uri)[0]
+
+    cwltool.pack.pack(
+        loadingContext,
+        uri,
+        rewrite_out=rewrites,
+    )
+
+    assert len(rewrites) == 6
+
+
+cwl_missing_version_paths = [
+    "tests/wf/hello_single_tool.cwl",
+    "tests/wf/hello-workflow.cwl",
+]
+
+
+@pytest.mark.parametrize("cwl_path", cwl_missing_version_paths)
+def test_pack_missing_cwlVersion(cwl_path: str) -> None:
+    """Ensure the generated pack output is not missing the `cwlVersion` in case of single tool workflow and single step workflow."""
+    # Testing single tool workflow
+    loadingContext, workflowobj, uri = fetch_document(get_data(cwl_path))
+    loadingContext.do_update = False
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+    loader = loadingContext.loader
+    assert loader
+    loader.resolve_ref(uri)[0]
+
+    # generate pack output dict
+    packed = json.loads(print_pack(loadingContext, uri))
+
+    assert packed["cwlVersion"] == "v1.0"
+
+
+def test_pack_idempotence_tool(tmp_path: Path) -> None:
+    """Ensure that pack produces exactly the same document for an already packed CommandLineTool."""
+    _pack_idempotently("tests/wf/hello_single_tool.cwl", tmp_path)
+
+
+def test_pack_idempotence_workflow(tmp_path: Path) -> None:
+    """Ensure that pack produces exactly the same document for an already packed workflow."""
+    _pack_idempotently("tests/wf/count-lines1-wf.cwl", tmp_path)
+
+
+def _pack_idempotently(document: str, tmp_path: Path) -> None:
+    loadingContext, workflowobj, uri = fetch_document(get_data(document))
+    loadingContext.do_update = False
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+    loader = loadingContext.loader
+    assert loader
+    loader.resolve_ref(uri)[0]
+
+    # generate pack output dict
+    packed_text = print_pack(loadingContext, uri)
+    packed = json.loads(packed_text)
+
+    tmp_name = tmp_path / "packed.cwl"
+    tmp = tmp_name.open(mode="w")
+    tmp.write(packed_text)
+    tmp.flush()
+    tmp.close()
+
+    loadingContext, workflowobj, uri2 = fetch_document(tmp.name)
+    loadingContext.do_update = False
+    loadingContext, uri2 = resolve_and_validate_document(
+        loadingContext, workflowobj, uri2
+    )
+    loader2 = loadingContext.loader
+    assert loader2
+    loader2.resolve_ref(uri2)[0]
+
+    # generate pack output dict
+    packed_text = print_pack(loadingContext, uri2)
+    double_packed = json.loads(packed_text)
+
+    assert uri != uri2
+    assert packed == double_packed
+
+
+cwl_to_run = [
+    ("tests/wf/count-lines1-wf.cwl", "tests/wf/wc-job.json", False),
+    ("tests/wf/formattest.cwl", "tests/wf/formattest-job.json", True),
+]
+
+
+@needs_docker
+@pytest.mark.parametrize("wf_path,job_path,namespaced", cwl_to_run)
+def test_packed_workflow_execution(
+    wf_path: str, job_path: str, namespaced: bool, tmp_path: Path
+) -> None:
+    loadingContext = LoadingContext()
+    loadingContext.resolver = tool_resolver
+    loadingContext, workflowobj, uri = fetch_document(get_data(wf_path), loadingContext)
+    loadingContext.do_update = False
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+    loader = loadingContext.loader
+    assert loader
+    loader.resolve_ref(uri)[0]
+    packed = json.loads(print_pack(loadingContext, uri))
+
+    assert not namespaced or "$namespaces" in packed
+
+    wf_packed_handle, wf_packed_path = tempfile.mkstemp()
+    with open(wf_packed_path, "w") as temp_file:
+        json.dump(packed, temp_file)
+
+    normal_output = StringIO()
+    packed_output = StringIO()
+
+    normal_params = ["--outdir", str(tmp_path), get_data(wf_path), get_data(job_path)]
+    packed_params = [
+        "--outdir",
+        str(tmp_path),
+        "--debug",
+        wf_packed_path,
+        get_data(job_path),
+    ]
+
+    assert main(normal_params, stdout=normal_output) == 0
+    assert main(packed_params, stdout=packed_output) == 0
+
+    assert json.loads(packed_output.getvalue()) == json.loads(normal_output.getvalue())
+
+    os.close(wf_packed_handle)
+    os.remove(wf_packed_path)