diff env/lib/python3.9/site-packages/planemo/workflow_lint.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/planemo/workflow_lint.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,246 @@
+import os
+import re
+
+import yaml
+from galaxy.tool_util.lint import LintContext
+from galaxy.tool_util.loader_directory import EXCLUDE_WALK_DIRS
+from gxformat2._yaml import ordered_load
+from gxformat2.lint import lint_format2, lint_ga
+
+
+from planemo.exit_codes import (
+    EXIT_CODE_GENERIC_FAILURE,
+    EXIT_CODE_OK,
+)
+from planemo.galaxy.workflows import input_labels, output_labels, required_input_labels
+from planemo.runnable import cases, for_path
+from planemo.shed import DOCKSTORE_REGISTRY_CONF
+
# File names that could plausibly contain a workflow (.yml/.yaml/.ga).
POTENTIAL_WORKFLOW_FILES = re.compile(r'^.*(\.yml|\.yaml|\.ga)$')
# Version stamp written at the top of generated .dockstore.yml files.
DOCKSTORE_REGISTRY_CONF_VERSION = "1.2"
+
+
class WorkflowLintContext(LintContext):
    """LintContext specialized for linting workflow artifacts rather than tools."""

    # Setup training topic for linting - probably should pass this through
    # from click arguments.
    training_topic = None
+
+
def generate_dockstore_yaml(directory):
    """Build the contents of a .dockstore.yml registry file for ``directory``.

    Every file under ``directory`` that looks like a Galaxy workflow becomes
    one entry in the ``workflows`` list. Returns the YAML contents as a
    string; nothing is written to disk.
    """
    workflows = [
        {
            # TODO: support CWL
            "subclass": "Galaxy",
            "primaryDescriptorPath": os.path.relpath(workflow_path, directory),
        }
        for workflow_path in find_workflow_descriptions(directory)
    ]
    # Force version to the top of the file by serializing the rest of the
    # config separately.
    contents = "version: %s\n" % DOCKSTORE_REGISTRY_CONF_VERSION
    contents += yaml.dump({"workflows": workflows})
    return contents
+
+
def lint_workflow_artifacts_on_paths(ctx, paths, lint_args):
    """Lint workflow artifacts found under each of ``paths``.

    Returns EXIT_CODE_OK unless a lint message at or above the configured
    ``fail_level`` was recorded, in which case EXIT_CODE_GENERIC_FAILURE.
    """
    lint_context = WorkflowLintContext(lint_args["level"], skip_types=lint_args["skip_types"])
    for target_path in paths:
        _lint_workflow_artifacts_on_path(lint_context, target_path, lint_args)

    failed = lint_context.failed(lint_args["fail_level"])
    return EXIT_CODE_GENERIC_FAILURE if failed else EXIT_CODE_OK
+
+
def _lint_workflow_artifacts_on_path(lint_context, path, lint_args):
    """Dispatch the appropriate linters for each workflow artifact under ``path``."""
    for artifact_path in find_potential_workflow_files(path):
        basename = os.path.basename(artifact_path)
        if basename == DOCKSTORE_REGISTRY_CONF:
            lint_context.lint("lint_dockstore", _lint_dockstore_config, artifact_path)
        elif looks_like_a_workflow(artifact_path):

            def structure(path, lint_context):
                # Pick the gxformat2 linter matching the workflow format:
                # Format 2 documents declare class: GalaxyWorkflow, anything
                # else is treated as a native .ga workflow.
                with open(path, "r") as f:
                    workflow_dict = ordered_load(f)
                if workflow_dict.get("class") == "GalaxyWorkflow":
                    lint_format2(lint_context, workflow_dict, path=path)
                else:
                    lint_ga(lint_context, workflow_dict, path=path)

            lint_context.lint("lint_structure", structure, artifact_path)
            lint_context.lint("lint_tests", _lint_tsts, artifact_path)
        else:
            # Allow linting ro crates and such also
            pass
+
+
# Deliberately misspelled so pytest does not collect this as a test.
def _lint_tsts(path, lint_context):
    """Check that the workflow artifact at ``path`` has structurally valid tests."""
    runnables = for_path(path, return_all=True)
    if not isinstance(runnables, list):
        runnables = [runnables]
    for runnable in runnables:
        test_cases = cases(runnable)
        if len(test_cases) == 0:
            lint_context.warn("Workflow missing test cases.")
            continue
        # Materialize the list so every case is linted (no short-circuit)
        # and all problems get reported, not just the first.
        case_results = [_lint_case(path, test_case, lint_context) for test_case in test_cases]
        if all(case_results):
            lint_context.valid(f"Tests appear structurally correct for {runnable.path}")
+
+
def _lint_case(path, test_case, lint_context):
    """Validate one workflow test case against the workflow at ``path``.

    Returns True when the test case appears structurally correct.
    """
    test_valid = True

    workflow_inputs = input_labels(workflow_path=path)
    job_keys = test_case.input_ids
    for job_key in job_keys:
        if job_key in workflow_inputs:
            continue
        # consider an error instead?
        lint_context.warn("Unknown workflow input in test job definition [%s], workflow inputs are [%s]" % (job_key, workflow_inputs))
        test_valid = False

    # Every non-optional workflow input must be given a value by the test job.
    for required_label in required_input_labels(path):
        if required_label not in job_keys:
            template = "Non-optional input has no value specified in workflow test job [%s], job specifies inputs [%s]"
            lint_context.error(template % (required_label, job_keys))
            test_valid = False

    for input_id, input_def in test_case._job.items():
        if not _tst_input_valid(test_case, input_id, input_def, lint_context):
            test_valid = False

    workflow_outputs = output_labels(path)
    found_valid_expectation = False
    for test_output_id in test_case.tested_output_ids:
        if test_output_id in workflow_outputs:
            found_valid_expectation = True
        else:
            template = "Test found for unknown workflow output [%s], workflow outputs [%s]"
            lint_context.error(template % (test_output_id, workflow_outputs))
            test_valid = False
        # TODO: validate structure of test expectations

    if not found_valid_expectation:
        lint_context.warn("Found no valid test expectations for workflow test")
        test_valid = False

    return test_valid
+
+
+def _tst_input_valid(test_case, input_id, input_def, lint_context):
+    if type(input_def) == dict:  # else assume it is a parameter
+        clazz = input_def.get("class")
+        if clazz == "File":
+            input_path = input_def.get("path")
+            if input_path:
+                if not os.path.isabs(input_path):
+                    input_path = os.path.join(test_case.tests_directory, input_path)
+                if not os.path.exists(input_path):
+                    message = "Test referenced File path [%s] not found" % input_path
+                    lint_context.warn(message)
+                    return False
+        elif clazz == "Collection":
+            for elem in input_def.get('elements', []):
+                elem_valid = _tst_input_valid(test_case, input_id, elem, lint_context)
+                if not elem_valid:
+                    return False
+    return True
+
+
def _lint_dockstore_config(path, lint_context):
    """Lint the .dockstore.yml registry configuration file at ``path``."""
    try:
        with open(path, "r") as f:
            dockstore_yaml = yaml.safe_load(f)
    except Exception:
        lint_context.error("Invalid YAML found in %s" % DOCKSTORE_REGISTRY_CONF)
        return

    if not isinstance(dockstore_yaml, dict):
        lint_context.error("Invalid YAML contents found in %s" % DOCKSTORE_REGISTRY_CONF)
        return

    if "workflows" not in dockstore_yaml:
        lint_context.error("Invalid YAML contents found in %s, no workflows defined" % DOCKSTORE_REGISTRY_CONF)
        return

    workflow_entries = dockstore_yaml["workflows"]
    if not isinstance(workflow_entries, list):
        lint_context.error("Invalid YAML contents found in %s, workflows not a list" % DOCKSTORE_REGISTRY_CONF)
        return

    # Entry paths are resolved relative to the config file's directory.
    config_directory = os.path.dirname(path)
    for workflow_entry in workflow_entries:
        _lint_dockstore_workflow_entry(lint_context, config_directory, workflow_entry)
+
+
def _lint_dockstore_workflow_entry(lint_context, directory, workflow_entry):
    """Lint one entry of the ``workflows`` list in a .dockstore.yml file."""
    if not isinstance(workflow_entry, dict):
        lint_context.error("Invalid YAML contents found in %s, workflow entry not a dict" % DOCKSTORE_REGISTRY_CONF)
        return

    found_errors = False
    for required_key in ("primaryDescriptorPath", "subclass"):
        if required_key not in workflow_entry:
            lint_context.error("%s workflow entry missing required key %s" % (DOCKSTORE_REGISTRY_CONF, required_key))
            found_errors = True

    for recommended_key in ("testParameterFiles",):
        if recommended_key not in workflow_entry:
            lint_context.warn("%s workflow entry missing recommended key %s" % (DOCKSTORE_REGISTRY_CONF, recommended_key))

    if found_errors:
        # Don't do the rest of the validation for a broken file.
        return

    # TODO: validate subclass
    referenced_files = [workflow_entry["primaryDescriptorPath"]]
    referenced_files.extend(workflow_entry.get("testParameterFiles", []))

    for referenced_file in referenced_files:
        # NOTE(review): [1:] assumes the entry path starts with "/" (Dockstore
        # repo-rooted convention) — confirm paths are always absolute-style.
        referenced_path = os.path.join(directory, referenced_file[1:])
        if not os.path.exists(referenced_path):
            lint_context.error("%s workflow entry references absent file %s" % (DOCKSTORE_REGISTRY_CONF, referenced_file))
+
+
def looks_like_a_workflow(path):
    """Return boolean indicating if this path looks like a workflow."""
    if not POTENTIAL_WORKFLOW_FILES.match(os.path.basename(path)):
        return False
    with open(path, "r") as f:
        workflow_dict = ordered_load(f)
    if not isinstance(workflow_dict, dict):
        # Not exactly right - could have a #main def - do better and sync with Galaxy.
        return False
    return workflow_dict.get("class") == "GalaxyWorkflow" or workflow_dict.get("a_galaxy_workflow")
+
+
def find_workflow_descriptions(directory):
    """Yield the paths under ``directory`` that actually contain workflows."""
    candidates = find_potential_workflow_files(directory)
    yield from (candidate for candidate in candidates if looks_like_a_workflow(candidate))
+
+
def find_potential_workflow_files(directory):
    """Return a list of potential workflow files in a directory."""
    if not os.path.exists(directory):
        raise ValueError("Directory not found {}".format(directory))

    # A plain file path is returned as-is; directories are walked recursively.
    if not os.path.isdir(directory):
        return [directory]

    matches = []
    for root, dirnames, filenames in os.walk(directory):
        # exclude some directories (like .hg) from traversing
        dirnames[:] = [d for d in dirnames if d not in EXCLUDE_WALK_DIRS]
        matches.extend(
            os.path.join(root, filename)
            for filename in filenames
            if POTENTIAL_WORKFLOW_FILES.match(filename)
        )
    return matches