env/lib/python3.9/site-packages/planemo/workflow_lint.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
import os
import re

import yaml
from galaxy.tool_util.lint import LintContext
from galaxy.tool_util.loader_directory import EXCLUDE_WALK_DIRS
from gxformat2._yaml import ordered_load
from gxformat2.lint import lint_format2, lint_ga


from planemo.exit_codes import (
    EXIT_CODE_GENERIC_FAILURE,
    EXIT_CODE_OK,
)
from planemo.galaxy.workflows import input_labels, output_labels, required_input_labels
from planemo.runnable import cases, for_path
from planemo.shed import DOCKSTORE_REGISTRY_CONF

POTENTIAL_WORKFLOW_FILES = re.compile(r'^.*(\.yml|\.yaml|\.ga)$')
DOCKSTORE_REGISTRY_CONF_VERSION = "1.2"


class WorkflowLintContext(LintContext):
    # Setup training topic for linting - probably should pass this through
    # from click arguments.
    training_topic = None


def generate_dockstore_yaml(directory):
    workflows = []
    for workflow_path in find_workflow_descriptions(directory):
        workflows.append({
            # TODO: support CWL
            "subclass": "Galaxy",
            "primaryDescriptorPath": os.path.relpath(workflow_path, directory)
        })
    # Force the version to the top of the file by serializing the rest of the config separately.
    contents = "version: %s\n" % DOCKSTORE_REGISTRY_CONF_VERSION
    contents += yaml.dump({"workflows": workflows})
    return contents

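# For illustration only: for a directory containing a single Galaxy workflow
# (hypothetical file name "my-workflow.ga"), the generated .dockstore.yml would
# look roughly like:
#
#   version: 1.2
#   workflows:
#   - primaryDescriptorPath: my-workflow.ga
#     subclass: Galaxy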

def lint_workflow_artifacts_on_paths(ctx, paths, lint_args):
    report_level = lint_args["level"]
    lint_context = WorkflowLintContext(report_level, skip_types=lint_args["skip_types"])
    for path in paths:
        _lint_workflow_artifacts_on_path(lint_context, path, lint_args)

    if lint_context.failed(lint_args["fail_level"]):
        return EXIT_CODE_GENERIC_FAILURE
    else:
        return EXIT_CODE_OK

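# A sketch of how this entry point is typically driven; the lint_args values
# shown are illustrative, only the keys are the ones actually consumed above:
#
#   exit_code = lint_workflow_artifacts_on_paths(
#       ctx,
#       ["path/to/workflows"],
#       {"level": "all", "fail_level": "warn", "skip_types": []},
#   )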

def _lint_workflow_artifacts_on_path(lint_context, path, lint_args):
    for potential_workflow_artifact_path in find_potential_workflow_files(path):
        if os.path.basename(potential_workflow_artifact_path) == DOCKSTORE_REGISTRY_CONF:
            lint_context.lint("lint_dockstore", _lint_dockstore_config, potential_workflow_artifact_path)

        elif looks_like_a_workflow(potential_workflow_artifact_path):

            def structure(path, lint_context):
                with open(path, "r") as f:
                    workflow_dict = ordered_load(f)
                workflow_class = workflow_dict.get("class")
                lint_func = lint_format2 if workflow_class == "GalaxyWorkflow" else lint_ga
                lint_func(lint_context, workflow_dict, path=path)

            lint_context.lint("lint_structure", structure, potential_workflow_artifact_path)

            lint_context.lint("lint_tests", _lint_tsts, potential_workflow_artifact_path)
        else:
            # Allow linting RO-Crates and the like here as well.
            pass


# Deliberately misspelled ("tsts") so the name does not look like a pytest test.
def _lint_tsts(path, lint_context):
    runnables = for_path(path, return_all=True)
    if not isinstance(runnables, list):
        runnables = [runnables]
    for runnable in runnables:
        test_cases = cases(runnable)
        all_tests_valid = False
        if len(test_cases) == 0:
            lint_context.warn("Workflow missing test cases.")
        else:
            all_tests_valid = True
        for test_case in test_cases:
            if not _lint_case(path, test_case, lint_context):
                all_tests_valid = False

        if all_tests_valid:
            lint_context.valid(f"Tests appear structurally correct for {runnable.path}")

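# The test cases checked below come from planemo-style workflow test files
# (e.g. "my-workflow-tests.yml" next to the workflow). A minimal sketch, with
# hypothetical labels, paths, and assertion:
#
#   - doc: Test the workflow end to end
#     job:
#       input_fasta:
#         class: File
#         path: input.fasta
#     outputs:
#       output_table:
#         asserts:
#           has_text:
#             text: "chr1"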

def _lint_case(path, test_case, lint_context):
    test_valid = True

    i_labels = input_labels(workflow_path=path)
    job_keys = test_case.input_ids
    for key in job_keys:
        if key not in i_labels:
            # consider an error instead?
            lint_context.warn("Unknown workflow input in test job definition [%s], workflow inputs are [%s]" % (key, i_labels))
            test_valid = False

    # check non-optional parameters are set
    for required_label in required_input_labels(path):
        if required_label not in job_keys:
            template = "Non-optional input has no value specified in workflow test job [%s], job specifies inputs [%s]"
            lint_context.error(template % (required_label, job_keys))
            test_valid = False

    for input_id, input_def in test_case._job.items():
        if not _tst_input_valid(test_case, input_id, input_def, lint_context):
            test_valid = False

    test_output_ids = test_case.tested_output_ids
    o_labels = output_labels(path)
    found_valid_expectation = False
    for test_output_id in test_output_ids:
        if test_output_id not in o_labels:
            template = "Test found for unknown workflow output [%s], workflow outputs [%s]"
            lint_context.error(template % (test_output_id, o_labels))
            test_valid = False
        else:
            found_valid_expectation = True
        # TODO: validate structure of test expectations

    if not found_valid_expectation:
        lint_context.warn("Found no valid test expectations for workflow test")
        test_valid = False

    return test_valid


def _tst_input_valid(test_case, input_id, input_def, lint_context):
    if isinstance(input_def, dict):  # else assume it is a parameter
        clazz = input_def.get("class")
        if clazz == "File":
            input_path = input_def.get("path")
            if input_path:
                if not os.path.isabs(input_path):
                    input_path = os.path.join(test_case.tests_directory, input_path)
                if not os.path.exists(input_path):
                    message = "Test referenced File path [%s] not found" % input_path
                    lint_context.warn(message)
                    return False
        elif clazz == "Collection":
            for elem in input_def.get('elements', []):
                elem_valid = _tst_input_valid(test_case, input_id, elem, lint_context)
                if not elem_valid:
                    return False
    return True

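# Collection inputs in test jobs are validated by recursing through their
# elements; a sketch of the shape this expects (hypothetical labels and files):
#
#   input_collection:
#     class: Collection
#     collection_type: list
#     elements:
#     - identifier: sample1
#       class: File
#       path: sample1.fastq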

def _lint_dockstore_config(path, lint_context):
    dockstore_yaml = None
    try:
        with open(path, "r") as f:
            dockstore_yaml = yaml.safe_load(f)
    except Exception:
        lint_context.error("Invalid YAML found in %s" % DOCKSTORE_REGISTRY_CONF)
        return

    if not isinstance(dockstore_yaml, dict):
        lint_context.error("Invalid YAML contents found in %s" % DOCKSTORE_REGISTRY_CONF)
        return

    if "workflows" not in dockstore_yaml:
        lint_context.error("Invalid YAML contents found in %s, no workflows defined" % DOCKSTORE_REGISTRY_CONF)
        return

    workflow_entries = dockstore_yaml.get("workflows")
    if not isinstance(workflow_entries, list):
        lint_context.error("Invalid YAML contents found in %s, workflows not a list" % DOCKSTORE_REGISTRY_CONF)
        return

    for workflow_entry in workflow_entries:
        _lint_dockstore_workflow_entry(lint_context, os.path.dirname(path), workflow_entry)


def _lint_dockstore_workflow_entry(lint_context, directory, workflow_entry):
    if not isinstance(workflow_entry, dict):
        lint_context.error("Invalid YAML contents found in %s, workflow entry not a dict" % DOCKSTORE_REGISTRY_CONF)
        return

    found_errors = False
    for required_key in ["primaryDescriptorPath", "subclass"]:
        if required_key not in workflow_entry:
            lint_context.error("%s workflow entry missing required key %s" % (DOCKSTORE_REGISTRY_CONF, required_key))
            found_errors = True

    for recommended_key in ["testParameterFiles"]:
        if recommended_key not in workflow_entry:
            lint_context.warn("%s workflow entry missing recommended key %s" % (DOCKSTORE_REGISTRY_CONF, recommended_key))

    if found_errors:
        # Don't do the rest of the validation for a broken file.
        return

    # TODO: validate subclass
    descriptor_path = workflow_entry["primaryDescriptorPath"]
    test_files = workflow_entry.get("testParameterFiles", [])

    for referenced_file in [descriptor_path] + test_files:
        # Entries reference files relative to the config's directory with a leading
        # "/", which is stripped before joining.
        referenced_path = os.path.join(directory, referenced_file[1:])
        if not os.path.exists(referenced_path):
            lint_context.error("%s workflow entry references absent file %s" % (DOCKSTORE_REGISTRY_CONF, referenced_file))

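# An entry satisfying both the required and the recommended keys checked above
# might look like this (illustrative paths, written with the leading "/" used
# for repository-root-relative references):
#
#   version: 1.2
#   workflows:
#   - subclass: Galaxy
#     primaryDescriptorPath: /my-workflow.ga
#     testParameterFiles:
#     - /my-workflow-tests.yml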

def looks_like_a_workflow(path):
    """Return boolean indicating if this path looks like a workflow."""
    if POTENTIAL_WORKFLOW_FILES.match(os.path.basename(path)):
        with open(path, "r") as f:
            workflow_dict = ordered_load(f)
        if not isinstance(workflow_dict, dict):
            # Not exactly right - could have a #main def - do better and sync with Galaxy.
            return False
        return workflow_dict.get("class") == "GalaxyWorkflow" or workflow_dict.get("a_galaxy_workflow")
    return False

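# For reference, the markers the check above relies on: gxformat2 ("Format 2")
# workflows declare `class: GalaxyWorkflow`, while native Galaxy .ga exports
# carry an `a_galaxy_workflow` key (typically set to "true").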

def find_workflow_descriptions(directory):
    for potential_workflow_artifact_path in find_potential_workflow_files(directory):
        if looks_like_a_workflow(potential_workflow_artifact_path):
            yield potential_workflow_artifact_path


def find_potential_workflow_files(directory):
    """Return a list of potential workflow files in a directory."""
    if not os.path.exists(directory):
        raise ValueError("Directory not found {}".format(directory))

    matches = []
    if os.path.isdir(directory):
        for root, dirnames, filenames in os.walk(directory):
            # exclude some directories (like .hg) from traversing
            dirnames[:] = [dir for dir in dirnames if dir not in EXCLUDE_WALK_DIRS]
            for filename in filenames:
                if POTENTIAL_WORKFLOW_FILES.match(filename):
                    matches.append(os.path.join(root, filename))
    else:
        matches.append(directory)
    return matches