Mercurial > repos > greg > vsnp_build_tables
diff vsnp_build_tables.py @ 3:abfb861df879 draft
Uploaded
author | greg |
---|---|
date | Sun, 03 Jan 2021 16:21:29 +0000 |
parents | b60858c3eb91 |
children | f641e52353e8 |
line wrap: on
line diff
--- a/vsnp_build_tables.py Thu Apr 30 15:55:22 2020 -0400 +++ b/vsnp_build_tables.py Sun Jan 03 16:21:29 2021 +0000 @@ -1,17 +1,13 @@ #!/usr/bin/env python import argparse -import multiprocessing import os +import re + import pandas -import queue import pandas.io.formats.excel -import re from Bio import SeqIO -INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir' -INPUT_JSON_DIR = 'input_json_dir' -INPUT_NEWICK_DIR = 'input_newick_dir' # Maximum columns allowed in a LibreOffice # spreadsheet is 1024. Excel allows for # 16,384 columns, but we'll set the lower @@ -32,7 +28,7 @@ # Create an annotation file. annotation_file = "%s_annotations.csv" % group with open(annotation_file, "a") as fh: - for index, row in positions.iterrows(): + for _, row in positions.iterrows(): pos = row.position try: aaa = pro.iloc[pro.index.get_loc(int(pos))][['chrom', 'locus', 'product', 'gene']] @@ -144,18 +140,12 @@ return annotation_dict -def get_base_file_name(file_path): +def get_sample_name(file_path): base_file_name = os.path.basename(file_path) if base_file_name.find(".") > 0: # Eliminate the extension. return os.path.splitext(base_file_name)[0] - elif base_file_name.find("_") > 0: - # The dot extension was likely changed to - # the " character. - items = base_file_name.split("_") - return "_".join(items[0:-1]) - else: - return base_file_name + return base_file_name def output_cascade_table(cascade_order, mqdf, group, annotation_dict): @@ -168,17 +158,20 @@ # is used by the excel_formatter. if count is None: if group is None: - json_file_name = "%s_order_mq.json" % type_str + json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_order_mq.json" % type_str) excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table.xlsx" % type_str) else: - json_file_name = "%s_%s_order_mq.json" % (group, type_str) + json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq.json" % (group, type_str)) excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table.xlsx" % (group, type_str)) else: + # The table has more columns than is allowed by the + # MAXCOLS setting, so multiple files will be produced + # as an output collection. if group is None: - json_file_name = "%s_order_mq_%d.json" % (type_str, count) + json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_order_mq_%d.json" % (type_str, count)) excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table_%d.xlsx" % (type_str, count)) else: - json_file_name = "%s_%s_order_mq_%d.json" % (group, type_str, count) + json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq_%d.json" % (group, type_str, count)) excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count)) df.to_json(json_file_name, orient='split') # Output the Excel file. @@ -228,94 +221,74 @@ output_excel(df, type_str, group_str, annotation_dict) -def preprocess_tables(task_queue, annotation_dict, timeout): - while True: - try: - tup = task_queue.get(block=True, timeout=timeout) - except queue.Empty: - break - newick_file, json_file, json_avg_mq_file = tup - avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split') - # Map quality to dataframe. - mqdf = avg_mq_series.to_frame(name='MQ') - mqdf = mqdf.T - # Get the group. - group = get_base_file_name(newick_file) - snps_df = pandas.read_json(json_file, orient='split') - with open(newick_file, 'r') as fh: - for line in fh: - line = re.sub('[:,]', '\n', line) - line = re.sub('[)(]', '', line) - line = re.sub(r'[0-9].*\.[0-9].*\n', '', line) - line = re.sub('root\n', '', line) - sample_order = line.split('\n') - sample_order = list([_f for _f in sample_order if _f]) - sample_order.insert(0, 'root') - tree_order = snps_df.loc[sample_order] - # Count number of SNPs in each column. - snp_per_column = [] - for column_header in tree_order: - count = 0 - column = tree_order[column_header] - for element in column: - if element != column[0]: - count = count + 1 - snp_per_column.append(count) - row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column") - # Count number of SNPS from the - # top of each column in the table. - snp_from_top = [] - for column_header in tree_order: - count = 0 - column = tree_order[column_header] - # for each element in the column - # skip the first element - for element in column[1:]: - if element == column[0]: - count = count + 1 - else: - break - snp_from_top.append(count) - row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top") - tree_order = tree_order.append([row1]) - tree_order = tree_order.append([row2]) - # In pandas=0.18.1 even this does not work: - # abc = row1.to_frame() - # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18) - # tree_order.append(abc) - # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions" - tree_order = tree_order.T - tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False]) - tree_order = tree_order.T - # Remove snp_per_column and snp_from_top rows. - cascade_order = tree_order[:-2] - # Output the cascade table. - output_cascade_table(cascade_order, mqdf, group, annotation_dict) - # Output the sorted table. - output_sort_table(cascade_order, mqdf, group, annotation_dict) - task_queue.task_done() - - -def set_num_cpus(num_files, processes): - num_cpus = int(multiprocessing.cpu_count()) - if num_files < num_cpus and num_files < processes: - return num_files - if num_cpus < processes: - half_cpus = int(num_cpus / 2) - if num_files < half_cpus: - return num_files - return half_cpus - return processes +def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict): + avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split') + # Map quality to dataframe. + mqdf = avg_mq_series.to_frame(name='MQ') + mqdf = mqdf.T + # Get the group. + group = get_sample_name(newick_file) + snps_df = pandas.read_json(json_file, orient='split') + with open(newick_file, 'r') as fh: + for line in fh: + line = re.sub('[:,]', '\n', line) + line = re.sub('[)(]', '', line) + line = re.sub(r'[0-9].*\.[0-9].*\n', '', line) + line = re.sub('root\n', '', line) + sample_order = line.split('\n') + sample_order = list([_f for _f in sample_order if _f]) + sample_order.insert(0, 'root') + tree_order = snps_df.loc[sample_order] + # Count number of SNPs in each column. + snp_per_column = [] + for column_header in tree_order: + count = 0 + column = tree_order[column_header] + for element in column: + if element != column[0]: + count = count + 1 + snp_per_column.append(count) + row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column") + # Count number of SNPS from the + # top of each column in the table. + snp_from_top = [] + for column_header in tree_order: + count = 0 + column = tree_order[column_header] + # for each element in the column + # skip the first element + for element in column[1:]: + if element == column[0]: + count = count + 1 + else: + break + snp_from_top.append(count) + row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top") + tree_order = tree_order.append([row1]) + tree_order = tree_order.append([row2]) + # In pandas=0.18.1 even this does not work: + # abc = row1.to_frame() + # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18) + # tree_order.append(abc) + # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions" + tree_order = tree_order.T + tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False]) + tree_order = tree_order.T + # Remove snp_per_column and snp_from_top rows. + cascade_order = tree_order[:-2] + # Output the cascade table. + output_cascade_table(cascade_order, mqdf, group, annotation_dict) + # Output the sorted table. + output_sort_table(cascade_order, mqdf, group, annotation_dict) if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file') - parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file') - parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file') parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file'), - parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting') + parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file') + parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file') + parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file') args = parser.parse_args() @@ -326,56 +299,4 @@ else: annotation_dict = None - # The assumption here is that the list of files - # in both INPUT_NEWICK_DIR and INPUT_JSON_DIR are - # named such that they are properly matched if - # the directories contain more than 1 file (i.e., - # hopefully the newick file names and json file names - # will be something like Mbovis-01D6_* so they can be - # sorted and properly associated with each other). - if args.input_newick is not None: - newick_files = [args.input_newick] - else: - newick_files = [] - for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)): - file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name)) - newick_files.append(file_path) - if args.input_snps_json is not None: - json_files = [args.input_snps_json] - else: - json_files = [] - for file_name in sorted(os.listdir(INPUT_JSON_DIR)): - file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name)) - json_files.append(file_path) - if args.input_avg_mq_json is not None: - json_avg_mq_files = [args.input_avg_mq_json] - else: - json_avg_mq_files = [] - for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)): - file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name)) - json_avg_mq_files.append(file_path) - - multiprocessing.set_start_method('spawn') - queue1 = multiprocessing.JoinableQueue() - queue2 = multiprocessing.JoinableQueue() - num_files = len(newick_files) - cpus = set_num_cpus(num_files, args.processes) - # Set a timeout for get()s in the queue. - timeout = 0.05 - - for i, newick_file in enumerate(newick_files): - json_file = json_files[i] - json_avg_mq_file = json_avg_mq_files[i] - queue1.put((newick_file, json_file, json_avg_mq_file)) - - # Complete the preprocess_tables task. - processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)] - for p in processes: - p.start() - for p in processes: - p.join() - queue1.join() - - if queue1.empty(): - queue1.close() - queue1.join_thread() + preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict)