Mercurial > repos > cpt > cpt_gff_extract_seq
diff gff3_extract_sequence.py @ 4:34b80e483fb8 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:43:58 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gff3_extract_sequence.py Mon Jun 05 02:43:58 2023 +0000 @@ -0,0 +1,335 @@ +#!/usr/bin/env python +import sys +import argparse +import logging +import uuid +from CPT_GFFParser import gffParse, gffWrite +from Bio import SeqIO +from Bio.Seq import Seq +from Bio.SeqRecord import SeqRecord +from Bio.SeqFeature import FeatureLocation, CompoundLocation +from gff3 import feature_lambda, feature_test_type, get_id + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +def main(fasta, gff3, feature_filter=None, nodesc=False): + if feature_filter == "nice_cds": + from gff2gb import gff3_to_genbank as cpt_Gff2Gbk + + for rec in cpt_Gff2Gbk(gff3, fasta, 11): + seenList = {} + if rec.seq[0] == "?": + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + for feat in sorted(rec.features, key=lambda x: x.location.start): + if feat.type != "CDS": + continue + + ind = 0 + if ( + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") + ) + in seenList.keys() + ): + seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] += 1 + ind = seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] + else: + seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] = 1 + append = "" + if ind != 0: + append = "_" + str(ind) + + if nodesc: + description = "" + else: + feat.qualifiers["ID"] = [feat._ID] + product = feat.qualifiers.get("product", "") + description = ( + "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( + feat, product + ) + ) + yield [ + SeqRecord( + feat.extract(rec).seq, + id=str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + + append, + description=description, + ) + ] + + elif feature_filter == "unique_cds": + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + seen_ids = {} + + for rec in gffParse(gff3, base_dict=seq_dict): + noMatch = True + if "Alias" in rec.features[0].qualifiers.keys(): + lColumn = rec.features[0].qualifiers["Alias"][0] + else: + lColumn = "" + for x in seq_dict: + if x == rec.id or x == lColumn: + noMatch = False + if noMatch: + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + newfeats = [] + for feat in sorted( + feature_lambda( + rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False + ), + key=lambda f: f.location.start, + ): + nid = rec.id + "____" + feat.id + if nid in seen_ids: + nid = nid + "__" + uuid.uuid4().hex + feat.qualifiers["ID"] = [nid] + newfeats.append(feat) + seen_ids[nid] = True + + if nodesc: + description = "" + else: + if feat.strand == -1: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1, + feat.location.end - feat.phase, + feat.strand, + ) + } + else: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1 + feat.phase, + feat.location.end, + feat.strand, + ) + } + if "Name" in feat.qualifiers: + important_data["Name"] = feat.qualifiers.get("Name", [""])[0] + + description = "[{}]".format( + ";".join( + [ + "{key}={value}".format(key=k, value=v) + for (k, v) in important_data.items() + ] + ) + ) + # if feat.id == "CPT_Privateer_006.p01": + # print(feat) + # exit() + + if isinstance(feat.location, CompoundLocation): + finSeq = "" + if feat.strand == -1: + for x in feat.location.parts: + finSeq += str( + ( + rec.seq[ + feat.location.start : feat.location.end + - feat.phase + ] + ).reverse_complement() + ) + else: + for x in feat.location.parts: + finSeq += str( + rec.seq[ + feat.location.start + feat.phase : feat.location.end + ] + ) + yield [ + SeqRecord( + finSeq, + id=nid.replace(" ", "-"), + description=description, + ) + ] + elif feat.strand == -1: + yield [ + SeqRecord( + ( + rec.seq[ + feat.location.start : feat.location.end - feat.phase + ] + ).reverse_complement(), + id=nid.replace(" ", "-"), + description=description, + ) + ] + else: + yield [ + SeqRecord( + # feat.extract(rec).seq, + rec.seq[ + feat.location.start + feat.phase : feat.location.end + ], + id=nid.replace(" ", "-"), + description=description, + ) + ] + rec.features = newfeats + rec.annotations = {} + # gffWrite([rec], sys.stdout) + else: + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + + for rec in gffParse(gff3, base_dict=seq_dict): + noMatch = True + if "Alias" in rec.features[0].qualifiers.keys(): + lColumn = rec.features[0].qualifiers["Alias"][0] + else: + lColumn = "" + for x in seq_dict: + if x == rec.id or x == lColumn: + noMatch = False + if noMatch: + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + for feat in sorted( + feature_lambda( + rec.features, + feature_test_type, + {"type": feature_filter}, + subfeatures=True, + ), + key=lambda f: f.location.start, + ): + id = feat.id + if len(id) == 0: + id = get_id(feat) + + if nodesc: + description = "" + else: + if feat.strand == -1: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1, + feat.location.end - feat.phase, + feat.strand, + ) + } + else: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1 + feat.phase, + feat.location.end, + feat.strand, + ) + } + if "Name" in feat.qualifiers: + important_data["Name"] = feat.qualifiers.get("Name", [""])[0] + + description = "[{}]".format( + ";".join( + [ + "{key}={value}".format(key=k, value=v) + for (k, v) in important_data.items() + ] + ) + ) + + if isinstance(feat.location, CompoundLocation): + finSeq = "" + if feat.strand == -1: + for x in feat.location.parts: + finSeq += str( + ( + rec.seq[x.start : x.end - feat.phase] + ).reverse_complement() + ) + else: + for x in feat.location.parts: + finSeq += str(rec.seq[x.start + feat.phase : x.end]) + yield [ + SeqRecord( + Seq(finSeq), + id=id.replace(" ", "-"), + description=description, + ) + ] + + else: + + if feat.strand == -1: + yield [ + SeqRecord( + seq=Seq( + str( + rec.seq[ + feat.location.start : feat.location.end + - feat.phase + ] + ) + ).reverse_complement(), + id=id.replace(" ", "-"), + description=description, + ) + ] + else: + yield [ + SeqRecord( + # feat.extract(rec).seq, + seq=Seq( + str( + rec.seq[ + feat.location.start + + feat.phase : feat.location.end + ] + ) + ), + id=id.replace(" ", "-"), + description=description, + ) + ] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Export corresponding sequence in genome from GFF3", epilog="" + ) + parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") + parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") + parser.add_argument( + "--feature_filter", default=None, help="Filter for specific feature types" + ) + parser.add_argument( + "--nodesc", action="store_true", help="Strip description field off" + ) + args = parser.parse_args() + for seq in main(**vars(args)): + # if isinstance(seq, list): + # for x in seq: + # print(type(x.seq)) + # SeqIO.write(x, sys.stdout, "fasta") + # else: + SeqIO.write(seq, sys.stdout, "fasta")