comparison freqSAP.py @ 0:ddabfd6ee2a2 draft default tip

planemo upload for repository https://github.com/RECETOX/galaxytools/tree/main/tools/freqsap commit 202a898874d0de51b9923430ea0ef3040084c8d0
author recetox
date Fri, 18 Jul 2025 13:21:36 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:ddabfd6ee2a2
1 import argparse
2 import json
3 import os
4 import sys
5
6 import pandas as pd
7 import requests
8
# Metadata key / column name carrying the dbSNP rs-id in NCBI frequency
# downloads; used later to join frequency rows back onto protein features.
VARIANT_INDEX = "NCBI Reference SNP (rs) Report ALPHA"
10
11
def get_protein_variation(accession: str) -> tuple[dict, pd.DataFrame]:
    """
    Fetch variation data for a protein from the EBI Proteins API.

    Args:
        accession (str): UniProt protein accession number.

    Returns:
        tuple[dict, pd.DataFrame]: The first API record (with its
        "features" entry removed) and those features as a DataFrame.

    Raises:
        requests.HTTPError: If the API request fails.
    """
    request_url = (
        "https://www.ebi.ac.uk/proteins/api/variation"
        f"?offset=0&size=-1&accession={accession}"
    )
    r = requests.get(request_url, headers={"Accept": "application/json"})

    # raise_for_status() is a no-op on success and raises HTTPError on
    # failure, so the original `if not r.ok: ... sys.exit()` guard (whose
    # sys.exit() was unreachable) collapses to this single call.
    r.raise_for_status()

    # The API returns a JSON array; only the first record is used.
    data = r.json()[0]

    features = pd.DataFrame(data.pop("features"))
    return data, features
26
27
def get_position(feature: dict) -> str:
    """
    Format a feature's location in the protein sequence.

    Args:
        feature (dict): A feature dictionary containing 'begin' and 'end'.

    Returns:
        str: "begin" when the feature spans one residue, else "begin-end".
    """
    begin, end = feature.get("begin"), feature.get("end")
    return str(begin) if begin == end else f"{begin}-{end}"
43
44
def get_frequency(variant: str) -> str:
    """
    Download the allele-frequency report for a dbSNP variant.

    Args:
        variant (str): dbSNP identifier ("rs..."); may be empty/falsy.

    Returns:
        str: The raw tab-separated frequency report, or "" when the
        variant is empty or the request fails.
    """
    if not variant:
        return ""

    # Building the URL cannot raise, so it sits outside the try block.
    freq_url = f"https://www.ncbi.nlm.nih.gov/snp/{variant}/download/frequency"
    try:
        r = requests.get(freq_url, headers={"Accept": "application/json"})
        # No-op on success; raises HTTPError (a RequestException) on
        # failure — the original `if not r.ok` guard was redundant.
        r.raise_for_status()
        return r.text
    except requests.exceptions.RequestException as e:
        # Best-effort: report and fall back to "no data" for this variant.
        print(f"Error fetching frequency data for variant {variant}: {e}")
        return ""
58
59
def parse_frequency_reponse(responseBody: str) -> tuple[dict, pd.DataFrame]:
    """
    Parse an NCBI dbSNP frequency download into metadata and a table.

    The report mixes "# key<TAB>value" metadata lines, one "#"-prefixed
    tab-separated header line, and tab-separated data rows.

    Args:
        responseBody (str): The raw report text ("" yields empty results).

    Returns:
        tuple[dict, pd.DataFrame]: The metadata mapping and the data rows,
        with the rs-id attached as an extra column for later merging.
    """
    if responseBody == "":
        return {}, pd.DataFrame()

    metadata: dict = {}
    header: list = []
    rows: list = []

    for line in responseBody.splitlines():
        tokens = line.split("\t")

        if len(tokens) == 2:
            # "# key<TAB>value" metadata line.
            key = tokens[0].strip("# ")
            value = tokens[1].strip()
            metadata[key] = value
        elif len(tokens) > 2:
            if tokens[0].startswith("#"):
                # Column header line.
                header = [t.strip("# ") for t in tokens]
            else:
                # Data row; blank fields become "NA".
                rows.append(["NA" if x == "" else x for x in tokens])
        # A single token is a line without tabs (e.g. blank) — skipped.
        # str.split always returns at least one token, so the original
        # "unexpected format" sys.exit(1) branch was unreachable and has
        # been removed.

    df = pd.DataFrame(rows, columns=header)
    # Tag every row with the variant id so frequency tables from several
    # variants can be concatenated and merged back onto the features.
    df[VARIANT_INDEX] = metadata.get(VARIANT_INDEX)
    return metadata, df
106
107
def get_variant_id(feature: dict) -> str:
    """
    Extract the dbSNP identifier ("rs...") from a feature's cross-references.

    Args:
        feature (dict): A feature dictionary (or DataFrame row).

    Returns:
        str: The first "rs"-prefixed xref id, or "" if none exists.
    """
    xrefs = feature.get("xrefs", [])
    # Rows produced via DataFrame.apply can carry NaN instead of a list
    # when the feature had no xrefs; treat anything non-list as empty.
    if not isinstance(xrefs, list):
        return ""
    for xref in xrefs:
        # `or ""` guards against a missing or None "id", which previously
        # raised AttributeError on .startswith().
        variant_id = xref.get("id") or ""
        if variant_id.startswith("rs"):
            return variant_id
    return ""
123
124
def combine_alternative_sequences(df: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse rows describing the same variant position into a single row.

    Rows sharing (begin, end, variant_id) are merged: the unique
    "mutatedType" values are comma-joined, while every other column keeps
    its first value. A frame without "mutatedType" is returned unchanged.
    """
    if "mutatedType" not in df.columns:
        return df

    def _join_unique(values):
        # Comma-join the distinct alternative residues, in order of
        # first appearance.
        return ",".join(values.astype(str).unique())

    aggregations = {}
    for column in df.columns:
        aggregations[column] = _join_unique if column == "mutatedType" else "first"

    grouped = df.groupby(["begin", "end", "variant_id"], dropna=False, as_index=False)
    return grouped.agg(aggregations)
138
139
def get_populations(regions: list[str]) -> set[str]:
    """Expand region names into the population labels they cover.

    Args:
        regions (list[str]): Main region names (e.g. "Europe", "Asia").
            Unknown names contribute nothing.

    Returns:
        set[str]: Union of all population labels for the given regions.
    """
    mapping: dict[str, set[str]] = {
        "Africa": {"African"},
        "North America": {
            "American",
            "African American",
            "Mexican",
            "Cuban",
            "European American",
            "NativeAmerican",
            "NativeHawaiian",
        },
        "Asia": {
            "Asian",
            "East Asian",
            "Central Asia",
            "JAPANESE",
            "KOREAN",
            "South Asian",
            "SouthAsian",
        },
        "Europe": {
            "Europe",
            "European",
            "Finnish from FINRISK project",
            "Spanish controls",
            "TWIN COHORT",
        },
        "Global": {"Global", "Total"},
        "South America": {
            "Latin American 1",
            "Latin American 2",
            "Dominican",
            "PuertoRican",
            "SouthAmerican",
        },
        "Middle East": {"Middle Eastern", "Near_East"},
        "Other": {"Other"},
    }

    # set().union(...) tolerates an empty `regions`; the original
    # set.union(*[...]) raised TypeError when the list was empty.
    return set().union(*(mapping.get(region, set()) for region in regions))
197
198
def main():
    """CLI entry point: fetch protein variation data, attach dbSNP allele
    frequencies per variant, filter by population, and write the result."""
    parser = argparse.ArgumentParser(description="Protein Variance CLI Application")
    parser.add_argument(
        "-a", "--accession", type=str, required=True, help="Protein accession number."
    )
    parser.add_argument(
        "-p",
        "--populations",
        type=str,
        required=True,
        help="Comma-separated list of populations.",
    )
    parser.add_argument(
        "-f",
        "--output-format",
        type=str,
        choices=["xlsx", "tabular", "csv"],
        default="tabular",
        help="Output format: xlsx, tabular (tsv), or csv. Default is tabular.",
    )
    parser.add_argument(
        "-o",
        "--output-file",
        type=str,
        default="protein_variation.tsv",
        help="Output file name.",
    )

    args = parser.parse_args()

    # Expand region names (e.g. "Europe") into concrete population labels.
    populations = get_populations(args.populations.split(","))

    _, features_df = get_protein_variation(args.accession)

    # Row-wise apply passes each row as a Series; Series.get behaves like
    # dict.get for these helpers.
    features_df["variant_id"] = features_df.apply(get_variant_id, axis=1)
    features_df["variation_position"] = features_df.apply(get_position, axis=1)
    features_df = combine_alternative_sequences(features_df)

    # One (metadata, frequency-table) pair per variant; zip(*...) splits
    # them into two parallel tuples.
    # NOTE(review): if features_df has no rows, `results` is [] and the
    # results[0] / results[1] accesses below raise IndexError — confirm
    # whether an empty feature set can occur for valid accessions.
    results = list(
        zip(
            *[
                parse_frequency_reponse(get_frequency(variant))
                for variant in features_df["variant_id"]
            ]
        )
    )

    metadata_df = pd.DataFrame(results[0])
    frequencies_df = pd.concat(results[1])

    # Metadata rows align with features positionally (axis=1 concat);
    # the per-population frequency rows then join on the shared rs-id.
    merged = pd.concat([features_df, metadata_df], axis=1)
    final_merged = pd.merge(merged, frequencies_df, on=VARIANT_INDEX, how="outer")

    # "Ref Allele" entries look like "A=0.99" — split into allele and
    # probability columns.
    final_merged[["Ref Allele NA", "Ref Allele Prob"]] = final_merged[
        "Ref Allele"
    ].str.split("=", n=1, expand=True)

    # "Alt Allele" may hold one or two comma-separated "X=prob" entries.
    # NOTE(review): a row with three or more alternative alleles would fall
    # into the else branch and be split incorrectly — confirm whether that
    # occurs in real dbSNP reports.
    alt_alleles = final_merged["Alt Allele"].str.split(",", expand=True)
    if alt_alleles.columns.size == 2:
        final_merged[["Alt Allele 1", "Alt Allele 2"]] = final_merged[
            "Alt Allele"
        ].str.split(",", expand=True)
        final_merged[["Alt Allele NA 1", "Alt Allele Prob 1"]] = final_merged[
            "Alt Allele 1"
        ].str.split("=", n=1, expand=True)
        final_merged[["Alt Allele NA 2", "Alt Allele Prob 2"]] = final_merged[
            "Alt Allele 2"
        ].str.split("=", n=1, expand=True)
    else:
        final_merged[["Alt Allele NA 1", "Alt Allele Prob 1"]] = final_merged[
            "Alt Allele"
        ].str.split("=", n=1, expand=True)
        final_merged[["Alt Allele NA 2", "Alt Allele Prob 2"]] = None

    # Columns kept in the final report, in output order.
    subset_cols: list[str] = [
        "variation_position",
        "consequenceType",
        "wildType",
        "mutatedType",
        "variant_id",
        "Alleles",
        "Study",
        "Population",
        "Group",
        "Samplesize",
        "Ref Allele",
        "Alt Allele",
        "BioProject ID",
        "BioSample ID",
        "somaticStatus",
        "Ref Allele NA",
        "Ref Allele Prob",
        "Alt Allele NA 1",
        "Alt Allele Prob 1",
        "Alt Allele NA 2",
        "Alt Allele Prob 2",
    ]

    subset = final_merged[subset_cols]
    # Keep only rows for the requested populations.
    subset = subset[subset["Population"].isin(populations)]

    if args.output_format == "xlsx":
        # Write under a .xlsx name in the target directory, then rename to
        # the requested output path — presumably because the caller's path
        # lacks the .xlsx suffix the Excel writer expects (TODO confirm;
        # this looks Galaxy-specific).
        outdir = os.path.dirname(args.output_file)
        outpath = os.path.join(outdir, "results.xlsx")
        subset.to_excel(outpath, index=False, na_rep="NA", engine="openpyxl")
        os.rename(outpath, args.output_file)
    elif args.output_format == "csv":
        subset.to_csv(args.output_file, index=False, na_rep="NA")
    else:  # tabular/tsv
        subset.to_csv(args.output_file, index=False, sep="\t", na_rep="NA")
309
310
311 if __name__ == "__main__":
312 main()