Mercurial > repos > recetox > freqsap
comparison freqSAP.py @ 0:ddabfd6ee2a2 draft default tip
planemo upload for repository https://github.com/RECETOX/galaxytools/tree/main/tools/freqsap commit 202a898874d0de51b9923430ea0ef3040084c8d0
author | recetox |
---|---|
date | Fri, 18 Jul 2025 13:21:36 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:ddabfd6ee2a2 |
---|---|
1 import argparse | |
2 import json | |
3 import os | |
4 import sys | |
5 | |
6 import pandas as pd | |
7 import requests | |
8 | |
# Metadata key under which NCBI's frequency download reports the rs ID; also
# used as the join column when merging feature rows with frequency rows.
VARIANT_INDEX = "NCBI Reference SNP (rs) Report ALPHA"
10 | |
11 | |
def get_protein_variation(accession: str) -> tuple[dict, pd.DataFrame]:
    """Fetch variation data for a protein from the EBI Proteins API.

    Args:
        accession (str): Protein accession number (e.g. a UniProt accession).

    Returns:
        tuple[dict, pd.DataFrame]: The first record's metadata (with the
        'features' key removed) and a DataFrame built from its features.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        ValueError: If the API returns no records for the accession.
    """
    request_url = (
        "https://www.ebi.ac.uk/proteins/api/variation"
        f"?offset=0&size=-1&accession={accession}"
    )
    # timeout keeps the tool from hanging forever on an unresponsive server.
    r = requests.get(
        request_url, headers={"Accept": "application/json"}, timeout=60
    )
    # raise_for_status() raises HTTPError for any non-2xx response, so the
    # original `sys.exit()` placed after it was unreachable dead code.
    r.raise_for_status()

    records = r.json()
    if not records:
        # Clearer than the bare IndexError the original raised on [0].
        raise ValueError(f"No variation records found for accession {accession!r}")

    data = records[0]
    features = pd.DataFrame(data.pop("features"))
    return data, features
26 | |
27 | |
def get_position(feature: dict) -> str:
    """Format a feature's sequence location as a position string.

    Args:
        feature (dict): Feature mapping with 'begin' and 'end' keys.

    Returns:
        str: Just "begin" when both bounds coincide, otherwise "begin-end".
    """
    begin, end = feature.get("begin"), feature.get("end")
    return str(begin) if begin == end else f"{begin}-{end}"
43 | |
44 | |
def get_frequency(variant: str) -> str:
    """Download the allele-frequency report for a dbSNP variant from NCBI.

    Args:
        variant (str): dbSNP identifier (e.g. "rs123"); "" is a no-op.

    Returns:
        str: The raw tab-separated frequency report, or "" when the variant
        is empty or the request fails for any reason.
    """
    if not variant:
        return ""

    freq_url = f"https://www.ncbi.nlm.nih.gov/snp/{variant}/download/frequency"
    try:
        # timeout prevents an indefinite hang; a Timeout is a
        # RequestException and is handled by the except below.
        r = requests.get(
            freq_url, headers={"Accept": "application/json"}, timeout=60
        )
        # raise_for_status() is a no-op on success and raises HTTPError
        # otherwise, so the original `if not r.ok` guard was redundant.
        r.raise_for_status()
        return r.text
    except requests.exceptions.RequestException as e:
        # Best-effort: report and fall back to "no data" for this variant.
        print(f"Error fetching frequency data for variant {variant}: {e}")
        return ""
58 | |
59 | |
def parse_frequency_reponse(responseBody: str) -> tuple[dict, pd.DataFrame]:
    """Parse the tab-separated frequency report downloaded from NCBI.

    The report mixes "# key<TAB>value" metadata lines, one commented
    column-header line, and tab-separated data rows.

    Args:
        responseBody (str): The response body as a string ("" for no data).

    Returns:
        tuple[dict, pd.DataFrame]: Parsed metadata pairs and a DataFrame of
        the data rows, with the rs-ID metadata value copied into the
        VARIANT_INDEX column for later merging. (The original docstring
        claimed a bare dict return, which was wrong.)
    """
    if responseBody == "":
        return {}, pd.DataFrame()

    metadata: dict[str, str] = {}
    header: list[str] = []
    rows: list[list[str]] = []

    for line in responseBody.splitlines():
        tokens = line.split("\t")
        if len(tokens) == 2:
            # "# key<TAB>value" metadata pair.
            key = tokens[0].strip("# ")
            metadata[key] = tokens[1].strip()
        elif len(tokens) > 2:
            if tokens[0].startswith("#"):
                # Commented column-header line.
                header = [t.strip("# ") for t in tokens]
            else:
                # Data row; empty cells become "NA".
                rows.append(["NA" if cell == "" else cell for cell in tokens])
        # Single-token lines (blank/filler) are skipped. str.split always
        # yields at least one token, so the original "unexpected format"
        # sys.exit branch was unreachable and has been removed.

    df = pd.DataFrame(rows, columns=header)
    # Tag every row with the report's rs ID so frames can be merged later.
    df[VARIANT_INDEX] = metadata.get(VARIANT_INDEX)
    return metadata, df
106 | |
107 | |
def get_variant_id(feature: dict) -> str:
    """Extract the dbSNP rs identifier from a feature's cross-references.

    Args:
        feature (dict): Feature mapping, optionally holding an "xrefs" list
            of {"id": ...} dicts.

    Returns:
        str: The first xref id starting with "rs", or "" when none exists.
        (The original docstring promised None, but "" was always returned.)
    """
    for xref in feature.get("xrefs", []):
        # `or ""` guards against a missing or None "id", which previously
        # raised AttributeError on .startswith().
        xref_id = xref.get("id") or ""
        if xref_id.startswith("rs"):
            return xref_id
    return ""
123 | |
124 | |
def combine_alternative_sequences(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse rows describing the same variant position into one row.

    Rows sharing ("begin", "end", "variant_id") are merged: their
    'mutatedType' values are joined with commas, while every other column
    keeps its first value. Frames without a 'mutatedType' column are
    returned unchanged.
    """
    if "mutatedType" not in df.columns:
        return df

    def _join_unique(values):
        # Comma-join the distinct string forms, preserving first-seen order.
        return ",".join(values.astype(str).unique())

    aggregations = {
        column: (_join_unique if column == "mutatedType" else "first")
        for column in df.columns
    }
    grouped = df.groupby(
        ["begin", "end", "variant_id"], dropna=False, as_index=False
    )
    return grouped.agg(aggregations)
138 | |
139 | |
def get_populations(regions: list[str]) -> set[str]:
    """Expand region names into the population subgroup labels they cover.

    Args:
        regions (list[str]): Region names to include (e.g. "Europe",
            "Global"). Unknown names contribute no subgroups. (The original
            docstring documented a nonexistent "groups" parameter.)

    Returns:
        set[str]: Union of all population labels for the given regions;
        empty set when the list is empty or nothing matches.
    """
    mapping: dict[str, set[str]] = {
        "Africa": {"African"},
        "North America": {
            "American",
            "African American",
            "Mexican",
            "Cuban",
            "European American",
            "NativeAmerican",
            "NativeHawaiian",
        },
        "Asia": {
            "Asian",
            "East Asian",
            "Central Asia",
            "JAPANESE",
            "KOREAN",
            "South Asian",
            "SouthAsian",
        },
        "Europe": {
            "Europe",
            "European",
            "Finnish from FINRISK project",
            "Spanish controls",
            "TWIN COHORT",
        },
        "Global": {"Global", "Total"},
        "South America": {
            "Latin American 1",
            "Latin American 2",
            "Dominican",
            "PuertoRican",
            "SouthAmerican",
        },
        "Middle East": {"Middle Eastern", "Near_East"},
        "Other": {"Other"},
    }

    # set().union(...) tolerates an empty regions list, unlike the original
    # unbound set.union(*[]) form, which raised TypeError.
    return set().union(*(mapping.get(region, set()) for region in regions))
197 | |
198 | |
def main():
    """CLI entry point: fetch protein variation and allele frequencies for an
    accession, merge them, filter by population, and write one output file."""
    parser = argparse.ArgumentParser(description="Protein Variance CLI Application")
    parser.add_argument(
        "-a", "--accession", type=str, required=True, help="Protein accession number."
    )
    parser.add_argument(
        "-p",
        "--populations",
        type=str,
        required=True,
        help="Comma-separated list of populations.",
    )
    parser.add_argument(
        "-f",
        "--output-format",
        type=str,
        choices=["xlsx", "tabular", "csv"],
        default="tabular",
        help="Output format: xlsx, tabular (tsv), or csv. Default is tabular.",
    )
    parser.add_argument(
        "-o",
        "--output-file",
        type=str,
        default="protein_variation.tsv",
        help="Output file name.",
    )

    args = parser.parse_args()

    # Expand region names (e.g. "Europe") into individual population labels.
    populations = get_populations(args.populations.split(","))

    # The record-level metadata is discarded; only the features table is used.
    _, features_df = get_protein_variation(args.accession)

    # apply(axis=1) hands each row to the helpers as a Series, which they read
    # dict-style via .get().
    features_df["variant_id"] = features_df.apply(get_variant_id, axis=1)
    features_df["variation_position"] = features_df.apply(get_position, axis=1)
    features_df = combine_alternative_sequences(features_df)

    # One (metadata, frequency-frame) pair per variant; zip(*) transposes the
    # pairs into (all metadata dicts, all frequency frames).
    # NOTE(review): results[0] below raises IndexError when features_df has no
    # rows — confirm the API always yields at least one feature.
    results = list(
        zip(
            *[
                parse_frequency_reponse(get_frequency(variant))
                for variant in features_df["variant_id"]
            ]
        )
    )

    metadata_df = pd.DataFrame(results[0])
    frequencies_df = pd.concat(results[1])

    # Align features with per-variant metadata by row position, then join the
    # per-population frequency rows on the shared rs-ID column.
    merged = pd.concat([features_df, metadata_df], axis=1)
    final_merged = pd.merge(merged, frequencies_df, on=VARIANT_INDEX, how="outer")

    # "Ref Allele" cells look like "<allele>=<probability>"; split once on "=".
    final_merged[["Ref Allele NA", "Ref Allele Prob"]] = final_merged[
        "Ref Allele"
    ].str.split("=", n=1, expand=True)

    # "Alt Allele" may hold one or two comma-separated "<allele>=<prob>" items;
    # probe the column count first, then split accordingly.
    alt_alleles = final_merged["Alt Allele"].str.split(",", expand=True)
    if alt_alleles.columns.size == 2:
        final_merged[["Alt Allele 1", "Alt Allele 2"]] = final_merged[
            "Alt Allele"
        ].str.split(",", expand=True)
        final_merged[["Alt Allele NA 1", "Alt Allele Prob 1"]] = final_merged[
            "Alt Allele 1"
        ].str.split("=", n=1, expand=True)
        final_merged[["Alt Allele NA 2", "Alt Allele Prob 2"]] = final_merged[
            "Alt Allele 2"
        ].str.split("=", n=1, expand=True)
    else:
        final_merged[["Alt Allele NA 1", "Alt Allele Prob 1"]] = final_merged[
            "Alt Allele"
        ].str.split("=", n=1, expand=True)
        # No second alternative allele in this report; keep the columns present.
        final_merged[["Alt Allele NA 2", "Alt Allele Prob 2"]] = None

    # Columns to keep in the final report, in output order.
    subset_cols: list[str] = [
        "variation_position",
        "consequenceType",
        "wildType",
        "mutatedType",
        "variant_id",
        "Alleles",
        "Study",
        "Population",
        "Group",
        "Samplesize",
        "Ref Allele",
        "Alt Allele",
        "BioProject ID",
        "BioSample ID",
        "somaticStatus",
        "Ref Allele NA",
        "Ref Allele Prob",
        "Alt Allele NA 1",
        "Alt Allele Prob 1",
        "Alt Allele NA 2",
        "Alt Allele Prob 2",
    ]

    subset = final_merged[subset_cols]
    # Keep only rows whose population was requested on the command line.
    subset = subset[subset["Population"].isin(populations)]

    if args.output_format == "xlsx":
        # Write to a fixed name, then rename into place.
        # NOTE(review): with a bare output filename, dirname() is "" and
        # outpath becomes "results.xlsx" in the CWD — confirm intended.
        outdir = os.path.dirname(args.output_file)
        outpath = os.path.join(outdir, "results.xlsx")
        subset.to_excel(outpath, index=False, na_rep="NA", engine="openpyxl")
        os.rename(outpath, args.output_file)
    elif args.output_format == "csv":
        subset.to_csv(args.output_file, index=False, na_rep="NA")
    else: # tabular/tsv
        subset.to_csv(args.output_file, index=False, sep="\t", na_rep="NA")
309 | |
310 | |
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()