Mercurial > repos > iuc > medaka_variant_pipeline
view convert_VCF_info_fields.py @ 13:3fbefde449bc draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/medaka commit 52289bc7b99bfa8a3bda46cb35cea98399419dab"
author | iuc |
---|---|
date | Thu, 18 Nov 2021 20:01:04 +0000 |
parents | 597407d61386 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Takes in VCF file annotated with medaka tools annotate and converts # # Usage statement: # python convert_VCF_info_fields.py in_vcf.vcf out_vcf.vcf # 10/21/2020 - Nathan P. Roach, natproach@gmail.com import re import sys from collections import OrderedDict from math import log10 import scipy.stats def pval_to_phredqual(pval): try: ret = round(-10 * log10(pval)) except ValueError: ret = 2147483647 # transform pval of 0.0 to max signed 32 bit int return ret def parseInfoField(info): info_fields = info.split(";") info_dict = OrderedDict() for info_field in info_fields: code, val = info_field.split("=") info_dict[code] = val return info_dict def annotateVCF(in_vcf_filepath, out_vcf_filepath): """Postprocess output of medaka tools annotate. Splits multiallelic sites into separate records. Replaces medaka INFO fields that might represent information of the ref and multiple alternate alleles with simple ref, alt allele counterparts. """ in_vcf = open(in_vcf_filepath, "r") # medaka INFO fields that do not make sense after splitting of # multi-allelic records # DP will be overwritten with the value of DPSP because medaka tools # annotate currently only calculates the latter correctly # (https://github.com/nanoporetech/medaka/issues/192). # DPS, which is as unreliable as DP, gets skipped and the code # calculates the spanning reads equivalent DPSPS instead. to_skip = {"SC", "SR", "AR", "DP", "DPSP", "DPS"} struct_meta_pat = re.compile("##(.+)=<ID=([^,]+)(,.+)?>") header_lines = [] contig_ids = set() contig_ids_simple = set() # parse the metadata lines of the input VCF and drop: # - duplicate lines # - INFO lines declaring keys we are not going to write # - redundant contig information while True: line = in_vcf.readline() if line[:2] != "##": assert line.startswith("#CHROM") break if line in header_lines: # the annotate tool may generate lines already written by # medaka variant again (example: medaka version line) continue match = struct_meta_pat.match(line) if match: match_type, match_id, match_misc = match.groups() if match_type == "INFO": if match_id == "DPSP": line = line.replace("DPSP", "DP") elif match_id in to_skip: continue elif match_type == "contig": contig_ids.add(match_id) if not match_misc: # the annotate tools writes its own contig info, # which is redundant with contig info generated by # medaka variant, but lacks a length value. # We don't need the incomplete line. contig_ids_simple.add(match_id) continue header_lines.append(line) # Lets check the above assumption about each ID-only contig line # having a more complete counterpart. assert not (contig_ids_simple - contig_ids) header_lines.insert(1, "##convert_VCF_info_fields=0.2\n") header_lines += [ '##INFO=<ID=DPSPS,Number=2,Type=Integer,Description="Depth of spanning reads by strand">\n', '##INFO=<ID=AF,Number=1,Type=Float,Description="Spanning Reads Allele Frequency">\n', '##INFO=<ID=FAF,Number=1,Type=Float,Description="Forward Spanning Reads Allele Frequency">\n', '##INFO=<ID=RAF,Number=1,Type=Float,Description="Reverse Spanning Reads Allele Frequency">\n', '##INFO=<ID=SB,Number=1,Type=Integer,Description="Phred-scaled strand bias of spanning reads at this position">\n', '##INFO=<ID=DP4,Number=4,Type=Integer,Description="Counts for ref-forward bases, ref-reverse, alt-forward and alt-reverse bases in spanning reads">\n', '##INFO=<ID=AS,Number=4,Type=Integer,Description="Total alignment score to ref and alt allele of spanning reads by strand (ref fwd, ref rev, alt fwd, alt rev) aligned with parasail match 5, mismatch -4, open 5, extend 3">\n', line, ] with open(out_vcf_filepath, "w") as out_vcf: out_vcf.writelines(header_lines) for line in in_vcf: fields = line.split("\t") info_dict = parseInfoField(fields[7]) sr_list = [int(x) for x in info_dict["SR"].split(",")] sc_list = [int(x) for x in info_dict["SC"].split(",")] if len(sr_list) != len(sc_list): print("WARNING - SR and SC are different lengths, " "skipping variant") print(line.strip()) # Print the line for debugging purposes continue variant_list = fields[4].split(",") dpsp = int(info_dict["DPSP"]) ref_fwd, ref_rev = 0, 1 dpspf, dpspr = (int(x) for x in info_dict["AR"].split(",")) for i in range(0, len(sr_list), 2): dpspf += sr_list[i] dpspr += sr_list[i + 1] for j, i in enumerate(range(2, len(sr_list), 2)): dp4 = (sr_list[ref_fwd], sr_list[ref_rev], sr_list[i], sr_list[i + 1]) dp2x2 = [[dp4[0], dp4[1]], [dp4[2], dp4[3]]] _, p_val = scipy.stats.fisher_exact(dp2x2) sb = pval_to_phredqual(p_val) as_ = (sc_list[ref_fwd], sc_list[ref_rev], sc_list[i], sc_list[i + 1]) info = [] for code in info_dict: if code in to_skip: continue val = info_dict[code] info.append("%s=%s" % (code, val)) info.append("DP=%d" % dpsp) info.append("DPSPS=%d,%d" % (dpspf, dpspr)) if dpsp == 0: info.append("AF=NaN") else: af = (dp4[2] + dp4[3]) / dpsp info.append("AF=%.6f" % af) if dpspf == 0: info.append("FAF=NaN") else: faf = dp4[2] / dpspf info.append("FAF=%.6f" % faf) if dpspr == 0: info.append("RAF=NaN") else: raf = dp4[3] / dpspr info.append("RAF=%.6f" % raf) info.append("SB=%d" % sb) info.append("DP4=%d,%d,%d,%d" % dp4) info.append("AS=%d,%d,%d,%d" % as_) new_info = ";".join(info) fields[4] = variant_list[j] fields[7] = new_info out_vcf.write("\t".join(fields)) in_vcf.close() if __name__ == "__main__": annotateVCF(sys.argv[1], sys.argv[2])