comparison env/lib/python3.9/site-packages/cwltool/factory.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import os
2 from typing import Any, Dict, Optional, Union
3
4 from . import load_tool
5 from .context import LoadingContext, RuntimeContext
6 from .errors import WorkflowException
7 from .executors import JobExecutor, SingleJobExecutor
8 from .process import Process
9 from .utils import CWLObjectType
10
11
12 class WorkflowStatus(Exception):
13 def __init__(self, out: Optional[CWLObjectType], status: str) -> None:
14 """Signaling exception for the status of a Workflow."""
15 super().__init__("Completed %s" % status)
16 self.out = out
17 self.status = status
18
19
20 class Callable:
21 """Result of Factory.make()."""
22
23 def __init__(self, t: Process, factory: "Factory") -> None:
24 """Initialize."""
25 self.t = t
26 self.factory = factory
27
28 def __call__(self, **kwargs):
29 # type: (**Any) -> Union[str, Optional[CWLObjectType]]
30 runtime_context = self.factory.runtime_context.copy()
31 runtime_context.basedir = os.getcwd()
32 out, status = self.factory.executor(self.t, kwargs, runtime_context)
33 if status != "success":
34 raise WorkflowStatus(out, status)
35 else:
36 return out
37
38
39 class Factory:
40 """Easy way to load a CWL document for execution."""
41
42 def __init__(
43 self,
44 executor: Optional[JobExecutor] = None,
45 loading_context: Optional[LoadingContext] = None,
46 runtime_context: Optional[RuntimeContext] = None,
47 ) -> None:
48 if executor is None:
49 executor = SingleJobExecutor()
50 self.executor = executor
51 self.loading_context = loading_context
52 if loading_context is None:
53 self.loading_context = LoadingContext()
54 if runtime_context is None:
55 self.runtime_context = RuntimeContext()
56 else:
57 self.runtime_context = runtime_context
58
59 def make(self, cwl: Union[str, Dict[str, Any]]) -> Callable:
60 """Instantiate a CWL object from a CWl document."""
61 load = load_tool.load_tool(cwl, self.loading_context)
62 if isinstance(load, int):
63 raise WorkflowException("Error loading tool")
64 return Callable(load, self)