diff vsnp_statistics.py @ 1:14e29f7d59ca draft
Uploaded
| field | value |
| --- | --- |
| author | greg |
| date | Wed, 29 Apr 2020 16:56:10 -0400 |
| parents | c21d338dbdc4 |
| children | 321a8259e3f9 |
```diff
--- a/vsnp_statistics.py	Tue Apr 21 10:19:53 2020 -0400
+++ b/vsnp_statistics.py	Wed Apr 29 16:56:10 2020 -0400
@@ -2,19 +2,22 @@
 import argparse
 import gzip
-import multiprocessing
 import numpy
 import os
 import pandas
-import queue
+import shutil
 
 
 INPUT_IDXSTATS_DIR = 'input_idxstats'
 INPUT_METRICS_DIR = 'input_metrics'
 INPUT_READS_DIR = 'input_reads'
-OUTPUT_DIR = 'output'
 QUALITYKEY = {'!':'0', '"':'1', '#':'2', '$':'3', '%':'4', '&':'5', "'":'6', '(':'7', ')':'8', '*':'9', '+':'10', ',':'11', '-':'12', '.':'13', '/':'14', '0':'15', '1':'16', '2':'17', '3':'18', '4':'19', '5':'20', '6':'21', '7':'22', '8':'23', '9':'24', ':':'25', ';':'26', '<':'27', '=':'28', '>':'29', '?':'30', '@':'31', 'A':'32', 'B':'33', 'C':'34', 'D':'35', 'E':'36', 'F':'37', 'G':'38', 'H':'39', 'I':'40', 'J':'41', 'K':'42', 'L':'43', 'M':'44', 'N':'45', 'O':'46', 'P':'47', 'Q':'48', 'R':'49', 'S':'50', 'T':'51', 'U':'52', 'V':'53', 'W':'54', 'X':'55', 'Y':'56', 'Z':'57', '_':'1', ']':'1', '[':'1', '\\':'1', '\n':'1', '`':'1', 'a':'1', 'b':'1', 'c':'1', 'd':'1', 'e':'1', 'f':'1', 'g':'1', 'h':'1', 'i':'1', 'j':'1', 'k':'1', 'l':'1', 'm':'1', 'n':'1', 'o':'1', 'p':'1', 'q':'1', 'r':'1', 's':'1', 't':'1', 'u':'1', 'v':'1', 'w':'1', 'x':'1', 'y':'1', 'z':'1', ' ':'1'}
-READCOLUMNS = ['Sample', 'Reference', 'Fastq File', 'Size', 'Total Reads', 'Mean Read Length', 'Mean Read Quality', 'Reads Passing Q30']
-SEP = "\t"
+
+
+def fastq_to_df(fastq_file, gzipped):
+    if gzipped.lower() == "true":
+        return pandas.read_csv(gzip.open(fastq_file, "r"), header=None, sep="^")
+    else:
+        return pandas.read_csv(open(fastq_file, "r"), header=None, sep="^")
 
 
 def get_base_file_name(file_path):
@@ -52,121 +55,108 @@
     return '??? bytes'
 
 
-def output_read_stats(gzipped, fastq_file, ofh, sampling_number=10000, output_sample=False, dbkey=None, collection=False):
-    file_name_base = os.path.basename(fastq_file)
-    # Output a 2-column file where column 1 is
-    # the labels and column 2 is the values.
-    if output_sample:
-        # The Sample and Reference columns should be
-        # output only once, so this block handles
-        # paired reads, where the columns are not
-        # output for Read2.
-        try:
-            # Illumina read file names are something like:
-            # 13-1941-6_S4_L001_R1_600000_fastq_gz
-            sample = file_name_base.split("_")[0]
-        except Exception:
-            sample = ""
-        # Sample
-        ofh.write("Sample%s%s\n" % (SEP, sample))
-        ofh.write("Reference%s%s\n" % (SEP, dbkey))
-        if collection:
-            read = "Read"
+def output_statistics(reads_files, idxstats_files, metrics_files, output_file, gzipped, dbkey):
+    # Produce an Excel spreadsheet that
+    # contains a row for each sample.
+    columns = ['Reference', 'File Size', 'Mean Read Length', 'Mean Read Quality', 'Reads Passing Q30',
+               'Total Reads', 'All Mapped Reads', 'Unmapped Reads', 'Unmapped Reads Percentage of Total',
+               'Reference with Coverage', 'Average Depth of Coverage', 'Good SNP Count']
+    data_frames = []
+    for i, fastq_file in enumerate(reads_files):
+        idxstats_file = idxstats_files[i]
+        metrics_file = metrics_files[i]
+        file_name_base = os.path.basename(fastq_file)
+        # Read fastq_file into a data frame.
+        fastq_df = fastq_to_df(fastq_file, gzipped)
+        total_reads = int(len(fastq_df.index) / 4)
+        current_sample_df = pandas.DataFrame(index=[file_name_base], columns=columns)
+        # Reference
+        current_sample_df.at[file_name_base, 'Reference'] = dbkey
+        # File Size
+        current_sample_df.at[file_name_base, 'File Size'] = nice_size(os.path.getsize(fastq_file))
+        # Mean Read Length
+        sampling_size = 10000
+        if sampling_size > total_reads:
+            sampling_size = total_reads
+        fastq_df = fastq_df.iloc[3::4].sample(sampling_size)
+        dict_mean = {}
+        list_length = []
+        for index, row in fastq_df.iterrows():
+            base_qualities = []
+            for base in list(row.array[0]):
+                base_qualities.append(int(QUALITYKEY[base]))
+            dict_mean[index] = numpy.mean(base_qualities)
+            list_length.append(len(row.array[0]))
+        current_sample_df.at[file_name_base, 'Mean Read Length'] = "%.1f" % numpy.mean(list_length)
+        # Mean Read Quality
+        df_mean = pandas.DataFrame.from_dict(dict_mean, orient='index', columns=['ave'])
+        current_sample_df.at[file_name_base, 'Mean Read Quality'] = "%.1f" % df_mean['ave'].mean()
+        # Reads Passing Q30
+        reads_gt_q30 = len(df_mean[df_mean['ave'] >= 30])
+        reads_passing_q30 = "{:10.2f}".format(reads_gt_q30 / sampling_size)
+        current_sample_df.at[file_name_base, 'Reads Passing Q30'] = reads_passing_q30
+        # Total Reads
+        current_sample_df.at[file_name_base, 'Total Reads'] = total_reads
+        # All Mapped Reads
+        all_mapped_reads, unmapped_reads = process_idxstats_file(idxstats_file)
+        current_sample_df.at[file_name_base, 'All Mapped Reads'] = all_mapped_reads
+        # Unmapped Reads
+        current_sample_df.at[file_name_base, 'Unmapped Reads'] = unmapped_reads
+        # Unmapped Reads Percentage of Total
+        if unmapped_reads > 0:
+            unmapped_reads_percentage = "{:10.2f}".format(unmapped_reads / total_reads)
         else:
-            read = "Read1"
-    else:
-        read = "Read2"
-    # Read
-    ofh.write("%s File%s%s\n" % (read, SEP, file_name_base))
-    # File Size
-    ofh.write("%s File Size%s%s\n" % (read, SEP, nice_size(os.path.getsize(fastq_file))))
-    if gzipped.lower() == "true":
-        df = pandas.read_csv(gzip.open(fastq_file, "r"), header=None, sep="^")
-    else:
-        df = pandas.read_csv(open(fastq_file, "r"), header=None, sep="^")
-    total_read_count = int(len(df.index) / 4)
-    # Readx Total Reads
-    ofh.write("%s Total Reads%s%s\n" % (read, SEP, total_read_count))
-    # Mean Read Length
-    sampling_size = int(sampling_number)
-    if sampling_size > total_read_count:
-        sampling_size = total_read_count
-    df = df.iloc[3::4].sample(sampling_size)
-    dict_mean = {}
-    list_length = []
-    for index, row in df.iterrows():
-        base_qualities = []
-        for base in list(row.array[0]):
-            base_qualities.append(int(QUALITYKEY[base]))
-        dict_mean[index] = numpy.mean(base_qualities)
-        list_length.append(len(row.array[0]))
-    ofh.write("%s Mean Read Length%s%s\n" % (read, SEP, "%.1f" % numpy.mean(list_length)))
-    # Mean Read Quality
-    df_mean = pandas.DataFrame.from_dict(dict_mean, orient='index', columns=['ave'])
-    ofh.write("%s Mean Read Quality%s%s\n" % (read, SEP, "%.1f" % df_mean['ave'].mean()))
-    # Reads Passing Q30
-    reads_gt_q30 = len(df_mean[df_mean['ave'] >= 30])
-    reads_passing_q30 = "{:10.2f}".format(reads_gt_q30 / sampling_size)
-    ofh.write("%s reads passing Q30%s%s\n" % (read, SEP, reads_passing_q30))
-    return total_read_count
+            unmapped_reads_percentage = 0
+        current_sample_df.at[file_name_base, 'Unmapped Reads Percentage of Total'] = unmapped_reads_percentage
+        # Reference with Coverage
+        ref_with_coverage, avg_depth_of_coverage, good_snp_count = process_metrics_file(metrics_file)
+        current_sample_df.at[file_name_base, 'Reference with Coverage'] = ref_with_coverage
+        # Average Depth of Coverage
+        current_sample_df.at[file_name_base, 'Average Depth of Coverage'] = avg_depth_of_coverage
+        # Good SNP Count
+        current_sample_df.at[file_name_base, 'Good SNP Count'] = good_snp_count
+        data_frames.append(current_sample_df)
+    excel_df = pandas.concat(data_frames)
+    excel_file_name = "output.xlsx"
+    writer = pandas.ExcelWriter(excel_file_name, engine='xlsxwriter')
+    excel_df.to_excel(writer, sheet_name='Sheet1')
+    writer.save()
+    shutil.move(excel_file_name, output_file)
 
 
-def output_statistics(task_queue, read2, collection, gzipped, dbkey, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        read_file, idxstats_file, metrics_file, output_file = tup
-        total_reads = 0
-        with open(output_file, "w") as ofh:
-            total_reads += output_read_stats(gzipped, read_file, ofh, output_sample=True, dbkey=dbkey, collection=collection)
-            if read2 is not None:
-                total_reads += output_read_stats(gzipped, read2, ofh)
-            ofh.write("Total Reads%s%d\n" % (SEP, total_reads))
-            with open(idxstats_file, "r") as ifh:
-                unmapped_reads = 0
-                for i, line in enumerate(ifh):
-                    items = line.split("\t")
-                    if i == 0:
-                        # NC_002945.4 4349904 213570 4047
-                        ofh.write("All Mapped Reads%s%s\n" % (SEP, items[2]))
-                    elif i == 1:
-                        # * 0 0 82774
-                        unmapped_reads = int(items[3])
-                        ofh.write("Unmapped Reads%s%d\n" % (SEP, unmapped_reads))
-            percent_str = "Unmapped Reads Percentage of Total"
-            if unmapped_reads > 0:
-                unmapped_reads_percentage = "{:10.2f}".format(unmapped_reads / total_reads)
-                ofh.write("%s%s%s\n" % (percent_str, SEP, unmapped_reads_percentage))
-            else:
-                ofh.write("%s%s0\n" % (percent_str, SEP))
-            with open(metrics_file, "r") as ifh:
-                for i, line in enumerate(ifh):
-                    if i == 0:
-                        # Skip comments.
-                        continue
-                    items = line.split("\t")
-                    if i == 1:
-                        # MarkDuplicates 10.338671 98.74%
-                        ofh.write("Average Depth of Coverage%s%s\n" % (SEP, items[2]))
-                        ofh.write("Reference with Coverage%s%s\n" % (SEP, items[3]))
-                    elif i == 2:
-                        # VCFfilter 611
-                        ofh.write("Good SNP Count%s%s\n" % (SEP, items[1]))
-        task_queue.task_done()
+def process_idxstats_file(idxstats_file):
+    all_mapped_reads = 0
+    unmapped_reads = 0
+    with open(idxstats_file, "r") as fh:
+        for i, line in enumerate(fh):
+            items = line.split("\t")
+            if i == 0:
+                # NC_002945.4 4349904 213570 4047
+                all_mapped_reads = int(items[2])
+            elif i == 1:
+                # * 0 0 82774
+                unmapped_reads = int(items[3])
+    return all_mapped_reads, unmapped_reads
 
 
-def set_num_cpus(num_files, processes):
-    num_cpus = int(multiprocessing.cpu_count())
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes
+def process_metrics_file(metrics_file):
+    ref_with_coverage = '0%'
+    avg_depth_of_coverage = 0
+    good_snp_count = 0
+    with open(metrics_file, "r") as ifh:
+        for i, line in enumerate(ifh):
+            if i == 0:
+                # Skip comments.
+                continue
+            items = line.split("\t")
+            if i == 1:
+                # MarkDuplicates 10.338671 98.74%
+                ref_with_coverage = items[3]
+                avg_depth_of_coverage = items[2]
+            elif i == 2:
+                # VCFfilter 611
+                good_snp_count = items[1]
+    return ref_with_coverage, avg_depth_of_coverage, good_snp_count
 
 
 if __name__ == '__main__':
@@ -177,60 +167,36 @@
     parser.add_argument('--dbkey', action='store', dest='dbkey', help='Reference dbkey')
     parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped')
     parser.add_argument('--samtools_idxstats', action='store', dest='samtools_idxstats', required=False, default=None, help='Output of samtools_idxstats')
-    parser.add_argument('--output', action='store', dest='output', required=False, default=None, help='Output statisticsfile')
+    parser.add_argument('--output', action='store', dest='output', help='Output Excel statistics file')
     parser.add_argument('--vsnp_azc', action='store', dest='vsnp_azc', required=False, default=None, help='Output of vsnp_add_zero_coverage')
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
 
     args = parser.parse_args()
+    print("args:\n%s\n" % str(args))
 
     reads_files = []
     idxstats_files = []
     metrics_files = []
-    output_files = []
-    if args.output is not None:
-        # The inputs were not dataset collections, so
+    # Accumulate inputs.
+    if args.read1 is not None:
+        # The inputs are not dataset collections, so
         # read1, read2 (possibly) and vsnp_azc will also
         # not be None.
-        collection = False
         reads_files.append(args.read1)
         idxstats_files.append(args.samtools_idxstats)
         metrics_files.append(args.vsnp_azc)
-        output_files.append(args.output)
+        if args.read2 is not None:
+            reads_files.append(args.read2)
+            idxstats_files.append(args.samtools_idxstats)
+            metrics_files.append(args.vsnp_azc)
    else:
-        collection = True
         for file_name in sorted(os.listdir(INPUT_READS_DIR)):
             file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
             reads_files.append(file_path)
             base_file_name = get_base_file_name(file_path)
-            output_files.append(os.path.abspath(os.path.join(OUTPUT_DIR, "%s.tabular" % base_file_name)))
         for file_name in sorted(os.listdir(INPUT_IDXSTATS_DIR)):
             file_path = os.path.abspath(os.path.join(INPUT_IDXSTATS_DIR, file_name))
             idxstats_files.append(file_path)
         for file_name in sorted(os.listdir(INPUT_METRICS_DIR)):
             file_path = os.path.abspath(os.path.join(INPUT_METRICS_DIR, file_name))
             metrics_files.append(file_path)
-
-    multiprocessing.set_start_method('spawn')
-    queue1 = multiprocessing.JoinableQueue()
-    num_files = len(output_files)
-    cpus = set_num_cpus(num_files, args.processes)
-    # Set a timeout for get()s in the queue.
-    timeout = 0.05
-
-    for i, output_file in enumerate(output_files):
-        read_file = reads_files[i]
-        idxstats_file = idxstats_files[i]
-        metrics_file = metrics_files[i]
-        queue1.put((read_file, idxstats_file, metrics_file, output_file))
-
-    # Complete the output_statistics task.
-    processes = [multiprocessing.Process(target=output_statistics, args=(queue1, args.read2, collection, args.gzipped, args.dbkey, timeout, )) for _ in range(cpus)]
-    for p in processes:
-        p.start()
-    for p in processes:
-        p.join()
-    queue1.join()
-
-    if queue1.empty():
-        queue1.close()
-        queue1.join_thread()
+    output_statistics(reads_files, idxstats_files, metrics_files, args.output, args.gzipped, args.dbkey)
```
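For reference, below is a minimal, hypothetical sketch of how the refactored single-process entry point could be driven directly from Python; the commit itself only reaches it through the `__main__` block. The file names, sample name, and dbkey are invented for illustration, and the snippet assumes `vsnp_statistics.py` is importable with pandas, numpy, and xlsxwriter installed.

```python
# Hypothetical usage sketch -- not part of the commit. All paths below are
# invented. The same idxstats/metrics file is repeated so the lists line up
# index-for-index with the two reads files, mirroring the __main__ block.
from vsnp_statistics import output_statistics

output_statistics(
    reads_files=["13-1941-6_S4_L001_R1.fastq.gz", "13-1941-6_S4_L001_R2.fastq.gz"],
    idxstats_files=["13-1941-6.idxstats", "13-1941-6.idxstats"],
    metrics_files=["13-1941-6_metrics.tabular", "13-1941-6_metrics.tabular"],
    output_file="vsnp_statistics.xlsx",  # the Excel file is moved here via shutil.move
    gzipped="true",                      # the script expects the strings "true"/"false"
    dbkey="NC_002945.4",                 # reference dbkey, written to the 'Reference' column
)
```

In the Galaxy tool the equivalent call is made through the command-line flags visible in the diff (`--read1`, `--read2`, `--samtools_idxstats`, `--vsnp_azc`, `--dbkey`, `--gzipped`, `--output`).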