view resfinder/cge/out/result.py @ 0:55051a9bc58d draft default tip

Uploaded
author dcouvin
date Mon, 10 Jan 2022 20:06:07 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python3

import json
import os.path

from .parserdict import ParserDict
from .exceptions import CGECoreOutTypeError, CGECoreOutInputError


class Result(dict):

    BEONE_JSON_FILE = "beone.json"
    beone_json_path = os.path.join(os.path.dirname(__file__), BEONE_JSON_FILE)

    def __init__(self, result_type=None, fmt_file=beone_json_path,
                 parsers=None, **kwargs):

        self.defs = {}
        with open(fmt_file, "r") as fh:
            self.defs = json.load(fh)

        if(parsers is None):
            self.val_parsers = ParserDict()
        else:
            self.val_parsers = ParserDict(parsers)

        type = self._get_type(result_type, **kwargs)
        self._set_type(type)
        self._parser = ResultParser(result_def=self.defs[type])
        for d in self._parser.arrays:
            self[d] = []
        for d in self._parser.dicts:
            self[d] = {}

        self.add(**kwargs)

    def _set_type(self, type):
        if(type in self.defs):
            self["type"] = type
        else:
            raise CGECoreOutTypeError(
                "Unknown result type given. Type given: {}. Type must be one "
                "of:\n{}".format(type, list(self.defs.keys())))

    def _get_type(self, result_type=None, **kwargs):
        type = None
        if(result_type is not None):
            type = result_type
        if(kwargs):
            kw_type = kwargs.get("type", None)
            if(type is not None and kw_type is not None and type != kw_type):
                raise CGECoreOutTypeError(
                    "Type was given as argument to method call and as an "
                    "attribute in the given dictionary, but they did not "
                    "match. {} (method) != {} (dict)".format(type, kw_type))
            elif(kw_type is not None):
                type = kw_type
        if(type is None):
            raise CGECoreOutTypeError(
                "The class format requires a 'type' attribute. The given "
                "dictionary contained the following attributes: {}"
                .format(kwargs.keys()))
        return type

    def add(self, **kwargs):
        for key, val in kwargs.items():
            if(val is None):
                continue
            self[key] = val

    def add_class(self, cl, result_type=None, **kwargs):
        type = self._get_type(result_type, **kwargs)
        res = Result(result_type=type, **kwargs)
        if(cl in self._parser.arrays):
            self[cl].append(res)
        elif(cl in self._parser.dicts):
            self[cl][res["key"]] = res
        else:
            self[cl] = res

    def modify_class(self, cl, result_type=None, **kwargs):
        type = self._get_type(result_type, **kwargs)
        res = Result(result_type=type, **kwargs)
        res_id = res["ref_id"].replace("_", ";;")
        for key, value in res.items():
            if key not in self[cl][res_id]:
                self[cl][res_id][key] = value
            elif self[cl][res_id][key] != value:
                self[cl][res_id][key] = \
                        self[cl][res_id][key] \
                        + ", " + value
            else:
                pass

    def check_results(self, errors=None):
        self.errors = {}

        for key, val in self.items():
            if(key == "type"):
                continue
            # The key is not defined, and is not checked
            elif(key not in self.defs[self["type"]]):
                continue
            self._check_result(key, val, self.errors)

        # errors is not None if called recursively
        if(errors is not None):
            errors[self["key"]] = self.errors
            return None
        # errors is None if it is the first/root call
        elif(errors is None and self._no_errors(self.errors)):
            return None
        else:
            raise CGECoreOutInputError(
                "Some input data did not pass validation, please consult the "
                "Dictionary of ERRORS:{}".format(self.errors),
                self.errors)

    def _check_result(self, key, val, errors, index=None):
        # Remember Result is a dict object and therefore this test should
        # be before the dict test.
        if(isinstance(val, Result)):
            val.check_results(errors)
        elif(isinstance(val, dict)):
            self._check_result_dict(key, val, errors)
        elif(isinstance(val, list)):
            self._check_result_list(key, val, errors)
        else:
            self._check_result_val(key, val, errors, index)

    def del_entries_by_values(self, values):
        values = tuple(values)
        deleted_keys = []
        for key, entry_val in self.items():
            if(key == "type"):
                continue
            if(entry_val in values):
                deleted_keys.append(key)
        for key in deleted_keys:
            del self[key]
        return deleted_keys

    def _no_errors(self, errors):
        no_errors = True

        for key, val in errors.items():

            if(isinstance(val, dict)):
                no_errors = self._no_errors(val)
                if(no_errors is False):
                    return False

            elif(val is not None):
                return False

        return no_errors

    def _check_result_val(self, key, val, errors, index=None):
        val_type = self._parser[key]

        if(val_type.endswith("*")):
            val_type = val_type[:-1]

        val_error = self.val_parsers[val_type](val)

        if(val_error):
            if(index is not None):
                val_error = "{}:{} ".format(index, val_error)
            errors[key] = val_error

    def _check_result_dict(self, result_key, result_dict, errors):
        errors[result_key] = {}
        for key, val in result_dict.items():
            self._check_result(key, val, errors[result_key])

    def _check_result_list(self, result_key, result_list, errors):
        errors[result_key] = {}
        for i, val in enumerate(result_list):
            self._check_result(result_key, val, errors[result_key], index=i)


class ResultParser(dict):
    """"""
    def __init__(self, result_def):
        self.classes = set()
        self.arrays = {}
        self.dicts = {}

        for key, val_def_str in result_def.items():
            val_def, *sub_def = val_def_str.split(" ")
            if(sub_def and val_def == "dict"):
                self.dicts[key] = sub_def[0]
                self[key] = sub_def[0]
            elif(sub_def and val_def == "array"):
                self.arrays[key] = sub_def[0]
                self[key] = sub_def[0]
            else:
                self[key] = val_def