view get_cov_stats.py @ 1:e5dc0426ca39 draft default tip

Uploaded
author p.lucas
date Tue, 11 Jun 2024 12:28:27 +0000
parents
children
line wrap: on
line source

# -*- coding: utf-8 -*-
import os
import re
import argparse
import sys
import subprocess

from subprocess import Popen, PIPE

def get_contigs(bam):
    header, err = Popen(["samtools", "view", "-H", bam], stdout=PIPE, stderr=PIPE).communicate()
    headers = header.decode("utf-8").splitlines()
    contigs = {}
    for h in headers:
        x = re.findall(r"@SQ\WSN:([A-Za-z0-9_\.\|-]*)\WLN:([0-9]+)", h)
        if x != []:
            contigs[x[0][0]] = int(x[0][1])
    return contigs


def coverage(bam, paired):
  if os.path.isfile(bam) is False:
    raise Exception("Bam file does not exist")
  subprocess.run(f"samtools index {bam}", shell=True)
  contigs = get_contigs(bam)
  coverage_dict = {}
  command = "samtools stats %s | grep \"raw total seq\"" % (bam)
  line = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8")
  line = line.rstrip("\n").split("\t")
  nb_reads = int(line[2])

  for c in contigs.keys():
    c_escaped = re.escape(c)
    sam_command = f"samtools depth -d 0 -r {c_escaped} {bam}"
    command = f"{sam_command} | awk '{{sum+=$3;cnt++}}END{{print cnt \"\t\" sum}}'"
    coverage_dict[c] = {}
    com = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8")
    com = com.rstrip("\n").split("\t")
    command = "samtools view -c -F 0x904 %s %s " % (bam, c_escaped)
    mapped_read = Popen(command, stdout=PIPE, shell=True).communicate()[0].decode("utf-8")
    if not com[0]:
      com[0] = 0
    if not com[1]:
      com[1] = 0
    # print("This is c: "+c)
    # if isinstance(com[0], int) == False:
    #   com[0] = 0
    # if isinstance(com[1], int) == False:
    #   com[1] = 0
    # print("This is com 0: "+str(com[0]))
    # print("This is com 1: "+str(com[1]))
    coverage_dict[c]["Bases Mapped"] = int(com[0])
    coverage_dict[c]["Sum of Depths"] = int(com[1])
    coverage_dict[c]["Breadth of Coverage"] = coverage_dict[c]["Bases Mapped"] / float(contigs[c])
    coverage_dict[c]["Depth of Coverage"] = coverage_dict[c]["Sum of Depths"] / float(contigs[c])
    coverage_dict[c]["Length"] = int(contigs[c])
    coverage_dict[c]["Mapped Reads"] = int(mapped_read)
    # print("cov"+str(coverage_dict[c]["Depth of Coverage"]))
    if coverage_dict[c]["Depth of Coverage"] > 0:
      coverage_dict[c]["N Reads covX"] = 80 * nb_reads / coverage_dict[c]["Depth of Coverage"]
      if paired:
        coverage_dict[c]["N Reads covX"] = coverage_dict[c]["N Reads covX"] / 2.0
    else:
      coverage_dict[c]["N Reads covX"] = nb_reads

  genome_length = float(sum(contigs.values()))
  coverage_dict["genome"] = {}
  coverage_dict["genome"]["Length"] = int(genome_length)
  coverage_dict["genome"]["Total Reads"] = int(nb_reads)
  bases_mapped = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k != "genome"])
  coverage_dict["genome"]["Bases Mapped"] = bases_mapped
  sum_of_depth = sum([x["Sum of Depths"] for k, x in coverage_dict.items() if k != "genome"])
  coverage_dict["genome"]["Sum of Depths"] = sum_of_depth
  # brea = sum([x["Bases Mapped"] for k, x in coverage_dict.items() if k!="genome"])/ genome_length
  # coverage_dict["genome"]["Breadth of Coverage"] = brea
  if (genome_length == 0):
    sys.exit("genome_length is 0 line 61")
  coverage_dict["genome"]["Depth of Coverage"] = float(sum_of_depth)/float(genome_length)
  cov_dict = coverage_dict["genome"]["Depth of Coverage"]
  mapped_reads = sum([x["Mapped Reads"] for k, x in coverage_dict.items() if k != "genome"])
  coverage_dict["genome"]["Mapped Reads"] = mapped_reads
  if coverage_dict["genome"]["Depth of Coverage"] > 0:
    coverage_dict["genome"]["N Reads for assembly"] = round(80 * nb_reads / cov_dict)
  else:
    coverage_dict["genome"]["N Reads for assembly"] = nb_reads
  if paired:
    n_reads = round(coverage_dict["genome"]["N Reads for assembly"] / 2.0)
    coverage_dict["genome"]["N Reads for assembly"] = n_reads

  if coverage_dict["genome"]["N Reads for assembly"] > nb_reads:
    coverage_dict["genome"]["N Reads for assembly"] = nb_reads

  coverage = []
  for k, v in coverage_dict.items():
    print(k)
    for x in v.items():
      print("%s\t%s" % (x[0], str(x[1])))
      coverage += [(k, x[0], x[1])]

  return coverage


def __main__():
  parser = argparse.ArgumentParser(description="""Calculate coverage from bam""")
  parser.add_argument("-b", "-bam", dest="bam", help="bam file")
  parser.add_argument("-p", "-paired", dest="paired", help="if reads paired", action="store_true")
  options = parser.parse_args()

  # Check options:
  if len(sys.argv) < 3 or len(sys.argv) > 4:
    parser.print_help()
    sys.exit(1)

  paired = False
  if options.paired:
    paired = True
  bam = options.bam
  cov = coverage(bam, paired)


if __name__ == "__main__":
  __main__()