diff convert_VCF_info_fields.py @ 13:72b5e9dda577 draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/medaka commit 52289bc7b99bfa8a3bda46cb35cea98399419dab"
author iuc
date Thu, 18 Nov 2021 20:01:28 +0000
parents 0e9e7fcdd543
children
line wrap: on
line diff
--- a/convert_VCF_info_fields.py	Fri Sep 17 20:23:12 2021 +0000
+++ b/convert_VCF_info_fields.py	Thu Nov 18 20:01:28 2021 +0000
@@ -24,10 +24,10 @@
 
 
 def parseInfoField(info):
-    info_fields = info.split(';')
+    info_fields = info.split(";")
     info_dict = OrderedDict()
     for info_field in info_fields:
-        code, val = info_field.split('=')
+        code, val = info_field.split("=")
         info_dict[code] = val
     return info_dict
 
@@ -40,7 +40,7 @@
     and multiple alternate alleles with simple ref, alt allele counterparts.
     """
 
-    in_vcf = open(in_vcf_filepath, 'r')
+    in_vcf = open(in_vcf_filepath, "r")
     # medaka INFO fields that do not make sense after splitting of
     # multi-allelic records
     # DP will be overwritten with the value of DPSP because medaka tools
@@ -48,8 +48,8 @@
     # (https://github.com/nanoporetech/medaka/issues/192).
     # DPS, which is as unreliable as DP, gets skipped and the code
     # calculates the spanning reads equivalent DPSPS instead.
-    to_skip = {'SC', 'SR', 'AR', 'DP', 'DPSP', 'DPS'}
-    struct_meta_pat = re.compile('##(.+)=<ID=([^,]+)(,.+)?>')
+    to_skip = {"SC", "SR", "AR", "DP", "DPSP", "DPS"}
+    struct_meta_pat = re.compile("##(.+)=<ID=([^,]+)(,.+)?>")
     header_lines = []
     contig_ids = set()
     contig_ids_simple = set()
@@ -59,8 +59,8 @@
     # - redundant contig information
     while True:
         line = in_vcf.readline()
-        if line[:2] != '##':
-            assert line.startswith('#CHROM')
+        if line[:2] != "##":
+            assert line.startswith("#CHROM")
             break
         if line in header_lines:
             # the annotate tool may generate lines already written by
@@ -69,12 +69,12 @@
         match = struct_meta_pat.match(line)
         if match:
             match_type, match_id, match_misc = match.groups()
-            if match_type == 'INFO':
-                if match_id == 'DPSP':
-                    line = line.replace('DPSP', 'DP')
+            if match_type == "INFO":
+                if match_id == "DPSP":
+                    line = line.replace("DPSP", "DP")
                 elif match_id in to_skip:
                     continue
-            elif match_type == 'contig':
+            elif match_type == "contig":
                 contig_ids.add(match_id)
                 if not match_misc:
                     # the annotate tools writes its own contig info,
@@ -87,7 +87,7 @@
     # Lets check the above assumption about each ID-only contig line
     # having a more complete counterpart.
     assert not (contig_ids_simple - contig_ids)
-    header_lines.insert(1, '##convert_VCF_info_fields=0.2\n')
+    header_lines.insert(1, "##convert_VCF_info_fields=0.2\n")
     header_lines += [
         '##INFO=<ID=DPSPS,Number=2,Type=Integer,Description="Depth of spanning reads by strand">\n',
         '##INFO=<ID=AF,Number=1,Type=Float,Description="Spanning Reads Allele Frequency">\n',
@@ -96,47 +96,34 @@
         '##INFO=<ID=SB,Number=1,Type=Integer,Description="Phred-scaled strand bias of spanning reads at this position">\n',
         '##INFO=<ID=DP4,Number=4,Type=Integer,Description="Counts for ref-forward bases, ref-reverse, alt-forward and alt-reverse bases in spanning reads">\n',
         '##INFO=<ID=AS,Number=4,Type=Integer,Description="Total alignment score to ref and alt allele of spanning reads by strand (ref fwd, ref rev, alt fwd, alt rev) aligned with parasail match 5, mismatch -4, open 5, extend 3">\n',
-        line
+        line,
     ]
 
-    with open(out_vcf_filepath, 'w') as out_vcf:
+    with open(out_vcf_filepath, "w") as out_vcf:
         out_vcf.writelines(header_lines)
         for line in in_vcf:
-            fields = line.split('\t')
+            fields = line.split("\t")
             info_dict = parseInfoField(fields[7])
-            sr_list = [int(x) for x in info_dict["SR"].split(',')]
-            sc_list = [int(x) for x in info_dict["SC"].split(',')]
+            sr_list = [int(x) for x in info_dict["SR"].split(",")]
+            sc_list = [int(x) for x in info_dict["SC"].split(",")]
             if len(sr_list) != len(sc_list):
-                print(
-                    'WARNING - SR and SC are different lengths, '
-                    'skipping variant'
-                )
+                print("WARNING - SR and SC are different lengths, " "skipping variant")
                 print(line.strip())  # Print the line for debugging purposes
                 continue
-            variant_list = fields[4].split(',')
-            dpsp = int(info_dict['DPSP'])
+            variant_list = fields[4].split(",")
+            dpsp = int(info_dict["DPSP"])
             ref_fwd, ref_rev = 0, 1
-            dpspf, dpspr = (int(x) for x in info_dict['AR'].split(','))
+            dpspf, dpspr = (int(x) for x in info_dict["AR"].split(","))
             for i in range(0, len(sr_list), 2):
                 dpspf += sr_list[i]
                 dpspr += sr_list[i + 1]
             for j, i in enumerate(range(2, len(sr_list), 2)):
-                dp4 = (
-                    sr_list[ref_fwd],
-                    sr_list[ref_rev],
-                    sr_list[i],
-                    sr_list[i + 1]
-                )
+                dp4 = (sr_list[ref_fwd], sr_list[ref_rev], sr_list[i], sr_list[i + 1])
                 dp2x2 = [[dp4[0], dp4[1]], [dp4[2], dp4[3]]]
                 _, p_val = scipy.stats.fisher_exact(dp2x2)
                 sb = pval_to_phredqual(p_val)
 
-                as_ = (
-                    sc_list[ref_fwd],
-                    sc_list[ref_rev],
-                    sc_list[i],
-                    sc_list[i + 1]
-                )
+                as_ = (sc_list[ref_fwd], sc_list[ref_rev], sc_list[i], sc_list[i + 1])
 
                 info = []
                 for code in info_dict:
@@ -145,31 +132,31 @@
                     val = info_dict[code]
                     info.append("%s=%s" % (code, val))
 
-                info.append('DP=%d' % dpsp)
-                info.append('DPSPS=%d,%d' % (dpspf, dpspr))
+                info.append("DP=%d" % dpsp)
+                info.append("DPSPS=%d,%d" % (dpspf, dpspr))
 
                 if dpsp == 0:
-                    info.append('AF=NaN')
+                    info.append("AF=NaN")
                 else:
                     af = (dp4[2] + dp4[3]) / dpsp
-                    info.append('AF=%.6f' % af)
+                    info.append("AF=%.6f" % af)
                 if dpspf == 0:
-                    info.append('FAF=NaN')
+                    info.append("FAF=NaN")
                 else:
                     faf = dp4[2] / dpspf
-                    info.append('FAF=%.6f' % faf)
+                    info.append("FAF=%.6f" % faf)
                 if dpspr == 0:
-                    info.append('RAF=NaN')
+                    info.append("RAF=NaN")
                 else:
                     raf = dp4[3] / dpspr
-                    info.append('RAF=%.6f' % raf)
-                info.append('SB=%d' % sb)
-                info.append('DP4=%d,%d,%d,%d' % dp4)
-                info.append('AS=%d,%d,%d,%d' % as_)
-                new_info = ';'.join(info)
+                    info.append("RAF=%.6f" % raf)
+                info.append("SB=%d" % sb)
+                info.append("DP4=%d,%d,%d,%d" % dp4)
+                info.append("AS=%d,%d,%d,%d" % as_)
+                new_info = ";".join(info)
                 fields[4] = variant_list[j]
                 fields[7] = new_info
-                out_vcf.write('\t'.join(fields))
+                out_vcf.write("\t".join(fields))
     in_vcf.close()