Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/tests/test_examples.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Example-driven tests for cwltool: expressions, factory, scandeps, type checks and CLI."""
import json
import logging
import os
import stat
import subprocess
import sys
from io import StringIO
from pathlib import Path
from typing import Any, Dict, List, Union, cast
from urllib.parse import urlparse

import pydot  # type: ignore
import pytest
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.exceptions import ValidationException

import cwltool.checker
import cwltool.expression as expr
import cwltool.factory
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
from cwltool.checker import can_assign_src_to_sink
from cwltool.context import RuntimeContext
from cwltool.errors import WorkflowException
from cwltool.main import main
from cwltool.process import CWL_IANA
from cwltool.sandboxjs import JavascriptException
from cwltool.utils import CWLObjectType, dedup, onWindows

from .util import (
    get_data,
    get_main_output,
    get_windows_safe_factory,
    needs_docker,
    windows_needs_docker,
)

sys.argv = [""]

# (expression, whether expr.param_re is expected to match it)
expression_match = [
    ("(foo)", True),
    ("(foo.bar)", True),
    ("(foo['bar'])", True),
    ('(foo["bar"])', True),
    ("(foo.bar.baz)", True),
    ("(foo['bar'].baz)", True),
    ("(foo['bar']['baz'])", True),
    ("(foo['b\\'ar']['baz'])", True),
    ("(foo['b ar']['baz'])", True),
    ("(foo_bar)", True),
    ('(foo.["bar"])', False),
    ('(.foo["bar"])', False),
    ('(foo ["bar"])', False),
    ('( foo["bar"])', False),
    ("(foo[bar].baz)", False),
    ("(foo['bar\"].baz)", False),
    ("(foo['bar].baz)", False),
    ("{foo}", False),
    ("(foo.bar", False),
    ("foo.bar)", False),
    ("foo.b ar)", False),
    ("foo.b'ar)", False),
    ("(foo+bar", False),
    ("(foo bar", False),
]


@pytest.mark.parametrize("expression,expected", expression_match)
def test_expression_match(expression: str, expected: bool) -> None:
    """Check which strings the parameter-reference regex accepts."""
    match = expr.param_re.match(expression)
    assert (match is not None) == expected


# Shared input object used by the interpolation tests below.
interpolate_input = {
    "foo": {
        "bar": {"baz": "zab1"},
        "b ar": {"baz": 2},
        "b'ar": {"baz": True},
        'b"ar': {"baz": None},
    },
    "lst": ["A", "B"],
}  # type: Dict[str, Any]

# (pattern, expected result of interpolating it against interpolate_input)
interpolate_parameters = [
    ("$(foo)", interpolate_input["foo"]),
    ("$(foo.bar)", interpolate_input["foo"]["bar"]),
    ("$(foo['bar'])", interpolate_input["foo"]["bar"]),
    ('$(foo["bar"])', interpolate_input["foo"]["bar"]),
    ("$(foo.bar.baz)", interpolate_input["foo"]["bar"]["baz"]),
    ("$(foo['bar'].baz)", interpolate_input["foo"]["bar"]["baz"]),
    ("$(foo['bar'][\"baz\"])", interpolate_input["foo"]["bar"]["baz"]),
    ("$(foo.bar['baz'])", interpolate_input["foo"]["bar"]["baz"]),
    ("$(foo['b\\'ar'].baz)", True),
    ('$(foo["b\'ar"].baz)', True),
    ("$(foo['b\\\"ar'].baz)", None),
    ("$(lst[0])", "A"),
    ("$(lst[1])", "B"),
    ("$(lst.length)", 2),
    ("$(lst['length'])", 2),
    ("-$(foo.bar)", """-{"baz": "zab1"}"""),
    ("-$(foo['bar'])", """-{"baz": "zab1"}"""),
    ('-$(foo["bar"])', """-{"baz": "zab1"}"""),
    ("-$(foo.bar.baz)", "-zab1"),
    ("-$(foo['bar'].baz)", "-zab1"),
    ("-$(foo['bar'][\"baz\"])", "-zab1"),
    ("-$(foo.bar['baz'])", "-zab1"),
    ("-$(foo['b ar'].baz)", "-2"),
    ("-$(foo['b\\'ar'].baz)", "-true"),
    ('-$(foo["b\\\'ar"].baz)', "-true"),
    ("-$(foo['b\\\"ar'].baz)", "-null"),
    ("$(foo.bar) $(foo.bar)", """{"baz": "zab1"} {"baz": "zab1"}"""),
    ("$(foo['bar']) $(foo['bar'])", """{"baz": "zab1"} {"baz": "zab1"}"""),
    ('$(foo["bar"]) $(foo["bar"])', """{"baz": "zab1"} {"baz": "zab1"}"""),
    ("$(foo.bar.baz) $(foo.bar.baz)", "zab1 zab1"),
    ("$(foo['bar'].baz) $(foo['bar'].baz)", "zab1 zab1"),
    ("$(foo['bar'][\"baz\"]) $(foo['bar'][\"baz\"])", "zab1 zab1"),
    ("$(foo.bar['baz']) $(foo.bar['baz'])", "zab1 zab1"),
    ("$(foo['b ar'].baz) $(foo['b ar'].baz)", "2 2"),
    ("$(foo['b\\'ar'].baz) $(foo['b\\'ar'].baz)", "true true"),
    ('$(foo["b\\\'ar"].baz) $(foo["b\\\'ar"].baz)', "true true"),
    ("$(foo['b\\\"ar'].baz) $(foo['b\\\"ar'].baz)", "null null"),
]


@pytest.mark.parametrize("pattern,expected", interpolate_parameters)
def test_expression_interpolate(pattern: str, expected: Any) -> None:
    """Interpolate each pattern against interpolate_input and compare the result."""
    assert expr.interpolate(pattern, interpolate_input) == expected


# (pattern, expected result after converting to an expression and evaluating it)
parameter_to_expressions = [
    (
        "-$(foo)",
        r"""-{"bar":{"baz":"zab1"},"b ar":{"baz":2},"b'ar":{"baz":true},"b\"ar":{"baz":null}}""",
    ),
    ("-$(foo.bar)", """-{"baz":"zab1"}"""),
    ("-$(foo['bar'])", """-{"baz":"zab1"}"""),
    ('-$(foo["bar"])', """-{"baz":"zab1"}"""),
    ("-$(foo.bar.baz)", "-zab1"),
    ("-$(foo['bar'].baz)", "-zab1"),
    ("-$(foo['bar'][\"baz\"])", "-zab1"),
    ("-$(foo.bar['baz'])", "-zab1"),
    ("-$(foo['b ar'].baz)", "-2"),
    ("-$(foo['b\\'ar'].baz)", "-true"),
    ('-$(foo["b\\\'ar"].baz)', "-true"),
    ("-$(foo['b\\\"ar'].baz)", "-null"),
    ("$(foo.bar) $(foo.bar)", """{"baz":"zab1"} {"baz":"zab1"}"""),
    ("$(foo['bar']) $(foo['bar'])", """{"baz":"zab1"} {"baz":"zab1"}"""),
    ('$(foo["bar"]) $(foo["bar"])', """{"baz":"zab1"} {"baz":"zab1"}"""),
    ("$(foo.bar.baz) $(foo.bar.baz)", "zab1 zab1"),
    ("$(foo['bar'].baz) $(foo['bar'].baz)", "zab1 zab1"),
    ("$(foo['bar'][\"baz\"]) $(foo['bar'][\"baz\"])", "zab1 zab1"),
    ("$(foo.bar['baz']) $(foo.bar['baz'])", "zab1 zab1"),
    ("$(foo['b ar'].baz) $(foo['b ar'].baz)", "2 2"),
    ("$(foo['b\\'ar'].baz) $(foo['b\\'ar'].baz)", "true true"),
    ('$(foo["b\\\'ar"].baz) $(foo["b\\\'ar"].baz)', "true true"),
    ("$(foo['b\\\"ar'].baz) $(foo['b\\\"ar'].baz)", "null null"),
]


@pytest.mark.parametrize("pattern,expected", parameter_to_expressions)
def test_parameter_to_expression(pattern: str, expected: Any) -> None:
    """Test the interpolate convert_to_expression feature."""
    expression = expr.interpolate(pattern, {}, convert_to_expression=True)
    assert isinstance(expression, str)
    assert (
        expr.interpolate(
            expression,
            {},
            jslib=expr.jshead([], interpolate_input),
            fullJS=True,
            debug=True,
        )
        == expected
    )


# (pattern, expected, escaping_behavior)
param_to_expr_interpolate_escapebehavior = (
    ("\\$(foo.bar.baz)", "$(foo.bar.baz)", 1),
    ("\\\\$(foo.bar.baz)", "\\zab1", 1),
    ("\\\\\\$(foo.bar.baz)", "\\$(foo.bar.baz)", 1),
    ("\\\\\\\\$(foo.bar.baz)", "\\\\zab1", 1),
    ("\\$foo", "$foo", 1),
    ("\\foo", "foo", 1),
    ("\\x", "x", 1),
    ("\\\\x", "\\x", 1),
    ("\\\\\\x", "\\x", 1),
    ("\\\\\\\\x", "\\\\x", 1),
    ("\\$(foo.bar.baz)", "$(foo.bar.baz)", 2),
    ("\\\\$(foo.bar.baz)", "\\zab1", 2),
    ("\\\\\\$(foo.bar.baz)", "\\$(foo.bar.baz)", 2),
    ("\\\\\\\\$(foo.bar.baz)", "\\\\zab1", 2),
    ("\\$foo", "\\$foo", 2),
    ("\\foo", "\\foo", 2),
    ("\\x", "\\x", 2),
    ("\\\\x", "\\x", 2),
    ("\\\\\\x", "\\\\x", 2),
    ("\\\\\\\\x", "\\\\x", 2),
)


@pytest.mark.parametrize(
    "pattern,expected,behavior", param_to_expr_interpolate_escapebehavior
)
def test_parameter_to_expression_interpolate_escapebehavior(
    pattern: str, expected: str, behavior: int
) -> None:
    """Test escaping behavior in a convert_to_expression context."""
    expression = expr.interpolate(
        pattern, {}, escaping_behavior=behavior, convert_to_expression=True
    )
    assert isinstance(expression, str)
    assert (
        expr.interpolate(
            expression,
            {},
            jslib=expr.jshead([], interpolate_input),
            fullJS=True,
            debug=True,
        )
        == expected
    )


# Patterns that reference something missing from interpolate_input.
interpolate_bad_parameters = [
    ("$(fooz)"),
    ("$(foo.barz)"),
    ("$(foo['barz'])"),
    ('$(foo["barz"])'),
    ("$(foo.bar.bazz)"),
    ("$(foo['bar'].bazz)"),
    ("$(foo['bar'][\"bazz\"])"),
    ("$(foo.bar['bazz'])"),
    ("$(foo['b\\'ar'].bazz)"),
    ('$(foo["b\'ar"].bazz)'),
    ("$(foo['b\\\"ar'].bazz)"),
    ("$(lst[O])"),  # not "0" the number, but the letter O
    ("$(lst[2])"),
    ("$(lst.lengthz)"),
    ("$(lst['lengthz'])"),
    ("-$(foo.barz)"),
    ("-$(foo['barz'])"),
    ('-$(foo["barz"])'),
    ("-$(foo.bar.bazz)"),
    ("-$(foo['bar'].bazz)"),
    ("-$(foo['bar'][\"bazz\"])"),
    ("-$(foo.bar['bazz'])"),
    ("-$(foo['b ar'].bazz)"),
    ("-$(foo['b\\'ar'].bazz)"),
    ('-$(foo["b\\\'ar"].bazz)'),
    ("-$(foo['b\\\"ar'].bazz)"),
]


@pytest.mark.parametrize("pattern", interpolate_bad_parameters)
def test_expression_interpolate_failures(pattern: str) -> None:
    """Interpolating a bad reference must raise JavascriptException."""
    with pytest.raises(JavascriptException):
        expr.interpolate(pattern, interpolate_input)


# (pattern, expected, escaping_behavior)
interpolate_escapebehavior = (
    ("\\$(foo.bar.baz)", "$(foo.bar.baz)", 1),
    ("\\\\$(foo.bar.baz)", "\\zab1", 1),
    ("\\\\\\$(foo.bar.baz)", "\\$(foo.bar.baz)", 1),
    ("\\\\\\\\$(foo.bar.baz)", "\\\\zab1", 1),
    ("\\$foo", "$foo", 1),
    ("\\foo", "foo", 1),
    ("\\x", "x", 1),
    ("\\\\x", "\\x", 1),
    ("\\\\\\x", "\\x", 1),
    ("\\\\\\\\x", "\\\\x", 1),
    ("\\$(foo.bar.baz)", "$(foo.bar.baz)", 2),
    ("\\\\$(foo.bar.baz)", "\\zab1", 2),
    ("\\\\\\$(foo.bar.baz)", "\\$(foo.bar.baz)", 2),
    ("\\\\\\\\$(foo.bar.baz)", "\\\\zab1", 2),
    ("\\$foo", "\\$foo", 2),
    ("\\foo", "\\foo", 2),
    ("\\x", "\\x", 2),
    ("\\\\x", "\\x", 2),
    ("\\\\\\x", "\\\\x", 2),
    ("\\\\\\\\x", "\\\\x", 2),
)


@pytest.mark.parametrize("pattern,expected,behavior", interpolate_escapebehavior)
def test_expression_interpolate_escapebehavior(
    pattern: str, expected: str, behavior: int
) -> None:
    """Test escaping behavior in an interpolation context."""
    assert (
        expr.interpolate(pattern, interpolate_input, escaping_behavior=behavior)
        == expected
    )


@windows_needs_docker
def test_factory() -> None:
    """Run the echo tool through a Factory-made callable."""
    factory = get_windows_safe_factory()
    echo = factory.make(get_data("tests/echo.cwl"))

    assert echo(inp="foo") == {"out": "foo\n"}


def test_factory_bad_outputs() -> None:
    """Making a tool with broken outputs must raise ValidationException."""
    factory = cwltool.factory.Factory()

    with pytest.raises(ValidationException):
        factory.make(get_data("tests/echo_broken_outputs.cwl"))


def test_factory_default_args() -> None:
    """Check the runtime context values of a Factory built without arguments."""
    factory = cwltool.factory.Factory()

    assert factory.runtime_context.use_container is True
    assert factory.runtime_context.on_error == "stop"


def test_factory_redefined_args() -> None:
    """A RuntimeContext passed to Factory must be the one it exposes."""
    runtime_context = RuntimeContext()
    runtime_context.use_container = False
    runtime_context.on_error = "continue"
    factory = cwltool.factory.Factory(runtime_context=runtime_context)

    assert factory.runtime_context.use_container is False
    assert factory.runtime_context.on_error == "continue"


def test_factory_partial_scatter() -> None:
    """With on_error="continue", a failing scatter still reports partial output."""
    runtime_context = RuntimeContext()
    runtime_context.on_error = "continue"
    factory = cwltool.factory.Factory(runtime_context=runtime_context)

    with pytest.raises(cwltool.factory.WorkflowStatus) as err_info:
        factory.make(get_data("tests/wf/scatterfail.cwl"))()

    result = err_info.value.out
    assert isinstance(result, dict)
    assert (
        result["out"][0]["checksum"] == "sha1$e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e"
    )
    assert result["out"][1] is None
    assert (
        result["out"][2]["checksum"] == "sha1$a3db5c13ff90a36963278c6a39e4ee3c22e2a436"
    )


def test_factory_partial_output() -> None:
    """With on_error="continue", a failing workflow still reports partial output."""
    runtime_context = RuntimeContext()
    runtime_context.on_error = "continue"
    factory = cwltool.factory.Factory(runtime_context=runtime_context)

    with pytest.raises(cwltool.factory.WorkflowStatus) as err_info:
        factory.make(get_data("tests/wf/wffail.cwl"))()

    result = err_info.value.out
    assert isinstance(result, dict)
    assert result["out1"]["checksum"] == "sha1$e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e"
    assert result["out2"] is None


def test_scandeps() -> None:
    """Check scandeps output for an in-memory workflow, with and without url fields."""
    obj: CWLObjectType = {
        "id": "file:///example/foo.cwl",
        "steps": [
            {
                "id": "file:///example/foo.cwl#step1",
                "inputs": [
                    {
                        "id": "file:///example/foo.cwl#input1",
                        "default": {
                            "class": "File",
                            "location": "file:///example/data.txt",
                        },
                    }
                ],
                "run": {
                    "id": "file:///example/bar.cwl",
                    "inputs": [
                        {
                            "id": "file:///example/bar.cwl#input2",
                            "default": {
                                "class": "Directory",
                                "location": "file:///example/data2",
                                "listing": [
                                    {
                                        "class": "File",
                                        "location": "file:///example/data3.txt",
                                        "secondaryFiles": [
                                            {
                                                "class": "File",
                                                "location": "file:///example/data5.txt",
                                            }
                                        ],
                                    }
                                ],
                            },
                        },
                        {
                            "id": "file:///example/bar.cwl#input3",
                            "default": {
                                "class": "Directory",
                                "listing": [
                                    {
                                        "class": "File",
                                        "location": "file:///example/data4.txt",
                                    }
                                ],
                            },
                        },
                        {
                            "id": "file:///example/bar.cwl#input4",
                            "default": {"class": "File", "contents": "file literal"},
                        },
                    ],
                },
            }
        ],
    }

    def loadref(base: str, p: str) -> Union[CommentedMap, CommentedSeq, str, None]:
        # Only already-inlined documents are supported; nothing is read from disk.
        if isinstance(p, dict):
            return p
        raise Exception("test case can't load things")

    scanned_deps = cast(
        List[Dict[str, Any]],
        cwltool.process.scandeps(
            cast(str, obj["id"]),
            obj,
            {"$import", "run"},
            {"$include", "$schemas", "location"},
            loadref,
        ),
    )

    scanned_deps.sort(key=lambda k: cast(str, k["basename"]))

    expected_deps = [
        {
            "basename": "bar.cwl",
            "nameroot": "bar",
            "class": "File",
            "format": CWL_IANA,
            "nameext": ".cwl",
            "location": "file:///example/bar.cwl",
        },
        {
            "basename": "data.txt",
            "nameroot": "data",
            "class": "File",
            "nameext": ".txt",
            "location": "file:///example/data.txt",
        },
        {
            "basename": "data2",
            "class": "Directory",
            "location": "file:///example/data2",
            "listing": [
                {
                    "basename": "data3.txt",
                    "nameroot": "data3",
                    "class": "File",
                    "nameext": ".txt",
                    "location": "file:///example/data3.txt",
                    "secondaryFiles": [
                        {
                            "class": "File",
                            "basename": "data5.txt",
                            "location": "file:///example/data5.txt",
                            "nameext": ".txt",
                            "nameroot": "data5",
                        }
                    ],
                }
            ],
        },
        {
            "basename": "data4.txt",
            "nameroot": "data4",
            "class": "File",
            "nameext": ".txt",
            "location": "file:///example/data4.txt",
        },
    ]

    assert scanned_deps == expected_deps

    # The reference-field set must hold the field name "run".  The previous
    # spelling, set(("run"),), iterated the string and produced {"r", "u", "n"}.
    scanned_deps2 = cast(
        List[Dict[str, Any]],
        cwltool.process.scandeps(
            cast(str, obj["id"]),
            obj,
            {"run"},
            set(),
            loadref,
        ),
    )

    scanned_deps2.sort(key=lambda k: cast(str, k["basename"]))

    expected_deps = [
        {
            "basename": "bar.cwl",
            "nameroot": "bar",
            "format": CWL_IANA,
            "class": "File",
            "nameext": ".cwl",
            "location": "file:///example/bar.cwl",
        }
    ]

    assert scanned_deps2 == expected_deps


def test_trick_scandeps() -> None:
    """The first secondary file reported by --print-deps must not be a "_:" id."""
    stream = StringIO()

    main(
        ["--print-deps", "--debug", get_data("tests/wf/trick_defaults.cwl")],
        stdout=stream,
    )
    assert json.loads(stream.getvalue())["secondaryFiles"][0]["location"][:2] != "_:"


def test_scandeps_defaults_with_secondaryfiles() -> None:
    """Check the nested secondaryFiles location printed with --relative-deps=cwd."""
    stream = StringIO()

    main(
        [
            "--print-deps",
            "--relative-deps=cwd",
            "--debug",
            get_data("tests/wf/trick_defaults2.cwl"),
        ],
        stdout=stream,
    )
    assert json.loads(stream.getvalue())["secondaryFiles"][0]["secondaryFiles"][0][
        "location"
    ].endswith(os.path.join("tests", "wf", "indir1"))


def test_input_deps() -> None:
    """Check --print-input-deps output when inputs come from a job file."""
    stream = StringIO()

    main(
        [
            "--print-input-deps",
            get_data("tests/wf/count-lines1-wf.cwl"),
            get_data("tests/wf/wc-job.json"),
        ],
        stdout=stream,
    )

    expected = {
        "class": "File",
        "location": "wc-job.json",
        "format": CWL_IANA,
        "secondaryFiles": [
            {
                "class": "File",
                "location": "whale.txt",
                "basename": "whale.txt",
                "nameroot": "whale",
                "nameext": ".txt",
            }
        ],
    }
    assert json.loads(stream.getvalue()) == expected


def test_input_deps_cmdline_opts() -> None:
    """Check --print-input-deps output when inputs come from command-line options."""
    stream = StringIO()

    main(
        [
            "--print-input-deps",
            get_data("tests/wf/count-lines1-wf.cwl"),
            "--file1",
            get_data("tests/wf/whale.txt"),
        ],
        stdout=stream,
    )
    expected = {
        "class": "File",
        "location": "",
        "format": CWL_IANA,
        "secondaryFiles": [
            {
                "class": "File",
                "location": "whale.txt",
                "basename": "whale.txt",
                "nameroot": "whale",
                "nameext": ".txt",
            }
        ],
    }
    assert json.loads(stream.getvalue()) == expected


def test_input_deps_cmdline_opts_relative_deps_cwd() -> None:
    """Check --print-input-deps with --relative-deps cwd and command-line inputs."""
    stream = StringIO()

    data_path = get_data("tests/wf/whale.txt")
    main(
        [
            "--print-input-deps",
            "--relative-deps",
            "cwd",
            get_data("tests/wf/count-lines1-wf.cwl"),
            "--file1",
            data_path,
        ],
        stdout=stream,
    )

    goal = {
        "class": "File",
        "location": "",
        "format": CWL_IANA,
        "secondaryFiles": [
            {
                "class": "File",
                "location": str(Path(os.path.relpath(data_path, os.path.curdir))),
                "basename": "whale.txt",
                "nameroot": "whale",
                "nameext": ".txt",
            }
        ],
    }
    assert json.loads(stream.getvalue()) == goal


def test_dedupe() -> None:
    """Check dedup on repeated entries and on a file also listed inside a directory."""
    not_deduped = [
        {"class": "File", "location": "file:///example/a"},
        {"class": "File", "location": "file:///example/a"},
        {"class": "File", "location": "file:///example/d"},
        {
            "class": "Directory",
            "location": "file:///example/c",
            "listing": [{"class": "File", "location": "file:///example/d"}],
        },
    ]  # type: List[CWLObjectType]

    expected = [
        {"class": "File", "location": "file:///example/a"},
        {
            "class": "Directory",
            "location": "file:///example/c",
            "listing": [{"class": "File", "location": "file:///example/d"}],
        },
    ]

    assert dedup(not_deduped) == expected


# A record type used as both source and sink in the type comparison tests.
record = {
    "fields": [
        {
            "type": {"items": "string", "type": "array"},
            "name": "file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec/description",
        },
        {
            "type": {"items": "File", "type": "array"},
            "name": "file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec/vrn_file",
        },
    ],
    "type": "record",
    "name": "file:///home/chapmanb/drive/work/cwl/test_bcbio_cwl/run_info-cwl-workflow/wf-variantcall.cwl#vc_rec/vc_rec",
}

# (case name, source type, sink type, expected assignability)
source_to_sink = [
    (
        "0",
        {"items": ["string", "null"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        True,
    ),
    (
        "1",
        {"items": ["string"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        True,
    ),
    (
        "2",
        {"items": ["string", "null"], "type": "array"},
        {"items": ["string"], "type": "array"},
        True,
    ),
    (
        "3",
        {"items": ["string"], "type": "array"},
        {"items": ["int"], "type": "array"},
        False,
    ),
    ("record 0", record, record, True),
    ("record 1", record, {"items": "string", "type": "array"}, False),
]


@pytest.mark.parametrize("name, source, sink, expected", source_to_sink)
def test_compare_types(
    name: str, source: Dict[str, Any], sink: Dict[str, Any], expected: bool
) -> None:
    """Check can_assign_src_to_sink with its default strictness."""
    assert can_assign_src_to_sink(source, sink) == expected, name


# (case name, source type, sink type, expected assignability with strict=True)
source_to_sink_strict = [
    ("0", ["string", "null"], ["string", "null"], True),
    ("1", ["string"], ["string", "null"], True),
    ("2", ["string", "int"], ["string", "null"], False),
    (
        "3",
        {"items": ["string"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        True,
    ),
    (
        "4",
        {"items": ["string", "int"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        False,
    ),
]


@pytest.mark.parametrize("name, source, sink, expected", source_to_sink_strict)
def test_compare_types_strict(
    name: str, source: Dict[str, Any], sink: Dict[str, Any], expected: bool
) -> None:
    """Check can_assign_src_to_sink with strict=True."""
    assert can_assign_src_to_sink(source, sink, strict=True) == expected, name


# (source type, sink type, linkMerge, valueFrom, expected check_types result)
typechecks = [
    (["string", "int"], ["string", "int", "null"], None, None, "pass"),
    (["string", "int"], ["string", "null"], None, None, "warning"),
    (["File", "int"], ["string", "null"], None, None, "exception"),
    (
        {"items": ["string", "int"], "type": "array"},
        {"items": ["string", "int", "null"], "type": "array"},
        None,
        None,
        "pass",
    ),
    (
        {"items": ["string", "int"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        None,
        None,
        "warning",
    ),
    (
        {"items": ["File", "int"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        None,
        None,
        "exception",
    ),
    # check linkMerge when sinktype is not an array
    (["string", "int"], ["string", "int", "null"], "merge_nested", None, "exception"),
    # check linkMerge: merge_nested
    (
        ["string", "int"],
        {"items": ["string", "int", "null"], "type": "array"},
        "merge_nested",
        None,
        "pass",
    ),
    (
        ["string", "int"],
        {"items": ["string", "null"], "type": "array"},
        "merge_nested",
        None,
        "warning",
    ),
    (
        ["File", "int"],
        {"items": ["string", "null"], "type": "array"},
        "merge_nested",
        None,
        "exception",
    ),
    # check linkMerge: merge_nested and sinktype is "Any"
    (["string", "int"], "Any", "merge_nested", None, "pass"),
    # check linkMerge: merge_flattened
    (
        ["string", "int"],
        {"items": ["string", "int", "null"], "type": "array"},
        "merge_flattened",
        None,
        "pass",
    ),
    (
        ["string", "int"],
        {"items": ["string", "null"], "type": "array"},
        "merge_flattened",
        None,
        "warning",
    ),
    (
        ["File", "int"],
        {"items": ["string", "null"], "type": "array"},
        "merge_flattened",
        None,
        "exception",
    ),
    (
        {"items": ["string", "int"], "type": "array"},
        {"items": ["string", "int", "null"], "type": "array"},
        "merge_flattened",
        None,
        "pass",
    ),
    (
        {"items": ["string", "int"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        "merge_flattened",
        None,
        "warning",
    ),
    (
        {"items": ["File", "int"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        "merge_flattened",
        None,
        "exception",
    ),
    # check linkMerge: merge_flattened and sinktype is "Any"
    (["string", "int"], "Any", "merge_flattened", None, "pass"),
    (
        {"items": ["string", "int"], "type": "array"},
        "Any",
        "merge_flattened",
        None,
        "pass",
    ),
    # check linkMerge: merge_flattened when srctype is a list
    (
        [{"items": "string", "type": "array"}],
        {"items": "string", "type": "array"},
        "merge_flattened",
        None,
        "pass",
    ),
    # check valueFrom
    (
        {"items": ["File", "int"], "type": "array"},
        {"items": ["string", "null"], "type": "array"},
        "merge_flattened",
        "special value",
        "pass",
    ),
]


@pytest.mark.parametrize(
    "src_type,sink_type,link_merge,value_from,expected_type", typechecks
)
def test_typechecking(
    src_type: Any, sink_type: Any, link_merge: str, value_from: Any, expected_type: str
) -> None:
    """Check the verdict string returned by cwltool.checker.check_types."""
    assert (
        cwltool.checker.check_types(
            src_type, sink_type, linkMerge=link_merge, valueFrom=value_from
        )
        == expected_type
    )


def test_lifting() -> None:
    # check that lifting the types of the process outputs to the workflow step
    # fails if the step 'out' doesn't match.
    factory = cwltool.factory.Factory()
    with pytest.raises(ValidationException):
        echo = factory.make(get_data("tests/test_bad_outputs_wf.cwl"))
        assert echo(inp="foo") == {"out": "foo\n"}


def test_malformed_outputs() -> None:
    # check that tool validation fails if one of the outputs is not a valid CWL type
    factory = cwltool.factory.Factory()
    with pytest.raises(ValidationException):
        factory.make(get_data("tests/wf/malformed_outputs.cwl"))()


def test_separate_without_prefix() -> None:
    # check that setting 'separate = false' on an inputBinding without prefix fails the workflow
    factory = cwltool.factory.Factory()
    with pytest.raises(WorkflowException):
        factory.make(get_data("tests/wf/separate_without_prefix.cwl"))()


def test_static_checker() -> None:
    # check that the static checker raises exception when a source type
    # mismatches its sink type.
    factory = cwltool.factory.Factory()

    with pytest.raises(ValidationException):
        factory.make(get_data("tests/checker_wf/broken-wf.cwl"))

    with pytest.raises(ValidationException):
        factory.make(get_data("tests/checker_wf/broken-wf2.cwl"))

    with pytest.raises(ValidationException):
        factory.make(get_data("tests/checker_wf/broken-wf3.cwl"))


def test_var_spool_cwl_checker1() -> None:
    """Confirm that references to /var/spool/cwl are caught."""
    stream = StringIO()
    streamhandler = logging.StreamHandler(stream)
    _logger = logging.getLogger("cwltool")
    _logger.addHandler(streamhandler)

    factory = cwltool.factory.Factory()
    try:
        factory.make(get_data("tests/non_portable.cwl"))
        assert (
            "non_portable.cwl:18:4: Non-portable reference to /var/spool/cwl detected"
            in stream.getvalue()
        )
    finally:
        # Always detach the handler so it does not leak into other tests.
        _logger.removeHandler(streamhandler)


def test_var_spool_cwl_checker2() -> None:
    """Confirm that references to /var/spool/cwl are caught."""
    stream = StringIO()
    streamhandler = logging.StreamHandler(stream)
    _logger = logging.getLogger("cwltool")
    _logger.addHandler(streamhandler)

    factory = cwltool.factory.Factory()
    try:
        factory.make(get_data("tests/non_portable2.cwl"))
        assert (
            "non_portable2.cwl:19:4: Non-portable reference to /var/spool/cwl detected"
            in stream.getvalue()
        )
    finally:
        # Always detach the handler so it does not leak into other tests.
        _logger.removeHandler(streamhandler)


def test_var_spool_cwl_checker3() -> None:
    """Confirm that a portable tool does not trigger the /var/spool/cwl warning."""
    stream = StringIO()
    streamhandler = logging.StreamHandler(stream)
    _logger = logging.getLogger("cwltool")
    _logger.addHandler(streamhandler)

    factory = cwltool.factory.Factory()
    try:
        factory.make(get_data("tests/portable.cwl"))
        assert (
            "Non-portable reference to /var/spool/cwl detected" not in stream.getvalue()
        )
    finally:
        # Always detach the handler so it does not leak into other tests.
        _logger.removeHandler(streamhandler)


def test_print_dot() -> None:
    """Compare the edges printed by --print-dot with a reference graph."""
    # print Workflow
    cwl_path = get_data("tests/wf/revsort.cwl")
    cwl_posix_path = Path(cwl_path).as_posix()
    expected_dot = pydot.graph_from_dot_data(
        """
    digraph {{
        graph [bgcolor="#eeeeee",
                clusterrank=local,
                labeljust=right,
                labelloc=bottom
        ];
        subgraph cluster_inputs {{
                graph [label="Workflow Inputs",
                        rank=same,
                        style=dashed
                ];
                "file://{cwl_posix_path}#workflow_input" [fillcolor="#94DDF4",
                        label=workflow_input,
                        style=filled];
                "file://{cwl_posix_path}#reverse_sort" [fillcolor="#94DDF4",
                        label=reverse_sort,
                        style=filled];
        }}
        subgraph cluster_outputs {{
                graph [label="Workflow Outputs",
                        labelloc=b,
                        rank=same,
                        style=dashed
                ];
                "file://{cwl_posix_path}#sorted_output" [fillcolor="#94DDF4",
                        label=sorted_output,
                        style=filled];
        }}
        "file://{cwl_posix_path}#rev" [fillcolor=lightgoldenrodyellow,
                label=rev,
                style=filled];
        "file://{cwl_posix_path}#sorted" [fillcolor=lightgoldenrodyellow,
                label=sorted,
                style=filled];
        "file://{cwl_posix_path}#rev" -> "file://{cwl_posix_path}#sorted";
        "file://{cwl_posix_path}#sorted" -> "file://{cwl_posix_path}#sorted_output";
        "file://{cwl_posix_path}#workflow_input" -> "file://{cwl_posix_path}#rev";
        "file://{cwl_posix_path}#reverse_sort" -> "file://{cwl_posix_path}#sorted";
    }}
    """.format(
            cwl_posix_path=cwl_posix_path
        )
    )[0]
    stdout = StringIO()
    assert main(["--print-dot", cwl_path], stdout=stdout) == 0
    computed_dot = pydot.graph_from_dot_data(stdout.getvalue())[0]
    # Only the URL fragments are compared, so the absolute path does not matter.
    computed_edges = sorted(
        [
            (urlparse(source).fragment, urlparse(target).fragment)
            for source, target in computed_dot.obj_dict["edges"]
        ]
    )
    expected_edges = sorted(
        [
            (urlparse(source).fragment, urlparse(target).fragment)
            for source, target in expected_dot.obj_dict["edges"]
        ]
    )
    assert computed_edges == expected_edges

    # print CommandLineTool
    cwl_path = get_data("tests/wf/echo.cwl")
    stdout = StringIO()
    assert main(["--debug", "--print-dot", cwl_path], stdout=stdout) == 1


# Extra command-line flags that each parametrized CLI test is run with.
test_factors = [(""), ("--parallel"), ("--debug"), ("--parallel --debug")]


@pytest.mark.parametrize("factor", test_factors)
def test_js_console_cmd_line_tool(factor: str) -> None:
    """With --js-console, JavaScript console output must appear on stderr."""
    for test_file in ("js_output.cwl", "js_output_workflow.cwl"):
        commands = factor.split()
        commands.extend(
            ["--js-console", "--no-container", get_data("tests/wf/" + test_file)]
        )
        error_code, _, stderr = get_main_output(commands)

        assert "[log] Log message" in stderr
        assert "[err] Error message" in stderr

        assert error_code == 0, stderr


@pytest.mark.parametrize("factor", test_factors)
def test_no_js_console(factor: str) -> None:
    """Without --js-console, JavaScript console output must not appear on stderr."""
    for test_file in ("js_output.cwl", "js_output_workflow.cwl"):
        commands = factor.split()
        commands.extend(["--no-container", get_data("tests/wf/" + test_file)])
        _, _, stderr = get_main_output(commands)

        assert "[log] Log message" not in stderr
        assert "[err] Error message" not in stderr


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_cid_file_dir(tmp_path: Path, factor: str) -> None:
    """Test --cidfile-dir option works."""
    test_file = "cache_test_workflow.cwl"
    cwd = Path.cwd()
    os.chdir(tmp_path)
    try:
        commands = factor.split()
        commands.extend(
            ["--cidfile-dir", str(tmp_path), get_data("tests/wf/" + test_file)]
        )
        error_code, stdout, stderr = get_main_output(commands)
    finally:
        # Restore the working directory even if the run raises.
        os.chdir(cwd)
    assert "completed success" in stderr
    assert error_code == 0
    cidfiles_count = sum(1 for _ in tmp_path.glob("**/*"))
    assert cidfiles_count == 2


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_cid_file_dir_arg_is_file_instead_of_dir(tmp_path: Path, factor: str) -> None:
    """Test --cidfile-dir with a file produces the correct error."""
    test_file = "cache_test_workflow.cwl"
    bad_cidfile_dir = tmp_path / "cidfile-dir-actually-a-file"
    bad_cidfile_dir.touch()
    commands = factor.split()
    commands.extend(
        ["--cidfile-dir", str(bad_cidfile_dir), get_data("tests/wf/" + test_file)]
    )
    error_code, _, stderr = get_main_output(commands)
    assert "is not a directory, please check it first" in stderr, stderr
    assert error_code == 2 or error_code == 1, stderr


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_cid_file_non_existing_dir(tmp_path: Path, factor: str) -> None:
    """Test that --cidfile-dir with a bad path should produce a specific error."""
    test_file = "cache_test_workflow.cwl"
    bad_cidfile_dir = tmp_path / "cidfile-dir-badpath"
    commands = factor.split()
    commands.extend(
        [
            "--record-container-id",
            "--cidfile-dir",
            str(bad_cidfile_dir),
            get_data("tests/wf/" + test_file),
        ]
    )
    error_code, _, stderr = get_main_output(commands)
    assert "directory doesn't exist, please create it first" in stderr, stderr
    assert error_code == 2 or error_code == 1, stderr


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_cid_file_w_prefix(tmp_path: Path, factor: str) -> None:
    """Test that --cidfile-prefix works."""
    test_file = "cache_test_workflow.cwl"
    cwd = Path.cwd()
    os.chdir(tmp_path)
    try:
        commands = factor.split()
        commands.extend(
            [
                "--record-container-id",
                "--cidfile-prefix=pytestcid",
                get_data("tests/wf/" + test_file),
            ]
        )
        error_code, stdout, stderr = get_main_output(commands)
    finally:
        listing = tmp_path.iterdir()
        os.chdir(cwd)
        cidfiles_count = sum(1 for _ in tmp_path.glob("**/pytestcid*"))
    assert "completed success" in stderr
    assert error_code == 0
    # The separator was previously the literal text "/n" instead of a newline.
    assert cidfiles_count == 2, "{}\n{}".format(list(listing), stderr)


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_secondary_files_v1_1(factor: str) -> None:
    """Run the v1.1 secondary-files example and check the mode of "lsout"."""
    test_file = "secondary-files.cwl"
    test_job_file = "secondary-files-job.yml"
    old_umask = os.umask(stat.S_IWOTH)  # test run with umask 002
    try:
        commands = factor.split()
        commands.extend(
            [
                "--enable-dev",
                get_data(os.path.join("tests", test_file)),
                get_data(os.path.join("tests", test_job_file)),
            ]
        )
        error_code, _, stderr = get_main_output(commands)
    finally:
        # Revert to the original umask first, so a failing assertion below
        # cannot leave the modified umask in place for later tests.
        os.umask(old_umask)
    # 0o664 == 436, '-rw-rw-r--'
    assert stat.S_IMODE(os.stat("lsout").st_mode) == 0o664
    assert "completed success" in stderr
    assert error_code == 0


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_secondary_files_v1_0(factor: str) -> None:
    """Run the v1.0 secondary-files example and check the mode of "lsout"."""
    test_file = "secondary-files-string-v1.cwl"
    test_job_file = "secondary-files-job.yml"
    old_umask = os.umask(stat.S_IWOTH)  # test run with umask 002
    try:
        commands = factor.split()
        commands.extend(
            [
                get_data(os.path.join("tests", test_file)),
                get_data(os.path.join("tests", test_job_file)),
            ]
        )
        error_code, _, stderr = get_main_output(commands)
    finally:
        # Revert to the original umask first, so a failing assertion below
        # cannot leave the modified umask in place for later tests.
        os.umask(old_umask)
    # 0o664 == 436, '-rw-rw-r--'
    assert stat.S_IMODE(os.stat("lsout").st_mode) == 0o664
    assert "completed success" in stderr
    assert error_code == 0


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_wf_without_container(tmp_path: Path, factor: str) -> None:
    """Confirm that we can run a workflow without a container."""
    test_file = "hello-workflow.cwl"
    cache_dir = str(tmp_path / "cwltool_cache")
    commands = factor.split()
    commands.extend(
        [
            "--cachedir",
            cache_dir,
            "--outdir",
            str(tmp_path / "outdir"),
            get_data("tests/wf/" + test_file),
            "--usermessage",
            "hello",
        ]
    )
    error_code, _, stderr = get_main_output(commands)

    assert "completed success" in stderr
    assert error_code == 0


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_issue_740_fixed(tmp_path: Path, factor: str) -> None:
    """Confirm that re-running a particular workflow with caching succeeds."""
    test_file = "cache_test_workflow.cwl"
    cache_dir = str(tmp_path / "cwltool_cache")
    commands = factor.split()
    commands.extend(["--cachedir", cache_dir, get_data("tests/wf/" + test_file)])
    error_code, _, stderr = get_main_output(commands)

    assert "completed success" in stderr
    assert error_code == 0

    # Second run against the same cache directory.
    commands = factor.split()
    commands.extend(["--cachedir", cache_dir, get_data("tests/wf/" + test_file)])
    error_code, _, stderr = get_main_output(commands)

    assert "Output of job will be cached in" not in stderr
    assert error_code == 0, stderr


@needs_docker
def test_compute_checksum() -> None:
    """With compute_checksum enabled, the output File carries the expected sha1."""
    runtime_context = RuntimeContext()
    runtime_context.compute_checksum = True
    runtime_context.use_container = onWindows()
    factory = cwltool.factory.Factory(runtime_context=runtime_context)
    echo = factory.make(get_data("tests/wf/cat-tool.cwl"))
    output = echo(
        file1={"class": "File", "location": get_data("tests/wf/whale.txt")},
        reverse=False,
    )
    assert isinstance(output, dict)
    result = output["output"]
    assert isinstance(result, dict)
    assert result["checksum"] == "sha1$327fc7aedf4f6b69a42a7c8b808dc5a7aff61376"


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_no_compute_chcksum(tmp_path: Path, factor: str) -> None:
    """With --no-compute-checksum, no "checksum" appears in stdout."""
    test_file = "tests/wf/wc-tool.cwl"
    job_file = "tests/wf/wc-job.json"
    commands = factor.split()
    commands.extend(
        [
            "--no-compute-checksum",
            "--outdir",
            str(tmp_path),
            get_data(test_file),
            get_data(job_file),
        ]
    )
    error_code, stdout, stderr = get_main_output(commands)
    assert "completed success" in stderr
    assert error_code == 0
    assert "checksum" not in stdout


@pytest.mark.skipif(onWindows(), reason="udocker is Linux/macOS only")
@pytest.mark.parametrize("factor", test_factors)
def test_bad_userspace_runtime(factor: str) -> None:
    """A non-existent --user-space-docker-cmd must fail with a clear message."""
    test_file = "tests/wf/wc-tool.cwl"
    job_file = "tests/wf/wc-job.json"
    commands = factor.split()
    commands.extend(
        [
            "--user-space-docker-cmd=quaquioN",
            "--default-container=debian",
            get_data(test_file),
            get_data(job_file),
        ]
    )
    error_code, stdout, stderr = get_main_output(commands)
    assert "or quaquioN is missing or broken" in stderr, stderr
    assert error_code == 1


@windows_needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_bad_basecommand(factor: str) -> None:
    """A missing baseCommand must be reported as not found."""
    test_file = "tests/wf/missing-tool.cwl"
    commands = factor.split()
    commands.extend([get_data(test_file)])
    error_code, stdout, stderr = get_main_output(commands)
    assert "'neenooGo' not found" in stderr, stderr
    assert error_code == 1


@needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_bad_basecommand_docker(factor: str) -> None:
    """A missing baseCommand inside a container must end in permanentFail."""
    test_file = "tests/wf/missing-tool.cwl"
    commands = factor.split()
    commands.extend(["--debug", "--default-container", "debian", get_data(test_file)])
    error_code, stdout, stderr = get_main_output(commands)
    assert "permanentFail" in stderr, stderr
    assert error_code == 1


@pytest.mark.parametrize("factor", test_factors)
def test_v1_0_position_expression(factor: str) -> None:
    """The echo-position-expr example must fail with an "is not int" error."""
    test_file = "tests/echo-position-expr.cwl"
    test_job = "tests/echo-position-expr-job.yml"
    commands = factor.split()
    commands.extend(["--debug", get_data(test_file), get_data(test_job)])
    error_code, stdout, stderr = get_main_output(commands)
    assert "is not int" in stderr, stderr
    assert error_code == 1


@windows_needs_docker
@pytest.mark.parametrize("factor", test_factors)
def test_optional_numeric_output_0(factor: str) -> None:
    """An optional numeric output whose value is 0 must be reported as 0."""
    test_file = "tests/wf/optional-numerical-output-0.cwl"
    commands = factor.split()
    commands.extend([get_data(test_file)])
    error_code, stdout, stderr = get_main_output(commands)

    assert "completed success" in stderr
    assert error_code == 0
    assert json.loads(stdout)["out"] == 0


@pytest.mark.parametrize("factor", test_factors)
@windows_needs_docker
def test_env_filtering(factor: str) -> None:
    """Check the number of environment variables reported by tests/env.cwl."""
    test_file = "tests/env.cwl"
    commands = factor.split()
    commands.extend([get_data(test_file)])
    error_code, stdout, stderr = get_main_output(commands)

    # Find out which shell "sh" really is; the expected count depends on it.
    process = subprocess.Popen(
        [
            "sh",
            "-c",
            r"""getTrueShellExeName() {
  local trueExe nextTarget 2>/dev/null
  trueExe=$(ps -o comm= $$) || return 1
  [ "${trueExe#-}" = "$trueExe" ] || trueExe=${trueExe#-}
  [ "${trueExe#/}" != "$trueExe" ] || trueExe=$([ -n "$ZSH_VERSION" ] && which -p "$trueExe" || which "$trueExe")
  while nextTarget=$(readlink "$trueExe"); do trueExe=$nextTarget; done
  printf '%s\n' "$(basename "$trueExe")"
} ; getTrueShellExeName""",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=None,
    )
    sh_name_b, sh_name_err = process.communicate()
    assert sh_name_b
    sh_name = sh_name_b.decode("utf-8").strip()

    assert "completed success" in stderr, (error_code, stdout, stderr)
    assert error_code == 0, (error_code, stdout, stderr)
    if onWindows():
        target = 5
    elif sh_name == "dash":
        target = 4
    else:  # bash adds "SHLVL" and "_" environment variables
        target = 6
    result = json.loads(stdout)["env_count"]
    details = ""
    if result != target:
        # Collect extra diagnostics only when the count is unexpected.
        _, details, _ = get_main_output(["--quiet", get_data("tests/env2.cwl")])
        print(sh_name)
        print(sh_name_err)
        print(details)
    assert result == target, (error_code, sh_name, sh_name_err, details, stdout, stderr)


@windows_needs_docker
def test_v1_0_arg_empty_prefix_separate_false() -> None:
    """Run the arg-empty-prefix-separate-false example with --echo."""
    test_file = "tests/arg-empty-prefix-separate-false.cwl"
    error_code, stdout, stderr = get_main_output(
        ["--debug", get_data(test_file), "--echo"]
    )
    assert "completed success" in stderr
    assert error_code == 0


def test_scatter_output_filenames(tmp_path: Path) -> None:
    """If a scatter step produces identically named output then confirm that the final output is renamed correctly."""
    cwd = Path.cwd()
    os.chdir(tmp_path)
    try:
        rtc = RuntimeContext()
        rtc.outdir = str(cwd)
        factory = cwltool.factory.Factory(runtime_context=rtc)
        output_names = ["output.txt", "output.txt_2", "output.txt_3"]
        scatter_workflow = factory.make(get_data("tests/scatter_numbers.cwl"))
        result = scatter_workflow(range=3)
        assert isinstance(result, dict)
        assert "output" in result

        locations = sorted([element["location"] for element in result["output"]])

        assert (
            locations[0].endswith("output.txt")
            and locations[1].endswith("output.txt_2")
            and locations[2].endswith("output.txt_3")
        ), f"Locations {locations} do not end with {output_names}"
    finally:
        # Undo the chdir so later tests do not inherit tmp_path as their cwd.
        os.chdir(cwd)