phagetermvirome: PhageTerm.py @ 0:69e8f12c8b31 draft
"planemo upload"

author: bioit_sciensano
date: Fri, 11 Mar 2022 15:06:20 +0000
parents: (none)
children: (none)
comparison: -1:000000000000 → 0:69e8f12c8b31 (file added in this changeset)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
##@file phageterm.py
#
# main program
## PhageTerm software
#
# PhageTerm is a tool to determine phage termini and packaging strategy,
# and other useful information, using raw sequencing reads.
# (This program works with sequencing reads from randomly
# sheared DNA library preparations, such as Illumina TruSeq paired-end or similar.)
#
# ----------------------------------------------------------------------
# Copyright (C) 2017 Julian Garneau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# <http://www.gnu.org/licenses/gpl-3.0.html>
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# ----------------------------------------------------------------------
#
# @author Julian Garneau <julian.garneau@usherbrooke.ca>
# @author Marc Monot <marc.monot@pasteur.fr>
# @author David Bikard <david.bikard@pasteur.fr>

### PYTHON Module
# Base
from __future__ import print_function

import os
import time

# Multiprocessing
import multiprocessing
from multiprocessing import Manager

# Project
# Note: the star import below is the source of the helpers used throughout
# main() (readsCoverage, chunks, RCCheckpoint_handler, the Export*/Report
# functions, EstimateTime, SimpleDocTemplate, letter, ...).
from _modules.utilities import checkReportTitle
from _modules.functions_PhageTerm import *
from _modules.common_readsCoverage_processing import processCovValuesForSeq
from _modules.main_utils import setOptions, checkOptArgsConsistency

### MAIN
def main():

    getopt = setOptions()
    inRawDArgs, fParms, tParms, inDArgs = checkOptArgsConsistency(getopt)

    # For each fasta sequence in the input file
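    # DR maps each packaging class to a per-contig dictionary of result
    # statistics; each processed contig is filed under the class PhageTerm
    # assigns to it.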
    DR = {"Headful (pac)": {}, "COS (5')": {}, "COS (3')": {}, "COS": {}, "DTR (short)": {}, "DTR (long)": {}, "Mu-like": {}, "UNKNOWN": {}, "NEW": {}}
    results_pos = 0
    no_match = []
    draw = 0  # used when one wants to draw some graphs.
    chk_handler = RCCheckpoint_handler(tParms.chk_freq, tParms.dir_chk, tParms.test_mode)
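    # The checkpoint handler periodically saves per-worker progress so that an
    # interrupted run can resume from the last processed sequence (see the
    # chk_handler.getIdxSeq call below).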
    ## VL: keep this code just in case we want to try the GPU implementation again later.
    # if tParms.gpu != 0:
    #     ref_data = refData(inDArgs.refseq_liste, fParms.seed, inDArgs.hostseq)
    #     nb_extracts = inRawDArgs.tot_reads
    #     if (inRawDArgs.paired != ""):
    #         nb_extracts_per_read = 7
    #     else:
    #         nb_extracts_per_read = 4
    #     nb_extracts *= nb_extracts_per_read
    #
    #     gpu_mapping_res_dir = tParms.gpu_mapping_res_dir
    #     wanted_gpu_nb_chunks = tParms.wanted_chunks
    #     mapper = GPU_chunkMapper()
    #     mapper.setRefData(ref_data)
    #     mapper.setFicDir(gpu_mapping_res_dir)
    #     nb_kmer_in_chunk = nb_extracts // wanted_gpu_nb_chunks
    #     doMapping(nb_kmer_in_chunk, mapper, inRawDArgs.fastq, "", ref_data, nb_extracts_per_read)
    #     if tParms.gpu_mapping_res_dir != 0:
    #         exit()  # Consider that if we put results in files, it is because we are processing large datasets on a cluster. Otherwise, go on working.
    #
    # if tParms.dir_cov_res is not None and tParms.gpu_mapping_res_dir is not None:  # Process the mapping results produced by the GPU and put results in files.
    #     if tParms.idx_chunk is None or tParms.idx_seq is None:
    #         print("Indicate index of chunk and sequence to process")
    #         exit(1)
    #     seq_info = seqInfo(inDArgs.refseq_liste[tParms.idx_seq], tParms.idx_seq, inDArgs.hostseq)
    #     fname = os.path.join(tParms.gpu_mapping_res_dir, base_fname_rinfo + str(tParms.idx_chunk))
    #     d_rinfo = load_d_rinfo(fname)
    #     readsCoverageGPU_chunk(inRawDArgs.fastq, seq_info, tParms.idx_chunk, d_rinfo, fParms.edge, tParms.limit_coverage, fParms.virome, tParms.gpu_mapping_res_dir,
    #                            tParms.dir_cov_res, logger=None)
    #     exit()  # Consider that if we put results in files, it is because we are processing large datasets on a cluster.

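    # Multi-machine (cluster) mode runs as three mutually exclusive phases,
    # selected by which options are set: (1) mapping/coverage computation with
    # results written to dir_cov_mm, (2) per-sequence aggregation of those
    # coverage results, (3) final report generation. Each phase exits when done.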
    if tParms.multi_machine:
        print("Running on cluster")
        print(tParms.dir_cov_mm, tParms.seq_id, tParms.dir_seq_mm, tParms.DR_path)
        if tParms.dir_cov_mm is not None and tParms.gpu_mapping_res_dir is None and tParms.dir_seq_mm is None:
            # Perform mapping and readsCoverage calculation and write the results to files.
            # In that case we are processing data in an embarrassingly parallel way on a cluster.
            position = []
            read_indices = list(range(int(inRawDArgs.tot_reads)))
            part = chunks(read_indices, tParms.core)
            for i in range(tParms.core):
                position.append(next(part)[0])

            position = position + [int(inRawDArgs.tot_reads)]
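            # position now holds tParms.core + 1 boundaries: worker i is
            # responsible for the read slice [position[i], position[i + 1]).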
            idx_refseq = chk_handler.getIdxSeq(tParms.core_id)
            print("starting processing at sequence: ", idx_refseq)
            for refseq in inDArgs.refseq_liste[idx_refseq:]:
                readsCoverage(inRawDArgs, refseq, inDArgs, fParms, None, tParms.core_id, position[tParms.core_id], position[tParms.core_id + 1],
                              tParms, chk_handler, idx_refseq)
                print("Processed: ", idx_refseq, " sequences")
                idx_refseq += 1
            if tParms.core_id == 0:
                fname = os.path.join(tParms.dir_cov_mm, "nb_seq_processed.txt")
                with open(fname, "w") as f:
                    f.write(str(idx_refseq))
            exit()  # Consider that if we put results in files, it is because we are processing large datasets on a cluster.
        if tParms.dir_cov_mm is not None and tParms.seq_id is not None and tParms.dir_seq_mm is not None and tParms.DR_path is not None:
            from _modules.seq_processing import sum_readsCoverage_for_seq
            # In that case, we are processing all the results of readsCoverage sequence by sequence, in an embarrassingly parallel way on a cluster.
            sum_readsCoverage_for_seq(tParms.dir_cov_mm, tParms.seq_id, tParms.nb_pieces, inDArgs, fParms, inRawDArgs, tParms.dir_seq_mm, tParms.DR_path)
            exit()
        if tParms.dir_seq_mm is not None and tParms.dir_cov_mm is None and tParms.seq_id is None and tParms.DR_path is not None:  # report generation
            from _modules.generate_report import loadDR, genReport
            loadDR(tParms.DR_path, DR)
            genReport(fParms, inDArgs, inRawDArgs, no_match, DR)
            exit()
    else:  # single-machine, original multiprocessing mode.
        ### COVERAGE
        print("\nCalculating coverage values, please wait (may take a while)...\n")
        start_run = time.time()

        if not fParms.test_run and tParms.core == 1:
            print("If your computer has more than 1 processor, you can use the -c or --core option to speed up the process.\n\n")

        for refseq in inDArgs.refseq_liste:
            jobs = []
            manager = Manager()
            return_dict = manager.dict()
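            # Shared dict from the Manager: each worker stores its results
            # tuple under its core_id, so the parent can merge them after join().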
            position = []

            read_indices = list(range(int(inRawDArgs.tot_reads)))
            part = chunks(read_indices, tParms.core)
            for i in range(tParms.core):
                position.append(next(part)[0])

            position = position + [int(inRawDArgs.tot_reads)]

            for i in range(0, tParms.core):
                tParms.core_id = i
                process = multiprocessing.Process(target=readsCoverage, args=(inRawDArgs, refseq, inDArgs, fParms, return_dict, i, position[i], position[i + 1],
                                                                              tParms, chk_handler, results_pos))
                jobs.append(process)

            for j in jobs:
                j.start()

            for j in jobs:
                j.join()

            # merging results
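            # Worker 0's arrays seed the totals; the other workers' numpy
            # arrays are added elementwise (and their insert lists concatenated).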
            for core_id in range(tParms.core):
                if core_id == 0:
                    termini_coverage = return_dict[core_id][0]
                    whole_coverage = return_dict[core_id][1]
                    paired_whole_coverage = return_dict[core_id][2]
                    phage_hybrid_coverage = return_dict[core_id][3]
                    host_hybrid_coverage = return_dict[core_id][4]
                    host_whole_coverage = return_dict[core_id][5]
                    list_hybrid = return_dict[core_id][6]
                    insert = return_dict[core_id][7].tolist()
                    paired_missmatch = return_dict[core_id][8]
                    reads_tested = return_dict[core_id][9]
                else:
                    termini_coverage += return_dict[core_id][0]
                    whole_coverage += return_dict[core_id][1]
                    paired_whole_coverage += return_dict[core_id][2]
                    phage_hybrid_coverage += return_dict[core_id][3]
                    host_hybrid_coverage += return_dict[core_id][4]
                    host_whole_coverage += return_dict[core_id][5]
                    list_hybrid += return_dict[core_id][6]
                    insert += return_dict[core_id][7].tolist()
                    paired_missmatch += return_dict[core_id][8]
                    reads_tested += return_dict[core_id][9]

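            # Convert the merged numpy arrays to plain lists for the
            # downstream statistics and report code.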
            termini_coverage = termini_coverage.tolist()
            whole_coverage = whole_coverage.tolist()
            paired_whole_coverage = paired_whole_coverage.tolist()
            phage_hybrid_coverage = phage_hybrid_coverage.tolist()
            host_hybrid_coverage = host_hybrid_coverage.tolist()
            host_whole_coverage = host_whole_coverage.tolist()
            list_hybrid = list_hybrid.tolist()

            # Estimate the run time for a whole virome (fParms.virome)
            if fParms.virome:
                end_run = time.time()
                virome_run = int((end_run - start_run) * inDArgs.nbr_virome)
                print("\n\nThe fasta file tested contains: " + str(inDArgs.nbr_virome) + " contigs (mean length: " + str(inDArgs.mean_virome) + ")")
                print("\nA complete run takes approximately (" + str(tParms.core) + " core used) : " + EstimateTime(virome_run) + "\n")
                exit()

            # Contigs without any match
            if sum(termini_coverage[0]) + sum(termini_coverage[1]) == 0:
                no_match.append(checkReportTitle(inDArgs.refseq_name[results_pos]))
                continue

            s_stats = processCovValuesForSeq(refseq, inDArgs.hostseq, inDArgs.refseq_name, inDArgs.refseq_liste, fParms.seed, inRawDArgs.analysis_name, inRawDArgs.tot_reads,
                                             results_pos, fParms.test_run, inRawDArgs.paired, fParms.edge, inRawDArgs.host, fParms.test, fParms.surrounding,
                                             fParms.limit_preferred, fParms.limit_fixed, fParms.Mu_threshold, termini_coverage, whole_coverage,
                                             paired_whole_coverage, phage_hybrid_coverage, host_hybrid_coverage, host_whole_coverage, insert, list_hybrid, reads_tested, DR)

            results_pos += 1

        ### EXPORT Data
        if len(inDArgs.refseq_liste) == 1:
            # Test No Match
            if len(no_match) == 1:
                print("\n\nERROR: No reads match, please check your reference file.")
                exit()

            # Text report only
            if fParms.workflow:
                WorkflowReport(inRawDArgs.analysis_name, s_stats.P_class, s_stats.P_left, s_stats.P_right, s_stats.P_type, s_stats.P_orient, s_stats.ave_whole_cov)
            else:
                # Statistics
                ExportStatistics(inRawDArgs.analysis_name, whole_coverage, paired_whole_coverage, termini_coverage, s_stats.phage_plus_norm, s_stats.phage_minus_norm, inRawDArgs.paired, fParms.test_run)

                # Sequence
                ExportCohesiveSeq(inRawDArgs.analysis_name, s_stats.ArtcohesiveSeq, s_stats.P_seqcoh, fParms.test_run)
                ExportPhageSequence(inRawDArgs.analysis_name, s_stats.P_left, s_stats.P_right, refseq, s_stats.P_orient, s_stats.Redundant, s_stats.Mu_like,
                                    s_stats.P_class, s_stats.P_seqcoh, fParms.test_run)

                # Report
                # TODO: just pass s_stats as the argument; it will be cleaner.
                CreateReport(inRawDArgs.analysis_name, fParms.seed, s_stats.added_whole_coverage, draw, s_stats.Redundant, s_stats.P_left, s_stats.P_right, s_stats.Permuted,
                             s_stats.P_orient, s_stats.termini_coverage_norm_close,
                             s_stats.picMaxPlus_norm_close, s_stats.picMaxMinus_norm_close, s_stats.gen_len, inRawDArgs.tot_reads, s_stats.P_seqcoh, s_stats.phage_plus_norm,
                             s_stats.phage_minus_norm, s_stats.ArtPackmode, s_stats.termini, s_stats.forward, s_stats.reverse, s_stats.ArtOrient, s_stats.ArtcohesiveSeq,
                             s_stats.termini_coverage_close, s_stats.picMaxPlus_close, s_stats.picMaxMinus_close,
                             s_stats.picOUT_norm_forw, s_stats.picOUT_norm_rev, s_stats.picOUT_forw, s_stats.picOUT_rev, s_stats.lost_perc, s_stats.ave_whole_cov,
                             s_stats.R1, s_stats.R2, s_stats.R3, inRawDArgs.host, len(inDArgs.hostseq), host_whole_coverage,
                             s_stats.picMaxPlus_host, s_stats.picMaxMinus_host, fParms.surrounding, s_stats.drop_cov, inRawDArgs.paired, insert, phage_hybrid_coverage,
                             host_hybrid_coverage, s_stats.added_paired_whole_coverage, s_stats.Mu_like, fParms.test_run, s_stats.P_class, s_stats.P_type, s_stats.P_concat)

            if inRawDArgs.nrt:  # non-regression tests: dump the phage class name into a file for later checking.
                with open("nrt.txt", "w") as fnrt:
                    fnrt.write(s_stats.P_class)
        else:
            # Test No Match
            if len(no_match) == inDArgs.nbr_virome:
                print("\n\nERROR: No reads match, please check your reference file.")
                exit()

            # Report summary
            multiReport = SummaryReport(inRawDArgs.analysis_name, DR, no_match)
            multiCohSeq = ""
            multiPhageSeq = ""
            multiWorkflow = "#analysis_name\tClass\tLeft\tPVal\tAdjPval\tRight\tPVal\tAdjPval\tType\tOrient\tCoverage\tComments\n"

            # No Match in workflow
            if fParms.workflow:
                for no_match_contig in no_match:
                    multiWorkflow += WorkflowReport(no_match_contig, "-", "-", "-", "-", "-", 0, 1)

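            # DR is keyed by packaging class (DPC), then by contig name (DC);
            # each leaf is a dict of per-contig statistics that can be splatted
            # straight into the report functions as keyword arguments.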
            for DPC in DR:
                for DC in DR[DPC]:
                    stat_dict = DR[DPC][DC]  # splat this in everywhere
                    # Text report
                    if fParms.workflow:
                        multiWorkflow += WorkflowReport(phagename=DC, multi=1, **stat_dict)
                    # Sequence
                    idx_refseq = DR[DPC][DC]["idx_refseq_in_list"]
                    refseq = inDArgs.refseq_liste[idx_refseq]
                    multiCohSeq += ExportCohesiveSeq(DC, stat_dict["ArtcohesiveSeq"], stat_dict["P_seqcoh"], fParms.test_run, 1)
                    multiPhageSeq += ExportPhageSequence(DC, stat_dict["P_left"], stat_dict["P_right"], refseq, stat_dict["P_orient"], stat_dict["Redundant"], stat_dict["Mu_like"], stat_dict["P_class"], stat_dict["P_seqcoh"], fParms.test_run, 1)

                    # Report
                    multiReport = CreateReport(phagename=DC,
                                               draw=draw,
                                               multi=1,
                                               multiReport=multiReport,
                                               **stat_dict)

            # Workflow
            if not fParms.test:
                if fParms.workflow:
                    with open(inRawDArgs.analysis_name + "_workflow.txt", "w") as filoutWorkflow:
                        filoutWorkflow.write(multiWorkflow)

                # Concatenate sequences
                with open(inRawDArgs.analysis_name + "_cohesive-sequence.fasta", "w") as filoutCohSeq:
                    filoutCohSeq.write(multiCohSeq)

                with open(inRawDArgs.analysis_name + "_sequence.fasta", "w") as filoutPhageSeq:
                    filoutPhageSeq.write(multiPhageSeq)

                # Concatenate report
                doc = SimpleDocTemplate("%s_PhageTerm_report.pdf" % inRawDArgs.analysis_name, pagesize=letter, rightMargin=10, leftMargin=10, topMargin=5, bottomMargin=10)
                doc.build(multiReport)

            # Real virome run time
            end_run = time.time()
            virome_run = int(end_run - start_run)
            print("\nThe fasta file tested contains: " + str(inDArgs.nbr_virome) + " contigs (mean length: " + str(inDArgs.mean_virome) + ")")
            print("The run has taken (" + str(tParms.core) + " core used) : " + EstimateTime(virome_run) + "\n")
            exit()


if __name__ == '__main__':
    main()
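# Example invocation (illustrative only; the flag names below are assumptions
# based on upstream PhageTerm usage -- the authoritative list is defined in
# _modules.main_utils.setOptions, see `python PhageTerm.py --help`):
#
#     python PhageTerm.py -f reads.fastq -r phage_genome.fasta -n my_phage -c 4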