diff env/lib/python3.9/site-packages/planemo/engine/interface.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/planemo/engine/interface.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,136 @@
+"""Module contianing the :class:`Engine` abstraction."""
+
+import abc
+import json
+import os
+import tempfile
+from typing import List
+
+from six import add_metaclass
+
+from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE
+from planemo.io import error
+from planemo.runnable import (
+    cases,
+    RunnableType,
+)
+from planemo.test.results import StructuredData
+
+
+@add_metaclass(abc.ABCMeta)
+class Engine(object):
+    """Abstract description of an external process for running tools or workflows.
+    """
+
+    @abc.abstractmethod
+    def run(self, path, job_path):
+        """Run a job using a compatible artifact (workflow or tool)."""
+
+    @abc.abstractmethod
+    def cleanup(self):
+        """Release any resources used to run/test with this engine."""
+
+    @abc.abstractmethod
+    def test(self, runnables):
+        """Test runnable artifacts (workflow or tool)."""
+
+
+class BaseEngine(Engine):
+    """Base class providing context and keywords for Engine implementations."""
+
+    handled_runnable_types = []  # type: List[RunnableType]
+
+    def __init__(self, ctx, **kwds):
+        """Store context and kwds."""
+        self._ctx = ctx
+        self._kwds = kwds
+
+    def can_run(self, runnable):
+        """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``."""
+        return runnable.type in self.handled_runnable_types
+
+    def cleanup(self):
+        """Default no-op cleanup method."""
+
+    def run(self, runnable, job_path):
+        """Run a job using a compatible artifact (workflow or tool)."""
+        self._check_can_run(runnable)
+        run_response = self._run(runnable, job_path)
+        return run_response
+
+    @abc.abstractmethod
+    def _run(self, runnable, job_path):
+        """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable."""
+
+    def _check_can_run(self, runnable):
+        if not self.can_run(runnable):
+            template = "Engine type [%s] cannot execute [%s]s"
+            message = template % (self.__class__, runnable.type)
+            error(message)
+            self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE)
+
+    def _check_can_run_all(self, runnables):
+        for runnable in runnables:
+            self._check_can_run(runnable)
+
+    def test(self, runnables):
+        """Test runnable artifacts (workflow or tool)."""
+        self._check_can_run_all(runnables)
+        test_cases = [t for tl in map(cases, runnables) for t in tl]
+        test_results = self._collect_test_results(test_cases)
+        tests = []
+        for (test_case, run_response) in test_results:
+            test_case_data = test_case.structured_test_data(run_response)
+            tests.append(test_case_data)
+        test_data = {
+            'version': '0.1',
+            'tests': tests,
+        }
+        structured_results = StructuredData(data=test_data)
+        structured_results.calculate_summary_data()
+        return structured_results
+
+    def _collect_test_results(self, test_cases):
+        test_results = []
+        for test_case in test_cases:
+            self._ctx.vlog(
+                "Running tests %s" % test_case
+            )
+            run_response = self._run_test_case(test_case)
+            self._ctx.vlog(
+                "Test case [%s] resulted in run response [%s]",
+                test_case,
+                run_response,
+            )
+            test_results.append((test_case, run_response))
+        return test_results
+
+    def _run_test_case(self, test_case):
+        runnable = test_case.runnable
+        job_path = test_case.job_path
+        tmp_path = None
+        if job_path is None:
+            job = test_case.job
+            f = tempfile.NamedTemporaryFile(
+                dir=test_case.tests_directory,
+                suffix=".json",
+                prefix="plnmotmptestjob",
+                delete=False,
+                mode="w+",
+            )
+            tmp_path = f.name
+            job_path = tmp_path
+            json.dump(job, f)
+            f.close()
+        try:
+            run_response = self._run(runnable, job_path)
+        finally:
+            if tmp_path:
+                os.remove(tmp_path)
+        return run_response
+
+
+__all__ = (
+    "Engine",
+    "BaseEngine",
+)