cpt_bprom_converter: bprom_gff3_converter.py @ 0:14e64dda68c8 (draft, default, tip)
planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5

author:   cpt
date:     Sun, 23 Jul 2023 01:47:35 +0000
parents:  (none)
children: (none)
import argparse

import pandas as pd
import re
from typing import List, Dict, Union
from datetime import date

# In this file, a "feature" refers to the block of data between the ">" delimiters of the BPROM output.
# Each of those blocks describes one section of the DNA upstream of a gene.
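# Illustrative sketch of one feature block, assuming the layout implied by the regexes
# below (names, positions, and sequences are made up; real BPROM spacing varies):
#
#   >SomePhage_CDS_1 Location=[830:930](+)
#   Number of predicted promoters - 1
#      Promoter Pos:     72 LDF-  4.35
#      -10 box at pos.     57 TATAAT Score    50
#      -35 box at pos.     35 TTGACA Score    41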


def read_bprom_file(bprom_file) -> List[str]:
    """Reads in the file, returning a list of strings with each element containing one line from the file"""
    contents = []

    with open(bprom_file) as file:
        for line in file:
            contents.append(line)

    return contents


def concatenate_then_split(contents) -> List[str]:
    """Concatenates the file into one large string,
    then splits it on '>' so that each feature's data is together in one element"""
    # Concatenates the entire file into one large string
    concat_contents = "".join(contents)

    # Drop the leading '>' so the split below does not produce an empty first element
    concat_contents = concat_contents[1:]

    # Splits the file into a list of strings on ">"
    features = concat_contents.split(">")

    return features


def remove_promoterless_features(features) -> List[str]:
    """For each concatenated feature string passed, removes the element
    if the # of predicted promoters is 0."""
    # Copy the list so the caller's list is not mutated
    cleaned_features = list(features)
    indices_to_delete = []
    for i, feature in enumerate(cleaned_features):
        if "Number of predicted promoters - 0" in feature:
            indices_to_delete.append(i)
    # Must delete in reverse order, otherwise each deletion shifts the indices of the
    # later elements and you would delete the wrong elements at i+1, i+2, i+3, etc.
    for i in sorted(indices_to_delete, reverse=True):
        del cleaned_features[i]

    return cleaned_features


def extract_accession(feature) -> str:
    """Extract the accession from a feature string"""
    accession = re.search(r"[\w](.*)(?=_)", feature)
    accession = accession.group().replace("_", "").strip()

    return accession


def extract_test_seq_position(feature) -> List[str]:
    """Extract the test sequence's position in the genome. Gets the values '(.*)' between the
    brackets of 'Location=[...]' using the lookbehind (?<=PATTERN) and lookahead (?=PATTERN)
    regex assertions in the pattern (?<=Location=\\[)(.*)(?=]\\()"""
    location = re.search("(?<=Location=\\[)(.*)(?=]\\()", feature)
    location = location.group().split(":")
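    # e.g. a hypothetical 'Location=[830:930](+)' yields ['830', '930']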

    return location


def extract_strand_direction(feature) -> str:
    """Extract strand direction for a feature, - or +"""
    # Matches the single character between parentheses, i.e. the '+' or '-' after the location
    direction = re.search("(?<=\\().(?=\\))", feature)
    direction = direction.group()

    return direction


def extract_promoter_data(feature) -> Dict[str, str]:
    """Extracts all promoter data using regular expressions.
    Use on one element of the output of concatenate_then_split()"""
    # Extract promoter -10 and -35 sequences and scores
    # Gets everything following "-xx box at pos." on that line (the lookahead just
    # requires " Score" to be present)
    minus10 = re.search("(?<=-10 box at pos.)(.*)(?= Score)(.*)", feature)
    minus35 = re.search("(?<=-35 box at pos.)(.*)(?= Score)(.*)", feature)

    # Extracts the match and removes leading whitespace (which can be variable:
    # the BPROM output does not keep the number of whitespace characters constant
    # when a value has fewer digits, at least for the scores)
    minus10 = minus10.group().lstrip().split(" ")
    minus10_pos = int(minus10[0])
    minus10_seq = minus10[1]
    minus10_score = minus10[-1]

    minus35 = minus35.group().lstrip().split(" ")
    minus35_pos = int(minus35[0])
    minus35_seq = minus35[1]
    minus35_score = minus35[-1]

    # Changing these keys changes the attribute names written to column 9
    promoter_data = {
        "minus10_pos": minus10_pos,
        "minus10_seq": minus10_seq,
        "minus10_score": minus10_score,
        "minus35_pos": minus35_pos,
        "minus35_seq": minus35_seq,
        "minus35_score": minus35_score,
    }
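    # Example of the resulting dict, assuming a hypothetical line
    # " -10 box at pos.     57 TATAAT Score    50" (and a matching -35 line):
    #   {"minus10_pos": 57, "minus10_seq": "TATAAT", "minus10_score": "50",
    #    "minus35_pos": 35, "minus35_seq": "TTGACA", "minus35_score": "41"}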
    return promoter_data


def convert_extracted_promoter_data_to_ID_column_format(
    promoter_data, calculated_promoter_positions
) -> str:
    """Converts the input data to the GFF3 attributes column (column 9) format, a semicolon
    separated list of tag=value pairs providing additional information about each feature"""
    # Replaces the BPROM output positions with the calculated (genome-context) ones
    minus_10_calculated = calculated_promoter_positions[2]
    minus_35_calculated = calculated_promoter_positions[3]
    promoter_data["minus10_pos"] = minus_10_calculated
    promoter_data["minus35_pos"] = minus_35_calculated

    # Creates the column 9 string (attributes)
    promoter_data = ["{}={}".format(key, value) for key, value in promoter_data.items()]
    promoter_data = (
        "Description=Predicted promoter data;" + "Note=" + ",".join(promoter_data) + ";"
    )
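    # With hypothetical values, the resulting attributes string (written as one line) looks like:
    #   Description=Predicted promoter data;Note=minus10_pos=887,minus10_seq=TATAAT,
    #   minus10_score=50,minus35_pos=865,minus35_seq=TTGACA,minus35_score=41;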
    return promoter_data


def extract_LDF_score(feature) -> str:
    """Extract LDF score"""
    LDF = re.search("(?<=LDF-)(.*)", feature)
    LDF = LDF.group().strip()

    return LDF


def calculate_promoter_position(feature):
    """Calculate promoter positions (in the context of the genome) based on BPROM predictions."""
    # Get the 'Promoter Pos: X' value. This refers to the predicted transcriptional start site!
    promoter_pos = re.search("(?<=Promoter Pos:)(.*)(?=LDF)", feature)
    promoter_pos = int(promoter_pos.group().strip())

    # Get start and end positions from 'Location=[XXX:YYYY]'
    test_seq_position = extract_test_seq_position(feature)
    test_cds_location_start_pos = int(test_seq_position[0])
    test_cds_location_end_pos = int(test_seq_position[1])

    promoter_data = extract_promoter_data(feature)

    # IMPORTANT: whether you add or subtract to calculate the promoter start position
    # depends on whether the feature is on the + or - strand!
    # The workflow Jolene uses is smart enough to correctly pull upstream for both
    # + and - strands (i.e., pulls left for +, pulls right for -).
    # THEREFORE, for a gene with a start at 930 on the + strand, it pulls 830:930,
    # and for a gene with a start at 930 on the - strand, it pulls 930:1030.
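    # Worked example with hypothetical numbers, following the arithmetic below:
    # for Location=[830:930](+) with a -35 box at 35, a -10 box at 57, and Promoter Pos: 72,
    # the feature spans start = 830 + 35 = 865 to end = 830 + 72 = 902, with the -10 box
    # at 830 + 57 = 887. On the - strand the same offsets are subtracted from the right
    # boundary (930) instead.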

    direction = extract_strand_direction(feature)

    if direction == "+":
        # BPROM starts counting from the LEFT boundary for + strand test sequences (as expected)
        # Get -10 promoter position
        minus10_pos = promoter_data["minus10_pos"]
        minus10_pos_in_context_of_genome = test_cds_location_start_pos + minus10_pos
        # Get -35 promoter position
        minus35_pos = promoter_data["minus35_pos"]
        minus35_pos_in_context_of_genome = test_cds_location_start_pos + minus35_pos

        start = test_cds_location_start_pos + minus35_pos
        end = test_cds_location_start_pos + promoter_pos

        calculated_promoter_positions = [
            start,
            end,
            minus10_pos_in_context_of_genome,
            minus35_pos_in_context_of_genome,
        ]
        return calculated_promoter_positions

    elif direction == "-":
        # BPROM starts counting from the RIGHT boundary for - strand test sequences
        # Get -10 promoter position
        minus10_pos = promoter_data["minus10_pos"]
        minus10_pos_in_context_of_genome = test_cds_location_end_pos - minus10_pos
        # Get -35 promoter position
        minus35_pos = promoter_data["minus35_pos"]
        minus35_pos_in_context_of_genome = test_cds_location_end_pos - minus35_pos

        # The start and end are reversed
        end = test_cds_location_end_pos - minus35_pos
        start = test_cds_location_end_pos - promoter_pos

        calculated_promoter_positions = [
            start,
            end,
            minus10_pos_in_context_of_genome,
            minus35_pos_in_context_of_genome,
        ]
        return calculated_promoter_positions

    else:
        # 'assert <non-empty string>' always passes, so raise instead
        raise ValueError("Error: Strand data neither '+' nor '-'")


def extract_tf_binding_elements():
    """Extract predicted transcription factor binding elements (stub, not yet implemented)"""
    return


def extract_data_for_all_features(features) -> List[List[Union[str, int]]]:
    """Loops through the cleaned BPROM output extracting all data of interest and builds the
    structure for loading into a dataframe"""
    extracted_data = []
    for feature in features:
        # Loop through features, a List[str] in which each element holds one feature in the
        # original BPROM format as a single string, cleaned of irrelevant data
        calculated_promoter_positions = calculate_promoter_position(feature)
        promoter_data = extract_promoter_data(feature)
        promoter_data_converted = convert_extracted_promoter_data_to_ID_column_format(
            promoter_data, calculated_promoter_positions
        )

        extracted_data.append(
            [
                extract_accession(feature),  # Seqid, col 1
                "bprom",  # Source, col 2
                "promoter",  # Type, col 3
                calculated_promoter_positions[0],  # Start, col 4
                calculated_promoter_positions[1],  # End, col 5
                extract_LDF_score(feature),  # Score, col 6
                extract_strand_direction(feature),  # Strand direction, col 7
                ".",  # Phase, col 8
                promoter_data_converted,  # Attributes, col 9
            ]
        )

    return extracted_data


def convert_to_dataframe(extracted_data) -> pd.DataFrame:
    """Convert extracted and processed data to a Pandas dataframe with gff3 column names"""

    df = pd.DataFrame(
        extracted_data,
        columns=[
            "seqid",
            "source",
            "type",
            "start",
            "end",
            "score",
            "strand",
            "phase",
            "attributes",
        ],
    )
    return df


def write_to_gff3(dataframe) -> None:
    """Write the DataFrame as gff3 text (tab separated values, tsv) to stdout"""
    tsv = dataframe.to_csv(sep="\t", index=False, header=False)

    # Gets the first element of the first column (the accession) for use in the
    # commented-out output filename below
    accession = dataframe.iloc[0, 0]

    year, month, day = date.today().year, date.today().month, date.today().day

    # with open(f'{year}_{month}_{day}_bprom_as_gff3_{accession}.txt', 'w') as wf:
    # Header so Galaxy can recognize the output as GFF3
    print("##gff-version 3\n")
    # for line in tsv:
    print(tsv)
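    # The printed output looks roughly like (one hypothetical, tab-separated row):
    #   ##gff-version 3
    #   SomePhageCDS  bprom  promoter  865  902  4.35  +  .  Description=Predicted promoter data;Note=...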

    return


def convert_bprom_output_to_gff3(bprom_file) -> None:
    """Master function. Given a BPROM output .txt file, extracts the data and writes it out as GFF3"""
    bprom_file = read_bprom_file(bprom_file)
    concatenated_bprom_file = concatenate_then_split(bprom_file)
    working_file = remove_promoterless_features(concatenated_bprom_file)
    extracted_data = extract_data_for_all_features(working_file)
    gff3_dataframe = convert_to_dataframe(extracted_data)
    # Create the gff3 text output
    write_to_gff3(gff3_dataframe)

    return


if __name__ == "__main__":
    ## Shows the DataFrame output in the terminal for testing/debugging
    # bprom_file = read_bprom_file('BPROM_output.txt')
    # concatenated_bprom_file: List[str] = concatenate_then_split(bprom_file)
    # working_file = remove_promoterless_features(concatenated_bprom_file)
    # print(convert_to_dataframe(extract_data_for_all_features(working_file)).to_string())

    parser = argparse.ArgumentParser(
        description="converts BPROM output to the gff3 file format"
    )

    parser.add_argument("-f", help="bprom file as .txt")
    args = parser.parse_args()
    # Actual function call for converting the BPROM output to gff3
    convert_bprom_output_to_gff3(args.f)
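    # Example invocation:
    #   python bprom_gff3_converter.py -f BPROM_output.txt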

# Upload to cpt github in the directory Galaxy-Tools/tools/external/