comparison vcf2hrdetect.py @ 11:5a326a6fa105 draft default tip

planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/main/tools/lumpy_smoove commit 8b10e8fc832f8ca7c32479e20d5edbd62088a3aa
author artbio
date Fri, 17 Oct 2025 17:21:17 +0000
parents 7dcf61950215
children
comparison
equal deleted inserted replaced
10:8711df965d4b 11:5a326a6fa105
1 #!/usr/bin/env python
2 import argparse
3 import re
1 import sys 4 import sys
2 5
3 handle = open(sys.argv[1], 'r') 6
4 vcfdict = dict() 7 def create_arg_parser():
5 tabdict = dict() 8 """Creates and returns the argument parser."""
6 for line in handle: 9 parser = argparse.ArgumentParser(
7 if line[0] == "#": 10 description=(
8 continue 11 "Convert a VCF file from lumpy-smoove to a tabular format "
9 else: 12 "compatible with the HRDetect pipeline."
10 tabline = line[:-1].split("\t") 13 )
11 vcfdict[tabline[2]] = tabline 14 )
12 for id in vcfdict.keys(): 15 parser.add_argument(
13 if "_1" in id: 16 'vcf_file',
14 newid = id[:-2] 17 help='Path to the input VCF file.'
15 pointbreak = vcfdict[id][4] 18 )
16 if "]" in pointbreak: 19 parser.add_argument(
17 coordbreak = pointbreak.split("]")[1].split(":")[1] 20 'output_file',
18 chrom = pointbreak.split("]")[1].split(":")[0] 21 help='Path to the output tabular file.'
19 elif "[" in pointbreak: 22 )
20 coordbreak = pointbreak.split("[")[1].split(":")[1] 23 return parser
21 chrom = pointbreak.split("[")[1].split(":")[0] 24
22 if vcfdict[id][0] == chrom: 25
23 tabdict[newid] = [chrom, vcfdict[id][1], chrom, coordbreak, "INV"] 26 def parse_breakend_alt(alt_field):
24 else: 27 """
25 tabdict[newid] = [vcfdict[id][0], vcfdict[id][1], 28 Parses the ALT field for a breakend and returns chromosome and position.
26 chrom, coordbreak, "TRA"] 29
27 for id in list(vcfdict): 30 Args:
28 if "_" in id: 31 alt_field (str): The ALT field (column 5) of a VCF line.
29 del vcfdict[id] 32
30 for id in vcfdict.keys(): # only sv that are not of type TRA or INV 33 Returns:
31 chr1 = vcfdict[id][0] 34 tuple: A tuple containing (chromosome, position) or (None, None)
32 chr2 = vcfdict[id][0] 35 if parsing fails.
33 pos1 = vcfdict[id][1] 36 """
34 pos2 = vcfdict[id][7].split("END=")[1].split(";")[0] 37 # Search for patterns ]chr:pos] or [chr:pos[
35 type = vcfdict[id][7].split("SVTYPE=")[1].split(";")[0] 38 pattern = (
36 tabdict[id] = [chr1, pos1, chr2, pos2, type] 39 r"\](?P<chrom1>[^:]+):(?P<pos1>\d+)\]|"
37 out = open(sys.argv[2], 'w') 40 r"\[(?P<chrom2>[^:]+):(?P<pos2>\d+)\["
38 out.write("chr1\tpos1\tchr2\tpos2\ttype\n") 41 )
39 for key in tabdict: 42 match = re.search(pattern, alt_field)
40 line = "\t".join(tabdict[key]) + "\n" 43
41 out.write(line) 44 if not match:
45 return None, None
46
47 groups = match.groupdict()
48 chrom = groups['chrom1'] or groups['chrom2']
49 pos = groups['pos1'] or groups['pos2']
50 return chrom, pos
51
52
53 def process_vcf(vcf_path, output_path):
54 """
55 Reads a VCF file, converts it, and writes the result to a tabular file.
56
57 Args:
58 vcf_path (str): Path to the input VCF file.
59 output_path (str): Path to the output tabular file.
60 """
61 header = ["chr1", "pos1", "chr2", "pos2", "type"]
62 try:
63 with open(vcf_path, 'r') as infile, open(output_path, 'w') as outfile:
64 outfile.write("\t".join(header) + "\n")
65
66 for line in infile:
67 if line.startswith('#'):
68 continue
69
70 fields = line.strip().split('\t')
71 if len(fields) < 8:
72 continue
73
74 chrom1 = fields[0]
75 pos1 = fields[1]
76 info = fields[7]
77
78 # Attempt to extract the structural variant type from the info
79 svtype_match = re.search(r'SVTYPE=([^;]+)', info)
80 if not svtype_match:
81 continue # Skip lines without SVTYPE tag
82 svtype = svtype_match.group(1)
83
84 if svtype == "BND": # Breakend (INV or TRA)
85 alt_field = fields[4]
86 chrom2, pos2 = parse_breakend_alt(alt_field)
87 if not (chrom2 and pos2):
88 continue
89 event_type = "INV" if chrom1 == chrom2 else "TRA"
90 row = [chrom1, pos1, chrom2, pos2, event_type]
91 outfile.write("\t".join(row) + "\n")
92
93 else: # Other SV types (DEL, DUP, etc.)
94 end_match = re.search(r'END=([^;]+)', info)
95 if not end_match:
96 continue
97 pos2 = end_match.group(1)
98 chrom2 = chrom1
99 row = [chrom1, pos1, chrom2, pos2, svtype]
100 outfile.write("\t".join(row) + "\n")
101
102 except FileNotFoundError:
103 print(f"Error: File '{vcf_path}' not found.",
104 file=sys.stderr)
105 sys.exit(1)
106 except IOError as e:
107 print(f"IO Error: {e}", file=sys.stderr)
108 sys.exit(1)
109
110
111 def main():
112 """Main function of the script."""
113 parser = create_arg_parser()
114 args = parser.parse_args()
115 process_vcf(args.vcf_file, args.output_file)
116
117
118 if __name__ == '__main__':
119 main()