Mercurial > repos > p.lucas > get_coverage_stats
changeset 1:e5dc0426ca39 draft default tip
Uploaded
author | p.lucas |
---|---|
date | Tue, 11 Jun 2024 12:28:27 +0000 |
parents | 2e05d9335bae |
children | |
files | get_cov_stats.py |
diffstat | 1 files changed, 122 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/get_cov_stats.py Tue Jun 11 12:28:27 2024 +0000 @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +import os +import re +import argparse +import sys +import subprocess + +from subprocess import Popen, PIPE + +def get_contigs(bam): + header, err = Popen(["samtools", "view", "-H", bam], stdout=PIPE, stderr=PIPE).communicate() + headers = header.decode("utf-8").splitlines() + contigs = {} + for h in headers: + x = re.findall(r"@SQ\WSN:([A-Za-z0-9_\.\|-]*)\WLN:([0-9]+)", h) + if x != []: + contigs[x[0][0]] = int(x[0][1]) + return contigs + + +def coverage(bam, paired): + if os.path.isfile(bam) is False: + raise Exception("Bam file does not exist") + subprocess.run(f"samtools index {bam}", shell=True) + contigs = get_contigs(bam) + coverage_dict = {} + command = "samtools stats %s | grep \"raw total seq\"" % (bam) + line = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") + line = line.rstrip("\n").split("\t") + nb_reads = int(line[2]) + + for c in contigs.keys(): + c_escaped = re.escape(c) + sam_command = f"samtools depth -d 0 -r {c_escaped} {bam}" + command = f"{sam_command} | awk '{{sum+=$3;cnt++}}END{{print cnt \"\t\" sum}}'" + coverage_dict[c] = {} + com = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") + com = com.rstrip("\n").split("\t") + command = "samtools view -c -F 0x904 %s %s " % (bam, c_escaped) + mapped_read = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") + if not com[0]: + com[0] = 0 + if not com[1]: + com[1] = 0 + # print("This is c: "+c) + # if isinstance(com[0], int) == False: + # com[0] = 0 + # if isinstance(com[1], int) == False: + # com[1] = 0 + # print("This is com 0: "+str(com[0])) + # print("This is com 1: "+str(com[1])) + coverage_dict[c]["Bases Mapped"] = int(com[0]) + coverage_dict[c]["Sum of Depths"] = int(com[1]) + coverage_dict[c]["Breadth of Coverage"] = coverage_dict[c]["Bases Mapped"] / float(contigs[c]) + coverage_dict[c]["Depth of Coverage"] = coverage_dict[c]["Sum of Depths"] / float(contigs[c]) + coverage_dict[c]["Length"] = int(contigs[c]) + coverage_dict[c]["Mapped Reads"] = int(mapped_read) + # print("cov"+str(coverage_dict[c]["Depth of Coverage"])) + if coverage_dict[c]["Depth of Coverage"] > 0: + coverage_dict[c]["N Reads covX"] = 80 * nb_reads / coverage_dict[c]["Depth of Coverage"] + if paired: + coverage_dict[c]["N Reads covX"] = coverage_dict[c]["N Reads covX"] / 2.0 + else: + coverage_dict[c]["N Reads covX"] = nb_reads + + genome_length = float(sum(contigs.values())) + coverage_dict["genome"] = {} + coverage_dict["genome"]["Length"] = int(genome_length) + coverage_dict["genome"]["Total Reads"] = int(nb_reads) + bases_mapped = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k != "genome"]) + coverage_dict["genome"]["Bases Mapped"] = bases_mapped + sum_of_depth = sum([x["Sum of Depths"] for k, x in coverage_dict.items() if k != "genome"]) + coverage_dict["genome"]["Sum of Depths"] = sum_of_depth + # brea = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k!="genome"])/ genome_length + # coverage_dict["genome"]["Breadth of Coverage"] = brea + if (genome_length == 0): + sys.exit("genome_length is 0 line 61") + coverage_dict["genome"]["Depth of Coverage"] = float(sum_of_depth)/float(genome_length) + cov_dict = coverage_dict["genome"]["Depth of Coverage"] + mapped_reads = sum([x["Mapped Reads"] for k, x in coverage_dict.items() if k != "genome"]) + coverage_dict["genome"]["Mapped Reads"] = mapped_reads + if coverage_dict["genome"]["Depth of Coverage"] > 0: + coverage_dict["genome"]["N Reads for assembly"] = round(80 * nb_reads / cov_dict) + else: + coverage_dict["genome"]["N Reads for assembly"] = nb_reads + if paired: + n_reads = round(coverage_dict["genome"]["N Reads for assembly"] / 2.0) + coverage_dict["genome"]["N Reads for assembly"] = n_reads + + if coverage_dict["genome"]["N Reads for assembly"] > nb_reads: + coverage_dict["genome"]["N Reads for assembly"] = nb_reads + + coverage = [] + for k, v in coverage_dict.items(): + print(k) + for x in v.items(): + print("%s\t%s" % (x[0], str(x[1]))) + coverage += [(k, x[0], x[1])] + + return coverage + + +def __main__(): + parser = argparse.ArgumentParser(description="""Calculate coverage from bam""") + parser.add_argument("-b", "-bam", dest="bam", help="bam file") + parser.add_argument("-p", "-paired", dest="paired", help="if reads paired", action="store_true") + options = parser.parse_args() + + # Check options: + if len(sys.argv) < 3 or len(sys.argv) > 4: + parser.print_help() + sys.exit(1) + + paired = False + if options.paired: + paired = True + bam = options.bam + cov = coverage(bam, paired) + + +if __name__ == "__main__": + __main__()