Mercurial > repos > p.lucas > get_coverage_stats
view get_cov_stats.py @ 1:e5dc0426ca39 draft default tip
Uploaded
author | p.lucas |
---|---|
date | Tue, 11 Jun 2024 12:28:27 +0000 |
parents | |
children |
line wrap: on
line source
# -*- coding: utf-8 -*- import os import re import argparse import sys import subprocess from subprocess import Popen, PIPE def get_contigs(bam): header, err = Popen(["samtools", "view", "-H", bam], stdout=PIPE, stderr=PIPE).communicate() headers = header.decode("utf-8").splitlines() contigs = {} for h in headers: x = re.findall(r"@SQ\WSN:([A-Za-z0-9_\.\|-]*)\WLN:([0-9]+)", h) if x != []: contigs[x[0][0]] = int(x[0][1]) return contigs def coverage(bam, paired): if os.path.isfile(bam) is False: raise Exception("Bam file does not exist") subprocess.run(f"samtools index {bam}", shell=True) contigs = get_contigs(bam) coverage_dict = {} command = "samtools stats %s | grep \"raw total seq\"" % (bam) line = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") line = line.rstrip("\n").split("\t") nb_reads = int(line[2]) for c in contigs.keys(): c_escaped = re.escape(c) sam_command = f"samtools depth -d 0 -r {c_escaped} {bam}" command = f"{sam_command} | awk '{{sum+=$3;cnt++}}END{{print cnt \"\t\" sum}}'" coverage_dict[c] = {} com = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") com = com.rstrip("\n").split("\t") command = "samtools view -c -F 0x904 %s %s " % (bam, c_escaped) mapped_read = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") if not com[0]: com[0] = 0 if not com[1]: com[1] = 0 # print("This is c: "+c) # if isinstance(com[0], int) == False: # com[0] = 0 # if isinstance(com[1], int) == False: # com[1] = 0 # print("This is com 0: "+str(com[0])) # print("This is com 1: "+str(com[1])) coverage_dict[c]["Bases Mapped"] = int(com[0]) coverage_dict[c]["Sum of Depths"] = int(com[1]) coverage_dict[c]["Breadth of Coverage"] = coverage_dict[c]["Bases Mapped"] / float(contigs[c]) coverage_dict[c]["Depth of Coverage"] = coverage_dict[c]["Sum of Depths"] / float(contigs[c]) coverage_dict[c]["Length"] = int(contigs[c]) coverage_dict[c]["Mapped Reads"] = int(mapped_read) # print("cov"+str(coverage_dict[c]["Depth of Coverage"])) if coverage_dict[c]["Depth of Coverage"] > 0: coverage_dict[c]["N Reads covX"] = 80 * nb_reads / coverage_dict[c]["Depth of Coverage"] if paired: coverage_dict[c]["N Reads covX"] = coverage_dict[c]["N Reads covX"] / 2.0 else: coverage_dict[c]["N Reads covX"] = nb_reads genome_length = float(sum(contigs.values())) coverage_dict["genome"] = {} coverage_dict["genome"]["Length"] = int(genome_length) coverage_dict["genome"]["Total Reads"] = int(nb_reads) bases_mapped = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k != "genome"]) coverage_dict["genome"]["Bases Mapped"] = bases_mapped sum_of_depth = sum([x["Sum of Depths"] for k, x in coverage_dict.items() if k != "genome"]) coverage_dict["genome"]["Sum of Depths"] = sum_of_depth # brea = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k!="genome"])/ genome_length # coverage_dict["genome"]["Breadth of Coverage"] = brea if (genome_length == 0): sys.exit("genome_length is 0 line 61") coverage_dict["genome"]["Depth of Coverage"] = float(sum_of_depth)/float(genome_length) cov_dict = coverage_dict["genome"]["Depth of Coverage"] mapped_reads = sum([x["Mapped Reads"] for k, x in coverage_dict.items() if k != "genome"]) coverage_dict["genome"]["Mapped Reads"] = mapped_reads if coverage_dict["genome"]["Depth of Coverage"] > 0: coverage_dict["genome"]["N Reads for assembly"] = round(80 * nb_reads / cov_dict) else: coverage_dict["genome"]["N Reads for assembly"] = nb_reads if paired: n_reads = round(coverage_dict["genome"]["N Reads for assembly"] / 2.0) coverage_dict["genome"]["N Reads for assembly"] = n_reads if coverage_dict["genome"]["N Reads for assembly"] > nb_reads: coverage_dict["genome"]["N Reads for assembly"] = nb_reads coverage = [] for k, v in coverage_dict.items(): print(k) for x in v.items(): print("%s\t%s" % (x[0], str(x[1]))) coverage += [(k, x[0], x[1])] return coverage def __main__(): parser = argparse.ArgumentParser(description="""Calculate coverage from bam""") parser.add_argument("-b", "-bam", dest="bam", help="bam file") parser.add_argument("-p", "-paired", dest="paired", help="if reads paired", action="store_true") options = parser.parse_args() # Check options: if len(sys.argv) < 3 or len(sys.argv) > 4: parser.print_help() sys.exit(1) paired = False if options.paired: paired = True bam = options.bam cov = coverage(bam, paired) if __name__ == "__main__": __main__()