comparison glimmerHMM/BCBio/GFF/GFFOutput.py @ 0:c9699375fcf6 draft

planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/glimmer_hmm commit 0dc67759bcbdf5a8a285ded9ba751340d741fe63
author bgruening
date Wed, 13 Jul 2016 10:55:48 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c9699375fcf6
1 """Output Biopython SeqRecords and SeqFeatures to GFF3 format.
2
3 The target format is GFF3, the current GFF standard:
4 http://www.sequenceontology.org/gff3.shtml
5 """
6 import urllib
7
8 class _IdHandler:
9 """Generate IDs for GFF3 Parent/Child relationships where they don't exist.
10 """
11 def __init__(self):
12 self._prefix = "biopygen"
13 self._counter = 1
14 self._seen_ids = []
15
16 def _generate_id(self, quals):
17 """Generate a unique ID not present in our existing IDs.
18 """
19 gen_id = self._get_standard_id(quals)
20 if gen_id is None:
21 while 1:
22 gen_id = "%s%s" % (self._prefix, self._counter)
23 if gen_id not in self._seen_ids:
24 break
25 self._counter += 1
26 return gen_id
27
28 def _get_standard_id(self, quals):
29 """Retrieve standardized IDs from other sources like NCBI GenBank.
30
31 This tries to find IDs from known key/values when stored differently
32 than GFF3 specifications.
33 """
34 possible_keys = ["transcript_id", "protein_id"]
35 for test_key in possible_keys:
36 if quals.has_key(test_key):
37 cur_id = quals[test_key]
38 if isinstance(cur_id, tuple) or isinstance(cur_id, list):
39 return cur_id[0]
40 else:
41 return cur_id
42 return None
43
44 def update_quals(self, quals, has_children):
45 """Update a set of qualifiers, adding an ID if necessary.
46 """
47 cur_id = quals.get("ID", None)
48 # if we have an ID, record it
49 if cur_id:
50 if not isinstance(cur_id, list) and not isinstance(cur_id, tuple):
51 cur_id = [cur_id]
52 for add_id in cur_id:
53 self._seen_ids.append(add_id)
54 # if we need one and don't have it, create a new one
55 elif has_children:
56 new_id = self._generate_id(quals)
57 self._seen_ids.append(new_id)
58 quals["ID"] = [new_id]
59 return quals
60
61 class GFF3Writer:
62 """Write GFF3 files starting with standard Biopython objects.
63 """
64 def __init__(self):
65 pass
66
67 def write(self, recs, out_handle):
68 """Write the provided records to the given handle in GFF3 format.
69 """
70 id_handler = _IdHandler()
71 self._write_header(out_handle)
72 for rec in recs:
73 self._write_rec(rec, out_handle)
74 self._write_annotations(rec.annotations, rec.id, out_handle)
75 for sf in rec.features:
76 sf = self._clean_feature(sf)
77 id_handler = self._write_feature(sf, rec.id, out_handle,
78 id_handler)
79
80 def _clean_feature(self, feature):
81 quals = {}
82 for key, val in feature.qualifiers.items():
83 if not isinstance(val, (list, tuple)):
84 val = [val]
85 val = [str(x) for x in val]
86 quals[key] = val
87 feature.qualifiers = quals
88 clean_sub = [self._clean_feature(f) for f in feature.sub_features]
89 feature.sub_features = clean_sub
90 return feature
91
92 def _write_rec(self, rec, out_handle):
93 # if we have a SeqRecord, write out optional directive
94 if len(rec.seq) > 0:
95 out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq)))
96
97 def _write_feature(self, feature, rec_id, out_handle, id_handler,
98 parent_id=None):
99 """Write a feature with location information.
100 """
101 if feature.strand == 1:
102 strand = '+'
103 elif feature.strand == -1:
104 strand = '-'
105 else:
106 strand = '.'
107 # remove any standard features from the qualifiers
108 quals = feature.qualifiers.copy()
109 for std_qual in ["source", "score", "phase"]:
110 if quals.has_key(std_qual) and len(quals[std_qual]) == 1:
111 del quals[std_qual]
112 # add a link to a parent identifier if it exists
113 if parent_id:
114 if not quals.has_key("Parent"):
115 quals["Parent"] = []
116 quals["Parent"].append(parent_id)
117 quals = id_handler.update_quals(quals, len(feature.sub_features) > 0)
118 if feature.type:
119 ftype = feature.type
120 else:
121 ftype = "sequence_feature"
122 parts = [str(rec_id),
123 feature.qualifiers.get("source", ["feature"])[0],
124 ftype,
125 str(feature.location.nofuzzy_start + 1), # 1-based indexing
126 str(feature.location.nofuzzy_end),
127 feature.qualifiers.get("score", ["."])[0],
128 strand,
129 str(feature.qualifiers.get("phase", ["."])[0]),
130 self._format_keyvals(quals)]
131 out_handle.write("\t".join(parts) + "\n")
132 for sub_feature in feature.sub_features:
133 id_handler = self._write_feature(sub_feature, rec_id, out_handle,
134 id_handler, quals["ID"][0])
135 return id_handler
136
137 def _format_keyvals(self, keyvals):
138 format_kvs = []
139 for key, values in keyvals.items():
140 key = key.strip()
141 format_vals = []
142 if not isinstance(values, list) or isinstance(values, tuple):
143 values = [values]
144 for val in values:
145 val = urllib.quote(str(val).strip())
146 if ((key and val) and val not in format_vals):
147 format_vals.append(val)
148 format_kvs.append("%s=%s" % (key, ",".join(format_vals)))
149 return ";".join(format_kvs)
150
151 def _write_annotations(self, anns, rec_id, out_handle):
152 """Add annotations which refer to an entire sequence.
153 """
154 format_anns = self._format_keyvals(anns)
155 if format_anns:
156 parts = [rec_id, "annotation", "remark", ".", ".", ".", ".", ".",
157 format_anns]
158 out_handle.write("\t".join(parts) + "\n")
159
160 def _write_header(self, out_handle):
161 """Write out standard header directives.
162 """
163 out_handle.write("##gff-version 3\n")
164
165 def write(recs, out_handle):
166 """High level interface to write GFF3 files from SeqRecords and SeqFeatures.
167 """
168 writer = GFF3Writer()
169 return writer.write(recs, out_handle)