0
|
1 #!/usr/bin/env python3
|
|
2
|
|
3 import argparse
|
|
4 import configuration
|
|
5 import time
|
|
6 import os
|
|
7 from collections import defaultdict
|
|
8 from Bio import SeqIO
|
|
9 import textwrap
|
|
10
|
|
11 t_nt_seqs_extraction = time.time()
|
|
12
|
|
13
|
|
14 def str2bool(v):
|
|
15 if v.lower() in ('yes', 'true', 't', 'y', '1'):
|
|
16 return True
|
|
17 elif v.lower() in ('no', 'false', 'f', 'n', '0'):
|
|
18 return False
|
|
19 else:
|
|
20 raise argparse.ArgumentTypeError('Boolean value expected')
|
|
21
|
|
22
|
|
23 def check_file_start(gff_file):
|
|
24 count_comment = 0
|
|
25 with open(gff_file, "r") as gff_all:
|
|
26 line = gff_all.readline()
|
|
27 while line.startswith("#"):
|
|
28 line = gff_all.readline()
|
|
29 count_comment += 1
|
|
30 return count_comment, line
|
|
31
|
|
32
|
|
33 def extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, EXTENDED):
|
|
34 ''' Extract nucleotide sequences of protein domains found by DANTE from input DNA seq.
|
|
35 Sequences are saved in fasta files separately for each transposon lineage.
|
|
36 Sequences extraction is based on position of Best_Hit alignment reported by LASTAL.
|
|
37 The positions can be extended (optional) based on what part of database domain was aligned (Best_Hit_DB_Pos attribute).
|
|
38 The strand orientation needs to be considered in extending and extracting the sequence itself
|
|
39 '''
|
|
40 [count_comment, first_line] = check_file_start(DOM_GFF)
|
|
41 unique_classes = get_unique_classes(CLASS_TBL)
|
|
42 files_dict = defaultdict(str)
|
|
43 domains_counts_dict = defaultdict(int)
|
|
44 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta'))
|
|
45 with open(DOM_GFF, "r") as domains:
|
|
46 for comment_idx in range(count_comment):
|
|
47 next(domains)
|
|
48 seq_id_stored = first_line.split("\t")[0]
|
|
49 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta'))
|
|
50 seq_nt = allSeqs[seq_id_stored]
|
|
51 for line in domains:
|
|
52 seq_id = line.split("\t")[0]
|
|
53 dom_type = line.split("\t")[8].split(";")[0].split("=")[1]
|
|
54 elem_type = line.split("\t")[8].split(";")[1].split("=")[1]
|
|
55 strand = line.split("\t")[6]
|
|
56 align_nt_start = int(line.split("\t")[8].split(";")[3].split(":")[
|
|
57 -1].split("-")[0])
|
|
58 align_nt_end = int(line.split("\t")[8].split(";")[3].split(":")[
|
|
59 -1].split("-")[1].split("[")[0])
|
|
60 if seq_id != seq_id_stored:
|
|
61 seq_id_stored = seq_id
|
|
62 seq_nt = allSeqs[seq_id_stored]
|
|
63 if EXTENDED:
|
|
64 ## which part of database sequence was aligned
|
|
65 db_part = line.split("\t")[8].split(";")[4].split("=")[1]
|
|
66 ## datatabse seq length
|
|
67 dom_len = int(db_part.split("of")[1])
|
|
68 ## start of alignment on database seq
|
|
69 db_start = int(db_part.split("of")[0].split(":")[0])
|
|
70 ## end of alignment on database seq
|
|
71 db_end = int(db_part.split("of")[0].split(":")[1])
|
|
72 ## number of nucleotides missing in the beginning
|
|
73 dom_nt_prefix = (db_start - 1) * 3
|
|
74 ## number of nucleotides missing in the end
|
|
75 dom_nt_suffix = (dom_len - db_end) * 3
|
|
76 if strand == "+":
|
|
77 dom_nt_start = align_nt_start - dom_nt_prefix
|
|
78 dom_nt_end = align_nt_end + dom_nt_suffix
|
|
79 ## reverse extending for - strand
|
|
80 else:
|
|
81 dom_nt_start = align_nt_start - dom_nt_suffix
|
|
82 dom_nt_end = align_nt_end + dom_nt_prefix
|
|
83 ## correction for domain after extending having negative starting positon
|
|
84 dom_nt_start = max(1, dom_nt_start)
|
|
85 else:
|
|
86 dom_nt_start = align_nt_start
|
|
87 dom_nt_end = align_nt_end
|
|
88 full_dom_nt = seq_nt.seq[dom_nt_start - 1:dom_nt_end]
|
|
89 ## for - strand take reverse complement of the extracted sequence
|
|
90 if strand == "-":
|
|
91 full_dom_nt = full_dom_nt.reverse_complement()
|
|
92 full_dom_nt = str(full_dom_nt)
|
|
93 ## report when domain classified to the last level and no Ns in extracted seq
|
|
94 if elem_type in unique_classes and "N" not in full_dom_nt:
|
|
95 # lineages reported in separate fasta files
|
|
96 if not elem_type in files_dict:
|
|
97 files_dict[elem_type] = os.path.join(
|
|
98 OUT_DIR, "{}.fasta".format(elem_type.split("|")[
|
|
99 -1].replace("/", "_")))
|
|
100 with open(files_dict[elem_type], "a") as out_nt_seq:
|
|
101 out_nt_seq.write(">{}:{}-{}|{}[{}]\n{}\n".format(
|
|
102 seq_nt.id, dom_nt_start, dom_nt_end, dom_type,
|
|
103 elem_type, textwrap.fill(full_dom_nt,
|
|
104 configuration.FASTA_LINE)))
|
|
105 domains_counts_dict[elem_type] += 1
|
|
106 return domains_counts_dict
|
|
107
|
|
108
|
|
109 def get_unique_classes(CLASS_TBL):
|
|
110 ''' Get all the lineages of current domains classification table to check if domains are classified to the last level.
|
|
111 Only the sequences of unambiguous and completely classified domains will be extracted.
|
|
112 '''
|
|
113 unique_classes = []
|
|
114 with open(CLASS_TBL, "r") as class_tbl:
|
|
115 for line in class_tbl:
|
|
116 line_class = "|".join(line.rstrip().split("\t")[1:])
|
|
117 if line_class not in unique_classes:
|
|
118 unique_classes.append(line_class)
|
|
119 return unique_classes
|
|
120
|
|
121
|
|
122 def write_domains_stat(domains_counts_dict, OUT_DIR):
|
|
123 ''' Report counts of domains for individual lineages
|
|
124 '''
|
|
125 total = 0
|
|
126 with open(
|
|
127 os.path.join(OUT_DIR,
|
|
128 configuration.EXTRACT_DOM_STAT), "w") as dom_stat:
|
|
129 for domain, count in domains_counts_dict.items():
|
|
130 dom_stat.write(";{}:{}\n".format(domain, count))
|
|
131 total += count
|
|
132 dom_stat.write(";TOTAL:{}\n".format(total))
|
|
133
|
|
134
|
|
135 def main(args):
|
|
136
|
|
137 DNA_SEQ = args.input_dna
|
|
138 DOM_GFF = args.domains_gff
|
|
139 OUT_DIR = args.out_dir
|
|
140 CLASS_TBL = args.classification
|
|
141 EXTENDED = args.extended
|
|
142
|
|
143 if not os.path.exists(OUT_DIR):
|
|
144 os.makedirs(OUT_DIR)
|
|
145
|
|
146 domains_counts_dict = extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL,
|
|
147 EXTENDED)
|
|
148 write_domains_stat(domains_counts_dict, OUT_DIR)
|
|
149
|
|
150 print("ELAPSED_TIME_EXTRACTION = {} s\n".format(time.time() -
|
|
151 t_nt_seqs_extraction))
|
|
152
|
|
153
|
|
154 if __name__ == "__main__":
|
|
155
|
|
156 # Command line arguments
|
|
157 parser = argparse.ArgumentParser()
|
|
158 parser.add_argument('-i',
|
|
159 '--input_dna',
|
|
160 type=str,
|
|
161 required=True,
|
|
162 help='path to input DNA sequence')
|
|
163 parser.add_argument('-d',
|
|
164 '--domains_gff',
|
|
165 type=str,
|
|
166 required=True,
|
|
167 help='GFF file of protein domains')
|
|
168 parser.add_argument('-cs',
|
|
169 '--classification',
|
|
170 type=str,
|
|
171 required=True,
|
|
172 help='protein domains classification file')
|
|
173 parser.add_argument('-out',
|
|
174 '--out_dir',
|
|
175 type=str,
|
|
176 default=configuration.EXTRACT_OUT_DIR,
|
|
177 help='output directory')
|
|
178 parser.add_argument(
|
|
179 '-ex',
|
|
180 '--extended',
|
|
181 type=str2bool,
|
|
182 default=True,
|
|
183 help=
|
|
184 'extend the domains edges if not the whole datatabase sequence was aligned')
|
|
185 args = parser.parse_args()
|
|
186 main(args)
|