comparison env/lib/python3.9/site-packages/planemo/runnable.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Describe artifacts that can be run, tested, and linted."""
2
3 from __future__ import absolute_import
4
5 import abc
6 import os
7 from distutils.dir_util import copy_tree
8 from enum import auto, Enum
9 from pathlib import Path
10 from typing import NamedTuple
11 from urllib.parse import urlparse
12
13 import yaml
14 from galaxy.tool_util.cwl.parser import workflow_proxy
15 from galaxy.tool_util.loader_directory import (
16 is_a_yaml_with_class,
17 looks_like_a_cwl_artifact,
18 looks_like_a_data_manager_xml,
19 looks_like_a_tool_cwl,
20 looks_like_a_tool_xml,
21 )
22 from galaxy.tool_util.parser import get_tool_source
23 from six import (
24 add_metaclass,
25 python_2_unicode_compatible,
26 )
27
28 from planemo.exit_codes import EXIT_CODE_UNKNOWN_FILE_TYPE, ExitCodeException
29 from planemo.galaxy.workflows import describe_outputs, GALAXY_WORKFLOWS_PREFIX
30 from planemo.io import error
31 from planemo.shed import DOCKSTORE_REGISTRY_CONF
32 from planemo.test import check_output, for_collections
33
# Suffixes appended to an artifact's base name (before the extension) when
# looking for a sibling test definition file, tried in this order.
TEST_SUFFIXES = [
    "-tests", "_tests", "-test", "_test"
]
# Extensions tried, in this order, for each suffix above.
TEST_EXTENSIONS = [".yml", ".yaml", ".json"]

# Formatted with the path of the offending test definition file.
TEST_FILE_NOT_LIST_MESSAGE = ("Invalid test definition file [%s] - file must "
                              "contain a list of tests")
# Formatted with (1-based test number, test file path, missing field name).
TEST_FIELD_MISSING_MESSAGE = ("Invalid test definition [test #%d in %s] - "
                              "definition must contain field [%s].")
43
44
class RunnableType(Enum):
    """Enumeration of the artifact kinds a runnable can be."""

    galaxy_tool = auto()
    galaxy_datamanager = auto()
    galaxy_workflow = auto()
    cwl_tool = auto()
    cwl_workflow = auto()
    directory = auto()

    @property
    def has_tools(self):
        """True for Galaxy tools, data managers, CWL tools and directories."""
        return self in (
            RunnableType.galaxy_tool,
            RunnableType.galaxy_datamanager,
            RunnableType.cwl_tool,
            RunnableType.directory,
        )

    @property
    def is_single_artifact(self):
        """True for every type except ``directory``."""
        return self is not RunnableType.directory

    @property
    def test_data_in_parent_dir(self):
        """True only for ``galaxy_datamanager``."""
        return self is RunnableType.galaxy_datamanager

    @property
    def is_galaxy_artifact(self):
        """True when the member name contains ``galaxy``."""
        return self.name.find("galaxy") != -1

    @property
    def is_cwl_artifact(self):
        """True when the member name contains ``cwl``."""
        return self.name.find("cwl") != -1
72
73
class Runnable(NamedTuple):
    """Abstraction describing tools and workflows.

    ``uri`` is either a local filesystem path or a URI starting with
    ``GALAXY_WORKFLOWS_PREFIX``; ``type`` is the :class:`RunnableType`
    describing what kind of artifact it is.
    """
    uri: str
    type: RunnableType

    @property
    def path(self):
        """Local filesystem path for this runnable.

        For a remote workflow URI the path is taken from a
        ``runnable_path=`` query string; for anything else the URI itself
        is returned.

        :raises ValueError: if the URI is a remote workflow URI without a
            query string.
        """
        uri = self.uri
        if not self.is_remote_workflow_uri:
            return uri
        query = urlparse(uri).query
        if not query:
            raise ValueError(f"Runnable with URI {uri} is remote resource without local path")
        assert query.startswith("runnable_path=")
        return query[len("runnable_path="):]

    @property
    def has_path(self):
        """Boolean indicating whether :attr:`path` can be resolved."""
        try:
            self.path
            return True
        except ValueError:
            return False

    @property
    def is_remote_workflow_uri(self):
        """Boolean indicating whether the URI starts with the Galaxy workflows prefix."""
        return self.uri.startswith(GALAXY_WORKFLOWS_PREFIX)

    @property
    def test_data_search_path(self):
        """During testing, path to search for test data files."""
        if self.type.name in ['galaxy_datamanager']:
            # Data managers are searched from the parent of their own directory.
            return os.path.join(os.path.dirname(self.path), os.path.pardir)
        else:
            return self.path

    @property
    def tool_data_search_path(self):
        """During testing, path to search for Galaxy tool data tables."""
        return self.test_data_search_path

    @property
    def data_manager_conf_path(self):
        """Path of a Galaxy data manager configuration for runnable or None."""
        if self.type.name in ['galaxy_datamanager']:
            return os.path.join(os.path.dirname(self.path), os.pardir, 'data_manager_conf.xml')
        return None

    @property
    def has_tools(self):
        """Boolean indicating if this runnable corresponds to one or more tools."""
        # Read the value from the type; returning a property object here
        # would always be truthy regardless of the runnable's type.
        return self.type.has_tools

    @property
    def is_single_artifact(self):
        """Boolean indicating if this runnable is a single artifact.

        Currently only directories are considered not a single artifact.
        """
        return self.type.is_single_artifact
126 """Boolean indicating if this runnable corresponds to one or more tools."""
127 return _runnable_delegate_attribute('has_tools')
128
129 @property
130 def is_single_artifact(self):
131 """Boolean indicating if this runnable is a single artifact.
132
133 Currently only directories are considered not a single artifact.
134 """
135 return _runnable_delegate_attribute('is_single_artifact')
136
137
138 def _runnable_delegate_attribute(attribute):
139
140 @property
141 def getter(runnable):
142 return getattr(runnable.type, attribute)
143
144 return getter
145
146
147 def _copy_runnable_tree(path, runnable_type, temp_path):
148 dir_to_copy = None
149 if runnable_type in {RunnableType.galaxy_tool, RunnableType.cwl_tool}:
150 dir_to_copy = os.path.dirname(path)
151 path = os.path.join(temp_path, os.path.basename(path))
152 elif runnable_type == RunnableType.directory:
153 dir_to_copy = path
154 path = temp_path
155 elif runnable_type == RunnableType.galaxy_datamanager:
156 dir_to_copy = os.path.join(os.path.dirname(path), os.pardir)
157 path_to_data_manager_tool = os.path.relpath(path, dir_to_copy)
158 path = os.path.join(temp_path, path_to_data_manager_tool)
159 if dir_to_copy:
160 copy_tree(dir_to_copy, temp_path, update=True)
161 return path
162
163
164 def workflows_from_dockstore_yaml(path):
165 workflows = []
166 parent_dir = Path(path).absolute().parent
167 with open(path) as y:
168 for workflow in yaml.safe_load(y).get('workflows', []):
169 workflow_path = workflow.get('primaryDescriptorPath')
170 if workflow_path:
171 if workflow_path.startswith('/'):
172 workflow_path = workflow_path[1:]
173 workflows.append(parent_dir.joinpath(workflow_path))
174 return workflows
175
176
177 def workfow_dir_runnables(path, return_all=False):
178 dockstore_path = os.path.join(path, DOCKSTORE_REGISTRY_CONF)
179 if os.path.exists(dockstore_path):
180 runnables = [Runnable(str(path), RunnableType.galaxy_workflow) for path in workflows_from_dockstore_yaml(dockstore_path)]
181 if return_all:
182 return runnables
183 else:
184 return runnables[0]
185
186
187 def for_path(path, temp_path=None, return_all=False):
188 """Produce a class:`Runnable` for supplied path."""
189 runnable_type = None
190 if os.path.isdir(path):
191 runnable = workfow_dir_runnables(path, return_all=return_all)
192 if runnable:
193 return runnable
194 runnable_type = RunnableType.directory
195 elif looks_like_a_tool_cwl(path):
196 runnable_type = RunnableType.cwl_tool
197 elif looks_like_a_data_manager_xml(path):
198 runnable_type = RunnableType.galaxy_datamanager
199 elif looks_like_a_tool_xml(path):
200 runnable_type = RunnableType.galaxy_tool
201 elif is_a_yaml_with_class(path, ["GalaxyWorkflow"]):
202 runnable_type = RunnableType.galaxy_workflow
203 elif path.endswith(".ga"):
204 runnable_type = RunnableType.galaxy_workflow
205 elif looks_like_a_cwl_artifact(path, ["Workflow"]):
206 runnable_type = RunnableType.cwl_workflow
207 else:
208 # Check to see if it is a Galaxy workflow with a different extension
209 try:
210 with open(path, "r") as f:
211 as_dict = yaml.safe_load(f)
212 if as_dict.get("a_galaxy_workflow", False):
213 runnable_type = RunnableType.galaxy_workflow
214 except Exception:
215 pass
216
217 if runnable_type is None:
218 error("Unable to determine runnable type for path [%s]" % path)
219 raise ExitCodeException(EXIT_CODE_UNKNOWN_FILE_TYPE)
220
221 if temp_path:
222 path = _copy_runnable_tree(path, runnable_type, temp_path)
223
224 return Runnable(path, runnable_type)
225
226
227 def for_paths(paths, temp_path=None):
228 """Return a specialized list of Runnable objects for paths."""
229 return [for_path(path, temp_path=temp_path) for path in paths]
230
231
232 def for_uri(uri):
233 """Produce a class:`Runnable` for supplied Galaxy workflow ID."""
234 # TODO: allow galaxy_tool also, this trick would work fine for running tools
235 runnable = Runnable(uri, RunnableType.galaxy_workflow)
236 return runnable
237
238
239 def cases(runnable):
240 """Build a `list` of :class:`TestCase` objects for specified runnable."""
241 cases = []
242
243 tests_path = _tests_path(runnable)
244 if tests_path is None:
245 if runnable.type == RunnableType.galaxy_tool:
246 tool_source = get_tool_source(runnable.path)
247 test_dicts = tool_source.parse_tests_to_dict()
248 tool_id = tool_source.parse_id()
249 tool_version = tool_source.parse_version()
250 for i, test_dict in enumerate(test_dicts.get("tests", [])):
251 cases.append(ExternalGalaxyToolTestCase(runnable, tool_id, tool_version, i, test_dict))
252 return cases
253
254 tests_directory = os.path.abspath(os.path.dirname(tests_path))
255
256 def normalize_to_tests_path(path):
257 if not os.path.isabs(path):
258 absolute_path = os.path.join(tests_directory, path)
259 else:
260 absolute_path = path
261 return os.path.normpath(absolute_path)
262
263 with open(tests_path, "r") as f:
264 tests_def = yaml.safe_load(f)
265
266 if not isinstance(tests_def, list):
267 message = TEST_FILE_NOT_LIST_MESSAGE % tests_path
268 raise Exception(message)
269
270 for i, test_def in enumerate(tests_def):
271 if "job" not in test_def:
272 message = TEST_FIELD_MISSING_MESSAGE % (
273 i + 1, tests_path, "job"
274 )
275 raise Exception(message)
276 job_def = test_def["job"]
277 if isinstance(job_def, dict):
278 job_path = None
279 job = job_def
280 else:
281 job_path = normalize_to_tests_path(job_def)
282 job = None
283
284 doc = test_def.get("doc", None)
285 output_expectations = test_def.get("outputs", {})
286 case = TestCase(
287 runnable=runnable,
288 tests_directory=tests_directory,
289 output_expectations=output_expectations,
290 index=i,
291 job_path=job_path,
292 job=job,
293 doc=doc,
294 )
295 cases.append(case)
296
297 return cases
298
299
300 @add_metaclass(abc.ABCMeta)
301 class AbstractTestCase(object):
302 """Description of a test case for a runnable."""
303
304 def structured_test_data(self, run_response):
305 """Result of executing this test case - a "structured_data" dict.
306
307 :rtype: dict
308 :return:
309 For example::
310
311 {
312 "id": "",
313 "has_data": true,
314 "data": {
315 "status": "success", // error, skip,
316 "job": {
317 "command_line": "cat moo",
318 "stdout": "",
319 "stderr": ""
320 },
321 "output_problems": [],
322 "execution_problem": "",
323 "inputs" = {},
324 "problem_log": ""
325 }
326 }
327 """
328
329
330 class TestCase(AbstractTestCase):
331 """Describe an abstract test case for a specified runnable."""
332
333 def __init__(self, runnable, tests_directory, output_expectations, job_path, job, index, doc):
334 """Construct TestCase object from required attributes."""
335 self.runnable = runnable
336 self.job_path = job_path
337 self.job = job
338 self.output_expectations = output_expectations
339 self.tests_directory = tests_directory
340 self.index = index
341 self.doc = doc
342
343 def __repr__(self):
344 return 'TestCase (%s) for runnable (%s) with job (%s) and expected outputs (%s) in directory (%s) with id (%s)' % \
345 (self.doc, self.runnable, self.job, self.output_expectations, self.tests_directory, self.index)
346
347 def structured_test_data(self, run_response):
348 """Check a test case against outputs dictionary."""
349 output_problems = []
350 if run_response.was_successful:
351 outputs_dict = run_response.outputs_dict
352 execution_problem = None
353 for output_id, output_test in self.output_expectations.items():
354 if output_id not in outputs_dict:
355 message = "Expected output [%s] not found in results." % output_id
356 output_problems.append(message)
357 continue
358
359 output_value = outputs_dict[output_id]
360 output_problems.extend(
361 self._check_output(output_id, output_value, output_test)
362 )
363 if output_problems:
364 status = "failure"
365 else:
366 status = "success"
367 else:
368 execution_problem = run_response.error_message
369 status = "error"
370 data_dict = dict(
371 status=status
372 )
373 if status != "success":
374 data_dict["output_problems"] = output_problems
375 data_dict["execution_problem"] = execution_problem
376 log = run_response.log
377 if log is not None:
378 data_dict["problem_log"] = log
379 job_info = run_response.job_info
380 if job_info is not None:
381 data_dict["job"] = job_info
382 invocation_details = run_response.invocation_details
383 if invocation_details is not None:
384 data_dict["invocation_details"] = invocation_details
385 data_dict["inputs"] = self._job
386 return dict(
387 id=("%s_%s" % (self._test_id, self.index)),
388 has_data=True,
389 data=data_dict,
390 doc=self.doc,
391 test_type=self.runnable.type.name,
392 )
393
394 @property
395 def _job(self):
396 if self.job_path is not None:
397 with open(self.job_path, "r") as f:
398 return f.read()
399 else:
400 return self.job
401
402 @property
403 def input_ids(self):
404 """Labels of inputs specified in test description."""
405 return list(self._job.keys())
406
407 @property
408 def tested_output_ids(self):
409 """Labels of outputs checked in test description."""
410 return list(self.output_expectations.keys())
411
412 def _check_output(self, output_id, output_value, output_test):
413 output_problems = []
414 if not isinstance(output_test, dict):
415 if output_test != output_value:
416 template = "Output [%s] value [%s] does not match expected value [%s]."
417 message = template % (output_id, output_value, output_test)
418 output_problems.append(message)
419 else:
420 if not for_collections(output_test):
421 if not isinstance(output_value, dict):
422 message = "Expected file properties for output [%s]" % output_id
423 print(message)
424 print(output_value)
425 output_problems.append(message)
426 return output_problems
427 if "path" not in output_value and "location" in output_value:
428 assert output_value["location"].startswith("file://")
429 output_value["path"] = output_value["location"][len("file://"):]
430 if "path" not in output_value:
431 message = "No path specified for expected output file [%s]" % output_id
432 output_problems.append(message)
433 print(message)
434 return output_problems
435 else:
436 output_test["name"] = output_id
437
438 output_problems.extend(
439 check_output(
440 self.runnable,
441 output_value,
442 output_test,
443 # TODO: needs kwds in here...
444 )
445 )
446
447 return output_problems
448
449 @property
450 def _test_id(self):
451 if self.runnable.type in [
452 RunnableType.cwl_tool,
453 RunnableType.galaxy_tool,
454 ]:
455 return get_tool_source(self.runnable.path).parse_id()
456 else:
457 return os.path.basename(self.runnable.path)
458
459
460 class ExternalGalaxyToolTestCase(AbstractTestCase):
461 """Special class of AbstractCase that doesn't use job_path but uses test data from a Galaxy server."""
462
463 def __init__(self, runnable, tool_id, tool_version, test_index, test_dict):
464 """Construct TestCase object from required attributes."""
465 self.runnable = runnable
466 self.tool_id = tool_id
467 self.tool_version = tool_version
468 self.test_index = test_index
469 self.test_dict = test_dict
470
471 def structured_test_data(self, run_response):
472 """Just return the structured_test_data generated from galaxy-tool-util for this test variant."""
473 return run_response
474
475
476 def _tests_path(runnable):
477 if not runnable.is_single_artifact:
478 raise NotImplementedError("Tests for directories are not yet implemented.")
479
480 runnable_path = runnable.path
481 base, _ = os.path.splitext(runnable_path)
482
483 for test_suffix in TEST_SUFFIXES:
484 for test_extension in TEST_EXTENSIONS:
485 test_path = base + test_suffix + test_extension
486 if os.path.exists(test_path):
487 return test_path
488
489 return None
490
491
492 def get_outputs(runnable, gi=None):
493 """Return a list of :class:`RunnableOutput` objects for this runnable.
494
495 Supply bioblend user Galaxy instance object (as gi) if additional context
496 needed to resolve workflow details.
497 """
498 if not runnable.is_single_artifact:
499 raise NotImplementedError("Cannot generate outputs for a directory.")
500 if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
501 tool_source = get_tool_source(runnable.path)
502 # TODO: do something with collections at some point
503 output_datasets, _ = tool_source.parse_outputs(None)
504 outputs = [ToolOutput(o) for o in output_datasets.values()]
505 return outputs
506 elif runnable.type == RunnableType.galaxy_workflow:
507 workflow_outputs = describe_outputs(runnable, gi=gi)
508 return [GalaxyWorkflowOutput(o) for o in workflow_outputs]
509 elif runnable.type == RunnableType.cwl_workflow:
510 workflow = workflow_proxy(runnable.path, strict_cwl_validation=False)
511 return [CwlWorkflowOutput(label) for label in workflow.output_labels]
512 else:
513 raise NotImplementedError("Getting outputs for this artifact type is not yet supported.")
514
515
516 @add_metaclass(abc.ABCMeta)
517 class RunnableOutput(object):
518 """Description of a single output of an execution of a Runnable."""
519
520 @abc.abstractproperty
521 def get_id(self):
522 """An identifier that describes this output."""
523
524
525 class ToolOutput(RunnableOutput):
526 """Implementation of RunnableOutput corresponding to Galaxy tool outputs."""
527
528 def __init__(self, tool_output):
529 self._tool_output = tool_output
530
531 def get_id(self):
532 return self._tool_output.name
533
534
535 class GalaxyWorkflowOutput(RunnableOutput):
536 """Implementation of RunnableOutput corresponding to Galaxy workflow outputs."""
537
538 def __init__(self, workflow_output):
539 self._workflow_output = workflow_output
540
541 def get_id(self):
542 return self._workflow_output.label
543
544 @property
545 def workflow_output(self):
546 return self._workflow_output
547
548
549 class CwlWorkflowOutput(RunnableOutput):
550 """Implementation of RunnableOutput corresponding to CWL outputs."""
551
552 def __init__(self, label):
553 self._label = label
554
555 def get_id(self):
556 return self._label
557
558
559 @add_metaclass(abc.ABCMeta)
560 class RunResponse(object):
561 """Description of an attempt for an engine to execute a Runnable."""
562
563 @abc.abstractproperty
564 def was_successful(self):
565 """Indicate whether an error was encountered while executing this runnable.
566
567 If successful, response should conform to the SuccessfulRunResponse interface,
568 otherwise it will conform to the ErrorRunResponse interface.
569 """
570
571 @abc.abstractproperty
572 def job_info(self):
573 """If job information is available, return as dictionary."""
574
575 @abc.abstractproperty
576 def invocation_details(self):
577 """If workflow invocation details are available, return as dictionary."""
578
579 @abc.abstractproperty
580 def log(self):
581 """If engine related log is available, return as text data."""
582
583
584 @add_metaclass(abc.ABCMeta)
585 class SuccessfulRunResponse(RunResponse):
586 """Description of the results of an engine executing a Runnable."""
587
588 def was_successful(self):
589 """Return `True` to indicate this run was successful."""
590 return True
591
592 @abc.abstractproperty
593 def outputs_dict(self):
594 """Return a dict of output descriptions."""
595
596
597 @python_2_unicode_compatible
598 class ErrorRunResponse(RunResponse):
599 """Description of an error while attempting to execute a Runnable."""
600
601 def __init__(self, error_message, job_info=None, invocation_details=None, log=None):
602 """Create an ErrorRunResponse with specified error message."""
603 self._error_message = error_message
604 self._job_info = job_info
605 self._invocation_details = invocation_details
606 self._log = log
607
608 @property
609 def error_message(self):
610 """Error message describing the problem with execution of the runnable."""
611 return self._error_message
612
613 @property
614 def was_successful(self):
615 """Return `False` to indicate this run was successful."""
616 return False
617
618 @property
619 def job_info(self):
620 """Return potentially null stored `job_info` dict."""
621 return self._job_info
622
623 @property
624 def invocation_details(self):
625 return self._invocation_details
626
627 @property
628 def log(self):
629 """Return potentially null stored `log` text."""
630 return self._log
631
632 def __str__(self):
633 """Print a helpful error description of run."""
634 message = "Run failed with message [%s]" % self.error_message
635 log = self.log
636 if log:
637 message += " and log [%s]" % log
638 return message
639
640
641 __all__ = (
642 "cases",
643 "ErrorRunResponse",
644 "for_path",
645 "for_paths",
646 "get_outputs",
647 "Runnable",
648 "RunnableType",
649 "RunResponse",
650 "RunnableOutput",
651 "SuccessfulRunResponse",
652 "TestCase",
653 )