diff bprom_gff3_converter.py @ 0:14e64dda68c8 draft default tip

planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5
author cpt
date Sun, 23 Jul 2023 01:47:35 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bprom_gff3_converter.py	Sun Jul 23 01:47:35 2023 +0000
@@ -0,0 +1,310 @@
+import argparse
+
+import pandas as pd
+import re
+from typing import List, Match, Dict, TextIO, Union
+from datetime import date
+
+# In this file, a "feature" refers to the collection of data between the > keys of the bprom output.
+# That collection of data refers to one section of the DNA upstream of a gene
+
+
+def read_bprom_file(bprom_file) -> List[str]:
+    """Reads in file, creating a list of strings with each list element containing a line from the file"""
+    contents = []
+
+    with open(bprom_file) as file:
+        for line in file:
+            contents.append(line)
+
+    return contents
+
+
+def concatenate_then_split(contents) -> List[str]:
+    """Concatenates the file into one large string.
+    Then splits it on '>' so that each feature's data is together in one element"""
+    # Concatenates the entire file into one large string
+    concat_contents = "".join(contents)
+
+    # Removing the empty string '' at element 0 used to make the join
+    concat_contents = concat_contents[1:]
+
+    # Splits the file into a list of strings on ">"
+    features = concat_contents.split(">")
+
+    return features
+
+
+def remove_promoterless_features(features) -> List[str]:
+    """For each concatenated feature string passed, removes the element
+    if the # of predicted promoters is 0."""
+    cleaned_features = features
+    indices_to_delete = []
+    for i, feature in enumerate(cleaned_features):
+        if "Number of predicted promoters -      0" in cleaned_features[i]:
+            indices_to_delete.append(i)
+    # Must delete in reverse order, otherwise it changes the list indices after
+    # the element deleted, and you delete subsequent elements at i+1, i+2, i+3, etc
+    for i in sorted(indices_to_delete, reverse=True):
+        del cleaned_features[i]
+
+    return cleaned_features
+
+
+def extract_accession(feature) -> str:
+    """Extract accession"""
+    accession = re.search("[\w](.*)(?=_)", feature)
+    accession = accession.group().replace("_", "").strip()
+
+    return accession
+
+
+def extract_test_seq_position(feature) -> List[str]:
+    """Extract position in genome. Gets any number of values '(.*)' between the brackets
+    using 'lookbehind/lookright' (?<=PATTERN) and 'lookahead/lookleft' regex assertions
+    to extract (?<=Location=\\[)(.*)(?=]\\()"""
+    location = re.search("(?<=Location=\\[)(.*)(?=]\\()", feature)
+    location = location.group().split(":")
+
+    return location
+
+
+def extract_strand_direction(feature) -> str:
+    """Extract strand direction for a feature, - or +"""
+    # Matches for '(.)'
+    direction = re.search("(?<=\\().(?=\\))", feature)
+    direction = direction.group()
+
+    return direction
+
+
+def extract_promoter_data(feature) -> Dict[str, str]:
+    """Extracts all promoter data using regular expressions.
+    Use for one element in the output of concatenate_then_split()"""
+    # Extract promoter -10 and -35 sequences and scores
+    # Gets everything between "-xx box at pos." and " Score"
+    minus10 = re.search("(?<=-10 box at pos.)(.*)(?= Score)(.*)", feature)
+    minus35 = re.search("(?<=-35 box at pos.)(.*)(?= Score)(.*)", feature)
+
+    # Extracts the match and removes leading and trailing whitespace (which can be variable)
+    # (the bprom output does not maintain the same # of whitespace characters
+    # if there are less digits, at least for the scoring)
+    minus10 = minus10.group().lstrip().split(" ")
+    minus10_pos = int(minus10[0])
+    minus10_seq = minus10[1]
+    minus10_score = minus10[-1]
+
+    minus35 = minus35.group().lstrip().split(" ")
+    minus35_pos = int(minus35[0])
+    minus35_seq = minus35[1]
+    minus35_score = minus35[-1]
+
+    # Can change these keys to change the column 9
+    promoter_data = {
+        "minus10_pos": minus10_pos,
+        "minus10_seq": minus10_seq,
+        "minus10_score": minus10_score,
+        "minus35_pos": minus35_pos,
+        "minus35_seq": minus35_seq,
+        "minus35_score": minus35_score,
+    }
+    return promoter_data
+
+
+def convert_extracted_promoter_data_to_ID_column_format(
+    promoter_data, calculated_promoter_positions
+) -> str:
+    """Converts input data to the GFF3 ID column (column 9) format, a semicolon separated
+    list of values providing additional information about each feature"""
+    # Replaces the BPROM output positions with the calculated ones
+    minus_10_calculated = calculated_promoter_positions[2]
+    minus_35_calculated = calculated_promoter_positions[3]
+    promoter_data["minus10_pos"] = minus_10_calculated
+    promoter_data["minus35_pos"] = minus_35_calculated
+
+    # Creates the column 9 string (attributes)
+    promoter_data = ["{}={}".format(key, value) for key, value in promoter_data.items()]
+    promoter_data = (
+        "Description=Predicted promoter data;" + "Note=" + ",".join(promoter_data) + ";"
+    )
+    return promoter_data
+
+
+def extract_LDF_score(feature) -> str:
+    """Extract LDF score"""
+    LDF = re.search("(?<=LDF-)(.*)", feature)
+    LDF = LDF.group().strip()
+
+    return LDF
+
+
+def calculate_promoter_position(feature):
+    """Calculate promoter positions (in the context of the genome) based on BPROM predictions."""
+    # Get 'Promoter Pos:     X' data. This refers to the predicted transcriptional start site!
+    promoter_pos = re.search("(?<=Promoter Pos:)(.*)(?=LDF)", feature)
+    promoter_pos = int(promoter_pos.group().strip())
+
+    # Get start and end positions from 'Location=[XXX:YYYY]'
+    test_seq_position = extract_test_seq_position(feature)
+    test_cds_location_start_pos = int(test_seq_position[0])
+    test_cds_location_end_pos = int(test_seq_position[1])
+
+    promoter_data = extract_promoter_data(feature)
+
+    """ IMPORTANT!! Whether or not you add or subtract to calculate the promoter start
+    # position depends on whether we're on the + or - strand!
+    # The workflow Jolene uses is smart enough to correctly pull upstream
+    # for both + and - strands (i.e., pulls left for +, pulls right for -)
+    # THEREFORE, for a gene with a start at 930 on the + strand, it pulls 830:930
+    # And for a gene with a start at 930 on the - strand, it pulls 930:1030 """
+
+    direction = extract_strand_direction(feature)
+
+    if direction == "+":
+        # BPROM starts counting from the LEFT boundary for + strand test sequences (as expected)
+        # Get -10 promoter position
+        minus10_pos = promoter_data["minus10_pos"]
+        minus10_pos_in_context_of_genome = test_cds_location_start_pos + minus10_pos
+        # Get -35 promoter position
+        minus35_pos = promoter_data["minus35_pos"]
+        minus35_pos_in_context_of_genome = test_cds_location_start_pos + minus35_pos
+
+        start = test_cds_location_start_pos + minus35_pos
+        end = test_cds_location_start_pos + promoter_pos
+
+        calculated_promoter_positions = [
+            start,
+            end,
+            minus10_pos_in_context_of_genome,
+            minus35_pos_in_context_of_genome,
+        ]
+        return calculated_promoter_positions
+
+    elif direction == "-":
+        # BPROM starts counting from the RIGHT boundary for - strand test sequences
+        # Get -10 promoter position
+        minus10_pos = promoter_data["minus10_pos"]
+        minus10_pos_in_context_of_genome = test_cds_location_end_pos - minus10_pos
+        # Get -35 promoter position
+        minus35_pos = promoter_data["minus35_pos"]
+        minus35_pos_in_context_of_genome = test_cds_location_end_pos - minus35_pos
+
+        # The start and end are reversed
+        end = test_cds_location_end_pos - minus35_pos
+        start = test_cds_location_end_pos - promoter_pos
+
+        calculated_promoter_positions = [
+            start,
+            end,
+            minus10_pos_in_context_of_genome,
+            minus35_pos_in_context_of_genome,
+        ]
+        return calculated_promoter_positions
+
+    else:
+        assert "Error: Strand data neither '+' nor '-'"
+
+
+def extract_tf_binding_elements():
+    """Extract predicted transcription factor binding elements"""
+    return
+
+
+def extract_data_for_all_features(features) -> List[List[Union[str, int]]]:
+    """Loops through cleaned bprom output extracting all data of interest and builds the
+    structure for loading into a dataframe"""
+    extracted_data = []
+    for feature in features:
+        # loop through features, a List[str] containing each feature [str] in the
+        # original bprom format as a single string, but cleaned of irrelevant data
+        calculated_promoter_positions = calculate_promoter_position(feature)
+        promoter_data = extract_promoter_data(feature)
+        promoter_data_converted = convert_extracted_promoter_data_to_ID_column_format(
+            promoter_data, calculated_promoter_positions
+        )
+
+        extracted_data.append(
+            [
+                extract_accession(feature),  # Seqid, col 1
+                "bprom",  # Source, col 2
+                "promoter",  # Type, col 3
+                calculated_promoter_positions[0],  # Start, col 4
+                calculated_promoter_positions[1],  # End, col 5
+                extract_LDF_score(feature),  # Score, col 6
+                extract_strand_direction(feature),  # Strand direction, col 7
+                ".",  # Phase, col 8
+                promoter_data_converted,  # Attributes, col 9
+            ]
+        )
+
+    return extracted_data
+
+
+def convert_to_dataframe(extracted_data) -> pd.DataFrame:
+    """Convert extracted and processed data to Pandas dataframe with gff3 column names"""
+
+    df = pd.DataFrame(
+        extracted_data,
+        columns=[
+            "seqid",
+            "source",
+            "type",
+            "start",
+            "end",
+            "score",
+            "strand",
+            "phase",
+            "attributes",
+        ],
+    )
+    return df
+
+
+def write_to_gff3(dataframe) -> None:
+    """Create a gff3 text file from the DataFrame by converting to a tab separated values (tsv) file"""
+    tsv = dataframe.to_csv(sep="\t", index=False, header=None)
+
+    # Gets the first element of the first column to use for
+    accession = dataframe.iloc[0][0]
+
+    year, month, day = date.today().year, date.today().month, date.today().day
+
+    # with open(f'{year}_{month}_{day}_bprom_as_gff3_{accession}.txt', 'w') as wf:
+    # Header so Galaxy can recognize as GFF3
+    print("##gff-version 3\n")
+    # for line in tsv:
+    print(tsv)
+
+    return
+
+
+def convert_bprom_output_to_gff3(bprom_file) -> None:
+    """Master function. Given a BPROM .txt file as output, extracts data and writes as a GFF3 file"""
+    bprom_file = read_bprom_file(bprom_file)
+    concatenated_bprom_file = concatenate_then_split(bprom_file)
+    working_file = remove_promoterless_features(concatenated_bprom_file)
+    extracted_data = extract_data_for_all_features(working_file)
+    gff3_dataframe = convert_to_dataframe(extracted_data)
+    # Create the gff3 text file
+    write_to_gff3(gff3_dataframe)
+
+    return
+
+
+if __name__ == "__main__":
+    ## Shows the DataFrame output in the terminal for testing/debugging
+    # bprom_file = read_bprom_file('BPROM_output.txt')
+    # concatenated_bprom_file: List[str] = concatenate_then_split(bprom_file)
+    # working_file = remove_promoterless_features(concatenated_bprom_file)
+    # print(convert_to_dataframe(extract_data_for_all_features(working_file)).to_string())
+
+    parser = argparse.ArgumentParser(
+        description="converts BPROM output to the gff3 file format"
+    )
+
+    parser.add_argument("-f", help="bprom file as .txt")
+    args = parser.parse_args()
+    # Actual function for converting the BPROM output to gff3
+    convert_bprom_output_to_gff3(args.f)
+
+    # Upload to cpt github in the directory Galaxy-Tools/tools/external/