env/lib/python3.9/site-packages/galaxy/tool_util/cwl/util.py @ 0:4f3585e2f14b (draft, default, tip)
commit message: "planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date: Mon, 22 Mar 2021 18:12:50 +0000
"""Client-centric CWL-related utilities.

Used to share code between the Galaxy test framework
and other Galaxy CWL clients (e.g. Planemo)."""
import hashlib
import io
import json
import os
import tarfile
import tempfile
from collections import namedtuple

import yaml

from galaxy.util import unicodify

STORE_SECONDARY_FILES_WITH_BASENAME = True
SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json"


def set_basename_and_derived_properties(properties, basename):
    properties["basename"] = basename
    properties["nameroot"], properties["nameext"] = os.path.splitext(basename)
    return properties


def output_properties(path=None, content=None, basename=None, pseudo_location=False):
    checksum = hashlib.sha1()
    properties = {
        "class": "File",
    }
    if path is not None:
        properties["path"] = path
        f = open(path, "rb")
    else:
        f = io.BytesIO(content)

    try:
        contents = f.read(1024 * 1024)
        filesize = 0
        while contents:
            checksum.update(contents)
            filesize += len(contents)
            contents = f.read(1024 * 1024)
    finally:
        f.close()
    properties["checksum"] = "sha1$%s" % checksum.hexdigest()
    properties["size"] = filesize
    set_basename_and_derived_properties(properties, basename)
    _handle_pseudo_location(properties, pseudo_location)
    return properties


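# Illustrative sketch (not part of the original module): what
# `output_properties` returns for a small in-memory payload. The payload
# and basename below are made up for the example.
def _example_output_properties():
    props = output_properties(content=b"ACGT", basename="reads.fa")
    # `props` resembles:
    # {"class": "File", "checksum": "sha1$...", "size": 4,
    #  "basename": "reads.fa", "nameroot": "reads", "nameext": ".fa"}
    return props

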
def _handle_pseudo_location(properties, pseudo_location):
    if pseudo_location:
        properties["location"] = properties["basename"]


def abs_path_or_uri(path_or_uri, relative_to):
    """Return an absolute path if this isn't a URI, otherwise keep the URI the same."""
    is_uri = "://" in path_or_uri
    if not is_uri and not os.path.isabs(path_or_uri):
        path_or_uri = os.path.join(relative_to, path_or_uri)
    if not is_uri:
        _ensure_file_exists(path_or_uri)
    return path_or_uri


def abs_path(path_or_uri, relative_to):
    path_or_uri = abs_path_or_uri(path_or_uri, relative_to)
    if path_or_uri.startswith("file://"):
        path_or_uri = path_or_uri[len("file://"):]

    return path_or_uri


def path_or_uri_to_uri(path_or_uri):
    if "://" not in path_or_uri:
        return "file://%s" % path_or_uri
    else:
        return path_or_uri


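# Illustrative sketch (not part of the original module): how the path
# helpers above treat plain paths versus URIs. The example paths are
# made up; non-URI inputs must exist on disk, so a file:// URI is used.
def _example_path_helpers():
    uri = path_or_uri_to_uri("/tmp/data/input.fa")  # -> "file:///tmp/data/input.fa"
    passthrough = path_or_uri_to_uri("https://example.org/x")  # URIs are returned unchanged
    local = abs_path("file:///tmp/data/input.fa", "/tmp")  # -> "/tmp/data/input.fa"
    return uri, passthrough, local

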
def galactic_job_json(
    job, test_data_directory, upload_func, collection_create_func, tool_or_workflow="workflow"
):
    """Adapt a CWL job object to the Galaxy API.

    CWL derived tools in Galaxy can consume a job description sort of like
    CWL job objects via the API but paths need to be replaced with datasets
    and records and arrays with collection references. This function will
    stage files and modify the job description to adapt to these changes
    for Galaxy.
    """

    datasets = []
    dataset_collections = []

    def response_to_hda(target, upload_response):
        assert isinstance(upload_response, dict), upload_response
        assert "outputs" in upload_response, upload_response
        assert len(upload_response["outputs"]) > 0, upload_response
        dataset = upload_response["outputs"][0]
        datasets.append((dataset, target))
        dataset_id = dataset["id"]
        return {"src": "hda", "id": dataset_id}

    def upload_file(file_path, secondary_files, **kwargs):
        file_path = abs_path_or_uri(file_path, test_data_directory)
        target = FileUploadTarget(file_path, secondary_files, **kwargs)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_file_literal(contents, **kwd):
        target = FileLiteralTarget(contents, **kwd)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_tar(file_path):
        file_path = abs_path_or_uri(file_path, test_data_directory)
        target = DirectoryUploadTarget(file_path)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_file_with_composite_data(file_path, composite_data, **kwargs):
        if file_path is not None:
            file_path = abs_path_or_uri(file_path, test_data_directory)
        composite_data_resolved = []
        for cd in composite_data:
            composite_data_resolved.append(abs_path_or_uri(cd, test_data_directory))
        target = FileUploadTarget(file_path, composite_data=composite_data_resolved, **kwargs)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_object(the_object):
        target = ObjectUploadTarget(the_object)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def replacement_item(value, force_to_file=False):
        is_dict = isinstance(value, dict)
        item_class = None if not is_dict else value.get("class", None)
        is_file = item_class == "File"
        is_directory = item_class == "Directory"
        is_collection = item_class == "Collection"  # Galaxy extension.

        if force_to_file:
            if is_file:
                return replacement_file(value)
            else:
                return upload_object(value)

        if isinstance(value, list):
            return replacement_list(value)
        elif not isinstance(value, dict):
            if tool_or_workflow == "workflow":
                # All inputs represented as dataset or collection parameters
                return upload_object(value)
            else:
                return value

        if is_file:
            return replacement_file(value)
        elif is_directory:
            return replacement_directory(value)
        elif is_collection:
            return replacement_collection(value)
        else:
            return replacement_record(value)

    def replacement_file(value):
        if value.get('galaxy_id'):
            return {"src": "hda", "id": value['galaxy_id']}
        file_path = value.get("location", None) or value.get("path", None)
        # format to match output definitions in tool, where did filetype come from?
        filetype = value.get("filetype", None) or value.get("format", None)
        composite_data_raw = value.get("composite_data", None)
        kwd = {}
        if "tags" in value:
            kwd["tags"] = value.get("tags")
        if "dbkey" in value:
            kwd["dbkey"] = value.get("dbkey")
        if composite_data_raw:
            composite_data = []
            for entry in composite_data_raw:
                path = None
                if isinstance(entry, dict):
                    path = entry.get("location", None) or entry.get("path", None)
                else:
                    path = entry
                composite_data.append(path)
            rval_c = upload_file_with_composite_data(None, composite_data, filetype=filetype, **kwd)
            return rval_c

        if file_path is None:
            contents = value.get("contents", None)
            if contents is not None:
                return upload_file_literal(contents, **kwd)

            return value

        secondary_files = value.get("secondaryFiles", [])
        secondary_files_tar_path = None
        if secondary_files:
            tmp = tempfile.NamedTemporaryFile(delete=False)
            tf = tarfile.open(fileobj=tmp, mode='w:')
            order = []
            index_contents = {
                "order": order
            }
            for secondary_file in secondary_files:
                secondary_file_path = secondary_file.get("location", None) or secondary_file.get("path", None)
                assert secondary_file_path, "Invalid secondaryFile entry found [%s]" % secondary_file
                full_secondary_file_path = os.path.join(test_data_directory, secondary_file_path)
                basename = secondary_file.get("basename") or os.path.basename(secondary_file_path)
                order.append(unicodify(basename))
                tf.add(full_secondary_file_path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
            tmp_index = tempfile.NamedTemporaryFile(delete=False, mode="w")
            json.dump(index_contents, tmp_index)
            tmp_index.close()
            tf.add(tmp_index.name, SECONDARY_FILES_INDEX_PATH)
            tf.close()
            secondary_files_tar_path = tmp.name

        return upload_file(file_path, secondary_files_tar_path, filetype=filetype, **kwd)

    def replacement_directory(value):
        file_path = value.get("location", None) or value.get("path", None)
        if file_path is None:
            return value

        if not os.path.isabs(file_path):
            file_path = os.path.join(test_data_directory, file_path)

        tmp = tempfile.NamedTemporaryFile(delete=False)
        tf = tarfile.open(fileobj=tmp, mode='w:')
        tf.add(file_path, '.')
        tf.close()

        return upload_tar(tmp.name)

    def replacement_list(value):
        collection_element_identifiers = []
        for i, item in enumerate(value):
            dataset = replacement_item(item, force_to_file=True)
            collection_element = dataset.copy()
            collection_element["name"] = str(i)
            collection_element_identifiers.append(collection_element)

        # TODO: handle nested lists/arrays
        collection = collection_create_func(collection_element_identifiers, "list")
        dataset_collections.append(collection)
        hdca_id = collection["id"]
        return {"src": "hdca", "id": hdca_id}

    def to_elements(value, rank_collection_type):
        collection_element_identifiers = []
        assert "elements" in value
        elements = value["elements"]

        is_nested_collection = ":" in rank_collection_type
        for element in elements:
            if not is_nested_collection:
                # flat collection
                dataset = replacement_item(element, force_to_file=True)
                collection_element = dataset.copy()
                collection_element["name"] = element["identifier"]
                collection_element_identifiers.append(collection_element)
            else:
                # nested collection
                sub_collection_type = rank_collection_type[rank_collection_type.find(":") + 1:]
                collection_element = {
                    "name": element["identifier"],
                    "src": "new_collection",
                    "collection_type": sub_collection_type,
                    "element_identifiers": to_elements(element, sub_collection_type)
                }
                collection_element_identifiers.append(collection_element)

        return collection_element_identifiers

    def replacement_collection(value):
        if value.get('galaxy_id'):
            return {"src": "hdca", "id": value['galaxy_id']}
        assert "collection_type" in value
        collection_type = value["collection_type"]
        elements = to_elements(value, collection_type)

        collection = collection_create_func(elements, collection_type)
        dataset_collections.append(collection)
        hdca_id = collection["id"]
        return {"src": "hdca", "id": hdca_id}

    def replacement_record(value):
        collection_element_identifiers = []
        for record_key, record_value in value.items():
            if not isinstance(record_value, dict) or record_value.get("class") != "File":
                dataset = replacement_item(record_value, force_to_file=True)
                collection_element = dataset.copy()
            else:
                dataset = upload_file(record_value["location"], [])
                collection_element = dataset.copy()

            collection_element["name"] = record_key
            collection_element_identifiers.append(collection_element)

        collection = collection_create_func(collection_element_identifiers, "record")
        dataset_collections.append(collection)
        hdca_id = collection["id"]
        return {"src": "hdca", "id": hdca_id}

    replace_keys = {}
    for key, value in job.items():
        replace_keys[key] = replacement_item(value)

    job.update(replace_keys)
    return job, datasets


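# Illustrative sketch (not part of the original module): driving
# `galactic_job_json` with stub callbacks instead of live Galaxy API calls.
# The fake IDs, URL, and job layout are assumptions for the example.
def _example_galactic_job_json():
    def upload_func(target):
        # A real implementation would call Galaxy's upload API; this is the
        # minimal response shape `response_to_hda` expects.
        return {"outputs": [{"id": "hda123"}]}

    def collection_create_func(element_identifiers, collection_type):
        return {"id": "hdca456", "collection_type": collection_type}

    job = {"reads": {"class": "File", "location": "https://example.org/input.fa"}}
    job, datasets = galactic_job_json(job, "/tmp/test-data", upload_func, collection_create_func)
    # job is now {"reads": {"src": "hda", "id": "hda123"}}
    return job, datasets

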
def _ensure_file_exists(file_path):
    if not os.path.exists(file_path):
        template = "File [%s] does not exist - parent directory [%s] does %sexist, cwd is [%s]"
        parent_directory = os.path.dirname(file_path)
        message = template % (
            file_path,
            parent_directory,
            "" if os.path.exists(parent_directory) else "not ",
            os.getcwd(),
        )
        raise Exception(message)


class FileLiteralTarget:

    def __init__(self, contents, path=None, **kwargs):
        self.contents = contents
        self.properties = kwargs
        self.path = path

    def __str__(self):
        return f"FileLiteralTarget[contents={self.contents}] with {self.properties}"


class FileUploadTarget:

    def __init__(self, path, secondary_files=None, **kwargs):
        self.path = path
        self.secondary_files = secondary_files
        self.composite_data = kwargs.get("composite_data", [])
        self.properties = kwargs

    def __str__(self):
        return f"FileUploadTarget[path={self.path}] with {self.properties}"


class ObjectUploadTarget:

    def __init__(self, the_object):
        self.object = the_object
        self.properties = {}

    def __str__(self):
        return f"ObjectUploadTarget[object={self.object}] with {self.properties}"


class DirectoryUploadTarget:

    def __init__(self, tar_path):
        self.tar_path = tar_path

    def __str__(self):
        return "DirectoryUploadTarget[tar_path=%s]" % self.tar_path


GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id", "metadata"])


def tool_response_to_output(tool_response, history_id, output_id):
    for output in tool_response["outputs"]:
        if output["output_name"] == output_id:
            return GalaxyOutput(history_id, "dataset", output["id"], None)

    for output_collection in tool_response["output_collections"]:
        if output_collection["output_name"] == output_id:
            return GalaxyOutput(history_id, "dataset_collection", output_collection["id"], None)

    raise Exception("Failed to find output with label [%s]" % output_id)


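# Illustrative sketch (not part of the original module): locating a named
# output in a tool-run response. The IDs and names are made up.
def _example_tool_response_to_output():
    tool_response = {
        "outputs": [{"output_name": "out_file", "id": "hda7"}],
        "output_collections": [],
    }
    # -> GalaxyOutput("hist9", "dataset", "hda7", None)
    return tool_response_to_output(tool_response, "hist9", "out_file")

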
def invocation_to_output(invocation, history_id, output_id):
    if output_id in invocation["outputs"]:
        dataset = invocation["outputs"][output_id]
        galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"], None)
    elif output_id in invocation["output_collections"]:
        collection = invocation["output_collections"][output_id]
        galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"], None)
    elif output_id in invocation["output_values"]:
        output_value = invocation["output_values"][output_id]
        galaxy_output = GalaxyOutput(None, "raw_value", output_value, None)
    else:
        raise Exception(f"Failed to find output with label [{output_id}] in [{invocation}]")

    return galaxy_output


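# Illustrative sketch (not part of the original module): mapping a minimal
# workflow invocation dict onto GalaxyOutput tuples. The IDs are made up.
def _example_invocation_to_output():
    invocation = {
        "outputs": {"aligned": {"id": "hda1"}},
        "output_collections": {"per_sample": {"id": "hdca2"}},
        "output_values": {"count": 42},
    }
    dataset_out = invocation_to_output(invocation, "hist9", "aligned")
    # -> GalaxyOutput("hist9", "dataset", "hda1", None)
    value_out = invocation_to_output(invocation, "hist9", "count")
    # -> GalaxyOutput(None, "raw_value", 42, None)
    return dataset_out, value_out

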
def output_to_cwl_json(
    galaxy_output, get_metadata, get_dataset, get_extra_files, pseudo_location=False,
):
    """Convert objects in a Galaxy history into a CWL object.

    Useful in running conformance tests and implementing the cwl-runner
    interface via Galaxy.
    """
    def element_to_cwl_json(element):
        object = element["object"]
        content_type = object.get("history_content_type")
        metadata = None
        if content_type is None:
            content_type = "dataset_collection"
            metadata = element["object"]
            metadata["history_content_type"] = content_type
        element_output = GalaxyOutput(
            galaxy_output.history_id,
            content_type,
            object["id"],
            metadata,
        )
        return output_to_cwl_json(element_output, get_metadata, get_dataset, get_extra_files, pseudo_location=pseudo_location)

    output_metadata = galaxy_output.metadata
    if output_metadata is None:
        output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)

    def dataset_dict_to_json_content(dataset_dict):
        if "content" in dataset_dict:
            return json.loads(dataset_dict["content"])
        else:
            with open(dataset_dict["path"]) as f:
                return json.load(f)

    if galaxy_output.history_content_type == "raw_value":
        return galaxy_output.history_content_id
    elif output_metadata["history_content_type"] == "dataset":
        ext = output_metadata["file_ext"]
        assert output_metadata["state"] == "ok"
        if ext == "expression.json":
            dataset_dict = get_dataset(output_metadata)
            return dataset_dict_to_json_content(dataset_dict)
        else:
            file_or_directory = "Directory" if ext == "directory" else "File"
            secondary_files = []

            if file_or_directory == "File":
                dataset_dict = get_dataset(output_metadata)
                properties = output_properties(pseudo_location=pseudo_location, **dataset_dict)
                basename = properties["basename"]
                extra_files = get_extra_files(output_metadata)
                found_index = False
                for extra_file in extra_files:
                    if extra_file["class"] == "File":
                        path = extra_file["path"]
                        if path == SECONDARY_FILES_INDEX_PATH:
                            found_index = True

                if found_index:
                    ec = get_dataset(output_metadata, filename=SECONDARY_FILES_INDEX_PATH)
                    index = dataset_dict_to_json_content(ec)

                    def dir_listing(dir_path):
                        listing = []
                        for extra_file in extra_files:
                            path = extra_file["path"]
                            extra_file_class = extra_file["class"]
                            extra_file_basename = os.path.basename(path)
                            if os.path.join(dir_path, extra_file_basename) != path:
                                continue

                            if extra_file_class == "File":
                                ec = get_dataset(output_metadata, filename=path)
                                ec["basename"] = extra_file_basename
                                ec_properties = output_properties(pseudo_location=pseudo_location, **ec)
                            elif extra_file_class == "Directory":
                                ec_properties = {}
                                ec_properties["class"] = "Directory"
                                ec_properties["location"] = extra_file_basename
                                ec_properties["listing"] = dir_listing(path)
                            else:
                                raise Exception("Unknown output type encountered....")
                            listing.append(ec_properties)
                        return listing

                    for basename in index["order"]:
                        for extra_file in extra_files:
                            path = extra_file["path"]
                            if path != os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename):
                                continue

                            extra_file_class = extra_file["class"]

                            # This is wrong...
                            if not STORE_SECONDARY_FILES_WITH_BASENAME:
                                ec_basename = basename + os.path.basename(path)
                            else:
                                ec_basename = os.path.basename(path)

                            if extra_file_class == "File":
                                ec = get_dataset(output_metadata, filename=path)
                                ec["basename"] = ec_basename
                                ec_properties = output_properties(pseudo_location=pseudo_location, **ec)
                            elif extra_file_class == "Directory":
                                ec_properties = {}
                                ec_properties["class"] = "Directory"
                                ec_properties["location"] = ec_basename
                                ec_properties["listing"] = dir_listing(path)
                            else:
                                raise Exception("Unknown output type encountered....")
                            secondary_files.append(ec_properties)

            else:
                basename = output_metadata.get("created_from_basename")
                if not basename:
                    basename = output_metadata.get("name")

                listing = []
                properties = {
                    "class": "Directory",
                    "basename": basename,
                    "listing": listing,
                }

                extra_files = get_extra_files(output_metadata)
                for extra_file in extra_files:
                    if extra_file["class"] == "File":
                        path = extra_file["path"]
                        ec = get_dataset(output_metadata, filename=path)
                        ec["basename"] = os.path.basename(path)
                        ec_properties = output_properties(pseudo_location=pseudo_location, **ec)
                        listing.append(ec_properties)

            if secondary_files:
                properties["secondaryFiles"] = secondary_files
            return properties

    elif output_metadata["history_content_type"] == "dataset_collection":
        rval = None
        collection_type = output_metadata["collection_type"].split(":", 1)[0]
        if collection_type in ["list", "paired"]:
            rval = []
            for element in output_metadata["elements"]:
                rval.append(element_to_cwl_json(element))
        elif collection_type == "record":
            rval = {}
            for element in output_metadata["elements"]:
                rval[element["element_identifier"]] = element_to_cwl_json(element)
        return rval
    else:
        raise NotImplementedError("Unknown history content type encountered")


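# Illustrative sketch (not part of the original module): converting a plain
# dataset output back into a CWL File object with stub accessors. The
# metadata fields mirror what the function reads; the values are made up.
def _example_output_to_cwl_json():
    out = GalaxyOutput("hist9", "dataset", "hda1", None)

    def get_metadata(history_content_type, content_id):
        return {"history_content_type": "dataset", "state": "ok", "file_ext": "txt"}

    def get_dataset(metadata, filename=None):
        return {"content": b"hello", "basename": "out.txt"}

    def get_extra_files(metadata):
        return []

    # -> {"class": "File", "checksum": "sha1$...", "size": 5,
    #     "basename": "out.txt", "nameroot": "out", "nameext": ".txt"}
    return output_to_cwl_json(out, get_metadata, get_dataset, get_extra_files)

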
def download_output(galaxy_output, get_metadata, get_dataset, get_extra_files, output_path):
    output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
    dataset_dict = get_dataset(output_metadata)
    with open(output_path, 'wb') as fh:
        fh.write(dataset_dict['content'])


def guess_artifact_type(path):
    # TODO: Handle IDs within files.
    tool_or_workflow = "workflow"
    try:
        with open(path) as f:
            artifact = yaml.safe_load(f)

        tool_or_workflow = "tool" if artifact["class"] != "Workflow" else "workflow"

    except Exception as e:
        print(e)

    return tool_or_workflow
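

# Illustrative sketch (not part of the original module): `guess_artifact_type`
# keys off the top-level `class` field of a CWL document. The temporary file
# and its contents are made up for the example.
def _example_guess_artifact_type():
    with tempfile.NamedTemporaryFile("w", suffix=".cwl", delete=False) as f:
        f.write("class: CommandLineTool\ncwlVersion: v1.0\n")
        path = f.name
    kind = guess_artifact_type(path)  # -> "tool"
    os.remove(path)
    return kind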