annotate vsnp_determine_ref_from_data.py @ 1:bca267738b33 draft

Uploaded
author greg
date Thu, 19 Nov 2020 21:25:31 +0000
parents ebc08e5ce646
children 36bdf8b439ed
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
1 #!/usr/bin/env python
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
2
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
3 import argparse
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
4 import gzip
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
5 import multiprocessing
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
6 import os
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
7 import queue
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
8 import yaml
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
9 from Bio.SeqIO.QualityIO import FastqGeneralIterator
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
10 from collections import OrderedDict
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
11
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
12 INPUT_READS_DIR = 'input_reads'
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
13 OUTPUT_DBKEY_DIR = 'output_dbkey'
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
14 OUTPUT_METRICS_DIR = 'output_metrics'
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
15
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
16
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
17 def get_base_file_name(file_path):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
18 base_file_name = os.path.basename(file_path)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
19 if base_file_name.find(".") > 0:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
20 # Eliminate the extension.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
21 return os.path.splitext(base_file_name)[0]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
22 elif base_file_name.find("_") > 0:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
23 # The dot extension was likely changed to
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
24 # the " character.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
25 items = base_file_name.split("_")
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
26 no_ext = "_".join(items[0:-2])
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
27 if len(no_ext) > 0:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
28 return no_ext
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
29 return base_file_name
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
30
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
31
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
32 def get_dbkey(dnaprints_dict, key, s):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
33 # dnaprints_dict looks something like this:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
34 # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
35 # {'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
36 d = dnaprints_dict.get(key, {})
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
37 for data_table_value, v_list in d.items():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
38 if s in v_list:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
39 return data_table_value
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
40 return ""
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
41
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
42
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
43 def get_dnaprints_dict(dnaprint_fields):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
44 # A dndprint_fields entry looks something liek this.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
45 # [['AF2122', '/galaxy/tool-data/vsnp/AF2122/dnaprints/NC_002945v4.yml']]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
46 dnaprints_dict = {}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
47 for item in dnaprint_fields:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
48 # Here item is a 2-element list of data
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
49 # table components, # value and path.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
50 value = item[0]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
51 path = item[1].strip()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
52 with open(path, "rt") as fh:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
53 # The format of all dnaprints yaml
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
54 # files is something like this:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
55 # brucella:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
56 # - 0111111111111111
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
57 print_dict = yaml.load(fh, Loader=yaml.Loader)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
58 for print_dict_k, print_dict_v in print_dict.items():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
59 dnaprints_v_dict = dnaprints_dict.get(print_dict_k, {})
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
60 if len(dnaprints_v_dict) > 0:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
61 # dnaprints_dict already contains k (e.g., 'brucella',
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
62 # and dnaprints_v_dict will be a dictionary # that
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
63 # looks something like this:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
64 # {'NC_002945v4': ['11001110', '11011110', '11001100']}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
65 value_list = dnaprints_v_dict.get(value, [])
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
66 value_list = value_list + print_dict_v
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
67 dnaprints_v_dict[value] = value_list
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
68 else:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
69 # dnaprints_v_dict is an empty dictionary.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
70 dnaprints_v_dict[value] = print_dict_v
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
71 dnaprints_dict[print_dict_k] = dnaprints_v_dict
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
72 # dnaprints_dict looks something like this:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
73 # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
74 # {'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
75 return dnaprints_dict
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
76
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
77
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
78 def get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
79 if brucella_sum > 3:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
80 group = "Brucella"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
81 dbkey = get_dbkey(dnaprints_dict, "brucella", brucella_string)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
82 elif bovis_sum > 3:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
83 group = "TB"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
84 dbkey = get_dbkey(dnaprints_dict, "bovis", bovis_string)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
85 elif para_sum >= 1:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
86 group = "paraTB"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
87 dbkey = get_dbkey(dnaprints_dict, "para", para_string)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
88 else:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
89 group = ""
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
90 dbkey = ""
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
91 return group, dbkey
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
92
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
93
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
94 def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
95 while True:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
96 try:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
97 tup = task_queue.get(block=True, timeout=timeout)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
98 except queue.Empty:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
99 break
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
100 fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
101 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
102 finished_queue.put((fastq_file, count_list, group, dbkey))
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
103 task_queue.task_done()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
104
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
105
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
106 def get_oligo_dict():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
107 oligo_dict = {}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
108 oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
109 oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
110 oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
111 oligo_dict["04_mel"] = "TGTCGCGCGTCAAGCGGCGTGAAATCTCTG"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
112 oligo_dict["05_suis1"] = "TGCGTTGCCGTGAAGCTTAATTCGGCTGAT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
113 oligo_dict["06_suis2"] = "GGCAATCATGCGCAGGGCTTTGCATTCGTC"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
114 oligo_dict["07_suis3"] = "CAAGGCAGATGCACATAATCCGGCGACCCG"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
115 oligo_dict["08_ceti1"] = "GTGAATATAGGGTGAATTGATCTTCAGCCG"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
116 oligo_dict["09_ceti2"] = "TTACAAGCAGGCCTATGAGCGCGGCGTGAA"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
117 oligo_dict["10_canis4"] = "CTGCTACATAAAGCACCCGGCGACCGAGTT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
118 oligo_dict["11_canis"] = "ATCGTTTTGCGGCATATCGCTGACCACAGC"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
119 oligo_dict["12_ovis"] = "CACTCAATCTTCTCTACGGGCGTGGTATCC"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
120 oligo_dict["13_ether2"] = "CGAAATCGTGGTGAAGGACGGGACCGAACC"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
121 oligo_dict["14_63B1"] = "CCTGTTTAAAAGAATCGTCGGAACCGCTCT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
122 oligo_dict["15_16M0"] = "TCCCGCCGCCATGCCGCCGAAAGTCGCCGT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
123 oligo_dict["16_mel1b"] = "TCTGTCCAAACCCCGTGACCGAACAATAGA"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
124 oligo_dict["17_tb157"] = "CTCTTCGTATACCGTTCCGTCGTCACCATGGTCCT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
125 oligo_dict["18_tb7"] = "TCACGCAGCCAACGATATTCGTGTACCGCGACGGT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
126 oligo_dict["19_tbbov"] = "CTGGGCGACCCGGCCGACCTGCACACCGCGCATCA"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
127 oligo_dict["20_tb5"] = "CCGTGGTGGCGTATCGGGCCCCTGGATCGCGCCCT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
128 oligo_dict["21_tb2"] = "ATGTCTGCGTAAAGAAGTTCCATGTCCGGGAAGTA"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
129 oligo_dict["22_tb3"] = "GAAGACCTTGATGCCGATCTGGGTGTCGATCTTGA"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
130 oligo_dict["23_tb4"] = "CGGTGTTGAAGGGTCCCCCGTTCCAGAAGCCGGTG"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
131 oligo_dict["24_tb6"] = "ACGGTGATTCGGGTGGTCGACACCGATGGTTCAGA"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
132 oligo_dict["25_para"] = "CCTTTCTTGAAGGGTGTTCG"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
133 oligo_dict["26_para_sheep"] = "CGTGGTGGCGACGGCGGCGGGCCTGTCTAT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
134 oligo_dict["27_para_cattle"] = "TCTCCTCGGTCGGTGATTCGGGGGCGCGGT"
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
135 return oligo_dict
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
136
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
137
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
138 def get_seq_counts(value, fastq_list, gzipped):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
139 count = 0
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
140 for fastq_file in fastq_list:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
141 if gzipped == "true":
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
142 with gzip.open(fastq_file, 'rt') as fh:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
143 for title, seq, qual in FastqGeneralIterator(fh):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
144 count += seq.count(value)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
145 else:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
146 with open(fastq_file, 'r') as fh:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
147 for title, seq, qual in FastqGeneralIterator(fh):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
148 count += seq.count(value)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
149 return(value, count)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
150
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
151
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
152 def get_species_counts(fastq_list, gzipped):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
153 count_summary = {}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
154 oligo_dict = get_oligo_dict()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
155 for v1 in oligo_dict.values():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
156 returned_value, count = get_seq_counts(v1, fastq_list, gzipped)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
157 for key, v2 in oligo_dict.items():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
158 if returned_value == v2:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
159 count_summary.update({key: count})
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
160 count_list = []
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
161 for v in count_summary.values():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
162 count_list.append(v)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
163 brucella_sum = sum(count_list[:16])
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
164 bovis_sum = sum(count_list[16:24])
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
165 para_sum = sum(count_list[24:])
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
166 return count_summary, count_list, brucella_sum, bovis_sum, para_sum
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
167
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
168
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
169 def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
170 while True:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
171 try:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
172 fastq_file = task_queue.get(block=True, timeout=timeout)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
173 except queue.Empty:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
174 break
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
175 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
176 finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum))
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
177 task_queue.task_done()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
178
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
179
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
180 def get_species_strings(count_summary):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
181 binary_dictionary = {}
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
182 for k, v in count_summary.items():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
183 if v > 1:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
184 binary_dictionary.update({k: 1})
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
185 else:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
186 binary_dictionary.update({k: 0})
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
187 binary_dictionary = OrderedDict(sorted(binary_dictionary.items()))
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
188 binary_list = []
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
189 for v in binary_dictionary.values():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
190 binary_list.append(v)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
191 brucella_binary = binary_list[:16]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
192 brucella_string = ''.join(str(e) for e in brucella_binary)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
193 bovis_binary = binary_list[16:24]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
194 bovis_string = ''.join(str(e) for e in bovis_binary)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
195 para_binary = binary_list[24:]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
196 para_string = ''.join(str(e) for e in para_binary)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
197 return brucella_string, bovis_string, para_string
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
198
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
199
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
200 def get_species_strings_for_collection(task_queue, finished_queue, timeout):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
201 while True:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
202 try:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
203 tup = task_queue.get(block=True, timeout=timeout)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
204 except queue.Empty:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
205 break
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
206 fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
207 brucella_string, bovis_string, para_string = get_species_strings(count_summary)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
208 finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum))
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
209 task_queue.task_done()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
210
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
211
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
212 def output_dbkey(file_name, dbkey, output_file=None):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
213 # Output the dbkey.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
214 if output_file is None:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
215 # We're producing a dataset collection.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
216 output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
217 with open(output_file, "w") as fh:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
218 fh.write("%s" % dbkey)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
219
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
220
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
221 def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
222 base_file_name = get_base_file_name(fastq_file)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
223 if dbkey_file is not None:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
224 # We're dealing with a single read or
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
225 # a set of paired reads. If the latter,
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
226 # the following will hopefully produce a
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
227 # good sample string.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
228 if base_file_name.find("_") > 0:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
229 base_file_name = base_file_name.split("_")[0]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
230 output_dbkey(base_file_name, dbkey, dbkey_file)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
231 output_metrics(base_file_name, count_list, group, dbkey, metrics_file)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
232
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
233
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
234 def output_files_for_collection(task_queue, timeout):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
235 while True:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
236 try:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
237 tup = task_queue.get(block=True, timeout=timeout)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
238 except queue.Empty:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
239 break
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
240 fastq_file, count_list, group, dbkey = tup
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
241 output_files(fastq_file, count_list, group, dbkey)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
242 task_queue.task_done()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
243
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
244
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
245 def output_metrics(file_name, count_list, group, dbkey, output_file=None):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
246 # Output the metrics.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
247 if output_file is None:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
248 # We're producing a dataset collection.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
249 output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
250 with open(output_file, "w") as fh:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
251 fh.write("Sample: %s\n" % file_name)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
252 fh.write("Brucella counts: ")
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
253 for i in count_list[:16]:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
254 fh.write("%d," % i)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
255 fh.write("\nTB counts: ")
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
256 for i in count_list[16:24]:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
257 fh.write("%d," % i)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
258 fh.write("\nPara counts: ")
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
259 for i in count_list[24:]:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
260 fh.write("%d," % i)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
261 fh.write("\nGroup: %s" % group)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
262 fh.write("\ndbkey: %s\n" % dbkey)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
263
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
264
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
265 def set_num_cpus(num_files, processes):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
266 num_cpus = int(multiprocessing.cpu_count())
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
267 if num_files < num_cpus and num_files < processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
268 return num_files
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
269 if num_cpus < processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
270 half_cpus = int(num_cpus / 2)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
271 if num_files < half_cpus:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
272 return num_files
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
273 return half_cpus
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
274 return processes
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
275
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
276
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
277 if __name__ == '__main__':
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
278 parser = argparse.ArgumentParser()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
279
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
280 parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value, name and path fields")
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
281 parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Required: single read')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
282 parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
283 parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
284 parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
285 parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
286 parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
287
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
288 args = parser.parse_args()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
289
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
290 collection = False
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
291 fastq_list = []
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
292 if args.read1 is not None:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
293 fastq_list.append(args.read1)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
294 if args.read2 is not None:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
295 fastq_list.append(args.read2)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
296 else:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
297 collection = True
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
298 for file_name in sorted(os.listdir(INPUT_READS_DIR)):
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
299 file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
300 fastq_list.append(file_path)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
301
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
302 # The value of dnaprint_fields is a list of lists, where each list is
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
303 # the [value, name, path] components of the vsnp_dnaprints data table.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
304 # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
305 # all_fasta data table to the value column in the vsnp_dnaprints data
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
306 # table to ensure a proper mapping for discovering the dbkey.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
307 dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
308
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
309 if collection:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
310 # Here fastq_list consists of any number of
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
311 # reads, so each file will be processed and
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
312 # dataset collections will be produced as outputs.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
313 multiprocessing.set_start_method('spawn')
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
314 queue1 = multiprocessing.JoinableQueue()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
315 queue2 = multiprocessing.JoinableQueue()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
316 num_files = len(fastq_list)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
317 cpus = set_num_cpus(num_files, args.processes)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
318 # Set a timeout for get()s in the queue.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
319 timeout = 0.05
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
320
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
321 for fastq_file in fastq_list:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
322 queue1.put(fastq_file)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
323
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
324 # Complete the get_species_counts task.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
325 processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout, )) for _ in range(cpus)]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
326 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
327 p.start()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
328 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
329 p.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
330 queue1.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
331
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
332 # Complete the get_species_strings task.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
333 processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout, )) for _ in range(cpus)]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
334 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
335 p.start()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
336 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
337 p.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
338 queue2.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
339
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
340 # Complete the get_group_and_dbkey task.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
341 processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout, )) for _ in range(cpus)]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
342 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
343 p.start()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
344 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
345 p.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
346 queue1.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
347
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
348 # Complete the output_files task.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
349 processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout, )) for _ in range(cpus)]
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
350 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
351 p.start()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
352 for p in processes:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
353 p.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
354 queue2.join()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
355
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
356 if queue1.empty() and queue2.empty():
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
357 queue1.close()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
358 queue1.join_thread()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
359 queue2.close()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
360 queue2.join_thread()
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
361 else:
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
362 # Here fastq_list consists of either a single read
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
363 # or a set of paired reads, producing single outputs.
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
364 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
365 brucella_string, bovis_string, para_string = get_species_strings(count_summary)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
366 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
ebc08e5ce646 Uploaded
greg
parents:
diff changeset
367 output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics)