env/lib/python3.9/site-packages/galaxy/tool_util/cwl/runtime_actions.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date: Mon, 22 Mar 2021 18:12:50 +0000
import json
import os
import shutil

from galaxy.util import safe_makedirs

from .cwltool_deps import ref_resolver
from .parser import (
    JOB_JSON_FILE,
    load_job_proxy,
)
from .util import (
    SECONDARY_FILES_INDEX_PATH,
    STORE_SECONDARY_FILES_WITH_BASENAME,
)


def file_dict_to_description(file_dict):
    output_class = file_dict["class"]
    assert output_class in ["File", "Directory"], file_dict
    location = file_dict["location"]
    if location.startswith("_:"):
        assert output_class == "File"
        return LiteralFileDescription(file_dict["contents"])
    elif output_class == "File":
        return PathFileDescription(_possible_uri_to_path(location))
    else:
        return PathDirectoryDescription(_possible_uri_to_path(location))


class FileDescription:
    pass


class PathFileDescription:

    def __init__(self, path):
        self.path = path

    def write_to(self, destination):
        # TODO: Move if we can be sure this is in the working directory for instance...
        shutil.copy(self.path, destination)


class PathDirectoryDescription:

    def __init__(self, path):
        self.path = path

    def write_to(self, destination):
        shutil.copytree(self.path, destination)


class LiteralFileDescription:

    def __init__(self, content):
        self.content = content

    def write_to(self, destination):
        with open(destination, "wb") as f:
            f.write(self.content.encode("UTF-8"))


def _possible_uri_to_path(location):
    if location.startswith("file://"):
        path = ref_resolver.uri_file_path(location)
    else:
        path = location
    return path


def handle_outputs(job_directory=None):
    # Relocate dynamically collected files to pre-determined locations
    # registered with ToolOutput objects via from_work_dir handling.
    if job_directory is None:
        job_directory = os.path.join(os.getcwd(), os.path.pardir)
    metadata_directory = os.path.join(job_directory, "metadata")
    metadata_params_path = os.path.join(metadata_directory, "params.json")
    try:
        with open(metadata_params_path) as f:
            metadata_params = json.load(f)
    except OSError:
        raise Exception("Failed to find params.json from metadata directory [%s]" % metadata_directory)

    cwl_job_file = os.path.join(job_directory, JOB_JSON_FILE)
    if not os.path.exists(cwl_job_file):
        # Not a CWL job, just continue
        return
    # So we only need to do strict validation when the tool was loaded,
    # no reason to do it again during job execution - so this shortcut
    # allows us to not need Galaxy's full configuration on job nodes.
    job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False)
    tool_working_directory = os.path.join(job_directory, "working")
    job_id_tag = metadata_params["job_id_tag"]
    from galaxy.job_execution.output_collect import default_exit_code_file, read_exit_code_from
    exit_code_file = default_exit_code_file(".", job_id_tag)
    tool_exit_code = read_exit_code_from(exit_code_file, job_id_tag)
    outputs = job_proxy.collect_outputs(tool_working_directory, tool_exit_code)

    # Build galaxy.json file.
    provided_metadata = {}

    def move_directory(output, target_path, output_name=None):
        assert output["class"] == "Directory"
        output_path = _possible_uri_to_path(output["location"])
        if output_path.startswith("_:"):
            assert "listing" in output, "Do not know how to handle output, no 'listing' found."
            listing = output["listing"]
            # Not a real path, just copy listing to target path.
            safe_makedirs(target_path)
            for listed_file in listing:
                # TODO: handle directories
                assert listed_file["class"] == "File"
                file_description = file_dict_to_description(listed_file)
                file_description.write_to(os.path.join(target_path, listed_file["basename"]))
        else:
            shutil.move(output_path, target_path)
        return {"created_from_basename": output["basename"]}

    def move_output(output, target_path, output_name=None):
        assert output["class"] == "File"
        file_description = file_dict_to_description(output)
        file_description.write_to(target_path)

        secondary_files = output.get("secondaryFiles", [])
        if secondary_files:
            order = []
            index_contents = {
                "order": order
            }

            for secondary_file in secondary_files:
                if output_name is None:
                    raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

                # TODO: handle nested files...
                secondary_file_description = file_dict_to_description(secondary_file)
                # assert secondary_file_path.startswith(output_path), "[%s] does not start with [%s]" % (secondary_file_path, output_path)
                secondary_file_basename = secondary_file["basename"]

                if not STORE_SECONDARY_FILES_WITH_BASENAME:
                    output_basename = output["basename"]
                    prefix = ""
                    while True:
                        if secondary_file_basename.startswith(output_basename):
                            secondary_file_name = prefix + secondary_file_basename[len(output_basename):]
                            break
                        prefix = "^%s" % prefix
                        if "." not in output_basename:
                            secondary_file_name = prefix + secondary_file_name
                            break
                        else:
                            output_basename = output_basename.rsplit(".", 1)[0]
                else:
                    secondary_file_name = secondary_file_basename

                # Convert to ^ format....
                secondary_files_dir = job_proxy.output_secondary_files_dir(
                    output_name, create=True
                )
                extra_target = os.path.join(secondary_files_dir, secondary_file_name)
                secondary_file_description.write_to(extra_target)
                order.append(secondary_file_name)

            with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
                json.dump(index_contents, f)

        return {"created_from_basename": output["basename"]}

    def handle_known_output(output, output_key, output_name):
        # if output["class"] != "File":
        #     # This case doesn't seem like it would be reached - why is this here?
        #     provided_metadata[output_name] = {
        #         "ext": "expression.json",
        #     }
        # else:
        assert output_name

        if output["class"] == "File":
            target_path = job_proxy.output_path(output_name)
            file_metadata = move_output(output, target_path, output_name=output_name)
        elif output["class"] == "Directory":
            target_path = job_proxy.output_directory_contents_dir(output_name)
            file_metadata = move_directory(output, target_path, output_name=output_name)
        else:
            raise Exception("Unknown output type [%s] encountered" % output)
        provided_metadata[output_name] = file_metadata

    def handle_known_output_json(output, output_name):
        target_path = job_proxy.output_path(output_name)
        with open(target_path, "w") as f:
            f.write(json.dumps(output))
        provided_metadata[output_name] = {
            "ext": "expression.json",
        }

    handled_outputs = []
    for output_name, output in outputs.items():
        handled_outputs.append(output_name)
        if isinstance(output, dict) and "location" in output:
            handle_known_output(output, output_name, output_name)
        elif isinstance(output, dict):
            prefix = "%s|__part__|" % output_name
            for record_key, record_value in output.items():
                record_value_output_key = f"{prefix}{record_key}"
                if isinstance(record_value, dict) and "class" in record_value:
                    handle_known_output(record_value, record_value_output_key, output_name)
                else:
                    # param_evaluation_noexpr
                    handle_known_output_json(output, output_name)
        elif isinstance(output, list):
            elements = []
            for index, el in enumerate(output):
                if isinstance(el, dict) and el["class"] == "File":
                    output_path = _possible_uri_to_path(el["location"])
                    elements.append({"name": str(index), "filename": output_path, "created_from_basename": el["basename"]})
                else:
                    target_path = "{}____{}".format(output_name, str(index))
                    with open(target_path, "w") as f:
                        f.write(json.dumps(el))
                    elements.append({"name": str(index), "filename": target_path, "ext": "expression.json"})
            provided_metadata[output_name] = {"elements": elements}
        else:
            handle_known_output_json(output, output_name)

    for output_instance in job_proxy._tool_proxy.output_instances():
        output_name = output_instance.name
        if output_name not in handled_outputs:
            handle_known_output_json(None, output_name)

    with open("galaxy.json", "w") as f:
        json.dump(provided_metadata, f)


__all__ = (
    'handle_outputs',
)
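A minimal invocation sketch (not part of the module above): when handle_outputs() is called without an argument, it resolves the job directory as the parent of the current working directory, so a relocation script run from the job's working directory could, under that assumption, call it directly. The script name and location here are illustrative, not defined by this module.

    # hypothetical_relocate.py - assumed to run from <job_directory>/working
    import os

    from galaxy.tool_util.cwl.runtime_actions import handle_outputs

    # Equivalent to handle_outputs(job_directory=os.path.join(os.getcwd(), os.pardir));
    # reads metadata/params.json, relocates collected CWL outputs, and writes galaxy.json.
    handle_outputs()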