Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/tests/test_pack.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
import json import os import tempfile from collections.abc import Sized from functools import partial from io import StringIO from pathlib import Path from typing import Dict import pytest from ruamel import yaml import cwltool.pack import cwltool.workflow from cwltool.context import LoadingContext from cwltool.load_tool import fetch_document, resolve_and_validate_document from cwltool.main import main, make_relative, print_pack from cwltool.resolver import tool_resolver from cwltool.utils import adjustDirObjs, adjustFileObjs from .util import get_data, needs_docker def test_pack() -> None: loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/revsort.cwl")) with open(get_data("tests/wf/expect_packed.cwl")) as packed_file: expect_packed = yaml.main.safe_load(packed_file) packed = cwltool.pack.pack(loadingContext, uri) adjustFileObjs( packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) ) adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) assert "$schemas" in packed packed_schemas = packed["$schemas"] assert isinstance(packed_schemas, Sized) assert len(packed_schemas) == len(expect_packed["$schemas"]) del packed["$schemas"] del expect_packed["$schemas"] assert packed == expect_packed def test_pack_input_named_name() -> None: loadingContext, workflowobj, uri = fetch_document( get_data("tests/wf/trick_revsort.cwl") ) loadingContext.do_update = False loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri ) loader = loadingContext.loader assert loader loader.resolve_ref(uri)[0] with open(get_data("tests/wf/expect_trick_packed.cwl")) as packed_file: expect_packed = yaml.main.round_trip_load(packed_file) packed = cwltool.pack.pack(loadingContext, uri) adjustFileObjs( packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) ) adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) assert "$schemas" in packed packed_schemas = packed["$schemas"] assert isinstance(packed_schemas, Sized) assert len(packed_schemas) == len(expect_packed["$schemas"]) del packed["$schemas"] del expect_packed["$schemas"] assert packed == expect_packed def test_pack_single_tool() -> None: loadingContext, workflowobj, uri = fetch_document( get_data("tests/wf/formattest.cwl") ) loadingContext.do_update = False loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri ) loader = loadingContext.loader assert loader loader.resolve_ref(uri)[0] packed = cwltool.pack.pack(loadingContext, uri) assert "$schemas" in packed def test_pack_fragment() -> None: with open(get_data("tests/wf/scatter2_subwf.cwl")) as packed_file: expect_packed = yaml.main.safe_load(packed_file) loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/scatter2.cwl")) packed = cwltool.pack.pack(loadingContext, uri + "#scatterstep/mysub") adjustFileObjs( packed, partial(make_relative, os.path.abspath(get_data("tests/wf"))) ) adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) assert json.dumps(packed, sort_keys=True, indent=2) == json.dumps( expect_packed, sort_keys=True, indent=2 ) def test_pack_rewrites() -> None: rewrites = {} # type: Dict[str, str] loadingContext, workflowobj, uri = fetch_document( get_data("tests/wf/default-wf5.cwl") ) loadingContext.do_update = False loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri ) loader = loadingContext.loader assert loader loader.resolve_ref(uri)[0] cwltool.pack.pack( loadingContext, uri, rewrite_out=rewrites, ) assert len(rewrites) == 6 cwl_missing_version_paths = [ "tests/wf/hello_single_tool.cwl", "tests/wf/hello-workflow.cwl", ] @pytest.mark.parametrize("cwl_path", cwl_missing_version_paths) def test_pack_missing_cwlVersion(cwl_path: str) -> None: """Ensure the generated pack output is not missing the `cwlVersion` in case of single tool workflow and single step workflow.""" # Testing single tool workflow loadingContext, workflowobj, uri = fetch_document(get_data(cwl_path)) loadingContext.do_update = False loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri ) loader = loadingContext.loader assert loader loader.resolve_ref(uri)[0] # generate pack output dict packed = json.loads(print_pack(loadingContext, uri)) assert packed["cwlVersion"] == "v1.0" def test_pack_idempotence_tool(tmp_path: Path) -> None: """Ensure that pack produces exactly the same document for an already packed CommandLineTool.""" _pack_idempotently("tests/wf/hello_single_tool.cwl", tmp_path) def test_pack_idempotence_workflow(tmp_path: Path) -> None: """Ensure that pack produces exactly the same document for an already packed workflow.""" _pack_idempotently("tests/wf/count-lines1-wf.cwl", tmp_path) def _pack_idempotently(document: str, tmp_path: Path) -> None: loadingContext, workflowobj, uri = fetch_document(get_data(document)) loadingContext.do_update = False loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri ) loader = loadingContext.loader assert loader loader.resolve_ref(uri)[0] # generate pack output dict packed_text = print_pack(loadingContext, uri) packed = json.loads(packed_text) tmp_name = tmp_path / "packed.cwl" tmp = tmp_name.open(mode="w") tmp.write(packed_text) tmp.flush() tmp.close() loadingContext, workflowobj, uri2 = fetch_document(tmp.name) loadingContext.do_update = False loadingContext, uri2 = resolve_and_validate_document( loadingContext, workflowobj, uri2 ) loader2 = loadingContext.loader assert loader2 loader2.resolve_ref(uri2)[0] # generate pack output dict packed_text = print_pack(loadingContext, uri2) double_packed = json.loads(packed_text) assert uri != uri2 assert packed == double_packed cwl_to_run = [ ("tests/wf/count-lines1-wf.cwl", "tests/wf/wc-job.json", False), ("tests/wf/formattest.cwl", "tests/wf/formattest-job.json", True), ] @needs_docker @pytest.mark.parametrize("wf_path,job_path,namespaced", cwl_to_run) def test_packed_workflow_execution( wf_path: str, job_path: str, namespaced: bool, tmp_path: Path ) -> None: loadingContext = LoadingContext() loadingContext.resolver = tool_resolver loadingContext, workflowobj, uri = fetch_document(get_data(wf_path), loadingContext) loadingContext.do_update = False loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri ) loader = loadingContext.loader assert loader loader.resolve_ref(uri)[0] packed = json.loads(print_pack(loadingContext, uri)) assert not namespaced or "$namespaces" in packed wf_packed_handle, wf_packed_path = tempfile.mkstemp() with open(wf_packed_path, "w") as temp_file: json.dump(packed, temp_file) normal_output = StringIO() packed_output = StringIO() normal_params = ["--outdir", str(tmp_path), get_data(wf_path), get_data(job_path)] packed_params = [ "--outdir", str(tmp_path), "--debug", wf_packed_path, get_data(job_path), ] assert main(normal_params, stdout=normal_output) == 0 assert main(packed_params, stdout=packed_output) == 0 assert json.loads(packed_output.getvalue()) == json.loads(normal_output.getvalue()) os.close(wf_packed_handle) os.remove(wf_packed_path)