comparison env/lib/python3.9/site-packages/planemo/reports/allure.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import json
2
3 from allure_commons import plugin_manager
4 from allure_commons.lifecycle import AllureLifecycle
5 from allure_commons.logger import AllureFileLogger
6 from allure_commons.model2 import (
7 Label,
8 Link,
9 Status,
10 StatusDetails,
11 )
12 from allure_commons.types import AttachmentType, LabelType, LinkType
13 from allure_commons.utils import (
14 escape_non_unicode_symbols,
15 md5,
16 platform_label,
17 uuid4,
18 )
19 from dateutil import parser
20 from galaxy.util import (
21 safe_makedirs,
22 )
23
# Indentation width used when pretty-printing JSON attachments.
JSON_INDENT = 2
25
26
class AllureListener:
    """Minimal plugin object that holds a reference to a lifecycle."""

    def __init__(self, lifecycle):
        """Store *lifecycle* on the instance as ``self.lifecycle``."""
        self.lifecycle = lifecycle
30
31
class AllureWriter:
    """Write planemo structured test data out as Allure result files.

    Each entry of ``structured_data["tests"]`` becomes one Allure test case
    with labels, attachments, a status and (when available) start/stop times.
    """

    def __init__(self, results_path):
        """Create *results_path* if needed and set up the Allure plumbing."""
        safe_makedirs(results_path)
        self.lifecycle = AllureLifecycle()
        self.logger = AllureFileLogger(results_path)
        self.listener = AllureListener(self.lifecycle)

    def process(self, structured_data, file_modication_datetime=None):
        """Write every test case in ``structured_data["tests"]``.

        ``file_modication_datetime`` (spelling kept for API compatibility) is
        the fallback start/stop time for tests whose job has no timestamps.
        """
        plugin_manager.register(self.listener)
        plugin_manager.register(self.logger)
        try:
            for test_case in structured_data["tests"]:
                self.process_test_case(
                    test_case,
                    file_modication_datetime=file_modication_datetime,
                )
        finally:
            # Always unregister, even when a test case raises, so the global
            # plugin manager is not left holding this writer's plugins.
            plugin_manager.unregister(plugin=self.listener)
            plugin_manager.unregister(plugin=self.logger)

    def process_test_case(self, test_case, file_modication_datetime=None):
        """Record a single test case and write it through the lifecycle.

        ``test_case`` must have an ``"id"`` key; ``"data"`` and its nested
        ``"job"`` are optional and default to empty dicts.
        """
        with self.lifecycle.schedule_test_case() as test_result:
            test_index = test_case["id"]
            test_data = test_case.get("data") or {}
            job = test_data.get("job") or {}

            test_result.name = test_index
            self._record_start_stop(test_result, file_modication_datetime, job)

            test_result.fullName = test_index
            test_result.testCaseId = md5(test_index)
            test_result.historyId = md5(test_index)
            tool_id = self._record_suite_labels(test_result, test_data, job)

            self._attach_data(
                "test_data",
                json.dumps(test_data, indent=JSON_INDENT),
                attachment_type=AttachmentType.JSON,
            )
            self._attach_job_outputs(job)
            problem_message = self._attach_problems(test_data)

            # Fall back to the raw job messages when no explicit problem
            # description was recorded for the test.
            if problem_message is None and job.get("job_messages"):
                problem_message = str(job["job_messages"])

            test_result.labels.append(Label(name=LabelType.FRAMEWORK, value='planemo'))
            test_result.labels.append(Label(name=LabelType.LANGUAGE, value=platform_label()))

            self._record_tool_link(test_result, tool_id)
            self._record_status(test_result, test_data)
            if test_result.status in [Status.BROKEN, Status.FAILED]:
                test_result.statusDetails = StatusDetails(
                    message=escape_non_unicode_symbols(problem_message or "Unknown problem"),
                    trace=None,
                )

        self.lifecycle.write_test_case()

    def _attach_job_outputs(self, job):
        """Attach the non-empty job outputs found in *job*."""
        for key in ("stderr", "stdout", "command_line", "external_id", "job_messages"):
            val = job.get(key)
            if not val:
                continue
            if isinstance(val, list):
                # List values (job messages) are stored as pretty-printed JSON.
                self._attach_data(
                    key,
                    json.dumps(val, indent=JSON_INDENT),
                    attachment_type=AttachmentType.JSON,
                )
            elif val.strip():
                # Whitespace-only text is not worth an attachment.
                self._attach_data(key, val, attachment_type=AttachmentType.TEXT)

    def _attach_problems(self, test_data):
        """Attach problem descriptions and return the headline message.

        Returns the message to show in the status details, or ``None`` when
        *test_data* holds no usable problem description. When both keys are
        present the later one (``output_problems``) wins.
        """
        from html import escape

        problem_message = None
        for key in ("execution_problem", "output_problems"):
            val = test_data.get(key)
            if not val:
                continue
            if isinstance(val, list):
                # Drop duplicated messages but keep first-seen order so the
                # headline message is the same on every run.
                unique = list(dict.fromkeys(val))
                # Escape each message: tool output may contain '<' or '&'
                # which would otherwise corrupt the HTML attachment.
                items = "\n".join(
                    f"<li><pre>{escape(str(v))}</pre></li>" for v in unique
                )
                self._attach_data(key, f"<ul>{items}</ul>", attachment_type=AttachmentType.HTML)
                problem_message = unique[0]
            elif val.strip():
                self._attach_data(key, val, attachment_type=AttachmentType.TEXT)
                problem_message = val
        return problem_message

    def _record_start_stop(self, test_result, file_modication_datetime, job):
        """Set ``test_result.start``/``stop`` in epoch milliseconds.

        The job's ``create_time``/``update_time`` are used when present and
        non-empty; otherwise *file_modication_datetime* is the fallback. A
        bound stays unset when neither source is available.
        """
        start_datetime = file_modication_datetime
        end_datetime = file_modication_datetime

        # Use .get() so a key that is present but None/empty falls back to
        # the file time instead of crashing inside parser.parse().
        create_time = job.get("create_time")
        if create_time:
            start_datetime = parser.parse(create_time)

        update_time = job.get("update_time")
        if update_time:
            end_datetime = parser.parse(update_time)

        # NOTE(review): if the parsed datetimes are naive, .timestamp()
        # interprets them in local time; confirm what the producer emits.
        if start_datetime is not None:
            test_result.start = int(round(start_datetime.timestamp() * 1000))
        if end_datetime is not None:
            test_result.stop = int(round(end_datetime.timestamp() * 1000))

    def _record_suite_labels(self, test_result, test_data, job):
        """Add parent-suite/suite/sub-suite labels and return the tool id.

        ``tool_id`` and ``tool_version`` are looked up in *test_data* first
        and then in *job*. Returns ``None`` when no tool id is found.
        """
        tool_id = None
        for source in (test_data, job):
            if "tool_id" in source:
                tool_id = source["tool_id"]
                test_result.labels.append(Label(name=LabelType.PARENT_SUITE, value=tool_id))
                break

        for source in (test_data, job):
            if "tool_version" in source:
                test_result.labels.append(Label(name=LabelType.SUITE, value=source["tool_version"]))
                break

        if "test_index" in test_data:
            test_result.labels.append(
                Label(name=LabelType.SUB_SUITE, value=str(test_data["test_index"]))
            )
        return tool_id

    def _record_tool_link(self, test_result, tool_id):
        """Add a "Tool Repository" link when *tool_id* contains 'repos'.

        The link is built from the first four ``/``-separated parts of the
        tool id; ids with fewer than four parts get no link.
        """
        if not tool_id or 'repos' not in tool_id:
            return
        tool_parts = tool_id.split("/")
        if len(tool_parts) >= 4:
            repository_url = "https://" + "/".join(tool_parts[0:4])
            test_result.links.append(Link(LinkType.LINK, repository_url, "Tool Repository"))

    def _record_status(self, test_result, test_data):
        """Map ``test_data["status"]`` to an Allure status.

        A missing or unrecognised status is reported as ``Status.BROKEN``.
        """
        status_by_outcome = {
            "success": Status.PASSED,
            "failure": Status.FAILED,
            "skip": Status.SKIPPED,
        }
        outcome = test_data.get("status", "error")
        test_result.status = status_by_outcome.get(outcome, Status.BROKEN)

    def _attach_data(self, key, val, attachment_type=AttachmentType.TEXT):
        """Attach *val* under the name *key* via the lifecycle."""
        self.lifecycle.attach_data(
            uuid4(),
            val,
            name=key,
            attachment_type=attachment_type,
            extension=None,
        )
177
178
def write_results(results_path, structured_data, **kwds):
    """Write *structured_data* as Allure results under *results_path*.

    The optional ``file_modication_datetime`` keyword (spelling kept for API
    compatibility) is forwarded to ``AllureWriter.process`` as the fallback
    start/stop time; any other keyword arguments are ignored.
    """
    AllureWriter(results_path).process(
        structured_data,
        file_modication_datetime=kwds.get("file_modication_datetime"),
    )