Mercurial > repos > muon-spectroscopy-computational-project > larch_criteria_report
comparison larch_criteria_report.py @ 0:aa9cb2b42741 draft default tip
planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_criteria_report commit 5be486890442dedfb327289d597e1c8110240735
author:   muon-spectroscopy-computational-project
date:     Tue, 14 Nov 2023 15:34:55 +0000
parents:  (none)
children: (none)
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:aa9cb2b42741 |
---|---|
1 import csv | |
2 import json | |
3 import os | |
4 import sys | |
5 from typing import Iterable | |
6 | |
7 import matplotlib.pyplot as plt | |
8 | |
9 import numpy as np | |
10 | |
11 | |
def plot(variable: str, column: Iterable[float]):
    """Plot ``column`` against dataset number and save it as a PNG.

    The image is written to ``plots/<variable>.png``; the ``plots``
    directory is created if it does not already exist.

    Args:
        variable: criterion name; surrounding whitespace is stripped for
            the axis label and the filename.
        column: one value per dataset, plotted in order.
    """
    variable_stripped = variable.strip()
    # Ensure the output directory exists so savefig does not fail on a
    # fresh working directory.
    os.makedirs("plots", exist_ok=True)
    path = f"plots/{variable_stripped}.png"
    plt.figure(figsize=(8, 4))
    plt.plot(column)
    plt.xlim((0, len(column)))
    # Dataset numbers are integral, so coerce the tick labels to ints.
    ticks, _ = plt.xticks()
    plt.xticks(np.array(ticks).astype("int"))
    plt.xlabel("Dataset number")
    plt.ylabel(variable_stripped)
    plt.savefig(path, format="png")
    # Close the figure: this function is called once per variable in a
    # loop, and unclosed figures accumulate (matplotlib warns and leaks
    # memory after ~20 open figures).
    plt.close()
23 | |
24 | |
def load(filepath: str) -> "list[list[str|float]]":
    """Read a CSV file and transpose it into columns.

    The first row is treated as a header: each returned column begins
    with its header string, followed by the remaining cells of that
    column parsed as floats.
    """
    with open(filepath) as handle:
        rows = csv.reader(handle)
        header = next(rows)
        columns: "list[list[str|float]]" = [[name] for name in header]
        for record in rows:
            for index, cell in enumerate(record):
                columns[index].append(float(cell))
    return columns
35 | |
36 | |
def parse_reports(
    input_data: str, config: "dict | None" = None
) -> "dict[str, list[float]]":
    """Scrape report criteria from one or more Larch report files.

    Writes a summary table to ``criteria_report.csv`` in the working
    directory and returns the collected values keyed by variable name.

    Args:
        input_data: either a directory containing report files, or a
            comma-separated list of report file paths.
        config: parsed tool parameters containing
            ``["format"]["report_criteria"]``. Defaults to the
            module-level ``input_values`` set in ``__main__`` — kept so
            the original ``parse_reports(input_data)`` call still works.

    Returns:
        Mapping of criterion variable name to the list of values found,
        one value per input file.

    Raises:
        RuntimeError: propagated from ``parse_row`` when a file is
            missing one of the requested criteria.
    """
    if config is None:
        # Backward-compatible fallback to the global read in __main__;
        # previously this function depended on the global unconditionally.
        config = input_values
    report_criteria = config["format"]["report_criteria"]
    data = {criterion["variable"]: [] for criterion in report_criteria}
    headers = list(data.keys())
    # newline="" is the csv-module-documented way to open output files;
    # it avoids spurious blank rows on platforms using \r\n endings.
    with open("criteria_report.csv", "w", newline="") as f_out:
        writer = csv.writer(f_out)
        writer.writerow([f"{h:>12s}" for h in headers])

        if os.path.isdir(input_data):
            # Sort for a deterministic dataset order in the report.
            input_files = sorted(
                os.path.join(input_data, f) for f in os.listdir(input_data)
            )
        else:
            input_files = input_data.split(",")

        for input_file in input_files:
            row = parse_row(data, headers, input_file)
            writer.writerow(row)

    return data
59 | |
60 | |
def parse_row(
    data: "dict[str, list[float]]", headers: "list[str]", input_file: str
) -> "list[str]":
    """Scan ``input_file`` for one value per header.

    Matching lines have the shape ``<variable> <sep> <value>`` (three or
    more whitespace-separated tokens, value in position 2). Each value is
    appended as a float to ``data[variable]`` and right-justified into the
    returned CSV row. Returns as soon as every header has a value.

    Raises:
        RuntimeError: if the file ends before all headers are matched.
    """
    row = [None] * len(headers)
    with open(input_file) as handle:
        for line in handle:
            words = line.split()
            # Fewer than three tokens cannot hold a variable/value pair.
            if len(words) < 3:
                continue
            variable, value = words[0], words[2]
            if variable not in headers:
                continue
            row[headers.index(variable)] = f"{value:>12s}"
            data[variable].append(float(value))
            if all(row):
                return row

    # The file ran out of lines before every criterion was found.
    raise RuntimeError(
        f"One or more criteria missing, was looking for {headers} but found "
        f"{row}"
    )
90 | |
91 | |
if __name__ == "__main__":
    # argv[1]: input data (directory, comma-separated reports, or a CSV);
    # argv[2]: JSON file of tool parameters.
    input_data = sys.argv[1]
    # Use a context manager so the config file handle is closed promptly
    # rather than leaked until garbage collection.
    with open(sys.argv[2], "r", encoding="utf-8") as config_file:
        input_values = json.load(config_file)

    if "report_criteria" in input_values["format"]:
        # Larch report files: scrape the requested criteria, plot each.
        data = parse_reports(input_data)
        for variable, column in data.items():
            plot(variable, column)
    else:
        # Plain CSV input: first row is the header, one plot per column.
        columns = load(input_data)
        for column in columns:
            plot(column[0], column[1:])