#!/usr/bin/env python
"""
#
#------------------------------------------------------------------------------
#                         University of Minnesota
#         Copyright 2016, Regents of the University of Minnesota
#------------------------------------------------------------------------------
# Author:
#
#  James E Johnson
#
#------------------------------------------------------------------------------
"""
import optparse
import re
import sys
from urllib import parse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

DEFAULT_TIMEOUT = 5  # seconds
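# Retry transient failures (HTTP 429 rate limiting and 5xx server errors)
# with exponential backoff before giving up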
retry_strategy = Retry(
    total=5,
    backoff_factor=2,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
)


class TimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that applies DEFAULT_TIMEOUT to any request that does
    not set an explicit timeout."""

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop("timeout", DEFAULT_TIMEOUT)
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        timeout = kwargs.get("timeout")
        if timeout is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)


def __main__():
    # Parse Command Line
    parser = optparse.OptionParser()
    parser.add_option('-i', '--input', dest='input', default=None, help='Tabular file containing a column of search IDs')
    parser.add_option('-c', '--column', dest='column', type='int', default=0, help='The column (zero-based) in the tabular file that contains the search IDs')
    parser.add_option('-s', '--search-id', dest='search_id', action='append', default=[], help='ID to search in Uniprot')
    parser.add_option('-r', '--reviewed', dest='reviewed', help='Restrict the search to UniProt reviewed entries, e.g. true or false')
    parser.add_option('-f', '--format', dest='format', choices=['xml', 'fasta'], default='xml', help='output format')
    parser.add_option('-k', '--field', dest='field', choices=['taxonomy_name', 'taxonomy_id', 'accession'], default='taxonomy_name', help='query field')
    parser.add_option('-o', '--output', dest='output', help='File path for the downloaded UniProt data')
    parser.add_option('-d', '--debug', dest='debug', action='store_true', default=False, help='Turn on wrapper debugging to stderr')
    (options, args) = parser.parse_args()
    search_ids = set(options.search_id)
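    # Merge in search IDs read from the chosen column of the tabular file, skipping comment lines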
    if options.input:
        with open(options.input, 'r') as inputFile:
            for linenum, line in enumerate(inputFile):
                if line.startswith('#'):
                    continue
                fields = line.rstrip('\r\n').split('\t')
                if len(fields) > abs(options.column):
                    search_id = fields[options.column].strip()
                    if search_id:
                        search_ids.add(search_id)
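    # OR the chosen field across all search IDs, e.g. taxonomy_id:"9606" OR taxonomy_id:"10090"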
    search_queries = [f'{options.field}:"{search_id}"' for search_id in search_ids]
    search_query = ' OR '.join(search_queries)
    if options.output:
        dest_path = options.output
    else:
        dest_path = "uniprot_%s.xml" % '_'.join(search_ids)
    reviewed = " reviewed:%s" % options.reviewed if options.reviewed else ''
    try:
        url = 'https://rest.uniprot.org/uniprotkb/stream'
        query = "%s%s" % (search_query, reviewed)
        params = {'query': query, 'format': options.format}
        if options.debug:
            print("%s ? %s" % (url, params), file=sys.stderr)
        data = parse.urlencode(params)
        print(f"Retrieving: {url}?{data}")
        adapter = TimeoutHTTPAdapter(max_retries=retry_strategy)

        http = requests.Session()
        http.mount("https://", adapter)
        response = http.get(url, params=params)
        http.close()

        if response.status_code != 200:
            sys.exit(f"Request failed with status code {response.status_code}:\n{response.text}")

        with open(dest_path, 'w') as fh:
            fh.write(response.text)

        if options.format == 'xml':
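            # Sanity check: after any XML declaration, the first element must be
            # a (possibly namespace-prefixed) <uniprot> root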
            with open(dest_path, 'r') as contents:
                while True:
                    line = contents.readline()
                    if options.debug:
                        print(line, file=sys.stderr)
                    if not line:
                        # readline() returns '' at EOF, never None: no root element was found
                        print("failed: Not a uniprot xml file", file=sys.stderr)
                        sys.exit(1)
                    if line.startswith('<?'):
                        continue
                    # pattern match <root or <ns:root for any ns string
                    pattern = r'^<(\w*:)?uniprot'
                    if re.match(pattern, line):
                        break
                    print("failed: Not a uniprot xml file", file=sys.stderr)
                    sys.exit(1)
        print("Search IDs:%s" % search_ids, file=sys.stdout)
        if 'X-UniProt-Release' in response.headers:
            print("UniProt-Release:%s" % response.headers['X-UniProt-Release'], file=sys.stdout)
        if 'X-Total-Results' in response.headers:
            print("Entries:%s" % response.headers['X-Total-Results'], file=sys.stdout)
    except Exception as e:
        sys.exit(str(e))


if __name__ == "__main__":
    __main__()