Mercurial > repos > dcouvin > resfinder4
comparison resfinder/cge/out/result.py @ 0:55051a9bc58d draft default tip
Uploaded
author | dcouvin |
---|---|
date | Mon, 10 Jan 2022 20:06:07 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:55051a9bc58d |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 import json | |
4 import os.path | |
5 | |
6 from .parserdict import ParserDict | |
7 from .exceptions import CGECoreOutTypeError, CGECoreOutInputError | |
8 | |
9 | |
class Result(dict):
    """Dictionary holding one result entry described by a JSON format file.

    The format file maps result type names to definitions. On construction
    the definitions are loaded, the entry's ``"type"`` is stored, every key
    the definition declares as an array or dict is initialised to an empty
    list or dict, and the remaining keyword arguments are stored as entries.

    Attributes set by ``__init__``:
        defs: Content of the JSON format file.
        val_parsers: ``ParserDict`` used to look up a validator per value
            type in ``_check_result_val``.
        errors: Set by ``check_results``; validation messages per key.
    """

    BEONE_JSON_FILE = "beone.json"
    # Default format file, located next to this module.
    beone_json_path = os.path.join(os.path.dirname(__file__), BEONE_JSON_FILE)

    def __init__(self, result_type=None, fmt_file=beone_json_path,
                 parsers=None, **kwargs):
        """Load the format definitions and populate the result.

        Args:
            result_type: Name of the result type. May instead be given as a
                ``type`` keyword argument.
            fmt_file: Path to the JSON file holding the type definitions.
            parsers: Optional argument forwarded to ``ParserDict``.
            **kwargs: Entries to store; ``None`` values are skipped.

        Raises:
            CGECoreOutTypeError: If no type is given, the two type sources
                disagree, or the type is not defined in ``fmt_file``.
        """
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(fmt_file, "r", encoding="utf-8") as fh:
            self.defs = json.load(fh)

        if parsers is None:
            self.val_parsers = ParserDict()
        else:
            self.val_parsers = ParserDict(parsers)

        res_type = self._get_type(result_type, **kwargs)
        self._set_type(res_type)
        self._parser = ResultParser(result_def=self.defs[res_type])
        for array_key in self._parser.arrays:
            self[array_key] = []
        for dict_key in self._parser.dicts:
            self[dict_key] = {}

        self.add(**kwargs)

    def _set_type(self, type):
        """Store ``type`` under the ``"type"`` key if it is a defined type.

        Raises:
            CGECoreOutTypeError: If ``type`` is not a key of ``self.defs``.
        """
        if type not in self.defs:
            raise CGECoreOutTypeError(
                "Unknown result type given. Type given: {}. Type must be one "
                "of:\n{}".format(type, list(self.defs.keys())))
        self["type"] = type

    def _get_type(self, result_type=None, **kwargs):
        """Return the result type from the argument or the ``type`` keyword.

        Raises:
            CGECoreOutTypeError: If both sources are given but differ, or if
                neither provides a type.
        """
        kw_type = kwargs.get("type")
        if (result_type is not None and kw_type is not None
                and result_type != kw_type):
            raise CGECoreOutTypeError(
                "Type was given as argument to method call and as an "
                "attribute in the given dictionary, but they did not "
                "match. {} (method) != {} (dict)".format(result_type,
                                                         kw_type))

        res_type = kw_type if kw_type is not None else result_type
        if res_type is None:
            raise CGECoreOutTypeError(
                "The class format requires a 'type' attribute. The given "
                "dictionary contained the following attributes: {}"
                .format(list(kwargs)))
        return res_type

    def add(self, **kwargs):
        """Store every keyword argument whose value is not ``None``."""
        for key, val in kwargs.items():
            if val is None:
                continue
            self[key] = val

    def add_class(self, cl, result_type=None, **kwargs):
        """Create a nested ``Result`` from ``kwargs`` and attach it at ``cl``.

        The new result is appended if ``cl`` is an array key, stored under
        its own ``"key"`` entry if ``cl`` is a dict key, and otherwise
        assigned directly to ``self[cl]``.

        NOTE(review): the nested result is built with the default format
        file and parsers unless they are passed in ``kwargs``; values given
        to this instance's constructor are not forwarded - confirm intended.
        """
        res_type = self._get_type(result_type, **kwargs)
        res = Result(result_type=res_type, **kwargs)
        if cl in self._parser.arrays:
            self[cl].append(res)
        elif cl in self._parser.dicts:
            self[cl][res["key"]] = res
        else:
            self[cl] = res

    def modify_class(self, cl, result_type=None, **kwargs):
        """Merge the given entries into an existing nested result.

        The target is ``self[cl][ref_id]`` where ``ref_id`` is the given
        ``"ref_id"`` entry with every ``"_"`` replaced by ``";;"``. Missing
        keys are added; keys whose value differs are merged with
        ``_merge_values``; equal values are left untouched.
        """
        res_type = self._get_type(result_type, **kwargs)
        res = Result(result_type=res_type, **kwargs)
        res_id = res["ref_id"].replace("_", ";;")
        target = self[cl][res_id]
        for key, value in res.items():
            if key not in target:
                target[key] = value
            elif target[key] != value:
                target[key] = self._merge_values(target[key], value)

    @staticmethod
    def _merge_values(existing, new):
        """Combine two differing values stored under the same key.

        Lists gain the items of ``new`` they do not already contain and
        dicts gain the keys of ``new`` they lack (updated in place so a dict
        subclass keeps its type). This keeps the empty list/dict defaults
        created by ``__init__`` from raising ``TypeError``. Any other pair
        is joined as ``existing + ", " + new``, which still raises
        ``TypeError`` for operands that do not support it.
        """
        if isinstance(existing, list) and isinstance(new, list):
            return existing + [item for item in new if item not in existing]
        if isinstance(existing, dict) and isinstance(new, dict):
            for sub_key, sub_val in new.items():
                existing.setdefault(sub_key, sub_val)
            return existing
        return existing + ", " + new

    def check_results(self, errors=None):
        """Validate all entries that are declared in the type definition.

        Args:
            errors: ``None`` for the root call. For a nested result, the
                dict in which this result's errors are stored under
                ``self["key"]``.

        Raises:
            CGECoreOutInputError: On the root call, if any entry failed
                validation. The error dict is passed as second argument.
        """
        self.errors = {}

        for key, val in self.items():
            # "type" and keys missing from the definition are not checked.
            if key == "type" or key not in self.defs[self["type"]]:
                continue
            self._check_result(key, val, self.errors)

        # Nested call: hand the errors to the caller, which decides later.
        if errors is not None:
            errors[self["key"]] = self.errors
            return None

        if self._no_errors(self.errors):
            return None

        raise CGECoreOutInputError(
            "Some input data did not pass validation, please consult the "
            "Dictionary of ERRORS:{}".format(self.errors),
            self.errors)

    def _check_result(self, key, val, errors, index=None):
        """Dispatch validation of ``val`` according to its container type."""
        # Result is a dict subclass, so it must be tested before dict.
        if isinstance(val, Result):
            val.check_results(errors)
        elif isinstance(val, dict):
            self._check_result_dict(key, val, errors)
        elif isinstance(val, list):
            self._check_result_list(key, val, errors)
        else:
            self._check_result_val(key, val, errors, index)

    def del_entries_by_values(self, values):
        """Delete every entry (except ``"type"``) whose value is in ``values``.

        Returns:
            List of the deleted keys.
        """
        values = tuple(values)
        # Collect first: a dict must not change size while being iterated.
        deleted_keys = [key for key, entry_val in self.items()
                        if key != "type" and entry_val in values]
        for key in deleted_keys:
            del self[key]
        return deleted_keys

    def _no_errors(self, errors):
        """Return True if ``errors`` holds only ``None`` leaves, recursively."""
        for val in errors.values():
            if isinstance(val, dict):
                if not self._no_errors(val):
                    return False
            elif val is not None:
                return False
        return True

    def _check_result_val(self, key, val, errors, index=None):
        """Validate a single value and record a message in ``errors[key]``.

        The value type is looked up in ``self._parser``; a trailing ``"*"``
        is stripped before the validator is fetched from
        ``self.val_parsers``. A truthy validator result is treated as the
        error message. When ``index`` is given (list element) the message
        is prefixed with the index and appended to messages already
        recorded for earlier elements instead of replacing them.
        """
        val_type = self._parser[key]

        if val_type.endswith("*"):
            val_type = val_type[:-1]

        val_error = self.val_parsers[val_type](val)

        if val_error:
            if index is not None:
                val_error = "{}:{} ".format(index, val_error)
                previous = errors.get(key)
                # Keep messages of earlier failing elements of the list.
                if isinstance(previous, str):
                    val_error = previous + val_error
            errors[key] = val_error

    def _check_result_dict(self, result_key, result_dict, errors):
        """Validate each value of a dict into ``errors[result_key]``."""
        errors[result_key] = {}
        for key, val in result_dict.items():
            self._check_result(key, val, errors[result_key])

    def _check_result_list(self, result_key, result_list, errors):
        """Validate each element of a list into ``errors[result_key]``."""
        errors[result_key] = {}
        for i, val in enumerate(result_list):
            self._check_result(result_key, val, errors[result_key], index=i)
180 | |
181 | |
class ResultParser(dict):
    """Map every key of a result definition to the type of its value(s).

    Each definition value is a whitespace separated string. ``"array X"``
    and ``"dict X"`` register the key in ``arrays`` / ``dicts`` and map it
    to ``X``; any other string maps the key to its first token.

    Attributes:
        classes: Empty set; it is created here but never populated by this
            class.
        arrays: Keys declared as ``array``, mapped to their element type.
        dicts: Keys declared as ``dict``, mapped to their value type.
    """

    def __init__(self, result_def):
        """Parse ``result_def``, a mapping of key to definition string."""
        super().__init__()
        self.classes = set()
        self.arrays = {}
        self.dicts = {}

        for key, val_def_str in result_def.items():
            # split() without argument tolerates repeated, leading and
            # trailing whitespace; an empty definition maps to "".
            val_def, *sub_def = val_def_str.split() or [""]
            if sub_def and val_def == "dict":
                self.dicts[key] = sub_def[0]
                self[key] = sub_def[0]
            elif sub_def and val_def == "array":
                self.arrays[key] = sub_def[0]
                self[key] = sub_def[0]
            else:
                self[key] = val_def