view add_taxonomic_labels.py @ 2:f4b8ab4ed24e draft

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/add_header_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
author onnodg
date Mon, 15 Dec 2025 16:49:00 +0000
parents abd214795fa5
children 04ec86bdac32
line wrap: on
line source

"""This script processes the output obtained from a curated BLAST database
and prepares the correct input format for downstream analysis.

The difference before and after running this script is that taxonomic labels
in the BLAST output are no longer marked as unknown, because the curated database
only includes taxonomy information in the headers of the output sequences.
The source and sequence ID of the hit are also added to the correct position
in the tabular file.

To extract the correct taxonomy levels, it is necessary to know the exact
positions of the required taxonomic ranks. These positions must be specified
in the `--taxon_levels`, based on splitting each header string first on '=' and
then on whitespace.

Important:
This script is not needed for GenBank-based databases, since their output
is already usable.  
`--taxon_levels` is a critical variable — do not modify it unless you
understand what you're doing.
"""

import argparse
import re

def parse_arguments(args_list=None):
    """Parse the command line arguments."""
    parser = argparse.ArgumentParser(description="Add taxonomix labels from curated database output")
    parser.add_argument("-i", "--input_file", required=True)
    parser.add_argument("-o", "--output_file", required=True)
    parser.add_argument("-t", "--taxon_levels", type=int, nargs="+", default=[1, 2, 4, 7, 11, 12, 13])
    return parser.parse_args(args_list)


def add_labels(input, output, taxon_levels):
    """
    Add taxonomic labels, source and seqID to correct positions in BLAST output.

    :param input: Path to BLAST output file.
    :type input: str
    :param output: Path to output file.
    :type output: str
    :param taxon_levels: List of taxonomic levels.
    :type taxon_levels: list
    :return: None
    :rtype: None
    :raises: ValueError: if certain fields are missing from the input
    """
    # Make header in file
    with open(output, 'w') as outfile:
        outfile.write('#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	'
                      '#Coverage	#evalue	#bitscore	#Source	#Taxonomy\n')

    with (open(input, 'r') as infile, open(output, 'a') as outfile):
        for line in infile:
            if "#Query ID" not in line:
                new_taxa = ''
                src_val = ""
                seq_id = ""
                split_index = 0
                if "superkingdom=" in line and "markercode="  in line and "Genbank" in line:
                    taxonomy = line.split("superkingdom=")[-1].split("markercode=")[0].strip()
                    line = line.split("Genbank")[0].strip()
                    parts = line.strip().split()
                else:
                    raise ValueError("Line does not contain expected fields: superkingdom, markercode, or Genbank")

                for i, p in enumerate(parts):
                    if p.startswith("source="):
                        src_val = p.split("=", 1)[1]
                    elif p.startswith("sequenceID="):
                        seq_id = p.split("=", 1)[1]
                    elif p.startswith("markercode="):
                        split_index = i
                    if src_val and seq_id and split_index:
                        break

                # Split on the source, which appears twice in the line
                line1, remove, line2 = line.split(f"source={src_val}", 2)
                # Just remove the taxon and source
                keep = remove.split(parts[split_index-1])[-1]
                if src_val and seq_id:
                    # Insert source and seq id in the line
                    line = (line1.strip() + "\t" + keep.strip() + "\t" + seq_id.strip() + "\t"
                    + line2.strip() + "\t" + src_val)
                # Add taxon to line
                for level in taxon_levels:
                    if level != taxon_levels[-1]:
                        new_taxa += taxonomy.split('=')[level].split(' ')[0]
                        new_taxa += ' / '
                    else:
                        # The species name already contains a space, so it
                        # should not be split further.
                        new_taxa += taxonomy.split('=')[level]
                line += '\t'
                line += new_taxa
                outfile.write(f'{line}\n')


def main(arg_list=None):
    args = parse_arguments(arg_list)
    add_labels(args.input_file, args.output_file, args.taxon_levels)


if __name__ == '__main__':
    main()