Mercurial > repos > laurenmarazzi > netisce_test
comparison tools/myTools/bin/sfa/vis/utils.py @ 1:7e5c71b2e71f draft default tip
Uploaded
author | laurenmarazzi |
---|---|
date | Wed, 22 Dec 2021 16:00:34 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:f24d4892aaed | 1:7e5c71b2e71f |
---|---|
1 | |
2 import numpy as np | |
3 import networkx as nx | |
4 | |
5 | |
# Public API of this module: only compute_graphics is exported by
# ``from ... import *``; the underscore-prefixed helpers stay private.
__all__ = [
    'compute_graphics',
]
9 | |
10 | |
11 def _rgb_to_hex(tup): | |
12 """Convert RGBA to #AARRGGBB | |
13 """ | |
14 tup = tuple(tup) | |
15 if len(tup) == 3: | |
16 return '#%02x%02x%02x' % tup | |
17 elif len(tup) == 4: | |
18 return '#%02x%02x%02x%02x' % (tup[3], tup[0], tup[1], tup[2]) | |
19 else: | |
20 raise ValueError("Array or tuple for RGB or RGBA should be given.") | |
21 | |
22 | |
def compute_graphics(
        F,
        act,
        A,
        n2i,
        lw_min=1.0,
        lw_max=10.0,
        pct_link=90,
        pct_act=50,
        dg=None):
    """Compute graphics of signal flow.

    This method performs a calculation for generating colors
    of nodes and links for visualizing purpose.

    Parameters
    ----------
    F : numpy.ndarray
        A matrix of signal flows.
        It is usually calculated as W2*x1 - W1*x1,
        where W is weight matrix and
        x is a vector of activities at steady-state.

    act : numpy.ndarray
        Change in the activities. It is usually calculated
        as x2 - x1, where x is
        a vector of activities at steady-state.

    A : numpy.ndarray
        Adjacency matrix of the network.

    n2i : dict
        Name to index dictionary.

    lw_min : float, optional
        Minimum link width, which is also used for unchanged flow.

    lw_max : float, optional
        Maximum link width.

    pct_link : int, optional
        Percentile of link width, which is used to set
        the maximum value for setting link widths.
        Default value is 90.

    pct_act : int, optional
        Percentile of activity, which is used to set
        the maximum value for coloring nodes.
        Default value is 50.

    dg : NetworkX.DiGraph, optional
        Existing NetworkX object to contain graphics information
        for visualizing nodes and links. It is updated in place;
        a new graph is created only when None is given.

    Returns
    -------
    dg : NetworkX.DiGraph
        NetworkX object containing graphics information
        for visualizing nodes and links.

    """

    # Compare against None explicitly: an empty DiGraph is falsy, so
    # ``not dg`` would silently discard a graph supplied by the caller.
    if dg is None:
        dg = nx.DiGraph()

    # The node pass looks up every name of n2i in dg.nodes, so make sure
    # they all exist, also in a caller-supplied graph. Re-adding a node
    # that is already present leaves its attributes untouched.
    dg.add_nodes_from(n2i)

    _compute_graphics_nodes(dg, n2i, act, pct_act)
    _compute_graphics_links(dg, n2i, A, F, pct_link, lw_min, lw_max)

    return dg
93 | |
94 | |
def _compute_graphics_nodes(dg, n2i, act, pct_act):
    """Set fill and border graphics attributes on the nodes of ``dg``.

    Each node is colored on a scale from white to red (positive
    activity change) or from white to blue (zero or negative change).
    The position on that scale is |act| divided by the ``pct_act``
    percentile of |act|, clipped to the range [0, 1].

    Parameters
    ----------
    dg : NetworkX.DiGraph
        Graph whose node attribute dictionaries are updated in place.
        Every name in ``n2i`` must already be a node of ``dg``.

    n2i : dict
        Name to index dictionary; the index selects the entry of ``act``.

    act : numpy.ndarray
        Change in the activities.

    pct_act : int
        Percentile of |act| at which the color saturates.
    """
    color_white = np.array([255, 255, 255])
    color_up = np.array([255, 0, 0])
    color_dn = np.array([0, 0, 255])

    act = np.asarray(act)
    abs_act = np.abs(act)
    thr = np.percentile(abs_act, pct_act)
    # Avoid division by zero when the percentile itself is zero.
    thr = 1 if thr == 0 else thr

    # True division always yields floats, so integer-typed activities
    # keep their fractional saturation instead of collapsing to 0 or 1.
    saturation = np.clip(abs_act / thr, 0, 1)

    # The border color is the same for every node; format it once.
    border_color = _rgb_to_hex((40, 40, 40))

    for iden, idx in n2i.items():
        color_target = color_up if act[idx] > 0 else color_dn
        color = color_white + saturation[idx] * (color_target - color_white)

        data = dg.nodes[iden]
        data['FILL_COLOR'] = _rgb_to_hex(np.int32(color))
        data['BORDER_WIDTH'] = 2
        data['BORDER_COLOR'] = border_color
124 | |
125 | |
def _compute_graphics_links(dg, n2i, A, F, pct_link, lw_min, lw_max):
    """Set sign, color and width attributes on the links of ``dg``.

    One edge is added to ``dg`` (or updated, if present) for every
    non-zero entry ``A[i, j]``, directed from the node with index ``j``
    to the node with index ``i``. The edge attributes written are
    'SIGN', 'FILL_COLOR' and 'WIDTH'; an existing 'HEADER' attribute
    is removed.

    Parameters
    ----------
    dg : NetworkX.DiGraph
        Graph that is updated in place.

    n2i : dict
        Name to index dictionary.

    A : numpy.ndarray
        Adjacency matrix of the network.

    F : numpy.ndarray
        A matrix of signal flows, indexed like ``A``.

    pct_link : int
        Percentile of log10(|flow|) at which flows are clipped
        before being mapped to a width.

    lw_min : float
        Minimum link width, which is also used for zero flow.

    lw_max : float
        Maximum link width.

    Raises
    ------
    RuntimeError
        If a link has zero flow and its adjacency entry is neither
        positive nor negative.
    """
    i2n = {val: key for key, val in n2i.items()}

    nonzero_flows = F[F.nonzero()]
    if nonzero_flows.size > 0:
        log_flows = np.log10(np.abs(nonzero_flows))
        flow_max = log_flows.max()
        flow_min = log_flows.min()
        flow_thr = np.percentile(log_flows, pct_link)
    else:
        # Without any non-zero flow, max()/min() would raise on the
        # empty array. Every link then takes the zero-flow branch below,
        # so these placeholders are never used for scaling.
        flow_max = flow_min = flow_thr = 0.0
    flow_span = flow_max - flow_min

    # The three link colors are constant; format them once.
    color_pos = _rgb_to_hex((255, 10, 10, 70))
    color_neg = _rgb_to_hex((10, 10, 255, 70))
    color_zero = _rgb_to_hex((100, 100, 100, 100))

    for i, j in zip(*A.nonzero()):
        tgt = i2n[i]
        src = i2n[j]
        f = F[i, j]

        dg.add_edge(src, tgt)
        data = dg.edges[src, tgt]

        if f > 0:
            sign_link = +1
            color_link = color_pos
        elif f < 0:
            sign_link = -1
            color_link = color_neg
        else:  # When flow is zero, show the sign of the original link.
            if A[i, j] > 0:
                sign_link = +1
            elif A[i, j] < 0:
                sign_link = -1
            else:
                raise RuntimeError("Abnormal state has been reached in "
                                   "_compute_graphics_links.")

            color_link = color_zero

        # If header exists, it should be removed,
        # because the sign of signal flow can be different
        # from the original sign of header.
        data.pop('HEADER', None)

        data['SIGN'] = sign_link
        data['FILL_COLOR'] = color_link

        if f == 0:
            data['WIDTH'] = lw_min
        elif flow_span == 0:
            # All non-zero flows have the same magnitude.
            data['WIDTH'] = 0.5 * (lw_max + lw_min)
        else:
            log_f = np.log10(np.abs(f))
            log_f = np.clip(log_f, a_min=flow_min, a_max=flow_thr)
            # NOTE(review): the value is clipped at the percentile
            # threshold but normalized by the full range, so lw_max is
            # reached only when flow_thr equals flow_max - confirm that
            # this scaling is intended.
            data['WIDTH'] = (log_f - flow_min) / flow_span * (
                lw_max - lw_min) + lw_min
184 |