comparison GFFOutput.py @ 17:4c201a3d4755 draft

planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author fubar
date Sun, 28 Jan 2024 06:48:52 +0000
parents
children
comparison
equal deleted inserted replaced
16:1fe91657bfd6 17:4c201a3d4755
1 """Output Biopython SeqRecords and SeqFeatures to GFF3 format.
2
3 The target format is GFF3, the current GFF standard:
4 http://www.sequenceontology.org/gff3.shtml
5 """
6 from six.moves import urllib
7
8 from Bio import SeqIO
9
10
11 class _IdHandler:
12 """Generate IDs for GFF3 Parent/Child
13 relationships where they don't exist."""
14
15 def __init__(self):
16 self._prefix = "biopygen"
17 self._counter = 1
18 self._seen_ids = []
19
20 def _generate_id(self, quals):
21 """Generate a unique ID not present in our existing IDs."""
22 gen_id = self._get_standard_id(quals)
23 if gen_id is None:
24 while 1:
25 gen_id = "%s%s" % (self._prefix, self._counter)
26 if gen_id not in self._seen_ids:
27 break
28 self._counter += 1
29 return gen_id
30
31 def _get_standard_id(self, quals):
32 """Retrieve standardized IDs from other sources like NCBI GenBank.
33
34 This tries to find IDs from known key/values when stored differently
35 than GFF3 specifications.
36 """
37 possible_keys = ["transcript_id", "protein_id"]
38 for test_key in possible_keys:
39 if test_key in quals:
40 cur_id = quals[test_key]
41 if isinstance(cur_id, tuple) or isinstance(cur_id, list):
42 return cur_id[0]
43 else:
44 return cur_id
45 return None
46
47 def update_quals(self, quals, has_children):
48 """Update a set of qualifiers, adding an ID if necessary."""
49 cur_id = quals.get("ID", None)
50 # if we have an ID, record it
51 if cur_id:
52 if not isinstance(cur_id, list) and not isinstance(cur_id, tuple):
53 cur_id = [cur_id]
54 for add_id in cur_id:
55 self._seen_ids.append(add_id)
56 # if we need one and don't have it, create a new one
57 elif has_children:
58 new_id = self._generate_id(quals)
59 self._seen_ids.append(new_id)
60 quals["ID"] = [new_id]
61 return quals
62
63
64 class GFF3Writer:
65 """Write GFF3 files starting with standard Biopython objects."""
66
67 def __init__(self):
68 pass
69
70 def write(self, recs, out_handle, include_fasta=False):
71 """Write the provided records to the given handle in GFF3 format."""
72 id_handler = _IdHandler()
73 self._write_header(out_handle)
74 fasta_recs = []
75 try:
76 recs = iter(recs)
77 except TypeError:
78 recs = [recs]
79 for rec in recs:
80 self._write_rec(rec, out_handle)
81 self._write_annotations(rec.annotations, rec.id, len(rec.seq), out_handle)
82 for sf in rec.features:
83 sf = self._clean_feature(sf)
84 id_handler = self._write_feature(sf, rec.id, out_handle, id_handler)
85 if include_fasta and len(rec.seq) > 0:
86 fasta_recs.append(rec)
87 if len(fasta_recs) > 0:
88 self._write_fasta(fasta_recs, out_handle)
89
90 def _clean_feature(self, feature):
91 quals = {}
92 for key, val in feature.qualifiers.items():
93 if not isinstance(val, (list, tuple)):
94 val = [val]
95 val = [str(x) for x in val]
96 quals[key] = val
97 feature.qualifiers = quals
98 # Support for Biopython 1.68 and above, which removed sub_features
99 if not hasattr(feature, "sub_features"):
100 feature.sub_features = []
101 clean_sub = [self._clean_feature(f) for f in feature.sub_features]
102 feature.sub_features = clean_sub
103 return feature
104
105 def _write_rec(self, rec, out_handle):
106 # if we have a SeqRecord, write out optional directive
107 if len(rec.seq) > 0:
108 out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq)))
109
110 def _get_phase(self, feature):
111 if "phase" in feature.qualifiers:
112 phase = feature.qualifiers["phase"][0]
113 elif feature.type == "CDS":
114 phase = int(feature.qualifiers.get("codon_start", [1])[0]) - 1
115 else:
116 phase = "."
117 return str(phase)
118
119 def _write_feature(self, feature, rec_id, out_handle, id_handler, parent_id=None):
120 """Write a feature with location information."""
121 if feature.location.strand == 1:
122 strand = "+"
123 elif feature.location.strand == -1:
124 strand = "-"
125 else:
126 strand = "."
127 # remove any standard features from the qualifiers
128 quals = feature.qualifiers.copy()
129 for std_qual in ["source", "score", "phase"]:
130 if std_qual in quals and len(quals[std_qual]) == 1:
131 del quals[std_qual]
132 # add a link to a parent identifier if it exists
133 if parent_id:
134 if "Parent" not in quals:
135 quals["Parent"] = []
136 quals["Parent"].append(parent_id)
137 quals = id_handler.update_quals(quals, len(feature.sub_features) > 0)
138 if feature.type:
139 ftype = feature.type
140 else:
141 ftype = "sequence_feature"
142 parts = [
143 str(rec_id),
144 feature.qualifiers.get("source", ["feature"])[0],
145 ftype,
146 str(feature.location.start + 1), # 1-based indexing
147 str(feature.location.end),
148 feature.qualifiers.get("score", ["."])[0],
149 strand,
150 self._get_phase(feature),
151 self._format_keyvals(quals),
152 ]
153 out_handle.write("\t".join(parts) + "\n")
154 for sub_feature in feature.sub_features:
155 id_handler = self._write_feature(
156 sub_feature,
157 rec_id,
158 out_handle,
159 id_handler,
160 quals["ID"][0],
161 )
162 return id_handler
163
164 def _format_keyvals(self, keyvals):
165 format_kvs = []
166 for key in sorted(keyvals.keys()):
167 values = keyvals[key]
168 key = key.strip()
169 format_vals = []
170 if not isinstance(values, list) or isinstance(values, tuple):
171 values = [values]
172 for val in values:
173 val = urllib.parse.quote(str(val).strip(), safe=":/ ")
174 if (key and val) and val not in format_vals:
175 format_vals.append(val)
176 format_kvs.append("%s=%s" % (key, ",".join(format_vals)))
177 return ";".join(format_kvs)
178
179 def _write_annotations(self, anns, rec_id, size, out_handle):
180 """Add annotations which refer to an entire sequence."""
181 format_anns = self._format_keyvals(anns)
182 if format_anns:
183 parts = [
184 rec_id,
185 "annotation",
186 "remark",
187 "1",
188 str(size if size > 1 else 1),
189 ".",
190 ".",
191 ".",
192 format_anns,
193 ]
194 out_handle.write("\t".join(parts) + "\n")
195
196 def _write_header(self, out_handle):
197 """Write out standard header directives."""
198 out_handle.write("##gff-version 3\n")
199
200 def _write_fasta(self, recs, out_handle):
201 """Write sequence records using the ##FASTA directive."""
202 out_handle.write("##FASTA\n")
203 SeqIO.write(recs, out_handle, "fasta")
204
205
206 def write(recs, out_handle, include_fasta=False):
207 """High level interface to write GFF3 files from SeqRecords and SeqFeatures.
208
209 If include_fasta is True, the GFF3 file will include sequence information
210 using the ##FASTA directive.
211 """
212 writer = GFF3Writer()
213 return writer.write(recs, out_handle, include_fasta)