Mercurial > repos > artbio > lumpy_smoove
comparison vcf2hrdetect.py @ 11:5a326a6fa105 draft default tip
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/main/tools/lumpy_smoove commit 8b10e8fc832f8ca7c32479e20d5edbd62088a3aa
| author | artbio |
|---|---|
| date | Fri, 17 Oct 2025 17:21:17 +0000 |
| parents | 7dcf61950215 |
| children |
comparison
equal
deleted
inserted
replaced
| 10:8711df965d4b | 11:5a326a6fa105 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import argparse | |
| 3 import re | |
| 1 import sys | 4 import sys |
| 2 | 5 |
| 3 handle = open(sys.argv[1], 'r') | 6 |
| 4 vcfdict = dict() | 7 def create_arg_parser(): |
| 5 tabdict = dict() | 8 """Creates and returns the argument parser.""" |
| 6 for line in handle: | 9 parser = argparse.ArgumentParser( |
| 7 if line[0] == "#": | 10 description=( |
| 8 continue | 11 "Convert a VCF file from lumpy-smoove to a tabular format " |
| 9 else: | 12 "compatible with the HRDetect pipeline." |
| 10 tabline = line[:-1].split("\t") | 13 ) |
| 11 vcfdict[tabline[2]] = tabline | 14 ) |
| 12 for id in vcfdict.keys(): | 15 parser.add_argument( |
| 13 if "_1" in id: | 16 'vcf_file', |
| 14 newid = id[:-2] | 17 help='Path to the input VCF file.' |
| 15 pointbreak = vcfdict[id][4] | 18 ) |
| 16 if "]" in pointbreak: | 19 parser.add_argument( |
| 17 coordbreak = pointbreak.split("]")[1].split(":")[1] | 20 'output_file', |
| 18 chrom = pointbreak.split("]")[1].split(":")[0] | 21 help='Path to the output tabular file.' |
| 19 elif "[" in pointbreak: | 22 ) |
| 20 coordbreak = pointbreak.split("[")[1].split(":")[1] | 23 return parser |
| 21 chrom = pointbreak.split("[")[1].split(":")[0] | 24 |
| 22 if vcfdict[id][0] == chrom: | 25 |
| 23 tabdict[newid] = [chrom, vcfdict[id][1], chrom, coordbreak, "INV"] | 26 def parse_breakend_alt(alt_field): |
| 24 else: | 27 """ |
| 25 tabdict[newid] = [vcfdict[id][0], vcfdict[id][1], | 28 Parses the ALT field for a breakend and returns chromosome and position. |
| 26 chrom, coordbreak, "TRA"] | 29 |
| 27 for id in list(vcfdict): | 30 Args: |
| 28 if "_" in id: | 31 alt_field (str): The ALT field (column 5) of a VCF line. |
| 29 del vcfdict[id] | 32 |
| 30 for id in vcfdict.keys(): # only sv that are not of type TRA or INV | 33 Returns: |
| 31 chr1 = vcfdict[id][0] | 34 tuple: A tuple containing (chromosome, position) or (None, None) |
| 32 chr2 = vcfdict[id][0] | 35 if parsing fails. |
| 33 pos1 = vcfdict[id][1] | 36 """ |
| 34 pos2 = vcfdict[id][7].split("END=")[1].split(";")[0] | 37 # Search for patterns ]chr:pos] or [chr:pos[ |
| 35 type = vcfdict[id][7].split("SVTYPE=")[1].split(";")[0] | 38 pattern = ( |
| 36 tabdict[id] = [chr1, pos1, chr2, pos2, type] | 39 r"\](?P<chrom1>[^:]+):(?P<pos1>\d+)\]|" |
| 37 out = open(sys.argv[2], 'w') | 40 r"\[(?P<chrom2>[^:]+):(?P<pos2>\d+)\[" |
| 38 out.write("chr1\tpos1\tchr2\tpos2\ttype\n") | 41 ) |
| 39 for key in tabdict: | 42 match = re.search(pattern, alt_field) |
| 40 line = "\t".join(tabdict[key]) + "\n" | 43 |
| 41 out.write(line) | 44 if not match: |
| 45 return None, None | |
| 46 | |
| 47 groups = match.groupdict() | |
| 48 chrom = groups['chrom1'] or groups['chrom2'] | |
| 49 pos = groups['pos1'] or groups['pos2'] | |
| 50 return chrom, pos | |
| 51 | |
| 52 | |
| 53 def process_vcf(vcf_path, output_path): | |
| 54 """ | |
| 55 Reads a VCF file, converts it, and writes the result to a tabular file. | |
| 56 | |
| 57 Args: | |
| 58 vcf_path (str): Path to the input VCF file. | |
| 59 output_path (str): Path to the output tabular file. | |
| 60 """ | |
| 61 header = ["chr1", "pos1", "chr2", "pos2", "type"] | |
| 62 try: | |
| 63 with open(vcf_path, 'r') as infile, open(output_path, 'w') as outfile: | |
| 64 outfile.write("\t".join(header) + "\n") | |
| 65 | |
| 66 for line in infile: | |
| 67 if line.startswith('#'): | |
| 68 continue | |
| 69 | |
| 70 fields = line.strip().split('\t') | |
| 71 if len(fields) < 8: | |
| 72 continue | |
| 73 | |
| 74 chrom1 = fields[0] | |
| 75 pos1 = fields[1] | |
| 76 info = fields[7] | |
| 77 | |
| 78 # Attempt to extract the structural variant type from the info | |
| 79 svtype_match = re.search(r'SVTYPE=([^;]+)', info) | |
| 80 if not svtype_match: | |
| 81 continue # Skip lines without SVTYPE tag | |
| 82 svtype = svtype_match.group(1) | |
| 83 | |
| 84 if svtype == "BND": # Breakend (INV or TRA) | |
| 85 alt_field = fields[4] | |
| 86 chrom2, pos2 = parse_breakend_alt(alt_field) | |
| 87 if not (chrom2 and pos2): | |
| 88 continue | |
| 89 event_type = "INV" if chrom1 == chrom2 else "TRA" | |
| 90 row = [chrom1, pos1, chrom2, pos2, event_type] | |
| 91 outfile.write("\t".join(row) + "\n") | |
| 92 | |
| 93 else: # Other SV types (DEL, DUP, etc.) | |
| 94 end_match = re.search(r'END=([^;]+)', info) | |
| 95 if not end_match: | |
| 96 continue | |
| 97 pos2 = end_match.group(1) | |
| 98 chrom2 = chrom1 | |
| 99 row = [chrom1, pos1, chrom2, pos2, svtype] | |
| 100 outfile.write("\t".join(row) + "\n") | |
| 101 | |
| 102 except FileNotFoundError: | |
| 103 print(f"Error: File '{vcf_path}' not found.", | |
| 104 file=sys.stderr) | |
| 105 sys.exit(1) | |
| 106 except IOError as e: | |
| 107 print(f"IO Error: {e}", file=sys.stderr) | |
| 108 sys.exit(1) | |
| 109 | |
| 110 | |
| 111 def main(): | |
| 112 """Main function of the script.""" | |
| 113 parser = create_arg_parser() | |
| 114 args = parser.parse_args() | |
| 115 process_vcf(args.vcf_file, args.output_file) | |
| 116 | |
| 117 | |
| 118 if __name__ == '__main__': | |
| 119 main() |
