diff genome_editor.py @ 3:134bb2d7cdfd draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:43:21 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/genome_editor.py	Mon Jun 05 02:43:21 2023 +0000
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+import logging
+import copy
+import argparse
+import tsv
+from Bio import SeqIO
+from Bio.Seq import Seq
+from Bio.SeqFeature import FeatureLocation
+from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature, convertSeqRec
+from gff3 import feature_lambda, feature_test_contains
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+
+
+def mutate(gff3, fasta, changes, customSeqs, new_id):
+    # Change Language
+    # - we can only accept ONE genome as an input. (TODO: support multiple?)
+    # - we can only build ONE genome as an output. (TODO: support multiple?)
+    # - must allow selection of various regions
+    # '1,1000,+   40,100,-    custom_seq_1'
+    try:
+        custom_seqs = SeqIO.to_dict(SeqIO.parse(customSeqs, "fasta"))
+    except:
+        custom_seqs = {}
+    seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
+    # Pull first and onl record
+    rec = list(gffParse(gff3, base_dict=seq_dict))[0]
+    # Create a "clean" record
+    new_record = copy.deepcopy(rec)
+    new_record.id = new_id
+    new_record.seq = Seq("")
+    new_record.features = []
+    new_record.annotations = {}
+    # Process changes.
+    chain = []
+    topFeats = {}
+    covered = 0
+    for feat in rec.features:
+        if "ID" in feat.qualifiers.keys():
+            topFeats[feat.qualifiers["ID"][0]] = feat.location.start
+    for change in changes:
+        if "," in change:
+            (start, end, strand) = change.split(",")
+            start = int(start) - 1
+            end = int(end)
+
+            # Make any complaints
+            broken_feature_start = list(
+                feature_lambda(
+                    rec.features,
+                    feature_test_contains,
+                    {"index": start},
+                    subfeatures=False,
+                )
+            )
+            if len(broken_feature_start) > 0:
+                pass
+                # log.info("DANGER: Start index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", start, broken_feature_start[0].id, broken_feature_start[0].location)
+            broken_feature_end = list(
+                feature_lambda(
+                    rec.features,
+                    feature_test_contains,
+                    {"index": end},
+                    subfeatures=False,
+                )
+            )
+            if len(broken_feature_end) > 0:
+                pass
+                # log.info("DANGER: End index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", end, broken_feature_end[0].id, broken_feature_end[0].location)
+
+            # Ok, fetch features
+            if strand == "+":
+                tmp_req = rec[start:end]
+            else:
+                tmp_req = rec[start:end].reverse_complement(
+                    id=True,
+                    name=True,
+                    description=True,
+                    features=True,
+                    annotations=True,
+                    letter_annotations=True,
+                    dbxrefs=True,
+                )
+            tmp_req = convertSeqRec(tmp_req)[0]
+
+            def update_location(feature, shiftS):
+                feature.location = FeatureLocation(
+                    feature.location.start + shiftS,
+                    feature.location.end + shiftS,
+                    feature.strand,
+                )
+                for i in feature.sub_features:
+                    i = update_location(i, shiftS)
+                return feature
+
+            # for feature in tmp_req.features:
+
+            chain.append(
+                [
+                    rec.id,
+                    start + 1,
+                    end,
+                    strand,
+                    new_record.id,
+                    len(new_record) + 1,
+                    len(new_record) + (end - start),
+                    "+",
+                ]
+            )
+
+            covered += len(new_record.seq)
+            print(covered)
+            new_record.seq += tmp_req.seq
+            # NB: THIS MUST USE BIOPYTHON 1.67. 1.68 Removes access to
+            # subfeatures, which means you will only get top-level features.
+            startInd = len(new_record.features)
+            new_record.features += tmp_req.features
+
+            for i in new_record.features[startInd:]:
+                i.location = FeatureLocation(
+                    i.location.start + covered,
+                    i.location.end + covered,
+                    i.location.strand,
+                )
+                if "ID" not in i.qualifiers.keys():
+                    continue
+                diffS = i.location.start - topFeats[i.qualifiers["ID"][0]]
+                subFeats = i.sub_features
+                for j in subFeats:
+                    j = update_location(j, diffS)
+        else:
+            new_record.seq += custom_seqs[change].seq
+    yield new_record, chain
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("fasta", type=argparse.FileType("r"), help="Sequence")
+    parser.add_argument("gff3", type=argparse.FileType("r"), help="Annotations")
+    parser.add_argument("new_id", help="Append to ID", default="_v2")
+    parser.add_argument(
+        "--out_fasta",
+        type=argparse.FileType("w"),
+        help="Output fasta",
+        default="out.fa",
+    )
+    parser.add_argument(
+        "--out_gff3",
+        type=argparse.FileType("w"),
+        help="Output gff3",
+        default="out.gff3",
+    )
+    parser.add_argument(
+        "--out_simpleChain",
+        type=argparse.FileType("w"),
+        help="Output simple chain (i.e. not a real UCSC chain file)",
+        default="out.chain",
+    )
+    parser.add_argument("--changes", nargs="+")
+    parser.add_argument("--customSeqs", type=argparse.FileType("r"))
+    args = parser.parse_args()
+
+    for rec, chain in mutate(
+        args.gff3, args.fasta, args.changes, args.customSeqs, args.new_id
+    ):
+        # TODO: Check that this appends and doesn't overwirte
+        gffWrite([rec], args.out_gff3)
+        SeqIO.write([rec], args.out_fasta, "fasta")
+        tsv.dump(chain, args.out_simpleChain)