comparison jb2_GFF/GFFOutput.py @ 17:4c201a3d4755 draft

planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author fubar
date Sun, 28 Jan 2024 06:48:52 +0000
parents
children
comparison
equal deleted inserted replaced
16:1fe91657bfd6 17:4c201a3d4755
1 """Output Biopython SeqRecords and SeqFeatures to GFF3 format.
2
3 The target format is GFF3, the current GFF standard:
4 http://www.sequenceontology.org/gff3.shtml
5 """
6 from six.moves import urllib
7
8 from Bio import SeqIO
9
10
11 class _IdHandler:
12 """Generate IDs for GFF3 Parent/Child
13 relationships where they don't exist."""
14
15 def __init__(self):
16 self._prefix = "biopygen"
17 self._counter = 1
18 self._seen_ids = []
19
20 def _generate_id(self, quals):
21 """Generate a unique ID not present in our existing IDs."""
22 gen_id = self._get_standard_id(quals)
23 if gen_id is None:
24 while 1:
25 gen_id = "%s%s" % (self._prefix, self._counter)
26 if gen_id not in self._seen_ids:
27 break
28 self._counter += 1
29 return gen_id
30
31 def _get_standard_id(self, quals):
32 """Retrieve standardized IDs from other sources like NCBI GenBank.
33
34 This tries to find IDs from known key/values when stored differently
35 than GFF3 specifications.
36 """
37 possible_keys = ["transcript_id", "protein_id"]
38 for test_key in possible_keys:
39 if test_key in quals:
40 cur_id = quals[test_key]
41 if isinstance(cur_id, tuple) or isinstance(
42 cur_id, list
43 ):
44 return cur_id[0]
45 else:
46 return cur_id
47 return None
48
49 def update_quals(self, quals, has_children):
50 """Update a set of qualifiers, adding an ID if necessary."""
51 cur_id = quals.get("ID", None)
52 # if we have an ID, record it
53 if cur_id:
54 if not isinstance(cur_id, list) and not isinstance(
55 cur_id, tuple
56 ):
57 cur_id = [cur_id]
58 for add_id in cur_id:
59 self._seen_ids.append(add_id)
60 # if we need one and don't have it, create a new one
61 elif has_children:
62 new_id = self._generate_id(quals)
63 self._seen_ids.append(new_id)
64 quals["ID"] = [new_id]
65 return quals
66
67
68 class GFF3Writer:
69 """Write GFF3 files starting with standard Biopython objects."""
70
71 def __init__(self):
72 pass
73
74 def write(self, recs, out_handle, include_fasta=False):
75 """Write the provided records to the given handle in GFF3 format."""
76 id_handler = _IdHandler()
77 self._write_header(out_handle)
78 fasta_recs = []
79 try:
80 recs = iter(recs)
81 except TypeError:
82 recs = [recs]
83 for rec in recs:
84 self._write_rec(rec, out_handle)
85 self._write_annotations(
86 rec.annotations, rec.id, len(rec.seq), out_handle
87 )
88 for sf in rec.features:
89 sf = self._clean_feature(sf)
90 id_handler = self._write_feature(
91 sf, rec.id, out_handle, id_handler
92 )
93 if include_fasta and len(rec.seq) > 0:
94 fasta_recs.append(rec)
95 if len(fasta_recs) > 0:
96 self._write_fasta(fasta_recs, out_handle)
97
98 def _clean_feature(self, feature):
99 quals = {}
100 for key, val in feature.qualifiers.items():
101 if not isinstance(val, (list, tuple)):
102 val = [val]
103 val = [str(x) for x in val]
104 quals[key] = val
105 feature.qualifiers = quals
106 # Support for Biopython 1.68 and above, which removed sub_features
107 if not hasattr(feature, "sub_features"):
108 feature.sub_features = []
109 clean_sub = [
110 self._clean_feature(f) for f in feature.sub_features
111 ]
112 feature.sub_features = clean_sub
113 return feature
114
115 def _write_rec(self, rec, out_handle):
116 # if we have a SeqRecord, write out optional directive
117 if len(rec.seq) > 0:
118 out_handle.write(
119 "##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq))
120 )
121
122 def _get_phase(self, feature):
123 if "phase" in feature.qualifiers:
124 phase = feature.qualifiers["phase"][0]
125 elif feature.type == "CDS":
126 phase = (
127 int(feature.qualifiers.get("codon_start", [1])[0]) - 1
128 )
129 else:
130 phase = "."
131 return str(phase)
132
133 def _write_feature(
134 self, feature, rec_id, out_handle, id_handler, parent_id=None
135 ):
136 """Write a feature with location information."""
137 if feature.location.strand == 1:
138 strand = "+"
139 elif feature.location.strand == -1:
140 strand = "-"
141 else:
142 strand = "."
143 # remove any standard features from the qualifiers
144 quals = feature.qualifiers.copy()
145 for std_qual in ["source", "score", "phase"]:
146 if std_qual in quals and len(quals[std_qual]) == 1:
147 del quals[std_qual]
148 # add a link to a parent identifier if it exists
149 if parent_id:
150 if "Parent" not in quals:
151 quals["Parent"] = []
152 quals["Parent"].append(parent_id)
153 quals = id_handler.update_quals(
154 quals, len(feature.sub_features) > 0
155 )
156 if feature.type:
157 ftype = feature.type
158 else:
159 ftype = "sequence_feature"
160 parts = [
161 str(rec_id),
162 feature.qualifiers.get("source", ["feature"])[0],
163 ftype,
164 str(feature.location.start + 1), # 1-based indexing
165 str(feature.location.end),
166 feature.qualifiers.get("score", ["."])[0],
167 strand,
168 self._get_phase(feature),
169 self._format_keyvals(quals),
170 ]
171 out_handle.write("\t".join(parts) + "\n")
172 for sub_feature in feature.sub_features:
173 id_handler = self._write_feature(
174 sub_feature,
175 rec_id,
176 out_handle,
177 id_handler,
178 quals["ID"][0],
179 )
180 return id_handler
181
182 def _format_keyvals(self, keyvals):
183 format_kvs = []
184 for key in sorted(keyvals.keys()):
185 values = keyvals[key]
186 key = key.strip()
187 format_vals = []
188 if not isinstance(values, list) or isinstance(
189 values, tuple
190 ):
191 values = [values]
192 for val in values:
193 val = urllib.parse.quote(str(val).strip(), safe=":/ ")
194 if (key and val) and val not in format_vals:
195 format_vals.append(val)
196 format_kvs.append("%s=%s" % (key, ",".join(format_vals)))
197 return ";".join(format_kvs)
198
199 def _write_annotations(self, anns, rec_id, size, out_handle):
200 """Add annotations which refer to an entire sequence."""
201 format_anns = self._format_keyvals(anns)
202 if format_anns:
203 parts = [
204 rec_id,
205 "annotation",
206 "remark",
207 "1",
208 str(size if size > 1 else 1),
209 ".",
210 ".",
211 ".",
212 format_anns,
213 ]
214 out_handle.write("\t".join(parts) + "\n")
215
216 def _write_header(self, out_handle):
217 """Write out standard header directives."""
218 out_handle.write("##gff-version 3\n")
219
220 def _write_fasta(self, recs, out_handle):
221 """Write sequence records using the ##FASTA directive."""
222 out_handle.write("##FASTA\n")
223 SeqIO.write(recs, out_handle, "fasta")
224
225
226 def write(recs, out_handle, include_fasta=False):
227 """High level interface to write GFF3 files from SeqRecords and SeqFeatures.
228
229 If include_fasta is True, the GFF3 file will include sequence information
230 using the ##FASTA directive.
231 """
232 writer = GFF3Writer()
233 return writer.write(recs, out_handle, include_fasta)