diff findSpanin.py @ 3:fd70980a516b draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c

| author   | cpt                             |
|----------|---------------------------------|
| date     | Mon, 05 Jun 2023 02:42:01 +0000 |
| parents  |                                 |
| children | 673d1776d3b9                    |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/findSpanin.py	Mon Jun 05 02:42:01 2023 +0000
@@ -0,0 +1,597 @@

##### findSpanin.pl --> findSpanin.py
##### Much of this code is very "blocked": one thing happens, then a function
##### runs on the return value, then another function, and so on.

import argparse
import os
import re  # new
import itertools  # new
from collections import Counter, OrderedDict
from spaninFuncs import (
    getDescriptions,
    grabLocs,
    spaninProximity,
    splitStrands,
    tuple_fasta,
    lineWrapper,
)

### Required Inputs
#### INPUT : putative_isp.fa & putative_osp.fa (in that order)
#### PARAMETERS :

###############################################################################
def write_output(candidates):
    """Output file function... maybe not needed."""
    pass


def reconfigure_dict(spanins):
    """
    Reorganizes the spanin dictionary to be more friendly for downstream
    checks: each spanin type is split into positive- and negative-strand hits.
    """
    new_spanin_dict = {}

    for each_spanin_type, data_dict in spanins.items():
        new_spanin_dict[each_spanin_type] = {
            "positive": {"coords": []},
            "negative": {"coords": []},
        }
        for outer_orf, inner_data in data_dict.items():
            for data_content in inner_data:
                # prepend the i-spanin ORF name so each hit is self-describing
                data_content.insert(0, outer_orf)
                if data_content[6] == "+":
                    new_spanin_dict[each_spanin_type]["positive"]["coords"] += [
                        data_content
                    ]
                elif data_content[6] == "-":
                    new_spanin_dict[each_spanin_type]["negative"]["coords"] += [
                        data_content
                    ]

    return new_spanin_dict
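# A minimal sketch of what reconfigure_dict() does, with hypothetical data
# (the ORF names and coordinates below are made up for illustration):
#
#     spanins = {"EMBEDDED": {"ISP_ORF1": [[100, 400, "OSP_ORF2", 250, 400, "+"]]}}
#     reconfigure_dict(spanins)
#     # => {"EMBEDDED": {"positive": {"coords":
#     #        [["ISP_ORF1", 100, 400, "OSP_ORF2", 250, 400, "+"]]},
#     #      "negative": {"coords": []}}}
#
# After the insert(0, ...) the strand sits at index 6, which is why the strand
# test above reads data_content[6].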
def check_for_uniques(spanins):
    """
    Checks for unique spanins per spanin_type.
    If the positive-strand end site is _the same_ for an i-spanin, those hits
    are grouped as one: e.g. if ORF1, ORF2, and ORF3 all end at location 4231,
    they are not counted as unique.
    """
    pair_dict = {
        "pairs": {
            "location_amount": [],
            "pair_number": {},
        }
    }
    for each_spanin_type, spanin_data in spanins.items():
        # early declarations for cases of no results
        pos_check = []  # end checks
        pos_uniques = []
        neg_check = []  # start checks
        neg_uniques = []
        pos_amt_unique = 0
        neg_amt_unique = 0
        amt_positive = 0
        amt_negative = 0
        spanin_data["uniques"] = 0
        spanin_data["amount"] = 0
        if spanin_data["positive"]["coords"]:
            for each_hit in spanin_data["positive"]["coords"]:
                pos_check.append(each_hit[2])
                pair_dict["pairs"]["location_amount"].append(each_hit[2])
            pos_uniques = list(set(pos_check))
            amt_positive = len(spanin_data["positive"]["coords"])
            pos_amt_unique = len(pos_uniques)
        if amt_positive:
            spanin_data["positive"]["amt_positive"] = amt_positive
            spanin_data["positive"]["pos_amt_unique"] = pos_amt_unique
        else:
            spanin_data["positive"]["amt_positive"] = 0
            spanin_data["positive"]["pos_amt_unique"] = 0
        if spanin_data["negative"]["coords"]:
            for each_hit in spanin_data["negative"]["coords"]:
                neg_check.append(each_hit[1])
                pair_dict["pairs"]["location_amount"].append(each_hit[1])
            neg_uniques = list(set(neg_check))
            amt_negative = len(spanin_data["negative"]["coords"])
            neg_amt_unique = len(neg_uniques)
        if amt_negative:
            spanin_data["negative"]["amt_negative"] = amt_negative
            spanin_data["negative"]["neg_amt_unique"] = neg_amt_unique
        else:
            spanin_data["negative"]["amt_negative"] = 0
            spanin_data["negative"]["neg_amt_unique"] = 0
        spanin_data["uniques"] += (
            spanin_data["positive"]["pos_amt_unique"]
            + spanin_data["negative"]["neg_amt_unique"]
        )
        spanin_data["amount"] += (
            spanin_data["positive"]["amt_positive"]
            + spanin_data["negative"]["amt_negative"]
        )
    # tally how often each pairing location occurs, then number the locations
    pair_dict["pairs"]["location_amount"] = dict(
        Counter(pair_dict["pairs"]["location_amount"])
    )
    for data in pair_dict.values():
        v = 0
        for loc, count in data["location_amount"].items():
            v += 1
            data["pair_number"][loc] = v
    spanins["total_amount"] = (
        spanins["EMBEDDED"]["amount"]
        + spanins["SEPARATED"]["amount"]
        + spanins["OVERLAPPED"]["amount"]
    )
    spanins["total_unique"] = (
        spanins["EMBEDDED"]["uniques"]
        + spanins["SEPARATED"]["uniques"]
        + spanins["OVERLAPPED"]["uniques"]
    )
    return spanins, pair_dict
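# A minimal sketch of the pair numbering above, using hypothetical locations:
#
#     >>> from collections import Counter
#     >>> dict(Counter([4231, 4231, 5600]))
#     {4231: 2, 5600: 1}
#
# Each distinct location then gets a sequential pair_number, so here 4231 -> 1
# and 5600 -> 2; duplicate end sites collapse onto the same pair number.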
if __name__ == "__main__":

    # Common parameters for both the ISP and OSP portions of the script

    parser = argparse.ArgumentParser(
        description="Trim the putative protein candidates and find potential i-spanin / o-spanin pairs"
    )

    parser.add_argument(
        "putative_isp_fasta_file",
        type=argparse.FileType("r"),
        help='Putative i-spanin FASTA file, output of "generate-putative-isp"',
    )  # the "input" argument

    parser.add_argument(
        "putative_osp_fasta_file",
        type=argparse.FileType("r"),
        help='Putative o-spanin FASTA file, output of "generate-putative-osp"',
    )

    parser.add_argument(
        "--max_isp_osp_distance",
        dest="max_isp_osp_distance",
        default=10,
        type=int,
        help="max distance from end of i-spanin to start of o-spanin, measured in AAs",
    )

    parser.add_argument(
        "--embedded_txt",
        dest="embedded_txt",
        type=argparse.FileType("w"),
        default="_findSpanin_embedded_results.txt",
        help="Results of potential embedded spanins",
    )
    parser.add_argument(
        "--overlap_txt",
        dest="overlap_txt",
        type=argparse.FileType("w"),
        default="_findSpanin_overlap_results.txt",
        help="Results of potential overlapping spanins",
    )
    parser.add_argument(
        "--separate_txt",
        dest="separate_txt",
        type=argparse.FileType("w"),
        default="_findSpanin_separated_results.txt",
        help="Results of potential separated spanins",
    )
    parser.add_argument(
        "--summary_txt",
        dest="summary_txt",
        type=argparse.FileType("w"),
        default="_findSpanin_summary.txt",
        help="Results of potential spanin pairs",
    )
    parser.add_argument(
        "-v", action="version", version="0.3.0"
    )  # Is this manually updated?
    args = parser.parse_args()

    SPANIN_TYPES = {
        "EMBEDDED": {},
        "OVERLAPPED": {},
        "SEPARATED": {},
    }

    # getDescriptions() consumes the handle, so reopen each file before
    # reading the sequences themselves
    isp = getDescriptions(args.putative_isp_fasta_file)
    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
    isp_full = tuple_fasta(args.putative_isp_fasta_file)

    osp = getDescriptions(args.putative_osp_fasta_file)
    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
    osp_full = tuple_fasta(args.putative_osp_fasta_file)

    #### location data
    location_data = {"isp": [], "osp": []}
    spanins = [isp, osp]
    for idx, each_spanin_type in enumerate(spanins):
        for description in each_spanin_type:
            locations = grabLocs(description)
            if idx == 0:  # i-spanin
                location_data["isp"].append(locations)
            elif idx == 1:  # o-spanin
                location_data["osp"].append(locations)

    #### Check for types of spanins (the cutoff is given in AAs, so multiply
    #### by 3 to compare nucleotide coordinates)
    embedded, overlap, separate = spaninProximity(
        isp=location_data["isp"],
        osp=location_data["osp"],
        max_dist=args.max_isp_osp_distance * 3,
    )

    SPANIN_TYPES["EMBEDDED"] = embedded
    SPANIN_TYPES["OVERLAPPED"] = overlap
    SPANIN_TYPES["SEPARATED"] = separate

    spanins = reconfigure_dict(SPANIN_TYPES)
    spanins, pair_dict = check_for_uniques(spanins)

    with args.summary_txt as f:
        for each_spanin_type, spanin_data in spanins.items():
            try:
                if each_spanin_type not in ["total_amount", "total_unique"]:
                    f.write(
                        "=~~~~~= "
                        + str(each_spanin_type)
                        + " Spanin Candidate Statistics =~~~~~=\n"
                    )
                    f.writelines(
                        "Total Candidate Pairs = " + str(spanin_data["amount"]) + "\n"
                    )
                    f.writelines(
                        "Total Unique Pairs = " + str(spanin_data["uniques"]) + "\n"
                    )
                    if each_spanin_type == "EMBEDDED":
                        for k, v in SPANIN_TYPES["EMBEDDED"].items():
                            f.writelines(
                                str(k)
                                + " ==> Amount of corresponding candidate o-spanin(s): "
                                + str(len(v))
                                + "\n"
                            )
                    if each_spanin_type == "SEPARATED":
                        for k, v in SPANIN_TYPES["SEPARATED"].items():
                            f.writelines(
                                str(k)
                                + " ==> Amount of corresponding candidate o-spanin(s): "
                                + str(len(v))
                                + "\n"
                            )
                    if each_spanin_type == "OVERLAPPED":
                        for k, v in SPANIN_TYPES["OVERLAPPED"].items():
                            f.writelines(
                                str(k)
                                + " ==> Amount of corresponding candidate o-spanin(s): "
                                + str(len(v))
                                + "\n"
                            )
            except TypeError:
                continue
        f.write("\n=~~~~~= Tally from ALL spanin types =~~~~~=\n")
        f.writelines("Total Candidates = " + str(spanins["total_amount"]) + "\n")
        f.writelines(
            "Total Unique Candidate Pairs = " + str(spanins["total_unique"]) + "\n"
        )
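    # For reference, the summary file written above looks roughly like this
    # (one section per spanin type; the counts below are made-up examples):
    #
    #     =~~~~~= EMBEDDED Spanin Candidate Statistics =~~~~~=
    #     Total Candidate Pairs = 3
    #     Total Unique Pairs = 2
    #     ISP_ORF1 ==> Amount of corresponding candidate o-spanin(s): 3
    #     ...
    #     =~~~~~= Tally from ALL spanin types =~~~~~=
    #     Total Candidates = 5
    #     Total Unique Candidate Pairs = 4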
    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
    isp_full = tuple_fasta(args.putative_isp_fasta_file)

    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
    osp_full = tuple_fasta(args.putative_osp_fasta_file)

    isp_seqs = []
    osp_seqs = []
    for isp_tupe in isp_full:
        for pisp, posp in embedded.items():
            if re.search(("(" + str(pisp) + r")\D"), isp_tupe[0]):
                peri_count = str.split(isp_tupe[0], "~=")[1]
                isp_seqs.append((pisp, isp_tupe[1], peri_count))

    for osp_tupe in osp_full:
        for pisp, posp in embedded.items():
            for data in posp:
                if re.search(("(" + str(data[3]) + r")\D"), osp_tupe[0]):
                    peri_count = str.split(osp_tupe[0], "~=")[1]
                    osp_seqs.append((data[3], osp_tupe[1], peri_count))

    with args.embedded_txt as f:
        f.write("================ embedded spanin candidates =================\n")
        f.write(
            "isp\tisp_start\tisp_end\tosp\tosp_start\tosp_end\tstrand\tpair_number\n"
        )
        if embedded != {}:
            for pisp, posp in embedded.items():
                f.write(pisp + "\n")
                for each_posp in posp:
                    f.write(
                        "\t{}\t{}\t{}\t{}\t{}\t{}\t".format(
                            each_posp[1],
                            each_posp[2],
                            each_posp[3],
                            each_posp[4],
                            each_posp[5],
                            each_posp[6],
                        )
                    )
                    if each_posp[6] == "+":
                        if each_posp[2] in pair_dict["pairs"]["pair_number"].keys():
                            f.write(
                                str(pair_dict["pairs"]["pair_number"][each_posp[2]])
                                + "\n"
                            )
                    elif each_posp[6] == "-":
                        if each_posp[1] in pair_dict["pairs"]["pair_number"].keys():
                            f.write(
                                str(pair_dict["pairs"]["pair_number"][each_posp[1]])
                                + "\n"
                            )
        else:
            f.write("nothing found")

    with open(args.embedded_txt.name, "a") as f:
        f.write("\n================= embedded candidate sequences ================\n")
        f.write("======================= isp ==========================\n\n")
        for isp_data in isp_seqs:
            f.write(
                ">isp_orf::{}-peri_count~={}\n{}\n".format(
                    isp_data[0], isp_data[2], lineWrapper(isp_data[1])
                )
            )
        f.write("\n======================= osp ========================\n\n")
        for osp_data in osp_seqs:
            f.write(
                ">osp_orf::{}-peri_count~={}\n{}\n".format(
                    osp_data[0], osp_data[2], lineWrapper(osp_data[1])
                )
            )
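    # The trailing r")\D" in the searches above keeps ORF names from matching
    # their own prefixes. An illustration with made-up FASTA headers:
    #
    #     >>> import re
    #     >>> bool(re.search(r"(ORF1)\D", "ORF1 [100..400]"))
    #     True
    #     >>> bool(re.search(r"(ORF1)\D", "ORF10 [100..400]"))
    #     False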
    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
    isp_full = tuple_fasta(args.putative_isp_fasta_file)

    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
    osp_full = tuple_fasta(args.putative_osp_fasta_file)

    isp_seqs = []
    osp_seqs = []
    for isp_tupe in isp_full:
        for pisp, posp in overlap.items():
            if re.search(("(" + str(pisp) + r")\D"), isp_tupe[0]):
                peri_count = str.split(isp_tupe[0], "~=")[1]
                isp_seqs.append((pisp, isp_tupe[1], peri_count))

    for osp_tupe in osp_full:
        for pisp, posp in overlap.items():
            for data in posp:
                if re.search(("(" + str(data[3]) + r")\D"), osp_tupe[0]):
                    peri_count = str.split(osp_tupe[0], "~=")[1]
                    osp_seqs.append((data[3], osp_tupe[1], peri_count))

    with args.overlap_txt as f:
        f.write("================ overlap spanin candidates =================\n")
        f.write(
            "isp\tisp_start\tisp_end\tosp\tosp_start\tosp_end\tstrand\tpair_number\n"
        )
        if overlap != {}:
            for pisp, posp in overlap.items():
                f.write(pisp + "\n")
                for each_posp in posp:
                    f.write(
                        "\t{}\t{}\t{}\t{}\t{}\t{}\t".format(
                            each_posp[1],
                            each_posp[2],
                            each_posp[3],
                            each_posp[4],
                            each_posp[5],
                            each_posp[6],
                        )
                    )
                    if each_posp[6] == "+":
                        if each_posp[2] in pair_dict["pairs"]["pair_number"].keys():
                            f.write(
                                str(pair_dict["pairs"]["pair_number"][each_posp[2]])
                                + "\n"
                            )
                    elif each_posp[6] == "-":
                        if each_posp[1] in pair_dict["pairs"]["pair_number"].keys():
                            f.write(
                                str(pair_dict["pairs"]["pair_number"][each_posp[1]])
                                + "\n"
                            )
        else:
            f.write("nothing found")

    with open(args.overlap_txt.name, "a") as f:
        f.write("\n================= overlap candidate sequences ================\n")
        f.write("======================= isp ==========================\n\n")
        for isp_data in isp_seqs:
            f.write(
                ">isp_orf::{}-pericount~={}\n{}\n".format(
                    isp_data[0], isp_data[2], lineWrapper(isp_data[1])
                )
            )
        f.write("\n======================= osp ========================\n\n")
        for osp_data in osp_seqs:
            f.write(
                ">osp_orf::{}-pericount~={}\n{}\n".format(
                    osp_data[0], osp_data[2], lineWrapper(osp_data[1])
                )
            )

    args.putative_isp_fasta_file = open(args.putative_isp_fasta_file.name, "r")
    isp_full = tuple_fasta(args.putative_isp_fasta_file)
    args.putative_osp_fasta_file = open(args.putative_osp_fasta_file.name, "r")
    osp_full = tuple_fasta(args.putative_osp_fasta_file)

    isp_seqs = []
    osp_seqs = []
    for isp_tupe in isp_full:
        for pisp, posp in separate.items():
            if re.search(("(" + str(pisp) + r")\D"), isp_tupe[0]):
                peri_count = str.split(isp_tupe[0], "~=")[1]
                isp_seqs.append((pisp, isp_tupe[1], peri_count))

    for osp_tupe in osp_full:
        for pisp, posp in separate.items():
            for data in posp:
                if re.search(("(" + str(data[3]) + r")\D"), osp_tupe[0]):
                    peri_count = str.split(osp_tupe[0], "~=")[1]
                    osp_seqs.append((data[3], osp_tupe[1], peri_count))

    with args.separate_txt as f:
        f.write("================ separated spanin candidates =================\n")
        f.write(
            "isp\tisp_start\tisp_end\tosp\tosp_start\tosp_end\tstrand\tpair_number\n"
        )
        if separate != {}:
            for pisp, posp in separate.items():
                f.write(pisp + "\n")
                for each_posp in posp:
                    f.write(
                        "\t{}\t{}\t{}\t{}\t{}\t{}\t".format(
                            each_posp[1],
                            each_posp[2],
                            each_posp[3],
                            each_posp[4],
                            each_posp[5],
                            each_posp[6],
                        )
                    )
                    if each_posp[6] == "+":
                        if each_posp[2] in pair_dict["pairs"]["pair_number"].keys():
                            f.write(
                                str(pair_dict["pairs"]["pair_number"][each_posp[2]])
                                + "\n"
                            )
                    elif each_posp[6] == "-":
                        if each_posp[1] in pair_dict["pairs"]["pair_number"].keys():
                            f.write(
                                str(pair_dict["pairs"]["pair_number"][each_posp[1]])
                                + "\n"
                            )
        else:
            f.write("nothing found")
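    # Each results table above is tab-separated; a row might look like this
    # (values are illustrative, not real output):
    #
    #     ISP_ORF1
    #         100	400	OSP_ORF2	250	400	+	1
    #
    # i.e. isp_start, isp_end, osp name, osp_start, osp_end, strand, and the
    # pair_number assigned in check_for_uniques().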
f.write("\n======================= osp ========================\n\n") + for osp_data in osp_seqs: + f.write( + ">osp_orf::{}-pericount~={}\n{}\n".format( + osp_data[0], osp_data[2], lineWrapper(osp_data[1]) + ) + )