comparison vsnp_build_tables.py @ 9:f641e52353e8 draft

"planemo upload for repository https://github.com/gregvonkuster/galaxy_tools/tree/master/tools/sequence_analysis/vsnp/vsnp_build_tables commit 1131a7accc36df73eac621f6ae8aa3cb62403bde"
author greg
date Thu, 29 Jul 2021 13:52:48 +0000
parents abfb861df879
children
comparison: 8:c3a6795aed09 -> 9:f641e52353e8
 #!/usr/bin/env python
 
 import argparse
+import multiprocessing
 import os
+import queue
 import re
 
 import pandas
 import pandas.io.formats.excel
 from Bio import SeqIO
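
The new stdlib queue import exists only for its queue.Empty exception: a timed multiprocessing.JoinableQueue.get() raises queue.Empty (from the standard queue module) when no work arrives, and the reworked preprocess_tables() below uses that as its signal to exit.
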
...
 # number as the maximum. Some browsers
 # (e.g., Firefox on Linux) are configured
 # to use LibreOffice for Excel spreadsheets.
 MAXCOLS = 1024
 OUTPUT_EXCEL_DIR = 'output_excel_dir'
+INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
+INPUT_JSON_DIR = 'input_json_dir'
+INPUT_NEWICK_DIR = 'input_newick_dir'
 
 
 def annotate_table(table_df, group, annotation_dict):
     for gbk_chrome, pro in list(annotation_dict.items()):
         ref_pos = list(table_df)
...
             output_excel(df_of_type, type_str, group_str, annotation_dict, count=count)
         else:
             output_excel(df, type_str, group_str, annotation_dict)
 
 
-def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict):
-    avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
-    # Map quality to dataframe.
-    mqdf = avg_mq_series.to_frame(name='MQ')
-    mqdf = mqdf.T
-    # Get the group.
-    group = get_sample_name(newick_file)
-    snps_df = pandas.read_json(json_file, orient='split')
-    with open(newick_file, 'r') as fh:
-        for line in fh:
-            line = re.sub('[:,]', '\n', line)
-            line = re.sub('[)(]', '', line)
-            line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
-            line = re.sub('root\n', '', line)
-    sample_order = line.split('\n')
-    sample_order = list([_f for _f in sample_order if _f])
-    sample_order.insert(0, 'root')
-    tree_order = snps_df.loc[sample_order]
-    # Count number of SNPs in each column.
-    snp_per_column = []
-    for column_header in tree_order:
-        count = 0
-        column = tree_order[column_header]
-        for element in column:
-            if element != column[0]:
-                count = count + 1
-        snp_per_column.append(count)
-    row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
-    # Count number of SNPs from the
-    # top of each column in the table.
-    snp_from_top = []
-    for column_header in tree_order:
-        count = 0
-        column = tree_order[column_header]
-        # For each element in the column,
-        # skip the first element.
-        for element in column[1:]:
-            if element == column[0]:
-                count = count + 1
-            else:
-                break
-        snp_from_top.append(count)
-    row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
-    tree_order = tree_order.append([row1])
-    tree_order = tree_order.append([row2])
-    # In pandas=0.18.1 even this does not work:
-    # abc = row1.to_frame()
-    # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
-    # tree_order.append(abc)
-    # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
-    tree_order = tree_order.T
-    tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
-    tree_order = tree_order.T
-    # Remove snp_per_column and snp_from_top rows.
-    cascade_order = tree_order[:-2]
-    # Output the cascade table.
-    output_cascade_table(cascade_order, mqdf, group, annotation_dict)
-    # Output the sorted table.
-    output_sort_table(cascade_order, mqdf, group, annotation_dict)
+def preprocess_tables(task_queue, annotation_dict, timeout):
+    while True:
+        try:
+            tup = task_queue.get(block=True, timeout=timeout)
+        except queue.Empty:
+            break
+        newick_file, json_file, json_avg_mq_file = tup
+        avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
+        # Map quality to dataframe.
+        mqdf = avg_mq_series.to_frame(name='MQ')
+        mqdf = mqdf.T
+        # Get the group.
+        group = get_sample_name(newick_file)
+        snps_df = pandas.read_json(json_file, orient='split')
+        with open(newick_file, 'r') as fh:
+            for line in fh:
+                line = re.sub('[:,]', '\n', line)
+                line = re.sub('[)(]', '', line)
+                line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
+                line = re.sub('root\n', '', line)
+        sample_order = line.split('\n')
+        sample_order = list([_f for _f in sample_order if _f])
+        sample_order.insert(0, 'root')
+        tree_order = snps_df.loc[sample_order]
+        # Count number of SNPs in each column.
+        snp_per_column = []
+        for column_header in tree_order:
+            count = 0
+            column = tree_order[column_header]
+            for element in column:
+                if element != column[0]:
+                    count = count + 1
+            snp_per_column.append(count)
+        row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
+        # Count number of SNPs from the
+        # top of each column in the table.
+        snp_from_top = []
+        for column_header in tree_order:
+            count = 0
+            column = tree_order[column_header]
+            # For each element in the column,
+            # skip the first element.
+            for element in column[1:]:
+                if element == column[0]:
+                    count = count + 1
+                else:
+                    break
+            snp_from_top.append(count)
+        row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
+        tree_order = tree_order.append([row1])
+        tree_order = tree_order.append([row2])
+        # In pandas=0.18.1 even this does not work:
+        # abc = row1.to_frame()
+        # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
+        # tree_order.append(abc)
+        # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
+        tree_order = tree_order.T
+        tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
+        tree_order = tree_order.T
+        # Remove snp_per_column and snp_from_top rows.
+        cascade_order = tree_order[:-2]
+        # Output the cascade table.
+        output_cascade_table(cascade_order, mqdf, group, annotation_dict)
+        # Output the sorted table.
+        output_sort_table(cascade_order, mqdf, group, annotation_dict)
+        task_queue.task_done()
+
+
+def set_num_cpus(num_files, processes):
+    num_cpus = int(multiprocessing.cpu_count())
+    if num_files < num_cpus and num_files < processes:
+        return num_files
+    if num_cpus < processes:
+        half_cpus = int(num_cpus / 2)
+        if num_files < half_cpus:
+            return num_files
+        return half_cpus
+    return processes
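
The new set_num_cpus() caps the user's --processes request by both the number of input files and the available cores. A minimal sketch of the same heuristic, with the cpu_count() dependency made an explicit parameter so the outcomes are deterministic (pick_worker_count is a hypothetical name, for illustration only):

    def pick_worker_count(num_files, processes, num_cpus):
        # Mirrors set_num_cpus() above, with num_cpus injected rather
        # than read from multiprocessing.cpu_count().
        if num_files < num_cpus and num_files < processes:
            return num_files
        if num_cpus < processes:
            half_cpus = int(num_cpus / 2)
            if num_files < half_cpus:
                return num_files
            return half_cpus
        return processes

    assert pick_worker_count(num_files=3, processes=8, num_cpus=16) == 3    # fewer files than workers: one worker per file
    assert pick_worker_count(num_files=20, processes=8, num_cpus=4) == 2    # request exceeds cores: back off to half the CPUs
    assert pick_worker_count(num_files=20, processes=8, num_cpus=16) == 8   # enough cores and files: honor --processes
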
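The regex pipeline in preprocess_tables() (unchanged by this commit, only re-indented) recovers the leaf order of the tree by deleting punctuation, branch lengths, and the root label from a Newick line. A minimal sketch with a hypothetical two-sample tree:

    import re

    line = '(root:0.0,(Mbovis-01:0.01,Mbovis-02:0.02):0.005);\n'
    line = re.sub('[:,]', '\n', line)                 # split on colons and commas
    line = re.sub('[)(]', '', line)                   # drop parentheses
    line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)    # drop branch lengths
    line = re.sub('root\n', '', line)                 # drop the root label
    sample_order = [_f for _f in line.split('\n') if _f]
    sample_order.insert(0, 'root')
    assert sample_order == ['root', 'Mbovis-01', 'Mbovis-02']

Note that the loop over fh keeps only the last line's result, which works because a Newick tree is a single line.
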
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
 
+    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file')
+    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file')
+    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file')
     parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file'),
-    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file')
-    parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file')
-    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file')
+    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
 
     args = parser.parse_args()
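
All three data inputs are now optional flags; when one is omitted, the main block below falls back to scanning the corresponding Galaxy collection directory. A small sketch of that contract, using a hypothetical parser that re-creates just two of the arguments above:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None)
    parser.add_argument('--processes', action='store', dest='processes', type=int)

    args = parser.parse_args([])            # no flags given
    assert args.input_newick is None        # None triggers the os.listdir() fallback below

    args = parser.parse_args(['--input_newick', 'tree.newick', '--processes', '8'])
    assert args.input_newick == 'tree.newick' and args.processes == 8
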
 
     if args.gbk_file is not None:
         # Create the annotation_dict for annotating
         # the Excel tables.
         annotation_dict = get_annotation_dict(args.gbk_file)
     else:
         annotation_dict = None
 
-    preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict)
+    # The assumption here is that the lists of files
+    # in both INPUT_NEWICK_DIR and INPUT_JSON_DIR are
+    # named such that they are properly matched if
+    # the directories contain more than 1 file (i.e.,
+    # hopefully the newick file names and json file names
+    # will be something like Mbovis-01D6_* so they can be
+    # sorted and properly associated with each other).
+    if args.input_newick is not None:
+        newick_files = [args.input_newick]
+    else:
+        newick_files = []
+        for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name))
+            newick_files.append(file_path)
+    if args.input_snps_json is not None:
+        json_files = [args.input_snps_json]
+    else:
+        json_files = []
+        for file_name in sorted(os.listdir(INPUT_JSON_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name))
+            json_files.append(file_path)
+    if args.input_avg_mq_json is not None:
+        json_avg_mq_files = [args.input_avg_mq_json]
+    else:
+        json_avg_mq_files = []
+        for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name))
+            json_avg_mq_files.append(file_path)
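
The comment's name-matching assumption can be made concrete with hypothetical listings: because each trio of files shares a sample-name prefix, sorting the three directory listings independently still aligns them index by index.

    # Hypothetical Galaxy collection listings; sorted() aligns the trios.
    newick_files = sorted(['Mbovis-01D6_snps.newick', 'Mbovis-17A4_snps.newick'])
    json_files = sorted(['Mbovis-01D6_snps.json', 'Mbovis-17A4_snps.json'])
    json_avg_mq_files = sorted(['Mbovis-01D6_avg_mq.json', 'Mbovis-17A4_avg_mq.json'])
    for nwk, snps, avg_mq in zip(newick_files, json_files, json_avg_mq_files):
        assert nwk.split('_')[0] == snps.split('_')[0] == avg_mq.split('_')[0]
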
+
+    multiprocessing.set_start_method('spawn')
+    queue1 = multiprocessing.JoinableQueue()
+    queue2 = multiprocessing.JoinableQueue()
+    num_files = len(newick_files)
+    cpus = set_num_cpus(num_files, args.processes)
+    # Set a timeout for get()s in the queue.
+    timeout = 0.05
+
+    for i, newick_file in enumerate(newick_files):
+        json_file = json_files[i]
+        json_avg_mq_file = json_avg_mq_files[i]
+        queue1.put((newick_file, json_file, json_avg_mq_file))
+
+    # Complete the preprocess_tables task.
+    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)]
+    for p in processes:
+        p.start()
+    for p in processes:
+        p.join()
+    queue1.join()
+
+    if queue1.empty():
+        queue1.close()
+        queue1.join_thread()
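
The driver uses the standard fill-then-drain pattern: every task is enqueued before any worker starts, so a worker whose timed get() raises queue.Empty can safely conclude the queue is exhausted rather than that a producer is slow (note that only queue1 is ever consumed; queue2 is created but unused in this revision). A self-contained sketch of the same pattern, with a toy work() standing in for preprocess_tables():

    import multiprocessing
    import queue


    def work(task_queue, timeout):
        # Drain the queue; a timeout means all work was already handed out.
        while True:
            try:
                item = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                break
            print('processing', item)
            task_queue.task_done()


    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        task_queue = multiprocessing.JoinableQueue()
        for item in range(10):
            task_queue.put(item)      # fill completely before starting workers
        workers = [multiprocessing.Process(target=work, args=(task_queue, 0.05)) for _ in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        task_queue.join()             # returns once every task_done() has been received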