comparison sequence-splitter.py @ 0:3e33310a7082 draft

planemo upload for repository https://github.com/rplanel/galaxy-tools/tree/master/tools/sequence-splitter commit cba12d215a89e9685c7d1f55d067770d7ec0dea2
author rplanel
date Thu, 08 Aug 2019 11:18:30 -0400
parents
children 7b509a1801e4
comparison
equal deleted inserted replaced
-1:000000000000 0:3e33310a7082
1 #!/usr/bin/env python3
2 # coding: utf-8
3
4 import argparse
5 import logging
6 import os
7 from itertools import chain, islice, tee
8
9 # BioPython
10 from Bio import SeqIO
11
12 logging.basicConfig(
13 level=logging.INFO, format="%(asctime)s : %(levelname)s : %(message)s"
14 )
15 logger = logging.getLogger()
16
17
18 def main():
19 args = parse_arguments()
20 # extract the basename and the file extension
21 basename, file_extension = os.path.splitext(args.sequences)
22 # extract the filename (with no extension)
23 _, filename = os.path.split(basename)
24 chunk_size = args.chunk_size
25 # split the sequences in chunks
26 if chunk_size:
27 logger.info("%s = %s", "chunk size parameter", chunk_size)
28 sequences_record = gen_sequence_record(args.sequences, args.format)
29 chunks = gen_get_chunks_by_size(sequences_record, chunk_size)
30 else:
31 logger.info("%s = %s", "number of chunks parameter", args.nb_chunk)
32 chunks = gen_get_chunks(args.sequences, args.format, args.nb_chunk)
33
34 # Write the chunks in numbered files.
35 write_chunks(chunks, args.output, filename, file_extension, args.format)
36
37
38 def gen_get_chunks(sequences_path, sequences_format, nb_chunk):
39 """[summary]
40
41 Arguments:
42 sequences_path {[type]} -- [description]
43 sequences_format {[type]} -- [description]
44 nb_chunk {[type]} -- [description]
45
46 Returns:
47 [type] -- [description]
48 """
49 # First record to count the sequences
50 sequences_record_to_count = gen_sequence_record(
51 sequences_path, sequences_format)
52 # Get the number of sequences
53 nb_sequences = get_nb_sequences(sequences_record_to_count)
54 logger.info("%s = %i", "Number of sequences per chunk", nb_sequences)
55 # Second record to that will be splitted
56 sequences_to_split = gen_sequence_record(sequences_path, sequences_format)
57
58 # Get the size of the chunks
59 chunk_size = int(nb_sequences / nb_chunk) if nb_sequences > nb_chunk else 1
60 return gen_get_chunks_by_size(sequences_to_split, chunk_size)
61
62
63 def gen_get_chunks_by_size(iterable, size=10):
64 logger.info(
65 "%s = %i",
66 "chunk size got (could be different from parameter if more chunk asked \
67 than sequences in multifasta)",
68 size
69 )
70 iterator = iter(iterable)
71 for first in iterator:
72 yield chain([first], islice(iterator, size - 1))
73
74
75 def gen_sequence_record(sequences_path, sequence_format):
76 return SeqIO.parse(sequences_path, sequence_format)
77
78
79 def get_nb_sequences(sequences):
80 return sum(1 for _ in sequences)
81
82
83 def write_chunks(iterable, dirname, filename, file_extension, sequence_format):
84 for idx, chunk in enumerate(iterable):
85 if not os.path.exists(dirname):
86 os.mkdir(dirname)
87 output_file = os.path.join(
88 dirname, filename + "-chunk-" + str(idx + 1) + file_extension
89 )
90 with open(output_file, mode="w") as output_handle:
91 count_seq, seq_to_write = tee(chunk, 2)
92 logger.info(
93 "%s : number of seuquences = %i", output_file, len(
94 list(count_seq))
95 )
96 SeqIO.write(seq_to_write, output_handle, sequence_format)
97
98
99 def parse_arguments():
100 parser = argparse.ArgumentParser(description="Split fasta/fastq files")
101 parser.add_argument(
102 "-s", "--sequences", type=str, help="File that contains the sequences"
103 )
104
105 parser.add_argument("-f", "--format", type=str,
106 help="File format (fastq, fasta)")
107 group = parser.add_mutually_exclusive_group(required=True)
108 group.add_argument(
109 "-c", "--chunk-size",
110 type=int,
111 help="The number of sequences by chunks."
112 )
113 group.add_argument("-n", "--nb-chunk", help="Number of chunks", type=int)
114
115 parser.add_argument(
116 "-o",
117 "--output",
118 type=str,
119 default="./",
120 help="The output directory where the chunks will be saved",
121 )
122
123 return parser.parse_args()
124
125
126 if __name__ == "__main__":
127 logger.info("START")
128 main()
129 logger.info("FINISHED")