view gff3_extract_sequence.py @ 5:ab40380424c9 draft

planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5
author cpt
date Sun, 23 Jul 2023 01:42:19 +0000
parents 34b80e483fb8
children
line wrap: on
line source

#!/usr/bin/env python
import sys
import argparse
import logging
import uuid
from CPT_GFFParser import gffParse, gffWrite
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import FeatureLocation, CompoundLocation
from gff3 import feature_lambda, feature_test_type, get_id

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def main(fasta, gff3, feature_filter=None, nodesc=False):
    if feature_filter == "nice_cds":
        from gff2gb import gff3_to_genbank as cpt_Gff2Gbk

        for rec in cpt_Gff2Gbk(gff3, fasta, 11):
            seenList = {}
            if rec.seq[0] == "?":
                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
                exit(1)
            for feat in sorted(rec.features, key=lambda x: x.location.start):
                if feat.type != "CDS":
                    continue

                ind = 0
                if (
                    str(
                        feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-")
                    )
                    in seenList.keys()
                ):
                    seenList[
                        str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                    ] += 1
                    ind = seenList[
                        str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                    ]
                else:
                    seenList[
                        str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                    ] = 1
                append = ""
                if ind != 0:
                    append = "_" + str(ind)

                if nodesc:
                    description = ""
                else:
                    feat.qualifiers["ID"] = [feat._ID]
                    product = feat.qualifiers.get("product", "")
                    description = (
                        "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format(
                            feat, product
                        )
                    )
                yield [
                    SeqRecord(
                        feat.extract(rec).seq,
                        id=str(
                            feat.qualifiers.get("locus_tag", get_id(feat)).replace(
                                " ", "-"
                            )
                        )
                        + append,
                        description=description,
                    )
                ]

    elif feature_filter == "unique_cds":
        seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
        seen_ids = {}

        for rec in gffParse(gff3, base_dict=seq_dict):
            noMatch = True
            if "Alias" in rec.features[0].qualifiers.keys():
                lColumn = rec.features[0].qualifiers["Alias"][0]
            else:
                lColumn = ""
            for x in seq_dict:
                if x == rec.id or x == lColumn:
                    noMatch = False
            if noMatch:
                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
                exit(1)
            newfeats = []
            for feat in sorted(
                feature_lambda(
                    rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
                ),
                key=lambda f: f.location.start,
            ):
                nid = rec.id + "____" + feat.id
                if nid in seen_ids:
                    nid = nid + "__" + uuid.uuid4().hex
                feat.qualifiers["ID"] = [nid]
                newfeats.append(feat)
                seen_ids[nid] = True

                if nodesc:
                    description = ""
                else:
                    if feat.strand == -1:
                        important_data = {
                            "Location": FeatureLocation(
                                feat.location.start + 1,
                                feat.location.end - feat.phase,
                                feat.strand,
                            )
                        }
                    else:
                        important_data = {
                            "Location": FeatureLocation(
                                feat.location.start + 1 + feat.phase,
                                feat.location.end,
                                feat.strand,
                            )
                        }
                    if "Name" in feat.qualifiers:
                        important_data["Name"] = feat.qualifiers.get("Name", [""])[0]

                    description = "[{}]".format(
                        ";".join(
                            [
                                "{key}={value}".format(key=k, value=v)
                                for (k, v) in important_data.items()
                            ]
                        )
                    )
                # if feat.id == "CPT_Privateer_006.p01":
                # print(feat)
                # exit()

                if isinstance(feat.location, CompoundLocation):
                    finSeq = ""
                    if feat.strand == -1:
                        for x in feat.location.parts:
                            finSeq += str(
                                (
                                    rec.seq[
                                        feat.location.start : feat.location.end
                                        - feat.phase
                                    ]
                                ).reverse_complement()
                            )
                    else:
                        for x in feat.location.parts:
                            finSeq += str(
                                rec.seq[
                                    feat.location.start + feat.phase : feat.location.end
                                ]
                            )
                    yield [
                        SeqRecord(
                            finSeq,
                            id=nid.replace(" ", "-"),
                            description=description,
                        )
                    ]
                elif feat.strand == -1:
                    yield [
                        SeqRecord(
                            (
                                rec.seq[
                                    feat.location.start : feat.location.end - feat.phase
                                ]
                            ).reverse_complement(),
                            id=nid.replace(" ", "-"),
                            description=description,
                        )
                    ]
                else:
                    yield [
                        SeqRecord(
                            # feat.extract(rec).seq,
                            rec.seq[
                                feat.location.start + feat.phase : feat.location.end
                            ],
                            id=nid.replace(" ", "-"),
                            description=description,
                        )
                    ]
            rec.features = newfeats
            rec.annotations = {}
            # gffWrite([rec], sys.stdout)
    else:
        seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))

        for rec in gffParse(gff3, base_dict=seq_dict):
            noMatch = True
            if "Alias" in rec.features[0].qualifiers.keys():
                lColumn = rec.features[0].qualifiers["Alias"][0]
            else:
                lColumn = ""
            for x in seq_dict:
                if x == rec.id or x == lColumn:
                    noMatch = False
            if noMatch:
                sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
                exit(1)
            for feat in sorted(
                feature_lambda(
                    rec.features,
                    feature_test_type,
                    {"type": feature_filter},
                    subfeatures=True,
                ),
                key=lambda f: f.location.start,
            ):
                id = feat.id
                if len(id) == 0:
                    id = get_id(feat)

                if nodesc:
                    description = ""
                else:
                    if feat.strand == -1:
                        important_data = {
                            "Location": FeatureLocation(
                                feat.location.start + 1,
                                feat.location.end - feat.phase,
                                feat.strand,
                            )
                        }
                    else:
                        important_data = {
                            "Location": FeatureLocation(
                                feat.location.start + 1 + feat.phase,
                                feat.location.end,
                                feat.strand,
                            )
                        }
                    if "Name" in feat.qualifiers:
                        important_data["Name"] = feat.qualifiers.get("Name", [""])[0]

                    description = "[{}]".format(
                        ";".join(
                            [
                                "{key}={value}".format(key=k, value=v)
                                for (k, v) in important_data.items()
                            ]
                        )
                    )

                if isinstance(feat.location, CompoundLocation):
                    finSeq = ""
                    if feat.strand == -1:
                        for x in feat.location.parts:
                            finSeq += str(
                                (
                                    rec.seq[x.start : x.end - feat.phase]
                                ).reverse_complement()
                            )
                    else:
                        for x in feat.location.parts:
                            finSeq += str(rec.seq[x.start + feat.phase : x.end])
                    yield [
                        SeqRecord(
                            Seq(finSeq),
                            id=id.replace(" ", "-"),
                            description=description,
                        )
                    ]

                else:

                    if feat.strand == -1:
                        yield [
                            SeqRecord(
                                seq=Seq(
                                    str(
                                        rec.seq[
                                            feat.location.start : feat.location.end
                                            - feat.phase
                                        ]
                                    )
                                ).reverse_complement(),
                                id=id.replace(" ", "-"),
                                description=description,
                            )
                        ]
                    else:
                        yield [
                            SeqRecord(
                                # feat.extract(rec).seq,
                                seq=Seq(
                                    str(
                                        rec.seq[
                                            feat.location.start
                                            + feat.phase : feat.location.end
                                        ]
                                    )
                                ),
                                id=id.replace(" ", "-"),
                                description=description,
                            )
                        ]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Export corresponding sequence in genome from GFF3", epilog=""
    )
    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
    parser.add_argument(
        "--feature_filter", default=None, help="Filter for specific feature types"
    )
    parser.add_argument(
        "--nodesc", action="store_true", help="Strip description field off"
    )
    args = parser.parse_args()
    for seq in main(**vars(args)):
        # if isinstance(seq, list):
        #  for x in seq:
        #    print(type(x.seq))
        #    SeqIO.write(x, sys.stdout, "fasta")
        # else:
        SeqIO.write(seq, sys.stdout, "fasta")