Mercurial > repos > laurenmarazzi > netisce_test
diff tools/myTools/bin/sfa/utils.py @ 1:7e5c71b2e71f draft default tip
Uploaded
author | laurenmarazzi |
---|---|
date | Wed, 22 Dec 2021 16:00:34 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/myTools/bin/sfa/utils.py Wed Dec 22 16:00:34 2021 +0000 @@ -0,0 +1,354 @@ +# -*- coding: utf-8 -*- +import sys +if sys.version_info <= (2, 8): + from builtins import super + +import os +import codecs +from collections import defaultdict + +import numpy as np +import scipy as sp +import pandas as pd +import networkx as nx + + +__all__ = ["FrozenClass", + "Singleton", + "to_networkx_digraph", + "normalize", + "rand_swap", + "rand_flip", + "rand_weights", + "rand_structure", + "get_akey", + "get_avalue",] + + +class FrozenClass(object): + + __isfrozen = False + + def __setattr__(self, key, value): + if self.__isfrozen and not hasattr(self, key): + raise TypeError( "%r is a frozen class" % self ) + object.__setattr__(self, key, value) + + def _freeze(self): + self.__isfrozen = True + + def _melt(self): + self.__isfrozen = False + +""" +<Reference> +http://stackoverflow.com/questions/3603502/prevent-creating-new-attributes-outside-init +""" +# end of def FrozenClass + + +def Singleton(_class): + class __Singleton(_class): + __instance = None + + def __new__(cls, *args, **kwargs): + if cls.__instance is None: + cls.__instance = super().__new__(cls, *args, **kwargs) + + # Creation and initialization of '__initialized' + cls.__instance.__initialized = False + # end of if + return cls.__instance + + def __init__(self, *args, **kwargs): + if self.__initialized: + return + + super().__init__(*args, **kwargs) + self.__initialized = True + + def __repr__(self): + return '<{0} Singleton object at {1}>'.format( + _class.__name__, hex(id(self))) + + def __str__(self): + return super().__str__() + # end of def class + + __Singleton.__name__ = _class.__name__ + return __Singleton + +""" +<References> +http://m.egloos.zum.com/mataeoh/v/7081556 +""" +# end of def Singleton + +def normalize(A, norm_in=True, norm_out=True): + # Check whether A is a square matrix + if A.shape[0] != A.shape[1]: + raise ValueError( + "The A (adjacency matrix) should be square matrix.") + + # Build propagation matrix (aka. transition matrix) _W from A + W = A.copy() + + # Norm. out-degree + if norm_out == True: + sum_col_A = np.abs(A).sum(axis=0) + sum_col_A[sum_col_A == 0] = 1 + if norm_in == False: + Dc = 1 / sum_col_A + else: + Dc = 1 / np.sqrt(sum_col_A) + # end of else + W = Dc * W # This is not matrix multiplication + + # Norm. in-degree + if norm_in == True: + sum_row_A = np.abs(A).sum(axis=1) + sum_row_A[sum_row_A == 0] = 1 + if norm_out == False: + Dr = 1 / sum_row_A + else: + Dr = 1 / np.sqrt(sum_row_A) + # end of row + W = np.multiply(W, np.mat(Dr).T) + # Converting np.mat to ndarray + # does not cost a lot. + W = W.A + # end of if + """ + The normalization above is the same as the follows: + >>> np.diag(Dr).dot(A.dot(np.diag(Dc))) + """ + return W + + +# end of def normalize + +def to_networkx_digraph(A, n2i=None): + if not n2i: + return nx.from_numpy_array(A, create_using=nx.Digraph) + + i2n = {ix:name for name, ix in n2i.items()} + dg = nx.DiGraph() + ind_row, ind_col = A.nonzero() + for ix_trg, ix_src in zip(ind_row, ind_col): + name_src = i2n[ix_src] + name_trg = i2n[ix_trg] + sign = np.sign(A[ix_trg, ix_src]) + dg.add_edge(name_src, name_trg) + dg.edges[name_src, name_trg]['SIGN'] = sign + # end of for + return dg + # end of for +# end of def to_networkx_digraph + +def rand_swap(A, nsamp=10, noself=True, pivots=None, inplace=False): + """Randomly rewire the network connections by swapping. + + Parameters + ---------- + A : numpy.ndarray + Adjacency matrix (connection matrix). + nsamp : int, optional + Number of sampled connections to rewire + noself : bool, optional + Whether to allow self-loop link. + pivots : list, optional + Indices of pivot nodes + inplace : bool, optional + Modify the given adjacency matrix for rewiring. + + + Returns + ------- + B : numpy.ndarray + The randomized matrix. + The reference of the given W is returned, when inplace is True. + """ + + + if not inplace: + A_org = A + B = A.copy() #np.array(A, dtype=np.float64) + else: + A_org = A.copy() #np.array(A, dtype=np.float64) + B = A + + cnt = 0 + while cnt < nsamp: + ir, ic = B.nonzero() + if pivots: + if np.random.uniform() < 0.5: + isrc1 = np.random.choice(pivots) + nz = B[:, isrc1].nonzero()[0] + if len(nz) == 0: + continue + itrg1 = np.random.choice(nz) + else: + itrg1 = np.random.choice(pivots) + nz = B[itrg1, :].nonzero()[0] + if len(nz) == 0: + continue + isrc1 = np.random.choice(nz) + # if-else + + itrg2, isrc2 = itrg1, isrc1 + while isrc1 == isrc2 and itrg1 == itrg2: + i2 = np.random.randint(0, ir.size) + itrg2, isrc2 = ir[i2], ic[i2] + else: + i1, i2 = 0, 0 + while i1 == i2: + i1, i2 = np.random.randint(0, ir.size, 2) + + itrg1, isrc1 = ir[i1], ic[i1] + itrg2, isrc2 = ir[i2], ic[i2] + + if noself: + if itrg2 == isrc1 or itrg1 == isrc2: + continue + + # Are the swapped links new? + if B[itrg2, isrc1] == 0 and B[itrg1, isrc2] == 0: + a, b = B[itrg1, isrc1], B[itrg2, isrc2] + + # Are the swapped links in the original network? + if A_org[itrg2, isrc1] == a and A_org[itrg1, isrc2] == b: + continue + + B[itrg2, isrc1], B[itrg1, isrc2] = a, b + B[itrg1, isrc1], B[itrg2, isrc2] = 0, 0 + cnt += 1 + else: + continue + # end of while + + if not inplace: + return B + + +def rand_flip(A, nsamp=10, pivots=None, inplace=False): + """Randomly flip the signs of connections. + + Parameters + ---------- + A : numpy.ndarray + Adjacency matrix (connection matrix). + nsamp : int, optional + Number of sampled connections to be flipped. + pivots : list, optional + Indices of pivot nodes + inplace : bool, optional + Modify the given adjacency matrix for rewiring. + + Returns + ------- + B : numpy.ndarray + The randomized matrix. + The reference of the given W is returned, when inplace is True. + """ + if not inplace: + B = A.copy() #np.array(A, dtype=np.float64) + else: + B = A + + ir, ic = B.nonzero() + if pivots: + iflip = np.random.choice(pivots, nsamp) + else: + iflip = np.random.randint(0, ir.size, nsamp) + + B[ir[iflip], ic[iflip]] *= -1 + return B + + +def rand_weights(W, lb=-3, ub=3, inplace=False): + """ Randomly sample the weights of connections in W from 10^(lb, ub). + + Parameters + ---------- + W : numpy.ndarray + Adjacency (connection) or weight matrix. + lb : float, optional + The 10's exponent for lower bound + inplace : bool, optional + Modify the given adjacency matrix for rewiring. + + Returns + ------- + B : numpy.ndarray + The randomly sampled weight matrix. + The reference of the given W is returned, when inplace is True. + """ + if not inplace: + B = np.array(W, dtype=np.float64) + else: + if not np.issubdtype(W.dtype, np.floating): + raise ValueError("W.dtype given to rand_weights should be " + "a float type, not %s"%(W.dtype)) + + B = W + # end of if-else + + ir, ic = B.nonzero() + weights_rand = 10 ** np.random.uniform(lb, ub, + size=(ir.size,)) + + B[ir, ic] = weights_rand*np.sign(B[ir, ic], dtype=np.float) + """The above code is equal to the following: + + for i in range(ir.size): + p, q = ir[i], ic[i] + B[p, q] = weights_rand[i] * np.sign(B[p, q], dtype=np.float) + """ + return B + + +def rand_structure(A, nswap=10, nflip=10, noself=True, pivots=None, inplace=False): + if not inplace: + B = A.copy() + else: + B = A + if nflip > 0: + B = rand_flip(B, nflip, pivots, inplace) + if nswap > 0: + B = rand_swap(B, nswap, noself, pivots, inplace) + return B + + +def get_akey(d): + """Get a key from a given dictionary. + It returns the first key in d.keys(). + + Parameters + ---------- + d : dict + Dictionary of objects. + + Returns + ------- + obj : object + First item of iter(d.keys()). + """ + return next(iter(d.keys())) + + +def get_avalue(d): + """Get a value from a given dictionary. + It returns the value designated by sfa.get_akey(). + + Parameters + ---------- + d : dict + Dictionary of objects. + + Returns + ------- + obj : object + First item of d[iter(d.keys())]. + """ + akey = next(iter(d.keys())) + return d[akey] \ No newline at end of file