Mercurial > repos > dcouvin > resfinder4
diff resfinder/cge/out/result.py @ 0:55051a9bc58d draft default tip
Uploaded
author | dcouvin |
---|---|
date | Mon, 10 Jan 2022 20:06:07 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/resfinder/cge/out/result.py Mon Jan 10 20:06:07 2022 +0000 @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 + +import json +import os.path + +from .parserdict import ParserDict +from .exceptions import CGECoreOutTypeError, CGECoreOutInputError + + +class Result(dict): + + BEONE_JSON_FILE = "beone.json" + beone_json_path = os.path.join(os.path.dirname(__file__), BEONE_JSON_FILE) + + def __init__(self, result_type=None, fmt_file=beone_json_path, + parsers=None, **kwargs): + + self.defs = {} + with open(fmt_file, "r") as fh: + self.defs = json.load(fh) + + if(parsers is None): + self.val_parsers = ParserDict() + else: + self.val_parsers = ParserDict(parsers) + + type = self._get_type(result_type, **kwargs) + self._set_type(type) + self._parser = ResultParser(result_def=self.defs[type]) + for d in self._parser.arrays: + self[d] = [] + for d in self._parser.dicts: + self[d] = {} + + self.add(**kwargs) + + def _set_type(self, type): + if(type in self.defs): + self["type"] = type + else: + raise CGECoreOutTypeError( + "Unknown result type given. Type given: {}. Type must be one " + "of:\n{}".format(type, list(self.defs.keys()))) + + def _get_type(self, result_type=None, **kwargs): + type = None + if(result_type is not None): + type = result_type + if(kwargs): + kw_type = kwargs.get("type", None) + if(type is not None and kw_type is not None and type != kw_type): + raise CGECoreOutTypeError( + "Type was given as argument to method call and as an " + "attribute in the given dictionary, but they did not " + "match. {} (method) != {} (dict)".format(type, kw_type)) + elif(kw_type is not None): + type = kw_type + if(type is None): + raise CGECoreOutTypeError( + "The class format requires a 'type' attribute. The given " + "dictionary contained the following attributes: {}" + .format(kwargs.keys())) + return type + + def add(self, **kwargs): + for key, val in kwargs.items(): + if(val is None): + continue + self[key] = val + + def add_class(self, cl, result_type=None, **kwargs): + type = self._get_type(result_type, **kwargs) + res = Result(result_type=type, **kwargs) + if(cl in self._parser.arrays): + self[cl].append(res) + elif(cl in self._parser.dicts): + self[cl][res["key"]] = res + else: + self[cl] = res + + def modify_class(self, cl, result_type=None, **kwargs): + type = self._get_type(result_type, **kwargs) + res = Result(result_type=type, **kwargs) + res_id = res["ref_id"].replace("_", ";;") + for key, value in res.items(): + if key not in self[cl][res_id]: + self[cl][res_id][key] = value + elif self[cl][res_id][key] != value: + self[cl][res_id][key] = \ + self[cl][res_id][key] \ + + ", " + value + else: + pass + + def check_results(self, errors=None): + self.errors = {} + + for key, val in self.items(): + if(key == "type"): + continue + # The key is not defined, and is not checked + elif(key not in self.defs[self["type"]]): + continue + self._check_result(key, val, self.errors) + + # errors is not None if called recursively + if(errors is not None): + errors[self["key"]] = self.errors + return None + # errors is None if it is the first/root call + elif(errors is None and self._no_errors(self.errors)): + return None + else: + raise CGECoreOutInputError( + "Some input data did not pass validation, please consult the " + "Dictionary of ERRORS:{}".format(self.errors), + self.errors) + + def _check_result(self, key, val, errors, index=None): + # Remember Result is a dict object and therefore this test should + # be before the dict test. + if(isinstance(val, Result)): + val.check_results(errors) + elif(isinstance(val, dict)): + self._check_result_dict(key, val, errors) + elif(isinstance(val, list)): + self._check_result_list(key, val, errors) + else: + self._check_result_val(key, val, errors, index) + + def del_entries_by_values(self, values): + values = tuple(values) + deleted_keys = [] + for key, entry_val in self.items(): + if(key == "type"): + continue + if(entry_val in values): + deleted_keys.append(key) + for key in deleted_keys: + del self[key] + return deleted_keys + + def _no_errors(self, errors): + no_errors = True + + for key, val in errors.items(): + + if(isinstance(val, dict)): + no_errors = self._no_errors(val) + if(no_errors is False): + return False + + elif(val is not None): + return False + + return no_errors + + def _check_result_val(self, key, val, errors, index=None): + val_type = self._parser[key] + + if(val_type.endswith("*")): + val_type = val_type[:-1] + + val_error = self.val_parsers[val_type](val) + + if(val_error): + if(index is not None): + val_error = "{}:{} ".format(index, val_error) + errors[key] = val_error + + def _check_result_dict(self, result_key, result_dict, errors): + errors[result_key] = {} + for key, val in result_dict.items(): + self._check_result(key, val, errors[result_key]) + + def _check_result_list(self, result_key, result_list, errors): + errors[result_key] = {} + for i, val in enumerate(result_list): + self._check_result(result_key, val, errors[result_key], index=i) + + +class ResultParser(dict): + """""" + def __init__(self, result_def): + self.classes = set() + self.arrays = {} + self.dicts = {} + + for key, val_def_str in result_def.items(): + val_def, *sub_def = val_def_str.split(" ") + if(sub_def and val_def == "dict"): + self.dicts[key] = sub_def[0] + self[key] = sub_def[0] + elif(sub_def and val_def == "array"): + self.arrays[key] = sub_def[0] + self[key] = sub_def[0] + else: + self[key] = val_def