0
|
1 #!/usr/bin/env python
|
|
2 from Bio import SeqIO
|
|
3 import sys
|
|
4 from xmfa import parse_xmfa, percent_identity
|
|
5 import argparse
|
|
6 import logging
|
|
7 import itertools
|
|
8 from collections import Counter
|
|
9
|
|
10 logging.basicConfig(level=logging.INFO)
|
|
11 log = logging.getLogger(__name__)
|
|
12
|
|
13
|
|
14 def _id_tn_dict(sequences):
|
|
15 """Figure out sequence IDs AND sequence lengths from fasta file
|
|
16 """
|
|
17 label_convert = {}
|
|
18 if sequences is not None:
|
|
19 if len(sequences) == 1:
|
|
20 for i, record in enumerate(SeqIO.parse(sequences[0], "fasta")):
|
|
21 label_convert[str(i + 1)] = {"id": record.id, "len": len(record)}
|
|
22 else:
|
|
23 for i, sequence in enumerate(sequences):
|
|
24 for record in SeqIO.parse(sequence, "fasta"):
|
|
25 label_convert[str(i + 1)] = {"id": record.id, "len": len(record)}
|
|
26 continue
|
|
27 # check for repeated sequence IDs
|
|
28 id_dupes = Counter(list(x["id"] for x in label_convert.values()))
|
|
29 for dupe in id_dupes:
|
|
30 if id_dupes[dupe] > 1:
|
|
31 log.debug("Duplicate FASTA ID Found: %s\n" % (dupe))
|
|
32 for xmfaid in label_convert.keys():
|
|
33 # Change the duplicate labels to have the XMFA identifer
|
|
34 # as a prefix so not to break the table generation later
|
|
35 if label_convert[xmfaid]["id"] == dupe:
|
|
36 label_convert[xmfaid]["id"] = "%s_%s" % (xmfaid, dupe)
|
|
37
|
|
38 return label_convert
|
|
39
|
|
40
|
|
41 def total_similarity(xmfa_file, sequences=None, dice=False):
|
|
42 if sequences is None:
|
|
43 raise Exception("Must provide a non-zero number of sequence files")
|
|
44
|
|
45 label_convert = _id_tn_dict(sequences)
|
|
46 lcbs = parse_xmfa(xmfa_file)
|
|
47
|
|
48 # make a matrix based on number of sequences
|
|
49 table = {}
|
|
50
|
|
51 for lcb in lcbs:
|
|
52 # ignore LCBs containing only one sequence
|
|
53 if len(lcb) == 0:
|
|
54 continue
|
|
55
|
|
56 # permutations based on num sequences to compare for current LCB
|
|
57 compare_seqs = list(itertools.permutations(range(0, len(lcb)), 2))
|
|
58 for permutation in compare_seqs:
|
|
59 (i, j) = permutation
|
|
60 similarity = percent_identity(lcb[i]["seq"], lcb[j]["seq"])
|
|
61
|
|
62 i_name = label_convert[lcb[i]["id"]]["id"]
|
|
63 j_name = label_convert[lcb[j]["id"]]["id"]
|
|
64 # find length of sequence in LCB
|
|
65 length_seq_lcb = lcb[i]["end"] - (lcb[i]["start"] - 1)
|
|
66 # populate table with normalized similarity value based on length_seq_lcb
|
|
67 if (i_name, j_name) not in table:
|
|
68 table[(i_name, j_name)] = 0
|
|
69 table[(i_name, j_name)] += length_seq_lcb * similarity
|
|
70
|
|
71 # finalize total percent similarity by dividing by length of parent sequence
|
|
72 for i in label_convert.keys():
|
|
73 for j in label_convert.keys():
|
|
74 i_name = label_convert[i]["id"]
|
|
75 j_name = label_convert[j]["id"]
|
|
76 if (i_name, j_name) in table:
|
|
77 if dice:
|
|
78 table[(i_name, j_name)] = (
|
|
79 2
|
|
80 * table[(i_name, j_name)]
|
|
81 / (label_convert[i]["len"] + label_convert[j]["len"])
|
|
82 )
|
|
83 else:
|
|
84 table[(i_name, j_name)] = (
|
|
85 table[(i_name, j_name)] / label_convert[i]["len"]
|
|
86 )
|
|
87 else:
|
|
88 table[(i_name, j_name)] = 0
|
|
89
|
|
90 if i_name == j_name:
|
|
91 table[(i_name, j_name)] = 100
|
|
92
|
|
93 # print table
|
|
94 names = []
|
|
95 table_keys = sorted(label_convert.keys())
|
|
96
|
|
97 for i in table_keys:
|
|
98 names.append(label_convert[i]["id"])
|
|
99
|
|
100 sys.stdout.write("\t" + "\t".join(names) + "\n")
|
|
101 for j in table_keys:
|
|
102 j_key = label_convert[j]["id"]
|
|
103 sys.stdout.write(j_key)
|
|
104 for i in table_keys:
|
|
105 i_key = label_convert[i]["id"]
|
|
106 sys.stdout.write("\t%0.2f" % table[(i_key, j_key)])
|
|
107 sys.stdout.write("\n")
|
|
108
|
|
109
|
|
110 if __name__ == "__main__":
|
|
111 parser = argparse.ArgumentParser(
|
|
112 description="Convert XMFA alignments to gff3", prog="xmfa2gff3"
|
|
113 )
|
|
114 parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File")
|
|
115 parser.add_argument(
|
|
116 "sequences",
|
|
117 type=argparse.FileType("r"),
|
|
118 nargs="+",
|
|
119 help="Fasta files (in same order) passed to parent for reconstructing proper IDs",
|
|
120 )
|
|
121 parser.add_argument(
|
|
122 "--dice", action="store_true", help="Use dice method for calculating % identity"
|
|
123 )
|
|
124 parser.add_argument("--version", action="version", version="%(prog)s 1.0")
|
|
125
|
|
126 args = parser.parse_args()
|
|
127
|
|
128 total_similarity(**vars(args))
|