Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/cwltool/factory.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler | 
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 | 
| parents | |
| children | 
line wrap: on
 line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/cwltool/factory.py Fri Jul 31 00:18:57 2020 -0400 @@ -0,0 +1,64 @@ +from __future__ import absolute_import + +import os +from typing import Callable as tCallable # pylint: disable=unused-import +from typing import Any, Dict, Optional, Tuple, Union + +from typing_extensions import Text # pylint: disable=unused-import +# move to a regular typing import when Python 3.3-3.6 is no longer supported + +from . import load_tool +from .context import LoadingContext, RuntimeContext +from .executors import SingleJobExecutor +from .process import Process + + +class WorkflowStatus(Exception): + def __init__(self, out, status): + # type: (Dict[Text,Any], Text) -> None + """Signaling exception for the status of a Workflow.""" + super(WorkflowStatus, self).__init__("Completed %s" % status) + self.out = out + self.status = status + + +class Callable(object): + def __init__(self, t, factory): # type: (Process, Factory) -> None + """Initialize.""" + self.t = t + self.factory = factory + + def __call__(self, **kwargs): + # type: (**Any) -> Union[Text, Dict[Text, Text]] + runtime_context = self.factory.runtime_context.copy() + runtime_context.basedir = os.getcwd() + out, status = self.factory.executor(self.t, kwargs, runtime_context) + if status != "success": + raise WorkflowStatus(out, status) + else: + return out + +class Factory(object): + def __init__(self, + executor=None, # type: Optional[tCallable[...,Tuple[Dict[Text,Any], Text]]] + loading_context=None, # type: Optional[LoadingContext] + runtime_context=None # type: Optional[RuntimeContext] + ): # type: (...) -> None + """Easy way to load a CWL document for execution.""" + if executor is None: + executor = SingleJobExecutor() + self.executor = executor + self.loading_context = loading_context + if loading_context is None: + self.loading_context = LoadingContext() + if runtime_context is None: + self.runtime_context = RuntimeContext() + else: + self.runtime_context = runtime_context + + def make(self, cwl): # type: (Union[Text, Dict[Text, Any]]) -> Callable + """Instantiate a CWL object from a CWl document.""" + load = load_tool.load_tool(cwl, self.loading_context) + if isinstance(load, int): + raise Exception("Error loading tool") + return Callable(load, self)
