sam_consensus_v3: env/lib/python3.9/site-packages/galaxy/tool_util/cwl/parser.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date: Mon, 22 Mar 2021 18:12:50 +0000
1 """ This module provides proxy objects around objects from the common | |
2 workflow language reference implementation library cwltool. These proxies | |
3 adapt cwltool to Galaxy features and abstract the library away from the rest | |
4 of the framework. | |
5 """ | |
6 | |
7 import base64 | |
8 import copy | |
9 import json | |
10 import logging | |
11 import os | |
12 import pickle | |
13 from abc import ABCMeta, abstractmethod | |
14 from uuid import uuid4 | |
15 | |
16 | |
17 from galaxy.exceptions import MessageException | |
18 from galaxy.util import ( | |
19 listify, | |
20 safe_makedirs, | |
21 unicodify, | |
22 ) | |
23 from galaxy.util.bunch import Bunch | |
24 from .cwltool_deps import ( | |
25 beta_relaxed_fmt_check, | |
26 ensure_cwltool_available, | |
27 getdefault, | |
28 pathmapper, | |
29 process, | |
30 ref_resolver, | |
31 relink_initialworkdir, | |
32 RuntimeContext, | |
33 sourceline, | |
34 StdFsAccess, | |
35 ) | |
36 from .representation import ( | |
37 field_to_field_type, | |
38 FIELD_TYPE_REPRESENTATION, | |
39 INPUT_TYPE, | |
40 type_descriptions_for_field_types, | |
41 USE_FIELD_TYPES, | |
42 USE_STEP_PARAMETERS, | |
43 ) | |
44 from .schema import ( | |
45 non_strict_non_validating_schema_loader, | |
46 schema_loader, | |
47 ) | |
48 from .util import SECONDARY_FILES_EXTRA_PREFIX | |
49 | |
50 log = logging.getLogger(__name__) | |
51 | |
52 JOB_JSON_FILE = ".cwl_job.json" | |
53 | |
54 DOCKER_REQUIREMENT = "DockerRequirement" | |
55 SUPPORTED_TOOL_REQUIREMENTS = [ | |
56 "CreateFileRequirement", | |
57 "DockerRequirement", | |
58 "EnvVarRequirement", | |
59 "InitialWorkDirRequirement", | |
60 "InlineJavascriptRequirement", | |
61 "ResourceRequirement", | |
62 "ShellCommandRequirement", | |
63 "ScatterFeatureRequirement", | |
64 "SchemaDefRequirement", | |
65 "SubworkflowFeatureRequirement", | |
66 "StepInputExpressionRequirement", | |
67 "MultipleInputFeatureRequirement", | |
68 ] | |
69 | |
70 | |
71 SUPPORTED_WORKFLOW_REQUIREMENTS = SUPPORTED_TOOL_REQUIREMENTS + [ | |
72 ] | |
73 | |
74 PERSISTED_REPRESENTATION = "cwl_tool_object" | |
75 | |
76 | |
77 def tool_proxy(tool_path=None, tool_object=None, strict_cwl_validation=True, tool_directory=None, uuid=None): | |
78 """ Provide a proxy object to cwltool data structures to just | |
79 grab relevant data. | |
80 """ | |
81 ensure_cwltool_available() | |
82 tool = _to_cwl_tool_object( | |
83 tool_path=tool_path, | |
84 tool_object=tool_object, | |
85 strict_cwl_validation=strict_cwl_validation, | |
86 tool_directory=tool_directory, | |
87 uuid=uuid | |
88 ) | |
89 return tool | |
90 | |
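# A minimal usage sketch for ``tool_proxy`` (illustrative only; the CWL
# document path below is hypothetical):
#
#     proxy = tool_proxy(tool_path="/path/to/cat.cwl")
#     print(proxy.galaxy_id())
#     for input_instance in proxy.input_instances():
#         print(input_instance.to_dict())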
91 | |
92 def tool_proxy_from_persistent_representation(persisted_tool, strict_cwl_validation=True, tool_directory=None): | |
93 """Load a ToolProxy from a previously persisted representation.""" | |
94 ensure_cwltool_available() | |
95 if PERSISTED_REPRESENTATION == "cwl_tool_object": | |
96 kwds = {"cwl_tool_object": ToolProxy.from_persistent_representation(persisted_tool)} | |
97 else: | |
98 raw_process_reference = persisted_tool # ??? | |
99 kwds = {"raw_process_reference": ToolProxy.from_persistent_representation(raw_process_reference)} | |
100 uuid = persisted_tool["uuid"] | |
101 tool = _to_cwl_tool_object(uuid=uuid, strict_cwl_validation=strict_cwl_validation, tool_directory=tool_directory, **kwds) | |
102 return tool | |
103 | |
104 | |
105 def workflow_proxy(workflow_path, strict_cwl_validation=True): | |
106 ensure_cwltool_available() | |
107 workflow = _to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation) | |
108 return workflow | |
109 | |
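# Illustrative sketch for ``workflow_proxy`` (the workflow path is
# hypothetical):
#
#     wf = workflow_proxy("/path/to/align.cwl")
#     galaxy_workflow_dict = wf.to_dict()
#     referenced_tools = wf.tool_reference_proxies()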
110 | |
111 def load_job_proxy(job_directory, strict_cwl_validation=True): | |
112 ensure_cwltool_available() | |
113 job_objects_path = os.path.join(job_directory, JOB_JSON_FILE) | |
114 job_objects = json.load(open(job_objects_path)) | |
115 job_inputs = job_objects["job_inputs"] | |
116 output_dict = job_objects["output_dict"] | |
117 # Any reason to retain older tool_path variant of this? Probably not? | |
118 if "tool_path" in job_objects: | |
119 tool_path = job_objects["tool_path"] | |
120 cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation) | |
121 else: | |
122 persisted_tool = job_objects["tool_representation"] | |
123 cwl_tool = tool_proxy_from_persistent_representation(persisted_tool=persisted_tool, strict_cwl_validation=strict_cwl_validation) | |
124 cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory) | |
125 return cwl_job | |
126 | |
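# The job file read above is written by ``JobProxy.save_job`` below; its
# layout is roughly (values illustrative):
#
#     {
#         "tool_representation": {"class": "...", "pickle": "...", "uuid": "..."},
#         "job_inputs": {...},
#         "output_dict": {...},
#     }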
127 | |
128 def _to_cwl_tool_object(tool_path=None, tool_object=None, cwl_tool_object=None, raw_process_reference=None, strict_cwl_validation=False, tool_directory=None, uuid=None): | |
129 if uuid is None: | |
130 uuid = str(uuid4()) | |
131 schema_loader = _schema_loader(strict_cwl_validation) | |
132 if raw_process_reference is None and tool_path is not None: | |
133 assert cwl_tool_object is None | |
134 assert tool_object is None | |
135 | |
136 raw_process_reference = schema_loader.raw_process_reference(tool_path) | |
137 cwl_tool = schema_loader.tool( | |
138 raw_process_reference=raw_process_reference, | |
139 ) | |
140 elif tool_object is not None: | |
141 assert raw_process_reference is None | |
142 assert cwl_tool_object is None | |
143 | |
144 # Allow loading tools from YAML... | |
145 from ruamel import yaml as ryaml | |
146 as_str = json.dumps(tool_object) | |
147 tool_object = ryaml.round_trip_load(as_str) | |
148 path = tool_directory | |
149 if path is None: | |
150 path = os.getcwd() | |
151 uri = ref_resolver.file_uri(path) + "/" | |
152 sourceline.add_lc_filename(tool_object, uri) | |
153 raw_process_reference = schema_loader.raw_process_reference_for_object( | |
154 tool_object, | |
155 uri=uri | |
156 ) | |
157 cwl_tool = schema_loader.tool( | |
158 raw_process_reference=raw_process_reference, | |
159 ) | |
160 else: | |
161 cwl_tool = cwl_tool_object | |
162 | |
163 if isinstance(cwl_tool, int): | |
164 raise Exception("Failed to load tool.") | |
165 | |
166 raw_tool = cwl_tool.tool | |
167 # Apply Galaxy hacks to CWL tool representation to bridge semantic differences | |
168 # between Galaxy and cwltool. | |
169 _hack_cwl_requirements(cwl_tool) | |
170 check_requirements(raw_tool) | |
171 return _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=raw_process_reference, tool_path=tool_path) | |
172 | |
173 | |
174 def _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=None, tool_path=None): | |
175 raw_tool = cwl_tool.tool | |
176 if "class" not in raw_tool: | |
177 raise Exception("File does not declare a class, not a valid Draft 3+ CWL tool.") | |
178 | |
179 process_class = raw_tool["class"] | |
180 if process_class == "CommandLineTool": | |
181 proxy_class = CommandLineToolProxy | |
182 elif process_class == "ExpressionTool": | |
183 proxy_class = ExpressionToolProxy | |
184 else: | |
185 raise Exception("File not a CWL CommandLineTool.") | |
186 top_level_object = tool_path is not None | |
187 if top_level_object and ("cwlVersion" not in raw_tool): | |
188 raise Exception("File does not declare a CWL version, pre-draft 3 CWL tools are not supported.") | |
189 | |
190 proxy = proxy_class(cwl_tool, uuid, raw_process_reference, tool_path) | |
191 return proxy | |
192 | |
193 | |
194 def _to_cwl_workflow_object(workflow_path, strict_cwl_validation=None): | |
195 proxy_class = WorkflowProxy | |
196 cwl_workflow = _schema_loader(strict_cwl_validation).tool(path=workflow_path) | |
197 raw_workflow = cwl_workflow.tool | |
198 check_requirements(raw_workflow, tool=False) | |
199 | |
200 proxy = proxy_class(cwl_workflow, workflow_path) | |
201 return proxy | |
202 | |
203 | |
204 def _schema_loader(strict_cwl_validation): | |
205 target_schema_loader = schema_loader if strict_cwl_validation else non_strict_non_validating_schema_loader | |
206 return target_schema_loader | |
207 | |
208 | |
209 def _hack_cwl_requirements(cwl_tool): | |
210 move_to_hints = [] | |
211 for i, requirement in enumerate(cwl_tool.requirements): | |
212 if requirement["class"] == DOCKER_REQUIREMENT: | |
213 move_to_hints.insert(0, i) | |
214 | |
215 for i in move_to_hints: | |
216 cwl_tool.hints.append(cwl_tool.requirements[i])  # append the requirement at this index | |
217 del cwl_tool.requirements[i] | |
218 | |
219 | |
220 def check_requirements(rec, tool=True): | |
221 if isinstance(rec, dict): | |
222 if "requirements" in rec: | |
223 for r in rec["requirements"]: | |
224 if tool: | |
225 possible = SUPPORTED_TOOL_REQUIREMENTS | |
226 else: | |
227 possible = SUPPORTED_WORKFLOW_REQUIREMENTS | |
228 if r["class"] not in possible: | |
229 raise Exception("Unsupported requirement %s" % r["class"]) | |
230 for d in rec: | |
231 check_requirements(rec[d], tool=tool) | |
232 if isinstance(rec, list): | |
233 for d in rec: | |
234 check_requirements(d, tool=tool) | |
235 | |
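# For example (illustrative), a tool whose ``requirements`` include
# ``{"class": "WorkReuse"}`` is rejected here, because "WorkReuse" is not in
# SUPPORTED_TOOL_REQUIREMENTS.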
236 | |
237 class ToolProxy(metaclass=ABCMeta): | |
238 | |
239 def __init__(self, tool, uuid, raw_process_reference=None, tool_path=None): | |
240 self._tool = tool | |
241 self._uuid = uuid | |
242 self._tool_path = tool_path | |
243 self._raw_process_reference = raw_process_reference | |
244 # remove input parameter formats from CWL files so that cwltool | |
245 # does not complain they are missing in the input data | |
246 for input_field in self._tool.inputs_record_schema["fields"]: | |
247 if 'format' in input_field: | |
248 del input_field['format'] | |
249 | |
250 def job_proxy(self, input_dict, output_dict, job_directory="."): | |
251 """ Build a cwltool.job.Job describing computation using a input_json | |
252 Galaxy will generate mapping the Galaxy description of the inputs into | |
253 a cwltool compatible variant. | |
254 """ | |
255 return JobProxy(self, input_dict, output_dict, job_directory=job_directory) | |
256 | |
257 @property | |
258 def id(self): | |
259 raw_id = self._tool.tool.get("id", None) | |
260 return raw_id | |
261 | |
262 def galaxy_id(self): | |
263 raw_id = self.id | |
264 tool_id = None | |
265 # don't reduce "search.cwl#index" to search | |
266 if raw_id: | |
267 tool_id = os.path.basename(raw_id) | |
268 # tool_id = os.path.splitext(os.path.basename(raw_id))[0] | |
269 if not tool_id: | |
270 return self._uuid | |
271 assert tool_id | |
272 if tool_id.startswith("#"): | |
273 tool_id = tool_id[1:] | |
274 return tool_id | |
275 | |
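# Examples (illustrative ids): "file:///repo/cat.cwl" -> "cat.cwl";
# "file:///repo/search.cwl#index" -> "search.cwl#index" (the fragment is
# kept, per the comment above).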
276 @abstractmethod | |
277 def input_instances(self): | |
278 """ Return InputInstance objects describing mapping to Galaxy inputs. """ | |
279 | |
280 @abstractmethod | |
281 def output_instances(self): | |
282 """ Return OutputInstance objects describing mapping to Galaxy inputs. """ | |
283 | |
284 @abstractmethod | |
285 def docker_identifier(self): | |
286 """ Return docker identifier for embedding in tool description. """ | |
287 | |
288 @abstractmethod | |
289 def description(self): | |
290 """ Return description to tool. """ | |
291 | |
292 @abstractmethod | |
293 def label(self): | |
294 """ Return label for tool. """ | |
295 | |
296 def to_persistent_representation(self): | |
297 """Return a JSON representation of this tool. Not for serialization | |
298 over the wire, but for serialization in a database.""" | |
299 # TODO: Replace this with some more readable serialization, | |
300 # I really don't like using pickle here. | |
301 if PERSISTED_REPRESENTATION == "cwl_tool_object": | |
302 persisted_obj = remove_pickle_problems(self._tool) | |
303 else: | |
304 persisted_obj = self._raw_process_reference | |
305 return { | |
306 "class": self._class, | |
307 "pickle": unicodify(base64.b64encode(pickle.dumps(persisted_obj, pickle.HIGHEST_PROTOCOL))), | |
308 "uuid": self._uuid, | |
309 } | |
310 | |
311 @staticmethod | |
312 def from_persistent_representation(as_object): | |
313 """Recover an object serialized with to_persistent_representation.""" | |
314 if "class" not in as_object: | |
315 raise Exception("Failed to deserialize tool proxy from JSON object - no class found.") | |
316 if "pickle" not in as_object: | |
317 raise Exception("Failed to deserialize tool proxy from JSON object - no pickle representation found.") | |
318 if "uuid" not in as_object: | |
319 raise Exception("Failed to deserialize tool proxy from JSON object - no uuid found.") | |
320 to_unpickle = base64.b64decode(as_object["pickle"]) | |
321 loaded_object = pickle.loads(to_unpickle) | |
322 return loaded_object | |
323 | |
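# Persistence round-trip sketch (illustrative; assumes ``proxy`` was built
# with ``tool_proxy`` above):
#
#     persisted = proxy.to_persistent_representation()
#     restored = tool_proxy_from_persistent_representation(persisted)
#     assert restored.galaxy_id() == proxy.galaxy_id()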
324 | |
325 class CommandLineToolProxy(ToolProxy): | |
326 _class = "CommandLineTool" | |
327 | |
328 def description(self): | |
329 # Don't use description - typically too verbose. | |
330 return '' | |
331 | |
332 def doc(self): | |
333 # TODO: parse multiple lines and merge - valid in cwl-1.1 | |
334 doc = self._tool.tool.get('doc') | |
335 return doc | |
336 | |
337 def label(self): | |
338 label = self._tool.tool.get('label') | |
339 | |
340 if label is not None: | |
341 return label.partition(":")[0] # return substring before ':' | |
342 else: | |
343 return '' | |
344 | |
345 def input_fields(self): | |
346 input_records_schema = self._eval_schema(self._tool.inputs_record_schema) | |
347 if input_records_schema["type"] != "record": | |
348 raise Exception("Unhandled CWL tool input structure") | |
349 | |
350 # TODO: handle this somewhere else? | |
351 # schemadef_req_tool_param | |
352 rval = [] | |
353 for input in input_records_schema["fields"]: | |
354 input_copy = copy.deepcopy(input) | |
355 input_type = input.get("type") | |
356 if isinstance(input_type, (list, dict)): | |
357 rval.append(input_copy) | |
358 continue | |
359 | |
360 if input_type in self._tool.schemaDefs: | |
361 input_copy["type"] = self._tool.schemaDefs[input_type] | |
362 | |
363 rval.append(input_copy) | |
364 return rval | |
365 | |
366 def _eval_schema(self, io_schema): | |
367 schema_type = io_schema.get("type") | |
368 if schema_type in self._tool.schemaDefs: | |
369 io_schema = self._tool.schemaDefs[schema_type] | |
370 return io_schema | |
371 | |
372 def input_instances(self): | |
373 return [_outer_field_to_input_instance(_) for _ in self.input_fields()] | |
374 | |
375 def output_instances(self): | |
376 outputs_schema = self._eval_schema(self._tool.outputs_record_schema) | |
377 if outputs_schema["type"] != "record": | |
378 raise Exception("Unhandled CWL tool output structure") | |
379 | |
380 rval = [] | |
381 for output in outputs_schema["fields"]: | |
382 rval.append(_simple_field_to_output(output)) | |
383 | |
384 return rval | |
385 | |
386 def docker_identifier(self): | |
387 for hint in self.hints_or_requirements_of_class("DockerRequirement"): | |
388 if "dockerImageId" in hint: | |
389 return hint["dockerImageId"] | |
390 else: | |
391 return hint["dockerPull"] | |
392 | |
393 return None | |
394 | |
395 def hints_or_requirements_of_class(self, class_name): | |
396 tool = self._tool.tool | |
397 reqs_and_hints = tool.get("requirements", []) + tool.get("hints", []) | |
398 for hint in reqs_and_hints: | |
399 if hint["class"] == class_name: | |
400 yield hint | |
401 | |
402 def software_requirements(self): | |
403 # Roughest imaginable pass at parsing requirements, really need to take in specs, handle | |
404 # multiple versions, etc... | |
405 tool = self._tool.tool | |
406 reqs_and_hints = tool.get("requirements", []) + tool.get("hints", []) | |
407 requirements = [] | |
408 for hint in reqs_and_hints: | |
409 if hint["class"] == "SoftwareRequirement": | |
410 packages = hint.get("packages", []) | |
411 for package in packages: | |
412 versions = package.get("version", []) | |
413 first_version = None if not versions else versions[0] | |
414 requirements.append((package["package"], first_version)) | |
415 return requirements | |
416 | |
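# Sketch of what the parse above produces (illustrative values):
#
#     hints:
#       - class: SoftwareRequirement
#         packages:
#           - package: samtools
#             version: ["1.9"]
#
# yields ``[("samtools", "1.9")]``.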
417 @property | |
418 def requirements(self): | |
419 return getattr(self._tool, "requirements", []) | |
420 | |
421 | |
422 class ExpressionToolProxy(CommandLineToolProxy): | |
423 _class = "ExpressionTool" | |
424 | |
425 | |
426 class JobProxy: | |
427 | |
428 def __init__(self, tool_proxy, input_dict, output_dict, job_directory): | |
429 self._tool_proxy = tool_proxy | |
430 self._input_dict = input_dict | |
431 self._output_dict = output_dict | |
432 self._job_directory = job_directory | |
433 | |
434 self._final_output = None | |
435 self._ok = True | |
436 self._cwl_job = None | |
437 self._is_command_line_job = None | |
438 | |
439 self._normalize_job() | |
440 | |
441 def cwl_job(self): | |
442 self._ensure_cwl_job_initialized() | |
443 return self._cwl_job | |
444 | |
445 @property | |
446 def is_command_line_job(self): | |
447 self._ensure_cwl_job_initialized() | |
448 assert self._is_command_line_job is not None | |
449 return self._is_command_line_job | |
450 | |
451 def _ensure_cwl_job_initialized(self): | |
452 if self._cwl_job is None: | |
453 job_args = dict( | |
454 basedir=self._job_directory, | |
455 select_resources=self._select_resources, | |
456 outdir=os.path.join(self._job_directory, "working"), | |
457 tmpdir=os.path.join(self._job_directory, "cwltmp"), | |
458 stagedir=os.path.join(self._job_directory, "cwlstagedir"), | |
459 use_container=False, | |
460 beta_relaxed_fmt_check=beta_relaxed_fmt_check, | |
461 ) | |
462 | |
463 args = [] | |
464 kwargs = {} | |
465 if RuntimeContext is not None: | |
466 args.append(RuntimeContext(job_args)) | |
467 else: | |
468 kwargs = job_args | |
469 self._cwl_job = next(self._tool_proxy._tool.job( | |
470 self._input_dict, | |
471 self._output_callback, | |
472 *args, **kwargs | |
473 )) | |
474 self._is_command_line_job = hasattr(self._cwl_job, "command_line") | |
475 | |
476 def _normalize_job(self): | |
477 # Somehow reuse whatever causes validate in cwltool... maybe? | |
478 def pathToLoc(p): | |
479 if "location" not in p and "path" in p: | |
480 p["location"] = p["path"] | |
481 del p["path"] | |
482 | |
483 runtime_context = RuntimeContext({}) | |
484 make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) | |
485 fs_access = make_fs_access(runtime_context.basedir) | |
486 process.fill_in_defaults(self._tool_proxy._tool.tool["inputs"], self._input_dict, fs_access) | |
487 process.visit_class(self._input_dict, ("File", "Directory"), pathToLoc) | |
488 # TODO: Why doesn't fillInDefault fill in locations instead of paths? | |
489 process.normalizeFilesDirs(self._input_dict) | |
490 # TODO: validate like cwltool process _init_job. | |
491 # validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job, | |
492 # strict=False, logger=_logger_validation_warnings) | |
493 | |
494 def rewrite_inputs_for_staging(self): | |
495 if hasattr(self._cwl_job, "pathmapper"): | |
496 pass | |
497 # DO SOMETHING LIKE THE FOLLOWING? | |
498 # path_rewrites = {} | |
499 # for f, p in self._cwl_job.pathmapper.items(): | |
500 # if not p.staged: | |
501 # continue | |
502 # if p.type in ("File", "Directory"): | |
503 # path_rewrites[p.resolved] = p.target | |
504 # for key, value in self._input_dict.items(): | |
505 # if key in path_rewrites: | |
506 # self._input_dict[key]["location"] = path_rewrites[value] | |
507 else: | |
508 stagedir = os.path.join(self._job_directory, "cwlstagedir") | |
509 safe_makedirs(stagedir) | |
510 | |
511 def stage_recursive(value): | |
512 is_list = isinstance(value, list) | |
513 is_dict = isinstance(value, dict) | |
514 log.info(f"handling value {value}, is_list {is_list}, is_dict {is_dict}") | |
515 if is_list: | |
516 for val in value: | |
517 stage_recursive(val) | |
518 elif is_dict: | |
519 if "location" in value and "basename" in value: | |
520 location = value["location"] | |
521 basename = value["basename"] | |
522 if not location.endswith(basename): # TODO: sep()[-1] | |
523 staged_loc = os.path.join(stagedir, basename) | |
524 if not os.path.exists(staged_loc): | |
525 os.symlink(location, staged_loc) | |
526 value["location"] = staged_loc | |
527 for dict_value in value.values(): | |
528 stage_recursive(dict_value) | |
529 else: | |
530 log.info("skipping simple value...") | |
531 stage_recursive(self._input_dict) | |
532 | |
533 def _select_resources(self, request, runtime_context=None): | |
534 new_request = request.copy() | |
535 new_request["cores"] = "$GALAXY_SLOTS" | |
536 return new_request | |
537 | |
538 @property | |
539 def command_line(self): | |
540 if self.is_command_line_job: | |
541 return self.cwl_job().command_line | |
542 else: | |
543 return ["true"] | |
544 | |
545 @property | |
546 def stdin(self): | |
547 if self.is_command_line_job: | |
548 return self.cwl_job().stdin | |
549 else: | |
550 return None | |
551 | |
552 @property | |
553 def stdout(self): | |
554 if self.is_command_line_job: | |
555 return self.cwl_job().stdout | |
556 else: | |
557 return None | |
558 | |
559 @property | |
560 def stderr(self): | |
561 if self.is_command_line_job: | |
562 return self.cwl_job().stderr | |
563 else: | |
564 return None | |
565 | |
566 @property | |
567 def environment(self): | |
568 if self.is_command_line_job: | |
569 return self.cwl_job().environment | |
570 else: | |
571 return {} | |
572 | |
573 @property | |
574 def generate_files(self): | |
575 if self.is_command_line_job: | |
576 return self.cwl_job().generatefiles | |
577 else: | |
578 return {} | |
579 | |
580 def _output_callback(self, out, process_status): | |
581 self._process_status = process_status | |
582 if process_status == "success": | |
583 self._final_output = out | |
584 else: | |
585 self._ok = False | |
586 | |
587 log.info(f"Output are {out}, status is {process_status}") | |
588 | |
589 def collect_outputs(self, tool_working_directory, rcode): | |
590 if not self.is_command_line_job: | |
591 cwl_job = self.cwl_job() | |
592 if RuntimeContext is not None: | |
593 cwl_job.run( | |
594 RuntimeContext({}) | |
595 ) | |
596 else: | |
597 cwl_job.run() | |
598 if not self._ok: | |
599 raise Exception("Final process state not ok, [%s]" % self._process_status) | |
600 return self._final_output | |
601 else: | |
602 return self.cwl_job().collect_outputs(tool_working_directory, rcode) | |
603 | |
604 def save_job(self): | |
605 job_file = JobProxy._job_file(self._job_directory) | |
606 job_objects = { | |
607 # "tool_path": os.path.abspath(self._tool_proxy._tool_path), | |
608 "tool_representation": self._tool_proxy.to_persistent_representation(), | |
609 "job_inputs": self._input_dict, | |
610 "output_dict": self._output_dict, | |
611 } | |
612 json.dump(job_objects, open(job_file, "w")) | |
613 | |
614 def _output_extra_files_dir(self, output_name): | |
615 output_id = self.output_id(output_name) | |
616 return os.path.join(self._job_directory, "outputs", "dataset_%s_files" % output_id) | |
617 | |
618 def output_id(self, output_name): | |
619 output_id = self._output_dict[output_name]["id"] | |
620 return output_id | |
621 | |
622 def output_path(self, output_name): | |
623 output_path = self._output_dict[output_name]["path"] | |
624 return output_path | |
625 | |
626 def output_directory_contents_dir(self, output_name, create=False): | |
627 extra_files_dir = self._output_extra_files_dir(output_name) | |
628 return extra_files_dir | |
629 | |
630 def output_secondary_files_dir(self, output_name, create=False): | |
631 extra_files_dir = self._output_extra_files_dir(output_name) | |
632 secondary_files_dir = os.path.join(extra_files_dir, SECONDARY_FILES_EXTRA_PREFIX) | |
633 if create and not os.path.exists(secondary_files_dir): | |
634 safe_makedirs(secondary_files_dir) | |
635 return secondary_files_dir | |
636 | |
637 def stage_files(self): | |
638 cwl_job = self.cwl_job() | |
639 | |
640 def stageFunc(resolved_path, target_path): | |
641 log.info(f"resolving {resolved_path} to {target_path}") | |
642 try: | |
643 os.symlink(resolved_path, target_path) | |
644 except OSError: | |
645 pass | |
646 | |
647 if hasattr(cwl_job, "pathmapper"): | |
648 process.stage_files(cwl_job.pathmapper, stageFunc, ignore_writable=True, symlink=False) | |
649 | |
650 if hasattr(cwl_job, "generatefiles"): | |
651 outdir = os.path.join(self._job_directory, "working") | |
652 # TODO: Why doesn't cwl_job.generatemapper work? | |
653 generate_mapper = pathmapper.PathMapper(cwl_job.generatefiles["listing"], | |
654 outdir, outdir, separateDirs=False) | |
655 # TODO: figure out what inplace_update should be. | |
656 inplace_update = cwl_job.inplace_update | |
657 process.stage_files(generate_mapper, stageFunc, ignore_writable=inplace_update, symlink=False) | |
658 relink_initialworkdir(generate_mapper, outdir, outdir, inplace_update=inplace_update) | |
659 # else: expression tools do not have a path mapper. | |
660 | |
661 @staticmethod | |
662 def _job_file(job_directory): | |
663 return os.path.join(job_directory, JOB_JSON_FILE) | |
664 | |
665 | |
666 class WorkflowProxy: | |
667 | |
668 def __init__(self, workflow, workflow_path=None): | |
669 self._workflow = workflow | |
670 self._workflow_path = workflow_path | |
671 self._step_proxies = None | |
672 | |
673 @property | |
674 def cwl_id(self): | |
675 return self._workflow.tool["id"] | |
676 | |
677 def get_outputs_for_label(self, label): | |
678 outputs = [] | |
679 for output in self._workflow.tool['outputs']: | |
680 step, output_name = split_step_references( | |
681 output["outputSource"], | |
682 multiple=False, | |
683 workflow_id=self.cwl_id, | |
684 ) | |
685 if step == label: | |
686 output_id = output["id"] | |
687 if "#" not in self.cwl_id: | |
688 _, output_label = output_id.rsplit("#", 1) | |
689 else: | |
690 _, output_label = output_id.rsplit("/", 1) | |
691 | |
692 outputs.append({ | |
693 "output_name": output_name, | |
694 "label": output_label, | |
695 }) | |
696 return outputs | |
697 | |
698 def tool_reference_proxies(self): | |
699 """Fetch tool source definitions for all referenced tools.""" | |
700 references = [] | |
701 for step in self.step_proxies(): | |
702 references.extend(step.tool_reference_proxies()) | |
703 return references | |
704 | |
705 def step_proxies(self): | |
706 if self._step_proxies is None: | |
707 proxies = [] | |
708 num_input_steps = len(self._workflow.tool['inputs']) | |
709 for i, step in enumerate(self._workflow.steps): | |
710 proxies.append(build_step_proxy(self, step, i + num_input_steps)) | |
711 self._step_proxies = proxies | |
712 return self._step_proxies | |
713 | |
714 @property | |
715 def runnables(self): | |
716 runnables = [] | |
717 for step in self._workflow.steps: | |
718 if "run" in step.tool: | |
719 runnables.append(step.tool["run"]) | |
720 return runnables | |
721 | |
722 def cwl_ids_to_index(self, step_proxies): | |
723 index = 0 | |
724 cwl_ids_to_index = {} | |
725 for input_dict in self._workflow.tool['inputs']: | |
726 cwl_ids_to_index[input_dict["id"]] = index | |
727 index += 1 | |
728 | |
729 for step_proxy in step_proxies: | |
730 cwl_ids_to_index[step_proxy.cwl_id] = index | |
731 index += 1 | |
732 | |
733 return cwl_ids_to_index | |
734 | |
735 @property | |
736 def output_labels(self): | |
737 return [self.jsonld_id_to_label(o['id']) for o in self._workflow.tool['outputs']] | |
738 | |
739 def input_connections_by_step(self, step_proxies): | |
740 cwl_ids_to_index = self.cwl_ids_to_index(step_proxies) | |
741 input_connections_by_step = [] | |
742 for step_proxy in step_proxies: | |
743 input_connections_step = {} | |
744 for input_proxy in step_proxy.input_proxies: | |
745 cwl_source_id = input_proxy.cwl_source_id | |
746 input_name = input_proxy.input_name | |
747 # Consider only allowing multiple inputs if MultipleInputFeatureRequirement is enabled | |
748 for (output_step_name, output_name) in split_step_references(cwl_source_id, workflow_id=self.cwl_id): | |
749 if "#" in self.cwl_id: | |
750 sep_on = "/" | |
751 else: | |
752 sep_on = "#" | |
753 output_step_id = self.cwl_id + sep_on + output_step_name | |
754 | |
755 if output_step_id not in cwl_ids_to_index: | |
756 template = "Output [%s] does not appear in ID-to-index map [%s]." | |
757 msg = template % (output_step_id, cwl_ids_to_index.keys()) | |
758 raise AssertionError(msg) | |
759 | |
760 if input_name not in input_connections_step: | |
761 input_connections_step[input_name] = [] | |
762 | |
763 input_connections_step[input_name].append({ | |
764 "id": cwl_ids_to_index[output_step_id], | |
765 "output_name": output_name, | |
766 "input_type": "dataset" | |
767 }) | |
768 | |
769 input_connections_by_step.append(input_connections_step) | |
770 | |
771 return input_connections_by_step | |
772 | |
773 def to_dict(self): | |
774 name = os.path.basename(self._workflow.tool.get('label') or self._workflow_path or 'TODO - derive a name from ID') | |
775 steps = {} | |
776 | |
777 step_proxies = self.step_proxies() | |
778 input_connections_by_step = self.input_connections_by_step(step_proxies) | |
779 index = 0 | |
780 for i, input_dict in enumerate(self._workflow.tool['inputs']): | |
781 steps[index] = self.cwl_input_to_galaxy_step(input_dict, i) | |
782 index += 1 | |
783 | |
784 for i, step_proxy in enumerate(step_proxies): | |
785 input_connections = input_connections_by_step[i] | |
786 steps[index] = step_proxy.to_dict(input_connections) | |
787 index += 1 | |
788 | |
789 return { | |
790 'name': name, | |
791 'steps': steps, | |
792 'annotation': self.cwl_object_to_annotation(self._workflow.tool), | |
793 } | |
794 | |
795 def find_inputs_step_index(self, label): | |
796 for i, input in enumerate(self._workflow.tool['inputs']): | |
797 if self.jsonld_id_to_label(input["id"]) == label: | |
798 return i | |
799 | |
800 raise Exception("Failed to find index for label %s" % label) | |
801 | |
802 def jsonld_id_to_label(self, id): | |
803 if "#" in self.cwl_id: | |
804 return id.rsplit("/", 1)[-1] | |
805 else: | |
806 return id.rsplit("#", 1)[-1] | |
807 | |
808 def cwl_input_to_galaxy_step(self, input, i): | |
809 input_type = input["type"] | |
810 label = self.jsonld_id_to_label(input["id"]) | |
811 input_as_dict = { | |
812 "id": i, | |
813 "label": label, | |
814 "position": {"left": 0, "top": 0}, | |
815 "annotation": self.cwl_object_to_annotation(input), | |
816 "input_connections": {}, # Should the Galaxy API really require this? - Seems to. | |
817 "workflow_outputs": self.get_outputs_for_label(label), | |
818 } | |
819 | |
820 if input_type == "File" and "default" not in input: | |
821 input_as_dict["type"] = "data_input" | |
822 elif isinstance(input_type, dict) and input_type.get("type") == "array": | |
823 input_as_dict["type"] = "data_collection_input" | |
824 input_as_dict["collection_type"] = "list" | |
825 elif isinstance(input_type, dict) and input_type.get("type") == "record": | |
826 input_as_dict["type"] = "data_collection_input" | |
827 input_as_dict["collection_type"] = "record" | |
828 else: | |
829 if USE_STEP_PARAMETERS: | |
830 input_as_dict["type"] = "parameter_input" | |
831 # TODO: dispatch on actual type so this doesn't always need | |
832 # to be field - simpler types could be simpler inputs. | |
833 tool_state = {} | |
834 tool_state["parameter_type"] = "field" | |
835 default_set = "default" in input | |
836 default_value = input.get("default") | |
837 optional = default_set | |
838 if isinstance(input_type, list) and "null" in input_type: | |
839 optional = True | |
840 if not optional and isinstance(input_type, dict) and "type" in input_type: | |
841 raise ValueError("'type' detected in non-optional input dictionary.") | |
842 if default_set: | |
843 tool_state["default"] = {"src": "json", "value": default_value} | |
844 tool_state["optional"] = optional | |
845 input_as_dict["tool_state"] = tool_state | |
846 else: | |
847 input_as_dict["type"] = "data_input" | |
848 # TODO: format = expression.json | |
849 | |
850 return input_as_dict | |
851 | |
852 def cwl_object_to_annotation(self, cwl_obj): | |
853 return cwl_obj.get("doc", None) | |
854 | |
855 | |
856 def split_step_references(step_references, workflow_id=None, multiple=True): | |
857 """Split a CWL step input or output reference into step id and name.""" | |
858 # Trim off the workflow id part of the reference. | |
859 step_references = listify(step_references) | |
860 split_references = [] | |
861 | |
862 for step_reference in step_references: | |
863 if workflow_id is None: | |
864 # This path works fine for some simple workflows - but not so much | |
865 # for subworkflows (maybe same for $graph workflows?) | |
866 assert "#" in step_reference | |
867 _, step_reference = step_reference.split("#", 1) | |
868 else: | |
869 if "#" in workflow_id: | |
870 sep_on = "/" | |
871 else: | |
872 sep_on = "#" | |
873 expected_prefix = workflow_id + sep_on | |
874 if not step_reference.startswith(expected_prefix): | |
875 raise AssertionError(f"step_reference [{step_reference}] doesn't start with {expected_prefix}") | |
876 step_reference = step_reference[len(expected_prefix):] | |
877 | |
878 # Now just grab the step name and input/output name. | |
879 assert "#" not in step_reference | |
880 if "/" in step_reference: | |
881 step_name, io_name = step_reference.split("/", 1) | |
882 else: | |
883 # Referencing an input, not a step. | |
884 # In Galaxy workflows input steps have an implicit output named | |
885 # "output" for consistency with tools - in cwl land | |
886 # just the input name is referenced. | |
887 step_name = step_reference | |
888 io_name = "output" | |
889 split_references.append((step_name, io_name)) | |
890 | |
891 if multiple: | |
892 return split_references | |
893 else: | |
894 assert len(split_references) == 1 | |
895 return split_references[0] | |
896 | |
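# Worked examples (illustrative identifiers):
#
#     split_step_references("main#step_one/out1", workflow_id="main", multiple=False)
#     # -> ("step_one", "out1")
#     split_step_references("main#input_fastq", workflow_id="main", multiple=False)
#     # -> ("input_fastq", "output")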
897 | |
898 def build_step_proxy(workflow_proxy, step, index): | |
899 step_type = step.embedded_tool.tool["class"] | |
900 if step_type == "Workflow": | |
901 return SubworkflowStepProxy(workflow_proxy, step, index) | |
902 else: | |
903 return ToolStepProxy(workflow_proxy, step, index) | |
904 | |
905 | |
906 class BaseStepProxy: | |
907 | |
908 def __init__(self, workflow_proxy, step, index): | |
909 self._workflow_proxy = workflow_proxy | |
910 self._step = step | |
911 self._index = index | |
912 self._uuid = str(uuid4()) | |
913 self._input_proxies = None | |
914 | |
915 @property | |
916 def step_class(self): | |
917 return self.cwl_tool_object.tool["class"] | |
918 | |
919 @property | |
920 def cwl_id(self): | |
921 return self._step.id | |
922 | |
923 @property | |
924 def cwl_workflow_id(self): | |
925 return self._workflow_proxy.cwl_id | |
926 | |
927 @property | |
928 def requirements(self): | |
929 return self._step.requirements | |
930 | |
931 @property | |
932 def hints(self): | |
933 return self._step.hints | |
934 | |
935 @property | |
936 def label(self): | |
937 label = self._workflow_proxy.jsonld_id_to_label(self._step.id) | |
938 return label | |
939 | |
940 def galaxy_workflow_outputs_list(self): | |
941 return self._workflow_proxy.get_outputs_for_label(self.label) | |
942 | |
943 @property | |
944 def cwl_tool_object(self): | |
945 return self._step.embedded_tool | |
946 | |
947 @property | |
948 def input_proxies(self): | |
949 if self._input_proxies is None: | |
950 input_proxies = [] | |
951 cwl_inputs = self._step.tool["inputs"] | |
952 for cwl_input in cwl_inputs: | |
953 input_proxies.append(InputProxy(self, cwl_input)) | |
954 self._input_proxies = input_proxies | |
955 return self._input_proxies | |
956 | |
957 def inputs_to_dicts(self): | |
958 inputs_as_dicts = [] | |
959 for input_proxy in self.input_proxies: | |
960 inputs_as_dicts.append(input_proxy.to_dict()) | |
961 return inputs_as_dicts | |
962 | |
963 | |
964 class InputProxy: | |
965 | |
966 def __init__(self, step_proxy, cwl_input): | |
967 self._cwl_input = cwl_input | |
968 self.step_proxy = step_proxy | |
969 self.workflow_proxy = step_proxy._workflow_proxy | |
970 | |
971 cwl_input_id = cwl_input["id"] | |
972 cwl_source_id = cwl_input.get("source", None) | |
973 if cwl_source_id is None: | |
974 if "valueFrom" not in cwl_input and "default" not in cwl_input: | |
975 msg = "Workflow step input must define a source, a valueFrom, or a default value. Obtained [%s]." % cwl_input | |
976 raise MessageException(msg) | |
977 | |
978 assert cwl_input_id | |
979 step_name, input_name = split_step_references( | |
980 cwl_input_id, | |
981 multiple=False, | |
982 workflow_id=step_proxy.cwl_workflow_id | |
983 ) | |
984 self.step_name = step_name | |
985 self.input_name = input_name | |
986 | |
987 self.cwl_input_id = cwl_input_id | |
988 self.cwl_source_id = cwl_source_id | |
989 | |
990 scatter_inputs = [split_step_references( | |
991 i, multiple=False, workflow_id=step_proxy.cwl_workflow_id | |
992 )[1] for i in listify(step_proxy._step.tool.get("scatter", []))] | |
993 scatter = self.input_name in scatter_inputs | |
994 self.scatter = scatter | |
995 | |
996 def to_dict(self): | |
997 as_dict = { | |
998 "name": self.input_name, | |
999 } | |
1000 if "linkMerge" in self._cwl_input: | |
1001 as_dict["merge_type"] = self._cwl_input["linkMerge"] | |
1002 if "scatterMethod" in self.step_proxy._step.tool: | |
1003 as_dict["scatter_type"] = self.step_proxy._step.tool.get("scatterMethod", "dotproduct") | |
1004 else: | |
1005 as_dict["scatter_type"] = "dotproduct" if self.scatter else "disabled" | |
1006 if "valueFrom" in self._cwl_input: | |
1007 # TODO: Add a table for expressions - mark the type as CWL 1.0 JavaScript. | |
1008 as_dict["value_from"] = self._cwl_input["valueFrom"] | |
1009 if "default" in self._cwl_input: | |
1010 as_dict["default"] = self._cwl_input["default"] | |
1011 return as_dict | |
1012 | |
1013 | |
1014 class ToolStepProxy(BaseStepProxy): | |
1015 | |
1016 def __init__(self, workflow_proxy, step, index): | |
1017 super().__init__(workflow_proxy, step, index) | |
1018 self._tool_proxy = None | |
1019 | |
1020 @property | |
1021 def tool_proxy(self): | |
1022 # Needs to be cached so the UUID that is loaded matches the UUID generated with to_dict. | |
1023 if self._tool_proxy is None: | |
1024 self._tool_proxy = _cwl_tool_object_to_proxy(self.cwl_tool_object, uuid=str(uuid4())) | |
1025 return self._tool_proxy | |
1026 | |
1027 def tool_reference_proxies(self): | |
1028 # Return a list so we can handle subworkflows recursively. | |
1029 return [self.tool_proxy] | |
1030 | |
1031 def to_dict(self, input_connections): | |
1032 # We need to stub out null entries for things getting replaced by | |
1033 # connections. This doesn't seem ideal - consider just making Galaxy | |
1034 # handle this. | |
1035 tool_state = {} | |
1036 for input_name in input_connections.keys(): | |
1037 tool_state[input_name] = None | |
1038 | |
1039 outputs = self.galaxy_workflow_outputs_list() | |
1040 return { | |
1041 "id": self._index, | |
1042 "tool_uuid": self.tool_proxy._uuid, # TODO: make sure this is respected... | |
1043 "label": self.label, | |
1044 "position": {"left": 0, "top": 0}, | |
1045 "type": "tool", | |
1046 "annotation": self._workflow_proxy.cwl_object_to_annotation(self._step.tool), | |
1047 "input_connections": input_connections, | |
1048 "inputs": self.inputs_to_dicts(), | |
1049 "workflow_outputs": outputs, | |
1050 } | |
1051 | |
1052 | |
1053 class SubworkflowStepProxy(BaseStepProxy): | |
1054 | |
1055 def __init__(self, workflow_proxy, step, index): | |
1056 super().__init__(workflow_proxy, step, index) | |
1057 self._subworkflow_proxy = None | |
1058 | |
1059 def to_dict(self, input_connections): | |
1060 outputs = self.galaxy_workflow_outputs_list() | |
1061 for key, input_connection_list in input_connections.items(): | |
1062 input_subworkflow_step_id = self.subworkflow_proxy.find_inputs_step_index( | |
1063 key | |
1064 ) | |
1065 for input_connection in input_connection_list: | |
1066 input_connection["input_subworkflow_step_id"] = input_subworkflow_step_id | |
1067 | |
1068 return { | |
1069 "id": self._index, | |
1070 "label": self.label, | |
1071 "position": {"left": 0, "top": 0}, | |
1072 "type": "subworkflow", | |
1073 "subworkflow": self.subworkflow_proxy.to_dict(), | |
1074 "annotation": self.subworkflow_proxy.cwl_object_to_annotation(self._step.tool), | |
1075 "input_connections": input_connections, | |
1076 "inputs": self.inputs_to_dicts(), | |
1077 "workflow_outputs": outputs, | |
1078 } | |
1079 | |
1080 def tool_reference_proxies(self): | |
1081 return self.subworkflow_proxy.tool_reference_proxies() | |
1082 | |
1083 @property | |
1084 def subworkflow_proxy(self): | |
1085 if self._subworkflow_proxy is None: | |
1086 self._subworkflow_proxy = WorkflowProxy(self.cwl_tool_object) | |
1087 return self._subworkflow_proxy | |
1088 | |
1089 | |
1090 def remove_pickle_problems(obj): | |
1091 """doc_loader does not pickle correctly""" | |
1092 if hasattr(obj, "doc_loader"): | |
1093 obj.doc_loader = None | |
1094 if hasattr(obj, "embedded_tool"): | |
1095 obj.embedded_tool = remove_pickle_problems(obj.embedded_tool) | |
1096 if hasattr(obj, "steps"): | |
1097 obj.steps = [remove_pickle_problems(s) for s in obj.steps] | |
1098 return obj | |
1099 | |
1100 | |
1101 class WorkflowToolReference(metaclass=ABCMeta): | |
1102 pass | |
1103 | |
1104 | |
1105 class EmbeddedWorkflowToolReference(WorkflowToolReference): | |
1106 pass | |
1107 | |
1108 | |
1109 class ExternalWorkflowToolReference(WorkflowToolReference): | |
1110 pass | |
1111 | |
1112 | |
1113 def _outer_field_to_input_instance(field): | |
1114 field_type = field_to_field_type(field) # Must be a list if in here? | |
1115 if not isinstance(field_type, list): | |
1116 field_type = [field_type] | |
1117 | |
1118 name, label, description = _field_metadata(field) | |
1119 | |
1120 case_name = "_cwl__type_" | |
1121 case_label = "Specify Parameter %s As" % label | |
1122 | |
1123 def value_input(type_description): | |
1124 value_name = "_cwl__value_" | |
1125 value_label = label | |
1126 value_description = description | |
1127 return InputInstance( | |
1128 value_name, | |
1129 value_label, | |
1130 value_description, | |
1131 input_type=type_description.galaxy_param_type, | |
1132 collection_type=type_description.collection_type, | |
1133 ) | |
1134 | |
1135 select_options = [] | |
1136 case_options = [] | |
1137 type_descriptions = type_descriptions_for_field_types(field_type) | |
1138 for type_description in type_descriptions: | |
1139 select_options.append({"value": type_description.name, "label": type_description.label}) | |
1140 input_instances = [] | |
1141 if type_description.uses_param: | |
1142 input_instances.append(value_input(type_description)) | |
1143 case_options.append((type_description.name, input_instances)) | |
1144 | |
1145 # If there is more than one way to represent this parameter - produce a conditional | |
1146 # asking the user what form they want to submit the data in, else just map | |
1147 # a simple Galaxy parameter. | |
1148 if len(case_options) > 1 and not USE_FIELD_TYPES: | |
1149 case_input = SelectInputInstance( | |
1150 name=case_name, | |
1151 label=case_label, | |
1152 description=False, | |
1153 options=select_options, | |
1154 ) | |
1155 | |
1156 return ConditionalInstance(name, case_input, case_options) | |
1157 else: | |
1158 if len(case_options) > 1: | |
1159 only_type_description = FIELD_TYPE_REPRESENTATION | |
1160 else: | |
1161 only_type_description = type_descriptions[0] | |
1162 | |
1163 return InputInstance( | |
1164 name, label, description, input_type=only_type_description.galaxy_param_type, collection_type=only_type_description.collection_type | |
1165 ) | |
1166 | |
1167 # Older array-to-repeat handling; now we just represent arrays as | |
1168 # dataset collections - we should offer a blended approach in the future. | |
1169 # if field_type in simple_map_type_map.keys(): | |
1170 # input_type = simple_map_type_map[field_type] | |
1171 # return {"input_type": input_type, "array": False} | |
1172 # elif field_type == "array": | |
1173 # if isinstance(field["type"], dict): | |
1174 # array_type = field["type"]["items"] | |
1175 # else: | |
1176 # array_type = field["items"] | |
1177 # if array_type in simple_map_type_map.keys(): | |
1178 # input_type = simple_map_type_map[array_type] | |
1179 # return {"input_type": input_type, "array": True} | |
1180 # else: | |
1181 # raise Exception("Unhandled simple field type encountered - [%s]." % field_type) | |
1182 | |
1183 | |
1184 def _field_metadata(field): | |
1185 name = field["name"] | |
1186 label = field.get("label", None) | |
1187 description = field.get("doc", None) | |
1188 return name, label, description | |
1189 | |
1190 | |
1191 def _simple_field_to_output(field): | |
1192 name = field["name"] | |
1193 output_data_class = field["type"] | |
1194 output_instance = OutputInstance( | |
1195 name, | |
1196 output_data_type=output_data_class, | |
1197 output_type=OUTPUT_TYPE.GLOB | |
1198 ) | |
1199 return output_instance | |
1200 | |
1201 | |
1202 class ConditionalInstance: | |
1203 | |
1204 def __init__(self, name, case, whens): | |
1205 self.input_type = INPUT_TYPE.CONDITIONAL | |
1206 self.name = name | |
1207 self.case = case | |
1208 self.whens = whens | |
1209 | |
1210 def to_dict(self): | |
1211 | |
1212 as_dict = dict( | |
1213 name=self.name, | |
1214 type=INPUT_TYPE.CONDITIONAL, | |
1215 test=self.case.to_dict(), | |
1216 when={}, | |
1217 ) | |
1218 for value, block in self.whens: | |
1219 as_dict["when"][value] = [i.to_dict() for i in block] | |
1220 | |
1221 return as_dict | |
1222 | |
1223 | |
1224 class SelectInputInstance: | |
1225 | |
1226 def __init__(self, name, label, description, options): | |
1227 self.input_type = INPUT_TYPE.SELECT | |
1228 self.name = name | |
1229 self.label = label | |
1230 self.description = description | |
1231 self.options = options | |
1232 | |
1233 def to_dict(self): | |
1234 # TODO: serialize options... | |
1235 as_dict = dict( | |
1236 name=self.name, | |
1237 label=self.label or self.name, | |
1238 help=self.description, | |
1239 type=self.input_type, | |
1240 options=self.options, | |
1241 ) | |
1242 return as_dict | |
1243 | |
1244 | |
1245 class InputInstance: | |
1246 | |
1247 def __init__(self, name, label, description, input_type, array=False, area=False, collection_type=None): | |
1248 self.input_type = input_type | |
1249 self.collection_type = collection_type | |
1250 self.name = name | |
1251 self.label = label | |
1252 self.description = description | |
1253 self.required = True | |
1254 self.array = array | |
1255 self.area = area | |
1256 | |
1257 def to_dict(self, itemwise=True): | |
1258 if itemwise and self.array: | |
1259 as_dict = dict( | |
1260 type="repeat", | |
1261 name="%s_repeat" % self.name, | |
1262 title="%s" % self.name, | |
1263 blocks=[ | |
1264 self.to_dict(itemwise=False) | |
1265 ] | |
1266 ) | |
1267 else: | |
1268 as_dict = dict( | |
1269 name=self.name, | |
1270 label=self.label or self.name, | |
1271 help=self.description, | |
1272 type=self.input_type, | |
1273 optional=not self.required, | |
1274 ) | |
1275 if self.area: | |
1276 as_dict["area"] = True | |
1277 | |
1278 if self.input_type == INPUT_TYPE.INTEGER: | |
1279 as_dict["value"] = "0" | |
1280 elif self.input_type == INPUT_TYPE.FLOAT: | |
1281 as_dict["value"] = "0.0" | |
1282 elif self.input_type == INPUT_TYPE.DATA_COLLECTON: | |
1283 as_dict["collection_type"] = self.collection_type | |
1284 | |
1285 return as_dict | |
1286 | |
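# Sketch: with ``array=True`` an InputInstance named "reads" serializes
# (itemwise) to a Galaxy repeat block (illustrative, abbreviated):
#
#     {"type": "repeat", "name": "reads_repeat", "title": "reads",
#      "blocks": [<single-item form of the same input>]}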
1287 | |
1288 OUTPUT_TYPE = Bunch( | |
1289 GLOB="glob", | |
1290 STDOUT="stdout", | |
1291 ) | |
1292 | |
1293 | |
1294 # TODO: Different subclasses - this is representing different types of things. | |
1295 class OutputInstance: | |
1296 | |
1297 def __init__(self, name, output_data_type, output_type, path=None, fields=None): | |
1298 self.name = name | |
1299 self.output_data_type = output_data_type | |
1300 self.output_type = output_type | |
1301 self.path = path | |
1302 self.fields = fields | |
1303 | |
1304 | |
1305 __all__ = ( | |
1306 'tool_proxy', | |
1307 'load_job_proxy', | |
1308 ) |