view sequence-splitter.py @ 0:3e33310a7082 draft
planemo upload for repository https://github.com/rplanel/galaxy-tools/tree/master/tools/sequence-splitter commit cba12d215a89e9685c7d1f55d067770d7ec0dea2
author   | rplanel
date     | Thu, 08 Aug 2019 11:18:30 -0400
parents  |
children | 7b509a1801e4
#!/usr/bin/env python3
# coding: utf-8

import argparse
import logging
import os
from itertools import chain, islice, tee

# BioPython
from Bio import SeqIO

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s : %(levelname)s : %(message)s"
)
logger = logging.getLogger()


def main():
    args = parse_arguments()
    # extract the basename and the file extension
    basename, file_extension = os.path.splitext(args.sequences)
    # extract the filename (with no extension)
    _, filename = os.path.split(basename)
    chunk_size = args.chunk_size
    # split the sequences into chunks
    if chunk_size:
        logger.info("%s = %s", "chunk size parameter", chunk_size)
        sequences_record = gen_sequence_record(args.sequences, args.format)
        chunks = gen_get_chunks_by_size(sequences_record, chunk_size)
    else:
        logger.info("%s = %s", "number of chunks parameter", args.nb_chunk)
        chunks = gen_get_chunks(args.sequences, args.format, args.nb_chunk)
    # Write the chunks to numbered files.
    write_chunks(chunks, args.output, filename, file_extension, args.format)


def gen_get_chunks(sequences_path, sequences_format, nb_chunk):
    """Split the sequence file into approximately nb_chunk chunks.

    Arguments:
        sequences_path {str} -- path to the sequence file
        sequences_format {str} -- sequence format (fasta, fastq)
        nb_chunk {int} -- requested number of chunks

    Returns:
        generator -- yields an iterator of records for each chunk
    """
    # First pass over the records to count the sequences
    sequences_record_to_count = gen_sequence_record(
        sequences_path, sequences_format)
    # Get the number of sequences
    nb_sequences = get_nb_sequences(sequences_record_to_count)
    logger.info("%s = %i", "Total number of sequences", nb_sequences)
    # Second pass over the records, the one that will actually be split
    sequences_to_split = gen_sequence_record(sequences_path, sequences_format)
    # Get the size of the chunks
    chunk_size = int(nb_sequences / nb_chunk) if nb_sequences > nb_chunk else 1
    return gen_get_chunks_by_size(sequences_to_split, chunk_size)


def gen_get_chunks_by_size(iterable, size=10):
    logger.info(
        "%s = %i",
        "effective chunk size (may differ from the requested value when more "
        "chunks are asked for than there are sequences in the multifasta)",
        size,
    )
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))


def gen_sequence_record(sequences_path, sequence_format):
    return SeqIO.parse(sequences_path, sequence_format)


def get_nb_sequences(sequences):
    return sum(1 for _ in sequences)


def write_chunks(iterable, dirname, filename, file_extension, sequence_format):
    for idx, chunk in enumerate(iterable):
        if not os.path.exists(dirname):
            os.mkdir(dirname)
        output_file = os.path.join(
            dirname, filename + "-chunk-" + str(idx + 1) + file_extension
        )
        with open(output_file, mode="w") as output_handle:
            count_seq, seq_to_write = tee(chunk, 2)
            logger.info(
                "%s : number of sequences = %i",
                output_file,
                len(list(count_seq)),
            )
            SeqIO.write(seq_to_write, output_handle, sequence_format)


def parse_arguments():
    parser = argparse.ArgumentParser(description="Split fasta/fastq files")
    parser.add_argument(
        "-s", "--sequences", type=str, help="File that contains the sequences"
    )
    parser.add_argument("-f", "--format", type=str, help="File format (fastq, fasta)")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-c", "--chunk-size", type=int, help="The number of sequences per chunk."
    )
    group.add_argument("-n", "--nb-chunk", help="Number of chunks", type=int)
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="./",
        help="The output directory where the chunks will be saved",
    )
    return parser.parse_args()


if __name__ == "__main__":
    logger.info("START")
    main()
    logger.info("FINISHED")
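
Usage note (not part of the file above): a minimal sketch of how the script and its chunking generator behave. The option names come from parse_arguments; the input file reads.fasta and the output directory chunks/ are hypothetical placeholders, and chunks_by_size below simply mirrors gen_get_chunks_by_size so it can be tried without Biopython installed.

# Illustrative only: splitting a hypothetical reads.fasta into chunks of
# 100 sequences each would be invoked as
#   python sequence-splitter.py -s reads.fasta -f fasta -c 100 -o chunks/
# The chunking generator itself works on any iterable:
from itertools import chain, islice

def chunks_by_size(iterable, size=10):
    # same pattern as gen_get_chunks_by_size: take one element, then
    # lazily slice off the next size-1 elements from the shared iterator
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))

print([list(c) for c in chunks_by_size(range(7), size=3)])
# -> [[0, 1, 2], [3, 4, 5], [6]]

Each yielded chunk must be consumed before asking for the next one, which is exactly what write_chunks does when it writes one output file per chunk.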