comparison vsnp_build_tables.py @ 0:38a38babcb31 draft

Uploaded
author greg
date Tue, 21 Apr 2020 10:00:22 -0400
parents
children b60858c3eb91
#!/usr/bin/env python

import argparse
import multiprocessing
import os
import queue
import re

import pandas
import pandas.io.formats.excel
from Bio import SeqIO

INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
INPUT_JSON_DIR = 'input_json_dir'
INPUT_NEWICK_DIR = 'input_newick_dir'
# LibreOffice allows a maximum of 1,024 columns
# per spreadsheet and Excel allows 16,384, so
# tables wider than MAXCOLS columns are split
# into multiple outputs, keeping each output
# below the Excel limit.
MAXCOLS = 10000
OUTPUT_EXCEL_DIR = 'output_excel_dir'


def annotate_table(table_df, group, annotation_dict):
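    # Annotate the SNP table using the feature information
    # in annotation_dict, appending a row of annotations
    # (product, gene, locus) keyed on each reference:position
    # column header.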
    for gbk_chrome, pro in list(annotation_dict.items()):
        ref_pos = list(table_df)
        ref_series = pandas.Series(ref_pos)
        ref_df = pandas.DataFrame(ref_series.str.split(':', expand=True).values, columns=['reference', 'position'])
        all_ref = ref_df[ref_df['reference'] == gbk_chrome]
        positions = all_ref.position.to_frame()
        # Create an annotation file.
        annotation_file = "%s_annotations.csv" % group
        with open(annotation_file, "a") as fh:
            for index, row in positions.iterrows():
                pos = row.position
                try:
                    annotation = pro.iloc[pro.index.get_loc(int(pos))][['chrom', 'locus', 'product', 'gene']]
                    try:
                        chrom, locus, product, gene = annotation.values[0]
                        print("{}:{}\t{}, {}, {}".format(chrom, pos, product, gene, locus), file=fh)
                    except ValueError:
                        # If there is only one annotation for the entire
                        # chromosome (e.g., flu), indexing with [0] fails.
                        chrom, locus, product, gene = annotation.values
                        print("{}:{}\t{}, {}, {}".format(chrom, pos, product, gene, locus), file=fh)
                except KeyError:
                    print("{}:{}\tNo annotated product".format(gbk_chrome, pos), file=fh)
    # Read the annotation file into a data frame.
    annotations_df = pandas.read_csv(annotation_file, sep='\t', header=None, names=['index', 'annotations'], index_col='index')
    # Remove the annotation file from disk since both
    # cascade and sort tables are built using it, and
    # it is opened for writing in append mode.
    os.remove(annotation_file)
    # Merge the annotations into the transposed table,
    # then transpose back.
    table_df_transposed = table_df.T
    table_df_transposed.index = table_df_transposed.index.rename('index')
    table_df_transposed = table_df_transposed.merge(annotations_df, left_index=True, right_index=True)
    table_df = table_df_transposed.T
    return table_df


def excel_formatter(json_file_name, excel_file_name, group, annotation_dict):
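    # Render the json table as a formatted Excel spreadsheet,
    # color-coding nucleotide calls and rotating the column
    # headers and the annotation row for readability.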
    pandas.io.formats.excel.header_style = None
    table_df = pandas.read_json(json_file_name, orient='split')
    if annotation_dict is not None:
        table_df = annotate_table(table_df, group, annotation_dict)
    else:
        table_df = table_df.append(pandas.Series(name='no annotations'))
    writer = pandas.ExcelWriter(excel_file_name, engine='xlsxwriter')
    table_df.to_excel(writer, sheet_name='Sheet1')
    writer_book = writer.book
    ws = writer.sheets['Sheet1']
    format_a = writer_book.add_format({'bg_color': '#58FA82'})
    format_g = writer_book.add_format({'bg_color': '#F7FE2E'})
    format_c = writer_book.add_format({'bg_color': '#0000FF'})
    format_t = writer_book.add_format({'bg_color': '#FF0000'})
    format_normal = writer_book.add_format({'bg_color': '#FDFEFE'})
    format_lowqual = writer_book.add_format({'font_color': '#C70039', 'bg_color': '#E2CFDD'})
    format_ambiguous = writer_book.add_format({'font_color': '#C70039', 'bg_color': '#E2CFDD'})
    format_n = writer_book.add_format({'bg_color': '#E2CFDD'})
    rows, cols = table_df.shape
    ws.set_column(0, 0, 30)
    ws.set_column(1, cols, 2.1)
    ws.freeze_panes(2, 1)
    format_annotation = writer_book.add_format({'font_color': '#0A028C', 'rotation': '-90', 'align': 'top'})
    # Set the format of the row below the table.
    ws.set_row(rows + 1, cols + 1, format_annotation)
    # Make sure that row/column locations don't overlap.
    ws.conditional_format(rows - 2, 1, rows - 1, cols, {'type': 'cell', 'criteria': '<', 'value': 55, 'format': format_lowqual})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'cell', 'criteria': '==', 'value': 'B$2', 'format': format_normal})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'A', 'format': format_a})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'G', 'format': format_g})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'C', 'format': format_c})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'T', 'format': format_t})
    # IUPAC ambiguity codes.
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'S', 'format': format_ambiguous})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'Y', 'format': format_ambiguous})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'R', 'format': format_ambiguous})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'W', 'format': format_ambiguous})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'K', 'format': format_ambiguous})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'M', 'format': format_ambiguous})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': 'N', 'format': format_n})
    ws.conditional_format(2, 1, rows - 2, cols, {'type': 'text', 'criteria': 'containing', 'value': '-', 'format': format_n})
    # Rotate the column headers.
    format_rotation = writer_book.add_format({})
    format_rotation.set_rotation(90)
    for column_num, column_name in enumerate(list(table_df.columns)):
        ws.write(0, column_num + 1, column_name, format_rotation)
    # Set the height and format of the last row, which
    # holds the rotated annotations.
    ws.set_row(rows, 400, format_annotation)
    writer.save()


def get_annotation_dict(gbk_file):
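    # Parse the GenBank file into a dictionary that maps each
    # chromosome to a dataframe of its CDS and rRNA features,
    # indexed by an IntervalIndex of start/stop coordinates
    # for fast position lookups.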
    gbk_dict = SeqIO.to_dict(SeqIO.parse(gbk_file, "genbank"))
    annotation_dict = {}
    tmp_file = "features.csv"
    # Create a file of chromosomes and features.
    for chromosome in list(gbk_dict.keys()):
        with open(tmp_file, 'w+') as fh:
            for feature in gbk_dict[chromosome].features:
                if "CDS" in feature.type or "rRNA" in feature.type:
                    try:
                        product = feature.qualifiers['product'][0]
                    except KeyError:
                        product = None
                    try:
                        locus = feature.qualifiers['locus_tag'][0]
                    except KeyError:
                        locus = None
                    try:
                        gene = feature.qualifiers['gene'][0]
                    except KeyError:
                        gene = None
                    fh.write("%s\t%d\t%d\t%s\t%s\t%s\n" % (chromosome, int(feature.location.start), int(feature.location.end), locus, product, gene))
        # Read the chromosomes and features file into a data frame.
        df = pandas.read_csv(tmp_file, sep='\t', names=["chrom", "start", "stop", "locus", "product", "gene"])
        # Sort features and drop duplicate start positions.
        df = df.sort_values(['start', 'gene'], ascending=[True, False])
        df = df.drop_duplicates('start')
        pro = df.reset_index(drop=True)
        pro.index = pandas.IntervalIndex.from_arrays(pro['start'], pro['stop'], closed='both')
        annotation_dict[chromosome] = pro
    return annotation_dict

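# A minimal sketch of the IntervalIndex lookup used by
# annotate_table, assuming a hypothetical feature table with
# two features; a position falling inside a feature's closed
# [start, stop] interval resolves to that feature's row:
#
#   pro = pandas.DataFrame({'start': [10, 100], 'stop': [50, 200], 'gene': ['abc', 'xyz']})
#   pro.index = pandas.IntervalIndex.from_arrays(pro['start'], pro['stop'], closed='both')
#   pro.iloc[pro.index.get_loc(25)]['gene']   # -> 'abc'
#   pro.iloc[pro.index.get_loc(150)]['gene']  # -> 'xyz'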

def get_base_file_name(file_path):
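    # Derive a group name from a file path by stripping the
    # extension, or a trailing "_" suffix when the dot was
    # replaced by an underscore.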
    base_file_name = os.path.basename(file_path)
    if base_file_name.find(".") > 0:
        # Eliminate the extension.
        return os.path.splitext(base_file_name)[0]
    elif base_file_name.find("_") > 0:
        # The dot extension was likely changed to
        # the "_" character.
        items = base_file_name.split("_")
        return "_".join(items[0:-1])
    else:
        return base_file_name


def output_cascade_table(cascade_order, mqdf, group, annotation_dict):
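    # Append the average mapping quality row to the
    # cascade-ordered SNP table and write it to Excel.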
    cascade_order_mq = pandas.concat([cascade_order, mqdf], join='inner')
    output_table(cascade_order_mq, "cascade", group, annotation_dict)


def output_excel(df, type_str, group, annotation_dict, count=None):
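    # Write the dataframe to a temporary json file, then render
    # it as a formatted Excel spreadsheet. A count value is
    # included in the file names when a wide table has been
    # split into multiple outputs.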
    # Output the temporary json file that
    # is used by the excel_formatter.
    if count is None:
        if group is None:
            json_file_name = "%s_order_mq.json" % type_str
            excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table.xlsx" % type_str)
        else:
            json_file_name = "%s_%s_order_mq.json" % (group, type_str)
            excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table.xlsx" % (group, type_str))
    else:
        if group is None:
            json_file_name = "%s_order_mq_%d.json" % (type_str, count)
            excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table_%d.xlsx" % (type_str, count))
        else:
            json_file_name = "%s_%s_order_mq_%d.json" % (group, type_str, count)
            excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count))
    df.to_json(json_file_name, orient='split')
    # Output the Excel file.
    excel_formatter(json_file_name, excel_file_name, group, annotation_dict)


def output_sort_table(cascade_order, mqdf, group, annotation_dict):
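    # Re-sort the cascade table columns by reference position,
    # append the average mapping quality row, and write the
    # result to Excel.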
    sort_df = cascade_order.T
    sort_df['abs_value'] = sort_df.index
    sort_df[['chrom', 'pos']] = sort_df['abs_value'].str.split(':', expand=True)
    sort_df = sort_df.drop(['abs_value', 'chrom'], axis=1)
    sort_df.pos = sort_df.pos.astype(int)
    sort_df = sort_df.sort_values(by=['pos'])
    sort_df = sort_df.drop(['pos'], axis=1)
    sort_df = sort_df.T
    sort_order_mq = pandas.concat([sort_df, mqdf], join='inner')
    output_table(sort_order_mq, "sort", group, annotation_dict)


def output_table(df, type_str, group, annotation_dict):
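    # Write the dataframe to one or more Excel files, splitting
    # tables wider than MAXCOLS columns into multiple outputs.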
    if isinstance(group, str) and group.startswith("dataset"):
        # Inputs are single files, not collections,
        # so input file names are not useful for naming
        # output files.
        group_str = None
    else:
        group_str = group
    count = 0
    chunk_start = 0
    chunk_end = 0
    column_count = df.shape[1]
    if column_count >= MAXCOLS:
        # Here the number of columns is greater than
        # or equal to MAXCOLS, so multiple outputs
        # will be produced.
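        # For example, with MAXCOLS = 10000 a table with
        # 25,000 columns is written as three spreadsheets
        # of 10,000, 10,000 and 5,000 columns (count values
        # 1, 2 and 3).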
        while column_count >= MAXCOLS:
            count += 1
            chunk_end += MAXCOLS
            df_of_type = df.iloc[:, chunk_start:chunk_end]
            output_excel(df_of_type, type_str, group_str, annotation_dict, count=count)
            chunk_start += MAXCOLS
            column_count -= MAXCOLS
        count += 1
        df_of_type = df.iloc[:, chunk_start:]
        output_excel(df_of_type, type_str, group_str, annotation_dict, count=count)
    else:
        output_excel(df, type_str, group_str, annotation_dict)


def preprocess_tables(task_queue, annotation_dict, timeout):
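    # Worker function run in each process: pull a (newick,
    # snps json, avg mq json) tuple from the queue, order the
    # SNP table by the tree's sample order, and output the
    # cascade and sorted tables.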
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        newick_file, json_file, json_avg_mq_file = tup
        avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
        # Convert the average mapping quality series into a
        # single-row dataframe.
        mqdf = avg_mq_series.to_frame(name='MQ')
        mqdf = mqdf.T
        # Get the group.
        group = get_base_file_name(newick_file)
        snps_df = pandas.read_json(json_file, orient='split')
        with open(newick_file, 'r') as fh:
            for line in fh:
                line = re.sub('[:,]', '\n', line)
                line = re.sub('[)(]', '', line)
                line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
                line = re.sub('root\n', '', line)
        sample_order = line.split('\n')
        sample_order = list([_f for _f in sample_order if _f])
        sample_order.insert(0, 'root')
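        # A minimal sketch of the reduction above, assuming a
        # hypothetical single-line newick tree:
        #   (sample1:0.1,(sample2:0.2,sample3:0.3):0.4)root;
        # replacing ':' and ',' with newlines and removing
        # parentheses, branch lengths and the root label leaves
        # the tip order ['sample1', 'sample2', 'sample3'], after
        # which 'root' is re-inserted at the front.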
        tree_order = snps_df.loc[sample_order]
        # Count the number of SNPs in each column.
        snp_per_column = []
        for column_header in tree_order:
            count = 0
            column = tree_order[column_header]
            for element in column:
                if element != column[0]:
                    count = count + 1
            snp_per_column.append(count)
        row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
        # Count the number of SNPs from the
        # top of each column in the table.
        snp_from_top = []
        for column_header in tree_order:
            count = 0
            column = tree_order[column_header]
            # Skip the first element of the column.
            for element in column[1:]:
                if element == column[0]:
                    count = count + 1
                else:
                    break
            snp_from_top.append(count)
        row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
        tree_order = tree_order.append([row1])
        tree_order = tree_order.append([row2])
        # In pandas=0.18.1 even this does not work:
        # abc = row1.to_frame()
        # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
        # tree_order.append(abc)
        # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
        tree_order = tree_order.T
        tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
        tree_order = tree_order.T
        # Remove the snp_per_column and snp_from_top rows.
        cascade_order = tree_order[:-2]
        # Output the cascade table.
        output_cascade_table(cascade_order, mqdf, group, annotation_dict)
        # Output the sorted table.
        output_sort_table(cascade_order, mqdf, group, annotation_dict)
        task_queue.task_done()


def set_num_cpus(num_files, processes):
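    # Choose the number of worker processes: never more than
    # the number of input files, and when the machine has fewer
    # CPUs than the requested process count, use at most half
    # of the available CPUs. For example, with 4 input files on
    # an 8-CPU machine and processes=16, this returns 4.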
    num_cpus = int(multiprocessing.cpu_count())
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        half_cpus = int(num_cpus / 2)
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file')
    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file')
    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file')
    parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file')
    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')

    args = parser.parse_args()

    if args.gbk_file is not None:
        # Create the annotation_dict for annotating
        # the Excel tables.
        annotation_dict = get_annotation_dict(args.gbk_file)
    else:
        annotation_dict = None

    # The assumption here is that the lists of files in
    # both INPUT_NEWICK_DIR and INPUT_JSON_DIR are named
    # such that they are properly matched if the directories
    # contain more than 1 file (i.e., hopefully the newick
    # file names and json file names will be something like
    # Mbovis-01D6_* so they can be sorted and properly
    # associated with each other). A hedged sanity check
    # illustrating this pairing assumption appears after
    # the lists are built below.
    if args.input_newick is not None:
        newick_files = [args.input_newick]
    else:
        newick_files = []
        for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)):
            file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name))
            newick_files.append(file_path)
    if args.input_snps_json is not None:
        json_files = [args.input_snps_json]
    else:
        json_files = []
        for file_name in sorted(os.listdir(INPUT_JSON_DIR)):
            file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name))
            json_files.append(file_path)
    if args.input_avg_mq_json is not None:
        json_avg_mq_files = [args.input_avg_mq_json]
    else:
        json_avg_mq_files = []
        for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)):
            file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name))
            json_avg_mq_files.append(file_path)

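    # A minimal sketch of the pairing sanity check mentioned
    # above; it assumes matched inputs share a common prefix
    # (e.g., Mbovis-01D6_*) and is illustrative, not part of
    # the original tool:
    #   for newick_file, json_file in zip(newick_files, json_files):
    #       newick_group = get_base_file_name(newick_file)
    #       json_group = get_base_file_name(json_file)
    #       assert newick_group == json_group, "Mismatched inputs: %s vs %s" % (newick_file, json_file)
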
    multiprocessing.set_start_method('spawn')
    queue1 = multiprocessing.JoinableQueue()
    num_files = len(newick_files)
    cpus = set_num_cpus(num_files, args.processes)
    # Set a timeout for get() calls on the queue.
    timeout = 0.05

    for i, newick_file in enumerate(newick_files):
        json_file = json_files[i]
        json_avg_mq_file = json_avg_mq_files[i]
        queue1.put((newick_file, json_file, json_avg_mq_file))

    # Process the queued tables with a pool of worker processes.
    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout)) for _ in range(cpus)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    queue1.join()

    if queue1.empty():
        queue1.close()
        queue1.join_thread()