Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/workflow_lint.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import os | |
2 import re | |
3 | |
4 import yaml | |
5 from galaxy.tool_util.lint import LintContext | |
6 from galaxy.tool_util.loader_directory import EXCLUDE_WALK_DIRS | |
7 from gxformat2._yaml import ordered_load | |
8 from gxformat2.lint import lint_format2, lint_ga | |
9 | |
10 | |
11 from planemo.exit_codes import ( | |
12 EXIT_CODE_GENERIC_FAILURE, | |
13 EXIT_CODE_OK, | |
14 ) | |
15 from planemo.galaxy.workflows import input_labels, output_labels, required_input_labels | |
16 from planemo.runnable import cases, for_path | |
17 from planemo.shed import DOCKSTORE_REGISTRY_CONF | |
18 | |
# File extensions that may hold a workflow definition (.yml/.yaml/.ga).
POTENTIAL_WORKFLOW_FILES = re.compile(r'^.*(\.yml|\.yaml|\.ga)$')
# Version string written at the top of generated .dockstore.yml files.
DOCKSTORE_REGISTRY_CONF_VERSION = "1.2"
21 | |
22 | |
class WorkflowLintContext(LintContext):
    """Lint context specialized for workflow artifacts.

    ``training_topic`` configures training-topic linting; it probably
    should be passed through from click arguments rather than living as
    a class attribute.
    """
    training_topic = None
27 | |
28 | |
def generate_dockstore_yaml(directory):
    """Return the text of a .dockstore.yml describing workflows in ``directory``.

    Each discovered workflow becomes one entry whose descriptor path is
    relative to ``directory``.
    """
    entries = [
        {
            # TODO: support CWL
            "subclass": "Galaxy",
            "primaryDescriptorPath": os.path.relpath(workflow_path, directory),
        }
        for workflow_path in find_workflow_descriptions(directory)
    ]
    # Serialize the rest of the config separately so the version key is
    # forced to the top of the file.
    header = "version: %s\n" % DOCKSTORE_REGISTRY_CONF_VERSION
    return header + yaml.dump({"workflows": entries})
41 | |
42 | |
def lint_workflow_artifacts_on_paths(ctx, paths, lint_args):
    """Lint workflow artifacts found under each of ``paths``.

    Returns ``EXIT_CODE_GENERIC_FAILURE`` when any message reaches the
    configured ``fail_level``; otherwise ``EXIT_CODE_OK``.
    """
    lint_context = WorkflowLintContext(lint_args["level"], skip_types=lint_args["skip_types"])
    for target_path in paths:
        _lint_workflow_artifacts_on_path(lint_context, target_path, lint_args)

    if lint_context.failed(lint_args["fail_level"]):
        return EXIT_CODE_GENERIC_FAILURE
    return EXIT_CODE_OK
53 | |
54 | |
def _lint_workflow_artifacts_on_path(lint_context, path, lint_args):
    """Apply the appropriate linters to every candidate file under ``path``."""
    for artifact_path in find_potential_workflow_files(path):
        if os.path.basename(artifact_path) == DOCKSTORE_REGISTRY_CONF:
            lint_context.lint("lint_dockstore", _lint_dockstore_config, artifact_path)
        elif looks_like_a_workflow(artifact_path):

            def structure(path, lint_context):
                # Dispatch to the gxformat2 linter matching the workflow format.
                with open(path, "r") as f:
                    workflow_dict = ordered_load(f)
                if workflow_dict.get("class") == "GalaxyWorkflow":
                    lint_func = lint_format2
                else:
                    lint_func = lint_ga
                lint_func(lint_context, workflow_dict, path=path)

            lint_context.lint("lint_structure", structure, artifact_path)
            lint_context.lint("lint_tests", _lint_tsts, artifact_path)
        else:
            # Allow linting ro crates and such also
            pass
75 | |
76 | |
# misspell for pytest
def _lint_tsts(path, lint_context):
    """Check that the workflow(s) at ``path`` have structurally valid tests."""
    runnables = for_path(path, return_all=True)
    if not isinstance(runnables, list):
        runnables = [runnables]
    for runnable in runnables:
        test_cases = cases(runnable)
        if not test_cases:
            lint_context.warn("Workflow missing test cases.")
            continue
        # Lint every case (no short-circuiting) so all problems are reported.
        case_results = [_lint_case(path, test_case, lint_context) for test_case in test_cases]
        if all(case_results):
            lint_context.valid(f"Tests appear structurally correct for {runnable.path}")
95 | |
96 | |
def _lint_case(path, test_case, lint_context):
    """Validate one workflow test case against the workflow at ``path``.

    Returns True when the case appears structurally correct; warnings and
    errors are emitted on ``lint_context`` as problems are found.
    """
    test_valid = True

    i_labels = input_labels(workflow_path=path)
    job_keys = test_case.input_ids
    for key in job_keys:
        if key in i_labels:
            continue
        # consider an error instead?
        lint_context.warn("Unknown workflow input in test job definition [%s], workflow inputs are [%s]" % (key, i_labels))
        test_valid = False

    # check non-optional parameters are set
    for required_label in required_input_labels(path):
        if required_label in job_keys:
            continue
        template = "Non-optional input has no value specified in workflow test job [%s], job specifies inputs [%s]"
        lint_context.error(template % (required_label, job_keys))
        test_valid = False

    for input_id, input_def in test_case._job.items():
        if not _tst_input_valid(test_case, input_id, input_def, lint_context):
            test_valid = False

    o_labels = output_labels(path)
    found_valid_expectation = False
    for test_output_id in test_case.tested_output_ids:
        if test_output_id in o_labels:
            found_valid_expectation = True
            # TODO: validate structure of test expectations
        else:
            template = "Test found for unknown workflow output [%s], workflow outputs [%s]"
            lint_context.error(template % (test_output_id, o_labels))
            test_valid = False

    if not found_valid_expectation:
        lint_context.warn("Found no valid test expectations for workflow test")
        test_valid = False

    return test_valid
136 | |
137 | |
138 def _tst_input_valid(test_case, input_id, input_def, lint_context): | |
139 if type(input_def) == dict: # else assume it is a parameter | |
140 clazz = input_def.get("class") | |
141 if clazz == "File": | |
142 input_path = input_def.get("path") | |
143 if input_path: | |
144 if not os.path.isabs(input_path): | |
145 input_path = os.path.join(test_case.tests_directory, input_path) | |
146 if not os.path.exists(input_path): | |
147 message = "Test referenced File path [%s] not found" % input_path | |
148 lint_context.warn(message) | |
149 return False | |
150 elif clazz == "Collection": | |
151 for elem in input_def.get('elements', []): | |
152 elem_valid = _tst_input_valid(test_case, input_id, elem, lint_context) | |
153 if not elem_valid: | |
154 return False | |
155 return True | |
156 | |
157 | |
def _lint_dockstore_config(path, lint_context):
    """Lint the .dockstore.yml registry configuration at ``path``."""
    try:
        with open(path, "r") as f:
            dockstore_yaml = yaml.safe_load(f)
    except Exception:
        lint_context.error("Invalid YAML found in %s" % DOCKSTORE_REGISTRY_CONF)
        return

    if not isinstance(dockstore_yaml, dict):
        lint_context.error("Invalid YAML contents found in %s" % DOCKSTORE_REGISTRY_CONF)
        return

    if "workflows" not in dockstore_yaml:
        lint_context.error("Invalid YAML contents found in %s, no workflows defined" % DOCKSTORE_REGISTRY_CONF)
        return

    workflow_entries = dockstore_yaml.get("workflows")
    if not isinstance(workflow_entries, list):
        lint_context.error("Invalid YAML contents found in %s, workflows not a list" % DOCKSTORE_REGISTRY_CONF)
        return

    parent_directory = os.path.dirname(path)
    for workflow_entry in workflow_entries:
        _lint_dockstore_workflow_entry(lint_context, parent_directory, workflow_entry)
182 | |
183 | |
def _lint_dockstore_workflow_entry(lint_context, directory, workflow_entry):
    """Lint a single entry of the ``workflows`` list in a .dockstore.yml."""
    if not isinstance(workflow_entry, dict):
        lint_context.error("Invalid YAML contents found in %s, workflow entry not a dict" % DOCKSTORE_REGISTRY_CONF)
        return

    missing_required = [key for key in ("primaryDescriptorPath", "subclass") if key not in workflow_entry]
    for required_key in missing_required:
        lint_context.error("%s workflow entry missing required key %s" % (DOCKSTORE_REGISTRY_CONF, required_key))

    for recommended_key in ("testParameterFiles",):
        if recommended_key not in workflow_entry:
            lint_context.warn("%s workflow entry missing recommended key %s" % (DOCKSTORE_REGISTRY_CONF, recommended_key))

    if missing_required:
        # Don't do the rest of the validation for a broken file.
        return

    # TODO: validate subclass
    descriptor_path = workflow_entry["primaryDescriptorPath"]
    test_files = workflow_entry.get("testParameterFiles", [])

    for referenced_file in [descriptor_path] + test_files:
        # NOTE(review): the leading character is stripped before joining —
        # presumably entry paths are repo-rooted ("/workflow.ga"); confirm.
        referenced_path = os.path.join(directory, referenced_file[1:])
        if not os.path.exists(referenced_path):
            lint_context.error("%s workflow entry references absent file %s" % (DOCKSTORE_REGISTRY_CONF, referenced_file))
211 | |
212 | |
def looks_like_a_workflow(path):
    """Return boolean indicating if this path looks like a workflow."""
    if not POTENTIAL_WORKFLOW_FILES.match(os.path.basename(path)):
        return False
    with open(path, "r") as f:
        workflow_dict = ordered_load(f)
    if not isinstance(workflow_dict, dict):
        # Not exactly right - could have a #main def - do better and sync with Galaxy.
        return False
    return workflow_dict.get("class") == "GalaxyWorkflow" or workflow_dict.get("a_galaxy_workflow")
223 | |
224 | |
def find_workflow_descriptions(directory):
    """Yield paths under ``directory`` whose contents look like workflows."""
    for candidate_path in find_potential_workflow_files(directory):
        if looks_like_a_workflow(candidate_path):
            yield candidate_path
229 | |
230 | |
def find_potential_workflow_files(directory):
    """Return a list of potential workflow files in a directory.

    ``directory`` may also be a plain file path, in which case it is
    returned as the only candidate.

    :raises ValueError: when ``directory`` does not exist.
    """
    if not os.path.exists(directory):
        raise ValueError("Directory not found {}".format(directory))

    if not os.path.isdir(directory):
        return [directory]

    matches = []
    for root, dirnames, filenames in os.walk(directory):
        # exclude some directories (like .hg) from traversing;
        # avoid shadowing the builtin ``dir`` while filtering in place.
        dirnames[:] = [dirname for dirname in dirnames if dirname not in EXCLUDE_WALK_DIRS]
        for filename in filenames:
            if POTENTIAL_WORKFLOW_FILES.match(filename):
                matches.append(os.path.join(root, filename))
    return matches