diff env/lib/python3.7/site-packages/cwltool/validate_js.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/validate_js.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,210 +0,0 @@
-import copy
-import itertools
-import json
-import logging
-from collections import namedtuple
-from typing import (cast, Any, Dict, List, MutableMapping, MutableSequence,
-                    Optional, Tuple, Union)
-
-from pkg_resources import resource_stream
-from ruamel.yaml.comments import CommentedMap  # pylint: disable=unused-import
-from six import string_types
-from typing_extensions import Text  # pylint: disable=unused-import
-from schema_salad import avro
-from schema_salad.sourceline import SourceLine
-from schema_salad.validate import Schema  # pylint: disable=unused-import
-from schema_salad.validate import validate_ex
-
-from .errors import WorkflowException
-from .expression import scanner as scan_expression
-from .expression import SubstitutionError
-from .loghandler import _logger
-from .sandboxjs import code_fragment_to_js, exec_js_process
-from .utils import json_dumps
-
-
-def is_expression(tool, schema):
-    # type: (Any, Optional[Schema]) -> bool
-    return isinstance(schema, avro.schema.EnumSchema) \
-        and schema.name == "Expression" and isinstance(tool, string_types)
-
-class SuppressLog(logging.Filter):
-    def __init__(self, name):  # type: (Text) -> None
-        """Initialize this log suppressor."""
-        name = str(name)
-        super(SuppressLog, self).__init__(name)
-
-    def filter(self, record):  # type: (logging.LogRecord) -> bool
-        return False
-
-
-_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
-_logger_validation_warnings.addFilter(SuppressLog("cwltool.validation_warnings"))
-
-def get_expressions(tool,             # type: Union[CommentedMap, Text]
-                    schema,           # type: Optional[avro.schema.Schema]
-                    source_line=None  # type: Optional[SourceLine]
-                   ):  # type: (...) -> List[Tuple[Text, Optional[SourceLine]]]
-    if is_expression(tool, schema):
-        return [(cast(Text, tool), source_line)]
-    elif isinstance(schema, avro.schema.UnionSchema):
-        valid_schema = None
-
-        for possible_schema in schema.schemas:
-            if is_expression(tool, possible_schema):
-                return [(cast(Text, tool), source_line)]
-            elif validate_ex(possible_schema, tool, raise_ex=False,
-                             logger=_logger_validation_warnings):
-                valid_schema = possible_schema
-
-        return get_expressions(tool, valid_schema, source_line)
-    elif isinstance(schema, avro.schema.ArraySchema):
-        if not isinstance(tool, MutableSequence):
-            return []
-
-        return list(itertools.chain(
-            *map(lambda x: get_expressions(x[1], schema.items, SourceLine(tool, x[0])), enumerate(tool))
-        ))
-
-    elif isinstance(schema, avro.schema.RecordSchema):
-        if not isinstance(tool, MutableMapping):
-            return []
-
-        expression_nodes = []
-
-        for schema_field in schema.fields:
-            if schema_field.name in tool:
-                expression_nodes.extend(get_expressions(
-                    tool[schema_field.name],
-                    schema_field.type,
-                    SourceLine(tool, schema_field.name)
-                ))
-
-        return expression_nodes
-    else:
-        return []
-
-
-JSHintJSReturn = namedtuple("jshint_return", ["errors", "globals"])
-
-def jshint_js(js_text,       # type: Text
-              globals=None,  # type: Optional[List[Text]]
-              options=None   # type: Optional[Dict[Text, Union[List[Text], Text, int]]]
-             ):  # type: (...) -> Tuple[List[Text], List[Text]]
-    if globals is None:
-        globals = []
-    if options is None:
-        options = {
-            "includewarnings": [
-                "W117",  # <VARIABLE> not defined
-                "W104", "W119"  # using ES6 features
-            ],
-            "strict": "implied",
-            "esversion": 5
-        }
-
-    with resource_stream(__name__, "jshint/jshint.js") as file:
-        # NOTE: we need a global variable for lodash (which jshint depends on)
-        jshint_functions_text = "var global = this;" + file.read().decode('utf-8')
-
-    with resource_stream(__name__, "jshint/jshint_wrapper.js") as file:
-        # NOTE: we need to assign to ob, as the expression {validateJS: validateJS} as an expression
-        # is interpreted as a block with a label `validateJS`
-        jshint_functions_text += "\n" + file.read().decode('utf-8') + "\nvar ob = {validateJS: validateJS}; ob"
-
-    returncode, stdout, stderr = exec_js_process(
-        "validateJS(%s)" % json_dumps({
-            "code": js_text,
-            "options": options,
-            "globals": globals
-        }),
-        timeout=30,
-        context=jshint_functions_text
-    )
-
-    def dump_jshint_error():
-        # type: () -> None
-        raise RuntimeError("jshint failed to run succesfully\nreturncode: %d\nstdout: \"%s\"\nstderr: \"%s\"" % (
-            returncode,
-            stdout,
-            stderr
-        ))
-
-    if returncode == -1:
-        _logger.warning("jshint process timed out")
-
-    if returncode != 0:
-        dump_jshint_error()
-
-    try:
-        jshint_json = json.loads(stdout)
-    except ValueError:
-        dump_jshint_error()
-
-    jshint_errors = []  # type: List[Text]
-
-    js_text_lines = js_text.split("\n")
-
-    for jshint_error_obj in jshint_json.get("errors", []):
-        text = u"JSHINT: " + js_text_lines[jshint_error_obj["line"] - 1] + "\n"
-        text += u"JSHINT: " + " " * (jshint_error_obj["character"] - 1) + "^\n"
-        text += u"JSHINT: %s: %s" % (jshint_error_obj["code"], jshint_error_obj["reason"])
-        jshint_errors.append(text)
-
-    return JSHintJSReturn(jshint_errors, jshint_json.get("globals", []))
-
-
-def print_js_hint_messages(js_hint_messages, source_line):
-    # type: (List[Text], Optional[SourceLine]) -> None
-    if source_line is not None:
-        for js_hint_message in js_hint_messages:
-            _logger.warning(source_line.makeError(js_hint_message))
-
-def validate_js_expressions(tool,                # type: CommentedMap
-                            schema,              # type: Schema
-                            jshint_options=None  # type: Optional[Dict[Text, Union[List[Text], Text, int]]]
-                           ):  # type: (...) -> None
-
-    if tool.get("requirements") is None:
-        return
-
-    requirements = tool["requirements"]
-
-    default_globals = [u"self", u"inputs", u"runtime", u"console"]
-
-    for prop in reversed(requirements):
-        if prop["class"] == "InlineJavascriptRequirement":
-            expression_lib = prop.get("expressionLib", [])
-            break
-    else:
-        return
-
-    js_globals = copy.deepcopy(default_globals)
-
-    for i, expression_lib_line in enumerate(expression_lib):
-        expression_lib_line_errors, expression_lib_line_globals = jshint_js(expression_lib_line, js_globals, jshint_options)
-        js_globals.extend(expression_lib_line_globals)
-        print_js_hint_messages(expression_lib_line_errors, SourceLine(expression_lib, i))
-
-    expressions = get_expressions(tool, schema)
-
-    for expression, source_line in expressions:
-        unscanned_str = expression.strip()
-        try:
-            scan_slice = scan_expression(unscanned_str)
-        except SubstitutionError as se:
-            if source_line:
-                source_line.raise_type = WorkflowException
-                raise source_line.makeError(str(se))
-            else:
-                raise se
-
-        while scan_slice:
-            if unscanned_str[scan_slice[0]] == '$':
-                code_fragment = unscanned_str[scan_slice[0] + 1:scan_slice[1]]
-                code_fragment_js = code_fragment_to_js(code_fragment, "")
-                expression_errors, _ = jshint_js(code_fragment_js, js_globals, jshint_options)
-                print_js_hint_messages(expression_errors, source_line)
-
-            unscanned_str = unscanned_str[scan_slice[1]:]
-            scan_slice = scan_expression(unscanned_str)