diff _modules/readsCoverage_res.py @ 0:69e8f12c8b31 draft

"planemo upload"
author bioit_sciensano
date Fri, 11 Mar 2022 15:06:20 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/_modules/readsCoverage_res.py	Fri Mar 11 15:06:20 2022 +0000
@@ -0,0 +1,311 @@
+##@file readsCoverage_res.py
+# Compact structure to store partial results of readsCoverage for later processing; used in multi machine mode and for checkpoints.
+#
+#@author vlegrand@pasteur.fr
+import numpy as np
+import os
+import time
+
+base_chk_fname="chk_"
+chk_fname_sep="_"
+
+
+## Utility classes for testing the checkpoint implementation
+# class checkpoint_visitor:
+#     def __str__(self):
+#         return self.__class__.__name__
+#
+# class checkpoint_visitor_11150_Cos5(checkpoint_visitor):
+#     def visit(self,chk_res):
+#         if chk_res.host_len!=0 or chk_res.gen!=25 or chk_res.reads_tested!=2:
+#             return False
+#         return True
+#
+# class checkpoint_visitor_38_Cos5(checkpoint_visitor):
+#     def visit(self,chk_res):
+#         if chk_res.host_len!=0 or chk_res.gen!=25 or chk_res.reads_tested!=2:
+#             return False
+#         return True
+
+
+
+
+
+
+def loadArr(arr_idx0,arr_val0,arr_idx1,arr_val1,arr2D):
+    for idx, val in zip(arr_idx0, arr_val0):
+        arr2D[0][idx] = val
+
+    for idx, val in zip(arr_idx1, arr_val1):
+        arr2D[1][idx] = val
+
+
+def loadRCRes(filename):
+    npzfile = np.load(filename)
+    gen_len=npzfile['gen_len']
+    gen_len=int(gen_len)
+    host_len=npzfile['host_len']
+    host_len=int(host_len)
+    termini_coverage_idx0 = npzfile['termini_coverage_idx0']
+    termini_coverage_val0=npzfile['termini_coverage_val0']
+    termini_coverage_idx1 = npzfile['termini_coverage_idx1']
+    termini_coverage_val1 = npzfile['termini_coverage_val1']
+
+    whole_coverage_idx0=npzfile['whole_coverage_idx0']
+    whole_coverage_val0 = npzfile['whole_coverage_val0']
+    whole_coverage_idx1 = npzfile['whole_coverage_idx1']
+    whole_coverage_val1 = npzfile['whole_coverage_val1']
+
+    paired_whole_coverage_idx0=npzfile['paired_whole_coverage_idx0']
+    paired_whole_coverage_val0 = npzfile['paired_whole_coverage_val0']
+    paired_whole_coverage_idx1 = npzfile['paired_whole_coverage_idx1']
+    paired_whole_coverage_val1 = npzfile['paired_whole_coverage_val1']
+
+    phage_hybrid_coverage_idx0=npzfile['phage_hybrid_coverage_idx0']
+    phage_hybrid_coverage_val0 = npzfile['phage_hybrid_coverage_val0']
+    phage_hybrid_coverage_idx1 = npzfile['phage_hybrid_coverage_idx0']
+    phage_hybrid_coverage_val1 = npzfile['phage_hybrid_coverage_idx1']
+
+    host_hybrid_coverage_idx0 = npzfile['host_hybrid_coverage_idx0']
+    host_hybrid_coverage_val0 = npzfile['host_hybrid_coverage_val0']
+    host_hybrid_coverage_idx1 = npzfile['host_hybrid_coverage_idx1']
+    host_hybrid_coverage_val1 = npzfile['host_hybrid_coverage_val1']
+
+    host_whole_coverage_idx0 = npzfile['host_whole_coverage_idx0']
+    host_whole_coverage_val0 = npzfile['host_whole_coverage_val0']
+    host_whole_coverage_idx1 = npzfile['host_whole_coverage_idx1']
+    host_whole_coverage_val1 = npzfile['host_whole_coverage_val1']
+
+    list_hybrid=npzfile['list_hybrid']
+    insert=npzfile['insert']
+    insert=list(insert)
+    paired_mismatch=npzfile['paired_mismatch']
+    reads_tested=npzfile['reads_tested']
+
+    termini_coverage=np.array([gen_len*[0], gen_len*[0]])
+
+    whole_coverage        = np.array([gen_len*[0], gen_len*[0]])
+    paired_whole_coverage = np.array([gen_len*[0], gen_len*[0]])
+    phage_hybrid_coverage = np.array([gen_len*[0], gen_len*[0]])
+    host_hybrid_coverage  = np.array([host_len*[0], host_len*[0]])
+    host_whole_coverage   = np.array([host_len*[0], host_len*[0]])
+    loadArr(termini_coverage_idx0,termini_coverage_val0,termini_coverage_idx1,termini_coverage_val1,termini_coverage)
+    loadArr(whole_coverage_idx0,whole_coverage_val0,whole_coverage_idx1,whole_coverage_val1,whole_coverage)
+    loadArr(paired_whole_coverage_idx0,paired_whole_coverage_val0,paired_whole_coverage_idx1,paired_whole_coverage_val1,paired_whole_coverage)
+    loadArr(phage_hybrid_coverage_idx0,phage_hybrid_coverage_val0,phage_hybrid_coverage_idx1,phage_hybrid_coverage_val1,phage_hybrid_coverage)
+    loadArr(host_hybrid_coverage_idx0,host_hybrid_coverage_val0,host_hybrid_coverage_idx1,host_hybrid_coverage_val1,host_hybrid_coverage)
+    loadArr(host_whole_coverage_idx0,host_whole_coverage_val0,host_whole_coverage_idx1,host_whole_coverage_val1,host_whole_coverage)
+
+    res=RCRes(termini_coverage,whole_coverage,paired_whole_coverage,\
+              phage_hybrid_coverage, host_hybrid_coverage,\
+              host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested)
+
+    return res
+
+##
+# Working structure for readsCoverage (encapsulating temporary results)
+class RCWorkingS:
+    def __init__(self,rc_res,cnt_line,read_match):
+        self.interm_res=rc_res
+        self.count_line=cnt_line
+        self.read_match=read_match
+
+class RCRes:
+    def __init__(self,termini_coverage,whole_coverage,paired_whole_coverage,\
+                 phage_hybrid_coverage, host_hybrid_coverage, \
+                 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested):
+
+        self.termini_coverage=termini_coverage
+        self.whole_coverage=whole_coverage
+        self.paired_whole_coverage=paired_whole_coverage
+        self.phage_hybrid_coverage=phage_hybrid_coverage
+        self.host_hybrid_coverage=host_hybrid_coverage
+        self.host_whole_coverage=host_whole_coverage
+
+        self.list_hybrid=list_hybrid
+        self.insert=insert
+        self.paired_mismatch=paired_mismatch
+        self.reads_tested=reads_tested
+
+        self.gen_len = len(self.termini_coverage[0])
+        self.host_len= len(self.host_hybrid_coverage[0])
+
+    # def accept(self,a_visitor):
+    #     self.vis=a_visitor
+    #
+    # def make_visit(self):
+    #     self.vis.visit()
+
+    def save(self,filename):
+        termini_coverage_idx0 = np.flatnonzero(self.termini_coverage[0])
+        termini_coverage_val0 = self.termini_coverage[0][termini_coverage_idx0]
+        termini_coverage_idx1 = np.flatnonzero(self.termini_coverage[1])
+        termini_coverage_val1 = self.termini_coverage[1][termini_coverage_idx1]
+
+        whole_coverage_idx0 = np.flatnonzero(self.whole_coverage[0])
+        whole_coverage_val0 = self.whole_coverage[0][whole_coverage_idx0]
+        whole_coverage_idx1 = np.flatnonzero(self.whole_coverage[1])
+        whole_coverage_val1 = self.whole_coverage[1][whole_coverage_idx1]
+
+        paired_whole_coverage_idx0 = np.flatnonzero(self.paired_whole_coverage[0])
+        paired_whole_coverage_val0 = self.paired_whole_coverage[0][paired_whole_coverage_idx0]
+        paired_whole_coverage_idx1 = np.flatnonzero(self.paired_whole_coverage[1])
+        paired_whole_coverage_val1 = self.paired_whole_coverage[1][paired_whole_coverage_idx1]
+
+        phage_hybrid_coverage_idx0 = np.flatnonzero(self.phage_hybrid_coverage[0])
+        phage_hybrid_coverage_val0 = self.phage_hybrid_coverage[0][phage_hybrid_coverage_idx0]
+        phage_hybrid_coverage_idx1 = np.flatnonzero(self.phage_hybrid_coverage[1])
+        phage_hybrid_coverage_val1 = self.phage_hybrid_coverage[1][phage_hybrid_coverage_idx1]
+
+        host_hybrid_coverage_idx0 = np.flatnonzero(self.host_hybrid_coverage[0])
+        host_hybrid_coverage_val0 = self.host_hybrid_coverage[0][host_hybrid_coverage_idx0]
+        host_hybrid_coverage_idx1 = np.flatnonzero(self.host_hybrid_coverage[1])
+        host_hybrid_coverage_val1 = self.host_hybrid_coverage[1][host_hybrid_coverage_idx1]
+
+        host_whole_coverage_idx0 = np.flatnonzero(self.host_whole_coverage[0])
+        host_whole_coverage_val0 = self.host_whole_coverage[0][host_whole_coverage_idx0]
+        host_whole_coverage_idx1 = np.flatnonzero(self.host_whole_coverage[1])
+        host_whole_coverage_val1 = self.host_whole_coverage[1][host_whole_coverage_idx1]
+
+        np.savez(filename,gen_len=np.array(self.gen_len),host_len=np.array(self.host_len),\
+                 termini_coverage_idx0=termini_coverage_idx0, termini_coverage_val0=termini_coverage_val0,\
+                 termini_coverage_idx1=termini_coverage_idx1, termini_coverage_val1=termini_coverage_val1,\
+                 whole_coverage_idx0=whole_coverage_idx0,whole_coverage_val0=whole_coverage_val0,\
+                 whole_coverage_idx1=whole_coverage_idx1,whole_coverage_val1=whole_coverage_val1,\
+                 paired_whole_coverage_idx0=paired_whole_coverage_idx0,paired_whole_coverage_val0=paired_whole_coverage_val0,\
+                 paired_whole_coverage_idx1=paired_whole_coverage_idx1,paired_whole_coverage_val1=paired_whole_coverage_val1, \
+                 phage_hybrid_coverage_idx0=phage_hybrid_coverage_idx0,phage_hybrid_coverage_val0=phage_hybrid_coverage_val0, \
+                 phage_hybrid_coverage_idx1=phage_hybrid_coverage_idx1,phage_hybrid_coverage_val1=phage_hybrid_coverage_val1, \
+                 host_hybrid_coverage_idx0=host_hybrid_coverage_idx0,host_hybrid_coverage_val0=host_hybrid_coverage_val0, \
+                 host_hybrid_coverage_idx1=host_hybrid_coverage_idx1,host_hybrid_coverage_val1=host_hybrid_coverage_val1, \
+                 host_whole_coverage_idx0=host_whole_coverage_idx0,host_whole_coverage_val0=host_whole_coverage_val0, \
+                 host_whole_coverage_idx1=host_whole_coverage_idx1,host_whole_coverage_val1=host_whole_coverage_val1, \
+                 list_hybrid=self.list_hybrid,insert=self.insert,paired_mismatch=np.array(self.paired_mismatch),\
+                 reads_tested=self.reads_tested)
+
+
+class RCCheckpoint:
+    def __init__(self,count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\
+                 phage_hybrid_coverage, host_hybrid_coverage, \
+                 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match):
+        self.count_line=count_line
+        self.core_id=core_id
+        self.idx_seq=idx_seq
+        self.read_match=read_match
+        self.res=RCRes(termini_coverage,whole_coverage,paired_whole_coverage,\
+                 phage_hybrid_coverage, host_hybrid_coverage, \
+                 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested)
+
+
+    def save(self,dir_chk,core_id,idx_refseq):
+        filename=base_chk_fname+str(self.core_id)+chk_fname_sep+str(self.idx_seq)+chk_fname_sep+\
+                 str(self.count_line)+chk_fname_sep+str(self.read_match)
+        full_fname = os.path.join(dir_chk, filename)
+        self.res.save(full_fname)
+        # remove old breakpoint file
+        list_f=os.listdir(dir_chk)
+        sub_s=base_chk_fname+ str(core_id) + chk_fname_sep + str(idx_refseq) + chk_fname_sep
+        for f in list_f:
+            if f!=filename+".npz" and sub_s in f:
+                os.remove(os.path.join(dir_chk,f))
+
+
+class RCCheckpoint_handler:
+    def __init__(self,chk_freq,dir_chk,test_mode=False):
+        self.chk_freq=chk_freq
+        self.test_mode = test_mode
+        self.start_t=0
+        self.dir_chk = dir_chk
+        # if self.test_mode == True:
+        #     self.v38_C5 = checkpoint_visitor_38_Cos5()
+        #     self.v11150_C5 = checkpoint_visitor_11150_Cos5()
+        if self.test_mode==True:
+            self.start_t = time.perf_counter_ns()
+            if os.path.exists(dir_chk):
+                if not (os.path.isdir(dir_chk)):
+                    raise RuntimeError("dir_chk must point to a directory")
+            else:
+                os.mkdir(dir_chk)
+        elif self.chk_freq!=0:
+            if os.path.exists(dir_chk):
+                if not (os.path.isdir(dir_chk)):
+                    raise RuntimeError("dir_chk must point to a directory")
+            else:
+                raise RuntimeError("dir_chk must point to an existing directory")
+
+    def getIdxSeq(self,core_id):
+        idx_seq=0
+        if self.chk_freq!=0 or self.test_mode==True:
+            list_f = os.listdir(self.dir_chk)
+            subfname = base_chk_fname+ str(core_id) + chk_fname_sep
+            chk_f = ""
+            for fname in list_f:
+                if subfname in fname:
+                    chk_f = fname
+                    break
+            if chk_f != "":
+                l=chk_f.split(chk_fname_sep)
+                idx_seq=int(l[2])
+        return idx_seq
+
+
+    def load(self,core_id,idx_refseq):
+        if self.chk_freq!=0 or self.test_mode==True:
+            list_f = os.listdir(self.dir_chk)
+            subfname = base_chk_fname+ str(core_id) + chk_fname_sep + str(idx_refseq) + chk_fname_sep
+            chk_f = ""
+            for fname in list_f:
+                if subfname in fname:
+                    chk_f = fname
+                    break
+            if chk_f != "":
+                interm_res=loadRCRes(os.path.join(self.dir_chk,chk_f))
+                # if self.test_mode==True:
+                #     interm_res.accept(self.v38_C5)
+                l=chk_f.split(chk_fname_sep)
+                cnt_line=int(l[-2])
+                tmp=l[-1] # get rid of .npz extension
+                l2=tmp.split(".")
+                read_match=int(l2[0])
+                partial_res=RCWorkingS(interm_res,cnt_line,read_match)
+                # if self.test_mode:
+                #     partial_res.accept(self.v38_C5)
+                #     partial_res.make_visit()
+                return partial_res
+            else:  # no checkpoint found for this sequence, start from beginning
+                return None
+        else:
+            return None
+
+
+    def check(self,count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\
+                 phage_hybrid_coverage, host_hybrid_coverage, \
+                 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match):
+        cur_t = time.perf_counter_ns()
+        elapsed_t = cur_t - self.start_t
+        #convert elapsed_t tp to seconds
+        elaspsed_t=elapsed_t * 0.000000001
+        if (self.test_mode==True or (self.chk_freq!=0 and (elapsed_t % self.chk_freq == 0))):  # time to create checkpoint.
+            chkp=RCCheckpoint(count_line,core_id,idx_seq,termini_coverage,whole_coverage,paired_whole_coverage,\
+                 phage_hybrid_coverage, host_hybrid_coverage, \
+                 host_whole_coverage,list_hybrid,insert,paired_mismatch,reads_tested,read_match)
+            chkp.save(self.dir_chk,core_id,idx_seq)
+
+
+    def end(self,core_id):
+        if (self.test_mode==False and self.chk_freq!=0) :
+            # remove old breakpoint file
+            list_f = os.listdir(self.dir_chk)
+            sub_s=base_chk_fname+str(core_id)+chk_fname_sep
+            for f in list_f:
+                if sub_s in f:
+                    os.remove(os.path.join(self.dir_chk, f))
+
+
+
+
+
+
+
+
+