Mercurial > repos > petr-novak > dante
comparison dante_gff_to_dna.py @ 0:77d9f2ecb28a draft
Uploaded
| author | petr-novak |
|---|---|
| date | Wed, 03 Jul 2019 02:45:00 -0400 |
| parents | |
| children | d0431a839606 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:77d9f2ecb28a |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 import argparse | |
| 4 import configuration | |
| 5 import time | |
| 6 import os | |
| 7 from collections import defaultdict | |
| 8 from Bio import SeqIO | |
| 9 import textwrap | |
| 10 | |
| 11 t_nt_seqs_extraction = time.time() | |
| 12 | |
| 13 | |
| 14 def str2bool(v): | |
| 15 if v.lower() in ('yes', 'true', 't', 'y', '1'): | |
| 16 return True | |
| 17 elif v.lower() in ('no', 'false', 'f', 'n', '0'): | |
| 18 return False | |
| 19 else: | |
| 20 raise argparse.ArgumentTypeError('Boolean value expected') | |
| 21 | |
| 22 | |
| 23 def check_file_start(gff_file): | |
| 24 count_comment = 0 | |
| 25 with open(gff_file, "r") as gff_all: | |
| 26 line = gff_all.readline() | |
| 27 while line.startswith("#"): | |
| 28 line = gff_all.readline() | |
| 29 count_comment += 1 | |
| 30 return count_comment, line | |
| 31 | |
| 32 | |
| 33 def extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, EXTENDED): | |
| 34 ''' Extract nucleotide sequences of protein domains found by DANTE from input DNA seq. | |
| 35 Sequences are saved in fasta files separately for each transposon lineage. | |
| 36 Sequences extraction is based on position of Best_Hit alignment reported by LASTAL. | |
| 37 The positions can be extended (optional) based on what part of database domain was aligned (Best_Hit_DB_Pos attribute). | |
| 38 The strand orientation needs to be considered in extending and extracting the sequence itself | |
| 39 ''' | |
| 40 [count_comment, first_line] = check_file_start(DOM_GFF) | |
| 41 unique_classes = get_unique_classes(CLASS_TBL) | |
| 42 files_dict = defaultdict(str) | |
| 43 domains_counts_dict = defaultdict(int) | |
| 44 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta')) | |
| 45 with open(DOM_GFF, "r") as domains: | |
| 46 for comment_idx in range(count_comment): | |
| 47 next(domains) | |
| 48 seq_id_stored = first_line.split("\t")[0] | |
| 49 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta')) | |
| 50 seq_nt = allSeqs[seq_id_stored] | |
| 51 for line in domains: | |
| 52 seq_id = line.split("\t")[0] | |
| 53 dom_type = line.split("\t")[8].split(";")[0].split("=")[1] | |
| 54 elem_type = line.split("\t")[8].split(";")[1].split("=")[1] | |
| 55 strand = line.split("\t")[6] | |
| 56 align_nt_start = int(line.split("\t")[8].split(";")[3].split(":")[ | |
| 57 -1].split("-")[0]) | |
| 58 align_nt_end = int(line.split("\t")[8].split(";")[3].split(":")[ | |
| 59 -1].split("-")[1].split("[")[0]) | |
| 60 if seq_id != seq_id_stored: | |
| 61 seq_id_stored = seq_id | |
| 62 seq_nt = allSeqs[seq_id_stored] | |
| 63 if EXTENDED: | |
| 64 ## which part of database sequence was aligned | |
| 65 db_part = line.split("\t")[8].split(";")[4].split("=")[1] | |
| 66 ## datatabse seq length | |
| 67 dom_len = int(db_part.split("of")[1]) | |
| 68 ## start of alignment on database seq | |
| 69 db_start = int(db_part.split("of")[0].split(":")[0]) | |
| 70 ## end of alignment on database seq | |
| 71 db_end = int(db_part.split("of")[0].split(":")[1]) | |
| 72 ## number of nucleotides missing in the beginning | |
| 73 dom_nt_prefix = (db_start - 1) * 3 | |
| 74 ## number of nucleotides missing in the end | |
| 75 dom_nt_suffix = (dom_len - db_end) * 3 | |
| 76 if strand == "+": | |
| 77 dom_nt_start = align_nt_start - dom_nt_prefix | |
| 78 dom_nt_end = align_nt_end + dom_nt_suffix | |
| 79 ## reverse extending for - strand | |
| 80 else: | |
| 81 dom_nt_start = align_nt_start - dom_nt_suffix | |
| 82 dom_nt_end = align_nt_end + dom_nt_prefix | |
| 83 ## correction for domain after extending having negative starting positon | |
| 84 dom_nt_start = max(1, dom_nt_start) | |
| 85 else: | |
| 86 dom_nt_start = align_nt_start | |
| 87 dom_nt_end = align_nt_end | |
| 88 full_dom_nt = seq_nt.seq[dom_nt_start - 1:dom_nt_end] | |
| 89 ## for - strand take reverse complement of the extracted sequence | |
| 90 if strand == "-": | |
| 91 full_dom_nt = full_dom_nt.reverse_complement() | |
| 92 full_dom_nt = str(full_dom_nt) | |
| 93 ## report when domain classified to the last level and no Ns in extracted seq | |
| 94 if elem_type in unique_classes and "N" not in full_dom_nt: | |
| 95 # lineages reported in separate fasta files | |
| 96 if not elem_type in files_dict: | |
| 97 files_dict[elem_type] = os.path.join( | |
| 98 OUT_DIR, "{}.fasta".format(elem_type.split("|")[ | |
| 99 -1].replace("/", "_"))) | |
| 100 with open(files_dict[elem_type], "a") as out_nt_seq: | |
| 101 out_nt_seq.write(">{}:{}-{}|{}[{}]\n{}\n".format( | |
| 102 seq_nt.id, dom_nt_start, dom_nt_end, dom_type, | |
| 103 elem_type, textwrap.fill(full_dom_nt, | |
| 104 configuration.FASTA_LINE))) | |
| 105 domains_counts_dict[elem_type] += 1 | |
| 106 return domains_counts_dict | |
| 107 | |
| 108 | |
| 109 def get_unique_classes(CLASS_TBL): | |
| 110 ''' Get all the lineages of current domains classification table to check if domains are classified to the last level. | |
| 111 Only the sequences of unambiguous and completely classified domains will be extracted. | |
| 112 ''' | |
| 113 unique_classes = [] | |
| 114 with open(CLASS_TBL, "r") as class_tbl: | |
| 115 for line in class_tbl: | |
| 116 line_class = "|".join(line.rstrip().split("\t")[1:]) | |
| 117 if line_class not in unique_classes: | |
| 118 unique_classes.append(line_class) | |
| 119 return unique_classes | |
| 120 | |
| 121 | |
| 122 def write_domains_stat(domains_counts_dict, OUT_DIR): | |
| 123 ''' Report counts of domains for individual lineages | |
| 124 ''' | |
| 125 total = 0 | |
| 126 with open( | |
| 127 os.path.join(OUT_DIR, | |
| 128 configuration.EXTRACT_DOM_STAT), "w") as dom_stat: | |
| 129 for domain, count in domains_counts_dict.items(): | |
| 130 dom_stat.write(";{}:{}\n".format(domain, count)) | |
| 131 total += count | |
| 132 dom_stat.write(";TOTAL:{}\n".format(total)) | |
| 133 | |
| 134 | |
| 135 def main(args): | |
| 136 | |
| 137 DNA_SEQ = args.input_dna | |
| 138 DOM_GFF = args.domains_gff | |
| 139 OUT_DIR = args.out_dir | |
| 140 CLASS_TBL = args.classification | |
| 141 EXTENDED = args.extended | |
| 142 | |
| 143 if not os.path.exists(OUT_DIR): | |
| 144 os.makedirs(OUT_DIR) | |
| 145 | |
| 146 domains_counts_dict = extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, | |
| 147 EXTENDED) | |
| 148 write_domains_stat(domains_counts_dict, OUT_DIR) | |
| 149 | |
| 150 print("ELAPSED_TIME_EXTRACTION = {} s\n".format(time.time() - | |
| 151 t_nt_seqs_extraction)) | |
| 152 | |
| 153 | |
| 154 if __name__ == "__main__": | |
| 155 | |
| 156 # Command line arguments | |
| 157 parser = argparse.ArgumentParser() | |
| 158 parser.add_argument('-i', | |
| 159 '--input_dna', | |
| 160 type=str, | |
| 161 required=True, | |
| 162 help='path to input DNA sequence') | |
| 163 parser.add_argument('-d', | |
| 164 '--domains_gff', | |
| 165 type=str, | |
| 166 required=True, | |
| 167 help='GFF file of protein domains') | |
| 168 parser.add_argument('-cs', | |
| 169 '--classification', | |
| 170 type=str, | |
| 171 required=True, | |
| 172 help='protein domains classification file') | |
| 173 parser.add_argument('-out', | |
| 174 '--out_dir', | |
| 175 type=str, | |
| 176 default=configuration.EXTRACT_OUT_DIR, | |
| 177 help='output directory') | |
| 178 parser.add_argument( | |
| 179 '-ex', | |
| 180 '--extended', | |
| 181 type=str2bool, | |
| 182 default=True, | |
| 183 help= | |
| 184 'extend the domains edges if not the whole datatabase sequence was aligned') | |
| 185 args = parser.parse_args() | |
| 186 main(args) |
