Mercurial > repos > cpt > cpt_export_seq_unique
comparison gff3_extract_sequence.py @ 1:ac766d7dd641 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:41:21 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:aaed5a0c774c | 1:ac766d7dd641 |
---|---|
1 #!/usr/bin/env python | |
2 import sys | |
3 import argparse | |
4 import logging | |
5 import uuid | |
6 from CPT_GFFParser import gffParse, gffWrite | |
7 from Bio import SeqIO | |
8 from Bio.Seq import Seq | |
9 from Bio.SeqRecord import SeqRecord | |
10 from Bio.SeqFeature import FeatureLocation, CompoundLocation | |
11 from gff3 import feature_lambda, feature_test_type, get_id | |
12 | |
13 logging.basicConfig(level=logging.INFO) | |
14 log = logging.getLogger(__name__) | |
15 | |
16 | |
17 def main(fasta, gff3, feature_filter=None, nodesc=False): | |
18 if feature_filter == "nice_cds": | |
19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk | |
20 | |
21 for rec in cpt_Gff2Gbk(gff3, fasta, 11): | |
22 seenList = {} | |
23 if rec.seq[0] == "?": | |
24 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
25 exit(1) | |
26 for feat in sorted(rec.features, key=lambda x: x.location.start): | |
27 if feat.type != "CDS": | |
28 continue | |
29 | |
30 ind = 0 | |
31 if ( | |
32 str( | |
33 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") | |
34 ) | |
35 in seenList.keys() | |
36 ): | |
37 seenList[ | |
38 str( | |
39 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
40 " ", "-" | |
41 ) | |
42 ) | |
43 ] += 1 | |
44 ind = seenList[ | |
45 str( | |
46 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
47 " ", "-" | |
48 ) | |
49 ) | |
50 ] | |
51 else: | |
52 seenList[ | |
53 str( | |
54 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
55 " ", "-" | |
56 ) | |
57 ) | |
58 ] = 1 | |
59 append = "" | |
60 if ind != 0: | |
61 append = "_" + str(ind) | |
62 | |
63 if nodesc: | |
64 description = "" | |
65 else: | |
66 feat.qualifiers["ID"] = [feat._ID] | |
67 product = feat.qualifiers.get("product", "") | |
68 description = ( | |
69 "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( | |
70 feat, product | |
71 ) | |
72 ) | |
73 yield [ | |
74 SeqRecord( | |
75 feat.extract(rec).seq, | |
76 id=str( | |
77 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
78 " ", "-" | |
79 ) | |
80 ) | |
81 + append, | |
82 description=description, | |
83 ) | |
84 ] | |
85 | |
86 elif feature_filter == "unique_cds": | |
87 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
88 seen_ids = {} | |
89 | |
90 for rec in gffParse(gff3, base_dict=seq_dict): | |
91 noMatch = True | |
92 if "Alias" in rec.features[0].qualifiers.keys(): | |
93 lColumn = rec.features[0].qualifiers["Alias"][0] | |
94 else: | |
95 lColumn = "" | |
96 for x in seq_dict: | |
97 if x == rec.id or x == lColumn: | |
98 noMatch = False | |
99 if noMatch: | |
100 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
101 exit(1) | |
102 newfeats = [] | |
103 for feat in sorted( | |
104 feature_lambda( | |
105 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False | |
106 ), | |
107 key=lambda f: f.location.start, | |
108 ): | |
109 nid = rec.id + "____" + feat.id | |
110 if nid in seen_ids: | |
111 nid = nid + "__" + uuid.uuid4().hex | |
112 feat.qualifiers["ID"] = [nid] | |
113 newfeats.append(feat) | |
114 seen_ids[nid] = True | |
115 | |
116 if nodesc: | |
117 description = "" | |
118 else: | |
119 if feat.strand == -1: | |
120 important_data = { | |
121 "Location": FeatureLocation( | |
122 feat.location.start + 1, | |
123 feat.location.end - feat.phase, | |
124 feat.strand, | |
125 ) | |
126 } | |
127 else: | |
128 important_data = { | |
129 "Location": FeatureLocation( | |
130 feat.location.start + 1 + feat.phase, | |
131 feat.location.end, | |
132 feat.strand, | |
133 ) | |
134 } | |
135 if "Name" in feat.qualifiers: | |
136 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
137 | |
138 description = "[{}]".format( | |
139 ";".join( | |
140 [ | |
141 "{key}={value}".format(key=k, value=v) | |
142 for (k, v) in important_data.items() | |
143 ] | |
144 ) | |
145 ) | |
146 # if feat.id == "CPT_Privateer_006.p01": | |
147 # print(feat) | |
148 # exit() | |
149 | |
150 if isinstance(feat.location, CompoundLocation): | |
151 finSeq = "" | |
152 if feat.strand == -1: | |
153 for x in feat.location.parts: | |
154 finSeq += str( | |
155 ( | |
156 rec.seq[ | |
157 feat.location.start : feat.location.end | |
158 - feat.phase | |
159 ] | |
160 ).reverse_complement() | |
161 ) | |
162 else: | |
163 for x in feat.location.parts: | |
164 finSeq += str( | |
165 rec.seq[ | |
166 feat.location.start + feat.phase : feat.location.end | |
167 ] | |
168 ) | |
169 yield [ | |
170 SeqRecord( | |
171 finSeq, | |
172 id=nid.replace(" ", "-"), | |
173 description=description, | |
174 ) | |
175 ] | |
176 elif feat.strand == -1: | |
177 yield [ | |
178 SeqRecord( | |
179 ( | |
180 rec.seq[ | |
181 feat.location.start : feat.location.end - feat.phase | |
182 ] | |
183 ).reverse_complement(), | |
184 id=nid.replace(" ", "-"), | |
185 description=description, | |
186 ) | |
187 ] | |
188 else: | |
189 yield [ | |
190 SeqRecord( | |
191 # feat.extract(rec).seq, | |
192 rec.seq[ | |
193 feat.location.start + feat.phase : feat.location.end | |
194 ], | |
195 id=nid.replace(" ", "-"), | |
196 description=description, | |
197 ) | |
198 ] | |
199 rec.features = newfeats | |
200 rec.annotations = {} | |
201 # gffWrite([rec], sys.stdout) | |
202 else: | |
203 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
204 | |
205 for rec in gffParse(gff3, base_dict=seq_dict): | |
206 noMatch = True | |
207 if "Alias" in rec.features[0].qualifiers.keys(): | |
208 lColumn = rec.features[0].qualifiers["Alias"][0] | |
209 else: | |
210 lColumn = "" | |
211 for x in seq_dict: | |
212 if x == rec.id or x == lColumn: | |
213 noMatch = False | |
214 if noMatch: | |
215 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
216 exit(1) | |
217 for feat in sorted( | |
218 feature_lambda( | |
219 rec.features, | |
220 feature_test_type, | |
221 {"type": feature_filter}, | |
222 subfeatures=True, | |
223 ), | |
224 key=lambda f: f.location.start, | |
225 ): | |
226 id = feat.id | |
227 if len(id) == 0: | |
228 id = get_id(feat) | |
229 | |
230 if nodesc: | |
231 description = "" | |
232 else: | |
233 if feat.strand == -1: | |
234 important_data = { | |
235 "Location": FeatureLocation( | |
236 feat.location.start + 1, | |
237 feat.location.end - feat.phase, | |
238 feat.strand, | |
239 ) | |
240 } | |
241 else: | |
242 important_data = { | |
243 "Location": FeatureLocation( | |
244 feat.location.start + 1 + feat.phase, | |
245 feat.location.end, | |
246 feat.strand, | |
247 ) | |
248 } | |
249 if "Name" in feat.qualifiers: | |
250 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
251 | |
252 description = "[{}]".format( | |
253 ";".join( | |
254 [ | |
255 "{key}={value}".format(key=k, value=v) | |
256 for (k, v) in important_data.items() | |
257 ] | |
258 ) | |
259 ) | |
260 | |
261 if isinstance(feat.location, CompoundLocation): | |
262 finSeq = "" | |
263 if feat.strand == -1: | |
264 for x in feat.location.parts: | |
265 finSeq += str( | |
266 ( | |
267 rec.seq[x.start : x.end - feat.phase] | |
268 ).reverse_complement() | |
269 ) | |
270 else: | |
271 for x in feat.location.parts: | |
272 finSeq += str(rec.seq[x.start + feat.phase : x.end]) | |
273 yield [ | |
274 SeqRecord( | |
275 Seq(finSeq), | |
276 id=id.replace(" ", "-"), | |
277 description=description, | |
278 ) | |
279 ] | |
280 | |
281 else: | |
282 | |
283 if feat.strand == -1: | |
284 yield [ | |
285 SeqRecord( | |
286 seq=Seq( | |
287 str( | |
288 rec.seq[ | |
289 feat.location.start : feat.location.end | |
290 - feat.phase | |
291 ] | |
292 ) | |
293 ).reverse_complement(), | |
294 id=id.replace(" ", "-"), | |
295 description=description, | |
296 ) | |
297 ] | |
298 else: | |
299 yield [ | |
300 SeqRecord( | |
301 # feat.extract(rec).seq, | |
302 seq=Seq( | |
303 str( | |
304 rec.seq[ | |
305 feat.location.start | |
306 + feat.phase : feat.location.end | |
307 ] | |
308 ) | |
309 ), | |
310 id=id.replace(" ", "-"), | |
311 description=description, | |
312 ) | |
313 ] | |
314 | |
315 | |
316 if __name__ == "__main__": | |
317 parser = argparse.ArgumentParser( | |
318 description="Export corresponding sequence in genome from GFF3", epilog="" | |
319 ) | |
320 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
321 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") | |
322 parser.add_argument( | |
323 "--feature_filter", default=None, help="Filter for specific feature types" | |
324 ) | |
325 parser.add_argument( | |
326 "--nodesc", action="store_true", help="Strip description field off" | |
327 ) | |
328 args = parser.parse_args() | |
329 for seq in main(**vars(args)): | |
330 # if isinstance(seq, list): | |
331 # for x in seq: | |
332 # print(type(x.seq)) | |
333 # SeqIO.write(x, sys.stdout, "fasta") | |
334 # else: | |
335 SeqIO.write(seq, sys.stdout, "fasta") |