view env/lib/python3.9/site-packages/cwltool/validate_js.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import copy
import itertools
import json
import logging
from collections import namedtuple
from typing import (
    Any,
    Dict,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Tuple,
    Union,
    cast,
)

from pkg_resources import resource_stream
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import (
    ArraySchema,
    EnumSchema,
    RecordSchema,
    Schema,
    UnionSchema,
)
from schema_salad.sourceline import SourceLine
from schema_salad.utils import json_dumps
from schema_salad.validate import validate_ex

from .errors import WorkflowException
from .expression import SubstitutionError
from .expression import scanner as scan_expression
from .loghandler import _logger
from .sandboxjs import code_fragment_to_js, exec_js_process


def is_expression(tool, schema):
    # type: (Any, Optional[Schema]) -> bool
    """Tell whether ``tool`` is a string described by the "Expression" enum schema.

    :param tool: the document node being inspected
    :param schema: the schema that node is checked against (may be None)
    :returns: True only when ``schema`` is an ``EnumSchema`` named
        "Expression" and ``tool`` is a ``str``
    """
    if not isinstance(schema, EnumSchema):
        return False
    if schema.name != "Expression":
        return False
    return isinstance(tool, str)


class SuppressLog(logging.Filter):
    """Logging filter that turns down every record handed to it."""

    def __init__(self, name):  # type: (str) -> None
        """Initialize this log suppressor."""
        # The name is coerced to str before being given to logging.Filter.
        super().__init__(str(name))

    def filter(self, record):  # type: (logging.LogRecord) -> bool
        """Return False for every record, whatever its content."""
        return False


# Logger handed to validate_ex() by get_expressions(). The SuppressLog filter
# attached to it returns False for every record logged through this logger.
_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
_logger_validation_warnings.addFilter(SuppressLog("cwltool.validation_warnings"))


def get_expressions(
    tool: Union[CommentedMap, str],
    schema: Optional[Union[Schema, ArraySchema]],
    source_line: Optional[SourceLine] = None,
) -> List[Tuple[str, Optional[SourceLine]]]:
    """Collect the expression strings in ``tool`` by walking it alongside ``schema``.

    :param tool: the document node (mapping, sequence or string) to search
    :param schema: the schema describing ``tool``; anything other than an
        Expression enum, union, array or record schema yields no results
    :param source_line: location to pair with ``tool`` if it is itself an
        expression
    :returns: ``(expression text, source line)`` pairs in traversal order
    """
    if is_expression(tool, schema):
        return [(cast(str, tool), source_line)]

    if isinstance(schema, UnionSchema):
        # Either a member is the Expression enum matching this node (done), or
        # we descend with the last member that validates against the node.
        matching_member = None
        for candidate in schema.schemas:
            if is_expression(tool, candidate):
                return [(cast(str, tool), source_line)]
            if validate_ex(
                candidate,
                tool,
                raise_ex=False,
                logger=_logger_validation_warnings,
            ):
                matching_member = candidate
        return get_expressions(tool, matching_member, source_line)

    if isinstance(schema, ArraySchema):
        if not isinstance(tool, MutableSequence):
            return []
        collected = []  # type: List[Tuple[str, Optional[SourceLine]]]
        for position, element in enumerate(tool):
            element_location = SourceLine(tool, position)  # type: ignore
            collected.extend(
                get_expressions(element, schema.items, element_location)
            )
        return collected

    if isinstance(schema, RecordSchema):
        if not isinstance(tool, MutableMapping):
            return []
        # Only fields that are actually present in the node are visited.
        return [
            found
            for field in schema.fields
            if field.name in tool
            for found in get_expressions(
                tool[field.name],
                field.type,
                SourceLine(tool, field.name),
            )
        ]

    return []


# Result type of jshint_js(): ``errors`` carries the formatted "JSHINT: ..."
# messages and ``globals`` carries the "globals" entry of jshint's JSON output.
JSHintJSReturn = namedtuple("JSHintJSReturn", ["errors", "globals"])


def jshint_js(
    js_text: str,
    globals: Optional[List[str]] = None,
    options: Optional[Dict[str, Union[List[str], str, int]]] = None,
) -> JSHintJSReturn:
    """Run jshint over ``js_text`` and return its findings.

    :param js_text: JavaScript source to check
    :param globals: names passed to jshint as "globals"; defaults to none
    :param options: jshint options; when omitted a default set is used
        (W117/W104/W119 warnings, implied strict mode, esversion 5)
    :returns: a ``JSHintJSReturn`` of formatted error messages and the
        "globals" list found in jshint's JSON output
    :raises RuntimeError: if the JS process exits with a non-zero return
        code or its stdout is not valid JSON
    """
    if globals is None:
        globals = []
    if options is None:
        options = {
            "includewarnings": [
                "W117",  # <VARIABLE> not defined
                "W104",
                "W119",  # using ES6 features
            ],
            "strict": "implied",
            "esversion": 5,
        }

    with resource_stream(__name__, "jshint/jshint.js") as res:
        jshint_source = res.read().decode("utf-8")
    with resource_stream(__name__, "jshint/jshint_wrapper.js") as res2:
        wrapper_source = res2.read().decode("utf-8")

    # NOTE: lodash (which jshint depends on) needs a `global` variable.
    # NOTE: the trailing object is bound to `ob` first because a bare
    # `{validateJS: validateJS}` would be parsed as a block with a label
    # `validateJS` instead of as an expression.
    jshint_functions_text = (
        "var global = this;"
        + jshint_source
        + "\n"
        + wrapper_source
        + "\nvar ob = {validateJS: validateJS}; ob"
    )

    request = json_dumps({"code": js_text, "options": options, "globals": globals})
    returncode, stdout, stderr = exec_js_process(
        "validateJS(%s)" % request,
        timeout=30,
        context=jshint_functions_text,
    )

    def jshint_failure():
        # type: () -> RuntimeError
        """Build the error describing how the jshint process ended."""
        return RuntimeError(
            'jshint failed to run succesfully\nreturncode: %d\nstdout: "%s"\nstderr: "%s"'
            % (returncode, stdout, stderr)
        )

    if returncode == -1:
        _logger.warning("jshint process timed out")

    if returncode != 0:
        raise jshint_failure()

    try:
        jshint_json = json.loads(stdout)
    except ValueError:
        raise jshint_failure()

    js_text_lines = js_text.split("\n")
    jshint_errors = []  # type: List[str]

    for reported in jshint_json.get("errors", []):
        # "line" and "character" are used as 1-based positions into js_text.
        offending_line = js_text_lines[reported["line"] - 1]
        caret_line = " " * (reported["character"] - 1) + "^"
        summary = "{}: {}".format(reported["code"], reported["reason"])
        jshint_errors.append(
            "JSHINT: " + offending_line + "\n"
            "JSHINT: " + caret_line + "\n"
            "JSHINT: " + summary
        )

    return JSHintJSReturn(jshint_errors, jshint_json.get("globals", []))


def print_js_hint_messages(
    js_hint_messages: List[str], source_line: Optional[SourceLine]
) -> None:
    """Log each jshint message as a warning tied to ``source_line``.

    Nothing is logged when ``source_line`` is None.

    :param js_hint_messages: messages to report
    :param source_line: location used to build each warning via ``makeError``
    """
    if source_line is None:
        return

    for message in js_hint_messages:
        located_message = source_line.makeError(message)
        _logger.warning(located_message)


def _scan_expression_at(text: str, source_line: Optional[SourceLine]) -> Any:
    """Run ``scan_expression`` on ``text``, attaching ``source_line`` to scan errors.

    :param text: the not-yet-scanned remainder of an expression string
    :param source_line: where the expression came from, if known
    :returns: whatever ``scan_expression`` returns (used by the caller as an
        indexable ``[start, end]`` slice, or a falsy value when nothing is left)
    :raises SubstitutionError: re-raised unchanged when ``source_line`` is falsy
    :raises Exception: the error built by ``source_line.makeError`` (with
        ``raise_type`` set to ``WorkflowException``) when a location is known
    """
    try:
        return scan_expression(text)
    except SubstitutionError as se:
        if source_line:
            source_line.raise_type = WorkflowException
            # Chain the original error so the scanner traceback is preserved.
            raise source_line.makeError(str(se)) from se
        raise


def validate_js_expressions(
    tool: CommentedMap,
    schema: Schema,
    jshint_options: Optional[Dict[str, Union[List[str], str, int]]] = None,
) -> None:
    """Lint the JavaScript found in ``tool`` with jshint and log the findings.

    Nothing is checked unless ``tool["requirements"]`` contains an entry whose
    "class" is "InlineJavascriptRequirement"; the last such entry supplies the
    "expressionLib" lines. Each expressionLib line is linted first, and the
    globals jshint reports for it are made available to the lines and
    expressions checked afterwards. Findings are logged as warnings.

    :param tool: the tool document to inspect
    :param schema: the schema used to locate expressions inside ``tool``
    :param jshint_options: options forwarded to ``jshint_js``
    :raises SubstitutionError: if an expression cannot be scanned and no source
        location is available for it
    :raises Exception: the located error built by ``SourceLine.makeError`` if
        an expression cannot be scanned and its location is known
    """
    requirements = tool.get("requirements")
    if requirements is None:
        return

    # The last InlineJavascriptRequirement in the list wins.
    for prop in reversed(requirements):
        if prop["class"] == "InlineJavascriptRequirement":
            expression_lib = prop.get("expressionLib", [])
            break
    else:
        return

    # Names every expression may reference, extended by expressionLib below.
    js_globals = ["self", "inputs", "runtime", "console"]

    for i, expression_lib_line in enumerate(expression_lib):
        expression_lib_line_errors, expression_lib_line_globals = jshint_js(
            expression_lib_line, js_globals, jshint_options
        )
        js_globals.extend(expression_lib_line_globals)
        print_js_hint_messages(
            expression_lib_line_errors, SourceLine(expression_lib, i)
        )

    for expression, source_line in get_expressions(tool, schema):
        unscanned_str = expression.strip()
        scan_slice = _scan_expression_at(unscanned_str, source_line)

        while scan_slice:
            start, end = scan_slice[0], scan_slice[1]
            # Only slices that begin with "$" are turned into JS and linted.
            if unscanned_str[start] == "$":
                code_fragment = unscanned_str[start + 1 : end]
                code_fragment_js = code_fragment_to_js(code_fragment, "")
                expression_errors, _ = jshint_js(
                    code_fragment_js, js_globals, jshint_options
                )
                print_js_hint_messages(expression_errors, source_line)

            unscanned_str = unscanned_str[end:]
            # Later scans get the same source-located error handling as the
            # first one, so a malformed trailing expression is reported with
            # its location too.
            scan_slice = _scan_expression_at(unscanned_str, source_line)