Mercurial > repos > p.lucas > get_coverage_stats
comparison get_cov_stats.py @ 1:e5dc0426ca39 draft default tip
Uploaded
author | p.lucas |
---|---|
date | Tue, 11 Jun 2024 12:28:27 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:2e05d9335bae | 1:e5dc0426ca39 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 import os | |
3 import re | |
4 import argparse | |
5 import sys | |
6 import subprocess | |
7 | |
8 from subprocess import Popen, PIPE | |
9 | |
10 def get_contigs(bam): | |
11 header, err = Popen(["samtools", "view", "-H", bam], stdout=PIPE, stderr=PIPE).communicate() | |
12 headers = header.decode("utf-8").splitlines() | |
13 contigs = {} | |
14 for h in headers: | |
15 x = re.findall(r"@SQ\WSN:([A-Za-z0-9_\.\|-]*)\WLN:([0-9]+)", h) | |
16 if x != []: | |
17 contigs[x[0][0]] = int(x[0][1]) | |
18 return contigs | |
19 | |
20 | |
21 def coverage(bam, paired): | |
22 if os.path.isfile(bam) is False: | |
23 raise Exception("Bam file does not exist") | |
24 subprocess.run(f"samtools index {bam}", shell=True) | |
25 contigs = get_contigs(bam) | |
26 coverage_dict = {} | |
27 command = "samtools stats %s | grep \"raw total seq\"" % (bam) | |
28 line = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") | |
29 line = line.rstrip("\n").split("\t") | |
30 nb_reads = int(line[2]) | |
31 | |
32 for c in contigs.keys(): | |
33 c_escaped = re.escape(c) | |
34 sam_command = f"samtools depth -d 0 -r {c_escaped} {bam}" | |
35 command = f"{sam_command} | awk '{{sum+=$3;cnt++}}END{{print cnt \"\t\" sum}}'" | |
36 coverage_dict[c] = {} | |
37 com = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") | |
38 com = com.rstrip("\n").split("\t") | |
39 command = "samtools view -c -F 0x904 %s %s " % (bam, c_escaped) | |
40 mapped_read = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8") | |
41 if not com[0]: | |
42 com[0] = 0 | |
43 if not com[1]: | |
44 com[1] = 0 | |
45 # print("This is c: "+c) | |
46 # if isinstance(com[0], int) == False: | |
47 # com[0] = 0 | |
48 # if isinstance(com[1], int) == False: | |
49 # com[1] = 0 | |
50 # print("This is com 0: "+str(com[0])) | |
51 # print("This is com 1: "+str(com[1])) | |
52 coverage_dict[c]["Bases Mapped"] = int(com[0]) | |
53 coverage_dict[c]["Sum of Depths"] = int(com[1]) | |
54 coverage_dict[c]["Breadth of Coverage"] = coverage_dict[c]["Bases Mapped"] / float(contigs[c]) | |
55 coverage_dict[c]["Depth of Coverage"] = coverage_dict[c]["Sum of Depths"] / float(contigs[c]) | |
56 coverage_dict[c]["Length"] = int(contigs[c]) | |
57 coverage_dict[c]["Mapped Reads"] = int(mapped_read) | |
58 # print("cov"+str(coverage_dict[c]["Depth of Coverage"])) | |
59 if coverage_dict[c]["Depth of Coverage"] > 0: | |
60 coverage_dict[c]["N Reads covX"] = 80 * nb_reads / coverage_dict[c]["Depth of Coverage"] | |
61 if paired: | |
62 coverage_dict[c]["N Reads covX"] = coverage_dict[c]["N Reads covX"] / 2.0 | |
63 else: | |
64 coverage_dict[c]["N Reads covX"] = nb_reads | |
65 | |
66 genome_length = float(sum(contigs.values())) | |
67 coverage_dict["genome"] = {} | |
68 coverage_dict["genome"]["Length"] = int(genome_length) | |
69 coverage_dict["genome"]["Total Reads"] = int(nb_reads) | |
70 bases_mapped = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k != "genome"]) | |
71 coverage_dict["genome"]["Bases Mapped"] = bases_mapped | |
72 sum_of_depth = sum([x["Sum of Depths"] for k, x in coverage_dict.items() if k != "genome"]) | |
73 coverage_dict["genome"]["Sum of Depths"] = sum_of_depth | |
74 # brea = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k!="genome"])/ genome_length | |
75 # coverage_dict["genome"]["Breadth of Coverage"] = brea | |
76 if (genome_length == 0): | |
77 sys.exit("genome_length is 0 line 61") | |
78 coverage_dict["genome"]["Depth of Coverage"] = float(sum_of_depth)/float(genome_length) | |
79 cov_dict = coverage_dict["genome"]["Depth of Coverage"] | |
80 mapped_reads = sum([x["Mapped Reads"] for k, x in coverage_dict.items() if k != "genome"]) | |
81 coverage_dict["genome"]["Mapped Reads"] = mapped_reads | |
82 if coverage_dict["genome"]["Depth of Coverage"] > 0: | |
83 coverage_dict["genome"]["N Reads for assembly"] = round(80 * nb_reads / cov_dict) | |
84 else: | |
85 coverage_dict["genome"]["N Reads for assembly"] = nb_reads | |
86 if paired: | |
87 n_reads = round(coverage_dict["genome"]["N Reads for assembly"] / 2.0) | |
88 coverage_dict["genome"]["N Reads for assembly"] = n_reads | |
89 | |
90 if coverage_dict["genome"]["N Reads for assembly"] > nb_reads: | |
91 coverage_dict["genome"]["N Reads for assembly"] = nb_reads | |
92 | |
93 coverage = [] | |
94 for k, v in coverage_dict.items(): | |
95 print(k) | |
96 for x in v.items(): | |
97 print("%s\t%s" % (x[0], str(x[1]))) | |
98 coverage += [(k, x[0], x[1])] | |
99 | |
100 return coverage | |
101 | |
102 | |
103 def __main__(): | |
104 parser = argparse.ArgumentParser(description="""Calculate coverage from bam""") | |
105 parser.add_argument("-b", "-bam", dest="bam", help="bam file") | |
106 parser.add_argument("-p", "-paired", dest="paired", help="if reads paired", action="store_true") | |
107 options = parser.parse_args() | |
108 | |
109 # Check options: | |
110 if len(sys.argv) < 3 or len(sys.argv) > 4: | |
111 parser.print_help() | |
112 sys.exit(1) | |
113 | |
114 paired = False | |
115 if options.paired: | |
116 paired = True | |
117 bam = options.bam | |
118 cov = coverage(bam, paired) | |
119 | |
120 | |
121 if __name__ == "__main__": | |
122 __main__() |