comparison gff3_extract_sequence.py @ 4:34b80e483fb8 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:43:58 +0000
parents
children
comparison
equal deleted inserted replaced
3:73390562b5a2 4:34b80e483fb8
1 #!/usr/bin/env python
2 import sys
3 import argparse
4 import logging
5 import uuid
6 from CPT_GFFParser import gffParse, gffWrite
7 from Bio import SeqIO
8 from Bio.Seq import Seq
9 from Bio.SeqRecord import SeqRecord
10 from Bio.SeqFeature import FeatureLocation, CompoundLocation
11 from gff3 import feature_lambda, feature_test_type, get_id
12
13 logging.basicConfig(level=logging.INFO)
14 log = logging.getLogger(__name__)
15
16
17 def main(fasta, gff3, feature_filter=None, nodesc=False):
18 if feature_filter == "nice_cds":
19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk
20
21 for rec in cpt_Gff2Gbk(gff3, fasta, 11):
22 seenList = {}
23 if rec.seq[0] == "?":
24 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
25 exit(1)
26 for feat in sorted(rec.features, key=lambda x: x.location.start):
27 if feat.type != "CDS":
28 continue
29
30 ind = 0
31 if (
32 str(
33 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-")
34 )
35 in seenList.keys()
36 ):
37 seenList[
38 str(
39 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
40 " ", "-"
41 )
42 )
43 ] += 1
44 ind = seenList[
45 str(
46 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
47 " ", "-"
48 )
49 )
50 ]
51 else:
52 seenList[
53 str(
54 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
55 " ", "-"
56 )
57 )
58 ] = 1
59 append = ""
60 if ind != 0:
61 append = "_" + str(ind)
62
63 if nodesc:
64 description = ""
65 else:
66 feat.qualifiers["ID"] = [feat._ID]
67 product = feat.qualifiers.get("product", "")
68 description = (
69 "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format(
70 feat, product
71 )
72 )
73 yield [
74 SeqRecord(
75 feat.extract(rec).seq,
76 id=str(
77 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
78 " ", "-"
79 )
80 )
81 + append,
82 description=description,
83 )
84 ]
85
86 elif feature_filter == "unique_cds":
87 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
88 seen_ids = {}
89
90 for rec in gffParse(gff3, base_dict=seq_dict):
91 noMatch = True
92 if "Alias" in rec.features[0].qualifiers.keys():
93 lColumn = rec.features[0].qualifiers["Alias"][0]
94 else:
95 lColumn = ""
96 for x in seq_dict:
97 if x == rec.id or x == lColumn:
98 noMatch = False
99 if noMatch:
100 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
101 exit(1)
102 newfeats = []
103 for feat in sorted(
104 feature_lambda(
105 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
106 ),
107 key=lambda f: f.location.start,
108 ):
109 nid = rec.id + "____" + feat.id
110 if nid in seen_ids:
111 nid = nid + "__" + uuid.uuid4().hex
112 feat.qualifiers["ID"] = [nid]
113 newfeats.append(feat)
114 seen_ids[nid] = True
115
116 if nodesc:
117 description = ""
118 else:
119 if feat.strand == -1:
120 important_data = {
121 "Location": FeatureLocation(
122 feat.location.start + 1,
123 feat.location.end - feat.phase,
124 feat.strand,
125 )
126 }
127 else:
128 important_data = {
129 "Location": FeatureLocation(
130 feat.location.start + 1 + feat.phase,
131 feat.location.end,
132 feat.strand,
133 )
134 }
135 if "Name" in feat.qualifiers:
136 important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
137
138 description = "[{}]".format(
139 ";".join(
140 [
141 "{key}={value}".format(key=k, value=v)
142 for (k, v) in important_data.items()
143 ]
144 )
145 )
146 # if feat.id == "CPT_Privateer_006.p01":
147 # print(feat)
148 # exit()
149
150 if isinstance(feat.location, CompoundLocation):
151 finSeq = ""
152 if feat.strand == -1:
153 for x in feat.location.parts:
154 finSeq += str(
155 (
156 rec.seq[
157 feat.location.start : feat.location.end
158 - feat.phase
159 ]
160 ).reverse_complement()
161 )
162 else:
163 for x in feat.location.parts:
164 finSeq += str(
165 rec.seq[
166 feat.location.start + feat.phase : feat.location.end
167 ]
168 )
169 yield [
170 SeqRecord(
171 finSeq,
172 id=nid.replace(" ", "-"),
173 description=description,
174 )
175 ]
176 elif feat.strand == -1:
177 yield [
178 SeqRecord(
179 (
180 rec.seq[
181 feat.location.start : feat.location.end - feat.phase
182 ]
183 ).reverse_complement(),
184 id=nid.replace(" ", "-"),
185 description=description,
186 )
187 ]
188 else:
189 yield [
190 SeqRecord(
191 # feat.extract(rec).seq,
192 rec.seq[
193 feat.location.start + feat.phase : feat.location.end
194 ],
195 id=nid.replace(" ", "-"),
196 description=description,
197 )
198 ]
199 rec.features = newfeats
200 rec.annotations = {}
201 # gffWrite([rec], sys.stdout)
202 else:
203 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
204
205 for rec in gffParse(gff3, base_dict=seq_dict):
206 noMatch = True
207 if "Alias" in rec.features[0].qualifiers.keys():
208 lColumn = rec.features[0].qualifiers["Alias"][0]
209 else:
210 lColumn = ""
211 for x in seq_dict:
212 if x == rec.id or x == lColumn:
213 noMatch = False
214 if noMatch:
215 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
216 exit(1)
217 for feat in sorted(
218 feature_lambda(
219 rec.features,
220 feature_test_type,
221 {"type": feature_filter},
222 subfeatures=True,
223 ),
224 key=lambda f: f.location.start,
225 ):
226 id = feat.id
227 if len(id) == 0:
228 id = get_id(feat)
229
230 if nodesc:
231 description = ""
232 else:
233 if feat.strand == -1:
234 important_data = {
235 "Location": FeatureLocation(
236 feat.location.start + 1,
237 feat.location.end - feat.phase,
238 feat.strand,
239 )
240 }
241 else:
242 important_data = {
243 "Location": FeatureLocation(
244 feat.location.start + 1 + feat.phase,
245 feat.location.end,
246 feat.strand,
247 )
248 }
249 if "Name" in feat.qualifiers:
250 important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
251
252 description = "[{}]".format(
253 ";".join(
254 [
255 "{key}={value}".format(key=k, value=v)
256 for (k, v) in important_data.items()
257 ]
258 )
259 )
260
261 if isinstance(feat.location, CompoundLocation):
262 finSeq = ""
263 if feat.strand == -1:
264 for x in feat.location.parts:
265 finSeq += str(
266 (
267 rec.seq[x.start : x.end - feat.phase]
268 ).reverse_complement()
269 )
270 else:
271 for x in feat.location.parts:
272 finSeq += str(rec.seq[x.start + feat.phase : x.end])
273 yield [
274 SeqRecord(
275 Seq(finSeq),
276 id=id.replace(" ", "-"),
277 description=description,
278 )
279 ]
280
281 else:
282
283 if feat.strand == -1:
284 yield [
285 SeqRecord(
286 seq=Seq(
287 str(
288 rec.seq[
289 feat.location.start : feat.location.end
290 - feat.phase
291 ]
292 )
293 ).reverse_complement(),
294 id=id.replace(" ", "-"),
295 description=description,
296 )
297 ]
298 else:
299 yield [
300 SeqRecord(
301 # feat.extract(rec).seq,
302 seq=Seq(
303 str(
304 rec.seq[
305 feat.location.start
306 + feat.phase : feat.location.end
307 ]
308 )
309 ),
310 id=id.replace(" ", "-"),
311 description=description,
312 )
313 ]
314
315
316 if __name__ == "__main__":
317 parser = argparse.ArgumentParser(
318 description="Export corresponding sequence in genome from GFF3", epilog=""
319 )
320 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
321 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
322 parser.add_argument(
323 "--feature_filter", default=None, help="Filter for specific feature types"
324 )
325 parser.add_argument(
326 "--nodesc", action="store_true", help="Strip description field off"
327 )
328 args = parser.parse_args()
329 for seq in main(**vars(args)):
330 # if isinstance(seq, list):
331 # for x in seq:
332 # print(type(x.seq))
333 # SeqIO.write(x, sys.stdout, "fasta")
334 # else:
335 SeqIO.write(seq, sys.stdout, "fasta")