| 0 | 1 ##@file readsCoverage_res.py | 
|  | 2 # Compact structure to store partial results of readsCoverage for later processing; used in multi machine mode and for checkpoints. | 
|  | 3 # | 
|  | 4 #@author vlegrand@pasteur.fr | 
|  | 5 import numpy as np | 
|  | 6 import os | 
|  | 7 import time | 
|  | 8 | 
|  | 9 base_chk_fname="chk_" | 
|  | 10 chk_fname_sep="_" | 
|  | 11 | 
|  | 12 | 
|  | 13 ## Utility classes for testing the checkpoint implementation | 
|  | 14 # class checkpoint_visitor: | 
|  | 15 #     def __str__(self): | 
|  | 16 #         return self.__class__.__name__ | 
|  | 17 # | 
|  | 18 # class checkpoint_visitor_11150_Cos5(checkpoint_visitor): | 
|  | 19 #     def visit(self,chk_res): | 
|  | 20 #         if chk_res.host_len!=0 or chk_res.gen!=25 or chk_res.reads_tested!=2: | 
|  | 21 #             return False | 
|  | 22 #         return True | 
|  | 23 # | 
|  | 24 # class checkpoint_visitor_38_Cos5(checkpoint_visitor): | 
|  | 25 #     def visit(self,chk_res): | 
|  | 26 #         if chk_res.host_len!=0 or chk_res.gen!=25 or chk_res.reads_tested!=2: | 
|  | 27 #             return False | 
|  | 28 #         return True | 
|  | 29 | 
|  | 30 | 
|  | 31 | 
|  | 32 | 
|  | 33 | 
|  | 34 | 
|  | 35 def loadArr(arr_idx0,arr_val0,arr_idx1,arr_val1,arr2D): | 
|  | 36     for idx, val in zip(arr_idx0, arr_val0): | 
|  | 37         arr2D[0][idx] = val | 
|  | 38 | 
|  | 39     for idx, val in zip(arr_idx1, arr_val1): | 
|  | 40         arr2D[1][idx] = val | 
|  | 41 | 
|  | 42 | 
|  | 43 def loadRCRes(filename): | 
|  | 44     npzfile = np.load(filename) | 
|  | 45     gen_len=npzfile['gen_len'] | 
|  | 46     gen_len=int(gen_len) | 
|  | 47     host_len=npzfile['host_len'] | 
|  | 48     host_len=int(host_len) | 
|  | 49     termini_coverage_idx0 = npzfile['termini_coverage_idx0'] | 
|  | 50     termini_coverage_val0=npzfile['termini_coverage_val0'] | 
|  | 51     termini_coverage_idx1 = npzfile['termini_coverage_idx1'] | 
|  | 52     termini_coverage_val1 = npzfile['termini_coverage_val1'] | 
|  | 53 | 
|  | 54     whole_coverage_idx0=npzfile['whole_coverage_idx0'] | 
|  | 55     whole_coverage_val0 = npzfile['whole_coverage_val0'] | 
|  | 56     whole_coverage_idx1 = npzfile['whole_coverage_idx1'] | 
|  | 57     whole_coverage_val1 = npzfile['whole_coverage_val1'] | 
|  | 58 | 
|  | 59     paired_whole_coverage_idx0=npzfile['paired_whole_coverage_idx0'] | 
|  | 60     paired_whole_coverage_val0 = npzfile['paired_whole_coverage_val0'] | 
|  | 61     paired_whole_coverage_idx1 = npzfile['paired_whole_coverage_idx1'] | 
|  | 62     paired_whole_coverage_val1 = npzfile['paired_whole_coverage_val1'] | 
|  | 63 | 
|  | 64     phage_hybrid_coverage_idx0=npzfile['phage_hybrid_coverage_idx0'] | 
|  | 65     phage_hybrid_coverage_val0 = npzfile['phage_hybrid_coverage_val0'] | 
|  | 66     phage_hybrid_coverage_idx1 = npzfile['phage_hybrid_coverage_idx0'] | 
|  | 67     phage_hybrid_coverage_val1 = npzfile['phage_hybrid_coverage_idx1'] | 
|  | 68 | 
|  | 69     host_hybrid_coverage_idx0 = npzfile['host_hybrid_coverage_idx0'] | 
|  | 70     host_hybrid_coverage_val0 = npzfile['host_hybrid_coverage_val0'] | 
|  | 71     host_hybrid_coverage_idx1 = npzfile['host_hybrid_coverage_idx1'] | 
|  | 72     host_hybrid_coverage_val1 = npzfile['host_hybrid_coverage_val1'] | 
|  | 73 | 
|  | 74     host_whole_coverage_idx0 = npzfile['host_whole_coverage_idx0'] | 
|  | 75     host_whole_coverage_val0 = npzfile['host_whole_coverage_val0'] | 
|  | 76     host_whole_coverage_idx1 = npzfile['host_whole_coverage_idx1'] | 
|  | 77     host_whole_coverage_val1 = npzfile['host_whole_coverage_val1'] | 
|  | 78 | 
|  | 79     list_hybrid=npzfile['list_hybrid'] | 
|  | 80     insert=npzfile['insert'] | 
|  | 81     insert=list(insert) | 
|  | 82     paired_mismatch=npzfile['paired_mismatch'] | 
|  | 83     reads_tested=npzfile['reads_tested'] | 
|  | 84 | 
|  | 85     termini_coverage=np.array([gen_len*[0], gen_len*[0]]) | 
|  | 86 | 
|  | 87     whole_coverage        = np.array([gen_len*[0], gen_len*[0]]) | 
|  | 88     paired_whole_coverage = np.array([gen_len*[0], gen_len*[0]]) | 
|  | 89     phage_hybrid_coverage = np.array([gen_len*[0], gen_len*[0]]) | 
|  | 90     host_hybrid_coverage  = np.array([host_len*[0], host_len*[0]]) | 
|  | 91     host_whole_coverage   = np.array([host_len*[0], host_len*[0]]) | 
|  | 92     loadArr(termini_coverage_idx0,termini_coverage_val0,termini_coverage_idx1,termini_coverage_val1,termini_coverage) | 
|  | 93     loadArr(whole_coverage_idx0,whole_coverage_val0,whole_coverage_idx1,whole_coverage_val1,whole_coverage) | 
|  | 94     loadArr(paired_whole_coverage_idx0,paired_whole_coverage_val0,paired_whole_coverage_idx1,paired_whole_coverage_val1,paired_whole_coverage) | 
|  | 95     loadArr(phage_hybrid_coverage_idx0,phage_hybrid_coverage_val0,phage_hybrid_coverage_idx1,phage_hybrid_coverage_val1,phage_hybrid_coverage) | 
|  | 96     loadArr(host_hybrid_coverage_idx0,host_hybrid_coverage_val0,host_hybrid_coverage_idx1,host_hybrid_coverage_val1,host_hybrid_coverage) | 
|  | 97     loadArr(host_whole_coverage_idx0,host_whole_coverage_val0,host_whole_coverage_idx1,host_whole_coverage_val1,host_whole_coverage) | 
|  | 98 | 
|  | 99     res=RCRes(termini_coverage,whole_coverage,paired_whole_coverage,\ | 
|  | 100               phage_hybrid_coverage, host_hybrid_coverage,\ | 
|  | 101               host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested) | 
|  | 102 | 
|  | 103     return res | 
|  | 104 | 
|  | 105 ## | 
|  | 106 # Working structure for readsCoverage (encapsulating temporary results) | 
|  | 107 class RCWorkingS: | 
|  | 108     def __init__(self,rc_res,cnt_line,read_match): | 
|  | 109         self.interm_res=rc_res | 
|  | 110         self.count_line=cnt_line | 
|  | 111         self.read_match=read_match | 
|  | 112 | 
|  | 113 class RCRes: | 
|  | 114     def __init__(self,termini_coverage,whole_coverage,paired_whole_coverage,\ | 
|  | 115                  phage_hybrid_coverage, host_hybrid_coverage, \ | 
|  | 116                  host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested): | 
|  | 117 | 
|  | 118         self.termini_coverage=termini_coverage | 
|  | 119         self.whole_coverage=whole_coverage | 
|  | 120         self.paired_whole_coverage=paired_whole_coverage | 
|  | 121         self.phage_hybrid_coverage=phage_hybrid_coverage | 
|  | 122         self.host_hybrid_coverage=host_hybrid_coverage | 
|  | 123         self.host_whole_coverage=host_whole_coverage | 
|  | 124 | 
|  | 125         self.list_hybrid=list_hybrid | 
|  | 126         self.insert=insert | 
|  | 127         self.paired_mismatch=paired_mismatch | 
|  | 128         self.reads_tested=reads_tested | 
|  | 129 | 
|  | 130         self.gen_len = len(self.termini_coverage[0]) | 
|  | 131         self.host_len= len(self.host_hybrid_coverage[0]) | 
|  | 132 | 
|  | 133     # def accept(self,a_visitor): | 
|  | 134     #     self.vis=a_visitor | 
|  | 135     # | 
|  | 136     # def make_visit(self): | 
|  | 137     #     self.vis.visit() | 
|  | 138 | 
|  | 139     def save(self,filename): | 
|  | 140         termini_coverage_idx0 = np.flatnonzero(self.termini_coverage[0]) | 
|  | 141         termini_coverage_val0 = self.termini_coverage[0][termini_coverage_idx0] | 
|  | 142         termini_coverage_idx1 = np.flatnonzero(self.termini_coverage[1]) | 
|  | 143         termini_coverage_val1 = self.termini_coverage[1][termini_coverage_idx1] | 
|  | 144 | 
|  | 145         whole_coverage_idx0 = np.flatnonzero(self.whole_coverage[0]) | 
|  | 146         whole_coverage_val0 = self.whole_coverage[0][whole_coverage_idx0] | 
|  | 147         whole_coverage_idx1 = np.flatnonzero(self.whole_coverage[1]) | 
|  | 148         whole_coverage_val1 = self.whole_coverage[1][whole_coverage_idx1] | 
|  | 149 | 
|  | 150         paired_whole_coverage_idx0 = np.flatnonzero(self.paired_whole_coverage[0]) | 
|  | 151         paired_whole_coverage_val0 = self.paired_whole_coverage[0][paired_whole_coverage_idx0] | 
|  | 152         paired_whole_coverage_idx1 = np.flatnonzero(self.paired_whole_coverage[1]) | 
|  | 153         paired_whole_coverage_val1 = self.paired_whole_coverage[1][paired_whole_coverage_idx1] | 
|  | 154 | 
|  | 155         phage_hybrid_coverage_idx0 = np.flatnonzero(self.phage_hybrid_coverage[0]) | 
|  | 156         phage_hybrid_coverage_val0 = self.phage_hybrid_coverage[0][phage_hybrid_coverage_idx0] | 
|  | 157         phage_hybrid_coverage_idx1 = np.flatnonzero(self.phage_hybrid_coverage[1]) | 
|  | 158         phage_hybrid_coverage_val1 = self.phage_hybrid_coverage[1][phage_hybrid_coverage_idx1] | 
|  | 159 | 
|  | 160         host_hybrid_coverage_idx0 = np.flatnonzero(self.host_hybrid_coverage[0]) | 
|  | 161         host_hybrid_coverage_val0 = self.host_hybrid_coverage[0][host_hybrid_coverage_idx0] | 
|  | 162         host_hybrid_coverage_idx1 = np.flatnonzero(self.host_hybrid_coverage[1]) | 
|  | 163         host_hybrid_coverage_val1 = self.host_hybrid_coverage[1][host_hybrid_coverage_idx1] | 
|  | 164 | 
|  | 165         host_whole_coverage_idx0 = np.flatnonzero(self.host_whole_coverage[0]) | 
|  | 166         host_whole_coverage_val0 = self.host_whole_coverage[0][host_whole_coverage_idx0] | 
|  | 167         host_whole_coverage_idx1 = np.flatnonzero(self.host_whole_coverage[1]) | 
|  | 168         host_whole_coverage_val1 = self.host_whole_coverage[1][host_whole_coverage_idx1] | 
|  | 169 | 
|  | 170         np.savez(filename,gen_len=np.array(self.gen_len),host_len=np.array(self.host_len),\ | 
|  | 171                  termini_coverage_idx0=termini_coverage_idx0, termini_coverage_val0=termini_coverage_val0,\ | 
|  | 172                  termini_coverage_idx1=termini_coverage_idx1, termini_coverage_val1=termini_coverage_val1,\ | 
|  | 173                  whole_coverage_idx0=whole_coverage_idx0,whole_coverage_val0=whole_coverage_val0,\ | 
|  | 174                  whole_coverage_idx1=whole_coverage_idx1,whole_coverage_val1=whole_coverage_val1,\ | 
|  | 175                  paired_whole_coverage_idx0=paired_whole_coverage_idx0,paired_whole_coverage_val0=paired_whole_coverage_val0,\ | 
|  | 176                  paired_whole_coverage_idx1=paired_whole_coverage_idx1,paired_whole_coverage_val1=paired_whole_coverage_val1, \ | 
|  | 177                  phage_hybrid_coverage_idx0=phage_hybrid_coverage_idx0,phage_hybrid_coverage_val0=phage_hybrid_coverage_val0, \ | 
|  | 178                  phage_hybrid_coverage_idx1=phage_hybrid_coverage_idx1,phage_hybrid_coverage_val1=phage_hybrid_coverage_val1, \ | 
|  | 179                  host_hybrid_coverage_idx0=host_hybrid_coverage_idx0,host_hybrid_coverage_val0=host_hybrid_coverage_val0, \ | 
|  | 180                  host_hybrid_coverage_idx1=host_hybrid_coverage_idx1,host_hybrid_coverage_val1=host_hybrid_coverage_val1, \ | 
|  | 181                  host_whole_coverage_idx0=host_whole_coverage_idx0,host_whole_coverage_val0=host_whole_coverage_val0, \ | 
|  | 182                  host_whole_coverage_idx1=host_whole_coverage_idx1,host_whole_coverage_val1=host_whole_coverage_val1, \ | 
|  | 183                  list_hybrid=self.list_hybrid,insert=self.insert,paired_mismatch=np.array(self.paired_mismatch),\ | 
|  | 184                  reads_tested=self.reads_tested) | 
|  | 185 | 
|  | 186 | 
|  | 187 class RCCheckpoint: | 
|  | 188     def __init__(self,count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\ | 
|  | 189                  phage_hybrid_coverage, host_hybrid_coverage, \ | 
|  | 190                  host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match): | 
|  | 191         self.count_line=count_line | 
|  | 192         self.core_id=core_id | 
|  | 193         self.idx_seq=idx_seq | 
|  | 194         self.read_match=read_match | 
|  | 195         self.res=RCRes(termini_coverage,whole_coverage,paired_whole_coverage,\ | 
|  | 196                  phage_hybrid_coverage, host_hybrid_coverage, \ | 
|  | 197                  host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested) | 
|  | 198 | 
|  | 199 | 
|  | 200     def save(self,dir_chk,core_id,idx_refseq): | 
|  | 201         filename=base_chk_fname+str(self.core_id)+chk_fname_sep+str(self.idx_seq)+chk_fname_sep+\ | 
|  | 202                  str(self.count_line)+chk_fname_sep+str(self.read_match) | 
|  | 203         full_fname = os.path.join(dir_chk, filename) | 
|  | 204         self.res.save(full_fname) | 
|  | 205         # remove old breakpoint file | 
|  | 206         list_f=os.listdir(dir_chk) | 
|  | 207         sub_s=base_chk_fname+ str(core_id) + chk_fname_sep + str(idx_refseq) + chk_fname_sep | 
|  | 208         for f in list_f: | 
|  | 209             if f!=filename+".npz" and sub_s in f: | 
|  | 210                 os.remove(os.path.join(dir_chk,f)) | 
|  | 211 | 
|  | 212 | 
|  | 213 class RCCheckpoint_handler: | 
|  | 214     def __init__(self,chk_freq,dir_chk,test_mode=False): | 
|  | 215         self.chk_freq=chk_freq | 
|  | 216         self.test_mode = test_mode | 
|  | 217         self.start_t=0 | 
|  | 218         self.dir_chk = dir_chk | 
|  | 219         # if self.test_mode == True: | 
|  | 220         #     self.v38_C5 = checkpoint_visitor_38_Cos5() | 
|  | 221         #     self.v11150_C5 = checkpoint_visitor_11150_Cos5() | 
|  | 222         if self.test_mode==True: | 
|  | 223             self.start_t = time.perf_counter_ns() | 
|  | 224             if os.path.exists(dir_chk): | 
|  | 225                 if not (os.path.isdir(dir_chk)): | 
|  | 226                     raise RuntimeError("dir_chk must point to a directory") | 
|  | 227             else: | 
|  | 228                 os.mkdir(dir_chk) | 
|  | 229         elif self.chk_freq!=0: | 
|  | 230             if os.path.exists(dir_chk): | 
|  | 231                 if not (os.path.isdir(dir_chk)): | 
|  | 232                     raise RuntimeError("dir_chk must point to a directory") | 
|  | 233             else: | 
|  | 234                 raise RuntimeError("dir_chk must point to an existing directory") | 
|  | 235 | 
|  | 236     def getIdxSeq(self,core_id): | 
|  | 237         idx_seq=0 | 
|  | 238         if self.chk_freq!=0 or self.test_mode==True: | 
|  | 239             list_f = os.listdir(self.dir_chk) | 
|  | 240             subfname = base_chk_fname+ str(core_id) + chk_fname_sep | 
|  | 241             chk_f = "" | 
|  | 242             for fname in list_f: | 
|  | 243                 if subfname in fname: | 
|  | 244                     chk_f = fname | 
|  | 245                     break | 
|  | 246             if chk_f != "": | 
|  | 247                 l=chk_f.split(chk_fname_sep) | 
|  | 248                 idx_seq=int(l[2]) | 
|  | 249         return idx_seq | 
|  | 250 | 
|  | 251 | 
|  | 252     def load(self,core_id,idx_refseq): | 
|  | 253         if self.chk_freq!=0 or self.test_mode==True: | 
|  | 254             list_f = os.listdir(self.dir_chk) | 
|  | 255             subfname = base_chk_fname+ str(core_id) + chk_fname_sep + str(idx_refseq) + chk_fname_sep | 
|  | 256             chk_f = "" | 
|  | 257             for fname in list_f: | 
|  | 258                 if subfname in fname: | 
|  | 259                     chk_f = fname | 
|  | 260                     break | 
|  | 261             if chk_f != "": | 
|  | 262                 interm_res=loadRCRes(os.path.join(self.dir_chk,chk_f)) | 
|  | 263                 # if self.test_mode==True: | 
|  | 264                 #     interm_res.accept(self.v38_C5) | 
|  | 265                 l=chk_f.split(chk_fname_sep) | 
|  | 266                 cnt_line=int(l[-2]) | 
|  | 267                 tmp=l[-1] # get rid of .npz extension | 
|  | 268                 l2=tmp.split(".") | 
|  | 269                 read_match=int(l2[0]) | 
|  | 270                 partial_res=RCWorkingS(interm_res,cnt_line,read_match) | 
|  | 271                 # if self.test_mode: | 
|  | 272                 #     partial_res.accept(self.v38_C5) | 
|  | 273                 #     partial_res.make_visit() | 
|  | 274                 return partial_res | 
|  | 275             else:  # no checkpoint found for this sequence, start from beginning | 
|  | 276                 return None | 
|  | 277         else: | 
|  | 278             return None | 
|  | 279 | 
|  | 280 | 
|  | 281     def check(self,count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\ | 
|  | 282                  phage_hybrid_coverage, host_hybrid_coverage, \ | 
|  | 283                  host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match): | 
|  | 284         cur_t = time.perf_counter_ns() | 
|  | 285         elapsed_t = cur_t - self.start_t | 
|  | 286         #convert elapsed_t tp to seconds | 
|  | 287         elaspsed_t=elapsed_t * 0.000000001 | 
|  | 288         if (self.test_mode==True or (self.chk_freq!=0 and (elapsed_t % self.chk_freq == 0))):  # time to create checkpoint. | 
|  | 289             chkp=RCCheckpoint(count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\ | 
|  | 290                  phage_hybrid_coverage, host_hybrid_coverage, \ | 
|  | 291                  host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match) | 
|  | 292             chkp.save(self.dir_chk,core_id,idx_seq) | 
|  | 293 | 
|  | 294 | 
|  | 295     def end(self,core_id): | 
|  | 296         if (self.test_mode==False and self.chk_freq!=0) : | 
|  | 297             # remove old breakpoint file | 
|  | 298             list_f = os.listdir(self.dir_chk) | 
|  | 299             sub_s=base_chk_fname+str(core_id)+chk_fname_sep | 
|  | 300             for f in list_f: | 
|  | 301                 if sub_s in f: | 
|  | 302                     os.remove(os.path.join(self.dir_chk, f)) | 
|  | 303 | 
|  | 304 | 
|  | 305 | 
|  | 306 | 
|  | 307 | 
|  | 308 | 
|  | 309 | 
|  | 310 | 
|  | 311 |