Mercurial > repos > p.lucas > get_coverage_stats
comparison get_cov_stats.py @ 1:e5dc0426ca39 draft default tip
Uploaded
| author | p.lucas |
|---|---|
| date | Tue, 11 Jun 2024 12:28:27 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:2e05d9335bae | 1:e5dc0426ca39 |
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 import os | |
| 3 import re | |
| 4 import argparse | |
| 5 import sys | |
| 6 import subprocess | |
| 7 | |
| 8 from subprocess import Popen, PIPE | |
| 9 | |
| 10 def get_contigs(bam): | |
| 11 header, err = Popen(["samtools", "view", "-H", bam], stdout=PIPE, stderr=PIPE).communicate() | |
| 12 headers = header.decode("utf-8").splitlines() | |
| 13 contigs = {} | |
| 14 for h in headers: | |
| 15 x = re.findall(r"@SQ\WSN:([A-Za-z0-9_\.\|-]*)\WLN:([0-9]+)", h) | |
| 16 if x != []: | |
| 17 contigs[x[0][0]] = int(x[0][1]) | |
| 18 return contigs | |
| 19 | |
| 20 | |
| 21 def coverage(bam, paired): | |
| 22 if os.path.isfile(bam) is False: | |
| 23 raise Exception("Bam file does not exist") | |
| 24 subprocess.run(f"samtools index {bam}", shell=True) | |
| 25 contigs = get_contigs(bam) | |
| 26 coverage_dict = {} | |
| 27 command = "samtools stats %s | grep \"raw total seq\"" % (bam) | |
| 28 line = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") | |
| 29 line = line.rstrip("\n").split("\t") | |
| 30 nb_reads = int(line[2]) | |
| 31 | |
| 32 for c in contigs.keys(): | |
| 33 c_escaped = re.escape(c) | |
| 34 sam_command = f"samtools depth -d 0 -r {c_escaped} {bam}" | |
| 35 command = f"{sam_command} | awk '{{sum+=$3;cnt++}}END{{print cnt \"\t\" sum}}'" | |
| 36 coverage_dict[c] = {} | |
| 37 com = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") | |
| 38 com = com.rstrip("\n").split("\t") | |
| 39 command = "samtools view -c -F 0x904 %s %s " % (bam, c_escaped) | |
| 40 mapped_read = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") | |
| 41 if not com[0]: | |
| 42 com[0] = 0 | |
| 43 if not com[1]: | |
| 44 com[1] = 0 | |
| 45 # print("This is c: "+c) | |
| 46 # if isinstance(com[0], int) == False: | |
| 47 # com[0] = 0 | |
| 48 # if isinstance(com[1], int) == False: | |
| 49 # com[1] = 0 | |
| 50 # print("This is com 0: "+str(com[0])) | |
| 51 # print("This is com 1: "+str(com[1])) | |
| 52 coverage_dict[c]["Bases Mapped"] = int(com[0]) | |
| 53 coverage_dict[c]["Sum of Depths"] = int(com[1]) | |
| 54 coverage_dict[c]["Breadth of Coverage"] = coverage_dict[c]["Bases Mapped"] / float(contigs[c]) | |
| 55 coverage_dict[c]["Depth of Coverage"] = coverage_dict[c]["Sum of Depths"] / float(contigs[c]) | |
| 56 coverage_dict[c]["Length"] = int(contigs[c]) | |
| 57 coverage_dict[c]["Mapped Reads"] = int(mapped_read) | |
| 58 # print("cov"+str(coverage_dict[c]["Depth of Coverage"])) | |
| 59 if coverage_dict[c]["Depth of Coverage"] > 0: | |
| 60 coverage_dict[c]["N Reads covX"] = 80 * nb_reads / coverage_dict[c]["Depth of Coverage"] | |
| 61 if paired: | |
| 62 coverage_dict[c]["N Reads covX"] = coverage_dict[c]["N Reads covX"] / 2.0 | |
| 63 else: | |
| 64 coverage_dict[c]["N Reads covX"] = nb_reads | |
| 65 | |
| 66 genome_length = float(sum(contigs.values())) | |
| 67 coverage_dict["genome"] = {} | |
| 68 coverage_dict["genome"]["Length"] = int(genome_length) | |
| 69 coverage_dict["genome"]["Total Reads"] = int(nb_reads) | |
| 70 bases_mapped = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k != "genome"]) | |
| 71 coverage_dict["genome"]["Bases Mapped"] = bases_mapped | |
| 72 sum_of_depth = sum([x["Sum of Depths"] for k, x in coverage_dict.items() if k != "genome"]) | |
| 73 coverage_dict["genome"]["Sum of Depths"] = sum_of_depth | |
| 74 # brea = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k!="genome"])/ genome_length | |
| 75 # coverage_dict["genome"]["Breadth of Coverage"] = brea | |
| 76 if (genome_length == 0): | |
| 77 sys.exit("genome_length is 0 line 61") | |
| 78 coverage_dict["genome"]["Depth of Coverage"] = float(sum_of_depth)/float(genome_length) | |
| 79 cov_dict = coverage_dict["genome"]["Depth of Coverage"] | |
| 80 mapped_reads = sum([x["Mapped Reads"] for k, x in coverage_dict.items() if k != "genome"]) | |
| 81 coverage_dict["genome"]["Mapped Reads"] = mapped_reads | |
| 82 if coverage_dict["genome"]["Depth of Coverage"] > 0: | |
| 83 coverage_dict["genome"]["N Reads for assembly"] = round(80 * nb_reads / cov_dict) | |
| 84 else: | |
| 85 coverage_dict["genome"]["N Reads for assembly"] = nb_reads | |
| 86 if paired: | |
| 87 n_reads = round(coverage_dict["genome"]["N Reads for assembly"] / 2.0) | |
| 88 coverage_dict["genome"]["N Reads for assembly"] = n_reads | |
| 89 | |
| 90 if coverage_dict["genome"]["N Reads for assembly"] > nb_reads: | |
| 91 coverage_dict["genome"]["N Reads for assembly"] = nb_reads | |
| 92 | |
| 93 coverage = [] | |
| 94 for k, v in coverage_dict.items(): | |
| 95 print(k) | |
| 96 for x in v.items(): | |
| 97 print("%s\t%s" % (x[0], str(x[1]))) | |
| 98 coverage += [(k, x[0], x[1])] | |
| 99 | |
| 100 return coverage | |
| 101 | |
| 102 | |
| 103 def __main__(): | |
| 104 parser = argparse.ArgumentParser(description="""Calculate coverage from bam""") | |
| 105 parser.add_argument("-b", "-bam", dest="bam", help="bam file") | |
| 106 parser.add_argument("-p", "-paired", dest="paired", help="if reads paired", action="store_true") | |
| 107 options = parser.parse_args() | |
| 108 | |
| 109 # Check options: | |
| 110 if len(sys.argv) < 3 or len(sys.argv) > 4: | |
| 111 parser.print_help() | |
| 112 sys.exit(1) | |
| 113 | |
| 114 paired = False | |
| 115 if options.paired: | |
| 116 paired = True | |
| 117 bam = options.bam | |
| 118 cov = coverage(bam, paired) | |
| 119 | |
| 120 | |
| 121 if __name__ == "__main__": | |
| 122 __main__() |
