diff sequence-splitter.py @ 0:3e33310a7082 draft
planemo upload for repository https://github.com/rplanel/galaxy-tools/tree/master/tools/sequence-splitter commit cba12d215a89e9685c7d1f55d067770d7ec0dea2
author | rplanel
---|---
date | Thu, 08 Aug 2019 11:18:30 -0400
parents |
children | 7b509a1801e4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sequence-splitter.py	Thu Aug 08 11:18:30 2019 -0400
@@ -0,0 +1,129 @@

#!/usr/bin/env python3
# coding: utf-8

import argparse
import logging
import os
from itertools import chain, islice, tee

# BioPython
from Bio import SeqIO

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s : %(levelname)s : %(message)s"
)
logger = logging.getLogger()


def main():
    args = parse_arguments()
    # Extract the basename and the file extension.
    basename, file_extension = os.path.splitext(args.sequences)
    # Extract the filename (without extension).
    _, filename = os.path.split(basename)
    chunk_size = args.chunk_size
    # Split the sequences into chunks.
    if chunk_size:
        logger.info("%s = %s", "chunk size parameter", chunk_size)
        sequences_record = gen_sequence_record(args.sequences, args.format)
        chunks = gen_get_chunks_by_size(sequences_record, chunk_size)
    else:
        logger.info("%s = %s", "number of chunks parameter", args.nb_chunk)
        chunks = gen_get_chunks(args.sequences, args.format, args.nb_chunk)

    # Write the chunks to numbered files.
    write_chunks(chunks, args.output, filename, file_extension, args.format)


def gen_get_chunks(sequences_path, sequences_format, nb_chunk):
    """Split the sequences of a file into a given number of chunks.

    Arguments:
        sequences_path {str} -- path of the file that contains the sequences
        sequences_format {str} -- sequence format (fasta or fastq)
        nb_chunk {int} -- number of chunks to produce

    Returns:
        generator -- generator that yields chunks of sequence records
    """
    # First pass over the records to count the sequences.
    sequences_record_to_count = gen_sequence_record(
        sequences_path, sequences_format)
    # Get the number of sequences.
    nb_sequences = get_nb_sequences(sequences_record_to_count)
    logger.info("%s = %i", "Total number of sequences", nb_sequences)
    # Second pass over the records that will be split.
    sequences_to_split = gen_sequence_record(sequences_path, sequences_format)

    # Get the size of the chunks.
    chunk_size = int(nb_sequences / nb_chunk) if nb_sequences > nb_chunk else 1
    return gen_get_chunks_by_size(sequences_to_split, chunk_size)


def gen_get_chunks_by_size(iterable, size=10):
    logger.info(
        "%s = %i",
        "chunk size used (may differ from the parameter if more chunks are "
        "requested than there are sequences in the input)",
        size,
    )
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))


def gen_sequence_record(sequences_path, sequence_format):
    return SeqIO.parse(sequences_path, sequence_format)


def get_nb_sequences(sequences):
    return sum(1 for _ in sequences)


def write_chunks(iterable, dirname, filename, file_extension, sequence_format):
    for idx, chunk in enumerate(iterable):
        if not os.path.exists(dirname):
            os.mkdir(dirname)
        output_file = os.path.join(
            dirname, filename + "-chunk-" + str(idx + 1) + file_extension
        )
        with open(output_file, mode="w") as output_handle:
            count_seq, seq_to_write = tee(chunk, 2)
            logger.info(
                "%s : number of sequences = %i",
                output_file,
                len(list(count_seq)),
            )
            SeqIO.write(seq_to_write, output_handle, sequence_format)


def parse_arguments():
    parser = argparse.ArgumentParser(description="Split fasta/fastq files")
    parser.add_argument(
        "-s", "--sequences", type=str, help="File that contains the sequences"
    )
    parser.add_argument("-f", "--format", type=str,
                        help="File format (fastq, fasta)")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-c", "--chunk-size",
        type=int,
        help="Number of sequences per chunk",
    )
    group.add_argument("-n", "--nb-chunk", help="Number of chunks", type=int)
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="./",
        help="The output directory where the chunks will be saved",
    )

    return parser.parse_args()


if __name__ == "__main__":
    logger.info("START")
    main()
    logger.info("FINISHED")
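
A quick way to sanity-check the chunking logic: the snippet below is a minimal, standalone sketch (not part of the changeset) that reuses gen_get_chunks_by_size on a list of plain strings standing in for SeqRecord objects, with a made-up chunk size of 2. Because all chunks share the same underlying iterator, each chunk has to be consumed before advancing to the next one.

from itertools import chain, islice

def gen_get_chunks_by_size(iterable, size=10):
    # Yield lazy chunks of at most `size` items drawn from one shared iterator.
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))

records = ["seq1", "seq2", "seq3", "seq4", "seq5"]  # stand-ins for SeqRecord objects
for idx, chunk in enumerate(gen_get_chunks_by_size(records, size=2), start=1):
    print(idx, list(chunk))
# 1 ['seq1', 'seq2']
# 2 ['seq3', 'seq4']
# 3 ['seq5']

On the command line, the script could then be invoked along the lines of: python sequence-splitter.py -s input.fasta -f fasta -n 4 -o chunks (file and directory names here are hypothetical).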