diff PhageTerm.py @ 0:69e8f12c8b31 draft

"planemo upload"
author bioit_sciensano
date Fri, 11 Mar 2022 15:06:20 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/PhageTerm.py	Fri Mar 11 15:06:20 2022 +0000
@@ -0,0 +1,336 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+##@file PhageTerm.py
+#
+# main program
+## PhageTerm software
+#
+#  Phageterm is a tool to determine phage termini and packaging strategy
+#  and other useful information using raw sequencing reads.
+#  (This program works with sequencing reads from randomly
+#  sheared DNA library preparations such as Illumina TruSeq paired-end or similar)
+#
+#  ----------------------------------------------------------------------
+#  Copyright (C) 2017 Julian Garneau
+#
+#   This program is free software; you can redistribute it and/or modify
+#   it under the terms of the GNU General Public License as published by
+#   the Free Software Foundation; either version 3 of the License, or
+#   (at your option) any later version.
+#   <http://www.gnu.org/licenses/gpl-3.0.html>
+#
+#   This program is distributed in the hope that it will be useful,
+#   but WITHOUT ANY WARRANTY; without even the implied warranty of
+#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#   GNU General Public License for more details.
+#  ----------------------------------------------------------------------
+#
+#  @author Julian Garneau <julian.garneau@usherbrooke.ca>
+#  @author Marc Monot <marc.monot@pasteur.fr>
+#  @author David Bikard <david.bikard@pasteur.fr>
+
+ 
+### PYTHON Module
+# Base
+#import sys
+
+
+from __future__ import print_function
+
+# Multiprocessing
+import multiprocessing
+import os
+from multiprocessing import Manager
+
+
+# Project
+
+from _modules.utilities import checkReportTitle
+from _modules.functions_PhageTerm import *
+from _modules.common_readsCoverage_processing import processCovValuesForSeq
+from _modules.main_utils import setOptions,checkOptArgsConsistency
+
+
+### MAIN
+def main():
+    """Entry point: detect phage termini and packaging strategy from raw reads.
+
+    Parses command-line options, then runs in one of two modes:
+      - multi-machine (cluster): mapping/coverage computation, per-sequence
+        aggregation and report generation are three separate invocations that
+        exchange intermediate results through files and call exit() when done;
+      - mono-machine: coverage is computed by `tParms.core` worker processes,
+        their per-chunk results are merged here, then per-sequence statistics
+        and the final text/PDF reports are produced.
+    """
+
+    getopt=setOptions()
+    inRawDArgs, fParms, tParms, inDArgs=checkOptArgsConsistency(getopt)
+
+    # For each fasta in file
+    # DR maps packaging-mechanism class -> {contig report title -> stats dict};
+    # it is filled during per-sequence processing (processCovValuesForSeq /
+    # loadDR) and consumed by the multi-contig report generation below.
+    DR = {"Headful (pac)":{}, "COS (5')":{}, "COS (3')":{}, "COS":{}, "DTR (short)":{}, "DTR (long)":{}, "Mu-like":{}, "UNKNOWN":{}, "NEW":{}}
+    results_pos = 0  # index of the current reference sequence in inDArgs.refseq_liste
+    no_match = []  # report titles of contigs for which no read matched
+    draw = 0 # used when one wants to draw some graphs.
+    # Checkpoint handler so interrupted runs can resume at the last completed
+    # sequence (RCCheckpoint_handler presumably comes from the
+    # functions_PhageTerm wildcard import — TODO confirm).
+    chk_handler = RCCheckpoint_handler(tParms.chk_freq, tParms.dir_chk, tParms.test_mode)
+    ## VL: keep this code just in case we want to try GPU implementation again later.
+    # if tParms.gpu!=0:
+    #     ref_data = refData(inDArgs.refseq_liste, fParms.seed, inDArgs.hostseq)
+    #     nb_extracts=inRawDArgs.tot_reads
+    #     if (inRawDArgs.paired!=""):
+    #         nb_extracts_per_read=7
+    #     else:
+    #         nb_extracts_per_read=4
+    #     nb_extracts *= nb_extracts_per_read
+    #
+    #     gpu_mapping_res_dir = tParms.gpu_mapping_res_dir
+    #     wanted_gpu_nb_chunks = tParms.wanted_chunks
+    #     mapper = GPU_chunkMapper()
+    #     mapper.setRefData(ref_data)
+    #     mapper.setFicDir(gpu_mapping_res_dir)
+    #     nb_kmer_in_chunk = nb_extracts//wanted_gpu_nb_chunks
+    #     doMapping(nb_kmer_in_chunk, mapper, inRawDArgs.fastq, "", ref_data, nb_extracts_per_read)
+    #     if tParms.gpu_mapping_res_dir!=0:
+    #         exit() # Consider that if we put results in files, it is because we are processing large datasets on a cluster. Otherwise, go on working.
+    #
+    # if tParms.dir_cov_res!=None and tParms.gpu_mapping_res_dir!=None: # Process the mapping results produced by the GPU and put results in files
+    #     if tParms.idx_chunk==None or tParms.idx_seq==None:
+    #         print "Indicate index of chunk and sequence to process"
+    #         exit(1)
+    #     seq_info = seqInfo(inDArgs.refseq_liste[tParms.idx_seq],tParms.idx_seq, inDArgs.hostseq)
+    #     fname=os.path.join(tParms.gpu_mapping_res_dir,base_fname_rinfo+str(tParms.idx_chunk))
+    #     d_rinfo=load_d_rinfo(fname)
+    #     readsCoverageGPU_chunk(inRawDArgs.fastq, seq_info, tParms.idx_chunk, d_rinfo, fParms.edge, tParms.limit_coverage, fParms.virome, tParms.gpu_mapping_res_dir,
+    #                            tParms.dir_cov_res, logger=None)
+    #     exit() # Consider that if we put results in files, it is because we are processing large datasets on a cluster.
+
+    if tParms.multi_machine:
+        print("Running on cluster")
+        print(tParms.dir_cov_mm, tParms.seq_id, tParms.dir_seq_mm, tParms.DR_path)
+        # Which cluster stage runs is selected by which option combination is
+        # set: (1) coverage, (2) per-sequence aggregation, (3) report.
+        if tParms.dir_cov_mm!=None and tParms.gpu_mapping_res_dir==None and tParms.dir_seq_mm==None: # perform mapping and readCoverage calculation and write results in file.
+            # In that case we are processing data in an embarrassingly parallel way on a cluster.
+            # position[i] is the index of the first read assigned to core i;
+            # tot_reads is appended as a sentinel so that
+            # position[i]..position[i+1] bounds each core's read slice.
+            position = []
+            read_indices = list(range(int(inRawDArgs.tot_reads)))
+            part = chunks(read_indices, tParms.core)
+            for i in range(tParms.core):
+                position.append(next(part)[0])
+
+            position = position + [int(inRawDArgs.tot_reads)]
+            # Resume from the last checkpointed sequence index for this core.
+            idx_refseq=chk_handler.getIdxSeq(tParms.core_id)
+            print("starting processing at sequence: ",idx_refseq)
+            for refseq in inDArgs.refseq_liste[idx_refseq:]:
+                readsCoverage(inRawDArgs, refseq, inDArgs, fParms,None,tParms.core_id, position[tParms.core_id], position[tParms.core_id + 1],
+                              tParms,chk_handler,idx_refseq)
+                print("Processed: ", idx_refseq, " sequences")
+                idx_refseq+=1
+            # Only core 0 records how many sequences were processed in total.
+            if tParms.core_id==0:
+                fname=os.path.join(tParms.dir_cov_mm,"nb_seq_processed.txt")
+                f=open(fname,"w")
+                f.write(str(idx_refseq))
+                f.close()
+            exit() # Consider that if we put results in files, it is because we are processing large datasets on a cluster.
+        if tParms.dir_cov_mm!=None and tParms.seq_id!=None and tParms.dir_seq_mm!=None and tParms.DR_path!=None:
+            from _modules.seq_processing import sum_readsCoverage_for_seq
+            # in that case, we are processing all the results of readCoverage sequence by sequence in an embarrassingly parallel way on a cluster.
+            sum_readsCoverage_for_seq(tParms.dir_cov_mm, tParms.seq_id, tParms.nb_pieces, inDArgs, fParms, inRawDArgs, tParms.dir_seq_mm,tParms.DR_path)
+            exit()
+        if tParms.dir_seq_mm!=None and tParms.dir_cov_mm==None and tParms.seq_id==None and tParms.DR_path!=None: # report generation
+            from _modules.generate_report import loadDR,genReport
+            loadDR(tParms.DR_path, DR)
+            genReport(fParms, inDArgs, inRawDArgs, no_match, DR)
+            exit()
+    else: # mono machine original multi processing mode.
+        ### COVERAGE
+        print("\nCalculating coverage values, please wait (may take a while)...\n")
+        start_run = time.time()
+
+        if not fParms.test_run and tParms.core == 1:
+            print("If your computer has more than 1 processor, you can use the -c or --core option to speed up the process.\n\n")
+
+
+        for refseq in inDArgs.refseq_liste:
+            jobs = []
+            manager = Manager()
+            # Shared dict: worker i stores its result tuple under key i.
+            return_dict = manager.dict()
+            position = []
+
+            # Same read-partitioning scheme as in the cluster branch:
+            # position[i]..position[i+1] is the read slice for core i.
+            read_indices = list(range(int(inRawDArgs.tot_reads)))
+            part = chunks(read_indices, tParms.core)
+            for i in range(tParms.core):
+                position.append(next(part)[0])
+
+            position = position + [int(inRawDArgs.tot_reads)]
+
+            # Fan out one readsCoverage process per core and wait for all.
+            for i in range(0, tParms.core):
+                tParms.core_id=i
+                process = multiprocessing.Process(target=readsCoverage, args=(inRawDArgs, refseq, inDArgs, fParms,return_dict, i,position[i], position[i+1],
+                                                                              tParms, chk_handler,results_pos))
+                jobs.append(process)
+
+            for j in jobs:
+                j.start()
+
+            for j in jobs:
+                j.join()
+
+            # merging results
+            # return_dict[core_id] is a 10-tuple; core 0 seeds the
+            # accumulators, subsequent cores are added element-wise
+            # (index 7, the insert sizes, is converted to a list first).
+            for core_id in range(tParms.core):
+                if core_id == 0:
+                    termini_coverage       = return_dict[core_id][0]
+                    whole_coverage         = return_dict[core_id][1]
+                    paired_whole_coverage  = return_dict[core_id][2]
+                    phage_hybrid_coverage  = return_dict[core_id][3]
+                    host_hybrid_coverage   = return_dict[core_id][4]
+                    host_whole_coverage    = return_dict[core_id][5]
+                    list_hybrid            = return_dict[core_id][6]
+                    insert                 = return_dict[core_id][7].tolist()
+                    paired_missmatch       = return_dict[core_id][8]
+                    reads_tested           = return_dict[core_id][9]
+                else:
+                    termini_coverage      += return_dict[core_id][0]
+                    whole_coverage        += return_dict[core_id][1]
+                    paired_whole_coverage += return_dict[core_id][2]
+                    phage_hybrid_coverage += return_dict[core_id][3]
+                    host_hybrid_coverage  += return_dict[core_id][4]
+                    host_whole_coverage   += return_dict[core_id][5]
+                    list_hybrid           += return_dict[core_id][6]
+                    insert                += return_dict[core_id][7].tolist()
+                    paired_missmatch      += return_dict[core_id][8]
+                    reads_tested          += return_dict[core_id][9]
+
+            # Convert the merged array results to plain Python lists for the
+            # downstream statistics/report functions.
+            termini_coverage = termini_coverage.tolist()
+            whole_coverage = whole_coverage.tolist()
+            paired_whole_coverage = paired_whole_coverage.tolist()
+            phage_hybrid_coverage = phage_hybrid_coverage.tolist()
+            host_hybrid_coverage = host_hybrid_coverage.tolist()
+            host_whole_coverage = host_whole_coverage.tolist()
+            list_hybrid = list_hybrid.tolist()
+
+
+            # Virome mode: extrapolate the whole-run duration from the first
+            # contig's processing time, report it, and stop.
+            if fParms.virome:
+                end_run = time.time()
+                virome_run = int((end_run - start_run) * inDArgs.nbr_virome)
+                print("\n\nThe fasta file tested contains: " + str(inDArgs.nbr_virome) + " contigs (mean length: " + str(
+                    inDArgs.mean_virome) + ")")
+                print("\nA complete run takes approximatively (" + str(tParms.core) + " core used) : " + EstimateTime(
+                    virome_run) + "\n")
+                exit()
+
+            # Contigs without any match
+            if sum(termini_coverage[0]) + sum(termini_coverage[1]) == 0:
+                no_match.append((checkReportTitle(inDArgs.refseq_name[results_pos])))
+                continue
+
+            # Per-sequence statistics; also records the contig's stats in DR.
+            s_stats=processCovValuesForSeq(refseq,inDArgs.hostseq,inDArgs.refseq_name,inDArgs.refseq_liste,fParms.seed,inRawDArgs.analysis_name,inRawDArgs.tot_reads,\
+                                       results_pos,fParms.test_run, inRawDArgs.paired,fParms.edge,inRawDArgs.host,fParms.test, fParms.surrounding,\
+                                       fParms.limit_preferred,fParms.limit_fixed,fParms.Mu_threshold,termini_coverage,whole_coverage,\
+                                       paired_whole_coverage,phage_hybrid_coverage,host_hybrid_coverage, host_whole_coverage,insert,list_hybrid,reads_tested,DR)
+
+
+            results_pos += 1
+
+
+
+        ### EXPORT Data
+        # Single-contig input: report directly from s_stats / the merged
+        # coverage arrays of the (only) loop iteration above.
+        if len(inDArgs.refseq_liste) == 1:
+            # Test No Match
+            if len(no_match) == 1:
+                print("\n\nERROR: No reads match, please check your reference file.")
+                exit()
+
+            # Text report only
+            if fParms.workflow:
+                WorkflowReport(inRawDArgs.analysis_name, s_stats.P_class, s_stats.P_left, s_stats.P_right, s_stats.P_type, s_stats.P_orient, s_stats.ave_whole_cov)
+            else:
+                # Statistics
+                ExportStatistics(inRawDArgs.analysis_name, whole_coverage, paired_whole_coverage, termini_coverage, s_stats.phage_plus_norm, s_stats.phage_minus_norm, inRawDArgs.paired, fParms.test_run)
+
+            # Sequence
+            ExportCohesiveSeq(inRawDArgs.analysis_name, s_stats.ArtcohesiveSeq, s_stats.P_seqcoh, fParms.test_run)
+            ExportPhageSequence(inRawDArgs.analysis_name, s_stats.P_left, s_stats.P_right, refseq, s_stats.P_orient, s_stats.Redundant, s_stats.Mu_like, \
+                                s_stats.P_class, s_stats.P_seqcoh, fParms.test_run)
+
+            # Report
+            # TODO: just pass s_stat as argument; it will be cleaner.
+            CreateReport(inRawDArgs.analysis_name, fParms.seed, s_stats.added_whole_coverage, draw, s_stats.Redundant, s_stats.P_left, s_stats.P_right, s_stats.Permuted, \
+                         s_stats.P_orient, s_stats.termini_coverage_norm_close, \
+                         s_stats.picMaxPlus_norm_close, s_stats.picMaxMinus_norm_close, s_stats.gen_len, inRawDArgs.tot_reads, s_stats.P_seqcoh, s_stats.phage_plus_norm, \
+                         s_stats.phage_minus_norm, s_stats.ArtPackmode, s_stats.termini, s_stats.forward, s_stats.reverse, s_stats.ArtOrient, s_stats.ArtcohesiveSeq, \
+                         s_stats.termini_coverage_close, s_stats.picMaxPlus_close, s_stats.picMaxMinus_close, \
+                         s_stats.picOUT_norm_forw, s_stats.picOUT_norm_rev, s_stats.picOUT_forw, s_stats.picOUT_rev, s_stats.lost_perc, s_stats.ave_whole_cov, \
+                         s_stats.R1, s_stats.R2, s_stats.R3, inRawDArgs.host, len(inDArgs.hostseq), host_whole_coverage, \
+                         s_stats.picMaxPlus_host, s_stats.picMaxMinus_host, fParms.surrounding, s_stats.drop_cov, inRawDArgs.paired, insert, phage_hybrid_coverage,\
+                         host_hybrid_coverage, s_stats.added_paired_whole_coverage, s_stats.Mu_like, fParms.test_run, s_stats.P_class, s_stats.P_type, s_stats.P_concat)
+
+            if (inRawDArgs.nrt==True): # non regression tests, dump phage class name into file for later checking.
+                fnrt=open("nrt.txt","w")
+                fnrt.write(s_stats.P_class)
+                fnrt.close()
+        else:
+            # Multi-contig input: iterate the per-contig stats accumulated in
+            # DR and concatenate everything into combined outputs.
+            # Test No Match
+            if len(no_match) == inDArgs.nbr_virome:
+                print("\n\nERROR: No reads match, please check your reference file.")
+                exit()
+
+            # Report Resume
+            multiReport     = SummaryReport(inRawDArgs.analysis_name, DR, no_match)
+            multiCohSeq     = ""
+            multiPhageSeq   = ""
+            multiWorkflow   = "#analysis_name\tClass\tLeft\tPVal\tAdjPval\tRight\tPVal\tAdjPval\tType\tOrient\tCoverage\tComments\n"
+
+            # No Match in workflow
+            if fParms.workflow:
+                for no_match_contig in no_match:
+                    multiWorkflow += WorkflowReport(no_match_contig, "-", "-", "-", "-", "-", 0, 1)
+
+            for DPC in DR:
+                for DC in DR[DPC]:
+                    stat_dict = DR[DPC][DC]  # splat this in everywhere
+                    # Text report
+                    if fParms.workflow:
+                        multiWorkflow += WorkflowReport(phagename=DC, multi=1, **stat_dict)
+                    # Sequence
+                    idx_refseq=DR[DPC][DC]["idx_refseq_in_list"]
+                    refseq=inDArgs.refseq_liste[idx_refseq]
+                    multiCohSeq   += ExportCohesiveSeq(DC, stat_dict["ArtcohesiveSeq"], stat_dict["P_seqcoh"], fParms.test_run, 1)
+                    multiPhageSeq += ExportPhageSequence(DC, stat_dict["P_left"], stat_dict["P_right"], refseq, stat_dict["P_orient"], stat_dict["Redundant"], stat_dict["Mu_like"], stat_dict["P_class"], stat_dict["P_seqcoh"], fParms.test_run, 1)
+
+                    # Report
+                    multiReport = CreateReport(phagename=DC,
+                                               draw=draw,
+                                               multi=1,
+                                               multiReport=multiReport,
+                                               **stat_dict)
+
+            # Workflow
+            if not fParms.test:
+                if fParms.workflow:
+                    filoutWorkflow = open(inRawDArgs.analysis_name + "_workflow.txt", "w")
+                    filoutWorkflow.write(multiWorkflow)
+                    filoutWorkflow.close()
+
+                # Concatene Sequences
+                filoutCohSeq = open(inRawDArgs.analysis_name + "_cohesive-sequence.fasta", "w")
+                filoutCohSeq.write(multiCohSeq)
+                filoutCohSeq.close()
+
+                filoutPhageSeq = open(inRawDArgs.analysis_name + "_sequence.fasta", "w")
+                filoutPhageSeq.write(multiPhageSeq)
+                filoutPhageSeq.close()
+
+            # Concatene Report
+            doc = SimpleDocTemplate("%s_PhageTerm_report.pdf" % inRawDArgs.analysis_name, pagesize=letter, rightMargin=10,leftMargin=10, topMargin=5, bottomMargin=10)
+            doc.build(multiReport)
+
+
+        # Real virome run time
+        end_run = time.time()
+        virome_run = int(end_run-start_run)
+        print("\nThe fasta file tested contains: " + str(inDArgs.nbr_virome) + " contigs (mean length: " + str(inDArgs.mean_virome) + ")")
+        print("The run has taken (" + str(tParms.core) + " core used) : " + EstimateTime(virome_run) + "\n")
+        exit()
+
+
+
+# Standard script entry guard: run main() only when executed directly,
+# not when imported as a module (or by multiprocessing child processes).
+if __name__ == '__main__':
+    main()
+
+
+
+
+
+
+
+
+