Mercurial > repos > iuc > medaka_snp
comparison convert_VCF_info_fields.py @ 0:179342c7b86c draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/medaka commit 4d3dfd4bcb567178107dcfd808ff03f9fec0bdbd
| author | iuc |
|---|---|
| date | Wed, 12 Oct 2022 07:43:59 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:179342c7b86c |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Takes in VCF file annotated with medaka tools annotate and converts | |
| 4 # | |
| 5 # Usage statement: | |
| 6 # python convert_VCF_info_fields.py in_vcf.vcf out_vcf.vcf | |
| 7 | |
| 8 # 10/21/2020 - Nathan P. Roach, natproach@gmail.com | |
| 9 | |
| 10 import re | |
| 11 import sys | |
| 12 from collections import OrderedDict | |
| 13 from math import log10 | |
| 14 | |
| 15 import scipy.stats | |
| 16 | |
| 17 | |
| 18 def pval_to_phredqual(pval): | |
| 19 try: | |
| 20 ret = round(-10 * log10(pval)) | |
| 21 except ValueError: | |
| 22 ret = 2147483647 # transform pval of 0.0 to max signed 32 bit int | |
| 23 return ret | |
| 24 | |
| 25 | |
| 26 def parseInfoField(info): | |
| 27 info_fields = info.split(";") | |
| 28 info_dict = OrderedDict() | |
| 29 for info_field in info_fields: | |
| 30 code, val = info_field.split("=") | |
| 31 info_dict[code] = val | |
| 32 return info_dict | |
| 33 | |
| 34 | |
| 35 def annotateVCF(in_vcf_filepath, out_vcf_filepath): | |
| 36 """Postprocess output of medaka tools annotate. | |
| 37 | |
| 38 Splits multiallelic sites into separate records. | |
| 39 Replaces medaka INFO fields that might represent information of the ref | |
| 40 and multiple alternate alleles with simple ref, alt allele counterparts. | |
| 41 """ | |
| 42 | |
| 43 in_vcf = open(in_vcf_filepath, "r") | |
| 44 # medaka INFO fields that do not make sense after splitting of | |
| 45 # multi-allelic records | |
| 46 # DP will be overwritten with the value of DPSP because medaka tools | |
| 47 # annotate currently only calculates the latter correctly | |
| 48 # (https://github.com/nanoporetech/medaka/issues/192). | |
| 49 # DPS, which is as unreliable as DP, gets skipped and the code | |
| 50 # calculates the spanning reads equivalent DPSPS instead. | |
| 51 to_skip = {"SC", "SR", "AR", "DP", "DPSP", "DPS"} | |
| 52 struct_meta_pat = re.compile("##(.+)=<ID=([^,]+)(,.+)?>") | |
| 53 header_lines = [] | |
| 54 contig_ids = set() | |
| 55 contig_ids_simple = set() | |
| 56 # parse the metadata lines of the input VCF and drop: | |
| 57 # - duplicate lines | |
| 58 # - INFO lines declaring keys we are not going to write | |
| 59 # - redundant contig information | |
| 60 while True: | |
| 61 line = in_vcf.readline() | |
| 62 if line[:2] != "##": | |
| 63 assert line.startswith("#CHROM") | |
| 64 break | |
| 65 if line in header_lines: | |
| 66 # the annotate tool may generate lines already written by | |
| 67 # medaka variant again (example: medaka version line) | |
| 68 continue | |
| 69 match = struct_meta_pat.match(line) | |
| 70 if match: | |
| 71 match_type, match_id, match_misc = match.groups() | |
| 72 if match_type == "INFO": | |
| 73 if match_id == "DPSP": | |
| 74 line = line.replace("DPSP", "DP") | |
| 75 elif match_id in to_skip: | |
| 76 continue | |
| 77 elif match_type == "contig": | |
| 78 contig_ids.add(match_id) | |
| 79 if not match_misc: | |
| 80 # the annotate tools writes its own contig info, | |
| 81 # which is redundant with contig info generated by | |
| 82 # medaka variant, but lacks a length value. | |
| 83 # We don't need the incomplete line. | |
| 84 contig_ids_simple.add(match_id) | |
| 85 continue | |
| 86 header_lines.append(line) | |
| 87 # Lets check the above assumption about each ID-only contig line | |
| 88 # having a more complete counterpart. | |
| 89 assert not (contig_ids_simple - contig_ids) | |
| 90 header_lines.insert(1, "##convert_VCF_info_fields=0.2\n") | |
| 91 header_lines += [ | |
| 92 '##INFO=<ID=DPSPS,Number=2,Type=Integer,Description="Depth of spanning reads by strand">\n', | |
| 93 '##INFO=<ID=AF,Number=1,Type=Float,Description="Spanning Reads Allele Frequency">\n', | |
| 94 '##INFO=<ID=FAF,Number=1,Type=Float,Description="Forward Spanning Reads Allele Frequency">\n', | |
| 95 '##INFO=<ID=RAF,Number=1,Type=Float,Description="Reverse Spanning Reads Allele Frequency">\n', | |
| 96 '##INFO=<ID=SB,Number=1,Type=Integer,Description="Phred-scaled strand bias of spanning reads at this position">\n', | |
| 97 '##INFO=<ID=DP4,Number=4,Type=Integer,Description="Counts for ref-forward bases, ref-reverse, alt-forward and alt-reverse bases in spanning reads">\n', | |
| 98 '##INFO=<ID=AS,Number=4,Type=Integer,Description="Total alignment score to ref and alt allele of spanning reads by strand (ref fwd, ref rev, alt fwd, alt rev) aligned with parasail match 5, mismatch -4, open 5, extend 3">\n', | |
| 99 line, | |
| 100 ] | |
| 101 | |
| 102 with open(out_vcf_filepath, "w") as out_vcf: | |
| 103 out_vcf.writelines(header_lines) | |
| 104 for line in in_vcf: | |
| 105 fields = line.split("\t") | |
| 106 info_dict = parseInfoField(fields[7]) | |
| 107 sr_list = [int(x) for x in info_dict["SR"].split(",")] | |
| 108 sc_list = [int(x) for x in info_dict["SC"].split(",")] | |
| 109 if len(sr_list) != len(sc_list): | |
| 110 print("WARNING - SR and SC are different lengths, " "skipping variant") | |
| 111 print(line.strip()) # Print the line for debugging purposes | |
| 112 continue | |
| 113 variant_list = fields[4].split(",") | |
| 114 dpsp = int(info_dict["DPSP"]) | |
| 115 ref_fwd, ref_rev = 0, 1 | |
| 116 dpspf, dpspr = (int(x) for x in info_dict["AR"].split(",")) | |
| 117 for i in range(0, len(sr_list), 2): | |
| 118 dpspf += sr_list[i] | |
| 119 dpspr += sr_list[i + 1] | |
| 120 for j, i in enumerate(range(2, len(sr_list), 2)): | |
| 121 dp4 = (sr_list[ref_fwd], sr_list[ref_rev], sr_list[i], sr_list[i + 1]) | |
| 122 dp2x2 = [[dp4[0], dp4[1]], [dp4[2], dp4[3]]] | |
| 123 _, p_val = scipy.stats.fisher_exact(dp2x2) | |
| 124 sb = pval_to_phredqual(p_val) | |
| 125 | |
| 126 as_ = (sc_list[ref_fwd], sc_list[ref_rev], sc_list[i], sc_list[i + 1]) | |
| 127 | |
| 128 info = [] | |
| 129 for code in info_dict: | |
| 130 if code in to_skip: | |
| 131 continue | |
| 132 val = info_dict[code] | |
| 133 info.append("%s=%s" % (code, val)) | |
| 134 | |
| 135 info.append("DP=%d" % dpsp) | |
| 136 info.append("DPSPS=%d,%d" % (dpspf, dpspr)) | |
| 137 | |
| 138 if dpsp == 0: | |
| 139 info.append("AF=NaN") | |
| 140 else: | |
| 141 af = (dp4[2] + dp4[3]) / dpsp | |
| 142 info.append("AF=%.6f" % af) | |
| 143 if dpspf == 0: | |
| 144 info.append("FAF=NaN") | |
| 145 else: | |
| 146 faf = dp4[2] / dpspf | |
| 147 info.append("FAF=%.6f" % faf) | |
| 148 if dpspr == 0: | |
| 149 info.append("RAF=NaN") | |
| 150 else: | |
| 151 raf = dp4[3] / dpspr | |
| 152 info.append("RAF=%.6f" % raf) | |
| 153 info.append("SB=%d" % sb) | |
| 154 info.append("DP4=%d,%d,%d,%d" % dp4) | |
| 155 info.append("AS=%d,%d,%d,%d" % as_) | |
| 156 new_info = ";".join(info) | |
| 157 fields[4] = variant_list[j] | |
| 158 fields[7] = new_info | |
| 159 out_vcf.write("\t".join(fields)) | |
| 160 in_vcf.close() | |
| 161 | |
| 162 | |
| 163 if __name__ == "__main__": | |
| 164 annotateVCF(sys.argv[1], sys.argv[2]) |
