Mercurial > repos > mheinzl > variant_analyzer2
diff mut2sscs.py @ 78:fdfe9a919ff7 draft
planemo upload for repository https://github.com/Single-Molecule-Genetics/VariantAnalyzerGalaxy/tree/master/tools/variant_analyzer commit ee4a8e6cf290e6c8a4d55f9cd2839d60ab3b11c8-dirty
author | mheinzl |
---|---|
date | Fri, 22 Jul 2022 09:19:44 +0000 |
parents | 6ccff403db8a |
children |
line wrap: on
line diff
--- a/mut2sscs.py Mon Mar 29 09:22:57 2021 +0000 +++ b/mut2sscs.py Fri Jul 22 09:19:44 2022 +0000 @@ -23,6 +23,7 @@ import argparse import json import os +import re import sys import numpy as np @@ -71,60 +72,111 @@ continue else: alt = variant.ALT[0] + alt = alt.upper() + ref = ref.upper() + if "N" in alt: # skip indels with N in alt allele --> it is not an indel but just a mismatch at the position where the N is (checked this in IGV) + continue chrom_stop_pos = str(chrom) + "#" + str(stop_pos) + "#" + ref + "#" + alt - if len(ref) == len(alt): - for pileupcolumn in bam.pileup(chrom, stop_pos - 1, stop_pos + 1, max_depth=1000000000): - if pileupcolumn.reference_pos == stop_pos: - count_alt = 0 - count_ref = 0 - count_indel = 0 - print("unfiltered reads=", pileupcolumn.n, "filtered reads=", len(pileupcolumn.pileups), - "difference= ", len(pileupcolumn.pileups) - pileupcolumn.n) - for pileupread in pileupcolumn.pileups: - if not pileupread.is_del and not pileupread.is_refskip: - tag = pileupread.alignment.query_name - abba = tag[-2:] - # query position is None if is_del or is_refskip is set. - if pileupread.alignment.query_sequence[pileupread.query_position] == alt: - count_alt += 1 - if chrom_stop_pos in mut_pos_dict: - if abba in mut_pos_dict[chrom_stop_pos]: - mut_pos_dict[chrom_stop_pos][abba] += 1 + for pileupcolumn in bam.pileup(chrom, stop_pos - 1, stop_pos + 1, max_depth=1000000000): + if pileupcolumn.reference_pos == stop_pos: + count_alt = 0 + count_ref = 0 + count_indel = 0 + for pileupread in pileupcolumn.pileups: + if not pileupread.is_refskip: + tag = pileupread.alignment.query_name + abba = tag[-2:] + if pileupread.is_del: + p = pileupread.query_position_or_next + e = p + len(alt) - 1 + else: + p = pileupread.query_position + e = p + len(alt) + tag = pileupread.alignment.query_name + if "_" in tag: + tag = re.split('_', tag)[0] + s = p + split_cigar = re.split('(\d+)', pileupread.alignment.cigarstring) + if len(ref) < len(alt): # insertion + if "I" in split_cigar: + all_insertions = [inser_i for inser_i, ins in enumerate(split_cigar) if ins == "I"] + for ai in all_insertions: # if multiple insertions in DCS + ins_index = [int(ci) for ci in split_cigar[:ai - 1] if ci.isdigit()] + ins_count = split_cigar[ai - 1] # nr of insertions should match with alt allele + if "I" in split_cigar and sum(ins_index) == p + 1 and len(alt) - 1 == int(ins_count): + nuc = pileupread.alignment.query_sequence[s:e] + break else: - mut_pos_dict[chrom_stop_pos][abba] = 1 - else: - mut_pos_dict[chrom_stop_pos] = {} - mut_pos_dict[chrom_stop_pos][abba] = 1 - if chrom_stop_pos not in ref_pos_dict: - ref_pos_dict[chrom_stop_pos] = {} - ref_pos_dict[chrom_stop_pos][abba] = 0 + nuc = pileupread.alignment.query_sequence[s] + else: + nuc = pileupread.alignment.query_sequence[s] + elif len(ref) > len(alt): # deletion + ref_positions = pileupread.alignment.get_reference_positions(full_length=True)[s:p + len(ref)] + if "D" in split_cigar: + all_deletions = [del_i for del_i, dele in enumerate(split_cigar) if dele == "D"] + for di, ai in enumerate(all_deletions): # if multiple insertions in DCS + if di > 0: # more than 1 deletion, don't count previous deletion to position + all_deletions_mod = split_cigar[:ai - 1] + prev_del_idx = [all_deletions_mod.index("D") - 1, all_deletions_mod.index("D")] + split_cigar_no_prev = [ad for i, ad in enumerate(all_deletions_mod) if i not in prev_del_idx] + del_index = [int(ci) for ci in split_cigar_no_prev[:ai - 1] if ci.isdigit()] + else: # first deletion in read, sum all previous (mis)matches and insertions to position + del_index = [int(ci) for ci in split_cigar[:ai - 1] if ci.isdigit()] + del_count = split_cigar[ai - 1] # nr of deletions should match with ref allele + if "D" in split_cigar and sum(del_index) == p + 1 and len(ref) - 1 == int(del_count): + nuc = pileupread.alignment.query_sequence[s:e] + if nuc == "": + nuc = str(alt) + break + else: + nuc = pileupread.alignment.query_sequence[s:s + len(ref)] + elif len(ref_positions) < len(ref): # DCS has reference but the position is at the very end of the DCS and therefore not the full reference positions are there + nuc = pileupread.alignment.get_reference_sequence()[s:s + len(ref)] + if nuc.upper() == ref[:len(nuc)]: + nuc = str(ref) + else: + nuc = pileupread.alignment.query_sequence[s:s + len(ref)] + else: # SNV: query position is None if is_del or is_refskip is set. + nuc = pileupread.alignment.query_sequence[s] - elif pileupread.alignment.query_sequence[pileupread.query_position] == ref: - count_ref += 1 - if chrom_stop_pos in ref_pos_dict: - if abba in ref_pos_dict[chrom_stop_pos]: - ref_pos_dict[chrom_stop_pos][abba] += 1 - else: - ref_pos_dict[chrom_stop_pos][abba] = 1 + nuc = nuc.upper() + + if nuc == alt: + count_alt += 1 + if chrom_stop_pos in mut_pos_dict: + if abba in mut_pos_dict[chrom_stop_pos]: + mut_pos_dict[chrom_stop_pos][abba] += 1 else: - ref_pos_dict[chrom_stop_pos] = {} + mut_pos_dict[chrom_stop_pos][abba] = 1 + else: + mut_pos_dict[chrom_stop_pos] = {} + mut_pos_dict[chrom_stop_pos][abba] = 1 + if chrom_stop_pos not in ref_pos_dict: + ref_pos_dict[chrom_stop_pos] = {} + ref_pos_dict[chrom_stop_pos][abba] = 0 + elif nuc == ref: + count_ref += 1 + if chrom_stop_pos in ref_pos_dict: + if abba in ref_pos_dict[chrom_stop_pos]: + ref_pos_dict[chrom_stop_pos][abba] += 1 + else: ref_pos_dict[chrom_stop_pos][abba] = 1 else: - count_indel += 1 - - print("coverage at pos %s = %s, ref = %s, alt = %s, indel = %s,\n" % - (pileupcolumn.pos, count_ref + count_alt, count_ref, count_alt, count_indel)) + ref_pos_dict[chrom_stop_pos] = {} + ref_pos_dict[chrom_stop_pos][abba] = 1 + else: + count_indel += 1 + print("coverage at pos %s = %s, ref = %s, alt = %s, other = %s,\n" % + (pileupcolumn.pos, count_ref + count_alt, count_ref, count_alt, count_indel)) - # if mutation is in DCS file but not in SSCS, then set counts to NA - if chrom_stop_pos not in mut_pos_dict.keys(): - mut_pos_dict[chrom_stop_pos] = {} - mut_pos_dict[chrom_stop_pos]["ab"] = 0 - mut_pos_dict[chrom_stop_pos]["ba"] = 0 - ref_pos_dict[chrom_stop_pos] = {} - ref_pos_dict[chrom_stop_pos]["ab"] = 0 - ref_pos_dict[chrom_stop_pos]["ba"] = 0 - else: - print("indels are currently not evaluated") + # if mutation is in DCS file but not in SSCS, then set counts to NA + if chrom_stop_pos not in mut_pos_dict.keys(): + mut_pos_dict[chrom_stop_pos] = {} + mut_pos_dict[chrom_stop_pos]["ab"] = 0 + mut_pos_dict[chrom_stop_pos]["ba"] = 0 + ref_pos_dict[chrom_stop_pos] = {} + ref_pos_dict[chrom_stop_pos]["ab"] = 0 + ref_pos_dict[chrom_stop_pos]["ba"] = 0 bam.close() # save counts