view filter_kw_val.py @ 8:98cb671a92eb draft default tip

"planemo upload commit 80e3e50ca52b5b232f91e6dd6850da606d9c4c5f-dirty"
author proteore
date Mon, 10 May 2021 12:27:04 +0000
parents b4641c0f8a82
children
line wrap: on
line source

import argparse
import csv
import re
import sys


def options():
    """
    Parse options:
        -i, --input     Input filename and boolean value if the file contains header ["filename,true/false"]  # noqa 501
        --kw            Keyword to be filtered, the column number where this filter applies,
                        boolean value if the keyword should be filtered in exact ["keyword,ncol,true/false"].
                        This option can be repeated: --kw "kw1,c1,true" --kw "kw2,c1,false" --kw "kw3,c2,true"
        --kwfile        A file that contains keywords to be filter, the column where this filter applies and
                        boolean value if the keyword should be filtered in exact ["filename,ncol,true/false"]
        --value         The value to be filtered, the column number where this filter applies and the
                        operation symbol ["value,ncol,=/>/>=/</<=/!="]
        --values_range  range of values to be keep, example : --values_range 5 20 c1 true
        --operation     'keep' or 'discard' lines concerned by filter(s)
        --operator      The operator used to filter with several keywords/values : AND or OR
        --o --output    The output filename
        --discarded_lines    The file contains removed lines
        -s --sort_col   Used column to sort the file, ",true" for reverse sorting, ",false" otherwise example : c1,false
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", help="Input file", required=True)
    parser.add_argument("--kw", nargs="+", action="append", help="")
    parser.add_argument("--kw_file", nargs="+", action="append", help="")
    parser.add_argument("--value", nargs="+", action="append", help="")
    parser.add_argument("--values_range", nargs="+", action="append", help="")
    parser.add_argument("--operation", default="keep", type=str, choices=['keep', 'discard'], help='')  # noqa 501
    parser.add_argument("--operator", default="OR", type=str, choices=['AND', 'OR'], help='')  # noqa 501
    parser.add_argument("-o", "--output", default="output.txt")
    parser.add_argument("--discarded_lines", default="filtered_output.txt")
    parser.add_argument("-s", "--sort_col", help="")

    args = parser.parse_args()

    filters(args)


def str_to_bool(v):
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')


def proper_ncol(ncol, file):
    if ncol not in range(len(file[0])):
        print("Column "+str(ncol+1)+" not found in input file")
        # traceback.print_exc(file=sys.stdout)
        sys.exit(1)

# Check if a variable is a float or an integer


def is_number(number_format, n):
    float_format = re.compile(r"^[-]?[0-9][0-9]*.?[0-9]+$")
    int_format = re.compile(r"^[-]?[0-9][0-9]*$")
    scientific_number = re.compile(r"^[-+]?[\d]+\.?[\d]*[Ee](?:[-+]?[\d]+)?$")
    test = ""
    if number_format == "int":
        test = re.match(int_format, n)
    elif number_format == "float":
        test = re.match(float_format, n)
        if test is None:
            test = re.match(scientific_number, n)

    if test:
        return True
    else:
        return False

# Filter the document


def filters(args):
    filename = args.input.split(",")[0]
    header = str_to_bool(args.input.split(",")[1])
    csv_file = blank_to_NA(read_file(filename))
    results_dict = {}
    operator_dict = {"Equal": "=", "Higher": ">", "Equal-or-higher": ">=", "Lower": "<", "Equal-or-lower": "<=", "Different": "!="}  # noqa 501

    if args.kw:
        keywords = args.kw
        for k in keywords:
            results_dict = filter_keyword(csv_file,
                                          header,
                                          results_dict,
                                          k[0],
                                          k[1],
                                          k[2])

    if args.kw_file:
        key_files = args.kw_file
        for kf in key_files:
            header = str_to_bool(kf[1])
            ncol = column_from_txt(kf[2], csv_file)
            keywords = read_keywords_file(kf[0], header, ncol)
            results_dict = filter_keyword(csv_file, header, results_dict,
                                          keywords, kf[3], kf[4])

    if args.value:
        for v in args.value:
            v[0] = v[0].replace(",", ".")
            v[2] = operator_dict[v[2]]
            if is_number("float", v[0]):
                csv_file = comma_number_to_float(csv_file,
                                                 column_from_txt(
                                                     v[1], csv_file), header)
                results_dict = filter_value(csv_file, header,
                                            results_dict, v[0], v[1], v[2])
            else:
                raise ValueError("Please enter a number in filter by value")

    if args.values_range:
        for vr in args.values_range:
            vr[:2] = [value.replace(",", ".") for value in vr[:2]]
            csv_file = comma_number_to_float(csv_file,
                                             column_from_txt(
                                                 vr[2], csv_file), header)
            if (is_number("float", vr[0]) or is_number("int", vr[0])) and (is_number("float", vr[1]) or is_number("int", vr[1])):  # noqa 501
                results_dict = filter_values_range(csv_file,
                                                   header, results_dict,
                                                   vr[0], vr[1], vr[2], vr[3])

    remaining_lines = []
    filtered_lines = []

    if header is True:
        remaining_lines.append(csv_file[0])
        filtered_lines.append(csv_file[0])

    if results_dict == {}:   # no filter used
        remaining_lines.extend(csv_file[1:])
    else:
        for id_line, line in enumerate(csv_file):
            if id_line in results_dict:   # skip header and empty lines
                if args.operator == 'OR':
                    if any(results_dict[id_line]):
                        filtered_lines.append(line)
                    else:
                        remaining_lines.append(line)

                elif args.operator == "AND":
                    if all(results_dict[id_line]):
                        filtered_lines.append(line)
                    else:
                        remaining_lines.append(line)

    # sort of results by column
    if args.sort_col:
        sort_col = args.sort_col.split(",")[0]
        sort_col = column_from_txt(sort_col, csv_file)
        reverse = str_to_bool(args.sort_col.split(",")[1])
        remaining_lines = sort_by_column(remaining_lines, sort_col,
                                         reverse, header)
        filtered_lines = sort_by_column(filtered_lines, sort_col,
                                        reverse, header)

    # swap lists of lines (files) if 'keep' option selected
    if args.operation == "keep":
        swap = remaining_lines, filtered_lines
        remaining_lines = swap[1]
        filtered_lines = swap[0]

    # Write results to output
    with open(args.output, "w") as output:
        writer = csv.writer(output, delimiter="\t")
        writer.writerows(remaining_lines)

    # Write filtered lines to filtered_output
    with open(args.discarded_lines, "w") as filtered_output:
        writer = csv.writer(filtered_output, delimiter="\t")
        writer.writerows(filtered_lines)

# function to sort the csv_file by value in a specific column


def sort_by_column(tab, sort_col, reverse, header):

    if len(tab) > 1:  # if there's more than just a header or 1 row
        if header:
            head = tab[0]
            tab = tab[1:]

        # list of empty cells in the column to sort
        unsortable_lines = [i for i, line in enumerate(tab) if (line[sort_col]=='' or line[sort_col] == 'NA')]  # noqa 501
        unsorted_tab = [tab[i] for i in unsortable_lines]
        tab = [line for i, line in enumerate(tab) if i not in unsortable_lines]

        if only_number(tab, sort_col) and any_float(tab, sort_col):
            tab = comma_number_to_float(tab, sort_col, False)
            tab = sorted(tab, key=lambda row: float(row[sort_col]),
                         reverse=reverse)
        elif only_number(tab, sort_col):
            tab = sorted(tab, key=lambda row: int(row[sort_col]),
                         reverse=reverse)
        else:
            tab = sorted(tab, key=lambda row: row[sort_col], reverse=reverse)

        tab.extend(unsorted_tab)
        if header is True:
            tab = [head]+tab

    return tab


# replace all blank cells to NA


def blank_to_NA(csv_file):

    tmp = []
    for line in csv_file:
        line = ["NA" if cell=="" or cell==" " or cell=="NaN" else cell for cell in line ]  # noqa 501
        tmp.append(line)

    return tmp

# turn into float a column


def comma_number_to_float(csv_file, ncol, header):
    if header:
        tmp = [csv_file[0]]
        csv_file = csv_file[1:]
    else:
        tmp = []

    for line in csv_file:
        line[ncol] = line[ncol].replace(",", ".")
        tmp.append(line)

    return (tmp)

# return True is there is at least one float in the column


def any_float(tab, col):

    for line in tab:
        if is_number("float", line[col].replace(",", ".")):
            return True

    return False


def only_number(tab, col):
    for line in tab:
        if not (is_number("float", line[col].replace(",", ".")) or is_number("int", line[col].replace(",", "."))):  # noqa 501
            return False
    return True

# Read the keywords file to extract the list of keywords


def read_keywords_file(filename, header, ncol):
    with open(filename, "r") as csv_file:
        lines = csv.reader(csv_file, delimiter='\t')
        lines = blank_to_NA(lines)
        if (len(lines[0])) > 1:
            keywords = [line[ncol] for line in lines]
        else:
            keywords = ["".join(key) for key in lines]
    if header:
        keywords = keywords[1:]
    keywords = list(set(keywords))

    return keywords

# Read input file


def read_file(filename):
    with open(filename, "r") as f:
        reader = csv.reader(f, delimiter="\t")
        tab = list(reader)

    # Remove empty lines (contain only space or new line or "")
    # [tab.remove(blank) for blank in tab if blank.isspace() or blank == ""]
    tab = [line for line in tab if len("".join(line).replace(" ", "")) != 0]  # noqa 501

    return tab

# seek for keywords in rows of csvfile, return a dictionary of boolean
# (true if keyword found, false otherwise)


def filter_keyword(csv_file, header, results_dict, keywords, ncol, match):
    match = str_to_bool(match)
    ncol = column_from_txt(ncol, csv_file)
    if type(keywords) != list:
        keywords = keywords.upper().split()  # Split list of filter keyword

    for id_line, line in enumerate(csv_file):
        if header is True and id_line == 0:
            continue
        keyword_inline = line[ncol].replace('"', "").split(";")

        # Perfect match or not
        if match is True:
            found_in_line = any(pid.upper() in keywords for pid in keyword_inline)  # noqa 501
        else:
            found_in_line = any(ft in pid.upper() for pid in keyword_inline for ft in keywords)  # noqa 501

        # if the keyword is found in line
        if id_line in results_dict:
            results_dict[id_line].append(found_in_line)
        else:
            results_dict[id_line] = [found_in_line]

    return results_dict

# filter ba determined value in rows of csvfile, return a dictionary
# of boolean (true if value filtered, false otherwise)


def filter_value(csv_file, header, results_dict, filter_value, ncol, opt):

    filter_value = float(filter_value)
    ncol = column_from_txt(ncol, csv_file)
    nb_string = 0

    for id_line, line in enumerate(csv_file):
        if header is True and id_line == 0:
            continue
        value = line[ncol].replace('"', "").replace(",", ".").strip()
        if value.replace(".", "", 1).isdigit():
            to_filter = value_compare(value, filter_value, opt)

            # adding the result to the dictionary
            if id_line in results_dict:
                results_dict[id_line].append(to_filter)
            else:
                results_dict[id_line] = [to_filter]

        # impossible to treat (ex : "" instead of a number),
        # we keep the line by default
        else:
            nb_string += 1
            if id_line in results_dict:
                results_dict[id_line].append(False)
            else:
                results_dict[id_line] = [False]

    # number of lines in the csv file
    if header:
        nb_lines = len(csv_file) - 1
    else:
        nb_lines = len(csv_file)

    # if there's no numeric value in the column
    if nb_string == nb_lines:
        print('No numeric values found in the column '+str(ncol+1))
        print('The filter "'+str(opt)+' '+str(filter_value)+'" can not be applied on the column '+str(ncol+1))  # noqa 501

    return results_dict

# filter ba determined value in rows of csvfile, return a dictionary
# of boolean (true if value filtered, false otherwise)


def filter_values_range(csv_file, header, results_dict, bottom_value, top_value, ncol, inclusive):  # noqa 501
    inclusive = str_to_bool(inclusive)
    bottom_value = float(bottom_value)
    top_value = float(top_value)
    ncol = column_from_txt(ncol, csv_file)
    nb_string = 0

    for id_line, line in enumerate(csv_file):
        if header is True and id_line == 0:
            continue
        value = line[ncol].replace('"', "").replace(",", ".").strip()
        if value.replace(".", "", 1).isdigit():
            value = float(value)
            if inclusive is True:
                in_range = not (bottom_value <= value <= top_value)
            else:
                in_range = not (bottom_value < value < top_value)

            # adding the result to the dictionary
            if id_line in results_dict:
                results_dict[id_line].append(in_range)
            else:
                results_dict[id_line] = [in_range]

        # impossible to treat (ex : "" instead of a number),
        # we keep the line by default
        else:
            nb_string += 1
            if id_line in results_dict:
                results_dict[id_line].append(False)
            else:
                results_dict[id_line] = [False]

    # number of lines in the csv file
    if header:
        nb_lines = len(csv_file) - 1
    else:
        nb_lines = len(csv_file)

    # if there's no numeric value in the column
    if nb_string == nb_lines:
        print('No numeric values found in the column '+str(ncol+1))
        if inclusive:
            print ('The filter "'+str(bottom_value)+' <= x <= '+str(top_value)+'" can not be applied on the column '+str(ncol+1))  # noqa 501
        else:
            print ('The filter "'+str(bottom_value)+' < x < '+str(top_value)+'" can not be applied on the column '+str(ncol+1))  # noqa 501

    return results_dict


def column_from_txt(ncol, file):
    if is_number("int", ncol.replace("c", "")):
        ncol = int(ncol.replace("c", "")) - 1
    else:
        raise ValueError("Please specify the column where "
                         "you would like to apply the filter "
                         "with valid format")

    proper_ncol(ncol, file)

    return ncol

# return True if value is in the determined values, false otherwise


def value_compare(value, filter_value, opt):
    test_value = False

    if opt == "<":
        if float(value) < filter_value:
            test_value = True
    elif opt == "<=":
        if float(value) <= filter_value:
            test_value = True
    elif opt == ">":
        if float(value) > filter_value:
            test_value = True
    elif opt == ">=":
        if float(value) >= filter_value:
            test_value = True
    elif opt == "=":
        if float(value) == filter_value:
            test_value = True
    elif opt == "!=":
        if float(value) != filter_value:
            test_value = True

    return test_value


if __name__ == "__main__":
    options()