0
|
1 #!/usr/bin/env python
|
|
2
|
|
3 # Collect quality parsimonious SNPs from vcf files and output alignment files in fasta format.
|
|
4
|
|
5 import argparse
|
|
6 import multiprocessing
|
|
7 import os
|
|
8 import pandas
|
|
9 import queue
|
|
10 import shutil
|
|
11 import sys
|
|
12 import time
|
|
13 import vcf
|
|
14 from collections import OrderedDict
|
|
15 from datetime import datetime
|
|
16
|
|
17 ALL_VCFS_DIR = 'all_vcf'
|
|
18 INPUT_VCF_DIR = 'input_vcf_dir'
|
|
19 OUTPUT_JSON_AVG_MQ_DIR = 'output_json_avg_mq_dir'
|
|
20 OUTPUT_JSON_SNPS_DIR = 'output_json_snps_dir'
|
|
21 OUTPUT_SNPS_DIR = 'output_snps_dir'
|
|
22
|
|
23
|
|
24 def get_time_stamp():
|
|
25 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S')
|
|
26
|
|
27
|
|
28 def set_num_cpus(num_files, processes):
|
|
29 num_cpus = int(multiprocessing.cpu_count())
|
|
30 if num_files < num_cpus and num_files < processes:
|
|
31 return num_files
|
|
32 if num_cpus < processes:
|
|
33 half_cpus = int(num_cpus / 2)
|
|
34 if num_files < half_cpus:
|
|
35 return num_files
|
|
36 return half_cpus
|
|
37 return processes
|
|
38
|
|
39
|
|
40 def setup_all_vcfs(vcf_files, vcf_dirs):
|
|
41 # Create the all_vcfs directory and link
|
|
42 # all input vcf files into it for processing.
|
|
43 os.makedirs(ALL_VCFS_DIR)
|
|
44 vcf_dirs.append(ALL_VCFS_DIR)
|
|
45 for vcf_file in vcf_files:
|
|
46 file_name_base = os.path.basename(vcf_file)
|
|
47 dst_file = os.path.join(ALL_VCFS_DIR, file_name_base)
|
|
48 os.symlink(vcf_file, dst_file)
|
|
49 return vcf_dirs
|
|
50
|
|
51
|
|
52 class SnpFinder:
|
|
53
|
|
54 def __init__(self, num_files, reference, excel_grouper_file,
|
|
55 all_isolates, ac, mq_val, n_threshold, qual_threshold, output_summary):
|
|
56 self.ac = ac
|
|
57 self.all_isolates = all_isolates
|
|
58 self.all_positions = None
|
|
59 # Filter based on the contents of an Excel file.
|
|
60 self.excel_grouper_file = excel_grouper_file
|
|
61 # Use Genbank file
|
|
62 self.groups = []
|
|
63 # This will be populated from the columns
|
|
64 # in the Excel filter file if it is used.
|
|
65 self.mq_val = mq_val
|
|
66 self.n_threshold = n_threshold
|
|
67 self.qual_threshold = qual_threshold
|
|
68 self.reference = reference
|
|
69 self.start_time = get_time_stamp()
|
|
70 self.summary_str = ""
|
|
71 self.timer_start = datetime.now()
|
|
72 self.num_files = num_files
|
|
73 self.initiate_summary(output_summary)
|
|
74
|
|
75 def append_to_summary(self, html_str):
|
|
76 self.summary_str = "%s%s" % (self.summary_str, html_str)
|
|
77
|
|
78 def bin_input_files(self, filename, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix):
|
|
79 sample_groups_list = []
|
|
80 table_name = self.get_base_file_name(filename)
|
|
81 try:
|
|
82 defining_snp = False
|
|
83 # Absolute positions in set union of two lists.
|
|
84 for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
|
|
85 group = defining_snps[abs_position]
|
|
86 sample_groups_list.append(group)
|
|
87 self.check_add_group(group)
|
|
88 if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
|
|
89 table_name = self.get_base_file_name(filename)
|
|
90 table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
|
|
91 self.copy_file(filename, group)
|
|
92 defining_snp = True
|
|
93 if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
|
|
94 for abs_position in list(inverted_defining_snps.keys()):
|
|
95 group = inverted_defining_snps[abs_position]
|
|
96 sample_groups_list.append(group)
|
|
97 self.check_add_group(group)
|
|
98 self.copy_file(filename, group)
|
|
99 defining_snp = True
|
|
100 if defining_snp:
|
|
101 samples_groups_dict[table_name] = sorted(sample_groups_list)
|
|
102 else:
|
|
103 samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
|
|
104 except TypeError as e:
|
|
105 msg = "<br/>Error processing file %s to generate samples_groups_dict: %s<br/>" % (filename, str(e))
|
|
106 self.append_to_summary(msg)
|
|
107 samples_groups_dict[table_name] = [msg]
|
|
108 return samples_groups_dict
|
|
109
|
|
110 def check_add_group(self, group):
|
|
111 if group not in self.groups:
|
|
112 self.groups.append(group)
|
|
113
|
|
114 def copy_file(self, filename, dir):
|
|
115 if not os.path.exists(dir):
|
|
116 os.makedirs(dir)
|
|
117 shutil.copy(filename, dir)
|
|
118
|
|
119 def decide_snps(self, filename):
|
|
120 positions_dict = self.all_positions
|
|
121 # Find the SNPs in a vcf file to produce a pandas data
|
|
122 # frame and a dictionary containing sample map qualities.
|
|
123 sample_map_qualities = {}
|
|
124 # Eliminate the path.
|
|
125 file_name_base = self.get_base_file_name(filename)
|
|
126 vcf_reader = vcf.Reader(open(filename, 'r'))
|
|
127 sample_dict = {}
|
|
128 for record in vcf_reader:
|
|
129 alt = str(record.ALT[0])
|
|
130 record_position = "%s:%s" % (str(record.CHROM), str(record.POS))
|
|
131 if record_position in positions_dict:
|
|
132 if alt == "None":
|
|
133 sample_dict.update({record_position: "-"})
|
|
134 else:
|
|
135 # Not sure this is the best place to capture MQM average
|
|
136 # may be faster after parsimony SNPs are decided, but
|
|
137 # then it will require opening the files again.
|
|
138 # On rare occassions MQM gets called "NaN", thus passing
|
|
139 # a string when a number is expected when calculating average.
|
|
140 mq_val = self.get_mq_val(record.INFO, filename)
|
|
141 if str(mq_val).lower() not in ["nan"]:
|
|
142 sample_map_qualities.update({record_position: mq_val})
|
|
143 # Add parameters here to change what each vcf represents.
|
|
144 # SNP is represented in table, now how will the vcf represent
|
|
145 # the called position alt != "None", which means a deletion
|
|
146 # as alt is not record.FILTER, or rather passed.
|
|
147 len_alt = len(alt)
|
|
148 if len_alt == 1:
|
|
149 qual_val = self.val_as_int(record.QUAL)
|
|
150 ac = record.INFO['AC'][0]
|
|
151 ref = str(record.REF[0])
|
|
152 if ac == 2 and qual_val > self.n_threshold:
|
|
153 sample_dict.update({record_position: alt})
|
|
154 elif ac == 1 and qual_val > self.n_threshold:
|
|
155 alt_ref = "%s%s" % (alt, ref)
|
|
156 if alt_ref == "AG":
|
|
157 sample_dict.update({record_position: "R"})
|
|
158 elif alt_ref == "CT":
|
|
159 sample_dict.update({record_position: "Y"})
|
|
160 elif alt_ref == "GC":
|
|
161 sample_dict.update({record_position: "S"})
|
|
162 elif alt_ref == "AT":
|
|
163 sample_dict.update({record_position: "W"})
|
|
164 elif alt_ref == "GT":
|
|
165 sample_dict.update({record_position: "K"})
|
|
166 elif alt_ref == "AC":
|
|
167 sample_dict.update({record_position: "M"})
|
|
168 elif alt_ref == "GA":
|
|
169 sample_dict.update({record_position: "R"})
|
|
170 elif alt_ref == "TC":
|
|
171 sample_dict.update({record_position: "Y"})
|
|
172 elif alt_ref == "CG":
|
|
173 sample_dict.update({record_position: "S"})
|
|
174 elif alt_ref == "TA":
|
|
175 sample_dict.update({record_position: "W"})
|
|
176 elif alt_ref == "TG":
|
|
177 sample_dict.update({record_position: "K"})
|
|
178 elif alt_ref == "CA":
|
|
179 sample_dict.update({record_position: "M"})
|
|
180 else:
|
|
181 sample_dict.update({record_position: "N"})
|
|
182 # Poor calls
|
|
183 elif qual_val <= 50:
|
|
184 # Do not coerce record.REF[0] to a string!
|
|
185 sample_dict.update({record_position: record.REF[0]})
|
|
186 elif qual_val <= self.n_threshold:
|
|
187 sample_dict.update({record_position: "N"})
|
|
188 else:
|
|
189 # Insurance -- Will still report on a possible
|
|
190 # SNP even if missed with above statement
|
|
191 # Do not coerce record.REF[0] to a string!
|
|
192 sample_dict.update({record_position: record.REF[0]})
|
|
193 # Merge dictionaries and order
|
|
194 merge_dict = {}
|
|
195 # abs_pos:REF
|
|
196 merge_dict.update(positions_dict)
|
|
197 # abs_pos:ALT replacing all_positions, because keys must be unique
|
|
198 merge_dict.update(sample_dict)
|
|
199 sample_df = pandas.DataFrame(merge_dict, index=[file_name_base])
|
|
200 return sample_df, file_name_base, sample_map_qualities
|
|
201
|
|
202 def df_to_fasta(self, parsimonious_df, group):
|
|
203 # Generate SNP alignment file from the parsimonious_df
|
|
204 # data frame.
|
|
205 snps_file = os.path.join(OUTPUT_SNPS_DIR, "%s.fasta" % group)
|
|
206 test_duplicates = []
|
|
207 has_sequence_data = False
|
|
208 for index, row in parsimonious_df.iterrows():
|
|
209 for pos in row:
|
|
210 if len(pos) > 0:
|
|
211 has_sequence_data = True
|
|
212 break
|
|
213 if has_sequence_data:
|
|
214 with open(snps_file, 'w') as fh:
|
|
215 for index, row in parsimonious_df.iterrows():
|
|
216 test_duplicates.append(row.name)
|
|
217 if test_duplicates.count(row.name) < 2:
|
|
218 print(f'>{row.name}', file=fh)
|
|
219 for pos in row:
|
|
220 print(pos, end='', file=fh)
|
|
221 print("", file=fh)
|
|
222 return has_sequence_data
|
|
223
|
|
224 def find_initial_positions(self, filename):
|
|
225 # Find SNP positions in a vcf file.
|
|
226 found_positions = {}
|
|
227 found_positions_mix = {}
|
|
228 try:
|
|
229 vcf_reader = vcf.Reader(open(filename, 'r'))
|
|
230 try:
|
|
231 for record in vcf_reader:
|
|
232 qual_val = self.val_as_int(record.QUAL)
|
|
233 chrom = record.CHROM
|
|
234 position = record.POS
|
|
235 absolute_position = "%s:%s" % (str(chrom), str(position))
|
|
236 alt = str(record.ALT[0])
|
|
237 if alt != "None":
|
|
238 mq_val = self.get_mq_val(record.INFO, filename)
|
|
239 ac = record.INFO['AC'][0]
|
|
240 len_ref = len(record.REF)
|
|
241 if ac == self.ac and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
|
|
242 found_positions.update({absolute_position: record.REF})
|
|
243 if ac == 1 and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
|
|
244 found_positions_mix.update({absolute_position: record.REF})
|
|
245 return found_positions, found_positions_mix
|
|
246 except (ZeroDivisionError, ValueError, UnboundLocalError, TypeError) as e:
|
|
247 self.append_to_summar("<br/>Error parsing record in file %s: %s<br/>" % (filename, str(e)))
|
|
248 return {'': ''}, {'': ''}
|
|
249 except (SyntaxError, AttributeError) as e:
|
|
250 self.append_to_summary("<br/>Error attempting to read file %s: %s<br/>" % (filename, str(e)))
|
|
251 return {'': ''}, {'': ''}
|
|
252
|
|
253 def gather_and_filter(self, prefilter_df, mq_averages, group_dir):
|
|
254 # Group a data frame of SNPs.
|
|
255 if self.excel_grouper_file is None:
|
|
256 filtered_all_df = prefilter_df
|
|
257 sheet_names = None
|
|
258 else:
|
|
259 # Filter positions to be removed from all.
|
|
260 xl = pandas.ExcelFile(self.excel_grouper_file)
|
|
261 sheet_names = xl.sheet_names
|
|
262 # Use the first column to filter "all" postions.
|
|
263 exclusion_list_all = self.get_position_list(sheet_names, 0)
|
|
264 exclusion_list_group = self.get_position_list(sheet_names, group_dir)
|
|
265 exclusion_list = exclusion_list_all + exclusion_list_group
|
|
266 # Filters for all applied.
|
|
267 filtered_all_df = prefilter_df.drop(columns=exclusion_list, errors='ignore')
|
|
268 json_snps_file = os.path.join(OUTPUT_JSON_SNPS_DIR, "%s.json" % group_dir)
|
|
269 parsimonious_df = self.get_parsimonious_df(filtered_all_df)
|
|
270 samples_number, columns = parsimonious_df.shape
|
|
271 if samples_number >= 4:
|
|
272 has_sequence_data = self.df_to_fasta(parsimonious_df, group_dir)
|
|
273 if has_sequence_data:
|
|
274 json_avg_mq_file = os.path.join(OUTPUT_JSON_AVG_MQ_DIR, "%s.json" % group_dir)
|
|
275 mq_averages.to_json(json_avg_mq_file, orient='split')
|
|
276 parsimonious_df.to_json(json_snps_file, orient='split')
|
|
277 else:
|
|
278 msg = "<br/>No sequence data"
|
|
279 if group_dir is not None:
|
|
280 msg = "%s for group: %s" % (msg, group_dir)
|
|
281 self.append_to_summary("%s<br/>\n" % msg)
|
|
282 else:
|
|
283 msg = "<br/>Too few samples to build tree"
|
|
284 if group_dir is not None:
|
|
285 msg = "%s for group: %s" % (msg, group_dir)
|
|
286 self.append_to_summary("%s<br/>\n" % msg)
|
|
287
|
|
288 def get_base_file_name(self, file_path):
|
|
289 base_file_name = os.path.basename(file_path)
|
|
290 if base_file_name.find(".") > 0:
|
|
291 # Eliminate the extension.
|
|
292 return os.path.splitext(base_file_name)[0]
|
|
293 elif base_file_name.find("_") > 0:
|
|
294 # The dot extension was likely changed to
|
|
295 # the " character.
|
|
296 items = base_file_name.split("_")
|
|
297 return "_".join(items[0:-1])
|
|
298 else:
|
|
299 return base_file_name
|
|
300
|
|
301 def get_mq_val(self, record_info, filename):
|
|
302 # Get the MQ (gatk) or MQM (freebayes) value
|
|
303 # from the record.INFO component of the vcf file.
|
|
304 try:
|
|
305 mq_val = record_info['MQM']
|
|
306 return self.return_val(mq_val)
|
|
307 except Exception:
|
|
308 try:
|
|
309 mq_val = record_info['MQ']
|
|
310 return self.return_val(mq_val)
|
|
311 except Exception:
|
|
312 msg = "Invalid or unsupported vcf header %s in file: %s\n" % (str(record_info), filename)
|
|
313 sys.exit(msg)
|
|
314
|
|
315 def get_parsimonious_df(self, filtered_all_df):
|
|
316 # Get the parsimonious SNPs data frame
|
|
317 # from a data frame of filtered SNPs.
|
|
318 try:
|
|
319 ref_series = filtered_all_df.loc['root']
|
|
320 # In all_vcf root needs to be removed.
|
|
321 filtered_all_df = filtered_all_df.drop(['root'])
|
|
322 except KeyError:
|
|
323 pass
|
|
324 parsimony = filtered_all_df.loc[:, (filtered_all_df != filtered_all_df.iloc[0]).any()]
|
|
325 parsimony_positions = list(parsimony)
|
|
326 parse_df = filtered_all_df[parsimony_positions]
|
|
327 ref_df = ref_series.to_frame()
|
|
328 ref_df = ref_df.T
|
|
329 parsimonious_df = pandas.concat([parse_df, ref_df], join='inner')
|
|
330 return parsimonious_df
|
|
331
|
|
332 def get_position_list(self, sheet_names, group):
|
|
333 # Get a list of positions defined by an excel file.
|
|
334 exclusion_list = []
|
|
335 try:
|
|
336 filter_to_all = pandas.read_excel(self.excel_grouper_file, header=1, usecols=[group])
|
|
337 for value in filter_to_all.values:
|
|
338 value = str(value[0])
|
|
339 if "-" not in value.split(":")[-1]:
|
|
340 exclusion_list.append(value)
|
|
341 elif "-" in value:
|
|
342 try:
|
|
343 chrom, sequence_range = value.split(":")
|
|
344 except Exception as e:
|
|
345 sys.exit(str(e))
|
|
346 value = sequence_range.split("-")
|
|
347 for position in range(int(value[0].replace(',', '')), int(value[1].replace(',', '')) + 1):
|
|
348 exclusion_list.append(chrom + ":" + str(position))
|
|
349 return exclusion_list
|
|
350 except ValueError:
|
|
351 exclusion_list = []
|
|
352 return exclusion_list
|
|
353
|
|
354 def get_snps(self, task_queue, timeout):
|
|
355 while True:
|
|
356 try:
|
|
357 group_dir = task_queue.get(block=True, timeout=timeout)
|
|
358 except queue.Empty:
|
|
359 break
|
|
360 # Parse all vcf files to accumulate SNPs into a
|
|
361 # data frame.
|
|
362 positions_dict = {}
|
|
363 group_files = []
|
|
364 for file_name in os.listdir(os.path.abspath(group_dir)):
|
|
365 file_path = os.path.abspath(os.path.join(group_dir, file_name))
|
|
366 group_files.append(file_path)
|
|
367 for file_name in group_files:
|
|
368 try:
|
|
369 found_positions, found_positions_mix = self.find_initial_positions(file_name)
|
|
370 positions_dict.update(found_positions)
|
|
371 except Exception as e:
|
|
372 self.append_to_summary("Error updating the positions_dict dictionary when processing file %s:\n%s\n" % (file_name, str(e)))
|
|
373 # Order before adding to file to match
|
|
374 # with ordering of individual samples.
|
|
375 # all_positions is abs_pos:REF
|
|
376 self.all_positions = OrderedDict(sorted(positions_dict.items()))
|
|
377 ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
|
|
378 all_map_qualities = {}
|
|
379 df_list = []
|
|
380 for file_name in group_files:
|
|
381 sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name)
|
|
382 df_list.append(sample_df)
|
|
383 all_map_qualities.update({file_name_base: sample_map_qualities})
|
|
384 all_sample_df = pandas.concat(df_list)
|
|
385 # All positions have now been selected for each sample,
|
|
386 # so select parisomony informative SNPs. This removes
|
|
387 # columns where all fields are the same.
|
|
388 # Add reference to top row.
|
|
389 prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
|
|
390 all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
|
|
391 mq_averages = all_mq_df.mean(axis=1).astype(int)
|
|
392 self.gather_and_filter(prefilter_df, mq_averages, group_dir)
|
|
393 task_queue.task_done()
|
|
394
|
|
395 def group_vcfs(self, vcf_files):
|
|
396 # Parse an excel file to produce a
|
|
397 # grouping dictionary for filtering SNPs.
|
|
398 xl = pandas.ExcelFile(self.excel_grouper_file)
|
|
399 sheet_names = xl.sheet_names
|
|
400 ws = pandas.read_excel(self.excel_grouper_file, sheet_name=sheet_names[0])
|
|
401 defining_snps = ws.iloc[0]
|
|
402 defsnp_iterator = iter(defining_snps.iteritems())
|
|
403 next(defsnp_iterator)
|
|
404 defining_snps = {}
|
|
405 inverted_defining_snps = {}
|
|
406 for abs_pos, group in defsnp_iterator:
|
|
407 if '!' in abs_pos:
|
|
408 inverted_defining_snps[abs_pos.replace('!', '')] = group
|
|
409 else:
|
|
410 defining_snps[abs_pos] = group
|
|
411 samples_groups_dict = {}
|
|
412 for vcf_file in vcf_files:
|
|
413 found_positions, found_positions_mix = self.find_initial_positions(vcf_file)
|
|
414 samples_groups_dict = self.bin_input_files(vcf_file, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix)
|
|
415 # Output summary grouping table.
|
|
416 self.append_to_summary('<br/>')
|
|
417 self.append_to_summary('<b>Groupings with %d listed:</b><br/>\n' % len(samples_groups_dict))
|
|
418 self.append_to_summary('<table cellpadding="5" cellspaging="5" border="1">\n')
|
|
419 for key, value in samples_groups_dict.items():
|
|
420 self.append_to_summary('<tr align="left"><th>Sample Name</th>\n')
|
|
421 self.append_to_summary('<td>%s</td>' % key)
|
|
422 for group in value:
|
|
423 self.append_to_summary('<td>%s</td>\n' % group)
|
|
424 self.append_to_summary('</tr>\n')
|
|
425 self.append_to_summary('</table><br/>\n')
|
|
426
|
|
427 def initiate_summary(self, output_summary):
|
|
428 # Output summary file handle.
|
|
429 self.append_to_summary('<html>\n')
|
|
430 self.append_to_summary('<head></head>\n')
|
|
431 self.append_to_summary('<body style=\"font-size:12px;">')
|
|
432 self.append_to_summary("<b>Time started:</b> %s<br/>" % str(get_time_stamp()))
|
|
433 self.append_to_summary("<b>Number of VCF inputs:</b> %d<br/>" % self.num_files)
|
|
434 self.append_to_summary("<b>Reference:</b> %s<br/>" % str(self.reference))
|
|
435 self.append_to_summary("<b>All isolates:</b> %s<br/>" % str(self.all_isolates))
|
|
436
|
|
437 def return_val(self, val, index=0):
|
|
438 # Handle element and single-element list values.
|
|
439 if isinstance(val, list):
|
|
440 return val[index]
|
|
441 return val
|
|
442
|
|
443 def val_as_int(self, val):
|
|
444 # Handle integer value conversion.
|
|
445 try:
|
|
446 return int(val)
|
|
447 except TypeError:
|
|
448 # val is likely None here.
|
|
449 return 0
|
|
450
|
|
451
|
|
452 if __name__ == '__main__':
|
|
453 parser = argparse.ArgumentParser()
|
|
454
|
|
455 parser.add_argument('--all_isolates', action='store', dest='all_isolates', required=False, default="No", help='Create table with all isolates'),
|
|
456 parser.add_argument('--excel_grouper_file', action='store', dest='excel_grouper_file', required=False, default=None, help='Optional Excel filter file'),
|
|
457 parser.add_argument('--output_summary', action='store', dest='output_summary', help='Output summary html file'),
|
|
458 parser.add_argument('--reference', action='store', dest='reference', help='Reference file'),
|
|
459 parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
|
|
460
|
|
461 args = parser.parse_args()
|
|
462
|
|
463 # Initializations - TODO: should these be passed in as command line args?
|
|
464 ac = 2
|
|
465 mq_val = 56
|
|
466 n_threshold = 50
|
|
467 qual_threshold = 150
|
|
468
|
|
469 # Build the list of sample vcf files for the current run.
|
|
470 vcf_files = []
|
|
471 for file_name in os.listdir(INPUT_VCF_DIR):
|
|
472 file_path = os.path.abspath(os.path.join(INPUT_VCF_DIR, file_name))
|
|
473 vcf_files.append(file_path)
|
|
474
|
|
475 multiprocessing.set_start_method('spawn')
|
|
476 queue1 = multiprocessing.JoinableQueue()
|
|
477 num_files = len(vcf_files)
|
|
478 cpus = set_num_cpus(num_files, args.processes)
|
|
479 # Set a timeout for get()s in the queue.
|
|
480 timeout = 0.05
|
|
481
|
|
482 # Initialize the snp_finder object.
|
|
483 snp_finder = SnpFinder(num_files, args.reference, args.excel_grouper_file, args.all_isolates,
|
|
484 ac, mq_val, n_threshold, qual_threshold, args.output_summary)
|
|
485
|
|
486 # Initialize the set of directories containiing vcf files for analysis.
|
|
487 vcf_dirs = []
|
|
488 if args.excel_grouper_file is None:
|
|
489 vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
|
|
490 else:
|
|
491 if args.all_isolates.lower() == "yes":
|
|
492 vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
|
|
493 # Parse the Excel file to detemine groups for filtering.
|
|
494 snp_finder.group_vcfs(vcf_files)
|
|
495 # Append the list of group directories created by
|
|
496 # the above call to the set of directories containing
|
|
497 # vcf files for analysis
|
|
498 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
|
|
499 vcf_dirs.extend(group_dirs)
|
|
500
|
|
501 # Populate the queue for job splitting.
|
|
502 for vcf_dir in vcf_dirs:
|
|
503 queue1.put(vcf_dir)
|
|
504
|
|
505 # Complete the get_snps task.
|
|
506 processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(cpus)]
|
|
507 for p in processes:
|
|
508 p.start()
|
|
509 for p in processes:
|
|
510 p.join()
|
|
511 queue1.join()
|
|
512
|
|
513 # Finish summary log.
|
|
514 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp())
|
|
515 total_run_time = datetime.now() - snp_finder.timer_start
|
|
516 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time))
|
|
517 snp_finder.append_to_summary('</body>\n</html>\n')
|
|
518 with open(args.output_summary, "w") as fh:
|
|
519 fh.write("%s" % snp_finder.summary_str)
|