Mercurial > repos > cpt > cpt_genome_editor
comparison genome_editor.py @ 3:134bb2d7cdfd draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:43:21 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
2:787ce84e8d16 | 3:134bb2d7cdfd |
---|---|
1 #!/usr/bin/env python | |
2 import logging | |
3 import copy | |
4 import argparse | |
5 import tsv | |
6 from Bio import SeqIO | |
7 from Bio.Seq import Seq | |
8 from Bio.SeqFeature import FeatureLocation | |
9 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature, convertSeqRec | |
10 from gff3 import feature_lambda, feature_test_contains | |
11 | |
12 logging.basicConfig(level=logging.INFO) | |
13 log = logging.getLogger(__name__) | |
14 | |
15 | |
16 def mutate(gff3, fasta, changes, customSeqs, new_id): | |
17 # Change Language | |
18 # - we can only accept ONE genome as an input. (TODO: support multiple?) | |
19 # - we can only build ONE genome as an output. (TODO: support multiple?) | |
20 # - must allow selection of various regions | |
21 # '1,1000,+ 40,100,- custom_seq_1' | |
22 try: | |
23 custom_seqs = SeqIO.to_dict(SeqIO.parse(customSeqs, "fasta")) | |
24 except: | |
25 custom_seqs = {} | |
26 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
27 # Pull first and onl record | |
28 rec = list(gffParse(gff3, base_dict=seq_dict))[0] | |
29 # Create a "clean" record | |
30 new_record = copy.deepcopy(rec) | |
31 new_record.id = new_id | |
32 new_record.seq = Seq("") | |
33 new_record.features = [] | |
34 new_record.annotations = {} | |
35 # Process changes. | |
36 chain = [] | |
37 topFeats = {} | |
38 covered = 0 | |
39 for feat in rec.features: | |
40 if "ID" in feat.qualifiers.keys(): | |
41 topFeats[feat.qualifiers["ID"][0]] = feat.location.start | |
42 for change in changes: | |
43 if "," in change: | |
44 (start, end, strand) = change.split(",") | |
45 start = int(start) - 1 | |
46 end = int(end) | |
47 | |
48 # Make any complaints | |
49 broken_feature_start = list( | |
50 feature_lambda( | |
51 rec.features, | |
52 feature_test_contains, | |
53 {"index": start}, | |
54 subfeatures=False, | |
55 ) | |
56 ) | |
57 if len(broken_feature_start) > 0: | |
58 pass | |
59 # log.info("DANGER: Start index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", start, broken_feature_start[0].id, broken_feature_start[0].location) | |
60 broken_feature_end = list( | |
61 feature_lambda( | |
62 rec.features, | |
63 feature_test_contains, | |
64 {"index": end}, | |
65 subfeatures=False, | |
66 ) | |
67 ) | |
68 if len(broken_feature_end) > 0: | |
69 pass | |
70 # log.info("DANGER: End index chosen (%s) is in the middle of a feature (%s %s). This feature will disappear from the output", end, broken_feature_end[0].id, broken_feature_end[0].location) | |
71 | |
72 # Ok, fetch features | |
73 if strand == "+": | |
74 tmp_req = rec[start:end] | |
75 else: | |
76 tmp_req = rec[start:end].reverse_complement( | |
77 id=True, | |
78 name=True, | |
79 description=True, | |
80 features=True, | |
81 annotations=True, | |
82 letter_annotations=True, | |
83 dbxrefs=True, | |
84 ) | |
85 tmp_req = convertSeqRec(tmp_req)[0] | |
86 | |
87 def update_location(feature, shiftS): | |
88 feature.location = FeatureLocation( | |
89 feature.location.start + shiftS, | |
90 feature.location.end + shiftS, | |
91 feature.strand, | |
92 ) | |
93 for i in feature.sub_features: | |
94 i = update_location(i, shiftS) | |
95 return feature | |
96 | |
97 # for feature in tmp_req.features: | |
98 | |
99 chain.append( | |
100 [ | |
101 rec.id, | |
102 start + 1, | |
103 end, | |
104 strand, | |
105 new_record.id, | |
106 len(new_record) + 1, | |
107 len(new_record) + (end - start), | |
108 "+", | |
109 ] | |
110 ) | |
111 | |
112 covered += len(new_record.seq) | |
113 print(covered) | |
114 new_record.seq += tmp_req.seq | |
115 # NB: THIS MUST USE BIOPYTHON 1.67. 1.68 Removes access to | |
116 # subfeatures, which means you will only get top-level features. | |
117 startInd = len(new_record.features) | |
118 new_record.features += tmp_req.features | |
119 | |
120 for i in new_record.features[startInd:]: | |
121 i.location = FeatureLocation( | |
122 i.location.start + covered, | |
123 i.location.end + covered, | |
124 i.location.strand, | |
125 ) | |
126 if "ID" not in i.qualifiers.keys(): | |
127 continue | |
128 diffS = i.location.start - topFeats[i.qualifiers["ID"][0]] | |
129 subFeats = i.sub_features | |
130 for j in subFeats: | |
131 j = update_location(j, diffS) | |
132 else: | |
133 new_record.seq += custom_seqs[change].seq | |
134 yield new_record, chain | |
135 | |
136 | |
137 if __name__ == "__main__": | |
138 parser = argparse.ArgumentParser() | |
139 parser.add_argument("fasta", type=argparse.FileType("r"), help="Sequence") | |
140 parser.add_argument("gff3", type=argparse.FileType("r"), help="Annotations") | |
141 parser.add_argument("new_id", help="Append to ID", default="_v2") | |
142 parser.add_argument( | |
143 "--out_fasta", | |
144 type=argparse.FileType("w"), | |
145 help="Output fasta", | |
146 default="out.fa", | |
147 ) | |
148 parser.add_argument( | |
149 "--out_gff3", | |
150 type=argparse.FileType("w"), | |
151 help="Output gff3", | |
152 default="out.gff3", | |
153 ) | |
154 parser.add_argument( | |
155 "--out_simpleChain", | |
156 type=argparse.FileType("w"), | |
157 help="Output simple chain (i.e. not a real UCSC chain file)", | |
158 default="out.chain", | |
159 ) | |
160 parser.add_argument("--changes", nargs="+") | |
161 parser.add_argument("--customSeqs", type=argparse.FileType("r")) | |
162 args = parser.parse_args() | |
163 | |
164 for rec, chain in mutate( | |
165 args.gff3, args.fasta, args.changes, args.customSeqs, args.new_id | |
166 ): | |
167 # TODO: Check that this appends and doesn't overwirte | |
168 gffWrite([rec], args.out_gff3) | |
169 SeqIO.write([rec], args.out_fasta, "fasta") | |
170 tsv.dump(chain, args.out_simpleChain) |