diff findSpanin.py @ 3:fd70980a516b draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:42:01 +0000
parents
children 673d1776d3b9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/findSpanin.py	Mon Jun 05 02:42:01 2023 +0000
@@ -0,0 +1,597 @@
+##### findSpanin.pl --> findSpanin.py
+######### Much of this code is very "blocked", in the sense that one thing happens...then a function happens on the return...then another function...etc...etc...
+
+import argparse
+import os
+import re  # new
+import itertools  # new
+from collections import Counter, OrderedDict
+from spaninFuncs import (
+    getDescriptions,
+    grabLocs,
+    spaninProximity,
+    splitStrands,
+    tuple_fasta,
+    lineWrapper,
+)
+
+### Requirement Inputs
+#### INPUT : putative_isp.fa & putative_osp.fa (in that order)
+#### PARAMETERS :
+
+###############################################################################
+def write_output(candidates):
+    """output file function...maybe not needed"""
+    pass
+
+
+def reconfigure_dict(spanins):
+    """
+    re organizes dictionary to be more friendly for checks
+    """
+
+    new_spanin_dict = {}
+
+    for each_spanin_type, data_dict in spanins.items():
+        # print(f"{each_spanin_type} == {data_dict}")
+        new_spanin_dict[each_spanin_type] = {}
+        new_spanin_dict[each_spanin_type]["positive"] = {}
+        new_spanin_dict[each_spanin_type]["negative"] = {}
+        new_spanin_dict[each_spanin_type]["positive"]["coords"] = []
+        new_spanin_dict[each_spanin_type]["negative"]["coords"] = []
+        for outter_orf, inner_data in data_dict.items():
+            list_of_hits = []
+            for data_content in inner_data:
+                # print(data_content)
+                data_content.insert(0, outter_orf)
+                # print(f"new data_content -> {data_content}")
+                # print(data_content)
+                # list_of_hits += [data_content]
+                # new_spanin_dict[each_spanin_type] += [data_content]
+                if data_content[6] == "+":
+                    # print(f"{each_spanin_type} @ POSITIVE")
+                    new_spanin_dict[each_spanin_type]["positive"]["coords"] += [
+                        data_content
+                    ]
+                elif data_content[6] == "-":
+                    # print(f"{each_spanin_type} @ NEGATIVE")
+                    new_spanin_dict[each_spanin_type]["negative"]["coords"] += [
+                        data_content
+                    ]
+        # print(new_spanin_dict[each_spanin_type])
+        # print(reorganized)
+        # print(f"{outter_orf} => {inner_data}")
+        # print(new_spanin_dict)
+
+        # print('\n')
+    # for k, v in new_spanin_dict.items():
+    # print(k)
+    # print(v)
+    return new_spanin_dict
+
+
+def check_for_uniques(spanins):
+    """
+    Checks for unique spanins based on spanin_type.
+    If the positive strand end site is _the same_ for a i-spanin, we would group that as "1".
+    i.e. if ORF1, ORF2, and ORF3 all ended with location 4231, they would not be unique.
+    """
+    pair_dict = {}
+    pair_dict = {
+        "pairs": {
+            "location_amount": [],
+            "pair_number": {},
+        }
+    }
+    for each_spanin_type, spanin_data in spanins.items():
+        # print(f"{each_spanin_type} ===> {spanin_data}")
+        # early declarations for cases of no results
+        pos_check = []  # end checks
+        pos_uniques = []
+        neg_check = []  # start checks
+        neg_uniques = []
+        unique_ends = []
+        pos_amt_unique = 0
+        neg_amt_unique = 0
+        amt_positive = 0
+        amt_negative = 0
+        spanin_data["uniques"] = 0
+        spanin_data["amount"] = 0
+        # spanin_data['positive']['amt_positive'] = 0
+        # spanin_data['positive']['pos_amt_unique'] = 0
+        # spanin_data['positive']['isp_match'] = []
+        # spanin_data['negative']['amt_negative'] = 0
+        # spanin_data['negative']['neg_amt_unique'] = 0
+        # spanin_data['negative']['isp_match'] = []
+        # print(spanin_data)
+        if spanin_data["positive"]["coords"]:
+            # do something...
+            # print('in other function')
+            # print(spanin_data['positive']['coords'])
+            for each_hit in spanin_data["positive"]["coords"]:
+                pos_check.append(each_hit[2])
+                pair_dict["pairs"]["location_amount"].append(each_hit[2])
+                pos_uniques = list(
+                    set(
+                        [
+                            end_site
+                            for end_site in pos_check
+                            if pos_check.count(end_site) >= 1
+                        ]
+                    )
+                )
+            # print(pos_check)
+            # print(pos_uniques)
+            amt_positive = len(spanin_data["positive"]["coords"])
+            pos_amt_unique = len(pos_uniques)
+            if amt_positive:
+                spanin_data["positive"]["amt_positive"] = amt_positive
+                spanin_data["positive"]["pos_amt_unique"] = pos_amt_unique
+            # pair_dict['pairs']['locations'].extend(pos_uniques)
+        else:
+            spanin_data["positive"]["amt_positive"] = 0
+            spanin_data["positive"]["pos_amt_unique"] = 0
+        if spanin_data["negative"]["coords"]:
+
+            # do something else...
+            # print('in other function')
+            # print(spanin_data['negative']['coords'])
+            for each_hit in spanin_data["negative"]["coords"]:
+                neg_check.append(each_hit[1])
+                pair_dict["pairs"]["location_amount"].append(each_hit[1])
+                neg_uniques = list(
+                    set(
+                        [
+                            start_site
+                            for start_site in neg_check
+                            if neg_check.count(start_site) >= 1
+                        ]
+                    )
+                )
+            # print(neg_uniques)
+            amt_negative = len(spanin_data["negative"]["coords"])
+            neg_amt_unique = len(neg_uniques)
+            if amt_negative:
+                spanin_data["negative"]["amt_negative"] = amt_negative
+                spanin_data["negative"]["neg_amt_unique"] = neg_amt_unique
+            # pair_dict['pairs']['locations'].extend(neg_uniques)
+        else:
+            spanin_data["negative"]["amt_negative"] = 0
+            spanin_data["negative"]["neg_amt_unique"] = 0
+        spanin_data["uniques"] += (
+            spanin_data["positive"]["pos_amt_unique"]
+            + spanin_data["negative"]["neg_amt_unique"]
+        )
+        spanin_data["amount"] += (
+            spanin_data["positive"]["amt_positive"]
+            + spanin_data["negative"]["amt_negative"]
+        )
+        # print(spanin_data['uniques'])
+    list(set(pair_dict["pairs"]["location_amount"]))
+    pair_dict["pairs"]["location_amount"] = dict(
+        Counter(pair_dict["pairs"]["location_amount"])
+    )
+    for data in pair_dict.values():
+        # print(data['locations'])
+        # print(type(data['locations']))
+        v = 0
+        for loc, count in data["location_amount"].items():
+            # data['pair_number'] = {loc
+            v += 1
+            data["pair_number"][loc] = v
+    # print(dict(Counter(pair_dict['pairs']['locations'])))
+    # print(pair_dict)
+    spanins["total_amount"] = (
+        spanins["EMBEDDED"]["amount"]
+        + spanins["SEPARATED"]["amount"]
+        + spanins["OVERLAPPED"]["amount"]
+    )
+    spanins["total_unique"] = (
+        spanins["EMBEDDED"]["uniques"]
+        + spanins["SEPARATED"]["uniques"]
+        + spanins["OVERLAPPED"]["uniques"]
+    )
+    # spanins['total_unique'] = len(pair_dict['pairs']['pair_number'])
+    return spanins, pair_dict
+
+
+if __name__ == "__main__":
+
+    # Common parameters for both ISP / OSP portion of script
+
+    parser = argparse.ArgumentParser(
+        description="Trim the putative protein candidates and find potential i-spanin / o-spanin pairs"
+    )
+
+    parser.add_argument(
+        "putative_isp_fasta_file",
+        type=argparse.FileType("r"),
+        help='Putative i-spanin FASTA file, output of "generate-putative-isp"',
+    )  # the "input" argument
+
+    parser.add_argument(
+        "putative_osp_fasta_file",
+        type=argparse.FileType("r"),
+        help='Putative o-spanin FASTA file, output of "generate-putative-osp"',
+    )
+
+    parser.add_argument(
+        "--max_isp_osp_distance",
+        dest="max_isp_osp_distance",
+        default=10,
+        type=int,
+        help="max distance from end of i-spanin to start of o-spanin, measured in AAs",
+    )
+
+    parser.add_argument(
+        "--embedded_txt",
+        dest="embedded_txt",
+        type=argparse.FileType("w"),
+        default="_findSpanin_embedded_results.txt",
+        help="Results of potential embedded spanins",
+    )
+    parser.add_argument(
+        "--overlap_txt",
+        dest="overlap_txt",
+        type=argparse.FileType("w"),
+        default="_findSpanin_overlap_results.txt",
+        help="Results of potential overlapping spanins",
+    )
+    parser.add_argument(
+        "--separate_txt",
+        dest="separate_txt",
+        type=argparse.FileType("w"),
+        default="_findSpanin_separated_results.txt",
+        help="Results of potential separated spanins",
+    )
+
+    parser.add_argument(
+        "--summary_txt",
+        dest="summary_txt",
+        type=argparse.FileType("w"),
+        default="_findSpanin_summary.txt",
+        help="Results of potential spanin pairs",
+    )
+    parser.add_argument(
+        "-v", action="version", version="0.3.0"
+    )  # Is this manually updated?
+    args = parser.parse_args()
+
+    #### RE-WRITE
+    SPANIN_TYPES = {}
+    SPANIN_TYPES["EMBEDDED"] = {}
+    SPANIN_TYPES["OVERLAPPED"] = {}
+    SPANIN_TYPES["SEPARATED"] = {}
+    # SPANIN_TYPES = {
+    #    'EMBEDDED' : {},
+    #    'OVERLAPPED' : {},
+    #    'SEPARATED' : {},
+    # }
+
+    isp = getDescriptions(args.putative_isp_fasta_file)
+    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
+    isp_full = tuple_fasta(args.putative_isp_fasta_file)
+
+    osp = getDescriptions(args.putative_osp_fasta_file)
+    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
+    osp_full = tuple_fasta(args.putative_osp_fasta_file)
+
+    #### location data
+    location_data = {"isp": [], "osp": []}
+    spanins = [isp, osp]
+    for idx, each_spanin_type in enumerate(spanins):
+        for description in each_spanin_type:
+            locations = grabLocs(description)
+            if idx == 0:  # i-spanin
+                location_data["isp"].append(locations)
+            elif idx == 1:  # o-spanin
+                location_data["osp"].append(locations)
+
+    #### Check for types of spanins
+    embedded, overlap, separate = spaninProximity(
+        isp=location_data["isp"],
+        osp=location_data["osp"],
+        max_dist=args.max_isp_osp_distance * 3,
+    )
+
+    SPANIN_TYPES["EMBEDDED"] = embedded
+    SPANIN_TYPES["OVERLAPPED"] = overlap
+    SPANIN_TYPES["SEPARATED"] = separate
+
+    # for spanin_type, spanin in SPANIN_TYPES.items():
+    #    s = 0
+    #    for sequence in spanin.values():
+    #        s += len(sequence)
+    #    SPANIN_TYPES[spanin_type]['amount'] = s
+    #    SPANIN_TYPES[spanin_type]['unique'] = len(spanin.keys())
+
+    # check_for_unique_spanins(SPANIN_TYPES)
+    spanins = reconfigure_dict(SPANIN_TYPES)
+    spanins, pair_dict = check_for_uniques(spanins)
+    # print(pair_dict)
+    with args.summary_txt as f:
+        for each_spanin_type, spanin_data in spanins.items():
+            try:
+                if each_spanin_type not in ["total_amount", "total_unique"]:
+                    # print(each_spanin_type)
+                    # print(each_spanin_type)
+                    f.write(
+                        "=~~~~~= "
+                        + str(each_spanin_type)
+                        + " Spanin Candidate Statistics =~~~~~=\n"
+                    )
+                    f.writelines(
+                        "Total Candidate Pairs = " + str(spanin_data["amount"]) + "\n"
+                    )
+                    f.writelines(
+                        "Total Unique Pairs = " + str(spanin_data["uniques"]) + "\n"
+                    )
+                    if each_spanin_type == "EMBEDDED":
+                        for k, v in SPANIN_TYPES["EMBEDDED"].items():
+                            # print(k)
+                            f.writelines(
+                                ""
+                                + str(k)
+                                + " ==> Amount of corresponding candidate o-spanins(s): "
+                                + str(len(v))
+                                + "\n"
+                            )
+                    if each_spanin_type == "SEPARATED":
+                        for k, v in SPANIN_TYPES["SEPARATED"].items():
+                            f.writelines(
+                                ""
+                                + str(k)
+                                + " ==> Amount of corresponding candidate o-spanins(s): "
+                                + str(len(v))
+                                + "\n"
+                            )
+                    if each_spanin_type == "OVERLAPPED":
+                        for k, v in SPANIN_TYPES["OVERLAPPED"].items():
+                            f.writelines(
+                                ""
+                                + str(k)
+                                + " ==> Amount of corresponding candidate o-spanins(s): "
+                                + str(len(v))
+                                + "\n"
+                            )
+            except TypeError:
+                continue
+        f.write("\n=~~~~~= Tally from ALL spanin types =~~~~~=\n")
+        f.writelines("Total Candidates = " + str(spanins["total_amount"]) + "\n")
+        f.writelines(
+            "Total Unique Candidate Pairs = " + str(spanins["total_unique"]) + "\n"
+        )
+
+    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
+    isp_full = tuple_fasta(args.putative_isp_fasta_file)
+
+    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
+    osp_full = tuple_fasta(args.putative_osp_fasta_file)
+
+    # print(isp_full)
+    isp_seqs = []
+    osp_seqs = []
+    for isp_tupe in isp_full:
+        # print(isp_tupe)
+        for pisp, posp in embedded.items():
+            # print(f"ISP = searching for {pisp} in {isp_tupe[0]}")
+            if re.search(("(" + str(pisp) + ")\D"), isp_tupe[0]):
+                # print(isp_tupe[0])
+                # print(peri_count)
+                peri_count = str.split(isp_tupe[0], "~=")[1]
+                isp_seqs.append((pisp, isp_tupe[1], peri_count))
+    # print(isp_seqs)
+    for osp_tupe in osp_full:
+        for pisp, posp in embedded.items():
+            for data in posp:
+                # print(f"OSP = searching for {data[3]} in {osp_tupe[0]}, coming from this object: {data}")
+                if re.search(("(" + str(data[3]) + ")\D"), osp_tupe[0]):
+                    peri_count = str.split(osp_tupe[0], "~=")[1]
+                    osp_seqs.append((data[3], osp_tupe[1], peri_count))
+
+    with args.embedded_txt as f:
+        f.write("================ embedded spanin candidates =================\n")
+        f.write(
+            "isp\tisp_start\tisp_end\tosp\tosp_start\tosp_end\tstrand\tpair_number\n"
+        )
+        if embedded != {}:
+            # print(embedded)
+            for pisp, posp in embedded.items():
+                # print(f"{pisp} - {posp}")
+                f.write(pisp + "\n")
+                for each_posp in posp:
+                    # print(posp)
+                    f.write(
+                        "\t{}\t{}\t{}\t{}\t{}\t{}\t".format(
+                            each_posp[1],
+                            each_posp[2],
+                            each_posp[3],
+                            each_posp[4],
+                            each_posp[5],
+                            each_posp[6],
+                        )
+                    )
+                    if each_posp[6] == "+":
+                        if each_posp[2] in pair_dict["pairs"]["pair_number"].keys():
+                            f.write(
+                                ""
+                                + str(pair_dict["pairs"]["pair_number"][each_posp[2]])
+                                + "\n"
+                            )
+                    elif each_posp[6] == "-":
+                        if each_posp[1] in pair_dict["pairs"]["pair_number"].keys():
+                            f.write(
+                                ""
+                                + str(pair_dict["pairs"]["pair_number"][each_posp[1]])
+                                + "\n"
+                            )
+        else:
+            f.write("nothing found")
+
+    with open(args.embedded_txt.name, "a") as f:
+        f.write("\n================= embedded candidate sequences ================\n")
+        f.write("======================= isp ==========================\n\n")
+        for isp_data in isp_seqs:
+            # print(isp_data)
+            f.write(
+                ">isp_orf::{}-peri_count~={}\n{}\n".format(
+                    isp_data[0], isp_data[2], lineWrapper(isp_data[1])
+                )
+            )
+        f.write("\n======================= osp ========================\n\n")
+        for osp_data in osp_seqs:
+            f.write(
+                ">osp_orf::{}-peri_count~={}\n{}\n".format(
+                    osp_data[0], osp_data[2], lineWrapper(osp_data[1])
+                )
+            )
+
+    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
+    isp_full = tuple_fasta(args.putative_isp_fasta_file)
+
+    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
+    osp_full = tuple_fasta(args.putative_osp_fasta_file)
+
+    isp_seqs = []
+    osp_seqs = []
+    for isp_tupe in isp_full:
+        peri_count = str.split(isp_tupe[0], "~=")[1]
+        for pisp, posp in overlap.items():
+            if re.search(("(" + str(pisp) + ")\D"), isp_tupe[0]):
+                peri_count = str.split(isp_tupe[0], "~=")[1]
+                isp_seqs.append((pisp, isp_tupe[1], peri_count))
+
+    for osp_tupe in osp_full:
+        for pisp, posp in overlap.items():
+            for data in posp:
+                if re.search(("(" + str(data[3]) + ")\D"), osp_tupe[0]):
+                    peri_count = str.split(osp_tupe[0], "~=")[1]
+                    osp_seqs.append((data[3], osp_tupe[1], peri_count))
+
+    with args.overlap_txt as f:
+        f.write("================ overlap spanin candidates =================\n")
+        f.write(
+            "isp\tisp_start\tisp_end\tosp\tosp_start\tosp_end\tstrand\tpair_number\n"
+        )
+        if overlap != {}:
+            for pisp, posp in overlap.items():
+                f.write(pisp + "\n")
+                for each_posp in posp:
+                    f.write(
+                        "\t{}\t{}\t{}\t{}\t{}\t{}\t".format(
+                            each_posp[1],
+                            each_posp[2],
+                            each_posp[3],
+                            each_posp[4],
+                            each_posp[5],
+                            each_posp[6],
+                        )
+                    )
+                    if each_posp[6] == "+":
+                        if each_posp[2] in pair_dict["pairs"]["pair_number"].keys():
+                            # print('ovl ; +')
+                            f.write(
+                                ""
+                                + str(pair_dict["pairs"]["pair_number"][each_posp[2]])
+                                + "\n"
+                            )
+                    elif each_posp[6] == "-":
+                        if each_posp[1] in pair_dict["pairs"]["pair_number"].keys():
+                            f.write(
+                                ""
+                                + str(pair_dict["pairs"]["pair_number"][each_posp[1]])
+                                + "\n"
+                            )
+        else:
+            f.write("nothing found")
+
+    with open(args.overlap_txt.name, "a") as f:
+        # print(isp_seqs)
+        f.write("\n================= overlap candidate sequences ================\n")
+        f.write("======================= isp ==========================\n\n")
+        for isp_data in isp_seqs:
+            f.write(
+                ">isp_orf::{}-pericount~={}\n{}\n".format(
+                    isp_data[0], isp_data[2], lineWrapper(isp_data[1])
+                )
+            )
+        f.write("\n======================= osp ========================\n\n")
+        for osp_data in osp_seqs:
+            f.write(
+                ">osp_orf::{}-pericount~={}\n{}\n".format(
+                    osp_data[0], osp_data[2], lineWrapper(osp_data[1])
+                )
+            )
+
+    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
+    isp_full = tuple_fasta(args.putative_isp_fasta_file)
+    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
+    osp_full = tuple_fasta(args.putative_osp_fasta_file)
+
+    isp_seqs = []
+    osp_seqs = []
+    for isp_tupe in isp_full:
+        for pisp, posp in separate.items():
+            if re.search(("(" + str(pisp) + ")\D"), isp_tupe[0]):
+                peri_count = str.split(isp_tupe[0], "~=")[1]
+                isp_seqs.append((pisp, isp_tupe[1], peri_count))
+    # print(isp_seqs)
+    for osp_tupe in osp_full:
+        for pisp, posp in separate.items():
+            for data in posp:
+                if re.search(("(" + str(data[3]) + ")\D"), osp_tupe[0]):
+                    peri_count = str.split(osp_tupe[0], "~=")[1]
+                    osp_seqs.append((data[3], osp_tupe[1], peri_count))
+
+    with args.separate_txt as f:
+        f.write("================ separated spanin candidates =================\n")
+        f.write(
+            "isp\tisp_start\tisp_end\tosp\tosp_start\tosp_end\tstrand\tpair_number\n"
+        )
+        if separate != {}:
+            for pisp, posp in separate.items():
+                f.write(pisp + "\n")
+                for each_posp in posp:
+                    f.write(
+                        "\t{}\t{}\t{}\t{}\t{}\t{}\t".format(
+                            each_posp[1],
+                            each_posp[2],
+                            each_posp[3],
+                            each_posp[4],
+                            each_posp[5],
+                            each_posp[6],
+                        )
+                    )
+                    if each_posp[6] == "+":
+                        if each_posp[2] in pair_dict["pairs"]["pair_number"].keys():
+                            f.write(
+                                ""
+                                + str(pair_dict["pairs"]["pair_number"][each_posp[2]])
+                                + "\n"
+                            )
+                    elif each_posp[6] == "-":
+                        if each_posp[1] in pair_dict["pairs"]["pair_number"].keys():
+                            f.write(
+                                ""
+                                + str(pair_dict["pairs"]["pair_number"][each_posp[1]])
+                                + "\n"
+                            )
+        else:
+            f.write("nothing found")
+
+    with open(args.separate_txt.name, "a") as f:
+        f.write("\n================= separated candidate sequences ================\n")
+        f.write("======================= isp ==========================\n\n")
+        for isp_data in isp_seqs:
+            f.write(
+                ">isp_orf::{}-pericount~={}\n{}\n".format(
+                    isp_data[0], isp_data[2], lineWrapper(isp_data[1])
+                )
+            )
+        f.write("\n======================= osp ========================\n\n")
+        for osp_data in osp_seqs:
+            f.write(
+                ">osp_orf::{}-pericount~={}\n{}\n".format(
+                    osp_data[0], osp_data[2], lineWrapper(osp_data[1])
+                )
+            )