1
|
1
|
|
2 import numpy as np
|
|
3 import networkx as nx
|
|
4
|
|
5
|
|
6 __all__ = [
|
|
7 'compute_graphics',
|
|
8 ]
|
|
9
|
|
10
|
|
11 def _rgb_to_hex(tup):
|
|
12 """Convert RGBA to #AARRGGBB
|
|
13 """
|
|
14 tup = tuple(tup)
|
|
15 if len(tup) == 3:
|
|
16 return '#%02x%02x%02x' % tup
|
|
17 elif len(tup) == 4:
|
|
18 return '#%02x%02x%02x%02x' % (tup[3], tup[0], tup[1], tup[2])
|
|
19 else:
|
|
20 raise ValueError("Array or tuple for RGB or RGBA should be given.")
|
|
21
|
|
22
|
|
def compute_graphics(
        F,
        act,
        A,
        n2i,
        lw_min=1.0,
        lw_max=10.0,
        pct_link=90,
        pct_act=50,
        dg=None):
    """Compute graphics of signal flow.

    This method performs a calculation for generating colors
    of nodes and links for visualizing purpose.

    Parameters
    ----------
    F : numpy.ndarray
        A matrix of signal flows.
        It is usually calculated as W2*x1 - W1*x1,
        where W is weight matrix and
        x is a vector of activities at steady-state.

    act : numpy.ndarray
        Change in the activities. It is usually calculated
        as x2 - x1, where x is
        a vector of activities at steady-state.

    A : numpy.ndarray
        Adjacency matrix of the network. Its nonzero entries define
        the links to draw; row indices are targets and column indices
        are sources.

    n2i : dict
        Name to index dictionary.

    lw_min : float, optional
        Minimum link width, which is also used for unchanged flow.

    lw_max : float, optional
        Maximum link width.

    pct_link : int, optional
        Percentile of the log10 magnitudes of the nonzero flows,
        which is used as the upper clipping bound
        when setting link widths.
        Default value is 90.

    pct_act : int, optional
        Percentile of the absolute activity change,
        which is used as the value at which node colors saturate.
        Default value is 50.

    dg : NetworkX.DiGraph, optional
        Existing NetworkX object to contain graphics information
        for visualizing nodes and links. It is updated in place.
        If omitted, a new graph is created.

    Returns
    -------
    dg : NetworkX.DiGraph
        NetworkX object containing graphics information
        for visualizing nodes and links. Nodes receive
        'FILL_COLOR', 'BORDER_WIDTH' and 'BORDER_COLOR';
        links receive 'SIGN', 'FILL_COLOR' and 'WIDTH'.

    """
    # Test identity rather than truthiness: a graph without nodes is
    # falsy, so `not dg` would discard an empty graph the caller supplied
    # instead of filling it.
    if dg is None:
        dg = nx.DiGraph()

    # Adding nodes is idempotent and keeps existing node attributes, so this
    # also makes a caller-supplied graph safe when it lacks some names.
    dg.add_nodes_from(n2i)

    _compute_graphics_nodes(dg, n2i, act, pct_act)
    _compute_graphics_links(dg, n2i, A, F, pct_link, lw_min, lw_max)

    return dg
|
|
93
|
|
94
|
|
def _compute_graphics_nodes(dg, n2i, act, pct_act):
    """Set the fill and border graphics of every node in ``n2i``.

    The fill color is interpolated from white towards red for a positive
    activity change, and from white towards blue otherwise. The
    interpolation factor is the absolute change divided by a percentile
    threshold, clipped to [0, 1].

    Parameters
    ----------
    dg : NetworkX.DiGraph
        Graph whose node attribute dictionaries are updated in place.
        Every key of ``n2i`` must already be a node of ``dg``.

    n2i : dict
        Name to index dictionary.

    act : numpy.ndarray
        Change in the activities, indexed by the values of ``n2i``.

    pct_act : int
        Percentile of the absolute activity change at which
        the node color saturates.
    """
    color_white = np.array([255, 255, 255])
    color_up = np.array([255, 0, 0])
    color_dn = np.array([0, 0, 255])

    abs_act = np.abs(act)
    thr = np.percentile(abs_act, pct_act)
    # Avoid dividing by zero when the percentile itself is zero.
    thr = 1 if thr == 0 else thr

    # True division always yields a floating-point array, so the factor
    # keeps its fractional part even when `act` has an integer dtype.
    arr_t = np.clip(abs_act / thr, 0, 1)

    # The border color is the same for every node; format it once.
    border_color = _rgb_to_hex((40, 40, 40))

    for iden, idx in n2i.items():
        color_target = color_up if act[idx] > 0 else color_dn
        color = color_white + arr_t[idx] * (color_target - color_white)

        data = dg.nodes[iden]
        # Channels are truncated to integers before hex formatting.
        data['FILL_COLOR'] = _rgb_to_hex(np.int32(color))
        data['BORDER_WIDTH'] = 2
        data['BORDER_COLOR'] = border_color
|
|
124
|
|
125
|
|
def _compute_graphics_links(dg, n2i, A, F, pct_link, lw_min, lw_max):
    """Set the sign, color and width of every link defined by ``A``.

    One edge is added to ``dg`` (or updated, if it already exists) for each
    nonzero entry ``A[i, j]``, going from the node with index ``j`` to the
    node with index ``i``. The sign and color follow the sign of the flow
    ``F[i, j]``; the width is scaled from the log10 magnitude of the flow.

    Parameters
    ----------
    dg : NetworkX.DiGraph
        Graph whose edge attribute dictionaries are updated in place.

    n2i : dict
        Name to index dictionary.

    A : numpy.ndarray
        Adjacency matrix of the network. Its sign is used as the link sign
        when the flow is zero.

    F : numpy.ndarray
        A matrix of signal flows.

    pct_link : int
        Percentile of the log10 magnitudes of the nonzero flows,
        used as the upper clipping bound for link widths.

    lw_min : float
        Minimum link width, which is also used for zero flow.

    lw_max : float
        Maximum link width.

    Raises
    ------
    RuntimeError
        If the flow is zero and the adjacency entry is neither
        positive nor negative.
    """
    i2n = {val: key for key, val in n2i.items()}

    log_flows = np.log10(np.abs(F[F.nonzero()]))
    if log_flows.size == 0:
        # No nonzero flow at all: every link takes the zero-flow branch
        # below, so these bounds are never read. Reducing an empty array
        # would raise instead.
        flow_max = flow_min = flow_thr = 0.0
    else:
        flow_max = log_flows.max()
        flow_min = log_flows.min()
        flow_thr = np.percentile(log_flows, pct_link)

    flow_range = flow_max - flow_min

    # The link colors do not depend on the link; format them once.
    color_pos = _rgb_to_hex((255, 10, 10, 70))
    color_neg = _rgb_to_hex((10, 10, 255, 70))
    color_zero = _rgb_to_hex((100, 100, 100, 100))

    ir, ic = A.nonzero()
    for i, j in zip(ir, ic):
        # Rows index targets and columns index sources.
        tgt = i2n[i]
        src = i2n[j]
        f = F[i, j]

        dg.add_edge(src, tgt)
        data = dg.edges[src, tgt]

        if f > 0:
            sign_link = +1
            color_link = color_pos
        elif f < 0:
            sign_link = -1
            color_link = color_neg
        else:  # When flow is zero, show the sign of the original link.
            if A[i, j] > 0:
                sign_link = +1
            elif A[i, j] < 0:
                sign_link = -1
            else:
                raise RuntimeError("Abnormal state has been reached in "
                                   "_compute_graphics_links.")

            color_link = color_zero

        # If header exists, it should be removed,
        # because the sign of signal flow can be different
        # from the original sign of header.
        data.pop('HEADER', None)

        data['SIGN'] = sign_link
        data['FILL_COLOR'] = color_link

        if f == 0:
            data['WIDTH'] = lw_min
        elif flow_range == 0:
            # All nonzero flows share one magnitude; use the middle width.
            data['WIDTH'] = 0.5 * (lw_max + lw_min)
        else:
            log_f = np.log10(np.abs(f))
            log_f = np.clip(log_f, flow_min, flow_thr)
            # NOTE(review): the value is clipped at the percentile bound but
            # normalized by the full range, so lw_max is only reached when
            # the percentile equals the maximum - confirm this is intended.
            lw = (log_f - flow_min) / flow_range * (lw_max - lw_min) + lw_min
            data['WIDTH'] = lw
|
|
184
|