Mercurial > repos > greg > vsnp_determine_ref_from_data
comparison vsnp_determine_ref_from_data.py @ 4:36bdf8b439ed draft
Uploaded
author | greg |
---|---|
date | Sun, 03 Jan 2021 16:13:22 +0000 |
parents | ebc08e5ce646 |
children |
comparison
equal
deleted
inserted
replaced
3:6116deacb2c7 | 4:36bdf8b439ed |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 | 2 |
3 import argparse | 3 import argparse |
4 import gzip | 4 import gzip |
5 import multiprocessing | |
6 import os | 5 import os |
7 import queue | 6 from collections import OrderedDict |
7 | |
8 import yaml | 8 import yaml |
9 from Bio.SeqIO.QualityIO import FastqGeneralIterator | 9 from Bio.SeqIO.QualityIO import FastqGeneralIterator |
10 from collections import OrderedDict | 10 |
11 | |
12 INPUT_READS_DIR = 'input_reads' | |
13 OUTPUT_DBKEY_DIR = 'output_dbkey' | 11 OUTPUT_DBKEY_DIR = 'output_dbkey' |
14 OUTPUT_METRICS_DIR = 'output_metrics' | 12 OUTPUT_METRICS_DIR = 'output_metrics' |
15 | 13 |
16 | 14 |
17 def get_base_file_name(file_path): | 15 def get_sample_name(file_path): |
18 base_file_name = os.path.basename(file_path) | 16 base_file_name = os.path.basename(file_path) |
19 if base_file_name.find(".") > 0: | 17 if base_file_name.find(".") > 0: |
20 # Eliminate the extension. | 18 # Eliminate the extension. |
21 return os.path.splitext(base_file_name)[0] | 19 return os.path.splitext(base_file_name)[0] |
22 elif base_file_name.find("_") > 0: | |
23 # The dot extension was likely changed to | |
24 # the "_" character. | |
25 items = base_file_name.split("_") | |
26 no_ext = "_".join(items[0:-2]) | |
27 if len(no_ext) > 0: | |
28 return no_ext | |
29 return base_file_name | 20 return base_file_name |
30 | 21 |
31 | 22 |
32 def get_dbkey(dnaprints_dict, key, s): | 23 def get_dbkey(dnaprints_dict, key, s): |
33 # dnaprints_dict looks something like this: | 24 # dnaprints_dict looks something like this: |
89 group = "" | 80 group = "" |
90 dbkey = "" | 81 dbkey = "" |
91 return group, dbkey | 82 return group, dbkey |
92 | 83 |
93 | 84 |
94 def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout): | |
95 while True: | |
96 try: | |
97 tup = task_queue.get(block=True, timeout=timeout) | |
98 except queue.Empty: | |
99 break | |
100 fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup | |
101 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum) | |
102 finished_queue.put((fastq_file, count_list, group, dbkey)) | |
103 task_queue.task_done() | |
104 | |
105 | |
106 def get_oligo_dict(): | 85 def get_oligo_dict(): |
107 oligo_dict = {} | 86 oligo_dict = {} |
108 oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC" | 87 oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC" |
109 oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC" | 88 oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC" |
110 oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT" | 89 oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT" |
136 | 115 |
137 | 116 |
138 def get_seq_counts(value, fastq_list, gzipped): | 117 def get_seq_counts(value, fastq_list, gzipped): |
139 count = 0 | 118 count = 0 |
140 for fastq_file in fastq_list: | 119 for fastq_file in fastq_list: |
141 if gzipped == "true": | 120 if gzipped: |
142 with gzip.open(fastq_file, 'rt') as fh: | 121 with gzip.open(fastq_file, 'rt') as fh: |
143 for title, seq, qual in FastqGeneralIterator(fh): | 122 for title, seq, qual in FastqGeneralIterator(fh): |
144 count += seq.count(value) | 123 count += seq.count(value) |
145 else: | 124 else: |
146 with open(fastq_file, 'r') as fh: | 125 with open(fastq_file, 'r') as fh: |
162 count_list.append(v) | 141 count_list.append(v) |
163 brucella_sum = sum(count_list[:16]) | 142 brucella_sum = sum(count_list[:16]) |
164 bovis_sum = sum(count_list[16:24]) | 143 bovis_sum = sum(count_list[16:24]) |
165 para_sum = sum(count_list[24:]) | 144 para_sum = sum(count_list[24:]) |
166 return count_summary, count_list, brucella_sum, bovis_sum, para_sum | 145 return count_summary, count_list, brucella_sum, bovis_sum, para_sum |
167 | |
168 | |
169 def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout): | |
170 while True: | |
171 try: | |
172 fastq_file = task_queue.get(block=True, timeout=timeout) | |
173 except queue.Empty: | |
174 break | |
175 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped) | |
176 finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum)) | |
177 task_queue.task_done() | |
178 | 146 |
179 | 147 |
180 def get_species_strings(count_summary): | 148 def get_species_strings(count_summary): |
181 binary_dictionary = {} | 149 binary_dictionary = {} |
182 for k, v in count_summary.items(): | 150 for k, v in count_summary.items(): |
195 para_binary = binary_list[24:] | 163 para_binary = binary_list[24:] |
196 para_string = ''.join(str(e) for e in para_binary) | 164 para_string = ''.join(str(e) for e in para_binary) |
197 return brucella_string, bovis_string, para_string | 165 return brucella_string, bovis_string, para_string |
198 | 166 |
199 | 167 |
200 def get_species_strings_for_collection(task_queue, finished_queue, timeout): | 168 def output_dbkey(file_name, dbkey, output_file): |
201 while True: | |
202 try: | |
203 tup = task_queue.get(block=True, timeout=timeout) | |
204 except queue.Empty: | |
205 break | |
206 fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup | |
207 brucella_string, bovis_string, para_string = get_species_strings(count_summary) | |
208 finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)) | |
209 task_queue.task_done() | |
210 | |
211 | |
212 def output_dbkey(file_name, dbkey, output_file=None): | |
213 # Output the dbkey. | 169 # Output the dbkey. |
214 if output_file is None: | |
215 # We're producing a dataset collection. | |
216 output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name) | |
217 with open(output_file, "w") as fh: | 170 with open(output_file, "w") as fh: |
218 fh.write("%s" % dbkey) | 171 fh.write("%s" % dbkey) |
219 | 172 |
220 | 173 |
221 def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None): | 174 def output_files(fastq_file, count_list, group, dbkey, dbkey_file, metrics_file): |
222 base_file_name = get_base_file_name(fastq_file) | 175 base_file_name = get_sample_name(fastq_file) |
223 if dbkey_file is not None: | |
224 # We're dealing with a single read or | |
225 # a set of paired reads. If the latter, | |
226 # the following will hopefully produce a | |
227 # good sample string. | |
228 if base_file_name.find("_") > 0: | |
229 base_file_name = base_file_name.split("_")[0] | |
230 output_dbkey(base_file_name, dbkey, dbkey_file) | 176 output_dbkey(base_file_name, dbkey, dbkey_file) |
231 output_metrics(base_file_name, count_list, group, dbkey, metrics_file) | 177 output_metrics(base_file_name, count_list, group, dbkey, metrics_file) |
232 | 178 |
233 | 179 |
234 def output_files_for_collection(task_queue, timeout): | 180 def output_metrics(file_name, count_list, group, dbkey, output_file): |
235 while True: | |
236 try: | |
237 tup = task_queue.get(block=True, timeout=timeout) | |
238 except queue.Empty: | |
239 break | |
240 fastq_file, count_list, group, dbkey = tup | |
241 output_files(fastq_file, count_list, group, dbkey) | |
242 task_queue.task_done() | |
243 | |
244 | |
245 def output_metrics(file_name, count_list, group, dbkey, output_file=None): | |
246 # Output the metrics. | 181 # Output the metrics. |
247 if output_file is None: | |
248 # We're producing a dataset collection. | |
249 output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name) | |
250 with open(output_file, "w") as fh: | 182 with open(output_file, "w") as fh: |
251 fh.write("Sample: %s\n" % file_name) | 183 fh.write("Sample: %s\n" % file_name) |
252 fh.write("Brucella counts: ") | 184 fh.write("Brucella counts: ") |
253 for i in count_list[:16]: | 185 for i in count_list[:16]: |
254 fh.write("%d," % i) | 186 fh.write("%d," % i) |
260 fh.write("%d," % i) | 192 fh.write("%d," % i) |
261 fh.write("\nGroup: %s" % group) | 193 fh.write("\nGroup: %s" % group) |
262 fh.write("\ndbkey: %s\n" % dbkey) | 194 fh.write("\ndbkey: %s\n" % dbkey) |
263 | 195 |
264 | 196 |
265 def set_num_cpus(num_files, processes): | |
266 num_cpus = int(multiprocessing.cpu_count()) | |
267 if num_files < num_cpus and num_files < processes: | |
268 return num_files | |
269 if num_cpus < processes: | |
270 half_cpus = int(num_cpus / 2) | |
271 if num_files < half_cpus: | |
272 return num_files | |
273 return half_cpus | |
274 return processes | |
275 | |
276 | |
277 if __name__ == '__main__': | 197 if __name__ == '__main__': |
278 parser = argparse.ArgumentParser() | 198 parser = argparse.ArgumentParser() |
279 | 199 |
280 parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value, name and path fields") | 200 parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value, name and path fields") |
281 parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Required: single read') | 201 parser.add_argument('--read1', action='store', dest='read1', help='Required: single read') |
282 parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read') | 202 parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read') |
283 parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped') | 203 parser.add_argument('--gzipped', action='store_true', dest='gzipped', help='Input files are gzipped') |
284 parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file') | 204 parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', help='Output reference file') |
285 parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file') | 205 parser.add_argument('--output_metrics', action='store', dest='output_metrics', help='Output metrics file') |
286 parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting') | |
287 | 206 |
288 args = parser.parse_args() | 207 args = parser.parse_args() |
289 | 208 |
290 collection = False | 209 fastq_list = [args.read1] |
291 fastq_list = [] | 210 if args.read2 is not None: |
292 if args.read1 is not None: | 211 fastq_list.append(args.read2) |
293 fastq_list.append(args.read1) | |
294 if args.read2 is not None: | |
295 fastq_list.append(args.read2) | |
296 else: | |
297 collection = True | |
298 for file_name in sorted(os.listdir(INPUT_READS_DIR)): | |
299 file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name)) | |
300 fastq_list.append(file_path) | |
301 | 212 |
302 # The value of dnaprint_fields is a list of lists, where each list is | 213 # The value of dnaprint_fields is a list of lists, where each list is |
303 # the [value, name, path] components of the vsnp_dnaprints data table. | 214 # the [value, name, path] components of the vsnp_dnaprints data table. |
304 # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the | 215 # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the |
305 # all_fasta data table to the value column in the vsnp_dnaprints data | 216 # all_fasta data table to the value column in the vsnp_dnaprints data |
306 # table to ensure a proper mapping for discovering the dbkey. | 217 # table to ensure a proper mapping for discovering the dbkey. |
307 dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields) | 218 dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields) |
308 | 219 |
309 if collection: | 220 # Here fastq_list consists of either a single read |
310 # Here fastq_list consists of any number of | 221 # or a set of paired reads, producing single outputs. |
311 # reads, so each file will be processed and | 222 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped) |
312 # dataset collections will be produced as outputs. | 223 brucella_string, bovis_string, para_string = get_species_strings(count_summary) |
313 multiprocessing.set_start_method('spawn') | 224 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum) |
314 queue1 = multiprocessing.JoinableQueue() | 225 output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics) |
315 queue2 = multiprocessing.JoinableQueue() | |
316 num_files = len(fastq_list) | |
317 cpus = set_num_cpus(num_files, args.processes) | |
318 # Set a timeout for get()s in the queue. | |
319 timeout = 0.05 | |
320 | |
321 for fastq_file in fastq_list: | |
322 queue1.put(fastq_file) | |
323 | |
324 # Complete the get_species_counts task. | |
325 processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout, )) for _ in range(cpus)] | |
326 for p in processes: | |
327 p.start() | |
328 for p in processes: | |
329 p.join() | |
330 queue1.join() | |
331 | |
332 # Complete the get_species_strings task. | |
333 processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout, )) for _ in range(cpus)] | |
334 for p in processes: | |
335 p.start() | |
336 for p in processes: | |
337 p.join() | |
338 queue2.join() | |
339 | |
340 # Complete the get_group_and_dbkey task. | |
341 processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout, )) for _ in range(cpus)] | |
342 for p in processes: | |
343 p.start() | |
344 for p in processes: | |
345 p.join() | |
346 queue1.join() | |
347 | |
348 # Complete the output_files task. | |
349 processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout, )) for _ in range(cpus)] | |
350 for p in processes: | |
351 p.start() | |
352 for p in processes: | |
353 p.join() | |
354 queue2.join() | |
355 | |
356 if queue1.empty() and queue2.empty(): | |
357 queue1.close() | |
358 queue1.join_thread() | |
359 queue2.close() | |
360 queue2.join_thread() | |
361 else: | |
362 # Here fastq_list consists of either a single read | |
363 # or a set of paired reads, producing single outputs. | |
364 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped) | |
365 brucella_string, bovis_string, para_string = get_species_strings(count_summary) | |
366 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum) | |
367 output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics) |