comparison: sequence-splitter.py @ 0:3e33310a7082 (draft)
planemo upload for repository https://github.com/rplanel/galaxy-tools/tree/master/tools/sequence-splitter commit cba12d215a89e9685c7d1f55d067770d7ec0dea2
| field | value |
|---|---|
| author | rplanel |
| date | Thu, 08 Aug 2019 11:18:30 -0400 |
| parents | |
| children | 7b509a1801e4 |
| previous revision | this revision |
|---|---|
| -1:000000000000 | 0:3e33310a7082 |
```python
#!/usr/bin/env python3
# coding: utf-8

import argparse
import logging
import os
from itertools import chain, islice, tee

# BioPython
from Bio import SeqIO

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s : %(levelname)s : %(message)s"
)
logger = logging.getLogger()


def main():
    args = parse_arguments()
    # extract the basename and the file extension
    basename, file_extension = os.path.splitext(args.sequences)
    # extract the filename (with no extension)
    _, filename = os.path.split(basename)
    chunk_size = args.chunk_size
    # split the sequences into chunks
    if chunk_size:
        logger.info("%s = %s", "chunk size parameter", chunk_size)
        sequences_record = gen_sequence_record(args.sequences, args.format)
        chunks = gen_get_chunks_by_size(sequences_record, chunk_size)
    else:
        logger.info("%s = %s", "number of chunks parameter", args.nb_chunk)
        chunks = gen_get_chunks(args.sequences, args.format, args.nb_chunk)

    # Write the chunks to numbered files.
    write_chunks(chunks, args.output, filename, file_extension, args.format)


def gen_get_chunks(sequences_path, sequences_format, nb_chunk):
    """Split a sequence file into a requested number of chunks.

    Arguments:
        sequences_path {str} -- path to the sequence file
        sequences_format {str} -- sequence format understood by SeqIO (fasta, fastq)
        nb_chunk {int} -- number of chunks requested

    Returns:
        generator -- yields one iterator of SeqRecord objects per chunk
    """
    # First pass over the records to count the sequences
    sequences_record_to_count = gen_sequence_record(
        sequences_path, sequences_format)
    # Get the number of sequences
    nb_sequences = get_nb_sequences(sequences_record_to_count)
    logger.info("%s = %i", "Total number of sequences", nb_sequences)
    # Second pass over the records, which will actually be split
    sequences_to_split = gen_sequence_record(sequences_path, sequences_format)

    # Get the size of the chunks
    chunk_size = nb_sequences // nb_chunk if nb_sequences > nb_chunk else 1
    return gen_get_chunks_by_size(sequences_to_split, chunk_size)


def gen_get_chunks_by_size(iterable, size=10):
    logger.info(
        "%s = %i",
        "effective chunk size (may differ from the parameter if more chunks "
        "were requested than there are sequences in the input)",
        size,
    )
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))


def gen_sequence_record(sequences_path, sequence_format):
    return SeqIO.parse(sequences_path, sequence_format)


def get_nb_sequences(sequences):
    return sum(1 for _ in sequences)


def write_chunks(iterable, dirname, filename, file_extension, sequence_format):
    for idx, chunk in enumerate(iterable):
        if not os.path.exists(dirname):
            os.mkdir(dirname)
        output_file = os.path.join(
            dirname, filename + "-chunk-" + str(idx + 1) + file_extension
        )
        with open(output_file, mode="w") as output_handle:
            count_seq, seq_to_write = tee(chunk, 2)
            logger.info(
                "%s : number of sequences = %i", output_file, len(list(count_seq))
            )
            SeqIO.write(seq_to_write, output_handle, sequence_format)


def parse_arguments():
    parser = argparse.ArgumentParser(description="Split fasta/fastq files")
    parser.add_argument(
        "-s", "--sequences", type=str, help="File that contains the sequences"
    )

    parser.add_argument("-f", "--format", type=str,
                        help="File format (fastq, fasta)")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-c", "--chunk-size",
        type=int,
        help="The number of sequences per chunk."
    )
    group.add_argument("-n", "--nb-chunk", help="Number of chunks", type=int)

    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="./",
        help="The output directory where the chunks will be saved",
    )

    return parser.parse_args()


if __name__ == "__main__":
    logger.info("START")
    main()
    logger.info("FINISHED")
```
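The heart of the script is the lazy chunking idiom in `gen_get_chunks_by_size`: the input is consumed as a single iterator, and each chunk is itself an iterator built from the first record plus an `islice` of the next `size - 1` records, so the whole file is never held in memory. A minimal sketch of that idiom, with plain integers standing in for `SeqRecord` objects (the `chunks_by_size` helper and the sample values are illustrative, not part of the repository):

```python
from itertools import chain, islice


def chunks_by_size(iterable, size=10):
    # Same pattern as gen_get_chunks_by_size: pull one item to detect that
    # data remains, then lazily attach up to size - 1 further items.
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))


for chunk in chunks_by_size(range(7), size=3):
    print(list(chunk))
# [0, 1, 2]
# [3, 4, 5]
# [6]
```

Each yielded chunk must be consumed before the next one is requested, which `write_chunks` does when it materialises `count_seq` and passes `seq_to_write` to `SeqIO.write`. With the flags defined in `parse_arguments`, an invocation would look something like `python sequence-splitter.py -s reads.fasta -f fasta -n 4 -o chunks/` (the file and directory names here are hypothetical).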
