Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/reports/allure.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import json | |
2 | |
3 from allure_commons import plugin_manager | |
4 from allure_commons.lifecycle import AllureLifecycle | |
5 from allure_commons.logger import AllureFileLogger | |
6 from allure_commons.model2 import ( | |
7 Label, | |
8 Link, | |
9 Status, | |
10 StatusDetails, | |
11 ) | |
12 from allure_commons.types import AttachmentType, LabelType, LinkType | |
13 from allure_commons.utils import ( | |
14 escape_non_unicode_symbols, | |
15 md5, | |
16 platform_label, | |
17 uuid4, | |
18 ) | |
19 from dateutil import parser | |
20 from galaxy.util import ( | |
21 safe_makedirs, | |
22 ) | |
23 | |
24 JSON_INDENT = 2 | |
25 | |
26 | |
27 class AllureListener(object): | |
28 def __init__(self, lifecycle): | |
29 self.lifecycle = lifecycle | |
30 | |
31 | |
32 class AllureWriter: | |
33 | |
34 def __init__(self, results_path): | |
35 safe_makedirs(results_path) | |
36 self.lifecycle = AllureLifecycle() | |
37 self.logger = AllureFileLogger(results_path) | |
38 self.listener = AllureListener(self.lifecycle) | |
39 | |
40 def process(self, structured_data, file_modication_datetime=None): | |
41 plugin_manager.register(self.listener) | |
42 plugin_manager.register(self.logger) | |
43 | |
44 for test_case in structured_data["tests"]: | |
45 self.process_test_case(test_case, file_modication_datetime=file_modication_datetime) | |
46 | |
47 plugin_manager.unregister(plugin=self.listener) | |
48 plugin_manager.unregister(plugin=self.logger) | |
49 | |
50 def process_test_case(self, test_case, file_modication_datetime=None): | |
51 | |
52 with self.lifecycle.schedule_test_case() as test_result: | |
53 test_index = test_case["id"] | |
54 test_data = test_case.get("data") or {} | |
55 job = test_data.get("job") or {} | |
56 test_result.name = test_index | |
57 self._record_start_stop(test_result, file_modication_datetime, job) | |
58 | |
59 test_result.fullName = test_index | |
60 test_result.testCaseId = md5(test_index) | |
61 test_result.historyId = md5(test_index) | |
62 tool_id = self._record_suite_labels(test_result, test_data, job) | |
63 | |
64 self._attach_data("test_data", json.dumps(test_data, indent=JSON_INDENT), attachment_type=AttachmentType.JSON) | |
65 for key in ["stderr", "stdout", "command_line", "external_id", "job_messages"]: | |
66 val = job.get(key) | |
67 if not val: | |
68 continue | |
69 if isinstance(val, list): | |
70 attachment_type = AttachmentType.JSON | |
71 # job messages | |
72 val = json.dumps(val, indent=JSON_INDENT) | |
73 else: | |
74 if not val.strip(): | |
75 continue | |
76 attachment_type = AttachmentType.TEXT | |
77 self._attach_data(key, val, attachment_type=attachment_type) | |
78 | |
79 problem_message = None | |
80 for key in ["execution_problem", "output_problems"]: | |
81 val = test_data.get(key) | |
82 if not val: | |
83 continue | |
84 if isinstance(val, list) and val: | |
85 # remove duplicated messages... | |
86 val = list(set(val)) | |
87 | |
88 attachment_type = AttachmentType.HTML | |
89 as_html_list = "<ul>" | |
90 as_html_list += "\n".join([f"<li><pre>{v}</pre></li>" for v in val]) | |
91 as_html_list += "</ul>" | |
92 problem_message = val[0] | |
93 val = as_html_list | |
94 else: | |
95 if not val.strip(): | |
96 continue | |
97 attachment_type = AttachmentType.TEXT | |
98 problem_message = val | |
99 self._attach_data(key, val, attachment_type=attachment_type) | |
100 | |
101 if problem_message is None and "job_messages" in job: | |
102 job_messages = job.get("job_messages") | |
103 if job_messages: | |
104 problem_message = str(job_messages) | |
105 | |
106 test_result.labels.append(Label(name=LabelType.FRAMEWORK, value='planemo')) | |
107 test_result.labels.append(Label(name=LabelType.LANGUAGE, value=platform_label())) | |
108 | |
109 self._record_tool_link(test_result, tool_id) | |
110 self._record_status(test_result, test_data) | |
111 if test_result.status in [Status.BROKEN, Status.FAILED]: | |
112 test_result.statusDetails = StatusDetails( | |
113 message=escape_non_unicode_symbols(problem_message or "Unknown problem"), | |
114 trace=None | |
115 ) | |
116 | |
117 self.lifecycle.write_test_case() | |
118 | |
119 def _record_start_stop(self, test_result, file_modication_datetime, job): | |
120 start_datetime = file_modication_datetime | |
121 end_datetime = file_modication_datetime | |
122 if "create_time" in job: | |
123 start_datetime = parser.parse(job["create_time"]) | |
124 | |
125 if "update_time" in job: | |
126 end_datetime = parser.parse(job["update_time"]) | |
127 | |
128 if start_datetime is not None: | |
129 test_result.start = int(round(start_datetime.timestamp() * 1000)) | |
130 if end_datetime is not None: | |
131 test_result.stop = int(round(end_datetime.timestamp() * 1000)) | |
132 | |
133 def _record_suite_labels(self, test_result, test_data, job): | |
134 tool_id = None | |
135 if "tool_id" in test_data: | |
136 tool_id = test_data["tool_id"] | |
137 test_result.labels.append(Label(name=LabelType.PARENT_SUITE, value=tool_id)) | |
138 elif "tool_id" in job: | |
139 tool_id = job["tool_id"] | |
140 test_result.labels.append(Label(name=LabelType.PARENT_SUITE, value=tool_id)) | |
141 | |
142 if "tool_version" in test_data: | |
143 test_result.labels.append(Label(name=LabelType.SUITE, value=test_data["tool_version"])) | |
144 elif "tool_version" in job: | |
145 test_result.labels.append(Label(name=LabelType.SUITE, value=job["tool_version"])) | |
146 | |
147 if "test_index" in test_data: | |
148 test_result.labels.append(Label(name=LabelType.SUB_SUITE, value=str(test_data["test_index"]))) | |
149 return tool_id | |
150 | |
151 def _record_tool_link(self, test_result, tool_id): | |
152 if tool_id and 'repos' in tool_id: | |
153 tool_parts = tool_id.split("/") | |
154 if len(tool_parts) >= 4: | |
155 link = Link(LinkType.LINK, "https://%s" % "/".join(tool_parts[0:4]), "Tool Repository") | |
156 test_result.links.append(link) | |
157 | |
158 def _record_status(self, test_result, test_data): | |
159 status = test_data.get("status", "error") | |
160 if status == "success": | |
161 test_result.status = Status.PASSED | |
162 elif status == "failure": | |
163 test_result.status = Status.FAILED | |
164 elif status == "skip": | |
165 test_result.status = Status.SKIPPED | |
166 else: | |
167 test_result.status = Status.BROKEN | |
168 | |
169 def _attach_data(self, key, val, attachment_type=AttachmentType.TEXT): | |
170 self.lifecycle.attach_data( | |
171 uuid4(), | |
172 val, | |
173 name=key, | |
174 attachment_type=attachment_type, | |
175 extension=None | |
176 ) | |
177 | |
178 | |
179 def write_results(results_path, structured_data, **kwds): | |
180 AllureWriter(results_path).process(structured_data) |