#!/usr/bin/env python

import argparse
import math
import os
import random
import re

# configuration of the splitting for specific file types
# - regular expression matching the record separator ('' if not splitting by regex but by number of lines)
# - number of lines to split after (0 if not splitting by number of lines but regex)
# - a boolean indicating if the record separator is at the end of the record
#
# new file types can be added by appending to this dict,
# updating the parser, and adding a new type option in the Galaxy wrapper
FILETYPES = {
    "fasta": (r"^>", 0, False),
    "fastq": (r"", 4, False),
    "tabular": (r"", 1, False),
    "txt": (r"", 1, False),
    "mgf": (r"^BEGIN IONS", 0, False),
    "sdf": (r"\$\$\$\$", 0, True),
}
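# Example: the "fasta" entry (r"^>", 0, False) starts a new record at each
# line beginning with ">"; "sdf" (r"\$\$\$\$", 0, True) instead closes a
# record at each "$$$$" line; "fastq" ("", 4, False) cuts after every 4 lines.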


def main():
    ps = parser_cli()
    args = vars(ps.parse_args())

    # get args and validate
    in_file = args["in"]
    if not os.path.isfile(in_file):
        raise FileNotFoundError("Input file does not exist")

    out_dir = args["out_dir"]
    if not os.path.isdir(out_dir):
        raise NotADirectoryError("out_dir is not a directory")

    top = args["top"]
    if top < 0:
        raise ValueError("Number of header lines cannot be negative")

    ftype = args["ftype"]

    if ftype == "generic" and not (args["generic_re"] or args["generic_num"]):
        raise ValueError(
            "--generic_re or --generic_num needs to be given for generic input"
        )

    if args["ftype"] == "tabular" and args["by"] == "col":
        args["match"] = replace_mapped_chars(args["match"])
        args["sub"] = replace_mapped_chars(args["sub"])
        split_by_column(args, in_file, out_dir, top)
    else:
        args["generic_re"] = replace_mapped_chars(args["generic_re"])
        split_by_record(args, in_file, out_dir, top, ftype)


def parser_cli():
    parser = argparse.ArgumentParser(
        description="Split a file into multiple files. "
        + "Can split on a column of a tabular file, "
        + "naming the output files after the column values."
    )
    parser.add_argument("--in", "-i", required=True, help="The input file")
    parser.add_argument(
        "--out_dir",
        "-o",
        help="The output directory",
        required=True,
    )
    parser.add_argument(
        "--file_names",
        "-a",
        help="If not splitting by column, the base name of the new files",
    )
    parser.add_argument(
        "--file_ext",
        "-e",
        help="If not splitting by column,"
        + " the extension of the new files (without a period)",
    )
    parser.add_argument(
        "--ftype",
        "-f",
        help="The type of the file to split",
        required=True,
        choices=["mgf", "fastq", "fasta", "sdf", "tabular", "txt", "generic"],
    )
    parser.add_argument(
        "--by",
        "-b",
        help="Split by line or by column (tabular only)",
        default="row",
        choices=["col", "row"],
    )
    parser.add_argument(
        "--top",
        "-t",
        type=int,
        default=0,
        help="Number of header lines to carry over to new files.",
    )
    parser.add_argument(
        "--rand",
        "-r",
        help="Divide records randomly into new files",
        action="store_true",
    )
    parser.add_argument(
        "--seed",
        "-x",
        help="Provide a seed for the random number generator. "
        + "If not provided and --rand is set, the system default seed "
        + "(based on the current time or an OS randomness source) is used",
        type=int,
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "--numnew",
        "-n",
        type=int,
        default=1,
        help="Number of output files desired. Not valid for splitting on a column. Mutually exclusive with --chunksize.",
    )
    group.add_argument(
        "--chunksize",
        "-k",
        type=int,
        default=0,
        help="Number of records per output file. Not valid for splitting on a column. Mutually exclusive with --numnew.",
    )
    parser.add_argument(
        "--batch",
        action="store_true",
        help="Distribute records to the output files in contiguous batches, maintaining input order. Ignored if splitting on a column.",
    )
    generic = parser.add_argument_group("Arguments controlling generic splitting")
    group = generic.add_mutually_exclusive_group()
    group.add_argument(
        "--generic_re",
        "-g",
        default="",
        help="Regular expression indicating the start of a new record (only for generic)",
        required=False,
    )
    group.add_argument(
        "--generic_num",
        type=int,
        default=0,
        help="Length of records in number of lines (only for generic)",
        required=False,
    )
    generic.add_argument(
        "--split_after",
        "-p",
        action="store_true",
        help="Split between records after separator (default is before). "
        + "Only for generic splitting by regex - specific ftypes are always split in the default way",
    )
    bycol = parser.add_argument_group("If splitting on a column")
    bycol.add_argument(
        "--match",
        "-m",
        default="(.*)",
        help="The regular expression to match id column entries",
    )
    bycol.add_argument(
        "--sub",
        "-s",
        default=r"\1",
        help="The replacement pattern substituted for the matched text to build the output file name.",
    )
    bycol.add_argument(
        "--id_column",
        "-c",
        default="1",
        help="Column that is used to name output files. Indexed starting from 1.",
        type=int,
    )
    return parser
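

# Example invocation (illustrative file names):
#   python split_file_to_collection.py -i reads.fastq -o out -f fastq -k 1000
# writes out/reads_000000.fastq, out/reads_000001.fastq, ... with roughly
# 1000 records each.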


def replace_mapped_chars(pattern):
    """
    Restore special characters escaped by Galaxy when passing text parameters.
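
    For example, a quote in a --match pattern arrives from Galaxy as __sq__
    and a backslash as __backslash__; both are mapped back to the real
    character here.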
    """
    mapped_chars = {"'": "__sq__", "\\": "__backslash__"}
    for key, value in mapped_chars.items():
        pattern = pattern.replace(value, key)
    return pattern


def split_by_record(args, in_file, out_dir, top, ftype):
    # get configuration (record separator regex, lines per record,
    # separator-at-end flag) for the given file type; "generic" falls back to
    # the command line arguments
    sep, num, sep_at_end = FILETYPES.get(
        ftype, (args["generic_re"], args["generic_num"], args["split_after"])
    )
    sep = re.compile(sep)

    chunksize = args["chunksize"]
    numnew = args["numnew"]

    # random division
    rand = args["rand"]
    seed = args["seed"]
    if seed is not None:
        random.seed(seed)
    else:
        random.seed()

    # batched division (maintains order)
    batch = args["batch"]

    # determine
    # - the number of records that should be stored per file
    #   (always calculated, even if only used for batch mode)
    # - whether the separator is at the start / end of the record
    n_per_file = math.inf
    if (
        chunksize != 0 or batch
    ):  # needs to be calculated if either batch or chunksize are selected
        with open(in_file) as f:
            # read header lines
            for i in range(top):
                f.readline()
            n_records = 0
            last_line_matched = False
            # count records exactly as the splitting loop below does
            for line_no, line in enumerate(f):
                if (num == 0 and re.match(sep, line) is not None) or (
                    num > 0 and line_no % num == 0
                ):
                    n_records += 1
                    last_line_matched = True
                else:
                    last_line_matched = False
            if sep_at_end and not last_line_matched:
                n_records += 1
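            # e.g. for sdf, every "$$$$" line closes one record; if the file
            # lacks its final separator, the trailing partial record is still
            # counted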

        # if there are fewer records than desired files
        numnew = min(numnew, n_records)
        # approx. number of records per file
        if chunksize == 0:  # i.e. no chunking
            n_per_file = n_records // numnew
        else:
            numnew = max(n_records // chunksize, 1)  # should not be less than 1
            n_per_file = chunksize
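        # e.g. 10 records with --chunksize 3 give numnew = 10 // 3 = 3 files
        # holding ~3 records each; in batch mode the leftover record wraps
        # back onto the first file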

    # make new files
    # strip the extension of the old file and append a number
    custom_new_file_name = args["file_names"]
    custom_new_file_ext = "." + args["file_ext"] if args["file_ext"] else ""
    if custom_new_file_name is None:
        new_file_base = os.path.splitext(os.path.basename(in_file))
    else:
        new_file_base = (custom_new_file_name, custom_new_file_ext)

    newfile_names = [
        os.path.join(out_dir, "%s_%06d%s" % (new_file_base[0], count, new_file_base[1]))
        for count in range(0, numnew)
    ]
    # bunch o' counters
    # index to list of new files
    if rand:
        new_file_counter = int(math.floor(random.random() * numnew))
    else:
        new_file_counter = 0
    new_file = open(newfile_names[new_file_counter], "a")
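    # output files are opened in append mode so that a destination can be
    # reopened whenever the counter cycles back to it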
    # to contain header specified by top
    header = ""
    # keep track of the files that have been opened so far
    fresh_files = set(range(numnew))

    # keep track in loop of number of records in each file
    # only used in batch
    records_in_file = 0

    # open file
    with open(in_file, "r") as f:
        # read header
        for i in range(top):
            header += f.readline()

        record = ""
        for line_no, line in enumerate(f):
            # check if beginning of line is record sep
            # if beginning of line is record sep, either start record or finish one
            if (num == 0 and re.match(sep, line) is not None) or (
                num > 0 and line_no % num == 0
            ):
                # this only happens first time through
                if record == "":
                    record += line
                else:
                    # if the file is still fresh, write the header and drop it from fresh_files
                    if new_file_counter in fresh_files:
                        new_file.write(header)
                        fresh_files.remove(new_file_counter)

                    if sep_at_end:
                        record += line
                    # write record to file
                    new_file.write(record)
                    if not sep_at_end:
                        record = line
                    else:
                        record = ""

                    # change destination file
                    if rand:
                        new_file_counter = int(math.floor(random.random() * numnew))
                        new_file.close()
                        new_file = open(newfile_names[new_file_counter], "a")
                    elif batch:
                        # count records written to the current file
                        records_in_file += 1
                        # have we reached the max for each file?
                        # if so, switch file
                        if records_in_file >= n_per_file:
                            new_file_counter = (new_file_counter + 1) % numnew
                            records_in_file = 0  # reset to 0
                            new_file.close()
                            new_file = open(newfile_names[new_file_counter], "a")
                    else:
                        new_file_counter = (new_file_counter + 1) % numnew
                        new_file.close()
                        new_file = open(newfile_names[new_file_counter], "a")
            # if beginning of line is not record sep, we must be inside a record
            # so just append
            else:
                record += line
        # after loop, write final record to file
        if new_file_counter in fresh_files:
            new_file.write(header)
        new_file.write(record)
        new_file.close()


def split_by_column(args, in_file, out_dir, top):

    # shift to 0-based indexing
    id_col = int(args["id_column"]) - 1

    try:
        match = re.compile(args["match"])
    except re.error:
        print("ERROR: Match (-m) supplied is not valid regex.")
        raise

    sub = args["sub"]
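
    # e.g. with --match "(.*)_.*" and --sub "\1", an id column value
    # "sampleA_rep1" is written to a file named "sampleA"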

    # set of file names
    files = set()
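    # note: output files are opened in append mode, so out_dir is expected to
    # be empty at the start of a run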

    # keep track of how many lines have been read
    n_read = 0
    header = ""
    with open(in_file) as file:
        for line in file:
            # if still in top, save to header
            n_read += 1
            if n_read <= top:
                header += line
                continue
            # split into columns, on tab
            fields = re.split(r"\t", line.strip("\n"))

            # get id column value
            id_col_val = fields[id_col]

            # use regex to get new file name
            out_file_name = re.sub(match, sub, id_col_val)
            out_file_path = os.path.join(out_dir, out_file_name)

            # write
            with open(out_file_path, "a") as current_new_file:
                if out_file_name not in files:
                    current_new_file.write(header)
                    files.add(out_file_name)
                current_new_file.write(line)


if __name__ == "__main__":
    main()