diff GFFOutput.py @ 17:4c201a3d4755 draft

planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author fubar
date Sun, 28 Jan 2024 06:48:52 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/GFFOutput.py	Sun Jan 28 06:48:52 2024 +0000
@@ -0,0 +1,213 @@
+"""Output Biopython SeqRecords and SeqFeatures to GFF3 format.
+
+The target format is GFF3, the current GFF standard:
+    http://www.sequenceontology.org/gff3.shtml
+"""
+from six.moves import urllib
+
+from Bio import SeqIO
+
+
+class _IdHandler:
+    """Generate IDs for GFF3 Parent/Child
+    relationships where they don't exist."""
+
+    def __init__(self):
+        self._prefix = "biopygen"
+        self._counter = 1
+        self._seen_ids = []
+
+    def _generate_id(self, quals):
+        """Generate a unique ID not present in our existing IDs."""
+        gen_id = self._get_standard_id(quals)
+        if gen_id is None:
+            while 1:
+                gen_id = "%s%s" % (self._prefix, self._counter)
+                if gen_id not in self._seen_ids:
+                    break
+                self._counter += 1
+        return gen_id
+
+    def _get_standard_id(self, quals):
+        """Retrieve standardized IDs from other sources like NCBI GenBank.
+
+        This tries to find IDs from known key/values when stored differently
+        than GFF3 specifications.
+        """
+        possible_keys = ["transcript_id", "protein_id"]
+        for test_key in possible_keys:
+            if test_key in quals:
+                cur_id = quals[test_key]
+                if isinstance(cur_id, tuple) or isinstance(cur_id, list):
+                    return cur_id[0]
+                else:
+                    return cur_id
+        return None
+
+    def update_quals(self, quals, has_children):
+        """Update a set of qualifiers, adding an ID if necessary."""
+        cur_id = quals.get("ID", None)
+        # if we have an ID, record it
+        if cur_id:
+            if not isinstance(cur_id, list) and not isinstance(cur_id, tuple):
+                cur_id = [cur_id]
+            for add_id in cur_id:
+                self._seen_ids.append(add_id)
+        # if we need one and don't have it, create a new one
+        elif has_children:
+            new_id = self._generate_id(quals)
+            self._seen_ids.append(new_id)
+            quals["ID"] = [new_id]
+        return quals
+
+
+class GFF3Writer:
+    """Write GFF3 files starting with standard Biopython objects."""
+
+    def __init__(self):
+        pass
+
+    def write(self, recs, out_handle, include_fasta=False):
+        """Write the provided records to the given handle in GFF3 format."""
+        id_handler = _IdHandler()
+        self._write_header(out_handle)
+        fasta_recs = []
+        try:
+            recs = iter(recs)
+        except TypeError:
+            recs = [recs]
+        for rec in recs:
+            self._write_rec(rec, out_handle)
+            self._write_annotations(rec.annotations, rec.id, len(rec.seq), out_handle)
+            for sf in rec.features:
+                sf = self._clean_feature(sf)
+                id_handler = self._write_feature(sf, rec.id, out_handle, id_handler)
+            if include_fasta and len(rec.seq) > 0:
+                fasta_recs.append(rec)
+        if len(fasta_recs) > 0:
+            self._write_fasta(fasta_recs, out_handle)
+
+    def _clean_feature(self, feature):
+        quals = {}
+        for key, val in feature.qualifiers.items():
+            if not isinstance(val, (list, tuple)):
+                val = [val]
+            val = [str(x) for x in val]
+            quals[key] = val
+        feature.qualifiers = quals
+        # Support for Biopython 1.68 and above, which removed sub_features
+        if not hasattr(feature, "sub_features"):
+            feature.sub_features = []
+        clean_sub = [self._clean_feature(f) for f in feature.sub_features]
+        feature.sub_features = clean_sub
+        return feature
+
+    def _write_rec(self, rec, out_handle):
+        # if we have a SeqRecord, write out optional directive
+        if len(rec.seq) > 0:
+            out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq)))
+
+    def _get_phase(self, feature):
+        if "phase" in feature.qualifiers:
+            phase = feature.qualifiers["phase"][0]
+        elif feature.type == "CDS":
+            phase = int(feature.qualifiers.get("codon_start", [1])[0]) - 1
+        else:
+            phase = "."
+        return str(phase)
+
+    def _write_feature(self, feature, rec_id, out_handle, id_handler, parent_id=None):
+        """Write a feature with location information."""
+        if feature.location.strand == 1:
+            strand = "+"
+        elif feature.location.strand == -1:
+            strand = "-"
+        else:
+            strand = "."
+        # remove any standard features from the qualifiers
+        quals = feature.qualifiers.copy()
+        for std_qual in ["source", "score", "phase"]:
+            if std_qual in quals and len(quals[std_qual]) == 1:
+                del quals[std_qual]
+        # add a link to a parent identifier if it exists
+        if parent_id:
+            if "Parent" not in quals:
+                quals["Parent"] = []
+            quals["Parent"].append(parent_id)
+        quals = id_handler.update_quals(quals, len(feature.sub_features) > 0)
+        if feature.type:
+            ftype = feature.type
+        else:
+            ftype = "sequence_feature"
+        parts = [
+            str(rec_id),
+            feature.qualifiers.get("source", ["feature"])[0],
+            ftype,
+            str(feature.location.start + 1),  # 1-based indexing
+            str(feature.location.end),
+            feature.qualifiers.get("score", ["."])[0],
+            strand,
+            self._get_phase(feature),
+            self._format_keyvals(quals),
+        ]
+        out_handle.write("\t".join(parts) + "\n")
+        for sub_feature in feature.sub_features:
+            id_handler = self._write_feature(
+                sub_feature,
+                rec_id,
+                out_handle,
+                id_handler,
+                quals["ID"][0],
+            )
+        return id_handler
+
+    def _format_keyvals(self, keyvals):
+        format_kvs = []
+        for key in sorted(keyvals.keys()):
+            values = keyvals[key]
+            key = key.strip()
+            format_vals = []
+            if not isinstance(values, list) or isinstance(values, tuple):
+                values = [values]
+            for val in values:
+                val = urllib.parse.quote(str(val).strip(), safe=":/ ")
+                if (key and val) and val not in format_vals:
+                    format_vals.append(val)
+            format_kvs.append("%s=%s" % (key, ",".join(format_vals)))
+        return ";".join(format_kvs)
+
+    def _write_annotations(self, anns, rec_id, size, out_handle):
+        """Add annotations which refer to an entire sequence."""
+        format_anns = self._format_keyvals(anns)
+        if format_anns:
+            parts = [
+                rec_id,
+                "annotation",
+                "remark",
+                "1",
+                str(size if size > 1 else 1),
+                ".",
+                ".",
+                ".",
+                format_anns,
+            ]
+            out_handle.write("\t".join(parts) + "\n")
+
+    def _write_header(self, out_handle):
+        """Write out standard header directives."""
+        out_handle.write("##gff-version 3\n")
+
+    def _write_fasta(self, recs, out_handle):
+        """Write sequence records using the ##FASTA directive."""
+        out_handle.write("##FASTA\n")
+        SeqIO.write(recs, out_handle, "fasta")
+
+
+def write(recs, out_handle, include_fasta=False):
+    """High level interface to write GFF3 files from SeqRecords and SeqFeatures.
+
+    If include_fasta is True, the GFF3 file will include sequence information
+    using the ##FASTA directive.
+    """
+    writer = GFF3Writer()
+    return writer.write(recs, out_handle, include_fasta)