Mercurial > repos > laurenmarazzi > netisce_test
diff tools/myTools/bin/sfa/vis/utils.py @ 1:7e5c71b2e71f draft default tip
Uploaded
author | laurenmarazzi |
---|---|
date | Wed, 22 Dec 2021 16:00:34 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/myTools/bin/sfa/vis/utils.py Wed Dec 22 16:00:34 2021 +0000 @@ -0,0 +1,184 @@ + +import numpy as np +import networkx as nx + + +__all__ = [ + 'compute_graphics', +] + + +def _rgb_to_hex(tup): + """Convert RGBA to #AARRGGBB + """ + tup = tuple(tup) + if len(tup) == 3: + return '#%02x%02x%02x' % tup + elif len(tup) == 4: + return '#%02x%02x%02x%02x' % (tup[3], tup[0], tup[1], tup[2]) + else: + raise ValueError("Array or tuple for RGB or RGBA should be given.") + + +def compute_graphics( + F, + act, + A, + n2i, + lw_min=1.0, + lw_max=10.0, + pct_link=90, + pct_act=50, + dg=None): + """Compute graphics of signal flow. + + This method performs a calculation for generating colors + of nodes and links for visualizing purpose. + + Parameters + ---------- + F : numpy.ndarray + A matrix of signal flows. + It is usually calculated as W2*x1 - W1*x1, + where W is weight matrix and + x is a vector of activities at steady-state. + + act : numpy.ndarray + Change in the activities. It is usually calculated + as x2 - x1, where x is + the a vector of activities at steady-state. + + A : numpy.ndarray + Adjacency matrix of the network. + + n2i : dict + Name to index dictionary. + + lw_min : float, optional + Minimum link width, which is also used for unchanged flow. + + lw_max : float, optional + Maximum link width. + + pct_link : int, optional + Percentile of link width, which is used to set + the maximum value for setting link widths. + Default value is 90. + + pct_act : int, optional + Percentile of activity, which is used to set + the maximum value for coloring nodes. + Default value is 50. + + dg : NetworkX.DiGraph, optional + Existing NetworkX object to contain graphics information + for visualizing nodes and links. + + Returns + ------- + dg : NetworkX.DiGraph + NetworkX object containing graphics information + for visualizing nodes and links. + + """ + + if not dg: + dg = nx.DiGraph() + dg.add_nodes_from(n2i) + + _compute_graphics_nodes(dg, n2i, act, pct_act) + _compute_graphics_links(dg, n2i, A, F, pct_link, lw_min, lw_max) + + return dg + + +def _compute_graphics_nodes(dg, n2i, act, pct_act): + color_white = np.array([255, 255, 255]) + color_up = np.array([255, 0, 0]) + color_dn = np.array([0, 0, 255]) + + abs_act = np.abs(act) + thr = np.percentile(abs_act, pct_act) + thr = 1 if thr == 0 else thr + + arr_t = np.zeros_like(act) + for i, elem in enumerate(act): + t = np.clip(np.abs(elem) / thr, a_min=0, a_max=1) + arr_t[i] = t + + for iden, idx in n2i.items(): + fold = act[idx] + + if fold > 0: + color = color_white + arr_t[idx] * (color_up - color_white) + elif fold <= 0: + color = color_white + arr_t[idx] * (color_dn - color_white) + + color = _rgb_to_hex(np.int32(color)) + + data = dg.nodes[iden] + data['FILL_COLOR'] = color + data['BORDER_WIDTH'] = 2 + data['BORDER_COLOR'] = _rgb_to_hex((40, 40, 40)) + # end of for + + +def _compute_graphics_links(dg, n2i, A, F, pct_link, lw_min, lw_max): + i2n = {val: key for key, val in n2i.items()} + + log_flows = np.log10(np.abs(F[F.nonzero()])) + flow_max = log_flows.max() + flow_min = log_flows.min() + flow_thr = np.percentile(log_flows, pct_link) + + ir, ic = A.nonzero() # F.nonzero() + for i, j in zip(ir, ic): + tgt = i2n[i] + src = i2n[j] + f = F[i, j] + + #link = net.nxdg[src][tgt]['VIS'] + dg.add_edge(src, tgt) + data = dg.edges[src, tgt] + + + #header_old = link.header + #args_header = header_old.width, header_old.height, header_old.offset + if f > 0: + sign_link = +1 # PosHeader(*args_header) + color_link = _rgb_to_hex((255, 10, 10, 70)) + elif f < 0: + sign_link = -1 # NegHeader(*args_header) + color_link = _rgb_to_hex((10, 10, 255, 70)) + else: # When flow is zero, show the sign of the original link. + if A[i, j] > 0: + sign_link = +1 # PosHeader(*args_header) + elif A[i, j] < 0: + sign_link = -1 # NegHeader(*args_header) + else: + raise RuntimeError("Abnormal state has been reached in " + "_compute_graphics_links.") + + color_link = _rgb_to_hex((100, 100, 100, 100)) + + + # If header exists, it should be removed, + # because the sign of signal flow can be different + # from the original sign of header. + if 'HEADER' in data: + data.pop('HEADER') + + data['SIGN'] = sign_link + data['FILL_COLOR'] = color_link + + if f == 0: + data['WIDTH'] = lw_min + elif (flow_max - flow_min) == 0: + data['WIDTH'] = 0.5 * (lw_max + lw_min) + else: + log_f = np.log10(np.abs(f)) + log_f = np.clip(log_f, a_min=flow_min, a_max=flow_thr) + lw = (log_f - flow_min) / (flow_max - flow_min) * ( + lw_max - lw_min) + lw_min + data['WIDTH'] = lw +