diff sm_api_wrapper.py @ 0:0c3f56c85e98 draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/main/tools/swissmodel_modelling_api commit 43b5bef8757185b4c077effd0bad846f25d408db
author iuc
date Thu, 11 Dec 2025 19:32:14 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sm_api_wrapper.py	Thu Dec 11 19:32:14 2025 +0000
@@ -0,0 +1,428 @@
+"""Wrapper for the SWISS-MODEL API."""
+
+import argparse
+import json
+import os
+import sys
+import time
+from urllib.parse import urlsplit
+
+import requests
+
+
+class _SmApiWhisperer:
+    """Parent class for talking to the SWISS-MODEL API.
+
+    Subclasses set ``PROJECT_TYPE`` (the URL path a job is posted to) and
+    implement ``get_json_payload()``.
+    """
+
+    PROJECT_TYPE = ""
+
+    def __init__(self, targets, token, project_title="Untitled Project"):
+        """Store what is needed to submit and track a modelling project.
+
+        ``targets`` is sent as ``target_sequences`` and ``token`` goes into
+        the ``Authorization`` header. A ``project_title`` of ``None`` (e.g.
+        an unset command line option passed through) falls back to the
+        default title instead of being sent as JSON ``null``.
+        """
+        self.project_id = None
+        self.project_title = (
+            "Untitled Project" if project_title is None else project_title
+        )
+        self.targets = targets
+        self.token = token
+
+    def get_json_payload(self):
+        """Needs to be implemented per project type."""
+        raise NotImplementedError
+
+    def submit_request(self):
+        """Send off a request to the SM API.
+
+        Stores the ``project_id`` from the response on the instance and
+        returns the HTTP status code. Exits with code 3 if the server can
+        not be reached in time; raises ``RuntimeError`` on a non-OK response.
+        """
+        json_payload = self.get_json_payload()
+        json_payload["project_title"] = self.project_title
+        try:
+            response = requests.post(
+                f"https://swissmodel.expasy.org/{self.PROJECT_TYPE}",
+                headers={"Authorization": f"Token {self.token}"},
+                json=json_payload,
+                timeout=60,
+            )
+        except (
+            requests.exceptions.ConnectionError,
+            requests.exceptions.Timeout,
+        ):
+            # Covers failed connections as well as connect *and* read
+            # timeouts, so none of them ends in a bare traceback.
+            print(
+                "SWISS-MODEL seems to be temporarily unavailable",
+                file=sys.stderr,
+            )
+            sys.exit(3)
+        if not response.ok:
+            raise RuntimeError(
+                f"Submitting modelling job failed ({response.status_code})"
+            )
+        self.project_id = response.json()["project_id"]
+
+        return response.status_code
+
+    def wait(self, poll_interval=17):
+        """Poll the API until the job is finished.
+
+        ``poll_interval`` is the number of seconds to sleep between two
+        requests. Returns the decoded JSON of the last response, whose
+        ``status`` is ``COMPLETED`` or ``FAILED`` (compared
+        case-insensitively). Raises ``RuntimeError`` if a polling request
+        gets a non-OK response.
+        """
+        while True:
+            # Update the status from the server
+            response = requests.get(
+                f"https://swissmodel.expasy.org/project/{self.project_id}/"
+                + "models/full-details/",
+                headers={"Authorization": f"Token {self.token}"},
+                timeout=360,
+            )
+            if not response.ok:
+                raise RuntimeError(
+                    f"Polling modelling job failed ({response.status_code})"
+                )
+            # Decode once; the same object is checked and returned.
+            details = response.json()
+            if details["status"].upper() in ("COMPLETED", "FAILED"):
+                return details
+            # Wait at the end, there is a chance that this project is
+            # already available from cache.
+            time.sleep(poll_interval)
+
+    def fetch_results(
+        self, response_object, output_dir, fetch_modelcif=True, fetch_pdb=True
+    ):
+        """Get results of the modelling job.
+
+        Always creates ``<output_dir>/JSON`` (including missing parents).
+        For a completed job, every entry of ``response_object["models"]`` is
+        dumped there as ``model_<model_id>.json`` and, on request, its
+        ModelCIF and PDB files are downloaded into ``<output_dir>/ModelCIF``
+        and ``<output_dir>/PDB``. Raises ``RuntimeError`` if a download gets
+        a non-OK response.
+        """
+
+        def _store_model_json(model_json, outdir):
+            fname = f"model_{model_json['model_id']}.json"
+            with open(
+                os.path.join(outdir, "JSON", fname), "w", encoding="utf8"
+            ) as jfh:
+                json.dump(model_json, jfh)
+
+        def _fetch_file(url, file_type, outdir):
+            response = requests.get(url, timeout=360)
+            if not response.ok:
+                raise RuntimeError(
+                    f"Fetching {file_type} output failed ("
+                    + f"{response.status_code})."
+                )
+            os.makedirs(os.path.join(outdir, file_type), exist_ok=True)
+            # The file name is the last path component of the download URL.
+            fname = f"model_{os.path.basename(urlsplit(url).path)}"
+            with open(os.path.join(outdir, file_type, fname), "wb") as mfh:
+                for chunk in response.iter_content(chunk_size=8192):
+                    mfh.write(chunk)
+
+        # Make sure a JSON directory exists; tolerate one that is already
+        # there, like the per-format directories in _fetch_file.
+        os.makedirs(os.path.join(output_dir, "JSON"), exist_ok=True)
+        # Compare case-insensitively, the same way wait() does.
+        if response_object["status"].upper() != "COMPLETED":
+            return
+        for model in response_object["models"]:
+            _store_model_json(model, output_dir)
+            if fetch_modelcif:
+                _fetch_file(model["modelcif_url"], "ModelCIF", output_dir)
+            if fetch_pdb:
+                _fetch_file(model["coordinates_url"], "PDB", output_dir)
+
+class _AutoModelWhisperer(_SmApiWhisperer):
+    """Project type "automodel": only target sequences are submitted."""
+
+    PROJECT_TYPE = "automodel"
+
+    def get_json_payload(self):
+        """Build the request body, holding nothing but the targets."""
+        payload = {}
+        payload["target_sequences"] = self.targets
+        return payload
+
+
+class _AlignmentWhisperer(_SmApiWhisperer):
+    """Project type "alignment": targets plus a described template."""
+
+    PROJECT_TYPE = "alignment"
+
+    def __init__(
+        self,
+        targets,
+        token,
+        template_sequence,
+        template_seqres_offset,
+        pdb_id,
+        auth_asym_id,
+        assembly_id,
+        project_title="Untitled Project",
+    ):
+        # All of these values end up in the request payload, so the long
+        # argument list stays and Pylint is told to accept it.
+        # pylint: disable=too-many-arguments,too-many-positional-arguments
+        """Set up the base class, then keep the template description."""
+        super().__init__(targets, token, project_title=project_title)
+        self.template_sequence = template_sequence
+        self.template_seqres_offset = template_seqres_offset
+        # The PDB ID is stored in lower case.
+        self.pdb_id = pdb_id.lower()
+        self.auth_asym_id = auth_asym_id
+        self.assembly_id = assembly_id
+
+    def get_json_payload(self):
+        """Build the request body from targets and template attributes."""
+        payload = dict(
+            assembly_id=self.assembly_id,
+            auth_asym_id=self.auth_asym_id,
+            pdb_id=self.pdb_id,
+            target_sequences=self.targets,
+            template_seqres_offset=self.template_seqres_offset,
+            template_sequence=self.template_sequence,
+        )
+        return payload
+
+
+class _UserTemplateWhisperer(_SmApiWhisperer):
+    """Project type "user_template": targets plus an uploaded template."""
+
+    PROJECT_TYPE = "user_template"
+
+    def __init__(
+        self,
+        targets,
+        token,
+        template_file,
+        project_title="Untitled Project",
+    ):
+        """Set up the base class and remember the template file path."""
+        super().__init__(targets, token, project_title=project_title)
+        self.template_file = template_file
+
+    def get_json_payload(self):
+        """Build the request body, embedding the template file's text."""
+        payload = {
+            "project_title": self.project_title,
+            "target_sequences": self.targets,
+        }
+        # The file is read as UTF-8 text and sent inline.
+        with open(self.template_file, encoding="utf8") as template_handle:
+            payload["template_coordinates"] = template_handle.read()
+
+        return payload
+
+
+def _defastarise_targets(sequences):
+    """Strip FastA headers from targets and glue sequence pieces together.
+
+    Each entry is split at single spaces. An entry without a space is kept
+    untouched. Otherwise the pieces are concatenated, leaving out the first
+    piece if it starts with ">" or "__gt__" (i.e. looks like a header).
+    """
+    header_marks = (">", "__gt__")
+    cleaned = []
+    for entry in sequences:
+        pieces = entry.split(" ")
+        if len(pieces) == 1:
+            # Nothing to join, nothing to strip
+            cleaned.append(pieces[0])
+            continue
+        if pieces[0].strip().startswith(header_marks):
+            pieces = pieces[1:]
+        cleaned.append("".join(pieces))
+
+    return cleaned
+
+
+def _parse_args():
+    """Get command line arguments.
+
+    Parses ``sys.argv``, then checks that the options required by the chosen
+    project type are present and that no string argument is empty. If a
+    check fails, a message is printed to stderr and the script exits with
+    code 2. Returns the ``argparse`` namespace.
+    """
+
+    def _bail(message):
+        """Report a usage problem on stderr and exit with code 2."""
+        print(message, file=sys.stderr)
+        sys.exit(2)
+
+    parser = argparse.ArgumentParser(description=__doc__)
+
+    parser.add_argument(
+        "-d",
+        "--project-title",
+        help="Title for the modelling project",
+        metavar="<TITLE>",
+    )
+    parser.add_argument(
+        "-m",
+        "--no-modelcif",
+        help="Do not download models in ModelCIF format.",
+        default=False,
+        action="store_true",
+    )
+    parser.add_argument(
+        "-l",
+        "--fetch-pdb",
+        help="Download models in PDB legacy format.",
+        default=False,
+        action="store_true",
+    )
+    parser.add_argument(
+        "-t",
+        "--template-sequence",
+        help="The template sequence used for alignment mode",
+        metavar="<SEQUENCE>",
+    )
+    # ToDo: do we need the offset from the user? Doesn't interactive alignment
+    #       mode compute it?
+    parser.add_argument(
+        "-o",
+        "--template-seqres-offset",
+        help="Offset of the template sequence segment compared to the full "
+        + "template sequence",
+        metavar="<NUMBER>",
+        type=int,
+    )
+    parser.add_argument(
+        "-p",
+        "--pdb-id",
+        help="PDB ID (SMTL ID) for the template used in alignment mode",
+        metavar="<PDB ID>",
+    )
+    parser.add_argument(
+        "-c",
+        "--auth-asym-id",
+        help="The chain name to be used in alignment mode",
+        metavar="<CHAIN NAME>",
+    )
+    parser.add_argument(
+        "-a",
+        "--assembly-id",
+        help="ID of the assembly of the SMTL template to be used in alignment "
+        + "mode",
+        metavar="<NUMBER>",
+        type=int,
+    )
+    parser.add_argument(
+        "-f",
+        "--template-file",
+        help="PDB formatted file to serve as template for modelling",
+        metavar="<PDB FILE>",
+    )
+    parser.add_argument(
+        "project_type",
+        choices=("alignment", "automodel", "usertemplate"),
+        help="Kind of project ('alignment', 'automodel', 'usertemplate')",
+        metavar="<PROJECT TYPE>",
+    )
+    # Metavars of the free positionals, reused in the error messages below.
+    metas = {
+        "outdir": "<OUTPUT DIRECTORY>",
+        "target_sequences": "<SEQUENCE[S]>",
+        "token": "<TOKEN>",
+    }
+    parser.add_argument(
+        "token",
+        help="Authentication token for SWISS-MODEL",
+        metavar=metas["token"],
+    )
+    parser.add_argument(
+        "outdir",
+        help="Directory to store results in",
+        metavar=metas["outdir"],
+    )
+    parser.add_argument(
+        "target_sequences",
+        help="Target sequence to be modelled; to add multiple sequences, "
+        + "delimit with a space",
+        metavar=metas["target_sequences"],
+        nargs=argparse.REMAINDER,
+    )
+
+    opts = parser.parse_args()
+
+    # Make sure arguments for the different modelling modes are there
+    req_opts = {
+        "alignment": [
+            "assembly_id",
+            "auth_asym_id",
+            "pdb_id",
+            "template_seqres_offset",
+            "template_sequence",
+        ],
+        "automodel": [],
+        "usertemplate": ["template_file"],
+    }
+    # check mandatory arguments
+    for req in req_opts[opts.project_type]:
+        value = getattr(opts, req)
+        flag = f"--{req.replace('_', '-')}"
+        if value is None:
+            _bail(f"Option '{flag}' missing for '{opts.project_type}' mode")
+        if isinstance(value, str) and not value:
+            _bail(f"Option '{flag}' can not be an empty string")
+    # check positional arguments
+    for req, mta in metas.items():
+        value = getattr(opts, req)
+        if isinstance(value, str):
+            if not value:
+                _bail(f"Argument of '{mta}' can not be an empty string")
+        elif isinstance(value, list):
+            # Neither the list itself nor any of its items may be empty.
+            if not value or not all(value):
+                _bail(f"Argument of '{mta}' can not be empty")
+        else:
+            raise RuntimeError(
+                f"Value with unknown type '{type(value).__name__}' found for "
+                + f"'{mta}'"
+            )
+    # check optional arguments that must not be empty when given
+    for opt in ["project_title"]:
+        value = getattr(opts, opt)
+        if value is not None and not value:
+            _bail(
+                f"Option '--{opt.replace('_', '-')}' can not have an empty "
+                + "string as value"
+            )
+
+    return opts
+
+
+def _main():
+    """Parse the command line, run one modelling project, store results."""
+    opts = _parse_args()
+
+    targets = _defastarise_targets(opts.target_sequences)
+    project_type = opts.project_type.lower()
+    # pick the whisperer matching the project type
+    if project_type == "automodel":
+        whisperer = _AutoModelWhisperer(
+            targets, opts.token, project_title=opts.project_title
+        )
+    elif project_type == "alignment":
+        # The template sequence may carry a FastA header, too.
+        cleaned_template = _defastarise_targets([opts.template_sequence])
+        assert len(cleaned_template) == 1
+        whisperer = _AlignmentWhisperer(
+            targets,
+            opts.token,
+            cleaned_template[0],
+            opts.template_seqres_offset,
+            opts.pdb_id,
+            opts.auth_asym_id,
+            opts.assembly_id,
+            project_title=opts.project_title,
+        )
+    elif project_type == "usertemplate":
+        whisperer = _UserTemplateWhisperer(
+            targets,
+            opts.token,
+            opts.template_file,
+            project_title=opts.project_title,
+        )
+    else:
+        raise RuntimeError(
+            f"Not a suitable project type: '{opts.project_type}'"
+        )
+    # submit, block until the job has finished, then collect the output
+    whisperer.submit_request()
+    final_response = whisperer.wait()
+    want_modelcif = not opts.no_modelcif
+    whisperer.fetch_results(
+        final_response,
+        opts.outdir,
+        fetch_modelcif=want_modelcif,
+        fetch_pdb=opts.fetch_pdb,
+    )
+
+    sys.exit(0)
+
+
+# Only run the command line interface when executed as a script, not when
+# the module is imported.
+if __name__ == "__main__":
+    _main()
+
+#  LocalWords:  Pylint