comparison vsnp_build_tables.py @ 3:abfb861df879 draft

Uploaded
author greg
date Sun, 03 Jan 2021 16:21:29 +0000
parents b60858c3eb91
children f641e52353e8
comparison 2:85384a9bfba2 -> 3:abfb861df879

--- a/vsnp_build_tables.py    2:85384a9bfba2
+++ b/vsnp_build_tables.py    3:abfb861df879
@@ -1,19 +1,15 @@
 #!/usr/bin/env python
 
 import argparse
-import multiprocessing
 import os
+import re
+
 import pandas
-import queue
 import pandas.io.formats.excel
-import re
 from Bio import SeqIO
 
-INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
-INPUT_JSON_DIR = 'input_json_dir'
-INPUT_NEWICK_DIR = 'input_newick_dir'
 # Maximum columns allowed in a LibreOffice
 # spreadsheet is 1024. Excel allows for
 # 16,384 columns, but we'll set the lower
 # number as the maximum. Some browsers
 # (e.g., Firefox on Linux) are configured
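[note] The MAXCOLS comment above motivates splitting wide tables: anything wider than the limit is written as several spreadsheets. A minimal sketch of that kind of column chunking, under the assumption of a 1024-column cap; the split_into_chunks helper is illustrative and not part of this changeset (the tool's actual splitting logic lives in output_table, outside this hunk):

    import pandas

    MAXCOLS = 1024

    def split_into_chunks(df, maxcols=MAXCOLS):
        # Yield successive column slices, each at most maxcols wide.
        for start in range(0, len(df.columns), maxcols):
            yield df.iloc[:, start:start + maxcols]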
@@ -30,11 +26,11 @@
         all_ref = ref_df[ref_df['reference'] == gbk_chrome]
         positions = all_ref.position.to_frame()
         # Create an annotation file.
         annotation_file = "%s_annotations.csv" % group
         with open(annotation_file, "a") as fh:
-            for index, row in positions.iterrows():
+            for _, row in positions.iterrows():
                 pos = row.position
                 try:
                     aaa = pro.iloc[pro.index.get_loc(int(pos))][['chrom', 'locus', 'product', 'gene']]
                     try:
                         chrom, name, locus, tag = aaa.values[0]
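[note] The aaa lookup above resolves a SNP position against gene coordinates through a pandas IntervalIndex (built with from_arrays later in this diff). A minimal sketch of that lookup with made-up coordinates; the dnaA/recF rows are hypothetical, since the real values come from the gbk file:

    import pandas

    # Hypothetical annotation rows; real values come from the gbk file.
    pro = pandas.DataFrame({'gene': ['dnaA', 'recF'],
                            'start': [1, 501],
                            'stop': [500, 900]})
    pro.index = pandas.IntervalIndex.from_arrays(pro['start'], pro['stop'], closed='both')
    row = pro.iloc[pro.index.get_loc(250)]  # position 250 falls in [1, 500] -> the dnaA row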
@@ -142,22 +138,16 @@
         pro.index = pandas.IntervalIndex.from_arrays(pro['start'], pro['stop'], closed='both')
         annotation_dict[chromosome] = pro
     return annotation_dict
 
 
-def get_base_file_name(file_path):
+def get_sample_name(file_path):
     base_file_name = os.path.basename(file_path)
     if base_file_name.find(".") > 0:
         # Eliminate the extension.
         return os.path.splitext(base_file_name)[0]
-    elif base_file_name.find("_") > 0:
-        # The dot extension was likely changed to
-        # the "_" character.
-        items = base_file_name.split("_")
-        return "_".join(items[0:-1])
-    else:
-        return base_file_name
+    return base_file_name
 
 
 def output_cascade_table(cascade_order, mqdf, group, annotation_dict):
     cascade_order_mq = pandas.concat([cascade_order, mqdf], join='inner')
     output_table(cascade_order_mq, "cascade", group, annotation_dict)
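[note] The renamed get_sample_name keeps only the dot-extension rule; the old underscore heuristic is gone, which changes behavior for extensionless names containing "_". Illustrative calls (the paths are made up):

    get_sample_name('/tmp/Mbovis-01D6.newick')  # -> 'Mbovis-01D6'
    get_sample_name('/tmp/Mbovis-01D6_snps')    # -> 'Mbovis-01D6_snps' (the old code returned 'Mbovis-01D6')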
@@ -166,21 +156,24 @@
 def output_excel(df, type_str, group, annotation_dict, count=None):
     # Output the temporary json file that
     # is used by the excel_formatter.
     if count is None:
         if group is None:
-            json_file_name = "%s_order_mq.json" % type_str
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_order_mq.json" % type_str)
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table.xlsx" % type_str)
         else:
-            json_file_name = "%s_%s_order_mq.json" % (group, type_str)
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq.json" % (group, type_str))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table.xlsx" % (group, type_str))
     else:
+        # The table has more columns than is allowed by the
+        # MAXCOLS setting, so multiple files will be produced
+        # as an output collection.
         if group is None:
-            json_file_name = "%s_order_mq_%d.json" % (type_str, count)
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_order_mq_%d.json" % (type_str, count))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table_%d.xlsx" % (type_str, count))
         else:
-            json_file_name = "%s_%s_order_mq_%d.json" % (group, type_str, count)
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq_%d.json" % (group, type_str, count))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count))
     df.to_json(json_file_name, orient='split')
     # Output the Excel file.
     excel_formatter(json_file_name, excel_file_name, group, annotation_dict)
 
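[note] The json handoff above relies on orient='split' preserving index, columns, and data exactly, so excel_formatter can rebuild the same table it was given. A minimal round-trip sketch; the file name is illustrative:

    import pandas

    df = pandas.DataFrame([[1, 2]], index=['root'], columns=['posA', 'posB'])
    df.to_json('table.json', orient='split')      # illustrative file name
    rebuilt = pandas.read_json('table.json', orient='split')
    assert rebuilt.equals(df)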
@@ -226,156 +219,84 @@
             output_excel(df_of_type, type_str, group_str, annotation_dict, count=count)
     else:
         output_excel(df, type_str, group_str, annotation_dict)
 
 
-def preprocess_tables(task_queue, annotation_dict, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        newick_file, json_file, json_avg_mq_file = tup
+def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict):
     avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
     # Map quality to dataframe.
     mqdf = avg_mq_series.to_frame(name='MQ')
     mqdf = mqdf.T
     # Get the group.
-    group = get_base_file_name(newick_file)
+    group = get_sample_name(newick_file)
     snps_df = pandas.read_json(json_file, orient='split')
     with open(newick_file, 'r') as fh:
         for line in fh:
             line = re.sub('[:,]', '\n', line)
             line = re.sub('[)(]', '', line)
             line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
             line = re.sub('root\n', '', line)
     sample_order = line.split('\n')
     sample_order = list([_f for _f in sample_order if _f])
     sample_order.insert(0, 'root')
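[note] The re.sub chain is a quick-and-dirty newick parser: it puts one token per line, drops branch lengths and parentheses, and rebuilds the sample order with 'root' first, in the order the taxa appear in the tree. A toy trace under the assumption of a single-line newick file ending in a newline; the tree string is illustrative:

    import re

    line = '(root:0.0,(sampleA:0.1,sampleB:0.2):0.05);\n'
    line = re.sub('[:,]', '\n', line)               # one token per line
    line = re.sub('[)(]', '', line)                 # drop parentheses
    line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)  # drop branch-length lines
    line = re.sub('root\n', '', line)               # drop root; it is re-inserted below
    sample_order = [_f for _f in line.split('\n') if _f]
    sample_order.insert(0, 'root')
    print(sample_order)  # ['root', 'sampleA', 'sampleB']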
     tree_order = snps_df.loc[sample_order]
     # Count number of SNPs in each column.
     snp_per_column = []
     for column_header in tree_order:
         count = 0
         column = tree_order[column_header]
         for element in column:
             if element != column[0]:
                 count = count + 1
         snp_per_column.append(count)
     row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
     # Count number of SNPs from the
     # top of each column in the table.
     snp_from_top = []
     for column_header in tree_order:
         count = 0
         column = tree_order[column_header]
         # For each element in the column,
         # skip the first element.
         for element in column[1:]:
             if element == column[0]:
                 count = count + 1
             else:
                 break
         snp_from_top.append(count)
     row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
     tree_order = tree_order.append([row1])
     tree_order = tree_order.append([row2])
     # In pandas=0.18.1 even this does not work:
     # abc = row1.to_frame()
     # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
     # tree_order.append(abc)
     # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
     tree_order = tree_order.T
     tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
     tree_order = tree_order.T
     # Remove snp_per_column and snp_from_top rows.
     cascade_order = tree_order[:-2]
     # Output the cascade table.
     output_cascade_table(cascade_order, mqdf, group, annotation_dict)
     # Output the sorted table.
     output_sort_table(cascade_order, mqdf, group, annotation_dict)
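[note] The two counts drive the cascade layout: snp_per_column is how many entries differ from the top ('root') value, and snp_from_top is how far down the column matches 'root' before the first difference; columns are then sorted by (snp_from_top ascending, snp_per_column descending). A toy illustration with a made-up two-position table:

    import pandas

    # 'root' row first, columns are genome positions (made-up values).
    tree_order = pandas.DataFrame({'pos1': ['A', 'G', 'G'], 'pos2': ['C', 'C', 'T']},
                                  index=['root', 's1', 's2'])
    snp_per_column = [(tree_order[c] != tree_order[c].iloc[0]).sum() for c in tree_order]
    # -> [2, 1]: both samples differ from root at pos1, one at pos2.
    snp_from_top = []
    for c in tree_order:
        column = tree_order[c]
        count = 0
        for element in column.iloc[1:]:
            if element == column.iloc[0]:
                count += 1
            else:
                break
        snp_from_top.append(count)
    # -> [0, 1]: the sort therefore places pos1 before pos2 in the cascade.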
-        task_queue.task_done()
-
-
-def set_num_cpus(num_files, processes):
-    num_cpus = int(multiprocessing.cpu_count())
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
 
-    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file')
-    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file')
-    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file')
     parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file'),
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
+    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file')
+    parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file')
+    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file')
 
     args = parser.parse_args()
 
     if args.gbk_file is not None:
         # Create the annotation_dict for annotating
         # the Excel tables.
         annotation_dict = get_annotation_dict(args.gbk_file)
     else:
         annotation_dict = None
 
-    # The assumption here is that the list of files
-    # in both INPUT_NEWICK_DIR and INPUT_JSON_DIR are
-    # named such that they are properly matched if
-    # the directories contain more than 1 file (i.e.,
-    # hopefully the newick file names and json file names
-    # will be something like Mbovis-01D6_* so they can be
-    # sorted and properly associated with each other).
-    if args.input_newick is not None:
-        newick_files = [args.input_newick]
-    else:
-        newick_files = []
-        for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name))
-            newick_files.append(file_path)
-    if args.input_snps_json is not None:
-        json_files = [args.input_snps_json]
-    else:
-        json_files = []
-        for file_name in sorted(os.listdir(INPUT_JSON_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name))
-            json_files.append(file_path)
-    if args.input_avg_mq_json is not None:
-        json_avg_mq_files = [args.input_avg_mq_json]
-    else:
-        json_avg_mq_files = []
-        for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name))
-            json_avg_mq_files.append(file_path)
-
-    multiprocessing.set_start_method('spawn')
-    queue1 = multiprocessing.JoinableQueue()
-    queue2 = multiprocessing.JoinableQueue()
-    num_files = len(newick_files)
-    cpus = set_num_cpus(num_files, args.processes)
-    # Set a timeout for get()s in the queue.
-    timeout = 0.05
-
-    for i, newick_file in enumerate(newick_files):
-        json_file = json_files[i]
-        json_avg_mq_file = json_avg_mq_files[i]
-        queue1.put((newick_file, json_file, json_avg_mq_file))
-
-    # Complete the preprocess_tables task.
-    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)]
-    for p in processes:
-        p.start()
-    for p in processes:
-        p.join()
-    queue1.join()
-
-    if queue1.empty():
-        queue1.close()
-        queue1.join_thread()
+    preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict)
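[note] With the multiprocessing scaffolding removed, one run of the script now processes exactly one newick/SNPs/average-MQ trio. A sample invocation of the new revision; the file names are illustrative:

    python vsnp_build_tables.py \
        --input_newick Mbovis-01D6.newick \
        --input_snps_json Mbovis-01D6_snps.json \
        --input_avg_mq_json Mbovis-01D6_avg_mq.json \
        --gbk_file reference.gbk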