comparison get_tests.py @ 5:a93b05cf3a38 draft default tip

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/openms commit 5c080b1e2b99f1c88f4557e9fec8c45c9d23b906
author galaxyp
date Fri, 14 Jun 2024 21:30:15 +0000
parents
children
comparison
equal deleted inserted replaced
4:ae20e8a6d309 5:a93b05cf3a38
1 #!/usr/bin/env python
2
3 import argparse
4 import os.path
5 import re
6 import shlex
7 import sys
8 import tempfile
9 from typing import (
10 Dict,
11 List,
12 Optional,
13 TextIO,
14 Tuple,
15 )
16
17 from ctdconverter.common.utils import (
18 ParameterHardcoder,
19 parse_hardcoded_parameters,
20 parse_input_ctds,
21 )
22 from ctdconverter.galaxy.converter import convert_models
23 from CTDopts.CTDopts import (
24 CTDModel,
25 ModelTypeError,
26 Parameters,
27 )
28
# Regular expressions selecting the tests that are not converted:
# process_test_line drops a test as soon as one of these patterns is found
# (re.search) in the test's command line or in its test id.
SKIP_LIST = [
    r"_prepare\"",
    r"_convert",
    r"WRITEINI",
    r"WRITECTD",
    r"INVALIDVALUE",
    r"\.ini\.json",
    r"OpenSwathMzMLFileCacher .*-convert_back",  # - OpenSwathMzMLFileCacher with -convert_back argument https://github.com/OpenMS/OpenMS/issues/4399
    r"MaRaClusterAdapter.*-consensus_out",  # - MaRaCluster with -consensus_out (parameter blacklister: https://github.com/OpenMS/OpenMS/issues/4456)
    r"FileMerger_1_input1.dta2d.*FileMerger_1_input2.dta ",  # - FileMerger with mixed dta dta2d input (ftype can not be specified in the test, dta can not be sniffed)
    r'^(TOPP_OpenSwathAnalyzer_test_3|TOPP_OpenSwathAnalyzer_test_4)$',  # no support for cached mzML
    r'TOPP_SiriusAdapter_[0-9]+$',  # Do not test SiriusAdapter https://github.com/OpenMS/OpenMS/issues/7000 .. will be removed anyway
    r'TOPP_AssayGeneratorMetabo_(7|8|9|10|11|12|13|14|15|16|17|18)$'  # Skip AssayGeneratorMetabo tests using Sirius https://github.com/OpenMS/OpenMS/issues/7150 (will be replaced by two tools)
]
43
44
def get_failing_tests(cmake: List[str]) -> List[str]:
    """
    Collect the ids of the tests that are declared as expected failures.

    Every given CMake file is scanned line by line for statements of the form
    set_tests_properties("TEST_ID" PROPERTIES WILL_FAIL 1)

    cmake: paths of the CMake files to scan
    returns: the TEST_IDs in the order in which they are found
    """
    re_fail = re.compile(r"set_tests_properties\(\"([^\"]+)\" PROPERTIES WILL_FAIL 1\)")
    failing_tests = []

    # use the files passed by the caller instead of reaching for the global
    # command line arguments, so the function also works outside the script
    for cmake_path in cmake:
        with open(cmake_path) as cmake_fh:
            for line in cmake_fh:
                match = re_fail.search(line)
                if match:
                    failing_tests.append(match.group(1))
    return failing_tests
56
57
def fix_tmp_files(line: str, diff_pairs: Dict[str, str]) -> str:
    """
    OpenMS tests output to tmp files and compare with FuzzyDiff to the expected file.
    problem: the extension of the tmp files is unusable for test generation.
    unfortunately the extensions used in the DIFF lines are not always usable for the CLI
    (e.g. for prepare_test_data, e.g. CLI expects csv but test file is txt)
    this function replaces the tmp file by the expected file.

    line: command line of the test
    diff_pairs: maps the name of a tmp file to the name of the expected file

    For each argument that is a key of diff_pairs a symlink
    test-data/<expected file> pointing to the tmp file name is (re)created
    and the argument is replaced by the expected file name.

    returns: the command line with the replaced arguments (joined with shlex)
    """
    cmd = shlex.split(line)
    for i, e in enumerate(cmd):
        if e not in diff_pairs:
            continue
        expected = diff_pairs[e]
        dst = os.path.join("test-data", expected)
        # lexists (unlike exists) is also true for a dangling symlink, e.g. one
        # left by an earlier run; without removing it os.symlink would raise
        # FileExistsError
        if os.path.lexists(dst):
            os.unlink(dst)
        sys.stderr.write(f"symlink {e} {dst}\n")
        os.symlink(e, dst)
        cmd[i] = expected
    return shlex.join(cmd)
76
77
def get_ini(line: str, tool_id: str) -> Tuple[str, str]:
    """
    if there is an ini file then we use this to generate the test
    otherwise the ctd file is used
    other command line parameters are inserted later into this xml

    line: command line of the test
    tool_id: id of the tool, used to derive the name of the ctd file

    returns: tuple (path, command line)
    - if a non-empty -ini argument is given: test-data/<ini> and the command
      line without all "-ini <file>" pairs (if -ini is given more than once
      the last one is used)
    - otherwise: ctd/<tool_id>.ctd and the unchanged line

    raises: ValueError if -ini is the last argument, i.e. has no file name
    """
    tokens = shlex.split(line)
    ini = None
    remaining = []
    # walk the tokens with an explicit position: the token list itself is never
    # modified, so the positions stay valid even if -ini is given several times
    pos = 0
    while pos < len(tokens):
        token = tokens[pos]
        if token != "-ini":
            remaining.append(token)
            pos += 1
            continue
        if pos + 1 >= len(tokens):
            raise ValueError(f"-ini without file name in test line {line}")
        ini = tokens[pos + 1]
        pos += 2

    if ini:
        return os.path.join("test-data", ini), shlex.join(remaining)
    return os.path.join("ctd", f"{tool_id}.ctd"), line
94
95
def unique_files(line: str):
    """
    planemo tests can not use the same file for two parameters, but some
    OpenMS tests do exactly this.

    For every argument that is found in test-data/ (as file or symlink) and
    that appears at least twice on the command line, each occurrence is
    replaced by its own alias NAME_0.EXT, NAME_1.EXT, ... (the counter is
    appended to the part of the name before the first dot). The aliases are
    created as symlinks in test-data/ pointing to the original name; an
    already existing alias is kept.

    returns the command line using the aliases (joined with shlex)
    """
    tokens = shlex.split(line)

    # positions of each token that names something in test-data/
    positions = {}
    for pos, token in enumerate(tokens):
        candidate = os.path.join("test-data", token)
        if os.path.exists(candidate) or os.path.islink(candidate):
            positions.setdefault(token, []).append(pos)

    for name, where in positions.items():
        if len(where) < 2:
            # used only once: nothing to do
            continue
        stem, dot, rest = name.partition(".")
        for count, pos in enumerate(where):
            alias = f"{stem}_{count}{dot}{rest}"
            link = os.path.join("test-data", alias)
            sys.stderr.write(f'\tsymlink {link} {name}\n')
            try:
                os.symlink(name, link)
            except FileExistsError:
                pass
            tokens[pos] = alias
    return shlex.join(tokens)
132
133
def fill_ctd_clargs(ini: str, line: str, ctd_tmp: TextIO) -> None:
    """
    Write a ctd file in which the values given on the command line are set.

    ini: path of the ini/ctd file that is loaded as model
    line: command line of the test
    ctd_tmp: file handle to which the resulting ctd is written

    Exits with an error message if ini can be loaded neither as CTDModel
    nor as Parameters.
    """
    cmd = shlex.split(line)

    # load the model: both types are tried, a ModelTypeError only means that
    # the file is not of the tried type
    ini_model = None
    try:
        ini_model = CTDModel(from_file=ini)
    except ModelTypeError:
        pass
    try:
        ini_model = Parameters(from_file=ini)
    except ModelTypeError:
        pass
    if ini_model is None:
        # report the file that was actually tried (there is no args.ini_file)
        sys.exit(f"Could not parse {ini}, seems to be no CTD/PARAMS\n")

    # get a dictionary of the ctd arguments where the values of the parameters
    # given on the command line are overwritten
    ini_values = ini_model.parse_cl_args(cl_args=cmd, ignore_required=True)
    ini_model.write_ctd(ctd_tmp, ini_values)
155
156
def process_test_line(
    id: str,
    line: str,
    failing_tests: List[str],
    skip_list: List[str],
    diff_pairs: Dict[str, str],
) -> Optional[str]:
    """
    Create the XML of a Galaxy test from an add_test statement of the OpenMS
    CMake files.

    id: id of the tool for which tests are generated
    line: the add_test("TEST_ID" TOOL ARGUMENTS... statement on a single line
    failing_tests: ids of the tests that are expected to fail
    skip_list: regular expressions, a test is skipped if one of them is found
        in its command line or its test id
    diff_pairs: maps the name of a tmp file to the name of the expected file

    returns: the test XML preceded by a comment containing the test id, or
    None if the test is skipped (expected failure, test of another tool,
    _out test, or match in skip_list)

    Exits if line is not an add_test statement.
    """
    from dataclasses import dataclass, field

    re_test_id = re.compile(r"add_test\(\"([^\"]+)\" ([^ ]+) (.*)")
    re_id_out_test = re.compile(r"_out_?[0-9]?")

    # TODO auto extract from set(OLD_OSW_PARAM ... lin
    line = line.replace(
        "${OLD_OSW_PARAM}",
        " -test -mz_extraction_window 0.05 -mz_extraction_window_unit Th -ms1_isotopes 0 -Scoring:TransitionGroupPicker:compute_peak_quality -Scoring:Scores:use_ms1_mi false -Scoring:Scores:use_mi_score false",
    )

    line = line.replace("${TOPP_BIN_PATH}/", "")
    line = line.replace("${DATA_DIR_TOPP}/", "")
    line = line.replace("THIRDPARTY/", "")
    line = line.replace("${DATA_DIR_SHARE}/", "")
    # IDRipper PATH gets empty causing problems. TODO But overall the option needs to be handled differently
    line = line.replace("${TMP_RIP_PATH}/", "")
    # some input files are originally in a subdir (degenerated cases/), but not in test-data
    line = line.replace("degenerate_cases/", "")

    # determine the test and tool ids and reduce the line to the command line
    # (i.e. drop the leading add_test("TESTID")
    match = re_test_id.match(line)
    if not match:
        sys.exit(f"Ill formated test line {line}\n")
    test_id = match.group(1)
    tool_id = match.group(2)

    line = f"{match.group(2)} {match.group(3)}"

    if test_id in failing_tests:
        sys.stderr.write(f" skip failing {test_id} {line}\n")
        return None

    if id != tool_id:
        sys.stderr.write(f" skip {test_id} ({id} != {tool_id}) {line}\n")
        return None

    if re_id_out_test.search(test_id):
        sys.stderr.write(f" skip {test_id} {line}\n")
        return None

    for skip in skip_list:
        if re.search(skip, line) or re.search(skip, test_id):
            return None

    line = fix_tmp_files(line, diff_pairs)
    line = unique_files(line)
    ini, line = get_ini(line, tool_id)

    # stand-in for the command line arguments of the CTD converter: an object
    # carrying the settings as attributes which is handed to convert_models
    @dataclass
    class CTDConverterArgs:
        input_files: list
        output_destination: str
        default_executable_path: Optional[str] = None
        hardcoded_parameters: Optional[str] = None
        parameter_hardcoder: Optional[ParameterHardcoder] = None
        xsd_location: Optional[str] = None
        formats_file: Optional[str] = None
        add_to_command_line: str = ""
        required_tools_file: Optional[str] = None
        skip_tools_file: Optional[str] = None
        macros_files: Optional[List[str]] = field(default_factory=list)
        test_macros_files: Optional[List[str]] = field(default_factory=list)
        test_macros_prefix: Optional[List[str]] = field(default_factory=list)
        test_test: bool = False
        test_only: bool = False
        test_unsniffable: Optional[List[str]] = field(default_factory=list)
        test_condition: Optional[List[str]] = ("compare=sim_size", "delta_frac=0.05")
        tool_version: Optional[str] = None
        tool_profile: Optional[str] = None
        bump_file: Optional[str] = None

    # create an ini/ctd file where the values are equal to the arguments from the command line
    # and transform it to xml
    test = [f"<!-- {test_id} -->\n"]
    with tempfile.NamedTemporaryFile(
        mode="w+", delete_on_close=False
    ) as ctd_tmp, tempfile.NamedTemporaryFile(
        mode="w+", delete_on_close=False
    ) as xml_tmp:
        fill_ctd_clargs(ini, line, ctd_tmp)
        # close both files: from here on they are accessed by name only
        ctd_tmp.close()
        xml_tmp.close()
        parsed_ctd = parse_input_ctds(None, [ctd_tmp.name], xml_tmp.name, "xml")
        ctd_args = CTDConverterArgs(
            input_files=[ctd_tmp.name],
            output_destination=xml_tmp.name,
            macros_files=["macros.xml"],
            skip_tools_file="aux/tools_blacklist.txt",
            formats_file="aux/filetypes.txt",
            # tool_conf_destination = "tool.conf",
            hardcoded_parameters="aux/hardcoded_params.json",
            tool_version="3.1",
            test_only=True,
            test_unsniffable=[
                "csv",
                "tsv",
                "txt",
                "dta",
                "dta2d",
                "edta",
                "mrm",
                "splib",
            ],
            test_condition=["compare=sim_size", "delta_frac=0.7"],
        )
        ctd_args.parameter_hardcoder = parse_hardcoded_parameters(
            ctd_args.hardcoded_parameters
        )
        convert_models(ctd_args, parsed_ctd)
        # read the generated xml through a handle of its own that is closed
        # again (xml_tmp stays bound to the temporary file)
        with open(xml_tmp.name, "r") as xml_fh:
            test.extend(xml_fh)

    return "".join(test)
283
284
# Script part: parse the command line, collect the add_test statements of the
# requested tool from the CMake files and print the generated tests as macro.
parser = argparse.ArgumentParser(description="Create Galaxy tests for a OpenMS tools")
# both arguments are needed below: let argparse report a missing one instead
# of failing later with a TypeError
parser.add_argument("--id", dest="id", required=True, help="tool id")
parser.add_argument("--cmake", nargs="+", required=True, help="OpenMS test CMake files")
args = parser.parse_args()
sys.stderr.write(f"generate tests for {args.id}\n")

re_comment = re.compile("#.*")
re_empty_prefix = re.compile(r"^\s*")
re_empty_suffix = re.compile(r"\s*$")
# the tool id is taken literally (escaped), it is not a regular expression
re_add_test = re.compile(r"add_test\(\"(TOPP|UTILS)_.*/" + re.escape(args.id))
re_diff = re.compile(r"\$\{DIFF\}.* -in1 ([^ ]+) -in2 ([^ ]+)")
failing_tests = get_failing_tests(args.cmake)
tests = []

# process the given CMake files and compile lists of
# - test lines .. essentially add_test(...)
# - and pairs of files that are diffed
jline = ""
test_lines = []
diff_pairs = {}
for cmake in args.cmake:
    with open(cmake) as cmake_fh:
        for line in cmake_fh:
            # remove comments, empty prefixes and suffixes
            line = re_comment.sub("", line)
            line = re_empty_prefix.sub("", line)
            line = re_empty_suffix.sub("", line)
            # skip empty lines
            if line == "":
                continue

            # join test statements that are split over multiple lines:
            # lines are accumulated until one ends with the closing
            # parenthesis (which is dropped), so statements spanning more
            # than two lines keep their beginning
            if not line.endswith(")"):
                jline += " " + line
                continue
            jline += " " + line[:-1]
            line, jline = jline.strip(), ""

            match = re_diff.search(line)
            if match:
                # only the file names are used, not the directories
                in1 = match.group(1).split("/")[-1]
                in2 = match.group(2).split("/")[-1]
                if in1 != in2:
                    diff_pairs[in1] = in2
            elif re_add_test.match(line):
                test_lines.append(line)

for line in test_lines:
    test = process_test_line(args.id, line, failing_tests, SKIP_LIST, diff_pairs)
    if test:
        tests.append(test)

tests = "\n".join(tests)
print(
    f"""
<xml name="autotest_{args.id}">
{tests}
</xml>
"""
)