Mercurial > repos > workflow4metabolomics > kmd_hmdb_data_plot
comparison kmd_hmdb_interrogator.py @ 0:59c8bad5f6bc draft default tip
planemo upload for repository https://github.com/workflow4metabolomics/tools-metabolomics/blob/master/tools/kmd_hmdb_data_plot/ commit 7fa454b6a4268b89fe18043e8dd10f30a7b4c7ca
| author | workflow4metabolomics |
|---|---|
| date | Tue, 29 Aug 2023 09:45:16 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:59c8bad5f6bc |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 import csv | |
| 4 import operator | |
| 5 | |
| 6 import click | |
| 7 | |
| 8 import kmd_hmdb_api_client.client | |
| 9 from kmd_hmdb_api_client.api.default import ( | |
| 10 api_annotation_get, | |
| 11 api_compound_find, | |
| 12 api_taxonomy_get, | |
| 13 ) | |
| 14 | |
| 15 __version__ = "1.0.0" | |
| 16 | |
| 17 | |
# Shared API client; each wrapper below passes it as the ``client`` keyword.
# NOTE(review): verify_ssl=False presumably turns off TLS certificate
# verification for requests made through this client -- confirm it is still
# needed for this endpoint and drop it if the certificate validates.
kmd_hmdb_client = kmd_hmdb_api_client.client.Client(
    "https://kmd-hmdb-rest-api.metabolomics-chopin.e-metabohub.fr",
    verify_ssl=False,
    timeout=500,
)


# Plain functions instead of lambdas bound to names (PEP 8): same call
# behaviour, but tracebacks show a real function name.
def find_compound(*args, **kwargs):
    """Forward to ``api_compound_find.sync`` using the shared client."""
    return api_compound_find.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_taxonomy(*args, **kwargs):
    """Forward to ``api_taxonomy_get.sync`` using the shared client."""
    return api_taxonomy_get.sync(*args, **kwargs, client=kmd_hmdb_client)


def get_annotation(*args, **kwargs):
    """Forward to ``api_annotation_get.sync`` using the shared client."""
    return api_annotation_get.sync(*args, **kwargs, client=kmd_hmdb_client)
| 36 | |
# Adducts for which get_polarity() answers "positive".
positive_adducts = [
    "M+H", "M+2H", "M+H+NH4", "M+H+Na", "M+H+K", "M+ACN+2H",
    "M+2Na", "M+H-2H2O", "M+H-H2O", "M+NH4", "M+Na", "M+CH3OH+H",
    "M+K", "M+ACN+H", "M+2Na-H", "M+IsoProp+H", "M+ACN+Na", "M+2K+H",
    "M+DMSO+H", "M+2ACN+H", "2M+H", "2M+NH4", "2M+Na", "2M+K",
]

# Adducts for which get_polarity() answers "negative".
negative_adducts = [
    "M-H", "M-2H", "M-H2O-H", "M+Cl", "M+FA-H", "M+Hac-H",
    "M-H+HCOONa", "M+Br", "M+TFA-H", "2M-H", "2M+FA-H", "2M+Hac-H",
]

# Every value accepted by the --adducts option (positive first).
adduct_choices = [*positive_adducts, *negative_adducts]

taxonomy_column_choices = [
    "class", "kingdom", "molecular_framework",
    "sub_class", "super_class", "id",
]

# Column names produced for annotations by explode_compounds(); the
# spelling "monisotopic" matches the attribute read there, keep it as is.
annotation_column_choices = [
    "adduct", "kendricks_mass", "kendricks_mass_defect",
    "monisotopic_molecular_weight", "nominal_mass", "polarity",
    "annotation_id",
]

# Compound-level columns followed by all annotation-level columns.
compound_column_choices = [
    "database", "metabolite_name", "chemical_formula",
    "hmdb_id", "inchikey", "compound_id",
    *annotation_column_choices,
]
| 109 | |
| 110 | |
@click.group()
def cli():
    # Root command group: the sub-commands attach themselves to it with
    # @cli.command. A comment is used instead of a docstring because click
    # would otherwise display the docstring as the group's help text.
    return None
| 114 | |
| 115 | |
@cli.command(help="")
@click.option(
    "--version",
    is_flag=True,
)
@click.option(
    "--mz-ratio",
    default=[303.05],
    show_default=True,
    multiple=True,
    help="Provide the mz-ratio."
)
@click.option(
    "--database",
    default=["farid"],
    show_default=True,
    multiple=True,
    help="Provide the database."
)
@click.option(
    "--mass-tolerance",
    default=10.5,
    show_default=True,
    help="Provide the mass-tolerance."
)
@click.option(
    "--adducts",
    default=["M+H"],
    type=click.Choice(adduct_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the adducts."
)
@click.option(
    "--columns",
    default=compound_column_choices[:],
    type=click.Choice(compound_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def compound(*args, **kwargs):
    # Sub-command: look compounds up through find_compound() and write the
    # requested columns to --output-path as a tab-separated file.
    # Documented with comments so the explicit help="" above remains the
    # only source of help text.

    if kwargs.pop("version"):
        print(__version__)
        # Raise SystemExit directly: the bare ``exit`` name is a helper
        # installed by the ``site`` module for interactive sessions and is
        # not meant to be relied on in programs.
        raise SystemExit(0)

    adducts = kwargs.pop("adducts")
    polarity = get_polarity(adducts)

    # Split the CLI options into output-related ones (kept locally) and the
    # ones forwarded to the API call.
    other_kwargs, compound_kwargs = build_kwargs(
        adducts=adducts,
        polarity=polarity,
        **kwargs
    )
    columns = other_kwargs["columns"]
    result = find_compound(**compound_kwargs)

    # One row per (compound, annotation) pair is only needed when at least
    # one annotation-level column was requested.
    needs_annotations = any(
        column in columns for column in annotation_column_choices
    )
    result = explode_compounds(result, with_annotations=needs_annotations)

    check_columns_in_result(result, columns)
    output_csv_result(
        result,
        columns,
        other_kwargs.get("output_path"),
        delimiter="\t",
    )
| 193 | |
| 194 | |
def explode_compounds(result, with_annotations):
    """Flatten compound objects into a list of plain dict rows.

    Without annotations, one row is produced per compound. With
    annotations, one row is produced per (compound, annotation) pair, so a
    compound whose ``annotations`` is empty contributes no row at all.
    """

    def compound_fields(cpd):
        # Compound-level columns, in output order.
        return {
            "database": cpd.database,
            "metabolite_name": cpd.metabolite_name,
            "chemical_formula": cpd.chemical_formula,
            "hmdb_id": cpd.hmdb_id,
            "inchikey": cpd.inchikey,
            "compound_id": cpd.id,
        }

    def annotation_fields(annotation):
        # Annotation-level columns; "adduct" and "annotation_id" are
        # renamed from the object's ``name`` and ``id`` attributes.
        return {
            "adduct": annotation.name,
            "kendricks_mass": annotation.kendricks_mass,
            "kendricks_mass_defect": annotation.kendricks_mass_defect,
            "monisotopic_molecular_weight":
                annotation.monisotopic_molecular_weight,
            "nominal_mass": annotation.nominal_mass,
            "polarity": annotation.polarity,
            "annotation_id": annotation.id,
        }

    if not with_annotations:
        return [compound_fields(cpd) for cpd in result]

    rows = []
    for cpd in result:
        for annotation in cpd.annotations:
            rows.append({
                **compound_fields(cpd),
                **annotation_fields(annotation),
            })
    return rows
| 227 | |
| 228 | |
@cli.command(help="")
@click.option(
    "--id",
    type=int,
    help="Provide the wanted annotation's id."
)
@click.option(
    "--columns",
    default=annotation_column_choices[:],
    type=click.Choice(annotation_column_choices),
    multiple=True,
    show_default=True,
    show_choices=False,
    help="Provide the outputed columns."
)
@click.option(
    "--output-path",
    help="Provide the output path."
)
def annotation(*args, **kwargs):
    # Sub-command: fetch one annotation through get_annotation() and write
    # the requested columns to --output-path.
    # Documented with comments so the explicit help="" above remains the
    # only source of help text.
    result = get_annotation(id=kwargs.pop("id"))
    if result is None:
        # Fail with a readable message instead of an AttributeError raised
        # later when the row is converted with to_dict().
        raise ValueError("No annotation was returned for the requested id.")

    # Convert the API object to a dict once and use that same dict for both
    # the column check and the CSV output. Previously only the check worked
    # on a to_dict() copy, while the writer indexed the raw object with
    # operator.itemgetter.
    # NOTE(review): assumes the returned object exposes to_dict(), as
    # check_columns_in_result() already does for non-dict rows.
    row = result if isinstance(result, dict) else result.to_dict()
    rows = [row]

    columns = kwargs["columns"]
    check_columns_in_result(rows, columns)
    # NOTE(review): no delimiter is passed here, so csv.writer's default
    # is used, whereas the compound command writes tab-separated output.
    output_csv_result(
        rows,
        columns,
        kwargs.get("output_path")
    )
| 258 | |
| 259 | |
def get_polarity(adducts):
    """Return the polarity implied by *adducts*.

    Returns ``"positive"`` as soon as any adduct is listed in
    ``positive_adducts`` (this wins even if negative adducts are present
    too), ``"negative"`` if any adduct is listed in ``negative_adducts``,
    and ``None`` when no adduct is known to either list.
    """
    if any(adduct in positive_adducts for adduct in adducts):
        return "positive"
    if any(adduct in negative_adducts for adduct in adducts):
        return "negative"
    # Explicit so that every exit path of the function returns a value.
    return None
| 270 | |
| 271 | |
def build_kwargs(**kwargs):
    """Split keyword arguments into output options and API arguments.

    ``database`` and ``polarity`` are renamed to ``database_list`` and
    ``polarity_list``. ``columns``, ``output_path`` and
    ``with_annotations`` are moved, when present, into a separate dict.

    Returns a ``(other_kwargs, remaining_kwargs)`` tuple.
    """
    renames = {
        "database": "database_list",
        "polarity": "polarity_list",
    }
    for old_name, new_name in renames.items():
        if old_name in kwargs:
            kwargs[new_name] = kwargs.pop(old_name)

    other_kwargs = {}
    for name in ("columns", "output_path", "with_annotations"):
        if name in kwargs:
            other_kwargs[name] = kwargs.pop(name)

    return other_kwargs, kwargs
| 285 | |
| 286 | |
def check_columns_in_result(result, columns):
    """Raise ``ValueError`` if a requested column is absent from the rows.

    Only the keys of the first row are inspected. Rows that are not dicts
    are converted with their ``to_dict()`` method for the purpose of the
    check only; the caller's ``result`` is left untouched. An empty
    ``result`` is accepted without any check.
    """
    if not result:
        return

    if isinstance(result[0], dict):
        rows = result
    else:
        rows = [item.to_dict() for item in result]

    available = rows[0].keys()
    missing = [name for name in columns if name not in available]
    if not missing:
        return

    if len(missing) == 1:
        raise ValueError(
            f"Could not find the column {missing[0]} in the results."
        )
    raise ValueError(
        "Could not find any of the columns "
        + ','.join(missing)
        + " in the results."
    )
| 308 | |
| 309 | |
def output_csv_result(result, columns, output_path, **csv_parameters):
    """Write *result* to *output_path* as CSV, restricted to *columns*.

    Extra keyword arguments are handed to ``csv.writer`` (for instance
    ``delimiter``). Raises ``ValueError`` when *output_path* is empty or
    ``None``.
    """
    if output_path:
        # newline='' lets the csv module control line endings itself.
        with open(output_path, mode="w", newline='') as handle:
            csv_writer = csv.writer(handle, **csv_parameters)
            write_result(result, columns, csv_writer)
        return
    raise ValueError("Missing output path. Cannot output CSV results.")
| 316 | |
| 317 | |
def write_result(result, columns, writer):
    """Write a header row and then one row per item of *result*.

    *writer* is used through ``writerow`` only. Each item of *result* is
    indexed by column name, so items must support ``item[column]``.
    """
    extractors = [operator.itemgetter(name) for name in columns]
    writer.writerow(columns)
    for record in result:
        writer.writerow([extract(record) for extract in extractors])
| 325 | |
| 326 | |
| 327 if __name__ == "__main__": | |
| 328 cli() |
