diff get_tests.py @ 14:86a29d233856 draft default tip

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/openms commit 5c080b1e2b99f1c88f4557e9fec8c45c9d23b906
author galaxyp
date Fri, 14 Jun 2024 21:35:44 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/get_tests.py	Fri Jun 14 21:35:44 2024 +0000
@@ -0,0 +1,344 @@
+#!/usr/bin/env python
+
+import argparse
+import os.path
+import re
+import shlex
+import sys
+import tempfile
+from typing import (
+    Dict,
+    List,
+    Optional,
+    TextIO,
+    Tuple,
+)
+
+from ctdconverter.common.utils import (
+    ParameterHardcoder,
+    parse_hardcoded_parameters,
+    parse_input_ctds,
+)
+from ctdconverter.galaxy.converter import convert_models
+from CTDopts.CTDopts import (
+    CTDModel,
+    ModelTypeError,
+    Parameters,
+)
+
+SKIP_LIST = [
+    r"_prepare\"",
+    r"_convert",
+    r"WRITEINI",
+    r"WRITECTD",
+    r"INVALIDVALUE",
+    r"\.ini\.json",
+    r"OpenSwathMzMLFileCacher .*-convert_back",  # - OpenSwathMzMLFileCacher with -convert_back argument https://github.com/OpenMS/OpenMS/issues/4399
+    r"MaRaClusterAdapter.*-consensus_out",  # - MaRaCluster with -consensus_out (parameter blacklister: https://github.com/OpenMS/OpenMS/issues/4456)
+    r"FileMerger_1_input1.dta2d.*FileMerger_1_input2.dta ",  # - FileMerger with mixed dta dta2d input (ftype can not be specified in the test, dta can not be sniffed)
+    r'^(TOPP_OpenSwathAnalyzer_test_3|TOPP_OpenSwathAnalyzer_test_4)$',  # no  suppert for cached mzML
+    r'TOPP_SiriusAdapter_[0-9]+$',  # Do not test SiriusAdapter https://github.com/OpenMS/OpenMS/issues/7000 .. will be removed anyway
+    r'TOPP_AssayGeneratorMetabo_(7|8|9|10|11|12|13|14|15|16|17|18)$'  # Skip AssayGeneratorMetabo tests using Sirius  https://github.com/OpenMS/OpenMS/issues/7150 (will be replaced by two tools)
+]
+
+
+def get_failing_tests(cmake: List[str]) -> List[str]:
+    failing_tests = []
+    re_fail = re.compile(r"set_tests_properties\(\"([^\"]+)\" PROPERTIES WILL_FAIL 1\)")
+
+    for cmake in args.cmake:
+        with open(cmake) as cmake_fh:
+            for line in cmake_fh:
+                match = re_fail.search(line)
+                if match:
+                    failing_tests.append(match.group(1))
+    return failing_tests
+
+
+def fix_tmp_files(line: str, diff_pairs: Dict[str, str]) -> str:
+    """
+    OpenMS tests output to tmp files and compare with FuzzyDiff to the expected file.
+    problem: the extension of the tmp files is unusable for test generation.
+    unfortunately the extensions used in the DIFF lines are not always usable for the CLI
+    (e.g. for prepare_test_data, e.g. CLI expects csv but test file is txt)
+    this function replaces the tmp file by the expected file.
+    """
+    cmd = shlex.split(line)
+    for i, e in enumerate(cmd):
+        if e in diff_pairs:
+            dst = os.path.join("test-data", diff_pairs[e])
+            if os.path.exists(dst):
+                os.unlink(dst)
+            sys.stderr.write(f"symlink {e} {dst}\n")
+            os.symlink(e, dst)
+            cmd[i] = diff_pairs[e]
+    return shlex.join(cmd)
+
+
+def get_ini(line: str, tool_id: str) -> Tuple[str, str]:
+    """
+    if there is an ini file then we use this to generate the test
+    otherwise the ctd file is used
+    other command line parameters are inserted later into this xml
+    """
+    cmd = shlex.split(line)
+    ini = None
+    for i, e in enumerate(cmd):
+        if e == "-ini":
+            ini = cmd[i + 1]
+            cmd = cmd[:i] + cmd[i + 2:]
+    if ini:
+        return os.path.join("test-data", ini), shlex.join(cmd)
+    else:
+        return os.path.join("ctd", f"{tool_id}.ctd"), line
+
+
+def unique_files(line: str):
+    """
+    some tests use the same file twice which does not work in planemo tests
+    hence we create symlinks for each file used twice
+    """
+    cmd = shlex.split(line)
+    # print(f"{cmd}")
+    files = {}
+    # determine the list of indexes where each file argument (anything appearing in test-data/) appears
+    for idx, e in enumerate(cmd):
+        p = os.path.join("test-data", e)
+        if not os.path.exists(p) and not os.path.islink(p):
+            continue
+        try:
+            files[e].append(idx)
+        except KeyError:
+            files[e] = [idx]
+    # print(f"{files=}")
+    for f in files:
+        if len(files[f]) < 2:
+            continue
+        for i, idx in enumerate(files[f]):
+            f_parts = f.split(".")
+            f_parts[0] = f"{f_parts[0]}_{i}"
+            new_f = ".".join(f_parts)
+            # if os.path.exists(os.path.join("test-data", new_f)):
+            #     os.unlink(os.path.join("test-data", new_f))
+            sys.stderr.write(
+                f'\tsymlink {os.path.join("test-data", new_f)} {f}\n'
+            )
+            try:
+                os.symlink(f, os.path.join("test-data", new_f))
+            except FileExistsError:
+                pass
+            cmd[idx] = new_f
+    return shlex.join(cmd)
+
+
+def fill_ctd_clargs(ini: str, line: str, ctd_tmp: TextIO) -> None:
+    cmd = shlex.split(line)
+
+    # load CTDModel
+    ini_model = None
+    try:
+        ini_model = CTDModel(from_file=ini)
+    except ModelTypeError:
+        pass
+    try:
+        ini_model = Parameters(from_file=ini)
+    except ModelTypeError:
+        pass
+    assert ini_model is not None, "Could not parse %s, seems to be no CTD/PARAMS" % (
+        args.ini_file
+    )
+
+    # get a dictionary of the ctd arguments where the values of the parameters
+    # given on the command line are overwritten
+    ini_values = ini_model.parse_cl_args(cl_args=cmd, ignore_required=True)
+    ini_model.write_ctd(ctd_tmp, ini_values)
+
+
+def process_test_line(
+    id: str,
+    line: str,
+    failing_tests: List[str],
+    skip_list: List[str],
+    diff_pairs: Dict[str, str],
+) -> Optional[str]:
+
+    re_test_id = re.compile(r"add_test\(\"([^\"]+)\" ([^ ]+) (.*)")
+    re_id_out_test = re.compile(r"_out_?[0-9]?")
+
+    # TODO auto extract from  set(OLD_OSW_PARAM ... lin
+    line = line.replace(
+        "${OLD_OSW_PARAM}",
+        " -test -mz_extraction_window 0.05 -mz_extraction_window_unit Th -ms1_isotopes 0 -Scoring:TransitionGroupPicker:compute_peak_quality -Scoring:Scores:use_ms1_mi false -Scoring:Scores:use_mi_score false",
+    )
+
+    line = line.replace("${TOPP_BIN_PATH}/", "")
+    line = line.replace("${DATA_DIR_TOPP}/", "")
+    line = line.replace("THIRDPARTY/", "")
+    line = line.replace("${DATA_DIR_SHARE}/", "")
+    # IDRipper PATH gets empty causing problems. TODO But overall the option needs to be handled differently
+    line = line.replace("${TMP_RIP_PATH}/", "")
+    # some input files are originally in a subdir (degenerated cases/), but not in test-data
+    line = line.replace("degenerate_cases/", "")
+    # determine the test and tool ids and remove the 1) add_test("TESTID" 2) trailing )
+    match = re_test_id.match(line)
+    if not match:
+        sys.exit(f"Ill formated test line {line}\n")
+    test_id = match.group(1)
+    tool_id = match.group(2)
+
+    line = f"{match.group(2)} {match.group(3)}"
+
+    if test_id in failing_tests:
+        sys.stderr.write(f"    skip failing {test_id} {line}\n")
+        return
+
+    if id != tool_id:
+        sys.stderr.write(f"    skip {test_id} ({id} != {tool_id}) {line}\n")
+        return
+
+    if re_id_out_test.search(test_id):
+        sys.stderr.write(f"    skip {test_id} {line}\n")
+        return
+
+    for skip in skip_list:
+        if re.search(skip, line):
+            return
+        if re.search(skip, test_id):
+            return
+
+    line = fix_tmp_files(line, diff_pairs)
+    # print(f"fix {line=}")
+    line = unique_files(line)
+    # print(f"unq {line=}")
+    ini, line = get_ini(line, tool_id)
+
+    from dataclasses import dataclass, field
+
+    @dataclass
+    class CTDConverterArgs:
+        input_files: list
+        output_destination: str
+        default_executable_path: Optional[str] = None
+        hardcoded_parameters: Optional[str] = None
+        parameter_hardcoder: Optional[ParameterHardcoder] = None
+        xsd_location: Optional[str] = None
+        formats_file: Optional[str] = None
+        add_to_command_line: str = ""
+        required_tools_file: Optional[str] = None
+        skip_tools_file: Optional[str] = None
+        macros_files: Optional[List[str]] = field(default_factory=list)
+        test_macros_files: Optional[List[str]] = field(default_factory=list)
+        test_macros_prefix: Optional[List[str]] = field(default_factory=list)
+        test_test: bool = False
+        test_only: bool = False
+        test_unsniffable: Optional[List[str]] = field(default_factory=list)
+        test_condition: Optional[List[str]] = ("compare=sim_size", "delta_frac=0.05")
+        tool_version: str = None
+        tool_profile: str = None
+        bump_file: str = None
+
+    # create an ini/ctd file where the values are equal to the arguments from the command line
+    # and transform it to xml
+    test = [f"<!-- {test_id} -->\n"]
+    with tempfile.NamedTemporaryFile(
+        mode="w+", delete_on_close=False
+    ) as ctd_tmp, tempfile.NamedTemporaryFile(
+        mode="w+", delete_on_close=False
+    ) as xml_tmp:
+        fill_ctd_clargs(ini, line, ctd_tmp)
+        ctd_tmp.close()
+        xml_tmp.close()
+        parsed_ctd = parse_input_ctds(None, [ctd_tmp.name], xml_tmp.name, "xml")
+        ctd_args = CTDConverterArgs(
+            input_files=[ctd_tmp.name],
+            output_destination=xml_tmp.name,
+            macros_files=["macros.xml"],
+            skip_tools_file="aux/tools_blacklist.txt",
+            formats_file="aux/filetypes.txt",
+            # tool_conf_destination = "tool.conf",
+            hardcoded_parameters="aux/hardcoded_params.json",
+            tool_version="3.1",
+            test_only=True,
+            test_unsniffable=[
+                "csv",
+                "tsv",
+                "txt",
+                "dta",
+                "dta2d",
+                "edta",
+                "mrm",
+                "splib",
+            ],
+            test_condition=["compare=sim_size", "delta_frac=0.7"],
+        )
+        ctd_args.parameter_hardcoder = parse_hardcoded_parameters(
+            ctd_args.hardcoded_parameters
+        )
+        convert_models(ctd_args, parsed_ctd)
+        xml_tmp = open(xml_tmp.name, "r")
+        for l in xml_tmp:
+            test.append(l)
+
+    return "".join(test)
+
+
+parser = argparse.ArgumentParser(description="Create Galaxy tests for a OpenMS tools")
+parser.add_argument("--id", dest="id", help="tool id")
+parser.add_argument("--cmake", nargs="+", help="OpenMS test CMake files")
+args = parser.parse_args()
+sys.stderr.write(f"generate tests for {args.id}\n")
+
+re_comment = re.compile("#.*")
+re_empty_prefix = re.compile(r"^\s*")
+re_empty_suffix = re.compile(r"\s*$")
+re_add_test = re.compile(r"add_test\(\"(TOPP|UTILS)_.*/" + args.id)
+re_diff = re.compile(r"\$\{DIFF\}.* -in1 ([^ ]+) -in2 ([^ ]+)")
+failing_tests = get_failing_tests(args.cmake)
+tests = []
+
+# process the given CMake files and compile lists of
+# - test lines .. essentially add_test(...)
+# - and pairs of files that are diffed
+jline = ""
+test_lines = []
+diff_pairs = {}
+for cmake in args.cmake:
+    with open(cmake) as cmake_fh:
+        for line in cmake_fh:
+            # remove comments, empty prefixes and suffixes
+            line = re_comment.sub("", line)
+            line = re_empty_prefix.sub("", line)
+            line = re_empty_suffix.sub("", line)
+            # skip empty lines
+            if line == "":
+                continue
+
+            # join test statements that are split over multiple lines
+            if line.endswith(")"):
+                jline += " " + line[:-1]
+            else:
+                jline = line
+                continue
+            line, jline = jline.strip(), ""
+            match = re_diff.search(line)
+            if match:
+                in1 = match.group(1).split("/")[-1]
+                in2 = match.group(2).split("/")[-1]
+                if in1 != in2:
+                    diff_pairs[in1] = in2
+            elif re_add_test.match(line):
+                test_lines.append(line)
+
+for line in test_lines:
+    test = process_test_line(args.id, line, failing_tests, SKIP_LIST, diff_pairs)
+    if test:
+        tests.append(test)
+
+tests = "\n".join(tests)
+print(
+    f"""
+<xml name="autotest_{args.id}">
+{tests}
+</xml>
+"""
+)