Mercurial > repos > iuc > ivar_variants
comparison ivar_variants_to_vcf.py @ 6:147465efa99c draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ivar/ commit 847ec10cd36ea4f3cd4c257d5742f0fb401e364e"
| author | iuc |
|---|---|
| date | Thu, 10 Jun 2021 22:06:04 +0000 |
| parents | |
| children | 045d6d00f606 |
comparison
equal
deleted
inserted
replaced
| 5:49236b03e4fd | 6:147465efa99c |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import argparse | |
| 3 import errno | |
| 4 import os | |
| 5 import re | |
| 6 import sys | |
| 7 | |
| 8 | |
| 9 def parse_args(args=None): | |
| 10 Description = "Convert iVar variants tsv file to vcf format." | |
| 11 Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>""" | |
| 12 | |
| 13 parser = argparse.ArgumentParser(description=Description, epilog=Epilog) | |
| 14 parser.add_argument("FILE_IN", help="Input tsv file.") | |
| 15 parser.add_argument("FILE_OUT", help="Full path to output vcf file.") | |
| 16 parser.add_argument( | |
| 17 "-po", | |
| 18 "--pass_only", | |
| 19 dest="PASS_ONLY", | |
| 20 help="Only output variants that PASS all filters.", | |
| 21 action="store_true", | |
| 22 ) | |
| 23 | |
| 24 return parser.parse_args(args) | |
| 25 | |
| 26 | |
| 27 def make_dir(path): | |
| 28 if not len(path) == 0: | |
| 29 try: | |
| 30 os.makedirs(path) | |
| 31 except OSError as exception: | |
| 32 if exception.errno != errno.EEXIST: | |
| 33 raise | |
| 34 | |
| 35 | |
| 36 def info_line(info_keys, kv): | |
| 37 info_strings = [] | |
| 38 for key in info_keys: | |
| 39 if key not in kv: | |
| 40 raise KeyError( | |
| 41 'Expected key {} missing from INFO field key value pairs' | |
| 42 .format(key) | |
| 43 ) | |
| 44 if kv[key] is False: | |
| 45 # a FLAG element, which should not be set | |
| 46 continue | |
| 47 if kv[key] is True: | |
| 48 # a FLAG element => write the key only | |
| 49 info_strings.append(key) | |
| 50 else: | |
| 51 info_strings.append('{}={}'.format(key, kv[key])) | |
| 52 return ';'.join(info_strings) | |
| 53 | |
| 54 | |
| 55 def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False): | |
| 56 filename = os.path.splitext(FileIn)[0] | |
| 57 header = ( | |
| 58 "##fileformat=VCFv4.2\n" | |
| 59 "##source=iVar\n" | |
| 60 '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n' | |
| 61 '##INFO=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n' | |
| 62 '##INFO=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n' | |
| 63 '##INFO=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n' | |
| 64 '##INFO=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n' | |
| 65 '##INFO=<ID=ALT_RV,Number=1,Type=Integer,Description="Deapth of alternate base on reverse reads">\n' | |
| 66 '##INFO=<ID=ALT_QUAL,Number=1,Type=Integer,Description="Mean quality of alternate base">\n' | |
| 67 '##INFO=<ID=AF,Number=1,Type=Float,Description="Frequency of alternate base">\n' | |
| 68 '##INFO=<ID=INDEL,Number=0,Type=Flag,Description="Indicates that the variant is an INDEL.">\n' | |
| 69 '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n' | |
| 70 '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n' | |
| 71 ) | |
| 72 info_keys = [ | |
| 73 re.match(r'##INFO=<ID=([^,]+),', line).group(1) | |
| 74 for line in header.splitlines() | |
| 75 if line.startswith('##INFO=') | |
| 76 ] | |
| 77 header += ( | |
| 78 "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n" | |
| 79 ) | |
| 80 | |
| 81 vars_seen = set() | |
| 82 varCountDict = {"SNP": 0, "INS": 0, "DEL": 0} | |
| 83 OutDir = os.path.dirname(FileOut) | |
| 84 make_dir(OutDir) | |
| 85 with open(FileIn) as f, open(FileOut, "w") as fout: | |
| 86 fout.write(header) | |
| 87 for line in f: | |
| 88 if line.startswith("REGION"): | |
| 89 continue | |
| 90 | |
| 91 line = line.split("\t") | |
| 92 CHROM = line[0] | |
| 93 POS = line[1] | |
| 94 ID = "." | |
| 95 REF = line[2] | |
| 96 ALT = line[3] | |
| 97 if ALT[0] == "+": | |
| 98 ALT = REF + ALT[1:] | |
| 99 var_type = "INS" | |
| 100 elif ALT[0] == "-": | |
| 101 REF += ALT[1:] | |
| 102 ALT = line[2] | |
| 103 var_type = "DEL" | |
| 104 else: | |
| 105 var_type = "SNP" | |
| 106 QUAL = "." | |
| 107 pass_test = line[13] | |
| 108 if pass_test == "TRUE": | |
| 109 FILTER = "PASS" | |
| 110 else: | |
| 111 FILTER = "FAIL" | |
| 112 | |
| 113 if (passOnly and FILTER != "PASS"): | |
| 114 continue | |
| 115 var = (CHROM, POS, REF, ALT) | |
| 116 if var in vars_seen: | |
| 117 continue | |
| 118 | |
| 119 info_elements = { | |
| 120 'DP': line[11], | |
| 121 'REF_DP': line[4], | |
| 122 'REF_RV': line[5], | |
| 123 'REF_QUAL': line[6], | |
| 124 'ALT_DP': line[7], | |
| 125 'ALT_RV': line[8], | |
| 126 'ALT_QUAL': line[9], | |
| 127 'AF': line[10] | |
| 128 } | |
| 129 if var_type in ['INS', 'DEL']: | |
| 130 # add INDEL FLAG | |
| 131 info_elements['INDEL'] = True | |
| 132 else: | |
| 133 info_elements['INDEL'] = False | |
| 134 INFO = info_line(info_keys, info_elements) | |
| 135 | |
| 136 vars_seen.add(var) | |
| 137 varCountDict[var_type] += 1 | |
| 138 fout.write( | |
| 139 '\t'.join( | |
| 140 [CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO] | |
| 141 ) + '\n' | |
| 142 ) | |
| 143 | |
| 144 # Print variant counts to pass to MultiQC | |
| 145 varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())] | |
| 146 print("\t".join(["sample"] + [x[0] for x in varCountList])) | |
| 147 print("\t".join([filename] + [x[1] for x in varCountList])) | |
| 148 | |
| 149 | |
| 150 def main(args=None): | |
| 151 args = parse_args(args) | |
| 152 ivar_variants_to_vcf( | |
| 153 args.FILE_IN, args.FILE_OUT, args.PASS_ONLY | |
| 154 ) | |
| 155 | |
| 156 | |
| 157 if __name__ == "__main__": | |
| 158 sys.exit(main()) |
