view env/lib/python3.9/site-packages/cwltool/sandboxjs.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Evaluate CWL Javascript Expressions in a sandbox."""

import errno
import json
import os
import queue
import re
import select
import subprocess  # nosec
import sys
import threading
from io import BytesIO
from typing import List, Optional, Tuple, cast

from pkg_resources import resource_stream
from schema_salad.utils import json_dumps

from .loghandler import _logger
from .utils import CWLOutputType, onWindows, processes_to_kill


class JavascriptException(Exception):
    pass


localdata = threading.local()

default_timeout = 20
have_node_slim = False
# minimum acceptable version of nodejs engine
minimum_node_version_str = "0.10.26"


def check_js_threshold_version(working_alias: str) -> bool:
    """
    Check if the nodeJS engine version on the system with the allowed minimum version.

    https://github.com/nodejs/node/blob/master/CHANGELOG.md#nodejs-changelog
    """
    # parse nodejs version into int Tuple: 'v4.2.6\n' -> [4, 2, 6]
    current_version_str = subprocess.check_output(  # nosec
        [working_alias, "-v"], universal_newlines=True
    )

    current_version = [
        int(v) for v in current_version_str.strip().strip("v").split(".")
    ]
    minimum_node_version = [int(v) for v in minimum_node_version_str.split(".")]

    return current_version >= minimum_node_version


def new_js_proc(js_text: str, force_docker_pull: bool = False):
    # type: (...) -> subprocess.Popen[str]

    required_node_version, docker = (False,) * 2
    nodejs = None  # type: Optional[subprocess.Popen[str]]
    trynodes = ("nodejs", "node")
    for n in trynodes:
        try:
            if (
                subprocess.check_output(  # nosec
                    [n, "--eval", "process.stdout.write('t')"], universal_newlines=True
                )
                != "t"
            ):
                continue
            else:
                nodejs = subprocess.Popen(  # nosec
                    [n, "--eval", js_text],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                processes_to_kill.append(nodejs)
                required_node_version = check_js_threshold_version(n)
                break
        except (subprocess.CalledProcessError, OSError):
            pass

    if nodejs is None or nodejs is not None and required_node_version is False:
        try:
            nodeimg = "node:slim"
            global have_node_slim

            if not have_node_slim:
                dockerimgs = subprocess.check_output(  # nosec
                    ["docker", "images", "-q", nodeimg], universal_newlines=True
                )
                # if output is an empty string
                if (len(dockerimgs.split("\n")) <= 1) or force_docker_pull:
                    # pull node:slim docker container
                    nodejsimg = subprocess.check_output(  # nosec
                        ["docker", "pull", nodeimg], universal_newlines=True
                    )
                    _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
                have_node_slim = True
            nodejs = subprocess.Popen(  # nosec
                [
                    "docker",
                    "run",
                    "--attach=STDIN",
                    "--attach=STDOUT",
                    "--attach=STDERR",
                    "--sig-proxy=true",
                    "--interactive",
                    "--rm",
                    nodeimg,
                    "node",
                    "--eval",
                    js_text,
                ],
                universal_newlines=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            processes_to_kill.append(nodejs)
            docker = True
        except OSError as e:
            if e.errno == errno.ENOENT:
                pass
            else:
                raise
        except subprocess.CalledProcessError:
            pass

    # docker failed and nodejs not on system
    if nodejs is None:
        raise JavascriptException(
            "cwltool requires Node.js engine to evaluate and validate "
            "Javascript expressions, but couldn't find it.  Tried {}, "
            "docker run node:slim".format(", ".join(trynodes))
        )

    # docker failed, but nodejs is installed on system but the version is below the required version
    if docker is False and required_node_version is False:
        raise JavascriptException(
            "cwltool requires minimum v{} version of Node.js engine.".format(
                minimum_node_version_str
            ),
            "Try updating: https://docs.npmjs.com/getting-started/installing-node",
        )

    return nodejs


PROCESS_FINISHED_STR = "r1cepzbhUTxtykz5XTC4\n"


def exec_js_process(
    js_text: str,
    timeout: float = default_timeout,
    js_console: bool = False,
    context: Optional[str] = None,
    force_docker_pull: bool = False,
) -> Tuple[int, str, str]:

    if not hasattr(localdata, "procs"):
        localdata.procs = {}

    if js_console and context is not None:
        raise NotImplementedError("js_console=True and context not implemented")

    if js_console:
        js_engine = "cwlNodeEngineJSConsole.js"
        _logger.warning(
            "Running with support for javascript console in expressions (DO NOT USE IN PRODUCTION)"
        )
    elif context is not None:
        js_engine = "cwlNodeEngineWithContext.js"
    else:
        js_engine = "cwlNodeEngine.js"

    created_new_process = False

    if context is not None:
        nodejs = localdata.procs.get((js_engine, context))
    else:
        nodejs = localdata.procs.get(js_engine)

    if nodejs is None or nodejs.poll() is not None or onWindows():
        res = resource_stream(__name__, js_engine)
        js_engine_code = res.read().decode("utf-8")

        created_new_process = True

        new_proc = new_js_proc(js_engine_code, force_docker_pull=force_docker_pull)

        if context is None:
            localdata.procs[js_engine] = new_proc
            nodejs = new_proc
        else:
            localdata.procs[(js_engine, context)] = new_proc
            nodejs = new_proc

    killed = []

    def terminate() -> None:
        """Kill the node process if it exceeds timeout limit."""
        try:
            killed.append(True)
            nodejs.kill()
        except OSError:
            pass

    timer = threading.Timer(timeout, terminate)
    timer.daemon = True
    timer.start()

    stdin_text = ""
    if created_new_process and context is not None:
        stdin_text = json_dumps(context) + "\n"
    stdin_text += json_dumps(js_text) + "\n"

    stdin_buf = BytesIO(stdin_text.encode("utf-8"))
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]

    def process_finished() -> bool:
        return stdout_buf.getvalue().decode("utf-8").endswith(
            PROCESS_FINISHED_STR
        ) and stderr_buf.getvalue().decode("utf-8").endswith(PROCESS_FINISHED_STR)

    # On windows system standard input/output are not handled properly by select module
    # (modules like  pywin32, msvcrt, gevent don't work either)
    if sys.platform == "win32":
        READ_BYTES_SIZE = 512

        # creating queue for reading from a thread to queue
        input_queue = queue.Queue()
        output_queue = queue.Queue()
        error_queue = queue.Queue()

        # To tell threads that output has ended and threads can safely exit
        no_more_output = threading.Lock()
        no_more_output.acquire()
        no_more_error = threading.Lock()
        no_more_error.acquire()

        # put constructed command to input queue which then will be passed to nodejs's stdin
        def put_input(input_queue):
            while True:
                buf = stdin_buf.read(READ_BYTES_SIZE)
                if buf:
                    input_queue.put(buf)
                else:
                    break

        # get the output from nodejs's stdout and continue till output ends
        def get_output(output_queue):
            while not no_more_output.acquire(False):
                buf = os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
                if buf:
                    output_queue.put(buf)

        # get the output from nodejs's stderr and continue till error output ends
        def get_error(error_queue):
            while not no_more_error.acquire(False):
                buf = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
                if buf:
                    error_queue.put(buf)

        # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
        input_thread = threading.Thread(target=put_input, args=(input_queue,))
        input_thread.daemon = True
        input_thread.start()
        output_thread = threading.Thread(target=get_output, args=(output_queue,))
        output_thread.daemon = True
        output_thread.start()
        error_thread = threading.Thread(target=get_error, args=(error_queue,))
        error_thread.daemon = True
        error_thread.start()

        finished = False

        while not finished and timer.is_alive():
            try:
                if nodejs.stdin in wselect:
                    if not input_queue.empty():
                        os.write(nodejs.stdin.fileno(), input_queue.get())
                    elif not input_thread.is_alive():
                        wselect = []
                if nodejs.stdout in rselect:
                    if not output_queue.empty():
                        stdout_buf.write(output_queue.get())

                if nodejs.stderr in rselect:
                    if not error_queue.empty():
                        stderr_buf.write(error_queue.get())

                if process_finished() and error_queue.empty() and output_queue.empty():
                    finished = True
                    no_more_output.release()
                    no_more_error.release()
            except OSError:
                break

    else:
        while not process_finished() and timer.is_alive():
            rready, wready, _ = select.select(rselect, wselect, [])
            try:
                if nodejs.stdin in wready:
                    buf = stdin_buf.read(select.PIPE_BUF)
                    if buf:
                        os.write(nodejs.stdin.fileno(), buf)
                for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
                    if pipes[0] in rready:
                        buf = os.read(pipes[0].fileno(), select.PIPE_BUF)
                        if buf:
                            pipes[1].write(buf)
            except OSError:
                break
    timer.cancel()

    stdin_buf.close()
    stdoutdata = stdout_buf.getvalue()[: -len(PROCESS_FINISHED_STR) - 1]
    stderrdata = stderr_buf.getvalue()[: -len(PROCESS_FINISHED_STR) - 1]

    nodejs.poll()

    if nodejs.poll() not in (None, 0):
        if killed:
            returncode = -1
        else:
            returncode = nodejs.returncode
    else:
        returncode = 0
        # On windows currently a new instance of nodejs process is used due to
        # problem with blocking on read operation on windows
        if onWindows():
            nodejs.kill()

    return returncode, stdoutdata.decode("utf-8"), stderrdata.decode("utf-8")


def code_fragment_to_js(jscript: str, jslib: str = "") -> str:
    if isinstance(jscript, str) and len(jscript) > 1 and jscript[0] == "{":
        inner_js = jscript
    else:
        inner_js = "{return (%s);}" % jscript

    return f'"use strict";\n{jslib}\n(function(){inner_js})()'


def execjs(
    js: str,
    jslib: str,
    timeout: float,
    force_docker_pull: bool = False,
    debug: bool = False,
    js_console: bool = False,
) -> CWLOutputType:

    fn = code_fragment_to_js(js, jslib)

    returncode, stdout, stderr = exec_js_process(
        fn, timeout, js_console=js_console, force_docker_pull=force_docker_pull
    )

    if js_console:
        if stderr is not None:
            _logger.info("Javascript console output:")
            _logger.info("----------------------------------------")
            _logger.info(
                "\n".join(
                    re.findall(r"^[[](?:log|err)[]].*$", stderr, flags=re.MULTILINE)
                )
            )
            _logger.info("----------------------------------------")

    def stdfmt(data: str) -> str:
        if "\n" in data:
            return "\n" + data.strip()
        return data

    def fn_linenum() -> str:
        lines = fn.splitlines()
        ofs = 0
        maxlines = 99
        if len(lines) > maxlines:
            ofs = len(lines) - maxlines
            lines = lines[-maxlines:]
        return "\n".join("%02i %s" % (i + ofs + 1, b) for i, b in enumerate(lines))

    if returncode != 0:
        if debug:
            info = (
                "returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n"
                % (returncode, fn_linenum(), stdfmt(stdout), stdfmt(stderr))
            )
        else:
            info = (
                "Javascript expression was: {}\nstdout was: {}\nstderr was: {}".format(
                    js,
                    stdfmt(stdout),
                    stdfmt(stderr),
                )
            )

        if returncode == -1:
            raise JavascriptException(
                f"Long-running script killed after {timeout} seconds: {info}"
            )
        else:
            raise JavascriptException(info)

    try:
        return cast(CWLOutputType, json.loads(stdout))
    except ValueError as err:
        raise JavascriptException(
            "{}\nscript was:\n{}\nstdout was: '{}'\nstderr was: '{}'\n".format(
                err, fn_linenum(), stdout, stderr
            )
        ) from err