comparison env/lib/python3.9/site-packages/galaxy/tool_util/cwl/util.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
1 """Client-centric CWL-related utilities.
2
3 Used to share code between the Galaxy test framework
4 and other Galaxy CWL clients (e.g. Planemo)."""
5 import hashlib
6 import io
7 import json
8 import os
9 import tarfile
10 import tempfile
11 from collections import namedtuple
12
13 import yaml
14
15 from galaxy.util import unicodify
16
17 STORE_SECONDARY_FILES_WITH_BASENAME = True
18 SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
19 SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json"
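# Sketch of the staging convention implied by the constants above (an
# illustration of this module's behaviour, not a Galaxy-wide contract):
# secondary files travel as a tar archive whose members look like
#
#   __secondary_files__/input.bam.bai
#   __secondary_files_index.json      # {"order": ["input.bam.bai", ...]}
#
# The index preserves the original secondaryFiles ordering; the same layout is
# read back when outputs are converted to CWL JSON in output_to_cwl_json().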
20
21
22 def set_basename_and_derived_properties(properties, basename):
23 properties["basename"] = basename
24 properties["nameroot"], properties["nameext"] = os.path.splitext(basename)
25 return properties
26
27
28 def output_properties(path=None, content=None, basename=None, pseduo_location=False):
29 checksum = hashlib.sha1()
30 properties = {
31 "class": "File",
32 }
33 if path is not None:
34 properties["path"] = path
35 f = open(path, "rb")
36 else:
37 f = io.BytesIO(content)
38
39 try:
40 contents = f.read(1024 * 1024)
41 filesize = 0
42 while contents:
43 checksum.update(contents)
44 filesize += len(contents)
45 contents = f.read(1024 * 1024)
46 finally:
47 f.close()
48 properties["checksum"] = "sha1$%s" % checksum.hexdigest()
49 properties["size"] = filesize
50 set_basename_and_derived_properties(properties, basename)
51 _handle_pseudo_location(properties, pseduo_location)
52 return properties
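# A minimal usage sketch (illustrative, not part of the original module): for an
# in-memory literal the helper above yields a CWL-style File description, e.g.
#
#   output_properties(content=b"hello", basename="greeting.txt")
#   # -> {"class": "File", "checksum": "sha1$aaf4c61d...", "size": 5,
#   #     "basename": "greeting.txt", "nameroot": "greeting", "nameext": ".txt"}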
53
54
55 def _handle_pseudo_location(properties, pseduo_location):
56 if pseduo_location:
57 properties["location"] = properties["basename"]
58
59
60 def abs_path_or_uri(path_or_uri, relative_to):
61 """Return an absolute path if this isn't a URI, otherwise keep the URI the same.
62 """
63 is_uri = "://" in path_or_uri
64 if not is_uri and not os.path.isabs(path_or_uri):
65 path_or_uri = os.path.join(relative_to, path_or_uri)
66 if not is_uri:
67 _ensure_file_exists(path_or_uri)
68 return path_or_uri
69
70
71 def abs_path(path_or_uri, relative_to):
72 path_or_uri = abs_path_or_uri(path_or_uri, relative_to)
73 if path_or_uri.startswith("file://"):
74 path_or_uri = path_or_uri[len("file://"):]
75
76 return path_or_uri
77
78
79 def path_or_uri_to_uri(path_or_uri):
80 if "://" not in path_or_uri:
81 return "file://%s" % path_or_uri
82 else:
83 return path_or_uri
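# Behaviour of the three path/URI helpers above, sketched with illustrative
# arguments (the concrete paths are assumptions, not fixtures of this module):
#
#   abs_path_or_uri("data/input.bed", "/tests")     # -> "/tests/data/input.bed" (must exist)
#   abs_path("file:///tmp/input.bed", "/tests")     # -> "/tmp/input.bed"
#   path_or_uri_to_uri("/tmp/input.bed")            # -> "file:///tmp/input.bed"
#   path_or_uri_to_uri("http://example.org/x.bed")  # unchanged, already a URI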
84
85
86 def galactic_job_json(
87 job, test_data_directory, upload_func, collection_create_func, tool_or_workflow="workflow"
88 ):
89 """Adapt a CWL job object to the Galaxy API.
90
91 CWL-derived tools in Galaxy can consume job descriptions much like
92 CWL job objects via the API, but paths need to be replaced with datasets,
93 and records and arrays with collection references. This function
94 stages the referenced files and rewrites the job description
95 accordingly for Galaxy.
96 """
97
98 datasets = []
99 dataset_collections = []
100
101 def response_to_hda(target, upload_response):
102 assert isinstance(upload_response, dict), upload_response
103 assert "outputs" in upload_response, upload_response
104 assert len(upload_response["outputs"]) > 0, upload_response
105 dataset = upload_response["outputs"][0]
106 datasets.append((dataset, target))
107 dataset_id = dataset["id"]
108 return {"src": "hda", "id": dataset_id}
109
110 def upload_file(file_path, secondary_files, **kwargs):
111 file_path = abs_path_or_uri(file_path, test_data_directory)
112 target = FileUploadTarget(file_path, secondary_files, **kwargs)
113 upload_response = upload_func(target)
114 return response_to_hda(target, upload_response)
115
116 def upload_file_literal(contents, **kwd):
117 target = FileLiteralTarget(contents, **kwd)
118 upload_response = upload_func(target)
119 return response_to_hda(target, upload_response)
120
121 def upload_tar(file_path):
122 file_path = abs_path_or_uri(file_path, test_data_directory)
123 target = DirectoryUploadTarget(file_path)
124 upload_response = upload_func(target)
125 return response_to_hda(target, upload_response)
126
127 def upload_file_with_composite_data(file_path, composite_data, **kwargs):
128 if file_path is not None:
129 file_path = abs_path_or_uri(file_path, test_data_directory)
130 composite_data_resolved = []
131 for cd in composite_data:
132 composite_data_resolved.append(abs_path_or_uri(cd, test_data_directory))
133 target = FileUploadTarget(file_path, composite_data=composite_data_resolved, **kwargs)
134 upload_response = upload_func(target)
135 return response_to_hda(target, upload_response)
136
137 def upload_object(the_object):
138 target = ObjectUploadTarget(the_object)
139 upload_response = upload_func(target)
140 return response_to_hda(target, upload_response)
141
142 def replacement_item(value, force_to_file=False):
143 is_dict = isinstance(value, dict)
144 item_class = None if not is_dict else value.get("class", None)
145 is_file = item_class == "File"
146 is_directory = item_class == "Directory"
147 is_collection = item_class == "Collection" # Galaxy extension.
148
149 if force_to_file:
150 if is_file:
151 return replacement_file(value)
152 else:
153 return upload_object(value)
154
155 if isinstance(value, list):
156 return replacement_list(value)
157 elif not isinstance(value, dict):
158 if tool_or_workflow == "workflow":
159 # All inputs represented as dataset or collection parameters
160 return upload_object(value)
161 else:
162 return value
163
164 if is_file:
165 return replacement_file(value)
166 elif is_directory:
167 return replacement_directory(value)
168 elif is_collection:
169 return replacement_collection(value)
170 else:
171 return replacement_record(value)
172
173 def replacement_file(value):
174 if value.get('galaxy_id'):
175 return {"src": "hda", "id": value['galaxy_id']}
176 file_path = value.get("location", None) or value.get("path", None)
177 # format to match output definitions in tool, where did filetype come from?
178 filetype = value.get("filetype", None) or value.get("format", None)
179 composite_data_raw = value.get("composite_data", None)
180 kwd = {}
181 if "tags" in value:
182 kwd["tags"] = value.get("tags")
183 if "dbkey" in value:
184 kwd["dbkey"] = value.get("dbkey")
185 if composite_data_raw:
186 composite_data = []
187 for entry in composite_data_raw:
188 path = None
189 if isinstance(entry, dict):
190 path = entry.get("location", None) or entry.get("path", None)
191 else:
192 path = entry
193 composite_data.append(path)
194 rval_c = upload_file_with_composite_data(None, composite_data, filetype=filetype, **kwd)
195 return rval_c
196
197 if file_path is None:
198 contents = value.get("contents", None)
199 if contents is not None:
200 return upload_file_literal(contents, **kwd)
201
202 return value
203
204 secondary_files = value.get("secondaryFiles", [])
205 secondary_files_tar_path = None
206 if secondary_files:
207 tmp = tempfile.NamedTemporaryFile(delete=False)
208 tf = tarfile.open(fileobj=tmp, mode='w:')
209 order = []
210 index_contents = {
211 "order": order
212 }
213 for secondary_file in secondary_files:
214 secondary_file_path = secondary_file.get("location", None) or secondary_file.get("path", None)
215 assert secondary_file_path, "Invalid secondaryFile entry found [%s]" % secondary_file
216 full_secondary_file_path = os.path.join(test_data_directory, secondary_file_path)
217 basename = secondary_file.get("basename") or os.path.basename(secondary_file_path)
218 order.append(unicodify(basename))
219 tf.add(full_secondary_file_path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
220 tmp_index = tempfile.NamedTemporaryFile(delete=False, mode="w")
221 json.dump(index_contents, tmp_index)
222 tmp_index.close()
223 tf.add(tmp_index.name, SECONDARY_FILES_INDEX_PATH)
224 tf.close()
225 secondary_files_tar_path = tmp.name
226
227 return upload_file(file_path, secondary_files_tar_path, filetype=filetype, **kwd)
228
229 def replacement_directory(value):
230 file_path = value.get("location", None) or value.get("path", None)
231 if file_path is None:
232 return value
233
234 if not os.path.isabs(file_path):
235 file_path = os.path.join(test_data_directory, file_path)
236
237 tmp = tempfile.NamedTemporaryFile(delete=False)
238 tf = tarfile.open(fileobj=tmp, mode='w:')
239 tf.add(file_path, '.')
240 tf.close()
241
242 return upload_tar(tmp.name)
243
244 def replacement_list(value):
245 collection_element_identifiers = []
246 for i, item in enumerate(value):
247 dataset = replacement_item(item, force_to_file=True)
248 collection_element = dataset.copy()
249 collection_element["name"] = str(i)
250 collection_element_identifiers.append(collection_element)
251
252 # TODO: handle nested lists/arrays
253 collection = collection_create_func(collection_element_identifiers, "list")
254 dataset_collections.append(collection)
255 hdca_id = collection["id"]
256 return {"src": "hdca", "id": hdca_id}
257
258 def to_elements(value, rank_collection_type):
259 collection_element_identifiers = []
260 assert "elements" in value
261 elements = value["elements"]
262
263 is_nested_collection = ":" in rank_collection_type
264 for element in elements:
265 if not is_nested_collection:
266 # flat collection
267 dataset = replacement_item(element, force_to_file=True)
268 collection_element = dataset.copy()
269 collection_element["name"] = element["identifier"]
270 collection_element_identifiers.append(collection_element)
271 else:
272 # nested collection
273 sub_collection_type = rank_collection_type[rank_collection_type.find(":") + 1:]
274 collection_element = {
275 "name": element["identifier"],
276 "src": "new_collection",
277 "collection_type": sub_collection_type,
278 "element_identifiers": to_elements(element, sub_collection_type)
279 }
280 collection_element_identifiers.append(collection_element)
281
282 return collection_element_identifiers
283
284 def replacement_collection(value):
285 if value.get('galaxy_id'):
286 return {"src": "hdca", "id": value['galaxy_id']}
287 assert "collection_type" in value
288 collection_type = value["collection_type"]
289 elements = to_elements(value, collection_type)
290
291 collection = collection_create_func(elements, collection_type)
292 dataset_collections.append(collection)
293 hdca_id = collection["id"]
294 return {"src": "hdca", "id": hdca_id}
295
296 def replacement_record(value):
297 collection_element_identifiers = []
298 for record_key, record_value in value.items():
299 if not isinstance(record_value, dict) or record_value.get("class") != "File":
300 dataset = replacement_item(record_value, force_to_file=True)
301 collection_element = dataset.copy()
302 else:
303 dataset = upload_file(record_value["location"], [])
304 collection_element = dataset.copy()
305
306 collection_element["name"] = record_key
307 collection_element_identifiers.append(collection_element)
308
309 collection = collection_create_func(collection_element_identifiers, "record")
310 dataset_collections.append(collection)
311 hdca_id = collection["id"]
312 return {"src": "hdca", "id": hdca_id}
313
314 replace_keys = {}
315 for key, value in job.items():
316 replace_keys[key] = replacement_item(value)
317
318 job.update(replace_keys)
319 return job, datasets
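# A client-side sketch of how galactic_job_json is typically driven. The two
# callbacks below are illustrative stand-ins for real Galaxy API calls (for
# example via BioBlend); only their return shapes matter to this function:
#
#   def upload_func(target):
#       # POST the target's path/contents to Galaxy and return the tool response
#       return {"outputs": [{"id": "<hda id>"}]}
#
#   def collection_create_func(element_identifiers, collection_type):
#       # POST the element identifiers to the dataset collections API
#       return {"id": "<hdca id>"}
#
#   job, datasets = galactic_job_json(
#       job, "test-data", upload_func, collection_create_func, tool_or_workflow="workflow"
#   )
#   # File entries in ``job`` are now {"src": "hda", "id": ...} references and
#   # arrays/records are {"src": "hdca", "id": ...} collection references.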
320
321
322 def _ensure_file_exists(file_path):
323 if not os.path.exists(file_path):
324 template = "File [%s] does not exist - parent directory [%s] does %sexist, cwd is [%s]"
325 parent_directory = os.path.dirname(file_path)
326 message = template % (
327 file_path,
328 parent_directory,
329 "" if os.path.exists(parent_directory) else "not ",
330 os.getcwd(),
331 )
332 raise Exception(message)
333
334
335 class FileLiteralTarget:
336
337 def __init__(self, contents, path=None, **kwargs):
338 self.contents = contents
339 self.properties = kwargs
340 self.path = path
341
342 def __str__(self):
343 return f"FileLiteralTarget[contents={self.contents}] with {self.properties}"
344
345
346 class FileUploadTarget:
347
348 def __init__(self, path, secondary_files=None, **kwargs):
349 self.path = path
350 self.secondary_files = secondary_files
351 self.composite_data = kwargs.get("composite_data", [])
352 self.properties = kwargs
353
354 def __str__(self):
355 return f"FileUploadTarget[path={self.path}] with {self.properties}"
356
357
358 class ObjectUploadTarget:
359
360 def __init__(self, the_object):
361 self.object = the_object
362 self.properties = {}
363
364 def __str__(self):
365 return f"ObjectUploadTarget[object={self.object}] with {self.properties}"
366
367
368 class DirectoryUploadTarget:
369
370 def __init__(self, tar_path):
371 self.tar_path = tar_path
372
373 def __str__(self):
374 return "DirectoryUploadTarget[tar_path=%s]" % self.tar_path
375
376
377 GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id", "metadata"])
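# Illustrative values (the hex ids below are made up): a dataset output is
# addressed as
#   GalaxyOutput("f2db41e1fa331b3e", "dataset", "a799d38679e985db", None)
# a collection output uses history_content_type "dataset_collection", and a
# workflow output value is GalaxyOutput(None, "raw_value", <value>, None).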
378
379
380 def tool_response_to_output(tool_response, history_id, output_id):
381 for output in tool_response["outputs"]:
382 if output["output_name"] == output_id:
383 return GalaxyOutput(history_id, "dataset", output["id"], None)
384
385 for output_collection in tool_response["output_collections"]:
386 if output_collection["output_name"] == output_id:
387 return GalaxyOutput(history_id, "dataset_collection", output_collection["id"], None)
388
389 raise Exception("Failed to find output with label [%s]" % output_id)
390
391
392 def invocation_to_output(invocation, history_id, output_id):
393 if output_id in invocation["outputs"]:
394 dataset = invocation["outputs"][output_id]
395 galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"], None)
396 elif output_id in invocation["output_collections"]:
397 collection = invocation["output_collections"][output_id]
398 galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"], None)
399 elif output_id in invocation["output_values"]:
400 output_value = invocation["output_values"][output_id]
401 galaxy_output = GalaxyOutput(None, "raw_value", output_value, None)
402 else:
403 raise Exception(f"Failed to find output with label [{output_id}] in [{invocation}]")
404
405 return galaxy_output
406
407
408 def output_to_cwl_json(
409 galaxy_output, get_metadata, get_dataset, get_extra_files, pseduo_location=False,
410 ):
411 """Convert objects in a Galaxy history into CWL JSON objects.
412
413 Useful in running conformance tests and implementing the cwl-runner
414 interface via Galaxy.
415 """
416 def element_to_cwl_json(element):
417 object = element["object"]
418 content_type = object.get("history_content_type")
419 metadata = None
420 if content_type is None:
421 content_type = "dataset_collection"
422 metadata = element["object"]
423 metadata["history_content_type"] = content_type
424 element_output = GalaxyOutput(
425 galaxy_output.history_id,
426 content_type,
427 object["id"],
428 metadata,
429 )
430 return output_to_cwl_json(element_output, get_metadata, get_dataset, get_extra_files, pseduo_location=pseduo_location)
431
432 output_metadata = galaxy_output.metadata
433 if output_metadata is None:
434 output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
435
436 def dataset_dict_to_json_content(dataset_dict):
437 if "content" in dataset_dict:
438 return json.loads(dataset_dict["content"])
439 else:
440 with open(dataset_dict["path"]) as f:
441 return json.load(f)
442
443 if galaxy_output.history_content_type == "raw_value":
444 return galaxy_output.history_content_id
445 elif output_metadata["history_content_type"] == "dataset":
446 ext = output_metadata["file_ext"]
447 assert output_metadata["state"] == "ok"
448 if ext == "expression.json":
449 dataset_dict = get_dataset(output_metadata)
450 return dataset_dict_to_json_content(dataset_dict)
451 else:
452 file_or_directory = "Directory" if ext == "directory" else "File"
453 secondary_files = []
454
455 if file_or_directory == "File":
456 dataset_dict = get_dataset(output_metadata)
457 properties = output_properties(pseduo_location=pseduo_location, **dataset_dict)
458 basename = properties["basename"]
459 extra_files = get_extra_files(output_metadata)
460 found_index = False
461 for extra_file in extra_files:
462 if extra_file["class"] == "File":
463 path = extra_file["path"]
464 if path == SECONDARY_FILES_INDEX_PATH:
465 found_index = True
466
467 if found_index:
468 ec = get_dataset(output_metadata, filename=SECONDARY_FILES_INDEX_PATH)
469 index = dataset_dict_to_json_content(ec)
470
471 def dir_listing(dir_path):
472 listing = []
473 for extra_file in extra_files:
474 path = extra_file["path"]
475 extra_file_class = extra_file["class"]
476 extra_file_basename = os.path.basename(path)
477 if os.path.join(dir_path, extra_file_basename) != path:
478 continue
479
480 if extra_file_class == "File":
481 ec = get_dataset(output_metadata, filename=path)
482 ec["basename"] = extra_file_basename
483 ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
484 elif extra_file_class == "Directory":
485 ec_properties = {}
486 ec_properties["class"] = "Directory"
487 ec_properties["location"] = extra_file_basename
488 ec_properties["listing"] = dir_listing(path)
489 else:
490 raise Exception("Unknown output type encountered.")
491 listing.append(ec_properties)
492 return listing
493
494 for basename in index["order"]:
495 for extra_file in extra_files:
496 path = extra_file["path"]
497 if path != os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename):
498 continue
499
500 extra_file_class = extra_file["class"]
501
502 # This is wrong...
503 if not STORE_SECONDARY_FILES_WITH_BASENAME:
504 ec_basename = basename + os.path.basename(path)
505 else:
506 ec_basename = os.path.basename(path)
507
508 if extra_file_class == "File":
509 ec = get_dataset(output_metadata, filename=path)
510 ec["basename"] = ec_basename
511 ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
512 elif extra_file_class == "Directory":
513 ec_properties = {}
514 ec_properties["class"] = "Directory"
515 ec_properties["location"] = ec_basename
516 ec_properties["listing"] = dir_listing(path)
517 else:
518 raise Exception("Unknown output type encountered.")
519 secondary_files.append(ec_properties)
520
521 else:
522 basename = output_metadata.get("created_from_basename")
523 if not basename:
524 basename = output_metadata.get("name")
525
526 listing = []
527 properties = {
528 "class": "Directory",
529 "basename": basename,
530 "listing": listing,
531 }
532
533 extra_files = get_extra_files(output_metadata)
534 for extra_file in extra_files:
535 if extra_file["class"] == "File":
536 path = extra_file["path"]
537 ec = get_dataset(output_metadata, filename=path)
538 ec["basename"] = os.path.basename(path)
539 ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
540 listing.append(ec_properties)
541
542 if secondary_files:
543 properties["secondaryFiles"] = secondary_files
544 return properties
545
546 elif output_metadata["history_content_type"] == "dataset_collection":
547 rval = None
548 collection_type = output_metadata["collection_type"].split(":", 1)[0]
549 if collection_type in ["list", "paired"]:
550 rval = []
551 for element in output_metadata["elements"]:
552 rval.append(element_to_cwl_json(element))
553 elif collection_type == "record":
554 rval = {}
555 for element in output_metadata["elements"]:
556 rval[element["element_identifier"]] = element_to_cwl_json(element)
557 return rval
558 else:
559 raise NotImplementedError("Unknown history content type encountered")
560
561
562 def download_output(galaxy_output, get_metadata, get_dataset, get_extra_files, output_path):
563 output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
564 dataset_dict = get_dataset(output_metadata)
565 with open(output_path, 'wb') as fh:
566 fh.write(dataset_dict['content'])
567
568
569 def guess_artifact_type(path):
570 # TODO: Handle IDs within files.
571 tool_or_workflow = "workflow"
572 try:
573 with open(path) as f:
574 artifact = yaml.safe_load(f)
575
576 tool_or_workflow = "tool" if artifact["class"] != "Workflow" else "workflow"
577
578 except Exception as e:
579 print(e)
580
581 return tool_or_workflow
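# Example (sketch; the file names are illustrative):
#   guess_artifact_type("cat-tool.cwl")  # -> "tool"      (class: CommandLineTool)
#   guess_artifact_type("align-wf.cwl")  # -> "workflow"  (class: Workflow)
# Any unreadable or class-less document falls back to "workflow".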