Mercurial > repos > cpt > cpt_xmfa_to_table
diff cpt_convert_xmfa/xmfa2tbl.py @ 0:06d8e28d0bd7 draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 10 Jun 2022 08:49:43 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_convert_xmfa/xmfa2tbl.py Fri Jun 10 08:49:43 2022 +0000 @@ -0,0 +1,128 @@ +#!/usr/bin/env python +from Bio import SeqIO +import sys +from xmfa import parse_xmfa, percent_identity +import argparse +import logging +import itertools +from collections import Counter + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +def _id_tn_dict(sequences): + """Figure out sequence IDs AND sequence lengths from fasta file + """ + label_convert = {} + if sequences is not None: + if len(sequences) == 1: + for i, record in enumerate(SeqIO.parse(sequences[0], "fasta")): + label_convert[str(i + 1)] = {"id": record.id, "len": len(record)} + else: + for i, sequence in enumerate(sequences): + for record in SeqIO.parse(sequence, "fasta"): + label_convert[str(i + 1)] = {"id": record.id, "len": len(record)} + continue + # check for repeated sequence IDs + id_dupes = Counter(list(x["id"] for x in label_convert.values())) + for dupe in id_dupes: + if id_dupes[dupe] > 1: + log.debug("Duplicate FASTA ID Found: %s\n" % (dupe)) + for xmfaid in label_convert.keys(): + # Change the duplicate labels to have the XMFA identifer + # as a prefix so not to break the table generation later + if label_convert[xmfaid]["id"] == dupe: + label_convert[xmfaid]["id"] = "%s_%s" % (xmfaid, dupe) + + return label_convert + + +def total_similarity(xmfa_file, sequences=None, dice=False): + if sequences is None: + raise Exception("Must provide a non-zero number of sequence files") + + label_convert = _id_tn_dict(sequences) + lcbs = parse_xmfa(xmfa_file) + + # make a matrix based on number of sequences + table = {} + + for lcb in lcbs: + # ignore LCBs containing only one sequence + if len(lcb) == 0: + continue + + # permutations based on num sequences to compare for current LCB + compare_seqs = list(itertools.permutations(range(0, len(lcb)), 2)) + for permutation in compare_seqs: + (i, j) = permutation + similarity = percent_identity(lcb[i]["seq"], lcb[j]["seq"]) + + i_name = label_convert[lcb[i]["id"]]["id"] + j_name = label_convert[lcb[j]["id"]]["id"] + # find length of sequence in LCB + length_seq_lcb = lcb[i]["end"] - (lcb[i]["start"] - 1) + # populate table with normalized similarity value based on length_seq_lcb + if (i_name, j_name) not in table: + table[(i_name, j_name)] = 0 + table[(i_name, j_name)] += length_seq_lcb * similarity + + # finalize total percent similarity by dividing by length of parent sequence + for i in label_convert.keys(): + for j in label_convert.keys(): + i_name = label_convert[i]["id"] + j_name = label_convert[j]["id"] + if (i_name, j_name) in table: + if dice: + table[(i_name, j_name)] = ( + 2 + * table[(i_name, j_name)] + / (label_convert[i]["len"] + label_convert[j]["len"]) + ) + else: + table[(i_name, j_name)] = ( + table[(i_name, j_name)] / label_convert[i]["len"] + ) + else: + table[(i_name, j_name)] = 0 + + if i_name == j_name: + table[(i_name, j_name)] = 100 + + # print table + names = [] + table_keys = sorted(label_convert.keys()) + + for i in table_keys: + names.append(label_convert[i]["id"]) + + sys.stdout.write("\t" + "\t".join(names) + "\n") + for j in table_keys: + j_key = label_convert[j]["id"] + sys.stdout.write(j_key) + for i in table_keys: + i_key = label_convert[i]["id"] + sys.stdout.write("\t%0.2f" % table[(i_key, j_key)]) + sys.stdout.write("\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Convert XMFA alignments to gff3", prog="xmfa2gff3" + ) + parser.add_argument("xmfa_file", type=argparse.FileType("r"), help="XMFA File") + parser.add_argument( + "sequences", + type=argparse.FileType("r"), + nargs="+", + help="Fasta files (in same order) passed to parent for reconstructing proper IDs", + ) + parser.add_argument( + "--dice", action="store_true", help="Use dice method for calculating % identity" + ) + parser.add_argument("--version", action="version", version="%(prog)s 1.0") + + args = parser.parse_args() + + total_similarity(**vars(args))