view bprom_gff3_converter.py @ 0:14e64dda68c8 draft default tip

planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5
author cpt
date Sun, 23 Jul 2023 01:47:35 +0000
parents
children
line wrap: on
line source

import argparse

import pandas as pd
import re
from typing import List, Match, Dict, TextIO, Union
from datetime import date

# In this file, a "feature" refers to the collection of data between the > keys of the bprom output.
# That collection of data refers to one section of the DNA upstream of a gene


def read_bprom_file(bprom_file) -> List[str]:
    """Reads in file, creating a list of strings with each list element containing a line from the file"""
    contents = []

    with open(bprom_file) as file:
        for line in file:
            contents.append(line)

    return contents


def concatenate_then_split(contents) -> List[str]:
    """Concatenates the file into one large string.
    Then splits it on '>' so that each feature's data is together in one element"""
    # Concatenates the entire file into one large string
    concat_contents = "".join(contents)

    # Removing the empty string '' at element 0 used to make the join
    concat_contents = concat_contents[1:]

    # Splits the file into a list of strings on ">"
    features = concat_contents.split(">")

    return features


def remove_promoterless_features(features) -> List[str]:
    """For each concatenated feature string passed, removes the element
    if the # of predicted promoters is 0."""
    cleaned_features = features
    indices_to_delete = []
    for i, feature in enumerate(cleaned_features):
        if "Number of predicted promoters -      0" in cleaned_features[i]:
            indices_to_delete.append(i)
    # Must delete in reverse order, otherwise it changes the list indices after
    # the element deleted, and you delete subsequent elements at i+1, i+2, i+3, etc
    for i in sorted(indices_to_delete, reverse=True):
        del cleaned_features[i]

    return cleaned_features


def extract_accession(feature) -> str:
    """Extract accession"""
    accession = re.search("[\w](.*)(?=_)", feature)
    accession = accession.group().replace("_", "").strip()

    return accession


def extract_test_seq_position(feature) -> List[str]:
    """Extract position in genome. Gets any number of values '(.*)' between the brackets
    using 'lookbehind/lookright' (?<=PATTERN) and 'lookahead/lookleft' regex assertions
    to extract (?<=Location=\\[)(.*)(?=]\\()"""
    location = re.search("(?<=Location=\\[)(.*)(?=]\\()", feature)
    location = location.group().split(":")

    return location


def extract_strand_direction(feature) -> str:
    """Extract strand direction for a feature, - or +"""
    # Matches for '(.)'
    direction = re.search("(?<=\\().(?=\\))", feature)
    direction = direction.group()

    return direction


def extract_promoter_data(feature) -> Dict[str, str]:
    """Extracts all promoter data using regular expressions.
    Use for one element in the output of concatenate_then_split()"""
    # Extract promoter -10 and -35 sequences and scores
    # Gets everything between "-xx box at pos." and " Score"
    minus10 = re.search("(?<=-10 box at pos.)(.*)(?= Score)(.*)", feature)
    minus35 = re.search("(?<=-35 box at pos.)(.*)(?= Score)(.*)", feature)

    # Extracts the match and removes leading and trailing whitespace (which can be variable)
    # (the bprom output does not maintain the same # of whitespace characters
    # if there are less digits, at least for the scoring)
    minus10 = minus10.group().lstrip().split(" ")
    minus10_pos = int(minus10[0])
    minus10_seq = minus10[1]
    minus10_score = minus10[-1]

    minus35 = minus35.group().lstrip().split(" ")
    minus35_pos = int(minus35[0])
    minus35_seq = minus35[1]
    minus35_score = minus35[-1]

    # Can change these keys to change the column 9
    promoter_data = {
        "minus10_pos": minus10_pos,
        "minus10_seq": minus10_seq,
        "minus10_score": minus10_score,
        "minus35_pos": minus35_pos,
        "minus35_seq": minus35_seq,
        "minus35_score": minus35_score,
    }
    return promoter_data


def convert_extracted_promoter_data_to_ID_column_format(
    promoter_data, calculated_promoter_positions
) -> str:
    """Converts input data to the GFF3 ID column (column 9) format, a semicolon separated
    list of values providing additional information about each feature"""
    # Replaces the BPROM output positions with the calculated ones
    minus_10_calculated = calculated_promoter_positions[2]
    minus_35_calculated = calculated_promoter_positions[3]
    promoter_data["minus10_pos"] = minus_10_calculated
    promoter_data["minus35_pos"] = minus_35_calculated

    # Creates the column 9 string (attributes)
    promoter_data = ["{}={}".format(key, value) for key, value in promoter_data.items()]
    promoter_data = (
        "Description=Predicted promoter data;" + "Note=" + ",".join(promoter_data) + ";"
    )
    return promoter_data


def extract_LDF_score(feature) -> str:
    """Extract LDF score"""
    LDF = re.search("(?<=LDF-)(.*)", feature)
    LDF = LDF.group().strip()

    return LDF


def calculate_promoter_position(feature):
    """Calculate promoter positions (in the context of the genome) based on BPROM predictions."""
    # Get 'Promoter Pos:     X' data. This refers to the predicted transcriptional start site!
    promoter_pos = re.search("(?<=Promoter Pos:)(.*)(?=LDF)", feature)
    promoter_pos = int(promoter_pos.group().strip())

    # Get start and end positions from 'Location=[XXX:YYYY]'
    test_seq_position = extract_test_seq_position(feature)
    test_cds_location_start_pos = int(test_seq_position[0])
    test_cds_location_end_pos = int(test_seq_position[1])

    promoter_data = extract_promoter_data(feature)

    """ IMPORTANT!! Whether or not you add or subtract to calculate the promoter start
    # position depends on whether we're on the + or - strand!
    # The workflow Jolene uses is smart enough to correctly pull upstream
    # for both + and - strands (i.e., pulls left for +, pulls right for -)
    # THEREFORE, for a gene with a start at 930 on the + strand, it pulls 830:930
    # And for a gene with a start at 930 on the - strand, it pulls 930:1030 """

    direction = extract_strand_direction(feature)

    if direction == "+":
        # BPROM starts counting from the LEFT boundary for + strand test sequences (as expected)
        # Get -10 promoter position
        minus10_pos = promoter_data["minus10_pos"]
        minus10_pos_in_context_of_genome = test_cds_location_start_pos + minus10_pos
        # Get -35 promoter position
        minus35_pos = promoter_data["minus35_pos"]
        minus35_pos_in_context_of_genome = test_cds_location_start_pos + minus35_pos

        start = test_cds_location_start_pos + minus35_pos
        end = test_cds_location_start_pos + promoter_pos

        calculated_promoter_positions = [
            start,
            end,
            minus10_pos_in_context_of_genome,
            minus35_pos_in_context_of_genome,
        ]
        return calculated_promoter_positions

    elif direction == "-":
        # BPROM starts counting from the RIGHT boundary for - strand test sequences
        # Get -10 promoter position
        minus10_pos = promoter_data["minus10_pos"]
        minus10_pos_in_context_of_genome = test_cds_location_end_pos - minus10_pos
        # Get -35 promoter position
        minus35_pos = promoter_data["minus35_pos"]
        minus35_pos_in_context_of_genome = test_cds_location_end_pos - minus35_pos

        # The start and end are reversed
        end = test_cds_location_end_pos - minus35_pos
        start = test_cds_location_end_pos - promoter_pos

        calculated_promoter_positions = [
            start,
            end,
            minus10_pos_in_context_of_genome,
            minus35_pos_in_context_of_genome,
        ]
        return calculated_promoter_positions

    else:
        assert "Error: Strand data neither '+' nor '-'"


def extract_tf_binding_elements():
    """Extract predicted transcription factor binding elements"""
    return


def extract_data_for_all_features(features) -> List[List[Union[str, int]]]:
    """Loops through cleaned bprom output extracting all data of interest and builds the
    structure for loading into a dataframe"""
    extracted_data = []
    for feature in features:
        # loop through features, a List[str] containing each feature [str] in the
        # original bprom format as a single string, but cleaned of irrelevant data
        calculated_promoter_positions = calculate_promoter_position(feature)
        promoter_data = extract_promoter_data(feature)
        promoter_data_converted = convert_extracted_promoter_data_to_ID_column_format(
            promoter_data, calculated_promoter_positions
        )

        extracted_data.append(
            [
                extract_accession(feature),  # Seqid, col 1
                "bprom",  # Source, col 2
                "promoter",  # Type, col 3
                calculated_promoter_positions[0],  # Start, col 4
                calculated_promoter_positions[1],  # End, col 5
                extract_LDF_score(feature),  # Score, col 6
                extract_strand_direction(feature),  # Strand direction, col 7
                ".",  # Phase, col 8
                promoter_data_converted,  # Attributes, col 9
            ]
        )

    return extracted_data


def convert_to_dataframe(extracted_data) -> pd.DataFrame:
    """Convert extracted and processed data to Pandas dataframe with gff3 column names"""

    df = pd.DataFrame(
        extracted_data,
        columns=[
            "seqid",
            "source",
            "type",
            "start",
            "end",
            "score",
            "strand",
            "phase",
            "attributes",
        ],
    )
    return df


def write_to_gff3(dataframe) -> None:
    """Create a gff3 text file from the DataFrame by converting to a tab separated values (tsv) file"""
    tsv = dataframe.to_csv(sep="\t", index=False, header=None)

    # Gets the first element of the first column to use for
    accession = dataframe.iloc[0][0]

    year, month, day = date.today().year, date.today().month, date.today().day

    # with open(f'{year}_{month}_{day}_bprom_as_gff3_{accession}.txt', 'w') as wf:
    # Header so Galaxy can recognize as GFF3
    print("##gff-version 3\n")
    # for line in tsv:
    print(tsv)

    return


def convert_bprom_output_to_gff3(bprom_file) -> None:
    """Master function. Given a BPROM .txt file as output, extracts data and writes as a GFF3 file"""
    bprom_file = read_bprom_file(bprom_file)
    concatenated_bprom_file = concatenate_then_split(bprom_file)
    working_file = remove_promoterless_features(concatenated_bprom_file)
    extracted_data = extract_data_for_all_features(working_file)
    gff3_dataframe = convert_to_dataframe(extracted_data)
    # Create the gff3 text file
    write_to_gff3(gff3_dataframe)

    return


if __name__ == "__main__":
    ## Shows the DataFrame output in the terminal for testing/debugging
    # bprom_file = read_bprom_file('BPROM_output.txt')
    # concatenated_bprom_file: List[str] = concatenate_then_split(bprom_file)
    # working_file = remove_promoterless_features(concatenated_bprom_file)
    # print(convert_to_dataframe(extract_data_for_all_features(working_file)).to_string())

    parser = argparse.ArgumentParser(
        description="converts BPROM output to the gff3 file format"
    )

    parser.add_argument("-f", help="bprom file as .txt")
    args = parser.parse_args()
    # Actual function for converting the BPROM output to gff3
    convert_bprom_output_to_gff3(args.f)

    # Upload to cpt github in the directory Galaxy-Tools/tools/external/