Mercurial > repos > davidvanzessen > shm_csr
comparison split_imgt_file.py @ 92:cf8ad181628f draft
planemo upload commit 36be3b053802693392f935e6619ba3f2b1704e3c
| author | rhpvorderman |
|---|---|
| date | Mon, 12 Dec 2022 12:32:44 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 91:f387cc1580c6 | 92:cf8ad181628f |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 """ | |
| 4 Script to split IMGT file into several archives for each of the genes | |
| 5 | |
| 6 Rather than creating each new archive individually this script will read | |
| 7 the input files only once and as such enormously shorten processing time. | |
| 8 """ | |
| 9 | |
| 10 import argparse | |
| 11 import io | |
| 12 import os | |
| 13 import tarfile | |
| 14 import tempfile | |
| 15 from typing import Iterator, List, Tuple | |
| 16 | |
| 17 | |
| 18 def merged_txt_to_match_dict(merged: str): | |
| 19 with open(merged, "rt") as f: | |
| 20 header = next(f).strip("\n") | |
| 21 column_names = header.split("\t") | |
| 22 # For the baseline result there is no best_match column | |
| 23 if "best_match" in column_names: | |
| 24 best_match_index = column_names.index("best_match") | |
| 25 else: | |
| 26 best_match_index = None | |
| 27 sequence_id_index = column_names.index("Sequence.ID") | |
| 28 match_dict = {} | |
| 29 for line in f: | |
| 30 values = line.strip().split("\t") | |
| 31 sequence_id = values[sequence_id_index] | |
| 32 if best_match_index is not None: | |
| 33 best_match = values[best_match_index] | |
| 34 if "unmatched" in best_match: | |
| 35 # For some reason the table has values such as: unmatched, IGA2 | |
| 36 continue | |
| 37 else: | |
| 38 best_match = "" | |
| 39 match_dict[sequence_id] = best_match | |
| 40 return match_dict | |
| 41 | |
| 42 | |
| 43 def imgt_to_tables(imgt_file: str) -> Iterator[Tuple[str, io.TextIOWrapper]]: | |
| 44 print(f"opening IMGT file: {imgt_file}") | |
| 45 with tarfile.open(imgt_file, "r") as archive: | |
| 46 while True: | |
| 47 member = archive.next() | |
| 48 if member is None: | |
| 49 return | |
| 50 if member.name in {"README.txt"}: | |
| 51 continue | |
| 52 if member.name.startswith("11_"): | |
| 53 continue | |
| 54 f = archive.extractfile(member) | |
| 55 f_text = io.TextIOWrapper(f) | |
| 56 yield member.name, f_text | |
| 57 f_text.close() | |
| 58 | |
| 59 | |
| 60 def split_imgt(imgt_file: str, merged_file: str, outdir: str, genes: List[str], | |
| 61 prefix: str): | |
| 62 """ | |
| 63 This function creates a separate tar file for each of the gene matches | |
| 64 based on the merged file. Unmatched genes are left out. | |
| 65 :param imgt_file: The original IMGT file | |
| 66 :param merged_file: The merged data file generated by SHM&CSR pipeline | |
| 67 :param outdir: The output directory. | |
| 68 :param genes: The genes to split out. Use '-' for all identified genes. | |
| 69 :return: | |
| 70 """ | |
| 71 match_dict = merged_txt_to_match_dict(merged_file) | |
| 72 gene_tarfiles = [] | |
| 73 os.makedirs(outdir, exist_ok=True) | |
| 74 for gene in genes: | |
| 75 new_filename = f"{prefix}_{gene}.txz" if gene else f"{prefix}.txz" | |
| 76 gene_tarfiles.append( | |
| 77 tarfile.open(os.path.join(outdir, new_filename), mode="w:xz") | |
| 78 ) | |
| 79 for name, table in imgt_to_tables(imgt_file): | |
| 80 # Read each table one by one and per line select in which output | |
| 81 # files it should go. | |
| 82 gene_files = [] | |
| 83 for gene in genes: | |
| 84 fp, fname = tempfile.mkstemp() | |
| 85 # The file pointer fp will be wrapped in a python file object | |
| 86 # so we can ensure there remain no open files. | |
| 87 f = open(fp, mode="wt") | |
| 88 gene_files.append((gene, f, fname)) | |
| 89 header = next(table) | |
| 90 header_number_of_tabs = header.count('\t') | |
| 91 column_names = header.strip("\n").split("\t") | |
| 92 fr1_columns = [index for index, column in enumerate(column_names) | |
| 93 if column.startswith("FR1")] | |
| 94 sequence_id_index = column_names.index("Sequence ID") | |
| 95 for _, gene_file, _ in gene_files: | |
| 96 gene_file.write(header) | |
| 97 for line in table: | |
| 98 # IMGT sometimes delivers half-empty rows. | |
| 99 row_number_of_tabs = line.count("\t") | |
| 100 missing_tabs = header_number_of_tabs - row_number_of_tabs | |
| 101 if missing_tabs: | |
| 102 line = line.strip("\n") + missing_tabs * "\t" + "\n" | |
| 103 values = line.strip("\n").split("\t") | |
| 104 sequence_id = values[sequence_id_index] | |
| 105 match = match_dict.get(sequence_id) | |
| 106 if match is None: | |
| 107 continue | |
| 108 if name.startswith("8_"): | |
| 109 # change the FR1 columns to 0 in the "8_..." file | |
| 110 for index in fr1_columns: | |
| 111 values[index] = "0" | |
| 112 line = "\t".join(values) + "\n" | |
| 113 for gene, gene_file, _ in gene_files: | |
| 114 if gene in match: | |
| 115 gene_file.write(line) | |
| 116 for gene_tarfile, (_, gene_file, fname) in zip(gene_tarfiles, gene_files): | |
| 117 gene_file.flush() | |
| 118 gene_tarfile.add(fname, name) | |
| 119 gene_file.close() | |
| 120 os.remove(fname) | |
| 121 for gene_tarfile in gene_tarfiles: | |
| 122 gene_tarfile.close() | |
| 123 | |
| 124 | |
| 125 def argument_parser() -> argparse.ArgumentParser: | |
| 126 parser = argparse.ArgumentParser() | |
| 127 parser.add_argument("imgt_file", help="The original IMGT FILE") | |
| 128 parser.add_argument("merged", help="merged.txt file") | |
| 129 parser.add_argument("--outdir", help="output directory") | |
| 130 parser.add_argument( | |
| 131 "genes", | |
| 132 nargs="+", | |
| 133 help="The genes to split out. Use '-' for all identified genes.") | |
| 134 parser.add_argument("--prefix", help="Prefix for the archives and " | |
| 135 "directories") | |
| 136 return parser | |
| 137 | |
| 138 | |
| 139 def main(): | |
| 140 args = argument_parser().parse_args() | |
| 141 genes = ["" if gene == "-" else gene for gene in args.genes] | |
| 142 split_imgt(args.imgt_file, args.merged, args.outdir, genes, | |
| 143 args.prefix) | |
| 144 | |
| 145 | |
| 146 if __name__ == "__main__": | |
| 147 main() |
