Mercurial > repos > cpt > cpt_putative_osp
diff spaninFuncs.py @ 1:05b97a4dce94 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:51:44 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/spaninFuncs.py Mon Jun 05 02:51:44 2023 +0000 @@ -0,0 +1,530 @@ +""" +PREMISE +### Functions/Classes that are used in both generate-putative-osp.py and generate-putative-isp.py +###### Main premise here is to make the above scripts a little more DRY, as well as easily readable for execution. +###### Documentation will ATTEMPT to be thourough here +""" + +import re +from Bio import SeqIO +from Bio import Seq +from collections import OrderedDict + +# Not written in OOP for a LITTLE bit of trying to keep the complication down in case adjustments are needed by someone else. +# Much of the manipulation is string based; so it should be straightforward as well as moderately quick +################## GLobal Variables +Lys = "K" + + +def check_back_end_snorkels(seq, tmsize): + """ + Searches through the backend of a potential TMD snorkel. This is the 2nd part of a TMD snorkel lysine match. + --> seq : should be the sequence fed from the "search_region" portion of the sequence + --> tmsize : size of the potential TMD being investigated + """ + found = [] + if seq[tmsize - 4] == Lys and re.search(("[FIWLVMYCATGS]"), seq[tmsize - 5]): + found = "match" + return found + elif seq[tmsize - 3] == Lys and re.search(("[FIWLVMYCATGS]"), seq[tmsize - 4]): + found = "match" + return found + elif seq[tmsize - 2] == Lys and re.search(("[FIWLVMYCATGS]"), seq[tmsize - 3]): + found = "match" + return found + elif seq[tmsize - 1] == Lys and re.search(("[FIWLVMYCATGS]"), seq[tmsize - 2]): + found = "match" + return found + else: + found = "NOTmatch" + return found + + +def prep_a_gff3(fa, spanin_type, org): + """ + Function parses an input detailed 'fa' file and outputs a 'gff3' file + ---> fa = input .fa file + ---> output = output a returned list of data, easily portable to a gff3 next + ---> spanin_type = 'isp' or 'osp' + """ + with org as f: + header = f.readline() + orgacc = header.split(" ") + orgacc = orgacc[0].split(">")[1].strip() + fa_zip = tuple_fasta(fa) + data = [] + for a_pair in fa_zip: + # print(a_pair) + if re.search(("(\[1\])"), a_pair[0]): + strand = "+" + elif re.search(("(\[-1\])"), a_pair[0]): + strand = "-" # column 7 + start = re.search(("[\d]+\.\."), a_pair[0]).group(0).split("..")[0] # column 4 + end = re.search(("\.\.[\d]+"), a_pair[0]).group(0).split("..")[1] # column 5 + orfid = re.search(("(ORF)[\d]+"), a_pair[0]).group(0) # column 1 + if spanin_type == "isp": + methodtype = "CDS" # column 3 + spanin = "isp" + elif spanin_type == "osp": + methodtype = "CDS" # column 3 + spanin = "osp" + elif spanin_type == "usp": + methodtype = "CDS" + spanin = "usp" + else: + raise "need to input spanin type" + source = "cpt.py|putative-*.py" # column 2 + score = "." # column 6 + phase = "." # column 8 + attributes = ( + "ID=" + orgacc + "|" + orfid + ";ALIAS=" + spanin + ";SEQ=" + a_pair[1] + ) # column 9 + sequence = [ + [orgacc, source, methodtype, start, end, score, strand, phase, attributes] + ] + data += sequence + return data + + +def write_gff3(data, output="results.gff3"): + """ + Parses results from prep_a_gff3 into a gff3 file + ---> input : list from prep_a_gff3 + ---> output : gff3 file + """ + data = data + filename = output + with filename as f: + f.write("#gff-version 3\n") + for value in data: + f.write( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n".format( + value[0], + value[1], + value[2], + value[3], + value[4], + value[5], + value[6], + value[7], + value[8], + ) + ) + f.close() + + +def find_tmd( + pair, + minimum=10, + maximum=30, + TMDmin=10, + TMDmax=20, + isp_mode=False, + peri_min=18, + peri_max=206, +): + """ + Function that searches for lysine snorkels and then for a spanning hydrophobic region that indicates a potential TMD + ---> pair : Input of tuple with description and AA sequence (str) + ---> minimum : How close from the initial start codon a TMD can be within + ---> maximum : How far from the initial start codon a TMD can be within + ---> TMDmin : The minimum size that a transmembrane can be (default = 10) + ---> TMDmax : The maximum size tha ta transmembrane can be (default = 20) + """ + # hydrophobicAAs = ['P', 'F', 'I', 'W', 'L', 'V', 'M', 'Y', 'C', 'A', 'T', 'G', 'S'] + tmd = [] + s = str(pair[1]) # sequence being analyzed + # print(s) # for trouble shooting + if maximum > len(s): + maximum = len(s) + search_region = s[minimum - 1 : maximum + 1] + # print(f"this is the search region: {search_region}") + # print(search_region) # for trouble shooting + + for tmsize in range(TMDmin, TMDmax + 1, 1): + # print(f"this is the current tmsize we're trying: {tmsize}") + # print('==============='+str(tmsize)+'================') # print for troubleshooting + pattern = ( + "[PFIWLVMYCATGS]{" + str(tmsize) + "}" + ) # searches for these hydrophobic residues tmsize total times + # print(pattern) + # print(f"sending to regex: {search_region}") + if re.search( + ("[K]"), search_region[1:8] + ): # grabbing one below with search region, so I want to grab one ahead here when I query. + store_search = re.search( + ("[K]"), search_region[1:8] + ) # storing regex object + where_we_are = store_search.start() # finding where we got the hit + if re.search( + ("[PFIWLVMYCATGS]"), search_region[where_we_are + 1] + ) and re.search( + ("[PFIWLVMYCATGS]"), search_region[where_we_are - 1] + ): # hydrophobic neighbor + # try: + g = re.search( + ("[PFIWLVMYCATGS]"), search_region[where_we_are + 1] + ).group() + backend = check_back_end_snorkels(search_region, tmsize) + if backend == "match": + if isp_mode: + g = re.search((pattern), search_region).group() + end_of_tmd = re.search((g), s).end() + 1 + amt_peri = len(s) - end_of_tmd + if peri_min <= amt_peri <= peri_max: + pair_desc = pair[0] + ", peri_count~=" + str(amt_peri) + new_pair = (pair_desc, pair[1]) + tmd.append(new_pair) + else: + tmd.append(pair) + else: + continue + # else: + # print("I'm continuing out of snorkel loop") + # print(f"{search_region}") + # continue + if re.search((pattern), search_region): + # print(f"found match: {}") + # print("I AM HEREEEEEEEEEEEEEEEEEEEEEEE") + # try: + if isp_mode: + g = re.search((pattern), search_region).group() + end_of_tmd = re.search((g), s).end() + 1 + amt_peri = len(s) - end_of_tmd + if peri_min <= amt_peri <= peri_max: + pair_desc = pair[0] + ", peri_count~=" + str(amt_peri) + new_pair = (pair_desc, pair[1]) + tmd.append(new_pair) + else: + tmd.append(pair) + else: + continue + + return tmd + + +def find_lipobox( + pair, minimum=10, maximum=50, min_after=30, max_after=185, regex=1, osp_mode=False +): + """ + Function that takes an input tuple, and will return pairs of sequences to their description that have a lipoobox + ---> minimum - min distance from start codon to first AA of lipobox + ---> maximum - max distance from start codon to first AA of lipobox + ---> regex - option 1 (default) => more strict regular expression ; option 2 => looser selection, imported from LipoRy + + """ + if regex == 1: + pattern = "[ILMFTV][^REKD][GAS]C" # regex for Lipobox from findSpanin.pl + elif regex == 2: + pattern = "[ACGSILMFTV][^REKD][GAS]C" # regex for Lipobox from LipoRy + + candidates = [] + s = str(pair[1]) + # print(s) # trouble shooting + search_region = s[ + minimum - 1 : maximum + 5 + ] # properly slice the input... add 4 to catch if it hangs off at max input + # print(search_region) # trouble shooting + patterns = ["[ILMFTV][^REKD][GAS]C", "AW[AGS]C"] + for pattern in patterns: + # print(pattern) # trouble shooting + if re.search((pattern), search_region): # lipobox must be WITHIN the range... + # searches the sequence with the input RegEx AND omits if + g = re.search( + (pattern), search_region + ).group() # find the exact group match + amt_peri = len(s) - re.search((g), s).end() + 1 + if min_after <= amt_peri <= max_after: # find the lipobox end region + if osp_mode: + pair_desc = pair[0] + ", peri_count~=" + str(amt_peri) + new_pair = (pair_desc, pair[1]) + candidates.append(new_pair) + else: + candidates.append(pair) + + return candidates + + +def tuple_fasta(fasta_file): + """ + #### INPUT: Fasta File + #### OUTPUT: zipped (zip) : pairwise relationship of description to sequence + #### + """ + fasta = SeqIO.parse(fasta_file, "fasta") + descriptions = [] + sequences = [] + for r in fasta: # iterates and stores each description and sequence + description = r.description + sequence = str(r.seq) + if ( + sequence[0] != "I" + ): # the translation table currently has I as a potential start codon ==> this will remove all ORFs that start with I + descriptions.append(description) + sequences.append(sequence) + else: + continue + + return zip(descriptions, sequences) + + +def lineWrapper(text, charactersize=60): + + if len(text) <= charactersize: + return text + else: + return ( + text[:charactersize] + + "\n" + + lineWrapper(text[charactersize:], charactersize) + ) + + +def getDescriptions(fasta): + """ + Takes an output FASTA file, and parses retrieves the description headers. These headers contain information needed + for finding locations of a potential i-spanin and o-spanin proximity to one another. + """ + desc = [] + with fasta as f: + for line in f: + if line.startswith(">"): + desc.append(line) + return desc + + +def splitStrands(text, strand="+"): + # positive_strands = [] + # negative_strands = [] + if strand == "+": + if re.search(("(\[1\])"), text): + return text + elif strand == "-": + if re.search(("(\[-1\])"), text): + return text + # return positive_strands, negative_strands + + +def parse_a_range(pair, start, end): + """ + Takes an input data tuple from a fasta tuple pair and keeps only those within the input sequence range + ---> data : fasta tuple data + ---> start : start range to keep + ---> end : end range to keep (will need to + 1) + """ + matches = [] + for each_pair in pair: + + s = re.search(("[\d]+\.\."), each_pair[0]).group(0) # Start of the sequence + s = int(s.split("..")[0]) + e = re.search(("\.\.[\d]+"), each_pair[0]).group(0) + e = int(e.split("..")[1]) + if start - 1 <= s and e <= end + 1: + matches.append(each_pair) + else: + continue + # else: + # continue + # if matches != []: + return matches + # else: + # print('no candidates within selected range') + + +def grabLocs(text): + """ + Grabs the locations of the spanin based on NT location (seen from ORF). Grabs the ORF name, as per named from the ORF class/module + from cpt.py + """ + start = re.search(("[\d]+\.\."), text).group( + 0 + ) # Start of the sequence ; looks for [numbers].. + end = re.search(("\.\.[\d]+"), text).group( + 0 + ) # End of the sequence ; Looks for ..[numbers] + orf = re.search(("(ORF)[\d]+"), text).group( + 0 + ) # Looks for ORF and the numbers that are after it + if re.search(("(\[1\])"), text): # stores strand + strand = "+" + elif re.search(("(\[-1\])"), text): # stores strand + strand = "-" + start = int(start.split("..")[0]) + end = int(end.split("..")[1]) + vals = [start, end, orf, strand] + + return vals + + +def spaninProximity(isp, osp, max_dist=30): + """ + _NOTE THIS FUNCTION COULD BE MODIFIED TO RETURN SEQUENCES_ + Compares the locations of i-spanins and o-spanins. max_dist is the distance in NT measurement from i-spanin END site + to o-spanin START. The user will be inputting AA distance, so a conversion will be necessary (<user_input> * 3) + I modified this on 07.30.2020 to bypass the pick + or - strand. To + INPUT: list of OSP and ISP candidates + OUTPUT: Return (improved) candidates for overlapping, embedded, and separate list + """ + + embedded = {} + overlap = {} + separate = {} + for iseq in isp: + embedded[iseq[2]] = [] + overlap[iseq[2]] = [] + separate[iseq[2]] = [] + for oseq in osp: + if iseq[3] == "+": + if oseq[3] == "+": + if iseq[0] < oseq[0] < iseq[1] and oseq[1] < iseq[1]: + ### EMBEDDED ### + combo = [ + iseq[0], + iseq[1], + oseq[2], + oseq[0], + oseq[1], + iseq[3], + ] # ordering a return for dic + embedded[iseq[2]] += [combo] + elif iseq[0] < oseq[0] <= iseq[1] and oseq[1] > iseq[1]: + ### OVERLAP / SEPARATE ### + if (iseq[1] - oseq[0]) < 6: + combo = [ + iseq[0], + iseq[1], + oseq[2], + oseq[0], + oseq[1], + iseq[3], + ] + separate[iseq[2]] += [combo] + else: + combo = [ + iseq[0], + iseq[1], + oseq[2], + oseq[0], + oseq[1], + iseq[3], + ] + overlap[iseq[2]] += [combo] + elif iseq[1] <= oseq[0] <= iseq[1] + max_dist: + combo = [iseq[0], iseq[1], oseq[2], oseq[0], oseq[1], iseq[3]] + separate[iseq[2]] += [combo] + else: + continue + if iseq[3] == "-": + if oseq[3] == "-": + if iseq[0] <= oseq[1] <= iseq[1] and oseq[0] > iseq[0]: + ### EMBEDDED ### + combo = [ + iseq[0], + iseq[1], + oseq[2], + oseq[0], + oseq[1], + iseq[3], + ] # ordering a return for dict + embedded[iseq[2]] += [combo] + elif iseq[0] <= oseq[1] <= iseq[1] and oseq[0] < iseq[0]: + if (oseq[1] - iseq[0]) < 6: + combo = [ + iseq[0], + iseq[1], + oseq[2], + oseq[0], + oseq[1], + iseq[3], + ] + separate[iseq[2]] += [combo] + else: + combo = [ + iseq[0], + iseq[1], + oseq[2], + oseq[0], + oseq[1], + iseq[3], + ] + overlap[iseq[2]] += [combo] + elif iseq[0] - 10 < oseq[1] < iseq[0]: + combo = [iseq[0], iseq[1], oseq[2], oseq[0], oseq[1], iseq[3]] + separate[iseq[2]] += [combo] + else: + continue + + embedded = {k: embedded[k] for k in embedded if embedded[k]} + overlap = {k: overlap[k] for k in overlap if overlap[k]} + separate = {k: separate[k] for k in separate if separate[k]} + + return embedded, overlap, separate + + +def check_for_usp(): + "pass" + + +############################################### TEST RANGE ######################################################################### +#################################################################################################################################### +if __name__ == "__main__": + + #### TMD TEST + test_desc = ["one", "two", "three", "four", "five"] + test_seq = [ + "XXXXXXXXXXXXXXXFMCFMCFMCFMCFMCXXXXXXXXXXXXXXXXXXXXXXXXXX", + "XXXXXXXXAAKKKKKKKKKKKKKKKXXXXXXXXXXXXX", + "XXXXXXX", + "XXXXXXXXXXXKXXXXXXXXXX", + "XXXXXXXXXXAKXXXXXXXXXXAKXXXXXXXX", + ] + # for l in + # combo = zip(test_desc,test_seq) + pairs = zip(test_desc, test_seq) + tmd = [] + for each_pair in pairs: + # print(each_pair) + try: + tmd += find_tmd(pair=each_pair) + except (IndexError, TypeError): + continue + # try:s = each_pair[1] + # tmd += find_tmd(seq=s, tmsize=15) + # print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n') + # print(tmd) + # print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n') + + #### tuple-fasta TEST + # fasta_file = 'out_isp.fa' + # ret = tuple_fasta(fasta_file) + # print('=============') + # for i in ret: + # print(i[1]) + + #### LipoBox TEST + test_desc = ["one", "two", "three", "four", "five", "six", "seven"] + test_seq = [ + "XXXXXXXXXTGGCXXXXXXXXXXXXXXXX", + "XXXXXXXXAAKKKKKKKKKKKKKKKXXXXXXXXXXXXX", + "XXXXXXX", + "AGGCXXXXXXXXXXXXXXXXXXXXTT", + "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXTGGC", + "XXXXXXXXXXXXXXXXXXXXXXXXXXTGGC", + "MSTLRELRLRRALKEQSMRYLLSIKKTLPRWKGALIGLFLICVATISGCASESKLPEPPMVSVDSSLMVEPNLTTEMLNVFSQ*", + ] + pairs = zip(test_desc, test_seq) + lipo = [] + for each_pair in pairs: + # print(each_pair) + # try: + try: + lipo += find_lipobox(pair=each_pair, regex=2) # , minimum=8) + except TypeError: # catches if something doesnt have the min/max requirements (something is too small) + continue + # except: + # continue + # print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n') + #############################3 + # g = prep_a_gff3(fa='putative_isp.fa', spanin_type='isp') + # print(g) + # write_gff3(data=g)