Mercurial > repos > iuc > vsnp_get_snps
diff vsnp_build_tables.py @ 1:9ac0b1d5560d draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit 2a94c64d6c7236550bf483d2ffc4e86248c63aab"
author | iuc |
---|---|
date | Tue, 16 Nov 2021 20:11:30 +0000 |
parents | ec6e02f4eab7 |
children | 6fdfdee72f8a |
line wrap: on
line diff
--- a/vsnp_build_tables.py Tue Nov 16 08:26:58 2021 +0000 +++ b/vsnp_build_tables.py Tue Nov 16 20:11:30 2021 +0000 @@ -1,7 +1,9 @@ #!/usr/bin/env python import argparse +import multiprocessing import os +import queue import re import pandas @@ -16,6 +18,9 @@ # to use LibreOffice for Excel spreadsheets. MAXCOLS = 1024 OUTPUT_EXCEL_DIR = 'output_excel_dir' +INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir' +INPUT_JSON_DIR = 'input_json_dir' +INPUT_NEWICK_DIR = 'input_newick_dir' def annotate_table(table_df, group, annotation_dict): @@ -221,74 +226,94 @@ output_excel(df, type_str, group_str, annotation_dict) -def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict): - avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split') - # Map quality to dataframe. - mqdf = avg_mq_series.to_frame(name='MQ') - mqdf = mqdf.T - # Get the group. - group = get_sample_name(newick_file) - snps_df = pandas.read_json(json_file, orient='split') - with open(newick_file, 'r') as fh: - for line in fh: - line = re.sub('[:,]', '\n', line) - line = re.sub('[)(]', '', line) - line = re.sub(r'[0-9].*\.[0-9].*\n', '', line) - line = re.sub('root\n', '', line) - sample_order = line.split('\n') - sample_order = list([_f for _f in sample_order if _f]) - sample_order.insert(0, 'root') - tree_order = snps_df.loc[sample_order] - # Count number of SNPs in each column. - snp_per_column = [] - for column_header in tree_order: - count = 0 - column = tree_order[column_header] - for element in column: - if element != column[0]: - count = count + 1 - snp_per_column.append(count) - row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column") - # Count number of SNPS from the - # top of each column in the table. - snp_from_top = [] - for column_header in tree_order: - count = 0 - column = tree_order[column_header] - # for each element in the column - # skip the first element - for element in column[1:]: - if element == column[0]: - count = count + 1 - else: - break - snp_from_top.append(count) - row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top") - tree_order = tree_order.append([row1]) - tree_order = tree_order.append([row2]) - # In pandas=0.18.1 even this does not work: - # abc = row1.to_frame() - # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18) - # tree_order.append(abc) - # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions" - tree_order = tree_order.T - tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False]) - tree_order = tree_order.T - # Remove snp_per_column and snp_from_top rows. - cascade_order = tree_order[:-2] - # Output the cascade table. - output_cascade_table(cascade_order, mqdf, group, annotation_dict) - # Output the sorted table. - output_sort_table(cascade_order, mqdf, group, annotation_dict) +def preprocess_tables(task_queue, annotation_dict, timeout): + while True: + try: + tup = task_queue.get(block=True, timeout=timeout) + except queue.Empty: + break + newick_file, json_file, json_avg_mq_file = tup + avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split') + # Map quality to dataframe. + mqdf = avg_mq_series.to_frame(name='MQ') + mqdf = mqdf.T + # Get the group. + group = get_sample_name(newick_file) + snps_df = pandas.read_json(json_file, orient='split') + with open(newick_file, 'r') as fh: + for line in fh: + line = re.sub('[:,]', '\n', line) + line = re.sub('[)(]', '', line) + line = re.sub(r'[0-9].*\.[0-9].*\n', '', line) + line = re.sub('root\n', '', line) + sample_order = line.split('\n') + sample_order = list([_f for _f in sample_order if _f]) + sample_order.insert(0, 'root') + tree_order = snps_df.loc[sample_order] + # Count number of SNPs in each column. + snp_per_column = [] + for column_header in tree_order: + count = 0 + column = tree_order[column_header] + for element in column: + if element != column[0]: + count = count + 1 + snp_per_column.append(count) + row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column") + # Count number of SNPS from the + # top of each column in the table. + snp_from_top = [] + for column_header in tree_order: + count = 0 + column = tree_order[column_header] + # for each element in the column + # skip the first element + for element in column[1:]: + if element == column[0]: + count = count + 1 + else: + break + snp_from_top.append(count) + row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top") + tree_order = tree_order.append([row1]) + tree_order = tree_order.append([row2]) + # In pandas=0.18.1 even this does not work: + # abc = row1.to_frame() + # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18) + # tree_order.append(abc) + # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions" + tree_order = tree_order.T + tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False]) + tree_order = tree_order.T + # Remove snp_per_column and snp_from_top rows. + cascade_order = tree_order[:-2] + # Output the cascade table. + output_cascade_table(cascade_order, mqdf, group, annotation_dict) + # Output the sorted table. + output_sort_table(cascade_order, mqdf, group, annotation_dict) + task_queue.task_done() + + +def set_num_cpus(num_files, processes): + num_cpus = int(multiprocessing.cpu_count()) + if num_files < num_cpus and num_files < processes: + return num_files + if num_cpus < processes: + half_cpus = int(num_cpus / 2) + if num_files < half_cpus: + return num_files + return half_cpus + return processes if __name__ == '__main__': parser = argparse.ArgumentParser() + parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file') + parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file') + parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file') parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file'), - parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file') - parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file') - parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file') + parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting') args = parser.parse_args() @@ -299,4 +324,56 @@ else: annotation_dict = None - preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict) + # The assumption here is that the list of files + # in both INPUT_NEWICK_DIR and INPUT_JSON_DIR are + # named such that they are properly matched if + # the directories contain more than 1 file (i.e., + # hopefully the newick file names and json file names + # will be something like Mbovis-01D6_* so they can be + # sorted and properly associated with each other). + if args.input_newick is not None: + newick_files = [args.input_newick] + else: + newick_files = [] + for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)): + file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name)) + newick_files.append(file_path) + if args.input_snps_json is not None: + json_files = [args.input_snps_json] + else: + json_files = [] + for file_name in sorted(os.listdir(INPUT_JSON_DIR)): + file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name)) + json_files.append(file_path) + if args.input_avg_mq_json is not None: + json_avg_mq_files = [args.input_avg_mq_json] + else: + json_avg_mq_files = [] + for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)): + file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name)) + json_avg_mq_files.append(file_path) + + multiprocessing.set_start_method('spawn') + queue1 = multiprocessing.JoinableQueue() + queue2 = multiprocessing.JoinableQueue() + num_files = len(newick_files) + cpus = set_num_cpus(num_files, args.processes) + # Set a timeout for get()s in the queue. + timeout = 0.05 + + for i, newick_file in enumerate(newick_files): + json_file = json_files[i] + json_avg_mq_file = json_avg_mq_files[i] + queue1.put((newick_file, json_file, json_avg_mq_file)) + + # Complete the preprocess_tables task. + processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)] + for p in processes: + p.start() + for p in processes: + p.join() + queue1.join() + + if queue1.empty(): + queue1.close() + queue1.join_thread()