Mercurial > repos > cpt > cpt_gbk_compare
comparison gbk_compare.py @ 1:1909729a1fd3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:42:47 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:cae700761678 | 1:1909729a1fd3 |
---|---|
1 #!/usr/bin/env python3 | |
2 """ | |
3 Copyright 2019 Ryan Wick (rrwick@gmail.com) | |
4 https://github.com/rrwick/Compare-annotations/ | |
5 This program is free software: you can redistribute it and/or modify it under the terms of the GNU | |
6 General Public License as published by the Free Software Foundation, either version 3 of the | |
7 License, or (at your option) any later version. | |
8 This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without | |
9 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
10 General Public License for more details. | |
11 You should have received a copy of the GNU General Public License along with this program. If not, | |
12 see <https://www.gnu.org/licenses/>. | |
13 """ | |
14 | |
15 import argparse | |
16 from Bio import SeqIO | |
17 from Bio import pairwise2 | |
18 from Bio.pairwise2 import format_alignment | |
19 import itertools | |
20 import sys | |
21 | |
22 | |
23 def addArr(arrA, arrB): | |
24 res = [] | |
25 for x in range(0, min(len(arrA), len(arrB))): | |
26 res.append(arrA[x] + arrB[x]) | |
27 return res | |
28 | |
29 | |
30 def get_arguments(): | |
31 parser = argparse.ArgumentParser( | |
32 description="Compare GenBank annotations", | |
33 formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
34 ) | |
35 | |
36 parser.add_argument( | |
37 "annotation_1", type=str, help="First annotated genome in Genbank format" | |
38 ) | |
39 parser.add_argument( | |
40 "annotation_2", type=str, help="Second annotated genome in Genbank format" | |
41 ) | |
42 | |
43 parser.add_argument( | |
44 "--match_identity_threshold", | |
45 type=float, | |
46 default=0.7, | |
47 help="Two genes must have at least this identity to be considerd the same (0.0 to 1.0)", | |
48 ) | |
49 parser.add_argument( | |
50 "--allowed_skipped_genes", | |
51 type=int, | |
52 default=10, | |
53 help="This many missing genes are allowed when aligning the annotations", | |
54 ) | |
55 parser.add_argument("--addNotes", action="store_true", help="Add Note fields") | |
56 | |
57 parser.add_argument("-sumOut", type=argparse.FileType("w"), help="Summary out file") | |
58 args = parser.parse_args() | |
59 return args | |
60 | |
61 | |
62 def main(): | |
63 args = get_arguments() | |
64 | |
65 # Load in the CDS features from the two assemblies. | |
66 old = SeqIO.parse(args.annotation_1, "genbank") | |
67 new = SeqIO.parse(args.annotation_2, "genbank") | |
68 old_record = next(old) | |
69 new_record = next(new) | |
70 old_features, new_features = [], [] | |
71 for f in old_record.features: | |
72 if f.type == "CDS": | |
73 old_features.append(f) | |
74 for f in new_record.features: | |
75 if f.type == "CDS": | |
76 new_features.append(f) | |
77 | |
78 args.sumOut.write( | |
79 "Features in First Genbank's assembly:\t" + str(len(old_features)) + "\n" | |
80 ) | |
81 args.sumOut.write( | |
82 "Features in Second Genbank's assembly:\t" + str(len(new_features)) + "\n\n" | |
83 ) | |
84 | |
85 # Align the features to each other. | |
86 offsets = sorted( | |
87 list( | |
88 itertools.product( | |
89 range(args.allowed_skipped_genes + 1), | |
90 range(args.allowed_skipped_genes + 1), | |
91 ) | |
92 ), | |
93 key=lambda x: x[0] + x[1], | |
94 ) | |
95 old_i, new_i = 0, 0 | |
96 exactRec = 0 | |
97 inexactRec = [0, 0, 0] | |
98 hypoRec = [0, 0, 0] | |
99 newCount = 0 | |
100 oldCount = 0 | |
101 if args.addNotes: | |
102 print( | |
103 "First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\tFirst Record's Notes\tSecond Record's Notes\n" | |
104 ) | |
105 else: | |
106 print( | |
107 "First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\n" | |
108 ) | |
109 while True: | |
110 if old_i >= len(old_features) and new_i >= len(new_features): | |
111 break | |
112 | |
113 for old_offset, new_offset in offsets: | |
114 try: | |
115 old_feature = old_features[old_i + old_offset] | |
116 except IndexError: | |
117 old_feature = None | |
118 try: | |
119 new_feature = new_features[new_i + new_offset] | |
120 except IndexError: | |
121 new_feature = None | |
122 try: | |
123 match, identity, length_diff = compare_features( | |
124 old_feature, | |
125 new_feature, | |
126 old_record, | |
127 new_record, | |
128 args.match_identity_threshold, | |
129 ) | |
130 except TypeError: | |
131 break | |
132 if match: | |
133 for j in range(old_offset): | |
134 print_in_old_not_new(old_features[old_i + j]) | |
135 oldCount += 1 | |
136 for j in range(new_offset): | |
137 print_in_new_not_old(new_features[new_i + j]) | |
138 newCount += 1 | |
139 if identity == 1.0: | |
140 exactRec += 1 | |
141 res1, res2 = print_match( | |
142 old_features[old_i + old_offset], | |
143 new_features[new_i + new_offset], | |
144 identity, | |
145 length_diff, | |
146 args.addNotes, | |
147 ) | |
148 inexactRec = addArr(inexactRec, res1) | |
149 hypoRec = addArr(hypoRec, res2) | |
150 old_i += old_offset | |
151 new_i += new_offset | |
152 break | |
153 else: | |
154 sys.stderr.write( | |
155 "Exceeded allowed number of skipped genes (" | |
156 + str(args.allowed_skipped_genes) | |
157 + "), unable to maintain alignment and continue comparison.\n" | |
158 ) | |
159 exit(2) | |
160 | |
161 if old_feature is None and new_feature is None: | |
162 break | |
163 | |
164 old_i += 1 | |
165 new_i += 1 | |
166 | |
167 args.sumOut.write("Exact Match:\t" + str(exactRec) + "\n\n") | |
168 | |
169 args.sumOut.write( | |
170 "Inexact Match:\t" + str(inexactRec[0] + inexactRec[1] + inexactRec[2]) + "\n" | |
171 ) | |
172 args.sumOut.write(" Same length:\t" + str(inexactRec[0]) + "\n") | |
173 args.sumOut.write(" Second Gbk Seq longer:\t" + str(inexactRec[2]) + "\n") | |
174 args.sumOut.write(" First Gbk Seq longer:\t" + str(inexactRec[1]) + "\n\n") | |
175 | |
176 args.sumOut.write("In Second Gbk but not in first:\t" + str(newCount) + "\n") | |
177 args.sumOut.write("In First Gbk but not in second:\t" + str(oldCount) + "\n\n") | |
178 | |
179 args.sumOut.write( | |
180 "Hypothetical Annotation Change:\t" + str(hypoRec[1] + hypoRec[2]) + "\n" | |
181 ) | |
182 args.sumOut.write("Hypothetical:\t" + str(hypoRec[0] + hypoRec[2]) + "\n") | |
183 | |
184 | |
185 def print_match(f1, f2, identity, length_diff, outNotes): | |
186 # print('', flush=True) | |
187 line = f1.qualifiers["product"][0] + "\t" | |
188 matchArr = [0, 0, 0] | |
189 hypoArr = [0, 0, 0] | |
190 if identity == 1.0: | |
191 # print('Exact match') | |
192 line += "Exact match\t" + f2.qualifiers["product"][0] + "\t100.0\tSame Length\t" | |
193 else: | |
194 # print('Inexact match (' + '%.2f' % (identity * 100.0) + '% ID, ', end='') | |
195 line += ( | |
196 "Inexact match\t" | |
197 + f2.qualifiers["product"][0] | |
198 + "\t%.2f\t" % (identity * 100.0) | |
199 ) | |
200 if length_diff == 0: | |
201 # print('same length)') | |
202 line += "Same Length\t" | |
203 matchArr[0] += 1 | |
204 elif length_diff > 0: | |
205 # print('old seq longer)') | |
206 line += "First Gbk Seq Longer\t" | |
207 matchArr[1] += 1 | |
208 elif length_diff < 0: | |
209 # print('new seq longer)') | |
210 line += "Second Gbk Seq Longer\t" | |
211 matchArr[2] += 1 | |
212 # print(' old: ', end='') | |
213 # print_feature_one_line(f1) | |
214 line += print_feature_one_line(f1) + "\t" | |
215 # print(' new: ', end='') | |
216 # print_feature_one_line(f2) | |
217 line += print_feature_one_line(f2) + "\t" | |
218 p1 = f1.qualifiers["product"][0].lower() | |
219 p2 = f2.qualifiers["product"][0].lower() | |
220 if "hypothetical" in p1 and "hypothetical" in p2: | |
221 # print(' still hypothetical') | |
222 line += "Hypothetical\t" | |
223 hypoArr[0] += 1 | |
224 elif "hypothetical" in p1 and "hypothetical" not in p2: | |
225 # print(' no longer hypothetical') | |
226 line += "No Longer Hypothetical\t" | |
227 hypoArr[1] += 1 | |
228 elif "hypothetical" not in p1 and "hypothetical" in p2: | |
229 # print(' became hypothetical') | |
230 line += "Became Hypothetical\t" | |
231 hypoArr[2] += 1 | |
232 else: | |
233 line += "'Hypothetical' not in second nor first Gbk's product tag" | |
234 | |
235 if outNotes: | |
236 line += "\t" | |
237 if "note" in f1.qualifiers.keys(): | |
238 for x in f1.qualifiers["note"]: | |
239 line += x | |
240 line += "\t" | |
241 else: | |
242 line += "N/A\t" | |
243 if "note" in f2.qualifiers.keys(): | |
244 for x in f2.qualifiers["note"]: | |
245 line += x | |
246 else: | |
247 line += "N/A" | |
248 | |
249 print(line) | |
250 return matchArr, hypoArr | |
251 | |
252 | |
253 def print_in_old_not_new(f): # rename file outputs | |
254 line = ( | |
255 f.qualifiers["product"][0] | |
256 + "\tIn First Gbk but not Second\tN/A\t0.00\t" | |
257 + str(f.location.end - f.location.start) | |
258 + "\t" | |
259 + print_feature_one_line(f) | |
260 + "\tN/A\tN/A" | |
261 ) | |
262 # print('') | |
263 # print('In old but not in new:') | |
264 # print(' ', end='') | |
265 # print_feature_one_line(f) | |
266 print(line) | |
267 | |
268 | |
269 def print_in_new_not_old(f): # rename file outputs | |
270 line = ( | |
271 "N/A\tIn Second Gbk but not First\t" | |
272 + f.qualifiers["product"][0] | |
273 + "\t0.00\t" | |
274 + str(f.location.end - f.location.start) | |
275 + "\tN/A\t" | |
276 + print_feature_one_line(f) | |
277 + "\tN/A" | |
278 ) | |
279 # print('') | |
280 # print('In new but not in old:') | |
281 # print(' ', end='') | |
282 # print_feature_one_line(f) | |
283 print(line) | |
284 | |
285 | |
286 def print_feature_one_line(f): | |
287 # f_str = f.qualifiers['product'][0] | |
288 f_str = "" | |
289 strand = "+" if f.location.strand == 1 else "-" | |
290 f_str += ( | |
291 "(" + str(f.location.start) + "-" + str(f.location.end) + " " + strand + ", " | |
292 ) | |
293 f_str += str(f.location.end - f.location.start) + " bp)" | |
294 return f_str | |
295 | |
296 | |
297 def compare_features(f1, f2, r1, r2, match_identity_threshold): | |
298 if f1 is None or f2 is None: | |
299 return False | |
300 | |
301 s1 = f1.extract(r1).seq | |
302 s2 = f2.extract(r2).seq | |
303 score = pairwise2.align.globalms(s1, s2, 1, 0, 0, 0, score_only=True) | |
304 identity = score / max(len(s1), len(s2)) | |
305 match = identity >= match_identity_threshold | |
306 length_diff = len(s1) - len(s2) | |
307 return match, identity, length_diff | |
308 | |
309 | |
310 if __name__ == "__main__": | |
311 main() |