comparison bprom_gff3_converter.py @ 0:14e64dda68c8 draft default tip

planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5
author cpt
date Sun, 23 Jul 2023 01:47:35 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:14e64dda68c8
1 import argparse
2
3 import pandas as pd
4 import re
5 from typing import List, Match, Dict, TextIO, Union
6 from datetime import date
7
8 # In this file, a "feature" refers to the collection of data between the > keys of the bprom output.
9 # That collection of data refers to one section of the DNA upstream of a gene
10
11
12 def read_bprom_file(bprom_file) -> List[str]:
13 """Reads in file, creating a list of strings with each list element containing a line from the file"""
14 contents = []
15
16 with open(bprom_file) as file:
17 for line in file:
18 contents.append(line)
19
20 return contents
21
22
23 def concatenate_then_split(contents) -> List[str]:
24 """Concatenates the file into one large string.
25 Then splits it on '>' so that each feature's data is together in one element"""
26 # Concatenates the entire file into one large string
27 concat_contents = "".join(contents)
28
29 # Removing the empty string '' at element 0 used to make the join
30 concat_contents = concat_contents[1:]
31
32 # Splits the file into a list of strings on ">"
33 features = concat_contents.split(">")
34
35 return features
36
37
38 def remove_promoterless_features(features) -> List[str]:
39 """For each concatenated feature string passed, removes the element
40 if the # of predicted promoters is 0."""
41 cleaned_features = features
42 indices_to_delete = []
43 for i, feature in enumerate(cleaned_features):
44 if "Number of predicted promoters - 0" in cleaned_features[i]:
45 indices_to_delete.append(i)
46 # Must delete in reverse order, otherwise it changes the list indices after
47 # the element deleted, and you delete subsequent elements at i+1, i+2, i+3, etc
48 for i in sorted(indices_to_delete, reverse=True):
49 del cleaned_features[i]
50
51 return cleaned_features
52
53
54 def extract_accession(feature) -> str:
55 """Extract accession"""
56 accession = re.search("[\w](.*)(?=_)", feature)
57 accession = accession.group().replace("_", "").strip()
58
59 return accession
60
61
62 def extract_test_seq_position(feature) -> List[str]:
63 """Extract position in genome. Gets any number of values '(.*)' between the brackets
64 using 'lookbehind/lookright' (?<=PATTERN) and 'lookahead/lookleft' regex assertions
65 to extract (?<=Location=\\[)(.*)(?=]\\()"""
66 location = re.search("(?<=Location=\\[)(.*)(?=]\\()", feature)
67 location = location.group().split(":")
68
69 return location
70
71
72 def extract_strand_direction(feature) -> str:
73 """Extract strand direction for a feature, - or +"""
74 # Matches for '(.)'
75 direction = re.search("(?<=\\().(?=\\))", feature)
76 direction = direction.group()
77
78 return direction
79
80
81 def extract_promoter_data(feature) -> Dict[str, str]:
82 """Extracts all promoter data using regular expressions.
83 Use for one element in the output of concatenate_then_split()"""
84 # Extract promoter -10 and -35 sequences and scores
85 # Gets everything between "-xx box at pos." and " Score"
86 minus10 = re.search("(?<=-10 box at pos.)(.*)(?= Score)(.*)", feature)
87 minus35 = re.search("(?<=-35 box at pos.)(.*)(?= Score)(.*)", feature)
88
89 # Extracts the match and removes leading and trailing whitespace (which can be variable)
90 # (the bprom output does not maintain the same # of whitespace characters
91 # if there are less digits, at least for the scoring)
92 minus10 = minus10.group().lstrip().split(" ")
93 minus10_pos = int(minus10[0])
94 minus10_seq = minus10[1]
95 minus10_score = minus10[-1]
96
97 minus35 = minus35.group().lstrip().split(" ")
98 minus35_pos = int(minus35[0])
99 minus35_seq = minus35[1]
100 minus35_score = minus35[-1]
101
102 # Can change these keys to change the column 9
103 promoter_data = {
104 "minus10_pos": minus10_pos,
105 "minus10_seq": minus10_seq,
106 "minus10_score": minus10_score,
107 "minus35_pos": minus35_pos,
108 "minus35_seq": minus35_seq,
109 "minus35_score": minus35_score,
110 }
111 return promoter_data
112
113
114 def convert_extracted_promoter_data_to_ID_column_format(
115 promoter_data, calculated_promoter_positions
116 ) -> str:
117 """Converts input data to the GFF3 ID column (column 9) format, a semicolon separated
118 list of values providing additional information about each feature"""
119 # Replaces the BPROM output positions with the calculated ones
120 minus_10_calculated = calculated_promoter_positions[2]
121 minus_35_calculated = calculated_promoter_positions[3]
122 promoter_data["minus10_pos"] = minus_10_calculated
123 promoter_data["minus35_pos"] = minus_35_calculated
124
125 # Creates the column 9 string (attributes)
126 promoter_data = ["{}={}".format(key, value) for key, value in promoter_data.items()]
127 promoter_data = (
128 "Description=Predicted promoter data;" + "Note=" + ",".join(promoter_data) + ";"
129 )
130 return promoter_data
131
132
133 def extract_LDF_score(feature) -> str:
134 """Extract LDF score"""
135 LDF = re.search("(?<=LDF-)(.*)", feature)
136 LDF = LDF.group().strip()
137
138 return LDF
139
140
141 def calculate_promoter_position(feature):
142 """Calculate promoter positions (in the context of the genome) based on BPROM predictions."""
143 # Get 'Promoter Pos: X' data. This refers to the predicted transcriptional start site!
144 promoter_pos = re.search("(?<=Promoter Pos:)(.*)(?=LDF)", feature)
145 promoter_pos = int(promoter_pos.group().strip())
146
147 # Get start and end positions from 'Location=[XXX:YYYY]'
148 test_seq_position = extract_test_seq_position(feature)
149 test_cds_location_start_pos = int(test_seq_position[0])
150 test_cds_location_end_pos = int(test_seq_position[1])
151
152 promoter_data = extract_promoter_data(feature)
153
154 """ IMPORTANT!! Whether or not you add or subtract to calculate the promoter start
155 # position depends on whether we're on the + or - strand!
156 # The workflow Jolene uses is smart enough to correctly pull upstream
157 # for both + and - strands (i.e., pulls left for +, pulls right for -)
158 # THEREFORE, for a gene with a start at 930 on the + strand, it pulls 830:930
159 # And for a gene with a start at 930 on the - strand, it pulls 930:1030 """
160
161 direction = extract_strand_direction(feature)
162
163 if direction == "+":
164 # BPROM starts counting from the LEFT boundary for + strand test sequences (as expected)
165 # Get -10 promoter position
166 minus10_pos = promoter_data["minus10_pos"]
167 minus10_pos_in_context_of_genome = test_cds_location_start_pos + minus10_pos
168 # Get -35 promoter position
169 minus35_pos = promoter_data["minus35_pos"]
170 minus35_pos_in_context_of_genome = test_cds_location_start_pos + minus35_pos
171
172 start = test_cds_location_start_pos + minus35_pos
173 end = test_cds_location_start_pos + promoter_pos
174
175 calculated_promoter_positions = [
176 start,
177 end,
178 minus10_pos_in_context_of_genome,
179 minus35_pos_in_context_of_genome,
180 ]
181 return calculated_promoter_positions
182
183 elif direction == "-":
184 # BPROM starts counting from the RIGHT boundary for - strand test sequences
185 # Get -10 promoter position
186 minus10_pos = promoter_data["minus10_pos"]
187 minus10_pos_in_context_of_genome = test_cds_location_end_pos - minus10_pos
188 # Get -35 promoter position
189 minus35_pos = promoter_data["minus35_pos"]
190 minus35_pos_in_context_of_genome = test_cds_location_end_pos - minus35_pos
191
192 # The start and end are reversed
193 end = test_cds_location_end_pos - minus35_pos
194 start = test_cds_location_end_pos - promoter_pos
195
196 calculated_promoter_positions = [
197 start,
198 end,
199 minus10_pos_in_context_of_genome,
200 minus35_pos_in_context_of_genome,
201 ]
202 return calculated_promoter_positions
203
204 else:
205 assert "Error: Strand data neither '+' nor '-'"
206
207
208 def extract_tf_binding_elements():
209 """Extract predicted transcription factor binding elements"""
210 return
211
212
213 def extract_data_for_all_features(features) -> List[List[Union[str, int]]]:
214 """Loops through cleaned bprom output extracting all data of interest and builds the
215 structure for loading into a dataframe"""
216 extracted_data = []
217 for feature in features:
218 # loop through features, a List[str] containing each feature [str] in the
219 # original bprom format as a single string, but cleaned of irrelevant data
220 calculated_promoter_positions = calculate_promoter_position(feature)
221 promoter_data = extract_promoter_data(feature)
222 promoter_data_converted = convert_extracted_promoter_data_to_ID_column_format(
223 promoter_data, calculated_promoter_positions
224 )
225
226 extracted_data.append(
227 [
228 extract_accession(feature), # Seqid, col 1
229 "bprom", # Source, col 2
230 "promoter", # Type, col 3
231 calculated_promoter_positions[0], # Start, col 4
232 calculated_promoter_positions[1], # End, col 5
233 extract_LDF_score(feature), # Score, col 6
234 extract_strand_direction(feature), # Strand direction, col 7
235 ".", # Phase, col 8
236 promoter_data_converted, # Attributes, col 9
237 ]
238 )
239
240 return extracted_data
241
242
243 def convert_to_dataframe(extracted_data) -> pd.DataFrame:
244 """Convert extracted and processed data to Pandas dataframe with gff3 column names"""
245
246 df = pd.DataFrame(
247 extracted_data,
248 columns=[
249 "seqid",
250 "source",
251 "type",
252 "start",
253 "end",
254 "score",
255 "strand",
256 "phase",
257 "attributes",
258 ],
259 )
260 return df
261
262
263 def write_to_gff3(dataframe) -> None:
264 """Create a gff3 text file from the DataFrame by converting to a tab separated values (tsv) file"""
265 tsv = dataframe.to_csv(sep="\t", index=False, header=None)
266
267 # Gets the first element of the first column to use for
268 accession = dataframe.iloc[0][0]
269
270 year, month, day = date.today().year, date.today().month, date.today().day
271
272 # with open(f'{year}_{month}_{day}_bprom_as_gff3_{accession}.txt', 'w') as wf:
273 # Header so Galaxy can recognize as GFF3
274 print("##gff-version 3\n")
275 # for line in tsv:
276 print(tsv)
277
278 return
279
280
281 def convert_bprom_output_to_gff3(bprom_file) -> None:
282 """Master function. Given a BPROM .txt file as output, extracts data and writes as a GFF3 file"""
283 bprom_file = read_bprom_file(bprom_file)
284 concatenated_bprom_file = concatenate_then_split(bprom_file)
285 working_file = remove_promoterless_features(concatenated_bprom_file)
286 extracted_data = extract_data_for_all_features(working_file)
287 gff3_dataframe = convert_to_dataframe(extracted_data)
288 # Create the gff3 text file
289 write_to_gff3(gff3_dataframe)
290
291 return
292
293
294 if __name__ == "__main__":
295 ## Shows the DataFrame output in the terminal for testing/debugging
296 # bprom_file = read_bprom_file('BPROM_output.txt')
297 # concatenated_bprom_file: List[str] = concatenate_then_split(bprom_file)
298 # working_file = remove_promoterless_features(concatenated_bprom_file)
299 # print(convert_to_dataframe(extract_data_for_all_features(working_file)).to_string())
300
301 parser = argparse.ArgumentParser(
302 description="converts BPROM output to the gff3 file format"
303 )
304
305 parser.add_argument("-f", help="bprom file as .txt")
306 args = parser.parse_args()
307 # Actual function for converting the BPROM output to gff3
308 convert_bprom_output_to_gff3(args.f)
309
310 # Upload to cpt github in the directory Galaxy-Tools/tools/external/