diff cpt_convert_mga_to_gff3.py @ 10:4100ee965124 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:40:41 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cpt_convert_mga_to_gff3.py	Mon Jun 05 02:40:41 2023 +0000
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+import sys
+import argparse
+from Bio import SeqIO
+from Bio.SeqFeature import SeqFeature
+from Bio.SeqFeature import FeatureLocation
+from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
+import logging
+
+logging.basicConfig(level=logging.INFO)
+
+
+def mga_to_gff3(mga_output, genome):
+    seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta"))
+
+    current_record = None
+    for line in mga_output:
+        if line.startswith("#"):
+            if line.startswith("# gc = ") or line.startswith("# self:"):
+                continue
+            chromId = line.strip().replace("# ", "")
+
+            if " " in chromId:
+                chromId = chromId[0 : chromId.index(" ")]
+
+            if chromId in seq_dict:
+                if current_record is not None:
+                    yield current_record
+                current_record = seq_dict[chromId]
+            else:
+                raise Exception(
+                    "Found results for sequence %s which was not in fasta file sequences (%s)"
+                    % (chromId, ", ".join(seq_dict.keys()))
+                )
+
+        else:
+            (
+                gene_id,
+                start,
+                end,
+                strand,
+                phase,
+                complete,
+                score,
+                model,
+                rbs_start,
+                rbs_end,
+                rbs_score,
+            ) = line.strip().split("\t")
+            start = int(start)
+            end = int(end)
+            strand = +1 if strand == "+" else -1
+
+            # Correct for gff3
+            start -= 1
+
+            rbs_feat = None
+            if rbs_start != "-":
+                rbs_start = int(rbs_start)
+                rbs_end = int(rbs_end)
+                rbs_feat = gffSeqFeature(
+                    FeatureLocation(rbs_start, rbs_end),
+                    type="Shine_Dalgarno_sequence",
+                    strand=strand,
+                    qualifiers={
+                        "ID": "%s.rbs_%s" % (current_record.id, gene_id),
+                        "Source": "MGA",
+                    },
+                    phase=phase,
+                    source="MGA",
+                )
+
+            cds_feat = gffSeqFeature(
+                FeatureLocation(start, end),
+                type="CDS",
+                strand=strand,
+                qualifiers={
+                    "Source": "MGA",
+                    "ID": "%s.cds_%s" % (current_record.id, gene_id),
+                },
+                phase=phase,
+                source="MGA",
+            )
+
+            if rbs_feat is not None:
+                if strand > 0:
+                    gene_start = rbs_start
+                    gene_end = end
+                else:
+                    gene_start = start
+                    gene_end = rbs_end
+            else:
+                gene_start = start
+                gene_end = end
+
+            gene = gffSeqFeature(
+                FeatureLocation(gene_start, gene_end),
+                type="gene",
+                strand=strand,
+                id="%s.%s" % (current_record.id, gene_id),
+                qualifiers={
+                    "Source": "MGA",
+                    "ID": "%s.%s" % (current_record.id, gene_id),
+                },
+                phase=phase,
+                source="MGA",
+            )
+
+            gene.sub_features = [cds_feat]
+            if rbs_feat is not None:
+                gene.sub_features.append(rbs_feat)
+            current_record.features.append(gene)
+    yield current_record
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="")
+    parser.add_argument(
+        "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output"
+    )
+    parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome")
+    args = parser.parse_args()
+
+    for result in mga_to_gff3(**vars(args)):
+        gffWrite([result], sys.stdout)