Mercurial > repos > fubar > jbrowse2
comparison jb2_GFF/GFFOutput.py @ 17:4c201a3d4755 draft
planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author | fubar |
---|---|
date | Sun, 28 Jan 2024 06:48:52 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
16:1fe91657bfd6 | 17:4c201a3d4755 |
---|---|
1 """Output Biopython SeqRecords and SeqFeatures to GFF3 format. | |
2 | |
3 The target format is GFF3, the current GFF standard: | |
4 http://www.sequenceontology.org/gff3.shtml | |
5 """ | |
6 from six.moves import urllib | |
7 | |
8 from Bio import SeqIO | |
9 | |
10 | |
11 class _IdHandler: | |
12 """Generate IDs for GFF3 Parent/Child | |
13 relationships where they don't exist.""" | |
14 | |
15 def __init__(self): | |
16 self._prefix = "biopygen" | |
17 self._counter = 1 | |
18 self._seen_ids = [] | |
19 | |
20 def _generate_id(self, quals): | |
21 """Generate a unique ID not present in our existing IDs.""" | |
22 gen_id = self._get_standard_id(quals) | |
23 if gen_id is None: | |
24 while 1: | |
25 gen_id = "%s%s" % (self._prefix, self._counter) | |
26 if gen_id not in self._seen_ids: | |
27 break | |
28 self._counter += 1 | |
29 return gen_id | |
30 | |
31 def _get_standard_id(self, quals): | |
32 """Retrieve standardized IDs from other sources like NCBI GenBank. | |
33 | |
34 This tries to find IDs from known key/values when stored differently | |
35 than GFF3 specifications. | |
36 """ | |
37 possible_keys = ["transcript_id", "protein_id"] | |
38 for test_key in possible_keys: | |
39 if test_key in quals: | |
40 cur_id = quals[test_key] | |
41 if isinstance(cur_id, tuple) or isinstance( | |
42 cur_id, list | |
43 ): | |
44 return cur_id[0] | |
45 else: | |
46 return cur_id | |
47 return None | |
48 | |
49 def update_quals(self, quals, has_children): | |
50 """Update a set of qualifiers, adding an ID if necessary.""" | |
51 cur_id = quals.get("ID", None) | |
52 # if we have an ID, record it | |
53 if cur_id: | |
54 if not isinstance(cur_id, list) and not isinstance( | |
55 cur_id, tuple | |
56 ): | |
57 cur_id = [cur_id] | |
58 for add_id in cur_id: | |
59 self._seen_ids.append(add_id) | |
60 # if we need one and don't have it, create a new one | |
61 elif has_children: | |
62 new_id = self._generate_id(quals) | |
63 self._seen_ids.append(new_id) | |
64 quals["ID"] = [new_id] | |
65 return quals | |
66 | |
67 | |
68 class GFF3Writer: | |
69 """Write GFF3 files starting with standard Biopython objects.""" | |
70 | |
71 def __init__(self): | |
72 pass | |
73 | |
74 def write(self, recs, out_handle, include_fasta=False): | |
75 """Write the provided records to the given handle in GFF3 format.""" | |
76 id_handler = _IdHandler() | |
77 self._write_header(out_handle) | |
78 fasta_recs = [] | |
79 try: | |
80 recs = iter(recs) | |
81 except TypeError: | |
82 recs = [recs] | |
83 for rec in recs: | |
84 self._write_rec(rec, out_handle) | |
85 self._write_annotations( | |
86 rec.annotations, rec.id, len(rec.seq), out_handle | |
87 ) | |
88 for sf in rec.features: | |
89 sf = self._clean_feature(sf) | |
90 id_handler = self._write_feature( | |
91 sf, rec.id, out_handle, id_handler | |
92 ) | |
93 if include_fasta and len(rec.seq) > 0: | |
94 fasta_recs.append(rec) | |
95 if len(fasta_recs) > 0: | |
96 self._write_fasta(fasta_recs, out_handle) | |
97 | |
98 def _clean_feature(self, feature): | |
99 quals = {} | |
100 for key, val in feature.qualifiers.items(): | |
101 if not isinstance(val, (list, tuple)): | |
102 val = [val] | |
103 val = [str(x) for x in val] | |
104 quals[key] = val | |
105 feature.qualifiers = quals | |
106 # Support for Biopython 1.68 and above, which removed sub_features | |
107 if not hasattr(feature, "sub_features"): | |
108 feature.sub_features = [] | |
109 clean_sub = [ | |
110 self._clean_feature(f) for f in feature.sub_features | |
111 ] | |
112 feature.sub_features = clean_sub | |
113 return feature | |
114 | |
115 def _write_rec(self, rec, out_handle): | |
116 # if we have a SeqRecord, write out optional directive | |
117 if len(rec.seq) > 0: | |
118 out_handle.write( | |
119 "##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq)) | |
120 ) | |
121 | |
122 def _get_phase(self, feature): | |
123 if "phase" in feature.qualifiers: | |
124 phase = feature.qualifiers["phase"][0] | |
125 elif feature.type == "CDS": | |
126 phase = ( | |
127 int(feature.qualifiers.get("codon_start", [1])[0]) - 1 | |
128 ) | |
129 else: | |
130 phase = "." | |
131 return str(phase) | |
132 | |
133 def _write_feature( | |
134 self, feature, rec_id, out_handle, id_handler, parent_id=None | |
135 ): | |
136 """Write a feature with location information.""" | |
137 if feature.location.strand == 1: | |
138 strand = "+" | |
139 elif feature.location.strand == -1: | |
140 strand = "-" | |
141 else: | |
142 strand = "." | |
143 # remove any standard features from the qualifiers | |
144 quals = feature.qualifiers.copy() | |
145 for std_qual in ["source", "score", "phase"]: | |
146 if std_qual in quals and len(quals[std_qual]) == 1: | |
147 del quals[std_qual] | |
148 # add a link to a parent identifier if it exists | |
149 if parent_id: | |
150 if "Parent" not in quals: | |
151 quals["Parent"] = [] | |
152 quals["Parent"].append(parent_id) | |
153 quals = id_handler.update_quals( | |
154 quals, len(feature.sub_features) > 0 | |
155 ) | |
156 if feature.type: | |
157 ftype = feature.type | |
158 else: | |
159 ftype = "sequence_feature" | |
160 parts = [ | |
161 str(rec_id), | |
162 feature.qualifiers.get("source", ["feature"])[0], | |
163 ftype, | |
164 str(feature.location.start + 1), # 1-based indexing | |
165 str(feature.location.end), | |
166 feature.qualifiers.get("score", ["."])[0], | |
167 strand, | |
168 self._get_phase(feature), | |
169 self._format_keyvals(quals), | |
170 ] | |
171 out_handle.write("\t".join(parts) + "\n") | |
172 for sub_feature in feature.sub_features: | |
173 id_handler = self._write_feature( | |
174 sub_feature, | |
175 rec_id, | |
176 out_handle, | |
177 id_handler, | |
178 quals["ID"][0], | |
179 ) | |
180 return id_handler | |
181 | |
182 def _format_keyvals(self, keyvals): | |
183 format_kvs = [] | |
184 for key in sorted(keyvals.keys()): | |
185 values = keyvals[key] | |
186 key = key.strip() | |
187 format_vals = [] | |
188 if not isinstance(values, list) or isinstance( | |
189 values, tuple | |
190 ): | |
191 values = [values] | |
192 for val in values: | |
193 val = urllib.parse.quote(str(val).strip(), safe=":/ ") | |
194 if (key and val) and val not in format_vals: | |
195 format_vals.append(val) | |
196 format_kvs.append("%s=%s" % (key, ",".join(format_vals))) | |
197 return ";".join(format_kvs) | |
198 | |
199 def _write_annotations(self, anns, rec_id, size, out_handle): | |
200 """Add annotations which refer to an entire sequence.""" | |
201 format_anns = self._format_keyvals(anns) | |
202 if format_anns: | |
203 parts = [ | |
204 rec_id, | |
205 "annotation", | |
206 "remark", | |
207 "1", | |
208 str(size if size > 1 else 1), | |
209 ".", | |
210 ".", | |
211 ".", | |
212 format_anns, | |
213 ] | |
214 out_handle.write("\t".join(parts) + "\n") | |
215 | |
216 def _write_header(self, out_handle): | |
217 """Write out standard header directives.""" | |
218 out_handle.write("##gff-version 3\n") | |
219 | |
220 def _write_fasta(self, recs, out_handle): | |
221 """Write sequence records using the ##FASTA directive.""" | |
222 out_handle.write("##FASTA\n") | |
223 SeqIO.write(recs, out_handle, "fasta") | |
224 | |
225 | |
226 def write(recs, out_handle, include_fasta=False): | |
227 """High level interface to write GFF3 files from SeqRecords and SeqFeatures. | |
228 | |
229 If include_fasta is True, the GFF3 file will include sequence information | |
230 using the ##FASTA directive. | |
231 """ | |
232 writer = GFF3Writer() | |
233 return writer.write(recs, out_handle, include_fasta) |