comparison env/lib/python3.9/site-packages/cwltool/validate_js.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import copy
2 import itertools
3 import json
4 import logging
5 from collections import namedtuple
6 from typing import (
7 Any,
8 Dict,
9 List,
10 MutableMapping,
11 MutableSequence,
12 Optional,
13 Tuple,
14 Union,
15 cast,
16 )
17
18 from pkg_resources import resource_stream
19 from ruamel.yaml.comments import CommentedMap
20 from schema_salad.avro.schema import (
21 ArraySchema,
22 EnumSchema,
23 RecordSchema,
24 Schema,
25 UnionSchema,
26 )
27 from schema_salad.sourceline import SourceLine
28 from schema_salad.utils import json_dumps
29 from schema_salad.validate import validate_ex
30
31 from .errors import WorkflowException
32 from .expression import SubstitutionError
33 from .expression import scanner as scan_expression
34 from .loghandler import _logger
35 from .sandboxjs import code_fragment_to_js, exec_js_process
36
37
38 def is_expression(tool, schema):
39 # type: (Any, Optional[Schema]) -> bool
40 return (
41 isinstance(schema, EnumSchema)
42 and schema.name == "Expression"
43 and isinstance(tool, str)
44 )
45
46
47 class SuppressLog(logging.Filter):
48 def __init__(self, name): # type: (str) -> None
49 """Initialize this log suppressor."""
50 name = str(name)
51 super().__init__(name)
52
53 def filter(self, record): # type: (logging.LogRecord) -> bool
54 return False
55
56
57 _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
58 _logger_validation_warnings.addFilter(SuppressLog("cwltool.validation_warnings"))
59
60
61 def get_expressions(
62 tool: Union[CommentedMap, str],
63 schema: Optional[Union[Schema, ArraySchema]],
64 source_line: Optional[SourceLine] = None,
65 ) -> List[Tuple[str, Optional[SourceLine]]]:
66 if is_expression(tool, schema):
67 return [(cast(str, tool), source_line)]
68 elif isinstance(schema, UnionSchema):
69 valid_schema = None
70
71 for possible_schema in schema.schemas:
72 if is_expression(tool, possible_schema):
73 return [(cast(str, tool), source_line)]
74 elif validate_ex(
75 possible_schema,
76 tool,
77 raise_ex=False,
78 logger=_logger_validation_warnings,
79 ):
80 valid_schema = possible_schema
81
82 return get_expressions(tool, valid_schema, source_line)
83 elif isinstance(schema, ArraySchema):
84 if not isinstance(tool, MutableSequence):
85 return []
86
87 return list(
88 itertools.chain(
89 *map(
90 lambda x: get_expressions(
91 x[1], schema.items, SourceLine(tool, x[0]) # type: ignore
92 ),
93 enumerate(tool),
94 )
95 )
96 )
97
98 elif isinstance(schema, RecordSchema):
99 if not isinstance(tool, MutableMapping):
100 return []
101
102 expression_nodes = []
103
104 for schema_field in schema.fields:
105 if schema_field.name in tool:
106 expression_nodes.extend(
107 get_expressions(
108 tool[schema_field.name],
109 schema_field.type,
110 SourceLine(tool, schema_field.name),
111 )
112 )
113
114 return expression_nodes
115 else:
116 return []
117
118
119 JSHintJSReturn = namedtuple("JSHintJSReturn", ["errors", "globals"])
120
121
122 def jshint_js(
123 js_text: str,
124 globals: Optional[List[str]] = None,
125 options: Optional[Dict[str, Union[List[str], str, int]]] = None,
126 ) -> JSHintJSReturn:
127 if globals is None:
128 globals = []
129 if options is None:
130 options = {
131 "includewarnings": [
132 "W117", # <VARIABLE> not defined
133 "W104",
134 "W119", # using ES6 features
135 ],
136 "strict": "implied",
137 "esversion": 5,
138 }
139
140 with resource_stream(__name__, "jshint/jshint.js") as res:
141 # NOTE: we need a global variable for lodash (which jshint depends on)
142 jshint_functions_text = "var global = this;" + res.read().decode("utf-8")
143
144 with resource_stream(__name__, "jshint/jshint_wrapper.js") as res2:
145 # NOTE: we need to assign to ob, as the expression {validateJS: validateJS} as an expression
146 # is interpreted as a block with a label `validateJS`
147 jshint_functions_text += (
148 "\n"
149 + res2.read().decode("utf-8")
150 + "\nvar ob = {validateJS: validateJS}; ob"
151 )
152
153 returncode, stdout, stderr = exec_js_process(
154 "validateJS(%s)"
155 % json_dumps({"code": js_text, "options": options, "globals": globals}),
156 timeout=30,
157 context=jshint_functions_text,
158 )
159
160 def dump_jshint_error():
161 # type: () -> None
162 raise RuntimeError(
163 'jshint failed to run succesfully\nreturncode: %d\nstdout: "%s"\nstderr: "%s"'
164 % (returncode, stdout, stderr)
165 )
166
167 if returncode == -1:
168 _logger.warning("jshint process timed out")
169
170 if returncode != 0:
171 dump_jshint_error()
172
173 try:
174 jshint_json = json.loads(stdout)
175 except ValueError:
176 dump_jshint_error()
177
178 jshint_errors = [] # type: List[str]
179
180 js_text_lines = js_text.split("\n")
181
182 for jshint_error_obj in jshint_json.get("errors", []):
183 text = "JSHINT: " + js_text_lines[jshint_error_obj["line"] - 1] + "\n"
184 text += "JSHINT: " + " " * (jshint_error_obj["character"] - 1) + "^\n"
185 text += "JSHINT: {}: {}".format(
186 jshint_error_obj["code"],
187 jshint_error_obj["reason"],
188 )
189 jshint_errors.append(text)
190
191 return JSHintJSReturn(jshint_errors, jshint_json.get("globals", []))
192
193
194 def print_js_hint_messages(
195 js_hint_messages: List[str], source_line: Optional[SourceLine]
196 ) -> None:
197 if source_line is not None:
198 for js_hint_message in js_hint_messages:
199 _logger.warning(source_line.makeError(js_hint_message))
200
201
202 def validate_js_expressions(
203 tool: CommentedMap,
204 schema: Schema,
205 jshint_options: Optional[Dict[str, Union[List[str], str, int]]] = None,
206 ) -> None:
207
208 if tool.get("requirements") is None:
209 return
210
211 requirements = tool["requirements"]
212
213 default_globals = ["self", "inputs", "runtime", "console"]
214
215 for prop in reversed(requirements):
216 if prop["class"] == "InlineJavascriptRequirement":
217 expression_lib = prop.get("expressionLib", [])
218 break
219 else:
220 return
221
222 js_globals = copy.deepcopy(default_globals)
223
224 for i, expression_lib_line in enumerate(expression_lib):
225 expression_lib_line_errors, expression_lib_line_globals = jshint_js(
226 expression_lib_line, js_globals, jshint_options
227 )
228 js_globals.extend(expression_lib_line_globals)
229 print_js_hint_messages(
230 expression_lib_line_errors, SourceLine(expression_lib, i)
231 )
232
233 expressions = get_expressions(tool, schema)
234
235 for expression, source_line in expressions:
236 unscanned_str = expression.strip()
237 try:
238 scan_slice = scan_expression(unscanned_str)
239 except SubstitutionError as se:
240 if source_line:
241 source_line.raise_type = WorkflowException
242 raise source_line.makeError(str(se))
243 else:
244 raise se
245
246 while scan_slice:
247 if unscanned_str[scan_slice[0]] == "$":
248 code_fragment = unscanned_str[scan_slice[0] + 1 : scan_slice[1]]
249 code_fragment_js = code_fragment_to_js(code_fragment, "")
250 expression_errors, _ = jshint_js(
251 code_fragment_js, js_globals, jshint_options
252 )
253 print_js_hint_messages(expression_errors, source_line)
254
255 unscanned_str = unscanned_str[scan_slice[1] :]
256 scan_slice = scan_expression(unscanned_str)