Mercurial > repos > laurenmarazzi > netisce_test
comparison tools/myTools/bin/sfa/utils.py @ 1:7e5c71b2e71f draft default tip
Uploaded
author | laurenmarazzi |
---|---|
date | Wed, 22 Dec 2021 16:00:34 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:f24d4892aaed | 1:7e5c71b2e71f |
---|---|
1 # -*- coding: utf-8 -*- | |
2 import sys | |
3 if sys.version_info <= (2, 8): | |
4 from builtins import super | |
5 | |
6 import os | |
7 import codecs | |
8 from collections import defaultdict | |
9 | |
10 import numpy as np | |
11 import scipy as sp | |
12 import pandas as pd | |
13 import networkx as nx | |
14 | |
15 | |
16 __all__ = ["FrozenClass", | |
17 "Singleton", | |
18 "to_networkx_digraph", | |
19 "normalize", | |
20 "rand_swap", | |
21 "rand_flip", | |
22 "rand_weights", | |
23 "rand_structure", | |
24 "get_akey", | |
25 "get_avalue",] | |
26 | |
27 | |
28 class FrozenClass(object): | |
29 | |
30 __isfrozen = False | |
31 | |
32 def __setattr__(self, key, value): | |
33 if self.__isfrozen and not hasattr(self, key): | |
34 raise TypeError( "%r is a frozen class" % self ) | |
35 object.__setattr__(self, key, value) | |
36 | |
37 def _freeze(self): | |
38 self.__isfrozen = True | |
39 | |
40 def _melt(self): | |
41 self.__isfrozen = False | |
42 | |
43 """ | |
44 <Reference> | |
45 http://stackoverflow.com/questions/3603502/prevent-creating-new-attributes-outside-init | |
46 """ | |
47 # end of def FrozenClass | |
48 | |
49 | |
50 def Singleton(_class): | |
51 class __Singleton(_class): | |
52 __instance = None | |
53 | |
54 def __new__(cls, *args, **kwargs): | |
55 if cls.__instance is None: | |
56 cls.__instance = super().__new__(cls, *args, **kwargs) | |
57 | |
58 # Creation and initialization of '__initialized' | |
59 cls.__instance.__initialized = False | |
60 # end of if | |
61 return cls.__instance | |
62 | |
63 def __init__(self, *args, **kwargs): | |
64 if self.__initialized: | |
65 return | |
66 | |
67 super().__init__(*args, **kwargs) | |
68 self.__initialized = True | |
69 | |
70 def __repr__(self): | |
71 return '<{0} Singleton object at {1}>'.format( | |
72 _class.__name__, hex(id(self))) | |
73 | |
74 def __str__(self): | |
75 return super().__str__() | |
76 # end of def class | |
77 | |
78 __Singleton.__name__ = _class.__name__ | |
79 return __Singleton | |
80 | |
81 """ | |
82 <References> | |
83 http://m.egloos.zum.com/mataeoh/v/7081556 | |
84 """ | |
85 # end of def Singleton | |
86 | |
87 def normalize(A, norm_in=True, norm_out=True): | |
88 # Check whether A is a square matrix | |
89 if A.shape[0] != A.shape[1]: | |
90 raise ValueError( | |
91 "The A (adjacency matrix) should be square matrix.") | |
92 | |
93 # Build propagation matrix (aka. transition matrix) _W from A | |
94 W = A.copy() | |
95 | |
96 # Norm. out-degree | |
97 if norm_out == True: | |
98 sum_col_A = np.abs(A).sum(axis=0) | |
99 sum_col_A[sum_col_A == 0] = 1 | |
100 if norm_in == False: | |
101 Dc = 1 / sum_col_A | |
102 else: | |
103 Dc = 1 / np.sqrt(sum_col_A) | |
104 # end of else | |
105 W = Dc * W # This is not matrix multiplication | |
106 | |
107 # Norm. in-degree | |
108 if norm_in == True: | |
109 sum_row_A = np.abs(A).sum(axis=1) | |
110 sum_row_A[sum_row_A == 0] = 1 | |
111 if norm_out == False: | |
112 Dr = 1 / sum_row_A | |
113 else: | |
114 Dr = 1 / np.sqrt(sum_row_A) | |
115 # end of row | |
116 W = np.multiply(W, np.mat(Dr).T) | |
117 # Converting np.mat to ndarray | |
118 # does not cost a lot. | |
119 W = W.A | |
120 # end of if | |
121 """ | |
122 The normalization above is the same as the follows: | |
123 >>> np.diag(Dr).dot(A.dot(np.diag(Dc))) | |
124 """ | |
125 return W | |
126 | |
127 | |
128 # end of def normalize | |
129 | |
130 def to_networkx_digraph(A, n2i=None): | |
131 if not n2i: | |
132 return nx.from_numpy_array(A, create_using=nx.Digraph) | |
133 | |
134 i2n = {ix:name for name, ix in n2i.items()} | |
135 dg = nx.DiGraph() | |
136 ind_row, ind_col = A.nonzero() | |
137 for ix_trg, ix_src in zip(ind_row, ind_col): | |
138 name_src = i2n[ix_src] | |
139 name_trg = i2n[ix_trg] | |
140 sign = np.sign(A[ix_trg, ix_src]) | |
141 dg.add_edge(name_src, name_trg) | |
142 dg.edges[name_src, name_trg]['SIGN'] = sign | |
143 # end of for | |
144 return dg | |
145 # end of for | |
146 # end of def to_networkx_digraph | |
147 | |
148 def rand_swap(A, nsamp=10, noself=True, pivots=None, inplace=False): | |
149 """Randomly rewire the network connections by swapping. | |
150 | |
151 Parameters | |
152 ---------- | |
153 A : numpy.ndarray | |
154 Adjacency matrix (connection matrix). | |
155 nsamp : int, optional | |
156 Number of sampled connections to rewire | |
157 noself : bool, optional | |
158 Whether to allow self-loop link. | |
159 pivots : list, optional | |
160 Indices of pivot nodes | |
161 inplace : bool, optional | |
162 Modify the given adjacency matrix for rewiring. | |
163 | |
164 | |
165 Returns | |
166 ------- | |
167 B : numpy.ndarray | |
168 The randomized matrix. | |
169 The reference of the given W is returned, when inplace is True. | |
170 """ | |
171 | |
172 | |
173 if not inplace: | |
174 A_org = A | |
175 B = A.copy() #np.array(A, dtype=np.float64) | |
176 else: | |
177 A_org = A.copy() #np.array(A, dtype=np.float64) | |
178 B = A | |
179 | |
180 cnt = 0 | |
181 while cnt < nsamp: | |
182 ir, ic = B.nonzero() | |
183 if pivots: | |
184 if np.random.uniform() < 0.5: | |
185 isrc1 = np.random.choice(pivots) | |
186 nz = B[:, isrc1].nonzero()[0] | |
187 if len(nz) == 0: | |
188 continue | |
189 itrg1 = np.random.choice(nz) | |
190 else: | |
191 itrg1 = np.random.choice(pivots) | |
192 nz = B[itrg1, :].nonzero()[0] | |
193 if len(nz) == 0: | |
194 continue | |
195 isrc1 = np.random.choice(nz) | |
196 # if-else | |
197 | |
198 itrg2, isrc2 = itrg1, isrc1 | |
199 while isrc1 == isrc2 and itrg1 == itrg2: | |
200 i2 = np.random.randint(0, ir.size) | |
201 itrg2, isrc2 = ir[i2], ic[i2] | |
202 else: | |
203 i1, i2 = 0, 0 | |
204 while i1 == i2: | |
205 i1, i2 = np.random.randint(0, ir.size, 2) | |
206 | |
207 itrg1, isrc1 = ir[i1], ic[i1] | |
208 itrg2, isrc2 = ir[i2], ic[i2] | |
209 | |
210 if noself: | |
211 if itrg2 == isrc1 or itrg1 == isrc2: | |
212 continue | |
213 | |
214 # Are the swapped links new? | |
215 if B[itrg2, isrc1] == 0 and B[itrg1, isrc2] == 0: | |
216 a, b = B[itrg1, isrc1], B[itrg2, isrc2] | |
217 | |
218 # Are the swapped links in the original network? | |
219 if A_org[itrg2, isrc1] == a and A_org[itrg1, isrc2] == b: | |
220 continue | |
221 | |
222 B[itrg2, isrc1], B[itrg1, isrc2] = a, b | |
223 B[itrg1, isrc1], B[itrg2, isrc2] = 0, 0 | |
224 cnt += 1 | |
225 else: | |
226 continue | |
227 # end of while | |
228 | |
229 if not inplace: | |
230 return B | |
231 | |
232 | |
233 def rand_flip(A, nsamp=10, pivots=None, inplace=False): | |
234 """Randomly flip the signs of connections. | |
235 | |
236 Parameters | |
237 ---------- | |
238 A : numpy.ndarray | |
239 Adjacency matrix (connection matrix). | |
240 nsamp : int, optional | |
241 Number of sampled connections to be flipped. | |
242 pivots : list, optional | |
243 Indices of pivot nodes | |
244 inplace : bool, optional | |
245 Modify the given adjacency matrix for rewiring. | |
246 | |
247 Returns | |
248 ------- | |
249 B : numpy.ndarray | |
250 The randomized matrix. | |
251 The reference of the given W is returned, when inplace is True. | |
252 """ | |
253 if not inplace: | |
254 B = A.copy() #np.array(A, dtype=np.float64) | |
255 else: | |
256 B = A | |
257 | |
258 ir, ic = B.nonzero() | |
259 if pivots: | |
260 iflip = np.random.choice(pivots, nsamp) | |
261 else: | |
262 iflip = np.random.randint(0, ir.size, nsamp) | |
263 | |
264 B[ir[iflip], ic[iflip]] *= -1 | |
265 return B | |
266 | |
267 | |
268 def rand_weights(W, lb=-3, ub=3, inplace=False): | |
269 """ Randomly sample the weights of connections in W from 10^(lb, ub). | |
270 | |
271 Parameters | |
272 ---------- | |
273 W : numpy.ndarray | |
274 Adjacency (connection) or weight matrix. | |
275 lb : float, optional | |
276 The 10's exponent for lower bound | |
277 inplace : bool, optional | |
278 Modify the given adjacency matrix for rewiring. | |
279 | |
280 Returns | |
281 ------- | |
282 B : numpy.ndarray | |
283 The randomly sampled weight matrix. | |
284 The reference of the given W is returned, when inplace is True. | |
285 """ | |
286 if not inplace: | |
287 B = np.array(W, dtype=np.float64) | |
288 else: | |
289 if not np.issubdtype(W.dtype, np.floating): | |
290 raise ValueError("W.dtype given to rand_weights should be " | |
291 "a float type, not %s"%(W.dtype)) | |
292 | |
293 B = W | |
294 # end of if-else | |
295 | |
296 ir, ic = B.nonzero() | |
297 weights_rand = 10 ** np.random.uniform(lb, ub, | |
298 size=(ir.size,)) | |
299 | |
300 B[ir, ic] = weights_rand*np.sign(B[ir, ic], dtype=np.float) | |
301 """The above code is equal to the following: | |
302 | |
303 for i in range(ir.size): | |
304 p, q = ir[i], ic[i] | |
305 B[p, q] = weights_rand[i] * np.sign(B[p, q], dtype=np.float) | |
306 """ | |
307 return B | |
308 | |
309 | |
310 def rand_structure(A, nswap=10, nflip=10, noself=True, pivots=None, inplace=False): | |
311 if not inplace: | |
312 B = A.copy() | |
313 else: | |
314 B = A | |
315 if nflip > 0: | |
316 B = rand_flip(B, nflip, pivots, inplace) | |
317 if nswap > 0: | |
318 B = rand_swap(B, nswap, noself, pivots, inplace) | |
319 return B | |
320 | |
321 | |
322 def get_akey(d): | |
323 """Get a key from a given dictionary. | |
324 It returns the first key in d.keys(). | |
325 | |
326 Parameters | |
327 ---------- | |
328 d : dict | |
329 Dictionary of objects. | |
330 | |
331 Returns | |
332 ------- | |
333 obj : object | |
334 First item of iter(d.keys()). | |
335 """ | |
336 return next(iter(d.keys())) | |
337 | |
338 | |
339 def get_avalue(d): | |
340 """Get a value from a given dictionary. | |
341 It returns the value designated by sfa.get_akey(). | |
342 | |
343 Parameters | |
344 ---------- | |
345 d : dict | |
346 Dictionary of objects. | |
347 | |
348 Returns | |
349 ------- | |
350 obj : object | |
351 First item of d[iter(d.keys())]. | |
352 """ | |
353 akey = next(iter(d.keys())) | |
354 return d[akey] |