diff env/lib/python3.9/site-packages/planemo/shed_lint.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/planemo/shed_lint.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,361 @@
+"""Logic related to linting shed repositories."""
+from __future__ import absolute_import
+
+import os
+import xml.etree.ElementTree as ET
+
+import yaml
+from galaxy.tool_util.lint import lint_tool_source_with
+from galaxy.tool_util.linters.help import rst_invalid
+from galaxy.util import unicodify
+
+from planemo.io import info
+from planemo.lint import (
+    handle_lint_complete,
+    lint_urls,
+    lint_xsd,
+    setup_lint,
+)
+from planemo.shed import (
+    CURRENT_CATEGORIES,
+    REPO_TYPE_SUITE,
+    REPO_TYPE_TOOL_DEP,
+    REPO_TYPE_UNRESTRICTED,
+    validate_repo_name,
+    validate_repo_owner,
+)
+from planemo.shed2tap import base
+from planemo.tool_lint import (
+    build_tool_lint_args,
+    handle_tool_load_error,
+)
+from planemo.tools import yield_tool_sources
+from planemo.xml import XSDS_PATH
+
+
+TOOL_DEPENDENCIES_XSD = os.path.join(XSDS_PATH, "tool_dependencies.xsd")
+REPO_DEPENDENCIES_XSD = os.path.join(XSDS_PATH, "repository_dependencies.xsd")
+
+
+VALID_REPOSITORY_TYPES = [
+    REPO_TYPE_UNRESTRICTED,
+    REPO_TYPE_TOOL_DEP,
+    REPO_TYPE_SUITE,
+]
+
+SHED_METADATA = [
+    "description",
+    "long_description",
+    "remote_repository_url",
+    "homepage_url",
+    "categories",
+]
+
+
+def lint_repository(ctx, realized_repository, **kwds):
+    """Lint a realized shed repository.
+
+    See :mod:`planemo.shed` for details on constructing a realized
+    repository data structure.
+    """
+    failed = False
+    path = realized_repository.real_path
+    info("Linting repository %s" % path)
+    lint_args = build_tool_lint_args(ctx, **kwds)
+    lint_args, lint_ctx = setup_lint(ctx, lint_args=lint_args, **kwds)
+    lint_ctx.lint(
+        "lint_expansion",
+        lint_expansion,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_expected_files",
+        lint_expected_files,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_tool_dependencies_xsd",
+        lint_tool_dependencies_xsd,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_tool_dependencies_sha256sum",
+        lint_tool_dependencies_sha256sum,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_tool_dependencies_actions",
+        lint_tool_dependencies_actions,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_repository_dependencies",
+        lint_repository_dependencies,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_shed_yaml",
+        lint_shed_yaml,
+        realized_repository,
+    )
+    lint_ctx.lint(
+        "lint_readme",
+        lint_readme,
+        realized_repository,
+    )
+    if kwds["urls"]:
+        lint_ctx.lint(
+            "lint_urls",
+            lint_tool_dependencies_urls,
+            realized_repository,
+        )
+    if kwds["tools"]:
+        tools_failed = lint_repository_tools(ctx, realized_repository, lint_ctx, lint_args)
+        failed = failed or tools_failed
+    if kwds["ensure_metadata"]:
+        lint_ctx.lint(
+            "lint_shed_metadata",
+            lint_shed_metadata,
+            realized_repository,
+        )
+    return handle_lint_complete(lint_ctx, lint_args, failed=failed)
+
+
+def lint_repository_tools(ctx, realized_repository, lint_ctx, lint_args):
+    path = realized_repository.path
+    for (tool_path, tool_source) in yield_tool_sources(ctx, path,
+                                                       recursive=True):
+        original_path = tool_path.replace(path, realized_repository.real_path)
+        info("+Linting tool %s" % original_path)
+        if handle_tool_load_error(tool_path, tool_source):
+            return True
+        lint_tool_source_with(
+            lint_ctx,
+            tool_source,
+            extra_modules=lint_args["extra_modules"]
+        )
+
+
+def lint_expansion(realized_repository, lint_ctx):
+    missing = realized_repository.missing
+    if missing:
+        msg = "Failed to expand inclusions %s" % missing
+        lint_ctx.warn(msg)
+    else:
+        lint_ctx.info("Included files all found.")
+
+
+def lint_shed_metadata(realized_repository, lint_ctx):
+    found_all = True
+    for key in SHED_METADATA:
+        if key not in realized_repository.config:
+            found_all = False
+            lint_ctx.warn(
+                "Missing shed metadata field [%s] for repository" % key
+            )
+    if found_all:
+        lint_ctx.info(
+            "Found all shed metadata fields required for automated repository "
+            "creation and/or updates."
+        )
+
+
+def lint_readme(realized_repository, lint_ctx):
+    path = realized_repository.real_path
+    readme_rst = os.path.join(path, "README.rst")
+    readme = os.path.join(path, "README")
+    readme_txt = os.path.join(path, "README.txt")
+
+    readme_found = False
+    for readme in [readme_rst, readme, readme_txt]:
+        if os.path.exists(readme):
+            readme_found = readme
+
+    readme_md = os.path.join(path, "README.md")
+    if not readme_found and os.path.exists(readme_md):
+        lint_ctx.warn("Tool Shed doesn't render markdown, "
+                      "README.md is invalid readme.")
+        return
+
+    if not readme_found:
+        # TODO: filter on TYPE and make this a warning if
+        # unrestricted repository - need to update iuc standards
+        # first though.
+        lint_ctx.info("No README found skipping.")
+        return
+
+    if readme_found.endswith(".rst"):
+        with open(readme_found, "r") as fh:
+            readme_text = fh.read()
+        invalid_rst = rst_invalid(readme_text)
+        if invalid_rst:
+            template = "Invalid restructured text found in README [%s]."
+            msg = template % invalid_rst
+            lint_ctx.warn(msg)
+            return
+        lint_ctx.info("README found containing valid reStructuredText.")
+    else:
+        lint_ctx.info("README found containing plain text.")
+
+
+def lint_tool_dependencies_urls(realized_repository, lint_ctx):
+    path = realized_repository.real_path
+    tool_dependencies = os.path.join(path, "tool_dependencies.xml")
+    if not os.path.exists(tool_dependencies):
+        lint_ctx.info("No tool_dependencies.xml, skipping.")
+        return
+
+    root = ET.parse(tool_dependencies).getroot()
+    lint_urls(root, lint_ctx)
+
+
+def lint_tool_dependencies_sha256sum(realized_repository, lint_ctx):
+    tool_dependencies = os.path.join(realized_repository.real_path, "tool_dependencies.xml")
+    if not os.path.exists(tool_dependencies):
+        lint_ctx.info("No tool_dependencies.xml, skipping.")
+        return
+
+    root = ET.parse(tool_dependencies).getroot()
+
+    count = 0
+    for action in root.findall(".//action"):
+        assert action.tag == "action"
+        if action.attrib.get('type', '') not in ['download_by_url', 'download_file']:
+            continue
+        url = action.text.strip()
+        checksum = action.attrib.get('sha256sum', '')
+        if not checksum:
+            lint_ctx.warn("Missing checksum for %s" % url)
+        elif len(checksum) != 64 or not set("0123456789abcdef").issuperset(checksum.lower()):
+            lint_ctx.error("Invalid checksum %r for %s" % (checksum, url))
+        else:
+            # TODO - See planned --verify option to check it matches
+            # lint_ctx.info("SHA256 checkum listed for %s" % url)
+            count += 1
+    if count:
+        lint_ctx.info("Found %i download action(s) with SHA256 checksums" % count)
+
+
+def lint_tool_dependencies_xsd(realized_repository, lint_ctx):
+    path = realized_repository.real_path
+    tool_dependencies = os.path.join(path, "tool_dependencies.xml")
+    if not os.path.exists(tool_dependencies):
+        lint_ctx.info("No tool_dependencies.xml, skipping.")
+        return
+    lint_xsd(lint_ctx, TOOL_DEPENDENCIES_XSD, tool_dependencies)
+
+
+def lint_tool_dependencies_actions(realized_repository, lint_ctx):
+    path = realized_repository.real_path
+    tool_dependencies = os.path.join(path, "tool_dependencies.xml")
+    if not os.path.exists(tool_dependencies):
+        lint_ctx.info("No tool_dependencies.xml, skipping.")
+        return
+    try:
+        base.Dependencies(tool_dependencies)
+        lint_ctx.info("Parsed tool dependencies.")
+    except Exception as e:
+        import sys
+        import traceback
+        exc_type, exc_value, exc_traceback = sys.exc_info()
+        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
+        traceback.print_exc()
+        template = "Problem parsing tool_dependenies.xml [%s]"
+        msg = template % unicodify(e)
+        lint_ctx.warn(msg)
+        return
+
+
+def lint_expected_files(realized_repository, lint_ctx):
+    if realized_repository.is_package:
+        if not os.path.exists(realized_repository.tool_dependencies_path):
+            lint_ctx.warn("Package repository does not contain a "
+                          "tool_dependencies.xml file.")
+
+    if realized_repository.is_suite:
+        if not os.path.exists(realized_repository.repo_dependencies_path):
+            lint_ctx.warn("Suite repository does not contain a "
+                          "repository_dependencies.xml file.")
+
+
+def lint_repository_dependencies(realized_repository, lint_ctx):
+    path = realized_repository.real_path
+    repo_dependencies = os.path.join(path, "repository_dependencies.xml")
+    if not os.path.exists(repo_dependencies):
+        lint_ctx.info("No repository_dependencies.xml, skipping.")
+        return
+    lint_xsd(lint_ctx, REPO_DEPENDENCIES_XSD, repo_dependencies)
+
+
+def lint_shed_yaml(realized_repository, lint_ctx):
+    path = realized_repository.real_path
+    shed_yaml = os.path.join(path, ".shed.yml")
+    if not os.path.exists(shed_yaml):
+        lint_ctx.info("No .shed.yml file found, skipping.")
+        return
+    try:
+        with open(shed_yaml, "r") as fh:
+            yaml.safe_load(fh)
+    except Exception as e:
+        lint_ctx.warn("Failed to parse .shed.yml file [%s]" % unicodify(e))
+        return
+    lint_ctx.info(".shed.yml found and appears to be valid YAML.")
+    _lint_shed_contents(lint_ctx, realized_repository)
+
+
+def _lint_shed_contents(lint_ctx, realized_repository):
+    config = realized_repository.config
+
+    def _lint_if_present(key, func, *args):
+        value = config.get(key, None)
+        if value is not None:
+            msg = func(value, *args)
+            if msg:
+                lint_ctx.warn(msg)
+
+    _lint_if_present("owner", validate_repo_owner)
+    _lint_if_present("name", validate_repo_name)
+    _lint_if_present("type", _validate_repo_type, config["name"])
+    _lint_if_present("categories", _validate_categories, realized_repository)
+
+
+def _validate_repo_type(repo_type, name):
+    if repo_type not in VALID_REPOSITORY_TYPES:
+        return "Invalid repository type specified [%s]" % repo_type
+
+    is_dep = repo_type == "tool_dependency_definition"
+    is_suite = repo_type == "repository_suite_definition"
+    if is_dep and not name.startswith("package_"):
+        return ("Tool dependency definition repositories should have names "
+                "starting with package_")
+    if is_suite and not name.startswith("suite_"):
+        return ("Repository suite definition repositories should have names "
+                "starting with suite_")
+    if name.startswith("package_") or name.startswith("suite_"):
+        if repo_type == "unrestricted":
+            return ("Repository name indicated specialized repository type "
+                    "but repository is listed as unrestricted.")
+
+
+def _validate_categories(categories, realized_repository):
+    msg = None
+    if len(categories) == 0:
+        msg = "Repository should specify one or more categories."
+    else:
+        for category in categories:
+            unknown_categories = []
+            if category not in CURRENT_CATEGORIES:
+                unknown_categories.append(category)
+            if unknown_categories:
+                msg = "Categories [%s] unknown." % unknown_categories
+        if realized_repository.is_package:
+            if "Tool Dependency Packages" not in categories:
+                msg = ("Packages should be placed and should only be placed "
+                       "in the category 'Tool Dependency Packages'.")
+
+    return msg
+
+
+__all__ = (
+    "lint_repository",
+)