diff env/lib/python3.9/site-packages/galaxy/tool_util/loader_directory.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/galaxy/tool_util/loader_directory.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,319 @@
+"""Utilities for loading and reasoning about unparsed tools in directories."""
+import fnmatch
+import glob
+import logging
+import os
+import re
+import sys
+
+import yaml
+
+from galaxy.tool_util import loader
+from galaxy.tool_util.parser import get_tool_source
+from galaxy.util import checkers
+
+log = logging.getLogger(__name__)
+
+PATH_DOES_NOT_EXIST_ERROR = "Could not load tools from path [%s] - this path does not exist."
+PATH_AND_RECURSIVE_ERROR = "Cannot specify a single file and recursive."
+LOAD_FAILURE_ERROR = "Failed to load tool with path %s."
+TOOL_LOAD_ERROR = object()
+TOOL_REGEX = re.compile(r"<tool\s")
+DATA_MANAGER_REGEX = re.compile(r"\stool_type=\"manage_data\"")
+
+YAML_EXTENSIONS = [".yaml", ".yml", ".json"]
+CWL_EXTENSIONS = YAML_EXTENSIONS + [".cwl"]
+EXCLUDE_WALK_DIRS = ['.hg', '.git', '.venv']
+
+
+def load_exception_handler(path, exc_info):
+    """Default exception handler for use by load_tool_elements_from_path."""
+    log.warning(LOAD_FAILURE_ERROR % path, exc_info=exc_info)
+
+
+def find_possible_tools_from_path(
+    path,
+    recursive=False,
+    enable_beta_formats=False,
+):
+    """Walk a directory and find potential tool files."""
+    possible_tool_files = []
+    for possible_tool_file in _find_tool_files(
+        path, recursive=recursive,
+        enable_beta_formats=enable_beta_formats
+    ):
+        try:
+            does_look_like_a_tool = looks_like_a_tool(
+                possible_tool_file,
+                enable_beta_formats=enable_beta_formats
+            )
+        except OSError:
+            # Some problem reading the tool file, skip.
+            continue
+
+        if does_look_like_a_tool:
+            possible_tool_files.append(possible_tool_file)
+
+    return possible_tool_files
+
+
+def load_tool_sources_from_path(
+    path,
+    load_exception_handler=load_exception_handler,
+    recursive=False,
+    register_load_errors=False,
+):
+    """Walk a directory and ToolSource objects."""
+    return _load_tools_from_path(
+        path,
+        load_exception_handler=load_exception_handler,
+        recursive=recursive,
+        register_load_errors=register_load_errors,
+        loader_func=get_tool_source,
+        enable_beta_formats=True,
+    )
+
+
+def load_tool_elements_from_path(
+    path,
+    load_exception_handler=load_exception_handler,
+    recursive=False,
+    register_load_errors=False,
+):
+    """Walk a directory and load tool XML elements."""
+    return _load_tools_from_path(
+        path,
+        load_exception_handler=load_exception_handler,
+        recursive=recursive,
+        register_load_errors=register_load_errors,
+        loader_func=loader.load_tool,
+        enable_beta_formats=False,
+    )
+
+
+def _load_tools_from_path(
+    path,
+    load_exception_handler,
+    recursive,
+    register_load_errors,
+    loader_func,
+    enable_beta_formats,
+):
+    loaded_objects = []
+    for possible_tool_file in find_possible_tools_from_path(
+        path,
+        recursive=recursive,
+        enable_beta_formats=enable_beta_formats,
+    ):
+        try:
+            tool_element = loader_func(possible_tool_file)
+            loaded_objects.append((possible_tool_file, tool_element))
+        except Exception:
+            exc_info = sys.exc_info()
+            load_exception_handler(possible_tool_file, exc_info)
+            if register_load_errors:
+                loaded_objects.append((possible_tool_file, TOOL_LOAD_ERROR))
+    return loaded_objects
+
+
+def is_tool_load_error(obj):
+    """Predicate to determine if object loaded for tool is a tool error."""
+    return obj is TOOL_LOAD_ERROR
+
+
+def looks_like_a_tool(path_or_uri_like, invalid_names=None, enable_beta_formats=False):
+    """Quick check to see if a file looks like it may be a tool file.
+
+    Whether true in a strict sense or not, lets say the intention and
+    purpose of this procedure is to serve as a filter - all valid tools must
+    "looks_like_a_tool" but not everything that looks like a tool is actually
+    a valid tool.
+
+    invalid_names may be supplied in the context of the tool shed to quickly
+    rule common tool shed XML files.
+    """
+    invalid_names = invalid_names or []
+    path = resolved_path(path_or_uri_like)
+    if path is UNRESOLVED_URI:
+        # Assume the path maps to a real tool.
+        return True
+
+    looks = False
+
+    if os.path.basename(path) in invalid_names:
+        return False
+
+    if looks_like_a_tool_xml(path):
+        looks = True
+
+    if not looks and enable_beta_formats:
+        for tool_checker in BETA_TOOL_CHECKERS.values():
+            if tool_checker(path):
+                looks = True
+                break
+
+    return looks
+
+
+def looks_like_xml(path, regex=TOOL_REGEX):
+    full_path = os.path.abspath(path)
+
+    if not full_path.endswith(".xml"):
+        return False
+
+    if not os.path.getsize(full_path):
+        return False
+
+    if(checkers.check_binary(full_path) or
+       checkers.check_image(full_path) or
+       checkers.is_gzip(full_path) or
+       checkers.is_bz2(full_path) or
+       checkers.is_zip(full_path)):
+        return False
+
+    with open(path, encoding='utf-8') as f:
+        try:
+            start_contents = f.read(5 * 1024)
+        except UnicodeDecodeError:
+            return False
+        if regex.search(start_contents):
+            return True
+
+    return False
+
+
+def looks_like_a_tool_xml(path):
+    """Quick check to see if a file looks like it may be a Galaxy XML tool file."""
+    return looks_like_xml(path=path, regex=TOOL_REGEX)
+
+
+def looks_like_a_data_manager_xml(path):
+    """Quick check to see if a file looks like it may be a Galaxy data manager XML file."""
+    return looks_like_xml(path=path, regex=DATA_MANAGER_REGEX)
+
+
+def is_a_yaml_with_class(path, classes):
+    """Determine if a file is a valid YAML with a supplied ``class`` entry."""
+    if not _has_extension(path, YAML_EXTENSIONS):
+        return False
+
+    with open(path) as f:
+        try:
+            as_dict = yaml.safe_load(f)
+        except Exception:
+            return False
+
+    if not isinstance(as_dict, dict):
+        return False
+
+    file_class = as_dict.get("class", None)
+    return file_class in classes
+
+
+def looks_like_a_tool_yaml(path):
+    """Quick check to see if a file looks like it may be a Galaxy YAML tool file."""
+    return is_a_yaml_with_class(path, ["GalaxyTool"])
+
+
+def looks_like_a_cwl_artifact(path, classes=None):
+    """Quick check to see if a file looks like it may be a CWL artifact."""
+    if not _has_extension(path, CWL_EXTENSIONS):
+        return False
+
+    with open(path) as f:
+        try:
+            as_dict = yaml.safe_load(f)
+        except Exception:
+            return False
+
+    if not isinstance(as_dict, dict):
+        return False
+
+    file_class = as_dict.get("class", None)
+    if classes is not None and file_class not in classes:
+        return False
+
+    file_cwl_version = as_dict.get("cwlVersion", None)
+    return file_cwl_version is not None
+
+
+def looks_like_a_tool_cwl(path):
+    """Quick check to see if a file looks like it may be a CWL tool."""
+    return looks_like_a_cwl_artifact(path, classes=["CommandLineTool", "ExpressionTool"])
+
+
+def _find_tool_files(path_or_uri_like, recursive, enable_beta_formats):
+    path = resolved_path(path_or_uri_like)
+    if path is UNRESOLVED_URI:
+        # Pass the URI through and assume it maps to a real tool.
+        return [path_or_uri_like]
+
+    is_file = not os.path.isdir(path)
+    if not os.path.exists(path):
+        raise Exception(PATH_DOES_NOT_EXIST_ERROR)
+    elif is_file and recursive:
+        raise Exception(PATH_AND_RECURSIVE_ERROR)
+    elif is_file:
+        return [os.path.abspath(path)]
+    else:
+        if enable_beta_formats:
+            if not recursive:
+                files = glob.glob(path + "/*")
+            else:
+                files = _find_files(path, "*")
+        else:
+            if not recursive:
+                files = glob.glob(path + "/*.xml")
+            else:
+                files = _find_files(path, "*.xml")
+        return [os.path.abspath(_) for _ in files]
+
+
+def _has_extension(path, extensions):
+    return any(path.endswith(e) for e in extensions)
+
+
+def _find_files(directory, pattern='*'):
+    if not os.path.exists(directory):
+        raise ValueError(f"Directory not found {directory}")
+
+    matches = []
+    for root, dirnames, filenames in os.walk(directory):
+        # exclude some directories (like .hg) from traversing
+        dirnames[:] = [dir for dir in dirnames if dir not in EXCLUDE_WALK_DIRS]
+        for filename in filenames:
+            full_path = os.path.join(root, filename)
+            if fnmatch.filter([full_path], pattern):
+                matches.append(os.path.join(root, filename))
+    return matches
+
+
+UNRESOLVED_URI = object()
+
+
+def resolved_path(path_or_uri_like):
+    """If this is a simple file path, return the path else UNRESOLVED_URI."""
+    if "://" not in path_or_uri_like:
+        return path_or_uri_like
+    elif path_or_uri_like.startswith("file://"):
+        return path_or_uri_like[len("file://"):]
+    else:
+        return UNRESOLVED_URI
+
+
+BETA_TOOL_CHECKERS = {
+    'yaml': looks_like_a_tool_yaml,
+    'cwl': looks_like_a_tool_cwl,
+}
+
+__all__ = (
+    "find_possible_tools_from_path",
+    "is_a_yaml_with_class",
+    "is_tool_load_error",
+    "load_tool_elements_from_path",
+    "load_tool_sources_from_path",
+    "looks_like_a_cwl_artifact",
+    "looks_like_a_tool_cwl",
+    "looks_like_a_tool_xml",
+    "looks_like_a_tool_yaml",
+)