comparison get_cov_stats.py @ 1:e5dc0426ca39 draft default tip

Uploaded
author p.lucas
date Tue, 11 Jun 2024 12:28:27 +0000
parents
children
comparison
equal deleted inserted replaced
0:2e05d9335bae 1:e5dc0426ca39
1 # -*- coding: utf-8 -*-
2 import os
3 import re
4 import argparse
5 import sys
6 import subprocess
7
8 from subprocess import Popen, PIPE
9
10 def get_contigs(bam):
11 header, err = Popen(["samtools", "view", "-H", bam], stdout=PIPE, stderr=PIPE).communicate()
12 headers = header.decode("utf-8").splitlines()
13 contigs = {}
14 for h in headers:
15 x = re.findall(r"@SQ\WSN:([A-Za-z0-9_\.\|-]*)\WLN:([0-9]+)", h)
16 if x != []:
17 contigs[x[0][0]] = int(x[0][1])
18 return contigs
19
20
21 def coverage(bam, paired):
22 if os.path.isfile(bam) is False:
23 raise Exception("Bam file does not exist")
24 subprocess.run(f"samtools index {bam}", shell=True)
25 contigs = get_contigs(bam)
26 coverage_dict = {}
27 command = "samtools stats %s | grep \"raw total seq\"" % (bam)
28 line = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8")
29 line = line.rstrip("\n").split("\t")
30 nb_reads = int(line[2])
31
32 for c in contigs.keys():
33 c_escaped = re.escape(c)
34 sam_command = f"samtools depth -d 0 -r {c_escaped} {bam}"
35 command = f"{sam_command} | awk '{{sum+=$3;cnt++}}END{{print cnt \"\t\" sum}}'"
36 coverage_dict[c] = {}
37 com = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8")
38 com = com.rstrip("\n").split("\t")
39 command = "samtools view -c -F 0x904 %s %s " % (bam, c_escaped)
40 mapped_read = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8")
41 if not com[0]:
42 com[0] = 0
43 if not com[1]:
44 com[1] = 0
45 # print("This is c: "+c)
46 # if isinstance(com[0], int) == False:
47 # com[0] = 0
48 # if isinstance(com[1], int) == False:
49 # com[1] = 0
50 # print("This is com 0: "+str(com[0]))
51 # print("This is com 1: "+str(com[1]))
52 coverage_dict[c]["Bases Mapped"] = int(com[0])
53 coverage_dict[c]["Sum of Depths"] = int(com[1])
54 coverage_dict[c]["Breadth of Coverage"] = coverage_dict[c]["Bases Mapped"] / float(contigs[c])
55 coverage_dict[c]["Depth of Coverage"] = coverage_dict[c]["Sum of Depths"] / float(contigs[c])
56 coverage_dict[c]["Length"] = int(contigs[c])
57 coverage_dict[c]["Mapped Reads"] = int(mapped_read)
58 # print("cov"+str(coverage_dict[c]["Depth of Coverage"]))
59 if coverage_dict[c]["Depth of Coverage"] > 0:
60 coverage_dict[c]["N Reads covX"] = 80 * nb_reads / coverage_dict[c]["Depth of Coverage"]
61 if paired:
62 coverage_dict[c]["N Reads covX"] = coverage_dict[c]["N Reads covX"] / 2.0
63 else:
64 coverage_dict[c]["N Reads covX"] = nb_reads
65
66 genome_length = float(sum(contigs.values()))
67 coverage_dict["genome"] = {}
68 coverage_dict["genome"]["Length"] = int(genome_length)
69 coverage_dict["genome"]["Total Reads"] = int(nb_reads)
70 bases_mapped = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k != "genome"])
71 coverage_dict["genome"]["Bases Mapped"] = bases_mapped
72 sum_of_depth = sum([x["Sum of Depths"] for k, x in coverage_dict.items() if k != "genome"])
73 coverage_dict["genome"]["Sum of Depths"] = sum_of_depth
74 # brea = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k!="genome"])/ genome_length
75 # coverage_dict["genome"]["Breadth of Coverage"] = brea
76 if (genome_length == 0):
77 sys.exit("genome_length is 0 line 61")
78 coverage_dict["genome"]["Depth of Coverage"] = float(sum_of_depth)/float(genome_length)
79 cov_dict = coverage_dict["genome"]["Depth of Coverage"]
80 mapped_reads = sum([x["Mapped Reads"] for k, x in coverage_dict.items() if k != "genome"])
81 coverage_dict["genome"]["Mapped Reads"] = mapped_reads
82 if coverage_dict["genome"]["Depth of Coverage"] > 0:
83 coverage_dict["genome"]["N Reads for assembly"] = round(80 * nb_reads / cov_dict)
84 else:
85 coverage_dict["genome"]["N Reads for assembly"] = nb_reads
86 if paired:
87 n_reads = round(coverage_dict["genome"]["N Reads for assembly"] / 2.0)
88 coverage_dict["genome"]["N Reads for assembly"] = n_reads
89
90 if coverage_dict["genome"]["N Reads for assembly"] > nb_reads:
91 coverage_dict["genome"]["N Reads for assembly"] = nb_reads
92
93 coverage = []
94 for k, v in coverage_dict.items():
95 print(k)
96 for x in v.items():
97 print("%s\t%s" % (x[0], str(x[1])))
98 coverage += [(k, x[0], x[1])]
99
100 return coverage
101
102
103 def __main__():
104 parser = argparse.ArgumentParser(description="""Calculate coverage from bam""")
105 parser.add_argument("-b", "-bam", dest="bam", help="bam file")
106 parser.add_argument("-p", "-paired", dest="paired", help="if reads paired", action="store_true")
107 options = parser.parse_args()
108
109 # Check options:
110 if len(sys.argv) < 3 or len(sys.argv) > 4:
111 parser.print_help()
112 sys.exit(1)
113
114 paired = False
115 if options.paired:
116 paired = True
117 bam = options.bam
118 cov = coverage(bam, paired)
119
120
121 if __name__ == "__main__":
122 __main__()