Mercurial > repos > lain > ms_to_peakforest_it
view server.py @ 1:7e3085fc60c1 draft default tip
master branch Updating
author | lain |
---|---|
date | Wed, 30 Aug 2023 14:21:18 +0000 |
parents | b58b229c4cbf |
children |
line wrap: on
line source
#!/usr/bin/env python3 import atexit import csv import http.server import json import logging import os import re import shutil import socketserver import sys import tempfile import yaml TAB_LIST_PLACEHOLDER = "TAB_LIST_PLACEHOLDER" MS_PEAK_VALUES_PLACEHOLDER = "MS_PEAK_VALUES_PLACEHOLDER" COMPOUND_NAME_PLACEHOLDER = "COMPOUND_NAME_PLACEHOLDER" TAB_INDEX_PLACEHOLDER = "TAB_INDEX_PLACEHOLDER" EMBED_JS_PLACEHOLDER = "EMBED_JS" ACTIVE_TAB_PLACEHOLDER = "ACTIVE_TAB_PLACEHOLDER" ADD_SPECTRUM_FORM = "ADD_SPECTRUM_FORM" PRODUCE_JSON_PLACEHOLDER = "PRODUCE_JSON_PLACEHOLDER" COMPOUND_REF = "compound-ref" COMPOUND_MIX = "compound-mix" END_MS_PEAK_VALUES_PLACEHOLDER = " ]" MS_DATA_COLUMN_NUMBER = 9 DEFAULT_MS_PEAK_VALUES = ( "[\n" + (" [" + ','.join([' ""'] * MS_DATA_COLUMN_NUMBER) + "],\n") * 17 + END_MS_PEAK_VALUES_PLACEHOLDER ) FRAGNOT_HEADER = { "m/z": "fragment_mz", "absolute_intensity": "abs_intensity", "relative_intensity": "rel_intensity", "theo_mass": "", "delta_ppm": "ppm", "rdbequiv": "", "composition": "fragment", "attribution": "composition", } MS_2_SNOOP_HEADER = { "name": str, "inchikey": str, # "composition": str, "composition": lambda *args:"", "fragment": str, "fragment_mz": str, "ppm": str, "fileid": str, "correlation": str, "abs_intensity": lambda x:float(x), # * 100, "rel_intensity": lambda x:float(x) * 100 if x != "" else "", "valid_corelation": str } class ConfigException(ValueError): """ An exception raised when something went wrong in the config and we cannot continue - i.e: when there's no token for peakforest """ class YAMLConfig(dict): """ Dictionary that handles key with dot in them: test["truc.chose"] is equivalant to test["truc"]["chose"] Assignation works too. Add the possibility to use placeholders: --- yaml test: {{ truc.chose }} truc: chose: bidule --- here, test's value is "bidule" """ def __init__(self, *args, **kwargs): meta_conf = kwargs.pop("__meta_config__", {}) self._debug = meta_conf.get("__debug__", False) self._stream_name = meta_conf.get("__debug_stream__", "stdout") self._debug_stream = getattr(sys, self._stream_name) self._only_root_debug = meta_conf.get("__only_root_debug__", False) if "__root__" in kwargs: if self._only_root_debug: self._debug = False self._name = kwargs.pop("__name__") self._debugger("Is not root config.") self._root = kwargs.pop("__root__") else: self._name = "root" self._debugger("Is root config.") self._root = self super().__init__(*args, **kwargs) for key, value in self.copy().items(): if isinstance(value, dict) and not isinstance(value, YAMLConfig): self._debugger(f"Parsing sub-config for {key}") self[key] = self._propagate(value, key) self._replace_placeholders(self) self._extract_defaults() def _propagate(self, sub_dict, name): if isinstance(sub_dict, dict) and not isinstance(sub_dict, self.__class__): return YAMLConfig( **sub_dict, __name__=name, __root__=self._root, __meta_config__={ "__debug__": self._debug, "__debug_stream__": self._stream_name, "__only_root_debug__": self._only_root_debug, } ) return sub_dict def _debugger(self, message): if self._debug: self._debug_stream.write(f"[{self._name}]: {message}\n") self._debug_stream.flush() def __getattr__(self, attr): if attr in self: return self[attr] if '.' in attr: attr, sub = attr.split('.', 1) return getattr(getattr(self, attr), sub) return super().__getattribute__(attr) def _replace_placeholders(self, subpart): self._debugger("Replacing placeholders...") for sub_key, sub_item in subpart.copy().items(): if isinstance(sub_item, str): for placeholder in re.findall("{{ (?P<placeholder>.*?) }}", sub_item): if placeholder not in self._root: self._debugger(f"Could not fine replacement for {placeholder}") continue replacement = self._root[placeholder] if isinstance(replacement, str): self._debugger(f"Found placeholder: {placeholder} -> {replacement}") sub_item = sub_item.replace( "{{ " + placeholder + " }}", replacement ) else: self._debugger(f"Found placeholder: {placeholder} -> {replacement.__class__.__name__}") sub_item = self._propagate(replacement, placeholder) dict.__setitem__(subpart, sub_key, sub_item) elif isinstance(sub_item, dict): super().__setitem__(sub_key, self._propagate(sub_item, sub_key)) def _extract_defaults(self): if self._root is not self: return if "defaults" not in self: self._debugger("No defaults here.") return if "arguments" not in self: self._debugger("Arguments creation...") self["arguments"] = self._propagate({}, "arguments") self._debugger("Populating arguments with defaults values") for key, value in self.defaults.items(): if key not in self: if isinstance(value, dict): value = self._propagate(value, key) self.arguments[key] = value self._debugger(f"Default {key} = {value}") def __setitem__(self, key, value): if isinstance(value, dict): value = self._propagate(value, key) if "." not in key: return super().__setitem__(key, value) curent = self key, subkey = key.rsplit(".", 1) self[key][subkey] = value def __getitem__(self, key): if super().__contains__(key): return super().__getitem__(key) if "." not in key: return super().__getitem__(key) curent = self while "." in key: key, subkey = key.split(".", 1) curent = curent[key] key = subkey if subkey not in curent: curent[subkey] = self._propagate({}, subkey) result = curent[subkey] return result def __contains__(self, key): if "." not in key: return super().__contains__(key) key, subkey = key.split(".", 1) if not super().__contains__(key): return False return subkey in self[key] def copy(self): return { key: ( value if not isinstance(value, dict) else value.copy() ) for key, value in self.items() } class YAMLParameters(YAMLConfig): """ Parses parameters from the command line and put them in the config. Uses the config to know which parameter is recognized, or not, to know the metadata (author, version), which command is a flag, is optional, the help strings, etc... Assigns default small parameter if not defined in the "shortcut" section of the config file. CLI config must be in the root section "parameters": --- parameters: mandatory: input: input file path flags: help: Show this help optional: method: "default is {{ defaults.method }}" meta: author: Lain Pavot version: 1.2.0 shortcuts: help: h ## will autogenerate -i for input and -m for method --- default parameters are searched in the "default" root section. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._errors = list() if not self.parameters.shortcuts: self.parameters["shortcuts"] = YAMLConfig() self._mandatory = self.parameters.mandatory self._optional = self.parameters.optional self._flags = { flag: False for flag in self.parameters.flags } self._all_params = self._optional.copy() self._all_params.update(self._mandatory) self._all_params.update(self._flags) self._small_params = dict() self._determine_small_params() @property def in_error(self): return bool(self._errors) @property def sorted_keys(self): return sorted(self._all_params.keys()) @property def sorted_items(self): return sorted(self._all_params.items()) def _determine_small_params(self, verbose=False): self._small_params = (self.parameters.shortcuts or {}).copy() chars = list(map(chr, range(97, 123))) + list(map(chr, range(65, 91))) all_params = self._all_params.copy() for long, short in self._small_params.items(): chars.remove(short) del all_params[long] for param in all_params.copy().keys(): for operation in ( lambda x:x[0], ## select first char lambda x:x.split('-', 1)[-1][0], ## first char after - lambda x:x.split('_', 1)[-1][0], ## first char after _ lambda x:x.split('.', 1)[-1][0], ## first char after . lambda x:x[0].upper(), ## select first char lambda x:x.split('-', 1)[-1][0].upper(), ## first char after - lambda x:x.split('_', 1)[-1][0].upper(), ## first char after _ lambda x:x.split('.', 1)[-1][0].upper(), ## first char after . lambda x: chars[0], ## first letter in the alphabet ): char = operation(param) if char not in self._small_params.values(): self._small_params[param] = char chars.remove(char) del all_params[param] break def _get_parameter_index(self, parameter, original): if f"--{parameter}" in sys.argv: return sys.argv.index(f"--{parameter}") parameter = self._small_params[original] if f"-{parameter}" in sys.argv: return sys.argv.index(f"-{parameter}") return None def as_parameter(self, string): return ( string .replace('.', '-') .replace('_', '-') ) def show_version(self): print(self.parameters.meta.version) def show_help(self): parameters = [ f"-{self._small_params[arg]}|--{self.as_parameter(arg)} {arg}" for arg in self._mandatory ] + [ f"[-{self._small_params[arg]}|--{self.as_parameter(arg)} {arg}]" for arg in self._optional ] + [ f"[-{self._small_params[arg]}|--{self.as_parameter(arg)}]" for arg in self._flags ] print( f"Usage: {__file__} " + ' '.join(parameters) + "\n\n" + '\n'.join( f" -{self._small_params[args]}|--{self.as_parameter(args)}: {help_str}" for args, help_str in self.sorted_items ) + "\n\n" + '\n'.join( f"{key}: {value}" for key, value in self.parameters.meta.items() ) ) sys.exit(0) def parse_args(self): errors = list() for kind in ("mandatory", "optional", "flags"): keys = list(sorted(getattr(self, f"_{kind}").keys())) for original_param, actual_param in zip( keys, map(self.as_parameter, keys), ): if original_param in self.defaults: self.arguments[original_param] = self.defaults[original_param] elif kind == "flags": self.arguments[original_param] = False parser = getattr(self, f"parse_{kind}") if (error := parser(original_param, actual_param)): errors.append(error) self._errors = errors return self def parse_mandatory(self, original, actual): if (index := self._get_parameter_index(actual, original)) is None: return f"The parameter --{actual} is mandatory." if index == len(sys.argv) - 1: return f"The parameter --{actual} needs a value." self.arguments[original] = sys.argv[index + 1] def parse_optional(self, original, actual): if (index := self._get_parameter_index(actual, original)) is None: return if index == len(sys.argv) - 1: return f"The parameter --{actual} needs a value." self.arguments[original] = sys.argv[index + 1] def parse_flags(self, original, actual): if (index := self._get_parameter_index(actual, original)) is None: return self.arguments[original] = True def parse_config(**kwargs): """ opens the config file, extract it using pyyaml's safe loader and tries to extract and apply a maximum of informations/directives from the config: - token retrieval - workdir management - tempfile management """ root_dir = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(root_dir, "config.yml")) as config_file: config = YAMLConfig( **yaml.load(config_file.read(), Loader=yaml.SafeLoader), **kwargs ) if not config.token.value: if config.token.use_file: if (not os.path.exists(path := config.token.file_path)): raise ConfigException("Missing token value or token file.") with open(path) as token_file: config.token["value"] = token_file.read() elif config.defaults.peakforest.token: config.token["value"] = config.defaults.peakforest.token if config.workdir.create_tmp: tmp_dir = tempfile.mkdtemp() atexit.register(lambda:shutil.rmtree(tmp_dir)) else: tmp_dir = tempfile.gettempdir() config.workdir["tmp_dir"] = tmp_dir config["root_dir"] = root_dir config["tab_list"] = [] config["form_template"] = os.path.join(root_dir, config.templates.form) config["meta_template"] = os.path.join(root_dir, config.templates.main) config["js_template"] = os.path.join(root_dir, config.templates.js) config["tab_list_template"] = os.path.join(root_dir, config.templates.tab_list) config["placeholders"] = dict() config.placeholders[MS_PEAK_VALUES_PLACEHOLDER] = DEFAULT_MS_PEAK_VALUES config.placeholders[TAB_INDEX_PLACEHOLDER] = "1" config.placeholders[ACTIVE_TAB_PLACEHOLDER] = "active" config.placeholders[ADD_SPECTRUM_FORM] = "" config.placeholders[EMBED_JS_PLACEHOLDER] = "" config.placeholders[TAB_LIST_PLACEHOLDER] = "" # config.placeholders["DEFAULT_MIN_MZ"] = "50" # config.placeholders["DEFAULT_MAX_MZ"] = "500" config.placeholders["DEFAULT_RESOLUTION_LOW"] = "" config.placeholders["DEFAULT_RESOLUTION_HIGH"] = "selected=\"selected\"" config.placeholders["DEFAULT_RESOLUTION_UNSET"] = "" return config def parse_parameters(config): """ parses command line and checks provided values are acceptable/usable. Raises some error if not. """ parameters = YAMLParameters(**config) parameters.parse_args() parameters["json_result"] = [] get_logger(parameters) arguments = parameters.arguments if arguments.help: parameters.show_help() sys.exit(0) if arguments.version: parameters.show_version() sys.exit(0) if parameters.in_error: raise ValueError( "Some errors occured during parameters extraction: \n" + '\n'.join(parameters.errors) ) parameters.placeholders["DEFAULT_MIN_RT"] = str(arguments.rt_min) parameters.placeholders["DEFAULT_MAX_RT"] = str(arguments.rt_max) if arguments.sample_type == COMPOUND_MIX: parameters["form_template"] = os.path.join( parameters["root_dir"], parameters.templates.form_mix ) parameters["meta_template"] = os.path.join( parameters["root_dir"], parameters.templates.main_mix ) else: # elif arguments.sample_type == COMPOUND_REF: parameters["form_template"] = os.path.join( parameters["root_dir"], parameters.templates.form_ref ) parameters["meta_template"] = os.path.join( parameters["root_dir"], parameters.templates.main_ref ) arguments["produce_json"] = ( "output_json" in arguments and arguments["output_json"] != "" ) if arguments.produce_json: parameters.placeholders[PRODUCE_JSON_PLACEHOLDER] = "true" parameters.json_result = [] arguments["output_json"] = os.path.abspath(arguments["output_json"]) atexit.register(save_json, parameters) else: parameters.placeholders[PRODUCE_JSON_PLACEHOLDER] = "false" if arguments.run_dry_html: arguments["do_run_dry"] = True parameters.generated["html"] = os.path.abspath(arguments.run_dry_html) if arguments.run_dry_js: arguments["do_run_dry"] = True parameters.generated["js"] = os.path.abspath(arguments.run_dry_js) if arguments.do_run_dry: parameters.logger.info("Dry run. Server will ne be run.") if arguments.run_dry_html: parameters.logger.info(f"HTML file will be put in {arguments.run_dry_html}") if arguments.run_dry_js: parameters.logger.info(f"JS file will be put in {arguments.run_dry_js}") if arguments.peakforest.token: config.token["value"] = arguments.peakforest.token if not config.token.value: raise ConfigException( "No token provided. We will not be able to connect to peakforest." ) if os.path.exists(arguments.input): single_file = True file_paths = [arguments.input] else: path_list = arguments.input.split(',') if all(map(os.path.exists, path_list)): single_file = False file_paths = path_list else: raise ValueError( f"Some files cannot be found: " + ', '.join( path for path in path_list if not os.path.exists(path) ) ) arguments["input"] = list(map(os.path.abspath, file_paths)) if single_file: arguments["name"] = [arguments.name] arguments["raw_metadata"] = [arguments.raw_metadata] parameters.logger.info(f"Single file processing: {arguments.input}") else: parameters.logger.info(f"Multiple file processing:") arguments["raw_metadata"] = arguments.raw_metadata.split( arguments.raw_metadata_sep ) if not arguments.name: arguments["name"] = arguments["raw_metadata"] else: arguments["name"] = arguments.name.split(',') for i in range(len(arguments.name)): parameters.logger.info(f" - file: {arguments.input[i]}") parameters.logger.info(f" - name: {arguments.name[i]}") parameters.logger.info(f" - metadata: {arguments.raw_metadata[i]}") parameters.logger.info(f" ") if ( len(arguments.name) != len(arguments.raw_metadata) or len(arguments.name) != len(arguments.input) ): raise ValueError( "name, raw_metadata and input parameters have different lengths: \n" f"input is {len(arguments.input)} elements long, " f"raw_metadata is {len(arguments.raw_metadata)} elements long " f"and name is {len(arguments.name)} elements long." ) if arguments.spectrum_type == "LC_MS": arguments["scan_type"] = "ms" elif arguments.spectrum_type == "LC_MSMS": arguments["scan_type"] = "ms2" if arguments.method == "test": if arguments.spectrum_type == "LC_MS": arguments["method"] = "cf_pfem_urine_qtof" else: arguments["method"] = "cf_pfem_urine_method1_qtof-msms" arguments["method"] = "toulouse-metatoul-agromix__ft-esi__msms" if arguments["sample_type"] == COMPOUND_MIX: check_mix_compound_files(parameters) more_info_in_logs(parameters) return parameters def check_mix_compound_files(parameters): arguments = parameters.arguments try: numbarz = [ list(map(int, os.path.basename(metadata).split("_", 1)[0].split("-"))) for metadata in arguments.raw_metadata ] except ValueError: ## file does not start with `[0-9]+-[0-9]+_.*`: probably ## a ms2snoop file. return # parameters.logger.error( # "Metadata/file names does not start with `[0-9]+-[0-9]+_.*` . " # "This is necessary in the case of compounds mix." # ) # sys.exit(-1) runs, samples = zip(*numbarz) if not all(runs[0] == i for i in runs[1:]): parameters.logger.error( "Run numbers in metadata/file names are not identical. " "You mixed some files." ) sys.exit(-1) length = len(samples) if list(sorted(samples)) != list(range(1, length+1)): if not all(samples.count(i) == 1 for i in samples): parameters.logger.error("Some samples are duplicated. ") else: parameters.logger.error("Some samples files are missing. ") sys.exit(-1) def more_info_in_logs(config): arguments = config.arguments if arguments.embed_js: config.logger.info(f"JS will be embed in HTML page to form a HTML bundle.") else: config.logger.info(f"JS are separated files, needed to be served.") config.logger.info(f"Choosen parameters:") config.logger.info(f" - method: {arguments.method}") config.logger.info(f" - peakforest instance: {arguments.peakforest.url}") config.logger.info(f" - polarity instance: {arguments.polarity}") config.logger.info(f" - spectrum type: {arguments.spectrum_type}") config.logger.info(f" - scan type: {arguments.scan_type}") config.logger.info(f" - produce JSON: {arguments.produce_json}") config.logger.info(f" - sample type: {arguments.sample_type}") def process_all_files(config): """ for each file and its metadata, read and process them, then fills the meta html template file with the whole result. """ arguments = config.arguments extra_defaults = [ process_fragnot_metadata(metadata, config) for metadata in arguments.raw_metadata ] for i, name in enumerate(arguments.name): extra_defaults[i]["name"] = name if not extra_defaults: extra_defaults = [{}] * len(arguments.input) index = 0 for input_path, extra_default in zip(arguments.input, extra_defaults): config.logger.info(f"Processing file at {input_path}...") curent_defaults = arguments.copy() curent_defaults.update(extra_default) if config.arguments.verbose: config.logger.info( "[VERBOSE] Defaults for curent file: " + ';'.join(f"{key}={value}" for key, value in curent_defaults.items()) ) tsv_content, tsv_data_extractor = read_input(input_path, config) index = process_tsv( tsv_content, tsv_data_extractor, config, defaults_data = curent_defaults, index = index+1, ) if arguments.embed_js: config.logger.info(f"Embeding JS in HTML file... ") for index in range(len(config.tab_list)): config.placeholders[EMBED_JS_PLACEHOLDER] += "<script type='text/javascript'>" with open(f"add-one-spectrum-{index+1}.js") as js_file: config.placeholders[EMBED_JS_PLACEHOLDER] += js_file.read() config.placeholders[EMBED_JS_PLACEHOLDER] += "</script>" config.placeholders[EMBED_JS_PLACEHOLDER] += "\n" config.logger.info(f" - add-one-spectrum-{index+1}.js embed.") config.placeholders[TAB_LIST_PLACEHOLDER] = "\n".join(config.tab_list) else: config.placeholders[EMBED_JS_PLACEHOLDER] += "<script type='text/javascript'>" config.placeholders[EMBED_JS_PLACEHOLDER] += "</script>" config.placeholders[EMBED_JS_PLACEHOLDER] += "\n".join( [""] + [ " "*12 + f"<script src=\"./add-one-spectrum-{index+1}.js\"></script>" for index in range(len(config.tab_list)) ] ) config.placeholders[EMBED_JS_PLACEHOLDER] += "\n" config.placeholders[TAB_LIST_PLACEHOLDER] = "\n".join(config.tab_list) fill_template("meta_template", "pf_path", config) def fill_template( template_name, output_name, config, additional_placeholders=dict() ): """ Fills a template, replaces the placeholders. Either outputs the result in a given file, or returns it if path is none. """ template_path = config[template_name] config.logger.debug(f"Filling template {template_name} at {template_path}...") with open(template_path) as template_file: template_content = template_file.read() placeholders = config.placeholders.copy() placeholders.update(additional_placeholders) for placeholder, replacement in placeholders.items(): if not placeholder.startswith(config.templates.placeholders.start): placeholder = placeholder.join(( config.templates.placeholders.start, config.templates.placeholders.stop )) template_content = template_content.replace(placeholder, replacement) if output_name is None: config.logger.debug(f"Returning template content") return template_content output_path = config[output_name] if "{{ index }}" in output_path: index_value = additional_placeholders["{{ index }}"] config.logger.debug(f"Changing index value for {index_value}") output_path = output_path.replace("{{ index }}", index_value) config.logger.debug(f"Full output path {output_path}") with open(output_path, "w") as output_file: output_file.write(template_content) def read_input(input_path, config): """ reads a tsv file and determin its processor, based on its header. """ with open(input_path) as input_file: config.logger.info(f"Reading {input_path}...") tsv_file = csv.reader(input_file, delimiter='\t') header = next(tsv_file) tsv_file = list(tsv_file) config.logger.info(f"Header is: {', '.join(header)}") if header == list(FRAGNOT_HEADER): config.logger.info(f"Fragnot recognized.") processor = fragnot_extractor return uniformize_fragnot(tsv_file, header), processor else: config.logger.info(f"MS2Snoop recognized.") processor = ms2snoop_extractor return uniformize_ms2snoop(tsv_file, header), processor def uniformize_fragnot(content, header): """ sorts fragnot data so they appear always in the same order """ return sorted(content, key=lambda x:(float(x[0]), float(x[4]))) def uniformize_ms2snoop(content, header): """ sorts ms2snoop data so they appear always in the same order """ return sorted(content, key=lambda x:(x[0], float(x[4]))) def process_fragnot_metadata(raw_metadata, config): """ Tries to extract informations from the metadata provided by fragnot files names. Heavily based on regex defined in conf file. """ regex = config.regex.copy() del regex["values"] result = {} config.logger.info(f"Extracting info from {raw_metadata}...") count = 0 for name, expression in regex.items(): if (match := re.search(expression, raw_metadata)): result[name] = match[name] count += 1 did = "+ did" else: did = "- did not" if config.arguments.verbose: config.logger.info(f" {did} match {expression}") config.logger.info(f"{count} useful informations extracted.") return result def process_tsv( tsv_content, tsv_data_extractor, config, defaults_data={}, index=1 ): """ processes one tsv file, containing one or multiple compounds. Creation of the peak table for each compound """ tsv_content = list(tsv_content) curent_name, ms_data = get_ms_data( tsv_content[0], tsv_data_extractor, defaults_data, config ) _, second_ms_data = get_ms_data( tsv_content[1], tsv_data_extractor, defaults_data, config ) ms_peak_table = [] config.logger.info(f"Processing compound {curent_name}...") for line in tsv_content: name, new_ms_data = get_ms_data(line, tsv_data_extractor, defaults_data, config) if name != curent_name: new_compound(curent_name, index, ms_data, config, ms_peak_table) curent_name = name index += 1 config.logger.info(f"Processing compound {curent_name}...") ms_peak_table = [] ms_data = new_ms_data ms_peak_table.append( ", ".join( f'"{value}"' if value not in ("na", "NA") else '""' for value in ( ms_data["fragment_mz"], ms_data["abs_intensity"], ms_data["rel_intensity"], ms_data["ppm"], ms_data["fragment"], ms_data["composition"], str(ms_data["valid_corelation"] == "TRUE").lower(), "true" if ms_data.get("correlation") == "1" else "false" ) ) ) new_compound(curent_name, index, ms_data, config, ms_peak_table) return index def get_ms_data(line, extractor, defaults, config): ms_data = defaults.copy() ms_data.update(extractor(config, *line)) return ms_data["name"], ms_data def new_compound(name, index, ms_data, config, ms_peak_table): """ aggregates informations to form the peak table, adds the compound to the tab list, creates the js file for this tab """ ignore_multiple_parent_ion(ms_peak_table) determine_min_max_mz(ms_peak_table) guess_relative_intensities(ms_peak_table) accept_all_fragments_if_all_false(ms_peak_table) config.placeholders[MS_PEAK_VALUES_PLACEHOLDER] = f"""[ {','.join('['+line+']' for line in ms_peak_table)} ]""" tab_list = fill_template( "tab_list_template", None, config, { COMPOUND_NAME_PLACEHOLDER: name, TAB_INDEX_PLACEHOLDER: str(index), }) config.tab_list.append(tab_list) create_js_file(index, ms_data, config) config.placeholders[ADD_SPECTRUM_FORM] += fill_template( "form_template", None, config, {TAB_INDEX_PLACEHOLDER: str(index)}, ) if index == 1: config.placeholders[ACTIVE_TAB_PLACEHOLDER] = "" def determine_min_max_mz(ms_peak_table): mz_list = [ float(ms_peak_table[i].split(", ")[0].strip("\"")) for i in range(len(ms_peak_table)) ] config.placeholders["DEFAULT_MIN_MZ"] = str(min(mz_list)) config.placeholders["DEFAULT_MAX_MZ"] = str(max(mz_list)) def guess_relative_intensities(ms_peak_table): if all( ms_peak_table[i].split(", ")[2].strip("\"") == "" for i in range(len(ms_peak_table)) ): absolute_intensities = [ float(ms_peak_table[i].split(", ")[1].strip("\"")) for i in range(len(ms_peak_table)) ] greatest = float(max(absolute_intensities)) relative_intensities = [ intensity / greatest * 100 for intensity in absolute_intensities ] replace_ms_table_value(ms_peak_table, 2, relative_intensities) def ignore_multiple_parent_ion(ms_peak_table): if len([ None for x in ms_peak_table if x.split(", ")[7].strip("\"") == "true" ]) > 1: ## if more than one is the precursor, then none is the precursor replace_ms_table_value(ms_peak_table, 7, "\"false\"") def accept_all_fragments_if_all_false(ms_peak_table): if all( ms_peak_table[i].split(", ")[6].strip("\"") == "false" for i in range(len(ms_peak_table)) ): replace_ms_table_value(ms_peak_table, 6, "true") def replace_ms_table_value(ms_peak_table, index, value, sep=", "): length = len(ms_peak_table) if not isinstance(value, list): if isinstance(value, str): value = [value.join('""')] * length else: value = [str(value)] * length if not isinstance(value[0], str): value = [str(x) for x in value] count = ms_peak_table[0].count(sep) endindex = count - index neg_endindex = -endindex for i in range(length): ms_peak_table[i] = sep.join(( *ms_peak_table[i].split(sep, index)[:index], value[i], *ms_peak_table[i].rsplit(sep, endindex)[neg_endindex:] )) def fragnot_extractor(config, *line): """ Fragnot processor - extracts one fragnot line of content and produces a uniformised output. """ fragnot_data = { FRAGNOT_HEADER[header]: line[i].strip() for i, header in enumerate(FRAGNOT_HEADER) } # fragnot_data["composition"] = "unknown" fragnot_data["valid_corelation"] = config.arguments.validation return fragnot_data def ms2snoop_extractor(config, *line): """ Fragnot processor - extracts one ms2snoop line of content and produces a uniformised output. """ ms2snoop_data = { header: MS_2_SNOOP_HEADER[header](line[i]) for i, header in enumerate(MS_2_SNOOP_HEADER) } return ms2snoop_data def create_js_file(index, ms_data, config): """ fills the js template file for one tab (compound) """ if (method := ms_data["method"]): method = f'"{method}"' else: method = "null" if config.arguments.verbose: config.logger.info( "[VERBOSE] " + ';'.join(f"{key}={value}" for key, value in ms_data.items()) ) fill_template( "js_template", "js_file", config, { TAB_INDEX_PLACEHOLDER: str(index), "INCHIKEY_PLACEHOLDER": ms_data["inchikey"], "DEFAULT_DATA": f"""{{ name: "{ms_data["name"]}", inchikey: "{ms_data["inchikey"]}", method: {method}, spectrum_type: "{ms_data["spectrum_type"]}", scan_type: "{ms_data["scan_type"]}", polarity: "{ms_data["polarity"]}", resolution: "{ms_data["resolution"]}", sample_type: "{ms_data["sample_type"]}", }}""", "{{ index }}": str(index) }, ) def prepare_workplace(config): """ prepares the directory we will work in. """ if config.workdir.work_in_tmp: os.chdir(config.workdir.tmp_dir) config.logger.info(f"Moving to {os.getcwd()}") if config.workdir.generate_in_tmp: gen_dir = config.workdir.tmp_dir else: gen_dir = tempfile.gettempdir() config.workdir.tmp_dir = gen_dir shutil.copy(os.path.join(config["root_dir"], "src", "common.js"), gen_dir) config.logger.info(f"Outputs will be generated in {config.workdir.tmp_dir}") return gen_dir def get_hander_for(directory, config): """ generates the handler class for the directory we provide. """ config["json_result"] = [{}] * len(config.tab_list) class HTTPHandler(http.server.SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs, directory=directory) def do_POST(self): content_length = int(self.headers.get("Content-Length")) json_bytes = self.rfile.read(content_length).decode("utf-8") # json_list = json.loads(json_bytes) # for i, obj in enumerate(json_list): # print(obj) # if obj: # config["json_result"][i] = obj json_obj = json.loads(json_bytes) config["json_result"][json_obj["index"]] = json_obj["object"] save_json(config) self.send_head() self.wfile.write(json_bytes.encode("utf-8")) return def do_GET(self): if self.path == "/quit": self.path = "/" super().do_GET() exit(0) self.path = os.path.join(directory, self.path) if self.path == "/": self.path = config.generated.html return super().do_GET() return HTTPHandler def save_json(config): json_string = json.dumps(config["json_result"]) print(json_string) with open(config.arguments.output_json, "w") as json_file: json_file.write(json_string) def run_server(config): """ prepare and runs the server, with the handler for the given directory """ ip, port = config.arguments.ip, int(config.arguments.port) config.logger.debug(f"IP and port: {ip}:{port}") socketserver.TCPServer.allow_reuse_address = True config.logger.debug(f"Allow reuse adress.") handler = get_hander_for(config.workdir.tmp_dir, config) config.logger.debug(f"Created server handler for {config.workdir.tmp_dir}") config.logger.debug( f"Content of directory {config.workdir.tmp_dir}: " + "\n" + '\n'.join(sorted( f" - {path}"for path in os.listdir(config.workdir.tmp_dir) )) ) config.logger.debug(f"Creating TCP server...") server = socketserver.TCPServer((ip, port), handler) if ip == "0.0.0.0": displayed_ip = "localhost" else: displayed_ip = ip config.logger.debug(f"Serving...") print() print(f"http://{displayed_ip}:{port}") server.serve_forever() def get_logger(config, dummy=False): dummy_log = lambda msg:dummy and config.logger.info(msg) arguments = config.arguments if not dummy: logger = logging.getLogger(__file__) if arguments.debug: dummy_log(f"Output debug info.") level = logging.DEBUG else: level = logging.INFO if not dummy: logger.setLevel(level) formatter = logging.Formatter( "%(asctime)s - %(levelname)s - %(message)s" ) if arguments.logging.std == "err": dummy_log(f"Handler added to output logs in stderr.") if not dummy: handler = logging.StreamHandler(sys.stderr) handler.setLevel(level) handler.setFormatter(formatter) logger.addHandler(handler) elif arguments.logging.std == "out": dummy_log(f"Handler added to output logs in stdout.") if not dummy: handler = logging.StreamHandler(sys.stdout) handler.setLevel(level) handler.setFormatter(formatter) logger.addHandler(handler) else: dummy_log(f"Logs will not be output in stderr not stdout.") if (path := arguments.logging.file.path): dummy_log(f"Add log file: {arguments.logging.file.path}.") if not arguments.logging.file.append: dummy_log(f"Log file content cleaned.") with open(path, "w"):pass else: dummy_log(f"Logs appended to log file.") if not dummy: file_handler = logging.FileHandler(filename=path) file_handler.setLevel(level) file_handler.setFormatter(formatter) logger.addHandler(file_handler) if not dummy: config["logger"] = logger starting_sequence(logger) get_logger(config, dummy=True) return logger def starting_sequence(logger): logger.info("*bip* *bop*") logger.info("starting...") logger.info("program...") logger.info("MS2PF is running...") logger.info("*bip* *bop* am a robot") atexit.register(stoping_sequence, logger) def stoping_sequence(logger): logger.info("*bip* *bop*") logger.info("ending...") logger.info("program...") logger.info("MS2PF is shuting down...") logger.info("...robot") logger.info("*bip* *bop*") logger.info("shutdown") logger.info("...") if __name__ == "__main__": print(os.listdir("test-data")) if not os.path.exists("config.yml"): shutil.copy("config.default.yml", "config.yml") base_config = parse_config() config = parse_parameters(base_config) """ The config contains result of the parsed config file. """ arguments = config.arguments if arguments.pid: print(arguments.pid) with open(arguments.pid, "w") as pid_file: pid_file.write(str(os.getpid())) atexit.register(lambda:os.unlink(arguments.pid)) config.logger.info(f"Starting MS2PF from {os.getcwd()}") gen_dir = prepare_workplace(config) config["pf_path"] = os.path.join(gen_dir, config.generated.html) config.logger.info(f"HTML output file will be {config.pf_path}") config["js_file"] = os.path.join(gen_dir, config.generated.js) config.logger.info(f"JS output files will like {config.js_file}") config.placeholders["PF_URL_PLACEHOLDER"] = arguments.peakforest.url config.placeholders["PF_TOKEN_PLACEHOLDER"] = ( arguments.peakforest.token or config.token.value ) if (token := config.placeholders.PF_TOKEN_PLACEHOLDER): config.logger.info(f"Using a token for authentification - length: {len(token)}") else: config.logger.info(f"No token provided for peakforest authentification.") process_all_files(config) if not arguments.do_run_dry: config.logger.debug(f"Running the server.") if arguments.firefox or arguments.chromium: config.logger.debug(f"Running the server.") import threading import time if arguments.firefox: browser = "firefox" else: browser = "chromium" if (ip := config.network.ip) == "0.0.0.0": ip = "localhost" adress = f"http://{ip}:{config.network.port}" threading.Thread( target=lambda:( time.sleep(1), os.system(f"{browser} {adress}") ), daemon=True ).start() run_server(config) else: config.logger.debug(f"Server not run.")