Mercurial > repos > cpt > cpt_xmfa_to_table
comparison cpt_convert_xmfa/xmfa2tbl.py @ 0:06d8e28d0bd7 draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 10 Jun 2022 08:49:43 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:06d8e28d0bd7 |
---|---|
1 #!/usr/bin/env python | |
2 from Bio import SeqIO | |
3 import sys | |
4 from xmfa import parse_xmfa, percent_identity | |
5 import argparse | |
6 import logging | |
7 import itertools | |
8 from collections import Counter | |
9 | |
10 logging.basicConfig(level=logging.INFO) | |
11 log = logging.getLogger(__name__) | |
12 | |
13 | |
14 def _id_tn_dict(sequences): | |
15 """Figure out sequence IDs AND sequence lengths from fasta file | |
16 """ | |
17 label_convert = {} | |
18 if sequences is not None: | |
19 if len(sequences) == 1: | |
20 for i, record in enumerate(SeqIO.parse(sequences[0], "fasta")): | |
21 label_convert[str(i + 1)] = {"id": record.id, "len": len(record)} | |
22 else: | |
23 for i, sequence in enumerate(sequences): | |
24 for record in SeqIO.parse(sequence, "fasta"): | |
25 label_convert[str(i + 1)] = {"id": record.id, "len": len(record)} | |
26 continue | |
27 # check for repeated sequence IDs | |
28 id_dupes = Counter(list(x["id"] for x in label_convert.values())) | |
29 for dupe in id_dupes: | |
30 if id_dupes[dupe] > 1: | |
31 log.debug("Duplicate FASTA ID Found: %s\n" % (dupe)) | |
32 for xmfaid in label_convert.keys(): | |
33 # Change the duplicate labels to have the XMFA identifer | |
34 # as a prefix so not to break the table generation later | |
35 if label_convert[xmfaid]["id"] == dupe: | |
36 label_convert[xmfaid]["id"] = "%s_%s" % (xmfaid, dupe) | |
37 | |
38 return label_convert | |
39 | |
40 | |
41 def total_similarity(xmfa_file, sequences=None, dice=False): | |
42 if sequences is None: | |
43 raise Exception("Must provide a non-zero number of sequence files") | |
44 | |
45 label_convert = _id_tn_dict(sequences) | |
46 lcbs = parse_xmfa(xmfa_file) | |
47 | |
48 # make a matrix based on number of sequences | |
49 table = {} | |
50 | |
51 for lcb in lcbs: | |
52 # ignore LCBs containing only one sequence | |
53 if len(lcb) == 0: | |
54 continue | |
55 | |
56 # permutations based on num sequences to compare for current LCB | |
57 compare_seqs = list(itertools.permutations(range(0, len(lcb)), 2)) | |
58 for permutation in compare_seqs: | |
59 (i, j) = permutation | |
60 similarity = percent_identity(lcb[i]["seq"], lcb[j]["seq"]) | |
61 | |
62 i_name = label_convert[lcb[i]["id"]]["id"] | |
63 j_name = label_convert[lcb[j]["id"]]["id"] | |
64 # find length of sequence in LCB | |
65 length_seq_lcb = lcb[i]["end"] - (lcb[i]["start"] - 1) | |
66 # populate table with normalized similarity value based on length_seq_lcb | |
67 if (i_name, j_name) not in table: | |
68 table[(i_name, j_name)] = 0 | |
69 table[(i_name, j_name)] += length_seq_lcb * similarity | |
70 | |
71 # finalize total percent similarity by dividing by length of parent sequence | |
72 for i in label_convert.keys(): | |
73 for j in label_convert.keys(): | |
74 i_name = label_convert[i]["id"] | |
75 j_name = label_convert[j]["id"] | |
76 if (i_name, j_name) in table: | |
77 if dice: | |
78 table[(i_name, j_name)] = ( | |
79 2 | |
80 * table[(i_name, j_name)] | |
81 / (label_convert[i]["len"] + label_convert[j]["len"]) | |
82 ) | |
83 else: | |
84 table[(i_name, j_name)] = ( | |
85 table[(i_name, j_name)] / label_convert[i]["len"] | |
86 ) | |
87 else: | |
88 table[(i_name, j_name)] = 0 | |
89 | |
90 if i_name == j_name: | |
91 table[(i_name, j_name)] = 100 | |
92 | |
93 # print table | |
94 names = [] | |
95 table_keys = sorted(label_convert.keys()) | |
96 | |
97 for i in table_keys: | |
98 names.append(label_convert[i]["id"]) | |
99 | |
100 sys.stdout.write("\t" + "\t".join(names) + "\n") | |
101 for j in table_keys: | |
102 j_key = label_convert[j]["id"] | |
103 sys.stdout.write(j_key) | |
104 for i in table_keys: | |
105 i_key = label_convert[i]["id"] | |
106 sys.stdout.write("\t%0.2f" % table[(i_key, j_key)]) | |
107 sys.stdout.write("\n") | |
108 | |
109 | |
110 if __name__ == "__main__": | |
111 parser = argparse.ArgumentParser( | |
112 description="Convert XMFA alignments to gff3", prog="xmfa2gff3" | |
113 ) | |
114 parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File") | |
115 parser.add_argument( | |
116 "sequences", | |
117 type=argparse.FileType("r"), | |
118 nargs="+", | |
119 help="Fasta files (in same order) passed to parent for reconstructing proper IDs", | |
120 ) | |
121 parser.add_argument( | |
122 "--dice", action="store_true", help="Use dice method for calculating % identity" | |
123 ) | |
124 parser.add_argument("--version", action="version", version="%(prog)s 1.0") | |
125 | |
126 args = parser.parse_args() | |
127 | |
128 total_similarity(**vars(args)) |