vsnp_get_snps.py @ 0:ee4ef1fc23c6 (draft)
Uploaded
| author | greg |
|---|---|
| date | Tue, 21 Apr 2020 10:14:11 -0400 |
| parents | |
| children | 14285a94fb13 |
```python
#!/usr/bin/env python

# Collect quality parsimonious SNPs from vcf files and output alignment files in fasta format.

import argparse
import multiprocessing
import os
import pandas
import queue
import shutil
import sys
import time
import vcf
from collections import OrderedDict
from datetime import datetime

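# Fixed locations relative to the job working directory: input vcf files are
# read from INPUT_VCF_DIR and results are written into the OUTPUT_* directories
# (which the caller is expected to have created beforehand).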
ALL_VCFS_DIR = 'all_vcf'
INPUT_VCF_DIR = 'input_vcf_dir'
OUTPUT_JSON_AVG_MQ_DIR = 'output_json_avg_mq_dir'
OUTPUT_JSON_SNPS_DIR = 'output_json_snps_dir'
OUTPUT_SNPS_DIR = 'output_snps_dir'


def get_time_stamp():
    return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S')


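# Choose a worker count: use at most one process per input file and, when the
# requested process count exceeds the available CPUs, fall back to half of
# the machine's CPUs.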
def set_num_cpus(num_files, processes):
    num_cpus = int(multiprocessing.cpu_count())
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        half_cpus = int(num_cpus / 2)
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes


def setup_all_vcfs(vcf_files, vcf_dirs):
    # Create the all_vcf directory and link
    # all input vcf files into it for processing.
    os.makedirs(ALL_VCFS_DIR)
    vcf_dirs.append(ALL_VCFS_DIR)
    for vcf_file in vcf_files:
        file_name_base = os.path.basename(vcf_file)
        dst_file = os.path.join(ALL_VCFS_DIR, file_name_base)
        os.symlink(vcf_file, dst_file)
    return vcf_dirs


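# Accumulates quality parsimonious SNPs across a collection of vcf files,
# writing per-group fasta alignments, json SNP tables, and average mapping
# quality tables, and building an html summary along the way.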
class SnpFinder:

    def __init__(self, num_files, reference, excel_grouper_file,
                 all_isolates, ac, mq_val, n_threshold, qual_threshold, output_summary):
        self.ac = ac
        self.all_isolates = all_isolates
        self.all_positions = None
        # Filter based on the contents of an Excel file.
        self.excel_grouper_file = excel_grouper_file
        # This will be populated from the columns
        # in the Excel filter file if it is used.
        self.groups = []
        self.mq_val = mq_val
        self.n_threshold = n_threshold
        self.qual_threshold = qual_threshold
        self.reference = reference
        self.start_time = get_time_stamp()
        self.summary_str = ""
        self.timer_start = datetime.now()
        self.num_files = num_files
        self.initiate_summary(output_summary)

    def append_to_summary(self, html_str):
        self.summary_str = "%s%s" % (self.summary_str, html_str)

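    # Assign a sample's vcf file to groups based on the defining SNP positions
    # found in it; inverted defining SNPs (marked with '!' in the Excel file)
    # assign a sample only when none of those positions are present.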
    def bin_input_files(self, filename, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix):
        sample_groups_list = []
        table_name = self.get_base_file_name(filename)
        try:
            defining_snp = False
            # Absolute positions in set union of two lists.
            for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
                group = defining_snps[abs_position]
                sample_groups_list.append(group)
                self.check_add_group(group)
                if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
                    table_name = self.get_base_file_name(filename)
                    table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
                self.copy_file(filename, group)
                defining_snp = True
            if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
                for abs_position in list(inverted_defining_snps.keys()):
                    group = inverted_defining_snps[abs_position]
                    sample_groups_list.append(group)
                    self.check_add_group(group)
                    self.copy_file(filename, group)
                    defining_snp = True
            if defining_snp:
                samples_groups_dict[table_name] = sorted(sample_groups_list)
            else:
                samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
        except TypeError as e:
            msg = "<br/>Error processing file %s to generate samples_groups_dict: %s<br/>" % (filename, str(e))
            self.append_to_summary(msg)
            samples_groups_dict[table_name] = [msg]
        return samples_groups_dict

    def check_add_group(self, group):
        if group not in self.groups:
            self.groups.append(group)

    def copy_file(self, filename, dir):
        if not os.path.exists(dir):
            os.makedirs(dir)
        shutil.copy(filename, dir)

    def decide_snps(self, filename):
        positions_dict = self.all_positions
        # Find the SNPs in a vcf file to produce a pandas data
        # frame and a dictionary containing sample map qualities.
        sample_map_qualities = {}
        # Eliminate the path.
        file_name_base = self.get_base_file_name(filename)
        vcf_reader = vcf.Reader(open(filename, 'r'))
        sample_dict = {}
        for record in vcf_reader:
            alt = str(record.ALT[0])
            record_position = "%s:%s" % (str(record.CHROM), str(record.POS))
            if record_position in positions_dict:
                if alt == "None":
                    sample_dict.update({record_position: "-"})
                else:
                    # Not sure this is the best place to capture the MQM average;
                    # it may be faster after parsimony SNPs are decided, but
                    # that would require opening the files again.
                    # On rare occasions MQM gets called "NaN", thus passing
                    # a string when a number is expected when calculating the average.
                    mq_val = self.get_mq_val(record.INFO, filename)
                    if str(mq_val).lower() not in ["nan"]:
                        sample_map_qualities.update({record_position: mq_val})
                    # Add parameters here to change what each vcf represents.
                    # The SNP is represented in the table; now decide how the vcf
                    # represents the called position (alt == "None" means a
                    # deletion, while anything else is a passed call).
                    len_alt = len(alt)
                    if len_alt == 1:
                        qual_val = self.val_as_int(record.QUAL)
                        ac = record.INFO['AC'][0]
                        ref = str(record.REF[0])
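                        # AC (allele count) of 2 is treated as a homozygous ALT
                        # call; an AC of 1 is a mixed call and is recorded as an
                        # IUPAC ambiguity base below.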
                        if ac == 2 and qual_val > self.n_threshold:
                            sample_dict.update({record_position: alt})
                        elif ac == 1 and qual_val > self.n_threshold:
                            alt_ref = "%s%s" % (alt, ref)
                            # IUPAC ambiguity codes keyed on the ALT/REF pair.
                            iupac = {"AG": "R", "GA": "R",
                                     "CT": "Y", "TC": "Y",
                                     "GC": "S", "CG": "S",
                                     "AT": "W", "TA": "W",
                                     "GT": "K", "TG": "K",
                                     "AC": "M", "CA": "M"}
                            sample_dict.update({record_position: iupac.get(alt_ref, "N")})
                        # Poor calls
                        elif qual_val <= 50:
                            # Do not coerce record.REF[0] to a string!
                            sample_dict.update({record_position: record.REF[0]})
                        elif qual_val <= self.n_threshold:
                            sample_dict.update({record_position: "N"})
                        else:
                            # Insurance -- will still report on a possible
                            # SNP even if missed by the statements above.
                            # Do not coerce record.REF[0] to a string!
                            sample_dict.update({record_position: record.REF[0]})
        # Merge dictionaries and order.
        merge_dict = {}
        # abs_pos:REF
        merge_dict.update(positions_dict)
        # abs_pos:ALT, replacing all_positions, because keys must be unique.
        merge_dict.update(sample_dict)
        sample_df = pandas.DataFrame(merge_dict, index=[file_name_base])
        return sample_df, file_name_base, sample_map_qualities

    def df_to_fasta(self, parsimonious_df, group):
        # Generate the SNP alignment file from
        # the parsimonious_df data frame.
        snps_file = os.path.join(OUTPUT_SNPS_DIR, "%s.fasta" % group)
        test_duplicates = []
        has_sequence_data = False
        for index, row in parsimonious_df.iterrows():
            for pos in row:
                if len(pos) > 0:
                    has_sequence_data = True
                    break
        if has_sequence_data:
            with open(snps_file, 'w') as fh:
                for index, row in parsimonious_df.iterrows():
                    test_duplicates.append(row.name)
                    if test_duplicates.count(row.name) < 2:
                        print(f'>{row.name}', file=fh)
                        for pos in row:
                            print(pos, end='', file=fh)
                        print("", file=fh)
        return has_sequence_data

    def find_initial_positions(self, filename):
        # Find SNP positions in a vcf file.
        found_positions = {}
        found_positions_mix = {}
        try:
            vcf_reader = vcf.Reader(open(filename, 'r'))
            try:
                for record in vcf_reader:
                    qual_val = self.val_as_int(record.QUAL)
                    chrom = record.CHROM
                    position = record.POS
                    absolute_position = "%s:%s" % (str(chrom), str(position))
                    alt = str(record.ALT[0])
                    if alt != "None":
                        mq_val = self.get_mq_val(record.INFO, filename)
                        ac = record.INFO['AC'][0]
                        len_ref = len(record.REF)
                        if ac == self.ac and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
                            found_positions.update({absolute_position: record.REF})
                        if ac == 1 and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
                            found_positions_mix.update({absolute_position: record.REF})
                return found_positions, found_positions_mix
            except (ZeroDivisionError, ValueError, UnboundLocalError, TypeError) as e:
                self.append_to_summary("<br/>Error parsing record in file %s: %s<br/>" % (filename, str(e)))
                return {'': ''}, {'': ''}
        except (SyntaxError, AttributeError) as e:
            self.append_to_summary("<br/>Error attempting to read file %s: %s<br/>" % (filename, str(e)))
            return {'': ''}, {'': ''}

    def gather_and_filter(self, prefilter_df, mq_averages, group_dir):
        # Group a data frame of SNPs.
        if self.excel_grouper_file is None:
            filtered_all_df = prefilter_df
            sheet_names = None
        else:
            # Filter positions to be removed from all.
            xl = pandas.ExcelFile(self.excel_grouper_file)
            sheet_names = xl.sheet_names
            # Use the first column to filter "all" positions.
            exclusion_list_all = self.get_position_list(sheet_names, 0)
            exclusion_list_group = self.get_position_list(sheet_names, group_dir)
            exclusion_list = exclusion_list_all + exclusion_list_group
            # Filters for all applied.
            filtered_all_df = prefilter_df.drop(columns=exclusion_list, errors='ignore')
        json_snps_file = os.path.join(OUTPUT_JSON_SNPS_DIR, "%s.json" % group_dir)
        parsimonious_df = self.get_parsimonious_df(filtered_all_df)
        samples_number, columns = parsimonious_df.shape
        if samples_number >= 4:
            has_sequence_data = self.df_to_fasta(parsimonious_df, group_dir)
            if has_sequence_data:
                json_avg_mq_file = os.path.join(OUTPUT_JSON_AVG_MQ_DIR, "%s.json" % group_dir)
                mq_averages.to_json(json_avg_mq_file, orient='split')
                parsimonious_df.to_json(json_snps_file, orient='split')
            else:
                msg = "<br/>No sequence data"
                if group_dir is not None:
                    msg = "%s for group: %s" % (msg, group_dir)
                self.append_to_summary("%s<br/>\n" % msg)
        else:
            msg = "<br/>Too few samples to build tree"
            if group_dir is not None:
                msg = "%s for group: %s" % (msg, group_dir)
            self.append_to_summary("%s<br/>\n" % msg)

    def get_base_file_name(self, file_path):
        base_file_name = os.path.basename(file_path)
        if base_file_name.find(".") > 0:
            # Eliminate the extension.
            return os.path.splitext(base_file_name)[0]
        elif base_file_name.find("_") > 0:
            # The dot extension was likely changed
            # to the "_" character.
            items = base_file_name.split("_")
            return "_".join(items[0:-1])
        else:
            return base_file_name

    def get_mq_val(self, record_info, filename):
        # Get the MQ (gatk) or MQM (freebayes) value
        # from the record.INFO component of the vcf file.
        try:
            mq_val = record_info['MQM']
            return self.return_val(mq_val)
        except Exception:
            try:
                mq_val = record_info['MQ']
                return self.return_val(mq_val)
            except Exception:
                msg = "Invalid or unsupported vcf header %s in file: %s\n" % (str(record_info), filename)
                sys.exit(msg)

    def get_parsimonious_df(self, filtered_all_df):
        # Get the parsimonious SNPs data frame
        # from a data frame of filtered SNPs.
        try:
            ref_series = filtered_all_df.loc['root']
            # In all_vcf the root row needs to be removed.
            filtered_all_df = filtered_all_df.drop(['root'])
        except KeyError:
            # No reference row to re-append below.
            ref_series = None
        parsimony = filtered_all_df.loc[:, (filtered_all_df != filtered_all_df.iloc[0]).any()]
        parsimony_positions = list(parsimony)
        parse_df = filtered_all_df[parsimony_positions]
        if ref_series is None:
            return parse_df
        ref_df = ref_series.to_frame().T
        parsimonious_df = pandas.concat([parse_df, ref_df], join='inner')
        return parsimonious_df

    def get_position_list(self, sheet_names, group):
        # Get a list of positions defined by an Excel file.
        exclusion_list = []
        try:
            filter_to_all = pandas.read_excel(self.excel_grouper_file, header=1, usecols=[group])
            for value in filter_to_all.values:
                value = str(value[0])
                if "-" not in value.split(":")[-1]:
                    exclusion_list.append(value)
                elif "-" in value:
                    try:
                        chrom, sequence_range = value.split(":")
                    except Exception as e:
                        sys.exit(str(e))
                    value = sequence_range.split("-")
                    for position in range(int(value[0].replace(',', '')), int(value[1].replace(',', '')) + 1):
                        exclusion_list.append(chrom + ":" + str(position))
            return exclusion_list
        except ValueError:
            exclusion_list = []
            return exclusion_list

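    # Worker entry point: each process pulls group directories off the shared
    # queue until the queue has been empty for `timeout` seconds, then exits.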
    def get_snps(self, task_queue, timeout):
        while True:
            try:
                group_dir = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                break
            # Parse all vcf files to accumulate
            # SNPs into a data frame.
            positions_dict = {}
            group_files = []
            for file_name in os.listdir(os.path.abspath(group_dir)):
                file_path = os.path.abspath(os.path.join(group_dir, file_name))
                group_files.append(file_path)
            for file_name in group_files:
                try:
                    found_positions, found_positions_mix = self.find_initial_positions(file_name)
                    positions_dict.update(found_positions)
                except Exception as e:
                    self.append_to_summary("Error updating the positions_dict dictionary when processing file %s:\n%s\n" % (file_name, str(e)))
            # Order before adding to the file to match
            # the ordering of individual samples.
            # all_positions is abs_pos:REF.
            self.all_positions = OrderedDict(sorted(positions_dict.items()))
            ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
            all_map_qualities = {}
            df_list = []
            for file_name in group_files:
                sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name)
                df_list.append(sample_df)
                all_map_qualities.update({file_name_base: sample_map_qualities})
            all_sample_df = pandas.concat(df_list)
            # All positions have now been selected for each sample,
            # so select parsimony informative SNPs.  This removes
            # columns where all fields are the same.
            # Add the reference to the top row.
            prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
            all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
            mq_averages = all_mq_df.mean(axis=1).astype(int)
            self.gather_and_filter(prefilter_df, mq_averages, group_dir)
            task_queue.task_done()

    def group_vcfs(self, vcf_files):
        # Parse an Excel file to produce a
        # grouping dictionary for filtering SNPs.
        xl = pandas.ExcelFile(self.excel_grouper_file)
        sheet_names = xl.sheet_names
        ws = pandas.read_excel(self.excel_grouper_file, sheet_name=sheet_names[0])
        defining_snps = ws.iloc[0]
        defsnp_iterator = iter(defining_snps.items())
        next(defsnp_iterator)
        defining_snps = {}
        inverted_defining_snps = {}
        for abs_pos, group in defsnp_iterator:
            if '!' in abs_pos:
                inverted_defining_snps[abs_pos.replace('!', '')] = group
            else:
                defining_snps[abs_pos] = group
        samples_groups_dict = {}
        for vcf_file in vcf_files:
            found_positions, found_positions_mix = self.find_initial_positions(vcf_file)
            samples_groups_dict = self.bin_input_files(vcf_file, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix)
        # Output the summary grouping table.
        self.append_to_summary('<br/>')
        self.append_to_summary('<b>Groupings with %d listed:</b><br/>\n' % len(samples_groups_dict))
        self.append_to_summary('<table cellpadding="5" cellspacing="5" border="1">\n')
        for key, value in samples_groups_dict.items():
            self.append_to_summary('<tr align="left"><th>Sample Name</th>\n')
            self.append_to_summary('<td>%s</td>' % key)
            for group in value:
                self.append_to_summary('<td>%s</td>\n' % group)
            self.append_to_summary('</tr>\n')
        self.append_to_summary('</table><br/>\n')

    def initiate_summary(self, output_summary):
        # Output summary file handle.
        self.append_to_summary('<html>\n')
        self.append_to_summary('<head></head>\n')
        self.append_to_summary('<body style="font-size:12px;">')
        self.append_to_summary("<b>Time started:</b> %s<br/>" % str(get_time_stamp()))
        self.append_to_summary("<b>Number of VCF inputs:</b> %d<br/>" % self.num_files)
        self.append_to_summary("<b>Reference:</b> %s<br/>" % str(self.reference))
        self.append_to_summary("<b>All isolates:</b> %s<br/>" % str(self.all_isolates))

    def return_val(self, val, index=0):
        # Handle element and single-element list values.
        if isinstance(val, list):
            return val[index]
        return val

    def val_as_int(self, val):
        # Handle integer value conversion.
        try:
            return int(val)
        except TypeError:
            # val is likely None here.
            return 0


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('--all_isolates', action='store', dest='all_isolates', required=False, default="No", help='Create table with all isolates')
    parser.add_argument('--excel_grouper_file', action='store', dest='excel_grouper_file', required=False, default=None, help='Optional Excel filter file')
    parser.add_argument('--output_summary', action='store', dest='output_summary', help='Output summary html file')
    parser.add_argument('--reference', action='store', dest='reference', help='Reference file')
    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')

    args = parser.parse_args()

    # Initializations - TODO: should these be passed in as command line args?
    ac = 2
    mq_val = 56
    n_threshold = 50
    qual_threshold = 150
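    # ac: allele count a record must have to be called a quality SNP;
    # mq_val: minimum mean mapping quality; n_threshold: QUAL cutoff used when
    # deciding ambiguous ("N") calls; qual_threshold: minimum QUAL for SNP
    # discovery.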

    # Build the list of sample vcf files for the current run.
    vcf_files = []
    for file_name in os.listdir(INPUT_VCF_DIR):
        file_path = os.path.abspath(os.path.join(INPUT_VCF_DIR, file_name))
        vcf_files.append(file_path)

    multiprocessing.set_start_method('spawn')
    queue1 = multiprocessing.JoinableQueue()
    num_files = len(vcf_files)
    cpus = set_num_cpus(num_files, args.processes)
    # Set a timeout for get()s in the queue.
    timeout = 0.05

    # Initialize the snp_finder object.
    snp_finder = SnpFinder(num_files, args.reference, args.excel_grouper_file, args.all_isolates,
                           ac, mq_val, n_threshold, qual_threshold, args.output_summary)

    # Initialize the set of directories containing vcf files for analysis.
    vcf_dirs = []
    if args.excel_grouper_file is None:
        vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
    else:
        if args.all_isolates.lower() == "yes":
            vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
        # Parse the Excel file to determine groups for filtering.
        snp_finder.group_vcfs(vcf_files)
        # Append the list of group directories created by
        # the above call to the set of directories containing
        # vcf files for analysis.
        group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
        vcf_dirs.extend(group_dirs)

    # Populate the queue for job splitting.
    for vcf_dir in vcf_dirs:
        queue1.put(vcf_dir)

    # Complete the get_snps task.
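    # Each worker receives its own copy of snp_finder (spawned processes do not
    # share state), so summary text appended inside get_snps is not visible in
    # the parent's final summary; the workers' durable output is the per-group
    # files they write to disk.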
    processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout)) for _ in range(cpus)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    queue1.join()

    # Finish the summary log.
    snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp())
    total_run_time = datetime.now() - snp_finder.timer_start
    snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time))
    snp_finder.append_to_summary('</body>\n</html>\n')
    with open(args.output_summary, "w") as fh:
        fh.write("%s" % snp_finder.summary_str)
```
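
For context, a minimal setup sketch (hypothetical; the Galaxy tool wrapper normally prepares the job working directory) showing the layout the script expects before it runs:

```python
import os

# The script reads vcf inputs from input_vcf_dir and writes results into the
# output_* directories; it creates all_vcf and group directories itself, but
# not these, so they must exist beforehand.
for d in ('input_vcf_dir', 'output_json_avg_mq_dir',
          'output_json_snps_dir', 'output_snps_dir'):
    os.makedirs(d, exist_ok=True)

# Input vcf files are then copied (or symlinked) into place, e.g.:
# os.symlink('/data/sample1.vcf', 'input_vcf_dir/sample1.vcf')
```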
