comparison env/lib/python3.9/site-packages/cwltool/tests/test_pack.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import json
2 import os
3 import tempfile
4 from collections.abc import Sized
5 from functools import partial
6 from io import StringIO
7 from pathlib import Path
8 from typing import Dict
9
10 import pytest
11 from ruamel import yaml
12
13 import cwltool.pack
14 import cwltool.workflow
15 from cwltool.context import LoadingContext
16 from cwltool.load_tool import fetch_document, resolve_and_validate_document
17 from cwltool.main import main, make_relative, print_pack
18 from cwltool.resolver import tool_resolver
19 from cwltool.utils import adjustDirObjs, adjustFileObjs
20
21 from .util import get_data, needs_docker
22
23
24 def test_pack() -> None:
25 loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/revsort.cwl"))
26
27 with open(get_data("tests/wf/expect_packed.cwl")) as packed_file:
28 expect_packed = yaml.main.safe_load(packed_file)
29
30 packed = cwltool.pack.pack(loadingContext, uri)
31 adjustFileObjs(
32 packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
33 )
34 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))))
35
36 assert "$schemas" in packed
37 packed_schemas = packed["$schemas"]
38 assert isinstance(packed_schemas, Sized)
39 assert len(packed_schemas) == len(expect_packed["$schemas"])
40 del packed["$schemas"]
41 del expect_packed["$schemas"]
42
43 assert packed == expect_packed
44
45
46 def test_pack_input_named_name() -> None:
47 loadingContext, workflowobj, uri = fetch_document(
48 get_data("tests/wf/trick_revsort.cwl")
49 )
50 loadingContext.do_update = False
51 loadingContext, uri = resolve_and_validate_document(
52 loadingContext, workflowobj, uri
53 )
54 loader = loadingContext.loader
55 assert loader
56 loader.resolve_ref(uri)[0]
57
58 with open(get_data("tests/wf/expect_trick_packed.cwl")) as packed_file:
59 expect_packed = yaml.main.round_trip_load(packed_file)
60
61 packed = cwltool.pack.pack(loadingContext, uri)
62 adjustFileObjs(
63 packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
64 )
65 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))))
66
67 assert "$schemas" in packed
68 packed_schemas = packed["$schemas"]
69 assert isinstance(packed_schemas, Sized)
70 assert len(packed_schemas) == len(expect_packed["$schemas"])
71 del packed["$schemas"]
72 del expect_packed["$schemas"]
73
74 assert packed == expect_packed
75
76
77 def test_pack_single_tool() -> None:
78 loadingContext, workflowobj, uri = fetch_document(
79 get_data("tests/wf/formattest.cwl")
80 )
81 loadingContext.do_update = False
82 loadingContext, uri = resolve_and_validate_document(
83 loadingContext, workflowobj, uri
84 )
85 loader = loadingContext.loader
86 assert loader
87 loader.resolve_ref(uri)[0]
88
89 packed = cwltool.pack.pack(loadingContext, uri)
90 assert "$schemas" in packed
91
92
93 def test_pack_fragment() -> None:
94 with open(get_data("tests/wf/scatter2_subwf.cwl")) as packed_file:
95 expect_packed = yaml.main.safe_load(packed_file)
96
97 loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/scatter2.cwl"))
98 packed = cwltool.pack.pack(loadingContext, uri + "#scatterstep/mysub")
99 adjustFileObjs(
100 packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
101 )
102 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))))
103
104 assert json.dumps(packed, sort_keys=True, indent=2) == json.dumps(
105 expect_packed, sort_keys=True, indent=2
106 )
107
108
109 def test_pack_rewrites() -> None:
110 rewrites = {} # type: Dict[str, str]
111
112 loadingContext, workflowobj, uri = fetch_document(
113 get_data("tests/wf/default-wf5.cwl")
114 )
115 loadingContext.do_update = False
116 loadingContext, uri = resolve_and_validate_document(
117 loadingContext, workflowobj, uri
118 )
119 loader = loadingContext.loader
120 assert loader
121 loader.resolve_ref(uri)[0]
122
123 cwltool.pack.pack(
124 loadingContext,
125 uri,
126 rewrite_out=rewrites,
127 )
128
129 assert len(rewrites) == 6
130
131
132 cwl_missing_version_paths = [
133 "tests/wf/hello_single_tool.cwl",
134 "tests/wf/hello-workflow.cwl",
135 ]
136
137
138 @pytest.mark.parametrize("cwl_path", cwl_missing_version_paths)
139 def test_pack_missing_cwlVersion(cwl_path: str) -> None:
140 """Ensure the generated pack output is not missing the `cwlVersion` in case of single tool workflow and single step workflow."""
141 # Testing single tool workflow
142 loadingContext, workflowobj, uri = fetch_document(get_data(cwl_path))
143 loadingContext.do_update = False
144 loadingContext, uri = resolve_and_validate_document(
145 loadingContext, workflowobj, uri
146 )
147 loader = loadingContext.loader
148 assert loader
149 loader.resolve_ref(uri)[0]
150
151 # generate pack output dict
152 packed = json.loads(print_pack(loadingContext, uri))
153
154 assert packed["cwlVersion"] == "v1.0"
155
156
157 def test_pack_idempotence_tool(tmp_path: Path) -> None:
158 """Ensure that pack produces exactly the same document for an already packed CommandLineTool."""
159 _pack_idempotently("tests/wf/hello_single_tool.cwl", tmp_path)
160
161
162 def test_pack_idempotence_workflow(tmp_path: Path) -> None:
163 """Ensure that pack produces exactly the same document for an already packed workflow."""
164 _pack_idempotently("tests/wf/count-lines1-wf.cwl", tmp_path)
165
166
167 def _pack_idempotently(document: str, tmp_path: Path) -> None:
168 loadingContext, workflowobj, uri = fetch_document(get_data(document))
169 loadingContext.do_update = False
170 loadingContext, uri = resolve_and_validate_document(
171 loadingContext, workflowobj, uri
172 )
173 loader = loadingContext.loader
174 assert loader
175 loader.resolve_ref(uri)[0]
176
177 # generate pack output dict
178 packed_text = print_pack(loadingContext, uri)
179 packed = json.loads(packed_text)
180
181 tmp_name = tmp_path / "packed.cwl"
182 tmp = tmp_name.open(mode="w")
183 tmp.write(packed_text)
184 tmp.flush()
185 tmp.close()
186
187 loadingContext, workflowobj, uri2 = fetch_document(tmp.name)
188 loadingContext.do_update = False
189 loadingContext, uri2 = resolve_and_validate_document(
190 loadingContext, workflowobj, uri2
191 )
192 loader2 = loadingContext.loader
193 assert loader2
194 loader2.resolve_ref(uri2)[0]
195
196 # generate pack output dict
197 packed_text = print_pack(loadingContext, uri2)
198 double_packed = json.loads(packed_text)
199
200 assert uri != uri2
201 assert packed == double_packed
202
203
204 cwl_to_run = [
205 ("tests/wf/count-lines1-wf.cwl", "tests/wf/wc-job.json", False),
206 ("tests/wf/formattest.cwl", "tests/wf/formattest-job.json", True),
207 ]
208
209
210 @needs_docker
211 @pytest.mark.parametrize("wf_path,job_path,namespaced", cwl_to_run)
212 def test_packed_workflow_execution(
213 wf_path: str, job_path: str, namespaced: bool, tmp_path: Path
214 ) -> None:
215 loadingContext = LoadingContext()
216 loadingContext.resolver = tool_resolver
217 loadingContext, workflowobj, uri = fetch_document(get_data(wf_path), loadingContext)
218 loadingContext.do_update = False
219 loadingContext, uri = resolve_and_validate_document(
220 loadingContext, workflowobj, uri
221 )
222 loader = loadingContext.loader
223 assert loader
224 loader.resolve_ref(uri)[0]
225 packed = json.loads(print_pack(loadingContext, uri))
226
227 assert not namespaced or "$namespaces" in packed
228
229 wf_packed_handle, wf_packed_path = tempfile.mkstemp()
230 with open(wf_packed_path, "w") as temp_file:
231 json.dump(packed, temp_file)
232
233 normal_output = StringIO()
234 packed_output = StringIO()
235
236 normal_params = ["--outdir", str(tmp_path), get_data(wf_path), get_data(job_path)]
237 packed_params = [
238 "--outdir",
239 str(tmp_path),
240 "--debug",
241 wf_packed_path,
242 get_data(job_path),
243 ]
244
245 assert main(normal_params, stdout=normal_output) == 0
246 assert main(packed_params, stdout=packed_output) == 0
247
248 assert json.loads(packed_output.getvalue()) == json.loads(normal_output.getvalue())
249
250 os.close(wf_packed_handle)
251 os.remove(wf_packed_path)