comparison env/lib/python3.9/site-packages/galaxy/tool_util/cwl/parser.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """ This module provides proxy objects around objects from the common
2 workflow language reference implementation library cwltool. These proxies
3 adapt cwltool to Galaxy features and abstract the library away from the rest
4 of the framework.
5 """
6
7 import base64
8 import copy
9 import json
10 import logging
11 import os
12 import pickle
13 from abc import ABCMeta, abstractmethod
14 from uuid import uuid4
15
16
17 from galaxy.exceptions import MessageException
18 from galaxy.util import (
19 listify,
20 safe_makedirs,
21 unicodify,
22 )
23 from galaxy.util.bunch import Bunch
24 from .cwltool_deps import (
25 beta_relaxed_fmt_check,
26 ensure_cwltool_available,
27 getdefault,
28 pathmapper,
29 process,
30 ref_resolver,
31 relink_initialworkdir,
32 RuntimeContext,
33 sourceline,
34 StdFsAccess,
35 )
36 from .representation import (
37 field_to_field_type,
38 FIELD_TYPE_REPRESENTATION,
39 INPUT_TYPE,
40 type_descriptions_for_field_types,
41 USE_FIELD_TYPES,
42 USE_STEP_PARAMETERS,
43 )
44 from .schema import (
45 non_strict_non_validating_schema_loader,
46 schema_loader,
47 )
48 from .util import SECONDARY_FILES_EXTRA_PREFIX
49
log = logging.getLogger(__name__)

# Name of the JSON file written into a job directory by JobProxy.save_job
# and read back by load_job_proxy.
JOB_JSON_FILE = ".cwl_job.json"

DOCKER_REQUIREMENT = "DockerRequirement"
# CWL requirement classes this adapter knows how to honor for tools;
# check_requirements() raises for anything not listed here.
SUPPORTED_TOOL_REQUIREMENTS = [
    "CreateFileRequirement",
    "DockerRequirement",
    "EnvVarRequirement",
    "InitialWorkDirRequirement",
    "InlineJavascriptRequirement",
    "ResourceRequirement",
    "ShellCommandRequirement",
    "ScatterFeatureRequirement",
    "SchemaDefRequirement",
    "SubworkflowFeatureRequirement",
    "StepInputExpressionRequirement",
    "MultipleInputFeatureRequirement",
]


# Workflows currently support exactly the same requirement classes as tools.
SUPPORTED_WORKFLOW_REQUIREMENTS = SUPPORTED_TOOL_REQUIREMENTS + [
]

# Persistence strategy used by ToolProxy.to_persistent_representation();
# "cwl_tool_object" pickles the loaded cwltool object itself.
PERSISTED_REPRESENTATION = "cwl_tool_object"
75
76
def tool_proxy(tool_path=None, tool_object=None, strict_cwl_validation=True, tool_directory=None, uuid=None):
    """Provide a proxy object to cwltool data structures to just
    grab relevant data.
    """
    ensure_cwltool_available()
    return _to_cwl_tool_object(
        tool_path=tool_path,
        tool_object=tool_object,
        strict_cwl_validation=strict_cwl_validation,
        tool_directory=tool_directory,
        uuid=uuid,
    )
90
91
def tool_proxy_from_persistent_representation(persisted_tool, strict_cwl_validation=True, tool_directory=None):
    """Load a ToolProxy from a previously persisted representation."""
    ensure_cwltool_available()
    # Both persistence strategies deserialize the stored payload the same way;
    # only the keyword the loader receives it under differs.
    rehydrated = ToolProxy.from_persistent_representation(persisted_tool)
    if PERSISTED_REPRESENTATION == "cwl_tool_object":
        kwds = {"cwl_tool_object": rehydrated}
    else:
        kwds = {"raw_process_reference": rehydrated}
    return _to_cwl_tool_object(
        uuid=persisted_tool["uuid"],
        strict_cwl_validation=strict_cwl_validation,
        tool_directory=tool_directory,
        **kwds,
    )
103
104
def workflow_proxy(workflow_path, strict_cwl_validation=True):
    """Provide a WorkflowProxy for the CWL workflow at ``workflow_path``."""
    ensure_cwltool_available()
    return _to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation)
109
110
def load_job_proxy(job_directory, strict_cwl_validation=True):
    """Recover a JobProxy from the JSON file JobProxy.save_job wrote into
    ``job_directory``.

    Supports both the legacy on-disk format referencing a ``tool_path`` and
    the current format embedding a persisted tool representation.
    """
    ensure_cwltool_available()
    job_objects_path = os.path.join(job_directory, JOB_JSON_FILE)
    # Close the file deterministically instead of passing an anonymous
    # open() to json.load and relying on GC to release the handle.
    with open(job_objects_path) as f:
        job_objects = json.load(f)
    job_inputs = job_objects["job_inputs"]
    output_dict = job_objects["output_dict"]
    # Any reason to retain older tool_path variant of this? Probably not?
    if "tool_path" in job_objects:
        tool_path = job_objects["tool_path"]
        cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation)
    else:
        persisted_tool = job_objects["tool_representation"]
        cwl_tool = tool_proxy_from_persistent_representation(persisted_tool=persisted_tool, strict_cwl_validation=strict_cwl_validation)
    cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory)
    return cwl_job
126
127
def _to_cwl_tool_object(tool_path=None, tool_object=None, cwl_tool_object=None, raw_process_reference=None, strict_cwl_validation=False, tool_directory=None, uuid=None):
    """Load a cwltool tool object from one of several sources and wrap it.

    Exactly one of ``tool_path``, ``tool_object`` (an in-memory dict), or
    ``cwl_tool_object`` (an already-loaded cwltool object) is expected;
    the asserts below enforce mutual exclusion.
    """
    if uuid is None:
        uuid = str(uuid4())
    # NOTE: this local deliberately shadows the module-level ``schema_loader``
    # import — from here on ``schema_loader`` is the selected loader instance.
    schema_loader = _schema_loader(strict_cwl_validation)
    if raw_process_reference is None and tool_path is not None:
        assert cwl_tool_object is None
        assert tool_object is None

        raw_process_reference = schema_loader.raw_process_reference(tool_path)
        cwl_tool = schema_loader.tool(
            raw_process_reference=raw_process_reference,
        )
    elif tool_object is not None:
        assert raw_process_reference is None
        assert cwl_tool_object is None

        # Allow loading tools from YAML...
        from ruamel import yaml as ryaml
        # Round-trip through JSON then ruamel so the object gets the
        # line/column metadata cwltool's error reporting expects.
        as_str = json.dumps(tool_object)
        tool_object = ryaml.round_trip_load(as_str)
        path = tool_directory
        if path is None:
            path = os.getcwd()
        uri = ref_resolver.file_uri(path) + "/"
        sourceline.add_lc_filename(tool_object, uri)
        raw_process_reference = schema_loader.raw_process_reference_for_object(
            tool_object,
            uri=uri
        )
        cwl_tool = schema_loader.tool(
            raw_process_reference=raw_process_reference,
        )
    else:
        cwl_tool = cwl_tool_object

    # presumably the loader can signal failure by returning an int error
    # code rather than raising — TODO confirm against cwltool behavior
    if isinstance(cwl_tool, int):
        raise Exception("Failed to load tool.")

    raw_tool = cwl_tool.tool
    # Apply Galaxy hacks to CWL tool representation to bridge semantic differences
    # between Galaxy and cwltool.
    _hack_cwl_requirements(cwl_tool)
    check_requirements(raw_tool)
    return _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=raw_process_reference, tool_path=tool_path)
172
173
def _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=None, tool_path=None):
    """Wrap a loaded cwltool tool object in the matching Galaxy proxy class."""
    raw_tool = cwl_tool.tool
    if "class" not in raw_tool:
        raise Exception("File does not declare a class, not a valid Draft 3+ CWL tool.")

    # Dispatch on the declared CWL process class.
    proxy_classes = {
        "CommandLineTool": CommandLineToolProxy,
        "ExpressionTool": ExpressionToolProxy,
    }
    proxy_class = proxy_classes.get(raw_tool["class"])
    if proxy_class is None:
        raise Exception("File not a CWL CommandLineTool.")

    # Only top-level documents (loaded from a path) must declare a version.
    if tool_path is not None and "cwlVersion" not in raw_tool:
        raise Exception("File does not declare a CWL version, pre-draft 3 CWL tools are not supported.")

    return proxy_class(cwl_tool, uuid, raw_process_reference, tool_path)
192
193
def _to_cwl_workflow_object(workflow_path, strict_cwl_validation=None):
    """Load the CWL workflow at ``workflow_path`` and wrap it in a WorkflowProxy."""
    loader = _schema_loader(strict_cwl_validation)
    cwl_workflow = loader.tool(path=workflow_path)
    check_requirements(cwl_workflow.tool, tool=False)
    return WorkflowProxy(cwl_workflow, workflow_path)
202
203
def _schema_loader(strict_cwl_validation):
    """Select the schema loader matching the requested validation strictness."""
    if strict_cwl_validation:
        return schema_loader
    return non_strict_non_validating_schema_loader
207
208
def _hack_cwl_requirements(cwl_tool):
    """Demote DockerRequirement entries from requirements to hints, in place.

    Presumably this relaxes the container mandate so Galaxy can manage
    container resolution itself — TODO confirm intent with callers.
    """
    move_to_hints = []
    for i, requirement in enumerate(cwl_tool.requirements):
        if requirement["class"] == DOCKER_REQUIREMENT:
            # Collect indices in reverse order so the deletions below do not
            # shift the positions of entries still pending removal.
            move_to_hints.insert(0, i)

    for i in move_to_hints:
        # BUG FIX: the original appended the stale ``requirement`` left over
        # from the scan loop above (i.e. the *last* requirement in the list),
        # not the DockerRequirement actually being removed at index ``i``.
        requirement = cwl_tool.requirements[i]
        del cwl_tool.requirements[i]
        cwl_tool.hints.append(requirement)
218
219
def check_requirements(rec, tool=True):
    """Recursively verify every ``requirements`` entry in ``rec`` is supported.

    :param rec: arbitrarily nested dict/list structure from a CWL document.
    :param tool: True to validate against the tool whitelist, False for the
        workflow whitelist.
    :raises Exception: on the first unsupported requirement class found.
    """
    if isinstance(rec, dict):
        if "requirements" in rec:
            # Hoisted out of the per-requirement loop: the supported set does
            # not vary between entries.
            possible = SUPPORTED_TOOL_REQUIREMENTS if tool else SUPPORTED_WORKFLOW_REQUIREMENTS
            for r in rec["requirements"]:
                if r["class"] not in possible:
                    raise Exception("Unsupported requirement %s" % r["class"])
        for d in rec:
            check_requirements(rec[d], tool=tool)
    elif isinstance(rec, list):
        for d in rec:
            check_requirements(d, tool=tool)
235
236
class ToolProxy(metaclass=ABCMeta):
    """Abstract adapter between a loaded cwltool tool object and Galaxy.

    Concrete subclasses (CommandLineToolProxy, ExpressionToolProxy) supply
    class-specific behavior; this base holds identity, persistence, and
    job-construction plumbing.
    """

    def __init__(self, tool, uuid, raw_process_reference=None, tool_path=None):
        # ``tool`` is the loaded cwltool tool object; ``uuid`` is the stable
        # Galaxy-side identifier used when the CWL id is unusable.
        self._tool = tool
        self._uuid = uuid
        self._tool_path = tool_path
        self._raw_process_reference = raw_process_reference
        # remove input parameter formats from CWL files so that cwltool
        # does not complain they are missing in the input data
        for input_field in self._tool.inputs_record_schema["fields"]:
            if 'format' in input_field:
                del input_field['format']

    def job_proxy(self, input_dict, output_dict, job_directory="."):
        """ Build a cwltool.job.Job describing computation using a input_json
        Galaxy will generate mapping the Galaxy description of the inputs into
        a cwltool compatible variant.
        """
        return JobProxy(self, input_dict, output_dict, job_directory=job_directory)

    @property
    def id(self):
        """Raw CWL ``id`` of the underlying tool document (may be None)."""
        raw_id = self._tool.tool.get("id", None)
        return raw_id

    def galaxy_id(self):
        """Derive a Galaxy tool id from the CWL id, falling back to the uuid."""
        raw_id = self.id
        tool_id = None
        # don't reduce "search.cwl#index" to search
        if raw_id:
            tool_id = os.path.basename(raw_id)
            # tool_id = os.path.splitext(os.path.basename(raw_id))[0]
        if not tool_id:
            return self._uuid
        assert tool_id
        if tool_id.startswith("#"):
            tool_id = tool_id[1:]
        return tool_id

    @abstractmethod
    def input_instances(self):
        """ Return InputInstance objects describing mapping to Galaxy inputs. """

    @abstractmethod
    def output_instances(self):
        """ Return OutputInstance objects describing mapping to Galaxy inputs. """

    @abstractmethod
    def docker_identifier(self):
        """ Return docker identifier for embedding in tool description. """

    @abstractmethod
    def description(self):
        """ Return description to tool. """

    @abstractmethod
    def label(self):
        """ Return label for tool. """

    def to_persistent_representation(self):
        """Return a JSON representation of this tool. Not for serialization
        over the wire, but serialization in a database."""
        # TODO: Replace this with some more readable serialization,
        # I really don't like using pickle here.
        if PERSISTED_REPRESENTATION == "cwl_tool_object":
            # NOTE(review): remove_pickle_problems is not defined or imported
            # in this chunk — presumably defined later in this module; verify.
            persisted_obj = remove_pickle_problems(self._tool)
        else:
            persisted_obj = self._raw_process_reference
        return {
            "class": self._class,
            "pickle": unicodify(base64.b64encode(pickle.dumps(persisted_obj, pickle.HIGHEST_PROTOCOL))),
            "uuid": self._uuid,
        }

    @staticmethod
    def from_persistent_representation(as_object):
        """Recover an object serialized with to_persistent_representation."""
        if "class" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no class found.")
        if "pickle" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no pickle representation found.")
        if "uuid" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no uuid found.")
        to_unpickle = base64.b64decode(as_object["pickle"])
        # SECURITY NOTE(review): pickle.loads must only ever see trusted,
        # Galaxy-written database content — never user-supplied data.
        loaded_object = pickle.loads(to_unpickle)
        return loaded_object
323
324
class CommandLineToolProxy(ToolProxy):
    """ToolProxy for CWL CommandLineTool processes."""

    # Marker persisted with serialized representations ("class" key).
    _class = "CommandLineTool"

    def description(self):
        # Don't use description - typically too verbose.
        return ''

    def doc(self):
        """Return the tool's raw CWL ``doc`` text (or None if absent)."""
        # TODO: parse multiple lines and merge - valid in cwl-1.1
        doc = self._tool.tool.get('doc')
        return doc

    def label(self):
        """Return the CWL ``label`` truncated at the first ':', or ''."""
        label = self._tool.tool.get('label')

        if label is not None:
            return label.partition(":")[0]  # return substring before ':'
        else:
            return ''

    def input_fields(self):
        """Return the tool's input record fields with named schema defs inlined."""
        input_records_schema = self._eval_schema(self._tool.inputs_record_schema)
        if input_records_schema["type"] != "record":
            raise Exception("Unhandled CWL tool input structure")

        # TODO: handle this somewhere else?
        # schemadef_req_tool_param
        rval = []
        for input in input_records_schema["fields"]:
            input_copy = copy.deepcopy(input)
            input_type = input.get("type")
            # Complex (union list / nested dict) types pass through unchanged.
            if isinstance(input_type, list) or isinstance(input_type, dict):
                rval.append(input_copy)
                continue

            # Named schema types get replaced by their full definition.
            if input_type in self._tool.schemaDefs:
                input_copy["type"] = self._tool.schemaDefs[input_type]

            rval.append(input_copy)
        return rval

    def _eval_schema(self, io_schema):
        """Resolve a schema referenced by name through the tool's schemaDefs."""
        schema_type = io_schema.get("type")
        if schema_type in self._tool.schemaDefs:
            io_schema = self._tool.schemaDefs[schema_type]
        return io_schema

    def input_instances(self):
        """Return InputInstance objects describing mapping to Galaxy inputs."""
        return [_outer_field_to_input_instance(_) for _ in self.input_fields()]

    def output_instances(self):
        """Return OutputInstance objects describing mapping to Galaxy outputs."""
        outputs_schema = self._eval_schema(self._tool.outputs_record_schema)
        if outputs_schema["type"] != "record":
            raise Exception("Unhandled CWL tool output structure")

        rval = []
        for output in outputs_schema["fields"]:
            rval.append(_simple_field_to_output(output))

        return rval

    def docker_identifier(self):
        """Return the image id/pull target of the first DockerRequirement, or None."""
        for hint in self.hints_or_requirements_of_class("DockerRequirement"):
            # NOTE(review): assumes a DockerRequirement without dockerImageId
            # always provides dockerPull — raises KeyError otherwise; confirm.
            if "dockerImageId" in hint:
                return hint["dockerImageId"]
            else:
                return hint["dockerPull"]

        return None

    def hints_or_requirements_of_class(self, class_name):
        """Yield every requirement *and* hint whose class is ``class_name``."""
        tool = self._tool.tool
        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
        for hint in reqs_and_hints:
            if hint["class"] == class_name:
                yield hint

    def software_requirements(self):
        """Return (package, first_version_or_None) pairs from SoftwareRequirement entries."""
        # Roughest imaginable pass at parsing requirements, really need to take in specs, handle
        # multiple versions, etc...
        tool = self._tool.tool
        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
        requirements = []
        for hint in reqs_and_hints:
            if hint["class"] == "SoftwareRequirement":
                packages = hint.get("packages", [])
                for package in packages:
                    versions = package.get("version", [])
                    first_version = None if not versions else versions[0]
                    requirements.append((package["package"], first_version))
        return requirements

    @property
    def requirements(self):
        """The underlying cwltool object's requirements list (or [])."""
        return getattr(self._tool, "requirements", [])
420
421
class ExpressionToolProxy(CommandLineToolProxy):
    """ToolProxy for CWL ExpressionTool processes.

    Behaves exactly like CommandLineToolProxy; only the persisted class
    marker differs.
    """
    _class = "ExpressionTool"
424
425
class JobProxy:
    """Proxy around one cwltool job execution inside a Galaxy job directory.

    Created via ToolProxy.job_proxy(); lazily builds the underlying cwltool
    job object and exposes its command line, streams, staging, and output
    collection to the rest of the framework.
    """

    def __init__(self, tool_proxy, input_dict, output_dict, job_directory):
        self._tool_proxy = tool_proxy
        self._input_dict = input_dict
        self._output_dict = output_dict
        self._job_directory = job_directory

        self._final_output = None
        self._ok = True
        self._cwl_job = None
        self._is_command_line_job = None

        self._normalize_job()

    def cwl_job(self):
        """Return the lazily-initialized cwltool job object."""
        self._ensure_cwl_job_initialized()
        return self._cwl_job

    @property
    def is_command_line_job(self):
        """True when the underlying cwltool job exposes a command line."""
        self._ensure_cwl_job_initialized()
        assert self._is_command_line_job is not None
        return self._is_command_line_job

    def _ensure_cwl_job_initialized(self):
        """Build the cwltool job on first use (idempotent)."""
        if self._cwl_job is None:
            job_args = dict(
                basedir=self._job_directory,
                select_resources=self._select_resources,
                outdir=os.path.join(self._job_directory, "working"),
                tmpdir=os.path.join(self._job_directory, "cwltmp"),
                stagedir=os.path.join(self._job_directory, "cwlstagedir"),
                use_container=False,
                beta_relaxed_fmt_check=beta_relaxed_fmt_check,
            )

            # Newer cwltool takes a RuntimeContext positionally; older
            # versions take the same settings as keyword arguments.
            args = []
            kwargs = {}
            if RuntimeContext is not None:
                args.append(RuntimeContext(job_args))
            else:
                kwargs = job_args
            self._cwl_job = next(self._tool_proxy._tool.job(
                self._input_dict,
                self._output_callback,
                *args, **kwargs
            ))
            self._is_command_line_job = hasattr(self._cwl_job, "command_line")

    def _normalize_job(self):
        """Fill in defaults and normalize File/Directory entries in the inputs."""
        # Somehow reuse whatever causes validate in cwltool... maybe?
        def pathToLoc(p):
            if "location" not in p and "path" in p:
                p["location"] = p["path"]
                del p["path"]

        runtime_context = RuntimeContext({})
        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
        fs_access = make_fs_access(runtime_context.basedir)
        process.fill_in_defaults(self._tool_proxy._tool.tool["inputs"], self._input_dict, fs_access)
        process.visit_class(self._input_dict, ("File", "Directory"), pathToLoc)
        # TODO: Why doesn't fillInDefault fill in locations instead of paths?
        process.normalizeFilesDirs(self._input_dict)
        # TODO: validate like cwltool process _init_job.
        # validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job,
        #                      strict=False, logger=_logger_validation_warnings)

    def rewrite_inputs_for_staging(self):
        """Rewrite input locations so files land in the job's staging directory."""
        if hasattr(self._cwl_job, "pathmapper"):
            pass
            # DO SOMETHING LIKE THE FOLLOWING?
            # path_rewrites = {}
            # for f, p in self._cwl_job.pathmapper.items():
            #     if not p.staged:
            #         continue
            #     if p.type in ("File", "Directory"):
            #         path_rewrites[p.resolved] = p.target
            # for key, value in self._input_dict.items():
            #     if key in path_rewrites:
            #         self._input_dict[key]["location"] = path_rewrites[value]
        else:
            stagedir = os.path.join(self._job_directory, "cwlstagedir")
            safe_makedirs(stagedir)

            def stage_recursive(value):
                # Walk lists/dicts, symlinking any File/Directory-like entry
                # whose location does not already end with its basename.
                is_list = isinstance(value, list)
                is_dict = isinstance(value, dict)
                log.info(f"handling value {value}, is_list {is_list}, is_dict {is_dict}")
                if is_list:
                    for val in value:
                        stage_recursive(val)
                elif is_dict:
                    if "location" in value and "basename" in value:
                        location = value["location"]
                        basename = value["basename"]
                        if not location.endswith(basename):  # TODO: sep()[-1]
                            staged_loc = os.path.join(stagedir, basename)
                            if not os.path.exists(staged_loc):
                                os.symlink(location, staged_loc)
                            value["location"] = staged_loc
                    for dict_value in value.values():
                        stage_recursive(dict_value)
                else:
                    log.info("skipping simple value...")
            stage_recursive(self._input_dict)

    def _select_resources(self, request, runtime_context=None):
        """cwltool resource hook — defer core count to Galaxy's $GALAXY_SLOTS."""
        new_request = request.copy()
        new_request["cores"] = "$GALAXY_SLOTS"
        return new_request

    @property
    def command_line(self):
        # Expression tools have no real command line; run a no-op.
        if self.is_command_line_job:
            return self.cwl_job().command_line
        else:
            return ["true"]

    @property
    def stdin(self):
        if self.is_command_line_job:
            return self.cwl_job().stdin
        else:
            return None

    @property
    def stdout(self):
        if self.is_command_line_job:
            return self.cwl_job().stdout
        else:
            return None

    @property
    def stderr(self):
        if self.is_command_line_job:
            return self.cwl_job().stderr
        else:
            return None

    @property
    def environment(self):
        if self.is_command_line_job:
            return self.cwl_job().environment
        else:
            return {}

    @property
    def generate_files(self):
        if self.is_command_line_job:
            return self.cwl_job().generatefiles
        else:
            return {}

    def _output_callback(self, out, process_status):
        """cwltool completion callback; records final output or failure."""
        self._process_status = process_status
        if process_status == "success":
            self._final_output = out
        else:
            self._ok = False

        log.info(f"Output are {out}, status is {process_status}")

    def collect_outputs(self, tool_working_directory, rcode):
        """Collect job outputs — running expression tools inline as needed."""
        if not self.is_command_line_job:
            cwl_job = self.cwl_job()
            if RuntimeContext is not None:
                cwl_job.run(
                    RuntimeContext({})
                )
            else:
                cwl_job.run()
            if not self._ok:
                raise Exception("Final process state not ok, [%s]" % self._process_status)
            return self._final_output
        else:
            return self.cwl_job().collect_outputs(tool_working_directory, rcode)

    def save_job(self):
        """Persist this job's tool representation and inputs to the job directory."""
        job_file = JobProxy._job_file(self._job_directory)
        job_objects = {
            # "tool_path": os.path.abspath(self._tool_proxy._tool_path),
            "tool_representation": self._tool_proxy.to_persistent_representation(),
            "job_inputs": self._input_dict,
            "output_dict": self._output_dict,
        }
        # Close the file deterministically (the original leaked the handle
        # by passing an anonymous open() straight to json.dump).
        with open(job_file, "w") as f:
            json.dump(job_objects, f)

    def _output_extra_files_dir(self, output_name):
        """Path of the Galaxy extra-files directory for ``output_name``."""
        output_id = self.output_id(output_name)
        return os.path.join(self._job_directory, "outputs", "dataset_%s_files" % output_id)

    def output_id(self, output_name):
        """Galaxy dataset id recorded for ``output_name``."""
        output_id = self._output_dict[output_name]["id"]
        return output_id

    def output_path(self, output_name):
        """Filesystem path recorded for ``output_name``."""
        output_id = self._output_dict[output_name]["path"]
        return output_id

    def output_directory_contents_dir(self, output_name, create=False):
        """Directory holding Directory-type output contents."""
        extra_files_dir = self._output_extra_files_dir(output_name)
        return extra_files_dir

    def output_secondary_files_dir(self, output_name, create=False):
        """Directory holding secondary files for ``output_name`` (optionally created)."""
        extra_files_dir = self._output_extra_files_dir(output_name)
        secondary_files_dir = os.path.join(extra_files_dir, SECONDARY_FILES_EXTRA_PREFIX)
        if create and not os.path.exists(secondary_files_dir):
            safe_makedirs(secondary_files_dir)
        return secondary_files_dir

    def stage_files(self):
        """Symlink mapped input and InitialWorkDir files into place."""
        cwl_job = self.cwl_job()

        def stageFunc(resolved_path, target_path):
            log.info(f"resolving {resolved_path} to {target_path}")
            try:
                os.symlink(resolved_path, target_path)
            except OSError:
                pass

        if hasattr(cwl_job, "pathmapper"):
            process.stage_files(cwl_job.pathmapper, stageFunc, ignore_writable=True, symlink=False)

        if hasattr(cwl_job, "generatefiles"):
            outdir = os.path.join(self._job_directory, "working")
            # TODO: Why doesn't cwl_job.generatemapper work?
            generate_mapper = pathmapper.PathMapper(cwl_job.generatefiles["listing"],
                                                    outdir, outdir, separateDirs=False)
            # TODO: figure out what inplace_update should be.
            inplace_update = cwl_job.inplace_update
            process.stage_files(generate_mapper, stageFunc, ignore_writable=inplace_update, symlink=False)
            relink_initialworkdir(generate_mapper, outdir, outdir, inplace_update=inplace_update)
        # else: expression tools do not have a path mapper.

    @staticmethod
    def _job_file(job_directory):
        """Path of the persisted job JSON inside ``job_directory``."""
        return os.path.join(job_directory, JOB_JSON_FILE)
664
665
class WorkflowProxy:
    """Adapt a cwltool workflow object into Galaxy workflow constructs."""

    def __init__(self, workflow, workflow_path=None):
        self._workflow = workflow
        self._workflow_path = workflow_path
        # Step proxies are built lazily by step_proxies().
        self._step_proxies = None

    @property
    def cwl_id(self):
        """The CWL ``id`` of the workflow document."""
        return self._workflow.tool["id"]

    def get_outputs_for_label(self, label):
        """Return {output_name, label} dicts for workflow outputs sourced from step ``label``."""
        outputs = []
        for output in self._workflow.tool['outputs']:
            step, output_name = split_step_references(
                output["outputSource"],
                multiple=False,
                workflow_id=self.cwl_id,
            )
            if step == label:
                output_id = output["id"]
                # Separator convention depends on whether the workflow id
                # already carries a '#' fragment (see jsonld_id_to_label).
                if "#" not in self.cwl_id:
                    _, output_label = output_id.rsplit("#", 1)
                else:
                    _, output_label = output_id.rsplit("/", 1)

                outputs.append({
                    "output_name": output_name,
                    "label": output_label,
                })
        return outputs

    def tool_reference_proxies(self):
        """Fetch tool source definitions for all referenced tools."""
        references = []
        for step in self.step_proxies():
            references.extend(step.tool_reference_proxies())
        return references

    def step_proxies(self):
        """Return (and cache) a proxy per workflow step.

        Step indices are offset by the number of workflow inputs so they
        share Galaxy's flat step numbering with the input steps.
        """
        if self._step_proxies is None:
            proxies = []
            num_input_steps = len(self._workflow.tool['inputs'])
            for i, step in enumerate(self._workflow.steps):
                proxies.append(build_step_proxy(self, step, i + num_input_steps))
            self._step_proxies = proxies
        return self._step_proxies

    @property
    def runnables(self):
        """Embedded ``run`` process objects for every step that declares one."""
        runnables = []
        for step in self._workflow.steps:
            if "run" in step.tool:
                runnables.append(step.tool["run"])
        return runnables

    def cwl_ids_to_index(self, step_proxies):
        """Map CWL ids — inputs first, then steps — to Galaxy step indices."""
        index = 0
        cwl_ids_to_index = {}
        for input_dict in self._workflow.tool['inputs']:
            cwl_ids_to_index[input_dict["id"]] = index
            index += 1

        for step_proxy in step_proxies:
            cwl_ids_to_index[step_proxy.cwl_id] = index
            index += 1

        return cwl_ids_to_index

    @property
    def output_labels(self):
        """Short labels for each workflow output."""
        return [self.jsonld_id_to_label(o['id']) for o in self._workflow.tool['outputs']]

    def input_connections_by_step(self, step_proxies):
        """Build a Galaxy input_connections dict for each step proxy, in order."""
        cwl_ids_to_index = self.cwl_ids_to_index(step_proxies)
        input_connections_by_step = []
        for step_proxy in step_proxies:
            input_connections_step = {}
            for input_proxy in step_proxy.input_proxies:
                cwl_source_id = input_proxy.cwl_source_id
                input_name = input_proxy.input_name
                # Consider only allow multiple if MultipleInputFeatureRequirement is enabled
                for (output_step_name, output_name) in split_step_references(cwl_source_id, workflow_id=self.cwl_id):
                    if "#" in self.cwl_id:
                        sep_on = "/"
                    else:
                        sep_on = "#"
                    output_step_id = self.cwl_id + sep_on + output_step_name

                    if output_step_id not in cwl_ids_to_index:
                        template = "Output [%s] does not appear in ID-to-index map [%s]."
                        msg = template % (output_step_id, cwl_ids_to_index.keys())
                        raise AssertionError(msg)

                    if input_name not in input_connections_step:
                        input_connections_step[input_name] = []

                    input_connections_step[input_name].append({
                        "id": cwl_ids_to_index[output_step_id],
                        "output_name": output_name,
                        "input_type": "dataset"
                    })

            input_connections_by_step.append(input_connections_step)

        return input_connections_by_step

    def to_dict(self):
        """Render the workflow as a Galaxy workflow dict (name, steps, annotation)."""
        name = os.path.basename(self._workflow.tool.get('label') or self._workflow_path or 'TODO - derive a name from ID')
        steps = {}

        step_proxies = self.step_proxies()
        input_connections_by_step = self.input_connections_by_step(step_proxies)
        # Inputs occupy the first indices, then tool/subworkflow steps follow.
        index = 0
        for i, input_dict in enumerate(self._workflow.tool['inputs']):
            steps[index] = self.cwl_input_to_galaxy_step(input_dict, i)
            index += 1

        for i, step_proxy in enumerate(step_proxies):
            input_connections = input_connections_by_step[i]
            steps[index] = step_proxy.to_dict(input_connections)
            index += 1

        return {
            'name': name,
            'steps': steps,
            'annotation': self.cwl_object_to_annotation(self._workflow.tool),
        }

    def find_inputs_step_index(self, label):
        """Return the index of the workflow input whose label matches ``label``."""
        for i, input in enumerate(self._workflow.tool['inputs']):
            if self.jsonld_id_to_label(input["id"]) == label:
                return i

        raise Exception("Failed to find index for label %s" % label)

    def jsonld_id_to_label(self, id):
        """Strip the workflow-id prefix from a JSON-LD id, yielding a short label."""
        if "#" in self.cwl_id:
            return id.rsplit("/", 1)[-1]
        else:
            return id.rsplit("#", 1)[-1]

    def cwl_input_to_galaxy_step(self, input, i):
        """Convert one CWL workflow input into a Galaxy input-step dict."""
        input_type = input["type"]
        label = self.jsonld_id_to_label(input["id"])
        input_as_dict = {
            "id": i,
            "label": label,
            "position": {"left": 0, "top": 0},
            "annotation": self.cwl_object_to_annotation(input),
            "input_connections": {},  # Should the Galaxy API really require this? - Seems to.
            "workflow_outputs": self.get_outputs_for_label(label),
        }

        if input_type == "File" and "default" not in input:
            input_as_dict["type"] = "data_input"
        elif isinstance(input_type, dict) and input_type.get("type") == "array":
            input_as_dict["type"] = "data_collection_input"
            input_as_dict["collection_type"] = "list"
        elif isinstance(input_type, dict) and input_type.get("type") == "record":
            input_as_dict["type"] = "data_collection_input"
            input_as_dict["collection_type"] = "record"
        else:
            if USE_STEP_PARAMETERS:
                input_as_dict["type"] = "parameter_input"
                # TODO: dispatch on actual type so this doesn't always need
                # to be field - simpler types could be simpler inputs.
                tool_state = {}
                tool_state["parameter_type"] = "field"
                default_set = "default" in input
                default_value = input.get("default")
                optional = default_set
                if isinstance(input_type, list) and "null" in input_type:
                    optional = True
                if not optional and isinstance(input_type, dict) and "type" in input_type:
                    raise ValueError("'type' detected in non-optional input dictionary.")
                if default_set:
                    tool_state["default"] = {"src": "json", "value": default_value}
                tool_state["optional"] = optional
                input_as_dict["tool_state"] = tool_state
            else:
                input_as_dict["type"] = "data_input"
                # TODO: format = expression.json

        return input_as_dict

    def cwl_object_to_annotation(self, cwl_obj):
        """Use a CWL object's ``doc`` (if any) as its Galaxy annotation."""
        return cwl_obj.get("doc", None)
854
855
def split_step_references(step_references, workflow_id=None, multiple=True):
    """Split a CWL step input or output reference into step id and name."""
    # Trim off the workflow id part of the reference.
    references = listify(step_references)
    pairs = []

    for reference in references:
        if workflow_id is None:
            # This path works fine for some simple workflows - but not so much
            # for subworkflows (maybe same for $graph workflows?)
            assert "#" in reference
            _, reference = reference.split("#", 1)
        else:
            sep_on = "/" if "#" in workflow_id else "#"
            expected_prefix = workflow_id + sep_on
            if not reference.startswith(expected_prefix):
                raise AssertionError(f"step_reference [{reference}] doesn't start with {expected_prefix}")
            reference = reference[len(expected_prefix):]

        # Now just grab the step name and input/output name.
        assert "#" not in reference
        if "/" in reference:
            step_name, io_name = reference.split("/", 1)
        else:
            # Referencing an input, not a step.
            # In Galaxy workflows input steps have an implicit output named
            # "output" for consistency with tools - in cwl land
            # just the input name is referenced.
            step_name, io_name = reference, "output"
        pairs.append((step_name, io_name))

    if not multiple:
        assert len(pairs) == 1
        return pairs[0]
    return pairs
896
897
def build_step_proxy(workflow_proxy, step, index):
    """Create the appropriate proxy for a step embedding a tool or a subworkflow."""
    if step.embedded_tool.tool["class"] == "Workflow":
        return SubworkflowStepProxy(workflow_proxy, step, index)
    return ToolStepProxy(workflow_proxy, step, index)
904
905
class BaseStepProxy:
    """Shared behavior for proxies around CWL workflow steps."""

    def __init__(self, workflow_proxy, step, index):
        self._workflow_proxy = workflow_proxy
        self._step = step
        self._index = index
        self._uuid = str(uuid4())
        # Input proxies are built lazily by the input_proxies property.
        self._input_proxies = None

    @property
    def step_class(self):
        """CWL process class (e.g. CommandLineTool) of the embedded tool."""
        return self.cwl_tool_object.tool["class"]

    @property
    def cwl_id(self):
        return self._step.id

    @property
    def cwl_workflow_id(self):
        return self._workflow_proxy.cwl_id

    @property
    def requirements(self):
        return self._step.requirements

    @property
    def hints(self):
        return self._step.hints

    @property
    def label(self):
        """Short Galaxy label derived from the step's JSON-LD id."""
        return self._workflow_proxy.jsonld_id_to_label(self._step.id)

    def galaxy_workflow_outputs_list(self):
        """Workflow outputs sourced from this step."""
        return self._workflow_proxy.get_outputs_for_label(self.label)

    @property
    def cwl_tool_object(self):
        return self._step.embedded_tool

    @property
    def input_proxies(self):
        """InputProxy objects for each of the step's inputs (cached)."""
        if self._input_proxies is None:
            self._input_proxies = [
                InputProxy(self, cwl_input)
                for cwl_input in self._step.tool["inputs"]
            ]
        return self._input_proxies

    def inputs_to_dicts(self):
        """Serialize each input proxy to its dict form."""
        return [input_proxy.to_dict() for input_proxy in self.input_proxies]
962
963
class InputProxy:
    """Proxy around a single CWL workflow step input."""

    def __init__(self, step_proxy, cwl_input):
        self._cwl_input = cwl_input
        self.step_proxy = step_proxy
        self.workflow_proxy = step_proxy._workflow_proxy

        cwl_input_id = cwl_input["id"]
        cwl_source_id = cwl_input.get("source", None)
        # A step input must be fed by a source, a valueFrom expression,
        # or a literal default.
        if cwl_source_id is None and "valueFrom" not in cwl_input and "default" not in cwl_input:
            msg = "Workflow step input must define a source, a valueFrom, or a default value. Obtained [%s]." % cwl_input
            raise MessageException(msg)

        assert cwl_input_id
        self.step_name, self.input_name = split_step_references(
            cwl_input_id,
            multiple=False,
            workflow_id=step_proxy.cwl_workflow_id
        )

        self.cwl_input_id = cwl_input_id
        self.cwl_source_id = cwl_source_id

        # This input is scattered when its name appears in the step's
        # ``scatter`` declaration.
        scattered_names = [
            split_step_references(
                scatter_ref, multiple=False, workflow_id=step_proxy.cwl_workflow_id
            )[1]
            for scatter_ref in listify(step_proxy._step.tool.get("scatter", []))
        ]
        self.scatter = self.input_name in scattered_names

    def to_dict(self):
        """Serialize merge/scatter/value settings for Galaxy's workflow model."""
        cwl_input = self._cwl_input
        as_dict = {"name": self.input_name}
        if "linkMerge" in cwl_input:
            as_dict["merge_type"] = cwl_input["linkMerge"]
        step_tool = self.step_proxy._step.tool
        if "scatterMethod" in step_tool:
            as_dict["scatter_type"] = step_tool.get("scatterMethod", "dotproduct")
        else:
            as_dict["scatter_type"] = "dotproduct" if self.scatter else "disabled"
        if "valueFrom" in cwl_input:
            # TODO: Add a table for expressions - mark the type as CWL 1.0 JavaScript.
            as_dict["value_from"] = cwl_input["valueFrom"]
        if "default" in cwl_input:
            as_dict["default"] = cwl_input["default"]
        return as_dict
1012
1013
class ToolStepProxy(BaseStepProxy):
    """Step proxy for a step embedding a (non-workflow) tool."""

    def __init__(self, workflow_proxy, step, index):
        super().__init__(workflow_proxy, step, index)
        self._tool_proxy = None

    @property
    def tool_proxy(self):
        # Cached so the UUID that is loaded later matches the UUID generated
        # when to_dict was emitted.
        if self._tool_proxy is None:
            self._tool_proxy = _cwl_tool_object_to_proxy(self.cwl_tool_object, uuid=str(uuid4()))
        return self._tool_proxy

    def tool_reference_proxies(self):
        # A single-element list so subworkflows can be handled recursively
        # with the same interface.
        return [self.tool_proxy]

    def to_dict(self, input_connections):
        # Stub out null entries for inputs getting replaced by connections.
        # NOTE(review): tool_state is computed but not included in the
        # returned dict - preserved as-is; confirm whether it should be.
        tool_state = {input_name: None for input_name in input_connections}

        outputs = self.galaxy_workflow_outputs_list()
        return {
            "id": self._index,
            "tool_uuid": self.tool_proxy._uuid,  # TODO: make sure this is respected...
            "label": self.label,
            "position": {"left": 0, "top": 0},
            "type": "tool",
            "annotation": self._workflow_proxy.cwl_object_to_annotation(self._step.tool),
            "input_connections": input_connections,
            "inputs": self.inputs_to_dicts(),
            "workflow_outputs": outputs,
        }
1051
1052
class SubworkflowStepProxy(BaseStepProxy):
    """Step proxy for a step whose embedded tool is itself a workflow."""

    def __init__(self, workflow_proxy, step, index):
        super().__init__(workflow_proxy, step, index)
        self._subworkflow_proxy = None

    def to_dict(self, input_connections):
        outputs = self.galaxy_workflow_outputs_list()
        # Annotate each connection with the index of the matching input step
        # inside the subworkflow.
        for key, connection_list in input_connections.items():
            subworkflow_step_id = self.subworkflow_proxy.find_inputs_step_index(key)
            for connection in connection_list:
                connection["input_subworkflow_step_id"] = subworkflow_step_id

        return {
            "id": self._index,
            "label": self.label,
            "position": {"left": 0, "top": 0},
            "type": "subworkflow",
            "subworkflow": self.subworkflow_proxy.to_dict(),
            "annotation": self.subworkflow_proxy.cwl_object_to_annotation(self._step.tool),
            "input_connections": input_connections,
            "inputs": self.inputs_to_dicts(),
            "workflow_outputs": outputs,
        }

    def tool_reference_proxies(self):
        return self.subworkflow_proxy.tool_reference_proxies()

    @property
    def subworkflow_proxy(self):
        # Built lazily; WorkflowProxy construction is deferred until needed.
        if self._subworkflow_proxy is None:
            self._subworkflow_proxy = WorkflowProxy(self.cwl_tool_object)
        return self._subworkflow_proxy
1088
1089
def remove_pickle_problems(obj):
    """Recursively null out attributes that prevent pickling.

    ``doc_loader`` does not pickle correctly, so it is cleared on ``obj``
    and on any ``embedded_tool`` or ``steps`` it carries. The object is
    mutated in place and returned for convenience.
    """
    if hasattr(obj, "doc_loader"):
        obj.doc_loader = None
    if hasattr(obj, "embedded_tool"):
        obj.embedded_tool = remove_pickle_problems(obj.embedded_tool)
    if hasattr(obj, "steps"):
        obj.steps = list(map(remove_pickle_problems, obj.steps))
    return obj
1099
1100
class WorkflowToolReference(metaclass=ABCMeta):
    """Abstract marker base for references to tools used by a workflow."""
1103
1104
class EmbeddedWorkflowToolReference(WorkflowToolReference):
    """Marker for a tool embedded directly in the workflow document."""
1107
1108
class ExternalWorkflowToolReference(WorkflowToolReference):
    """Marker for a tool referenced externally by the workflow document."""
1111
1112
def _outer_field_to_input_instance(field):
    """Convert a top-level CWL input ``field`` into a Galaxy input instance.

    When the field admits multiple type representations and field types are
    not in use, returns a ``ConditionalInstance`` asking the user which form
    to supply; otherwise returns a single ``InputInstance``.
    """
    field_type = field_to_field_type(field)  # Must be a list if in here?
    if not isinstance(field_type, list):
        field_type = [field_type]

    name, label, description = _field_metadata(field)

    # Fixed names for the conditional's test parameter and its label.
    case_name = "_cwl__type_"
    case_label = "Specify Parameter %s As" % label

    def value_input(type_description):
        # Parameter holding the actual value for one type representation.
        value_name = "_cwl__value_"
        value_label = label
        value_description = description
        return InputInstance(
            value_name,
            value_label,
            value_description,
            input_type=type_description.galaxy_param_type,
            collection_type=type_description.collection_type,
        )

    # One select option and one "when" block per possible type description.
    select_options = []
    case_options = []
    type_descriptions = type_descriptions_for_field_types(field_type)
    for type_description in type_descriptions:
        select_options.append({"value": type_description.name, "label": type_description.label})
        input_instances = []
        # Only representations needing a user-supplied value get a parameter.
        if type_description.uses_param:
            input_instances.append(value_input(type_description))
        case_options.append((type_description.name, input_instances))

    # If there is more than one way to represent this parameter - produce a conditional
    # requesting user to ask for what form they want to submit the data in, else just map
    # a simple Galaxy parameter.
    if len(case_options) > 1 and not USE_FIELD_TYPES:
        case_input = SelectInputInstance(
            name=case_name,
            label=case_label,
            description=False,
            options=select_options,
        )

        return ConditionalInstance(name, case_input, case_options)
    else:
        if len(case_options) > 1:
            # Multiple representations but USE_FIELD_TYPES is set - collapse
            # to the generic field-type representation instead.
            only_type_description = FIELD_TYPE_REPRESENTATION
        else:
            only_type_description = type_descriptions[0]

        return InputInstance(
            name, label, description, input_type=only_type_description.galaxy_param_type, collection_type=only_type_description.collection_type
        )

    # Older array to repeat handling, now we are just representing arrays as
    # dataset collections - we should offer a blended approach in the future.
    # if field_type in simple_map_type_map.keys():
    #     input_type = simple_map_type_map[field_type]
    #     return {"input_type": input_type, "array": False}
    # elif field_type == "array":
    #     if isinstance(field["type"], dict):
    #         array_type = field["type"]["items"]
    #     else:
    #         array_type = field["items"]
    #     if array_type in simple_map_type_map.keys():
    #         input_type = simple_map_type_map[array_type]
    #         return {"input_type": input_type, "array": True}
    #     else:
    #         raise Exception("Unhandled simple field type encountered - [%s]." % field_type)
1182
1183
1184 def _field_metadata(field):
1185 name = field["name"]
1186 label = field.get("label", None)
1187 description = field.get("doc", None)
1188 return name, label, description
1189
1190
def _simple_field_to_output(field):
    """Build a glob-collected ``OutputInstance`` from a simple CWL output field."""
    return OutputInstance(
        field["name"],
        output_data_type=field["type"],
        output_type=OUTPUT_TYPE.GLOB
    )
1200
1201
class ConditionalInstance:
    """Galaxy conditional parameter choosing among CWL type representations."""

    def __init__(self, name, case, whens):
        self.input_type = INPUT_TYPE.CONDITIONAL
        self.name = name
        self.case = case
        self.whens = whens

    def to_dict(self):
        # One "when" block of serialized inputs per selectable case value.
        when_blocks = {value: [part.to_dict() for part in block] for value, block in self.whens}
        return {
            "name": self.name,
            "type": INPUT_TYPE.CONDITIONAL,
            "test": self.case.to_dict(),
            "when": when_blocks,
        }
1222
1223
class SelectInputInstance:
    """Galaxy select parameter offering a fixed list of options."""

    def __init__(self, name, label, description, options):
        self.input_type = INPUT_TYPE.SELECT
        self.name = name
        self.label = label
        self.description = description
        self.options = options

    def to_dict(self):
        # TODO: serialize options...
        return {
            "name": self.name,
            "label": self.label or self.name,
            "help": self.description,
            "type": self.input_type,
            "options": self.options,
        }
1243
1244
class InputInstance:
    """Description of a single Galaxy input parameter derived from CWL.

    ``input_type`` is one of the ``INPUT_TYPE`` values; ``collection_type``
    is only meaningful for data-collection inputs; ``array`` inputs are
    rendered as Galaxy ``repeat`` blocks.
    """

    def __init__(self, name, label, description, input_type, array=False, area=False, collection_type=None):
        self.input_type = input_type
        self.collection_type = collection_type
        self.name = name
        self.label = label
        self.description = description
        self.required = True
        self.array = array
        self.area = area

    def to_dict(self, itemwise=True):
        """Serialize this input; arrays become a repeat wrapping the item form."""
        if itemwise and self.array:
            as_dict = dict(
                type="repeat",
                name="%s_repeat" % self.name,
                title="%s" % self.name,
                blocks=[
                    self.to_dict(itemwise=False)
                ]
            )
        else:
            as_dict = dict(
                name=self.name,
                label=self.label or self.name,
                help=self.description,
                type=self.input_type,
                optional=not self.required,
            )
            if self.area:
                as_dict["area"] = True

            # Seed type-specific defaults. These cases are mutually
            # exclusive; previously the INTEGER test was a dangling ``if``
            # detached from the following ``if/elif`` chain, which obscured
            # that intent (behavior was identical).
            if self.input_type == INPUT_TYPE.INTEGER:
                as_dict["value"] = "0"
            elif self.input_type == INPUT_TYPE.FLOAT:
                as_dict["value"] = "0.0"
            elif self.input_type == INPUT_TYPE.DATA_COLLECTON:
                as_dict["collection_type"] = self.collection_type

        return as_dict
1286
1287
# How a tool output is located: collected from the filesystem via a glob
# pattern, or captured from standard output.
OUTPUT_TYPE = Bunch(
    GLOB="glob",
    STDOUT="stdout",
)
1292
1293
# TODO: Different subclasses - this is representing different types of things.
class OutputInstance:
    """Plain container describing a single tool output."""

    def __init__(self, name, output_data_type, output_type, path=None, fields=None):
        self.name = name
        self.output_type = output_type
        self.output_data_type = output_data_type
        self.fields = fields
        self.path = path
1303
1304
# Explicit public API for ``from ... import *`` consumers of this module.
__all__ = (
    'tool_proxy',
    'load_job_proxy',
)