Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/tests/test_pack.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import json | |
2 import os | |
3 import tempfile | |
4 from collections.abc import Sized | |
5 from functools import partial | |
6 from io import StringIO | |
7 from pathlib import Path | |
8 from typing import Dict | |
9 | |
10 import pytest | |
11 from ruamel import yaml | |
12 | |
13 import cwltool.pack | |
14 import cwltool.workflow | |
15 from cwltool.context import LoadingContext | |
16 from cwltool.load_tool import fetch_document, resolve_and_validate_document | |
17 from cwltool.main import main, make_relative, print_pack | |
18 from cwltool.resolver import tool_resolver | |
19 from cwltool.utils import adjustDirObjs, adjustFileObjs | |
20 | |
21 from .util import get_data, needs_docker | |
22 | |
23 | |
24 def test_pack() -> None: | |
25 loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/revsort.cwl")) | |
26 | |
27 with open(get_data("tests/wf/expect_packed.cwl")) as packed_file: | |
28 expect_packed = yaml.main.safe_load(packed_file) | |
29 | |
30 packed = cwltool.pack.pack(loadingContext, uri) | |
31 adjustFileObjs( | |
32 packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) | |
33 ) | |
34 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) | |
35 | |
36 assert "$schemas" in packed | |
37 packed_schemas = packed["$schemas"] | |
38 assert isinstance(packed_schemas, Sized) | |
39 assert len(packed_schemas) == len(expect_packed["$schemas"]) | |
40 del packed["$schemas"] | |
41 del expect_packed["$schemas"] | |
42 | |
43 assert packed == expect_packed | |
44 | |
45 | |
46 def test_pack_input_named_name() -> None: | |
47 loadingContext, workflowobj, uri = fetch_document( | |
48 get_data("tests/wf/trick_revsort.cwl") | |
49 ) | |
50 loadingContext.do_update = False | |
51 loadingContext, uri = resolve_and_validate_document( | |
52 loadingContext, workflowobj, uri | |
53 ) | |
54 loader = loadingContext.loader | |
55 assert loader | |
56 loader.resolve_ref(uri)[0] | |
57 | |
58 with open(get_data("tests/wf/expect_trick_packed.cwl")) as packed_file: | |
59 expect_packed = yaml.main.round_trip_load(packed_file) | |
60 | |
61 packed = cwltool.pack.pack(loadingContext, uri) | |
62 adjustFileObjs( | |
63 packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) | |
64 ) | |
65 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) | |
66 | |
67 assert "$schemas" in packed | |
68 packed_schemas = packed["$schemas"] | |
69 assert isinstance(packed_schemas, Sized) | |
70 assert len(packed_schemas) == len(expect_packed["$schemas"]) | |
71 del packed["$schemas"] | |
72 del expect_packed["$schemas"] | |
73 | |
74 assert packed == expect_packed | |
75 | |
76 | |
77 def test_pack_single_tool() -> None: | |
78 loadingContext, workflowobj, uri = fetch_document( | |
79 get_data("tests/wf/formattest.cwl") | |
80 ) | |
81 loadingContext.do_update = False | |
82 loadingContext, uri = resolve_and_validate_document( | |
83 loadingContext, workflowobj, uri | |
84 ) | |
85 loader = loadingContext.loader | |
86 assert loader | |
87 loader.resolve_ref(uri)[0] | |
88 | |
89 packed = cwltool.pack.pack(loadingContext, uri) | |
90 assert "$schemas" in packed | |
91 | |
92 | |
93 def test_pack_fragment() -> None: | |
94 with open(get_data("tests/wf/scatter2_subwf.cwl")) as packed_file: | |
95 expect_packed = yaml.main.safe_load(packed_file) | |
96 | |
97 loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/scatter2.cwl")) | |
98 packed = cwltool.pack.pack(loadingContext, uri + "#scatterstep/mysub") | |
99 adjustFileObjs( | |
100 packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) | |
101 ) | |
102 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) | |
103 | |
104 assert json.dumps(packed, sort_keys=True, indent=2) == json.dumps( | |
105 expect_packed, sort_keys=True, indent=2 | |
106 ) | |
107 | |
108 | |
109 def test_pack_rewrites() -> None: | |
110 rewrites = {} # type: Dict[str, str] | |
111 | |
112 loadingContext, workflowobj, uri = fetch_document( | |
113 get_data("tests/wf/default-wf5.cwl") | |
114 ) | |
115 loadingContext.do_update = False | |
116 loadingContext, uri = resolve_and_validate_document( | |
117 loadingContext, workflowobj, uri | |
118 ) | |
119 loader = loadingContext.loader | |
120 assert loader | |
121 loader.resolve_ref(uri)[0] | |
122 | |
123 cwltool.pack.pack( | |
124 loadingContext, | |
125 uri, | |
126 rewrite_out=rewrites, | |
127 ) | |
128 | |
129 assert len(rewrites) == 6 | |
130 | |
131 | |
132 cwl_missing_version_paths = [ | |
133 "tests/wf/hello_single_tool.cwl", | |
134 "tests/wf/hello-workflow.cwl", | |
135 ] | |
136 | |
137 | |
138 @pytest.mark.parametrize("cwl_path", cwl_missing_version_paths) | |
139 def test_pack_missing_cwlVersion(cwl_path: str) -> None: | |
140 """Ensure the generated pack output is not missing the `cwlVersion` in case of single tool workflow and single step workflow.""" | |
141 # Testing single tool workflow | |
142 loadingContext, workflowobj, uri = fetch_document(get_data(cwl_path)) | |
143 loadingContext.do_update = False | |
144 loadingContext, uri = resolve_and_validate_document( | |
145 loadingContext, workflowobj, uri | |
146 ) | |
147 loader = loadingContext.loader | |
148 assert loader | |
149 loader.resolve_ref(uri)[0] | |
150 | |
151 # generate pack output dict | |
152 packed = json.loads(print_pack(loadingContext, uri)) | |
153 | |
154 assert packed["cwlVersion"] == "v1.0" | |
155 | |
156 | |
157 def test_pack_idempotence_tool(tmp_path: Path) -> None: | |
158 """Ensure that pack produces exactly the same document for an already packed CommandLineTool.""" | |
159 _pack_idempotently("tests/wf/hello_single_tool.cwl", tmp_path) | |
160 | |
161 | |
162 def test_pack_idempotence_workflow(tmp_path: Path) -> None: | |
163 """Ensure that pack produces exactly the same document for an already packed workflow.""" | |
164 _pack_idempotently("tests/wf/count-lines1-wf.cwl", tmp_path) | |
165 | |
166 | |
167 def _pack_idempotently(document: str, tmp_path: Path) -> None: | |
168 loadingContext, workflowobj, uri = fetch_document(get_data(document)) | |
169 loadingContext.do_update = False | |
170 loadingContext, uri = resolve_and_validate_document( | |
171 loadingContext, workflowobj, uri | |
172 ) | |
173 loader = loadingContext.loader | |
174 assert loader | |
175 loader.resolve_ref(uri)[0] | |
176 | |
177 # generate pack output dict | |
178 packed_text = print_pack(loadingContext, uri) | |
179 packed = json.loads(packed_text) | |
180 | |
181 tmp_name = tmp_path / "packed.cwl" | |
182 tmp = tmp_name.open(mode="w") | |
183 tmp.write(packed_text) | |
184 tmp.flush() | |
185 tmp.close() | |
186 | |
187 loadingContext, workflowobj, uri2 = fetch_document(tmp.name) | |
188 loadingContext.do_update = False | |
189 loadingContext, uri2 = resolve_and_validate_document( | |
190 loadingContext, workflowobj, uri2 | |
191 ) | |
192 loader2 = loadingContext.loader | |
193 assert loader2 | |
194 loader2.resolve_ref(uri2)[0] | |
195 | |
196 # generate pack output dict | |
197 packed_text = print_pack(loadingContext, uri2) | |
198 double_packed = json.loads(packed_text) | |
199 | |
200 assert uri != uri2 | |
201 assert packed == double_packed | |
202 | |
203 | |
204 cwl_to_run = [ | |
205 ("tests/wf/count-lines1-wf.cwl", "tests/wf/wc-job.json", False), | |
206 ("tests/wf/formattest.cwl", "tests/wf/formattest-job.json", True), | |
207 ] | |
208 | |
209 | |
210 @needs_docker | |
211 @pytest.mark.parametrize("wf_path,job_path,namespaced", cwl_to_run) | |
212 def test_packed_workflow_execution( | |
213 wf_path: str, job_path: str, namespaced: bool, tmp_path: Path | |
214 ) -> None: | |
215 loadingContext = LoadingContext() | |
216 loadingContext.resolver = tool_resolver | |
217 loadingContext, workflowobj, uri = fetch_document(get_data(wf_path), loadingContext) | |
218 loadingContext.do_update = False | |
219 loadingContext, uri = resolve_and_validate_document( | |
220 loadingContext, workflowobj, uri | |
221 ) | |
222 loader = loadingContext.loader | |
223 assert loader | |
224 loader.resolve_ref(uri)[0] | |
225 packed = json.loads(print_pack(loadingContext, uri)) | |
226 | |
227 assert not namespaced or "$namespaces" in packed | |
228 | |
229 wf_packed_handle, wf_packed_path = tempfile.mkstemp() | |
230 with open(wf_packed_path, "w") as temp_file: | |
231 json.dump(packed, temp_file) | |
232 | |
233 normal_output = StringIO() | |
234 packed_output = StringIO() | |
235 | |
236 normal_params = ["--outdir", str(tmp_path), get_data(wf_path), get_data(job_path)] | |
237 packed_params = [ | |
238 "--outdir", | |
239 str(tmp_path), | |
240 "--debug", | |
241 wf_packed_path, | |
242 get_data(job_path), | |
243 ] | |
244 | |
245 assert main(normal_params, stdout=normal_output) == 0 | |
246 assert main(packed_params, stdout=packed_output) == 0 | |
247 | |
248 assert json.loads(packed_output.getvalue()) == json.loads(normal_output.getvalue()) | |
249 | |
250 os.close(wf_packed_handle) | |
251 os.remove(wf_packed_path) |