comparison env/lib/python3.9/site-packages/cwltool/tests/test_load_tool.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 from pathlib import Path
2
3 import pytest
4
5 from cwltool.context import LoadingContext, RuntimeContext
6 from cwltool.errors import WorkflowException
7 from cwltool.load_tool import load_tool
8 from cwltool.process import use_custom_schema, use_standard_schema
9 from cwltool.update import INTERNAL_VERSION
10 from cwltool.utils import CWLObjectType
11
12 from .test_fetch import norm
13 from .util import get_data, windows_needs_docker
14
15
16 @windows_needs_docker
17 def test_check_version() -> None:
18 """
19 It is permitted to load without updating, but not execute.
20
21 Attempting to execute without updating to the internal version should raise an error.
22 """
23 joborder = {"inp": "abc"} # type: CWLObjectType
24 loadingContext = LoadingContext({"do_update": True})
25 tool = load_tool(get_data("tests/echo.cwl"), loadingContext)
26 for _ in tool.job(joborder, None, RuntimeContext()):
27 pass
28
29 loadingContext = LoadingContext({"do_update": False})
30 tool = load_tool(get_data("tests/echo.cwl"), loadingContext)
31 with pytest.raises(WorkflowException):
32 for _ in tool.job(joborder, None, RuntimeContext()):
33 pass
34
35
36 def test_use_metadata() -> None:
37 """Use the version from loadingContext.metadata if cwlVersion isn't present in the document."""
38 loadingContext = LoadingContext({"do_update": False})
39 tool = load_tool(get_data("tests/echo.cwl"), loadingContext)
40
41 loadingContext = LoadingContext()
42 loadingContext.metadata = tool.metadata
43 tooldata = tool.tool.copy()
44 del tooldata["cwlVersion"]
45 load_tool(tooldata, loadingContext)
46
47
48 def test_checklink_outputSource() -> None:
49 """Is outputSource resolved correctly independent of value of do_validate."""
50 outsrc = (
51 norm(Path(get_data("tests/wf/1st-workflow.cwl")).as_uri())
52 + "#argument/classfile"
53 )
54
55 loadingContext = LoadingContext({"do_validate": True})
56 tool = load_tool(get_data("tests/wf/1st-workflow.cwl"), loadingContext)
57 assert norm(tool.tool["outputs"][0]["outputSource"]) == outsrc
58
59 loadingContext = LoadingContext({"do_validate": False})
60 tool = load_tool(get_data("tests/wf/1st-workflow.cwl"), loadingContext)
61 assert norm(tool.tool["outputs"][0]["outputSource"]) == outsrc
62
63
64 def test_load_graph_fragment() -> None:
65 """Reloading from a dictionary without a cwlVersion."""
66 loadingContext = LoadingContext()
67 uri = Path(get_data("tests/wf/scatter-wf4.cwl")).as_uri() + "#main"
68 tool = load_tool(uri, loadingContext)
69
70 loader = tool.doc_loader
71 assert loader
72 rs, metadata = loader.resolve_ref(uri)
73 # Reload from a dict (in 'rs'), not a URI. The dict is a fragment
74 # of original document and doesn't have cwlVersion set, so test
75 # that it correctly looks up the root document to get the
76 # cwlVersion.
77 assert isinstance(rs, str) or isinstance(rs, dict)
78 tool = load_tool(rs, loadingContext)
79 assert tool.metadata["cwlVersion"] == INTERNAL_VERSION
80
81
82 def test_load_graph_fragment_from_packed() -> None:
83 """Loading a fragment from packed with update."""
84 loadingContext = LoadingContext()
85 uri = Path(get_data("tests/wf/packed-with-loadlisting.cwl")).as_uri() + "#main"
86 try:
87 with open(get_data("cwltool/extensions.yml")) as res:
88 use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
89
90 # The updater transforms LoadListingRequirement from an
91 # extension (in v1.0) to a core feature (in v1.1) but there
92 # was a bug when loading a packed workflow and loading a
93 # specific fragment it would get the un-updated document.
94 # This recreates that case and asserts that we are using the
95 # updated document like we should.
96
97 tool = load_tool(uri, loadingContext)
98
99 assert tool.tool["requirements"] == [
100 {"class": "LoadListingRequirement", "loadListing": "no_listing"}
101 ]
102 finally:
103 use_standard_schema("v1.0")