freqSAP.py @ 0:ddabfd6ee2a2 (draft, default, tip)
planemo upload for repository https://github.com/RECETOX/galaxytools/tree/main/tools/freqsap commit 202a898874d0de51b9923430ea0ef3040084c8d0
| author | recetox |
|---|---|
| date | Fri, 18 Jul 2025 13:21:36 +0000 |
| parents | |
| children | |
```python
import argparse
import json
import os
import sys

import pandas as pd
import requests

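# Metadata key found in the header of the NCBI frequency download; also used
# below as the join key between features and their frequency tables.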
VARIANT_INDEX = "NCBI Reference SNP (rs) Report ALPHA"


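# Query the EBI Proteins API variation endpoint and split the JSON payload
# into the top-level record and a DataFrame of its variation features.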
def get_protein_variation(accession: str) -> tuple[dict, pd.DataFrame]:
    requestURL = f"https://www.ebi.ac.uk/proteins/api/variation?offset=0&size=-1&accession={accession}"
    r = requests.get(requestURL, headers={"Accept": "application/json"})

    # raise_for_status() raises requests.HTTPError on 4xx/5xx responses,
    # so no explicit exit is needed afterwards.
    r.raise_for_status()

    data = json.loads(r.text)[0]

    features = pd.DataFrame(data.pop("features"))
    return data, features


def get_position(feature: dict) -> str:
    """
    Get the position of a feature in the protein sequence.

    Args:
        feature (dict): A feature dictionary containing 'begin' and 'end'.

    Returns:
        str: The position, either "start" for a single residue or "start-end".
    """
    begin = feature.get("begin")
    end = feature.get("end")
    if begin == end:
        return str(begin)
    return f"{begin}-{end}"


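# Download the per-population allele-frequency report for an rs ID from NCBI
# dbSNP. The body is parsed as tab-separated text by parse_frequency_response()
# below, so the JSON Accept header appears to be advisory at best.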
def get_frequency(variant: str) -> str:
    if not variant:
        return ""

    try:
        freq_url = f"https://www.ncbi.nlm.nih.gov/snp/{variant}/download/frequency"
        r = requests.get(freq_url, headers={"Accept": "application/json"})
        # On error this raises HTTPError, a RequestException subclass caught below.
        r.raise_for_status()
        return r.text
    except requests.exceptions.RequestException as e:
        print(f"Error fetching frequency data for variant {variant}: {e}")
        return ""


def parse_frequency_response(responseBody: str) -> tuple[dict, pd.DataFrame]:
    """
    Parse the frequency response body.

    Args:
        responseBody (str): The response body as a string.

    Returns:
        tuple[dict, pd.DataFrame]: Header metadata and the frequency table.
    """
    if responseBody == "":
        return {}, pd.DataFrame()

    lines = responseBody.splitlines()
    n_lines = len(lines)
    i = 0

    metadata = {}
    header = []
    rows = []

    while i < n_lines:
        line = lines[i]
        tokens = line.split("\t")

        if len(tokens) == 2:
            key = tokens[0].strip("# ")
            value = tokens[1].strip()
            metadata[key] = value
        elif len(tokens) > 2:
            if tokens[0].startswith("#"):
                header = [t.strip("# ") for t in tokens]
            else:
                rows.append(["NA" if x == "" else x for x in tokens])
        elif len(tokens) == 1:
            pass
        else:
            print(f"Unexpected line format at line {i}: {line}")
            sys.exit(1)
        i += 1

    df = pd.DataFrame(rows, columns=header)
    df[VARIANT_INDEX] = metadata.get(VARIANT_INDEX)
    return metadata, df

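# Sketch of the download format implied by the parser above (column names
# guessed from their use in main(); the example values are made up):
#
#   #Study<TAB>Population<TAB>Group<TAB>Samplesize<TAB>Ref Allele<TAB>Alt Allele<TAB>...
#   SomeStudy<TAB>European<TAB>Sub<TAB>9690<TAB>A=0.993<TAB>G=0.007<TAB>...
#
# Two-token "#key<TAB>value" lines land in metadata, one of which is the
# VARIANT_INDEX key carrying the rs ID of the report.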

def get_variant_id(feature: dict) -> str:
    """
    Get the variant ID from a feature.

    Args:
        feature (dict): A feature dictionary.

    Returns:
        str: The variant ID, or an empty string if not applicable.
    """
    xrefs = feature.get("xrefs", [])
    for xref in xrefs:
        if xref.get("id", "").startswith("rs"):
            return xref.get("id")
    return ""


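# Variants at the same position can differ only in the alternative residue;
# collapse such rows into one, joining the mutatedType values with commas and
# keeping the first value of every other column.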
def combine_alternative_sequences(df: pd.DataFrame) -> pd.DataFrame:
    if "mutatedType" not in df.columns:
        return df
    return df.groupby(["begin", "end", "variant_id"], dropna=False, as_index=False).agg(
        {
            col: (
                (lambda x: ",".join(x.astype(str).unique()))
                if col == "mutatedType"
                else "first"
            )
            for col in df.columns
        }
    )


def get_populations(regions: list[str]) -> set[str]:
    """Expand region names into their member population labels.

    Args:
        regions (list[str]): Main regions to include.

    Returns:
        set[str]: All population labels belonging to the given regions.
    """
    mapping: dict[str, set[str]] = {
        "Africa": {"African"},
        "North America": {
            "American",
            "African American",
            "Mexican",
            "Cuban",
            "European American",
            "NativeAmerican",
            "NativeHawaiian",
        },
        "Asia": {
            "Asian",
            "East Asian",
            "Central Asia",
            "JAPANESE",
            "KOREAN",
            "South Asian",
            "SouthAsian",
        },
        "Europe": {
            "Europe",
            "European",
            "Finnish from FINRISK project",
            "Spanish controls",
            "TWIN COHORT",
        },
        "Global": {"Global", "Total"},
        "South America": {
            "Latin American 1",
            "Latin American 2",
            "Dominican",
            "PuertoRican",
            "SouthAmerican",
        },
        "Middle East": {"Middle Eastern", "Near_East"},
        "Other": {"Other"},
    }

    return set.union(*[mapping.get(group, set()) for group in regions])

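# For example, get_populations(["Europe", "Global"]) yields {"Europe",
# "European", "Finnish from FINRISK project", "Spanish controls",
# "TWIN COHORT", "Global", "Total"}.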

def main():
    parser = argparse.ArgumentParser(description="Protein Variation CLI Application")
    parser.add_argument(
        "-a", "--accession", type=str, required=True, help="Protein accession number."
    )
    parser.add_argument(
        "-p",
        "--populations",
        type=str,
        required=True,
        help="Comma-separated list of populations.",
    )
    parser.add_argument(
        "-f",
        "--output-format",
        type=str,
        choices=["xlsx", "tabular", "csv"],
        default="tabular",
        help="Output format: xlsx, tabular (tsv), or csv. Default is tabular.",
    )
    parser.add_argument(
        "-o",
        "--output-file",
        type=str,
        default="protein_variation.tsv",
        help="Output file name.",
    )

    args = parser.parse_args()

    populations = get_populations(args.populations.split(","))

    _, features_df = get_protein_variation(args.accession)

    features_df["variant_id"] = features_df.apply(get_variant_id, axis=1)
    features_df["variation_position"] = features_df.apply(get_position, axis=1)
    features_df = combine_alternative_sequences(features_df)

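    # Fetch and parse the frequency report for every variant; zip(*...)
    # transposes the list of (metadata, table) pairs into one sequence of
    # metadata dicts and one sequence of frequency tables.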
    results = list(
        zip(
            *[
                parse_frequency_response(get_frequency(variant))
                for variant in features_df["variant_id"]
            ]
        )
    )

    metadata_df = pd.DataFrame(results[0])
    frequencies_df = pd.concat(results[1])

    merged = pd.concat([features_df, metadata_df], axis=1)
    final_merged = pd.merge(merged, frequencies_df, on=VARIANT_INDEX, how="outer")

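    # Allele fields apparently come as "ALLELE=frequency" (e.g. "A=0.993");
    # split them into separate allele and probability columns, handling reports
    # with either one or two alternative alleles.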
    final_merged[["Ref Allele NA", "Ref Allele Prob"]] = final_merged[
        "Ref Allele"
    ].str.split("=", n=1, expand=True)

    alt_alleles = final_merged["Alt Allele"].str.split(",", expand=True)
    if alt_alleles.columns.size == 2:
        final_merged[["Alt Allele 1", "Alt Allele 2"]] = alt_alleles
        final_merged[["Alt Allele NA 1", "Alt Allele Prob 1"]] = final_merged[
            "Alt Allele 1"
        ].str.split("=", n=1, expand=True)
        final_merged[["Alt Allele NA 2", "Alt Allele Prob 2"]] = final_merged[
            "Alt Allele 2"
        ].str.split("=", n=1, expand=True)
    else:
        final_merged[["Alt Allele NA 1", "Alt Allele Prob 1"]] = final_merged[
            "Alt Allele"
        ].str.split("=", n=1, expand=True)
        final_merged[["Alt Allele NA 2", "Alt Allele Prob 2"]] = None

    subset_cols: list[str] = [
        "variation_position",
        "consequenceType",
        "wildType",
        "mutatedType",
        "variant_id",
        "Alleles",
        "Study",
        "Population",
        "Group",
        "Samplesize",
        "Ref Allele",
        "Alt Allele",
        "BioProject ID",
        "BioSample ID",
        "somaticStatus",
        "Ref Allele NA",
        "Ref Allele Prob",
        "Alt Allele NA 1",
        "Alt Allele Prob 1",
        "Alt Allele NA 2",
        "Alt Allele Prob 2",
    ]

    subset = final_merged[subset_cols]
    subset = subset[subset["Population"].isin(populations)]

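    # Write the result; for xlsx, write to a path with a literal .xlsx suffix
    # first and then rename to the requested output name, presumably so the
    # Excel writer sees the extension it expects (Galaxy outputs have generic
    # names).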
    if args.output_format == "xlsx":
        outdir = os.path.dirname(args.output_file)
        outpath = os.path.join(outdir, "results.xlsx")
        subset.to_excel(outpath, index=False, na_rep="NA", engine="openpyxl")
        os.rename(outpath, args.output_file)
    elif args.output_format == "csv":
        subset.to_csv(args.output_file, index=False, na_rep="NA")
    else:  # tabular/tsv
        subset.to_csv(args.output_file, index=False, sep="\t", na_rep="NA")


if __name__ == "__main__":
    main()
```
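
A minimal usage sketch (the accession and population list below are hypothetical examples; both the EBI and NCBI endpoints require network access):

```python
# Run the tool from the command line, e.g.:
#   python freqSAP.py -a P05067 -p "Europe,Global" -f csv -o variants.csv
#
# Or drive the helpers directly:
from freqSAP import get_populations, get_protein_variation, get_variant_id

record, features = get_protein_variation("P05067")  # hypothetical UniProt accession
features["variant_id"] = features.apply(get_variant_id, axis=1)
print(get_populations(["Global"]))  # {'Global', 'Total'}
print(features[["begin", "end", "variant_id"]].head())
```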
