Mercurial > repos > cpt > cpt_gbk_compare
diff gbk_compare.py @ 1:1909729a1fd3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:42:47 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gbk_compare.py Mon Jun 05 02:42:47 2023 +0000 @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 +""" +Copyright 2019 Ryan Wick (rrwick@gmail.com) +https://github.com/rrwick/Compare-annotations/ +This program is free software: you can redistribute it and/or modify it under the terms of the GNU +General Public License as published by the Free Software Foundation, either version 3 of the +License, or (at your option) any later version. +This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without +even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. +You should have received a copy of the GNU General Public License along with this program. If not, +see <https://www.gnu.org/licenses/>. +""" + +import argparse +from Bio import SeqIO +from Bio import pairwise2 +from Bio.pairwise2 import format_alignment +import itertools +import sys + + +def addArr(arrA, arrB): + res = [] + for x in range(0, min(len(arrA), len(arrB))): + res.append(arrA[x] + arrB[x]) + return res + + +def get_arguments(): + parser = argparse.ArgumentParser( + description="Compare GenBank annotations", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "annotation_1", type=str, help="First annotated genome in Genbank format" + ) + parser.add_argument( + "annotation_2", type=str, help="Second annotated genome in Genbank format" + ) + + parser.add_argument( + "--match_identity_threshold", + type=float, + default=0.7, + help="Two genes must have at least this identity to be considerd the same (0.0 to 1.0)", + ) + parser.add_argument( + "--allowed_skipped_genes", + type=int, + default=10, + help="This many missing genes are allowed when aligning the annotations", + ) + parser.add_argument("--addNotes", action="store_true", help="Add Note fields") + + parser.add_argument("-sumOut", type=argparse.FileType("w"), help="Summary out file") + args = parser.parse_args() + return args + + +def main(): + args = get_arguments() + + # Load in the CDS features from the two assemblies. + old = SeqIO.parse(args.annotation_1, "genbank") + new = SeqIO.parse(args.annotation_2, "genbank") + old_record = next(old) + new_record = next(new) + old_features, new_features = [], [] + for f in old_record.features: + if f.type == "CDS": + old_features.append(f) + for f in new_record.features: + if f.type == "CDS": + new_features.append(f) + + args.sumOut.write( + "Features in First Genbank's assembly:\t" + str(len(old_features)) + "\n" + ) + args.sumOut.write( + "Features in Second Genbank's assembly:\t" + str(len(new_features)) + "\n\n" + ) + + # Align the features to each other. + offsets = sorted( + list( + itertools.product( + range(args.allowed_skipped_genes + 1), + range(args.allowed_skipped_genes + 1), + ) + ), + key=lambda x: x[0] + x[1], + ) + old_i, new_i = 0, 0 + exactRec = 0 + inexactRec = [0, 0, 0] + hypoRec = [0, 0, 0] + newCount = 0 + oldCount = 0 + if args.addNotes: + print( + "First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\tFirst Record's Notes\tSecond Record's Notes\n" + ) + else: + print( + "First Record CDS Product\tSimilarity\tSecond Record CDS Product\tPercent Identity\tLength Difference\tFirst Gbk's CDS Location\tSecond Gbk's CDS Location\tHypothetical Status\n" + ) + while True: + if old_i >= len(old_features) and new_i >= len(new_features): + break + + for old_offset, new_offset in offsets: + try: + old_feature = old_features[old_i + old_offset] + except IndexError: + old_feature = None + try: + new_feature = new_features[new_i + new_offset] + except IndexError: + new_feature = None + try: + match, identity, length_diff = compare_features( + old_feature, + new_feature, + old_record, + new_record, + args.match_identity_threshold, + ) + except TypeError: + break + if match: + for j in range(old_offset): + print_in_old_not_new(old_features[old_i + j]) + oldCount += 1 + for j in range(new_offset): + print_in_new_not_old(new_features[new_i + j]) + newCount += 1 + if identity == 1.0: + exactRec += 1 + res1, res2 = print_match( + old_features[old_i + old_offset], + new_features[new_i + new_offset], + identity, + length_diff, + args.addNotes, + ) + inexactRec = addArr(inexactRec, res1) + hypoRec = addArr(hypoRec, res2) + old_i += old_offset + new_i += new_offset + break + else: + sys.stderr.write( + "Exceeded allowed number of skipped genes (" + + str(args.allowed_skipped_genes) + + "), unable to maintain alignment and continue comparison.\n" + ) + exit(2) + + if old_feature is None and new_feature is None: + break + + old_i += 1 + new_i += 1 + + args.sumOut.write("Exact Match:\t" + str(exactRec) + "\n\n") + + args.sumOut.write( + "Inexact Match:\t" + str(inexactRec[0] + inexactRec[1] + inexactRec[2]) + "\n" + ) + args.sumOut.write(" Same length:\t" + str(inexactRec[0]) + "\n") + args.sumOut.write(" Second Gbk Seq longer:\t" + str(inexactRec[2]) + "\n") + args.sumOut.write(" First Gbk Seq longer:\t" + str(inexactRec[1]) + "\n\n") + + args.sumOut.write("In Second Gbk but not in first:\t" + str(newCount) + "\n") + args.sumOut.write("In First Gbk but not in second:\t" + str(oldCount) + "\n\n") + + args.sumOut.write( + "Hypothetical Annotation Change:\t" + str(hypoRec[1] + hypoRec[2]) + "\n" + ) + args.sumOut.write("Hypothetical:\t" + str(hypoRec[0] + hypoRec[2]) + "\n") + + +def print_match(f1, f2, identity, length_diff, outNotes): + # print('', flush=True) + line = f1.qualifiers["product"][0] + "\t" + matchArr = [0, 0, 0] + hypoArr = [0, 0, 0] + if identity == 1.0: + # print('Exact match') + line += "Exact match\t" + f2.qualifiers["product"][0] + "\t100.0\tSame Length\t" + else: + # print('Inexact match (' + '%.2f' % (identity * 100.0) + '% ID, ', end='') + line += ( + "Inexact match\t" + + f2.qualifiers["product"][0] + + "\t%.2f\t" % (identity * 100.0) + ) + if length_diff == 0: + # print('same length)') + line += "Same Length\t" + matchArr[0] += 1 + elif length_diff > 0: + # print('old seq longer)') + line += "First Gbk Seq Longer\t" + matchArr[1] += 1 + elif length_diff < 0: + # print('new seq longer)') + line += "Second Gbk Seq Longer\t" + matchArr[2] += 1 + # print(' old: ', end='') + # print_feature_one_line(f1) + line += print_feature_one_line(f1) + "\t" + # print(' new: ', end='') + # print_feature_one_line(f2) + line += print_feature_one_line(f2) + "\t" + p1 = f1.qualifiers["product"][0].lower() + p2 = f2.qualifiers["product"][0].lower() + if "hypothetical" in p1 and "hypothetical" in p2: + # print(' still hypothetical') + line += "Hypothetical\t" + hypoArr[0] += 1 + elif "hypothetical" in p1 and "hypothetical" not in p2: + # print(' no longer hypothetical') + line += "No Longer Hypothetical\t" + hypoArr[1] += 1 + elif "hypothetical" not in p1 and "hypothetical" in p2: + # print(' became hypothetical') + line += "Became Hypothetical\t" + hypoArr[2] += 1 + else: + line += "'Hypothetical' not in second nor first Gbk's product tag" + + if outNotes: + line += "\t" + if "note" in f1.qualifiers.keys(): + for x in f1.qualifiers["note"]: + line += x + line += "\t" + else: + line += "N/A\t" + if "note" in f2.qualifiers.keys(): + for x in f2.qualifiers["note"]: + line += x + else: + line += "N/A" + + print(line) + return matchArr, hypoArr + + +def print_in_old_not_new(f): # rename file outputs + line = ( + f.qualifiers["product"][0] + + "\tIn First Gbk but not Second\tN/A\t0.00\t" + + str(f.location.end - f.location.start) + + "\t" + + print_feature_one_line(f) + + "\tN/A\tN/A" + ) + # print('') + # print('In old but not in new:') + # print(' ', end='') + # print_feature_one_line(f) + print(line) + + +def print_in_new_not_old(f): # rename file outputs + line = ( + "N/A\tIn Second Gbk but not First\t" + + f.qualifiers["product"][0] + + "\t0.00\t" + + str(f.location.end - f.location.start) + + "\tN/A\t" + + print_feature_one_line(f) + + "\tN/A" + ) + # print('') + # print('In new but not in old:') + # print(' ', end='') + # print_feature_one_line(f) + print(line) + + +def print_feature_one_line(f): + # f_str = f.qualifiers['product'][0] + f_str = "" + strand = "+" if f.location.strand == 1 else "-" + f_str += ( + "(" + str(f.location.start) + "-" + str(f.location.end) + " " + strand + ", " + ) + f_str += str(f.location.end - f.location.start) + " bp)" + return f_str + + +def compare_features(f1, f2, r1, r2, match_identity_threshold): + if f1 is None or f2 is None: + return False + + s1 = f1.extract(r1).seq + s2 = f2.extract(r2).seq + score = pairwise2.align.globalms(s1, s2, 1, 0, 0, 0, score_only=True) + identity = score / max(len(s1), len(s2)) + match = identity >= match_identity_threshold + length_diff = len(s1) - len(s2) + return match, identity, length_diff + + +if __name__ == "__main__": + main()