Mercurial > repos > cpt > cpt_convert_mga
view cpt_convert_mga_to_gff3.py @ 16:2c83f55438c9 draft default tip
planemo upload commit f33bdf952d796c5d7a240b132af3c4cbd102decc
author | cpt |
---|---|
date | Fri, 05 Jan 2024 05:50:13 +0000 |
parents | 4100ee965124 |
children |
line wrap: on
line source
#!/usr/bin/env python import sys import argparse from Bio import SeqIO from Bio.SeqFeature import SeqFeature from Bio.SeqFeature import FeatureLocation from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature import logging logging.basicConfig(level=logging.INFO) def mga_to_gff3(mga_output, genome): seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) current_record = None for line in mga_output: if line.startswith("#"): if line.startswith("# gc = ") or line.startswith("# self:"): continue chromId = line.strip().replace("# ", "") if " " in chromId: chromId = chromId[0 : chromId.index(" ")] if chromId in seq_dict: if current_record is not None: yield current_record current_record = seq_dict[chromId] else: raise Exception( "Found results for sequence %s which was not in fasta file sequences (%s)" % (chromId, ", ".join(seq_dict.keys())) ) else: ( gene_id, start, end, strand, phase, complete, score, model, rbs_start, rbs_end, rbs_score, ) = line.strip().split("\t") start = int(start) end = int(end) strand = +1 if strand == "+" else -1 # Correct for gff3 start -= 1 rbs_feat = None if rbs_start != "-": rbs_start = int(rbs_start) rbs_end = int(rbs_end) rbs_feat = gffSeqFeature( FeatureLocation(rbs_start, rbs_end), type="Shine_Dalgarno_sequence", strand=strand, qualifiers={ "ID": "%s.rbs_%s" % (current_record.id, gene_id), "Source": "MGA", }, phase=phase, source="MGA", ) cds_feat = gffSeqFeature( FeatureLocation(start, end), type="CDS", strand=strand, qualifiers={ "Source": "MGA", "ID": "%s.cds_%s" % (current_record.id, gene_id), }, phase=phase, source="MGA", ) if rbs_feat is not None: if strand > 0: gene_start = rbs_start gene_end = end else: gene_start = start gene_end = rbs_end else: gene_start = start gene_end = end gene = gffSeqFeature( FeatureLocation(gene_start, gene_end), type="gene", strand=strand, id="%s.%s" % (current_record.id, gene_id), qualifiers={ "Source": "MGA", "ID": "%s.%s" % (current_record.id, gene_id), }, phase=phase, source="MGA", ) gene.sub_features = [cds_feat] if rbs_feat is not None: gene.sub_features.append(rbs_feat) current_record.features.append(gene) yield current_record if __name__ == "__main__": parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="") parser.add_argument( "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output" ) parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome") args = parser.parse_args() for result in mga_to_gff3(**vars(args)): gffWrite([result], sys.stdout)