0
|
1 #!/usr/bin/env python
|
|
2 import argparse
|
|
3 import os
|
|
4 import shutil
|
|
5 import string
|
|
6 import sys
|
|
7 import threading
|
|
8 import time
|
|
9
|
|
10 from bioblend import galaxy
|
1
|
11 from datetime import datetime
|
0
|
12 from six.moves import configparser
|
|
13
|
|
# Command-line interface for the queue_genotype_workflow tool.
parser = argparse.ArgumentParser()
parser.add_argument('--affy_metadata', dest='affy_metadata', help='Input Affymetrix 96 well plate metadata file')
parser.add_argument('--annot', dest='annot', help='Probeset annotation file')
parser.add_argument('--api_key', dest='api_key', help='Current user API key')
parser.add_argument('--calls', dest='calls', help='Apt-probeset genotype calls file')
parser.add_argument('--confidences', dest='confidences', help='Apt-probeset genotype confidences file')
parser.add_argument('--config_file', dest='config_file', help='qgw_config.ini')
parser.add_argument('--dbkey', dest='dbkey', help='Reference genome dbkey')
parser.add_argument('--reference_genome', dest='reference_genome', help='Reference genome')
parser.add_argument('--history_id', dest='history_id', help='Encoded id of current history')
parser.add_argument('--output', dest='output', help='Output dataset')
parser.add_argument('--report', dest='report', help='Apt-probeset genotype report file')
parser.add_argument('--sample_attributes', dest='sample_attributes', help='Sample attributes tabular file')
# Fixed: dest was 'snp-posteriors', which is not a valid Python identifier
# and therefore could never be read back as args.snp_posteriors.
parser.add_argument('--snp-posteriors', dest='snp_posteriors', help='Apt-probeset genotype snp-posteriors file')
parser.add_argument('--summary', dest='summary', help='Apt-probeset genotype summary file')
args = parser.parse_args()
|
|
def add_library_dataset_to_history(gi, history_id, dataset_id, history_datasets, outputfh):
    """Import a data library dataset into a history.

    The imported HDA is recorded in history_datasets under its name, and the
    updated mapping is returned.
    """
    outputfh.write('\nImporting dataset into current history.\n')
    hda_dict = gi.histories.upload_dataset_from_library(history_id, dataset_id)
    history_datasets[hda_dict['name']] = hda_dict
    return history_datasets
|
39
|
|
40
|
|
def copy_history_dataset_to_library(gi, library_id, dataset_id, outputfh):
    """Copy a history dataset into a data library; return the new dataset dict."""
    msg = '\nCopying history dataset with id %s to data library with id %s.\n' % (str(dataset_id), str(library_id))
    outputfh.write(msg)
    return gi.libraries.copy_from_dataset(library_id, dataset_id)
|
|
46
|
|
47
|
|
def copy_dataset_to_storage(src_path, dst_base_path, dataset_name, output_fh):
    """Copy a dataset file into a dated storage directory on disk.

    The storage directory is named for the current date (YYYY_MM_DD) so that
    one file per day is kept; multiple runs on the same day overwrite the
    existing file.
    """
    date_str = datetime.now().strftime("%Y_%m_%d")
    dst_dir = os.path.join(dst_base_path, date_str)
    if not os.path.isdir(dst_dir):
        os.makedirs(dst_dir)
    dst_path = os.path.join(dst_dir, dataset_name)
    shutil.copyfile(src_path, dst_path)
    # Fixed: the original wrote to `outputfh`, a name not defined in this
    # scope (the parameter is `output_fh`), which raised a NameError.
    output_fh.write("Copied %s to storage.\n" % dataset_name)
|
|
59
|
|
60
|
|
def delete_history_dataset(gi, history_id, dataset_id, outputfh, purge=False):
    """Delete (and optionally purge) a single dataset from a history."""
    outputfh.write("\nDeleting history dataset with id %s.\n" % dataset_id)
    gi.histories.delete_dataset(history_id, dataset_id, purge=purge)
|
|
65
|
|
66
|
|
def delete_library_dataset(gi, library_id, dataset_id, outputfh, purged=False):
    """Delete a dataset from a data library; return the server's response dict."""
    outputfh.write("\nDeleting library dataset with id %s.\n" % dataset_id)
    return gi.libraries.delete_library_dataset(library_id, dataset_id, purged=purged)
|
|
72
|
|
73
|
|
def get_config_settings(config_file, section='defaults'):
    """Return a dict of the key/value pairs in `section` of config_file.

    Keys read from the 'defaults' section are upper-cased so they can be
    looked up with the conventional constant-style names (for example
    'ADMIN_API_KEY'); keys from any other section are returned as-is.
    """
    d = {}
    config_parser = configparser.ConfigParser()
    config_parser.read(config_file)
    for key, value in config_parser.items(section):
        if section == 'defaults':
            # Fixed: string.upper() was removed in Python 3; use str.upper().
            d[key.upper()] = value
        else:
            d[key] = value
    return d
|
|
86
|
|
87
|
|
def get_data_library_dict(gi, name, outputfh):
    """Find the non-deleted data library named `name`; return its dict or None."""
    outputfh.write("\nSearching for data library named %s.\n" % name)
    # The `deleted=False` filter is not reliably honored by the server,
    # so deleted libraries are filtered out again below.
    data_lib_dicts = gi.libraries.get_libraries(library_id=None, name=name, deleted=False)
    for candidate in data_lib_dicts:
        if candidate['name'] != name:
            continue
        if candidate['deleted'] in [True, 'true', 'True']:
            continue
        outputfh.write("Found data library named %s.\n" % name)
        outputfh.write("%s\n" % str(candidate))
        return candidate
    return None
|
|
99
|
|
100
|
|
def get_history_status(gi, history_id):
    """Return the status dict (including 'state_details') for a history."""
    return gi.histories.get_status(history_id)
|
|
103
|
|
104
|
|
def get_history_dataset_id_by_name(gi, history_id, dataset_name, outputfh):
    """Return the id of the first history dataset whose name starts with
    dataset_name (case-insensitive), or None if no dataset matches."""
    outputfh.write("\nSearching for history dataset named %s.\n" % str(dataset_name))
    prefix = dataset_name.lower()
    for hda_name, hda_dict in get_history_datasets(gi, history_id).items():
        if hda_name.lower().startswith(prefix):
            outputfh.write("Found dataset named %s.\n" % str(dataset_name))
            return hda_dict['id']
    return None
|
|
116
|
|
117
|
|
def get_history_datasets(gi, history_id):
    """Return a dict mapping dataset name -> contents dict for the
    non-deleted datasets in a history."""
    datasets = {}
    contents = gi.histories.show_history(history_id, contents=True, deleted='false', details=None)
    for item in contents:
        if item['history_content_type'] != 'dataset':
            continue
        name = item['name']
        # Skip the "Queue genotype workflow" dataset (this tool's own output).
        if name.startswith("Queue genotype workflow"):
            continue
        datasets[name] = item
    return datasets
|
|
129
|
|
130
|
|
def get_library_dataset_file_path(gi, library_id, dataset_id, outputfh):
    """Return the on-disk file path of a library dataset (None if absent)."""
    dataset_dict = gi.libraries.show_dataset(library_id, dataset_id)
    outputfh.write("\nReturning file path of library dataset.\n")
    return dataset_dict.get('file_name', None)
|
|
135
|
|
136
|
|
def get_library_dataset_id_by_name(gi, data_lib_id, dataset_name, outputfh):
    """Return the id of the first file in the data library whose name starts
    with dataset_name (case-insensitive), or None.

    The dataset is assumed to live in the library's root folder, so the
    leading '/' of the library path is stripped before comparing.
    """
    outputfh.write("\nSearching for library dataset named %s.\n" % str(dataset_name))
    target = dataset_name.lower()
    lib_item_dicts = gi.libraries.show_library(data_lib_id, contents=True)
    for lib_item_dict in lib_item_dicts:
        if lib_item_dict['type'] == 'file':
            # Fixed: the original rebound `dataset_name` to the library item's
            # name and then tested `dataset_name.startswith(dataset_name)`,
            # which is always True, so the first file in the library was
            # returned regardless of its name.
            lib_dataset_name = lib_item_dict['name'].lstrip('/').lower()
            if lib_dataset_name.startswith(target):
                outputfh.write("Found dataset named %s.\n" % str(dataset_name))
                return lib_item_dict['id']
    return None
|
|
149
|
|
150
|
|
def get_value_from_config(config_defaults, value):
    """Look up `value` in the config defaults dict; return None when absent."""
    if value in config_defaults:
        return config_defaults[value]
    return None
|
|
153
|
|
154
|
|
def get_workflow(gi, name, outputfh, galaxy_base_url=None, api_key=None):
    """Return (workflow_id, workflow_dict) for the published workflow named
    `name`, or (None, None) if no such workflow exists."""
    outputfh.write("\nSearching for workflow named %s\n" % name)
    matches = gi.workflows.get_workflows(name=name, published=True)
    if not matches:
        return None, None
    workflow_id = matches[0]['id']
    # Fetch the complete workflow definition (steps, inputs, etc.).
    workflow_dict = gi.workflows.show_workflow(workflow_id)
    outputfh.write("Found workflow named %s.\n" % name)
    return workflow_id, workflow_dict
|
|
166
|
|
167
|
|
def get_workflow_input_datasets(gi, history_datasets, workflow_name, workflow_dict, outputfh):
    """Map history datasets onto the workflow's data-input steps.

    Each workflow input (a step with no tool_id and type "data_input") must
    carry an annotation string (e.g., "report") that occurs somewhere in the
    name of the history dataset that should feed it (e.g.,
    "axiongt1_report.txt").  Expected matches include:

      1. affy_metadata.tabular     - name contains "metadata"
      2. sample_attributes.tabular - name contains "attributes"
      3. probeset_annotation.csv   - name contains "annotation"
      4. <summary file>.txt        - name contains "summary"
      5. <snp-posteriors file>.txt - name contains "snp-posteriors"
      6. <report file>.txt         - name contains "report"
      7. <confidences file>.txt    - name contains "confidences"
      8. <calls file>.txt          - name contains "calls"
      9. all_genotyped_samples.vcf - name contains "all_genotyped_samples"

    Returns a dict of {step_index: {'src': 'hda', 'id': dataset_id}}.
    """
    workflow_inputs = {}
    outputfh.write("\nMapping datasets from history to workflow %s.\n" % workflow_name)
    steps_dict = workflow_dict.get('steps', None)
    if steps_dict is None:
        return workflow_inputs
    for step_index, step_dict in steps_dict.items():
        # Workflow input steps look like:
        #   "0": {"tool_id": null, "tool_version": null, "id": 0,
        #         "input_steps": {}, "tool_inputs": {},
        #         "type": "data_input", "annotation": null},
        if step_dict.get('tool_id', None) is not None:
            continue
        if step_dict.get('type', None) != 'data_input':
            continue
        annotation = step_dict.get('annotation', None)
        if annotation is None:
            continue
        annotation_check = annotation.lower()
        # Pick the first history dataset whose name contains the annotation.
        for hda_name, hda_dict in history_datasets.items():
            if annotation_check in hda_name.lower():
                workflow_inputs[step_index] = {'src': 'hda', 'id': hda_dict['id']}
                outputfh.write(" - Mapped dataset %s from history to workflow input dataset with annotation %s.\n" % (hda_name, annotation))
                break
    return workflow_inputs
|
|
221
|
|
222
|
|
def start_workflow(gi, workflow_id, workflow_name, inputs, params, history_id, outputfh):
    """Invoke a workflow in the given history and log the server response."""
    outputfh.write("\nExecuting workflow %s.\n" % workflow_name)
    response = gi.workflows.invoke_workflow(workflow_id, inputs=inputs, params=params, history_id=history_id)
    outputfh.write("Response from executing workflow %s:\n" % workflow_name)
    outputfh.write("%s\n" % str(response))
|
|
228
|
|
229
|
|
def rename_library_dataset(gi, dataset_id, name, outputfh):
    """Rename a library dataset; return the updated dataset dict."""
    outputfh.write("\nRenaming library dataset with id %s to be named %s.\n" % (str(dataset_id), str(name)))
    return gi.libraries.update_library_dataset(dataset_id, name=name)
|
|
234
|
|
235
|
|
def update_workflow_params(workflow_dict, dbkey, outputfh):
    """Return tool-parameter updates needed for the workflow to use `dbkey`.

    Scans the workflow steps for affy2vcf tools and, where the step's locally
    cached reference genome differs from `dbkey`, produces an updated
    reference_genome_source_cond entry keyed by step id.  Returns None when no
    step needs updating.
    """
    parameter_updates = None
    name = workflow_dict['name']
    outputfh.write("\nChecking for tool parameter updates for workflow %s using dbkey %s.\n" % (name, dbkey))
    step_dicts = workflow_dict.get('steps', None)
    for step_id, step_dict in step_dicts.items():
        tool_id = step_dict['tool_id']
        if tool_id is None:
            continue
        # Handle reference_source entries.  Fixed: use substring containment;
        # the original `tool_id.find('affy2vcf') > 0` missed tool ids that
        # start with "affy2vcf" (find returns 0 there).
        if 'affy2vcf' in tool_id:
            tool_inputs_dict = step_dict['tool_inputs']
            # The queue_genotype_workflow tool provides a selection of only
            # a locally cached reference genome (not a history item), so dbkey
            # will always refer to a locally cached genome.
            # The affy2vcf tool allows the user to select either a locally
            # cached reference genome or a history item, but the workflow is
            # defined to use a locally cached reference genome by default, so
            # reference_genome_source_cond_dict['reference_genome_source']
            # will always be 'cached'.
            reference_genome_source_cond_dict = tool_inputs_dict['reference_genome_source_cond']
            workflow_db_key = reference_genome_source_cond_dict['locally_cached_item']
            if dbkey != workflow_db_key:
                reference_genome_source_cond_dict['locally_cached_item'] = dbkey
                # Fixed: accumulate updates instead of re-creating the dict,
                # which silently discarded updates from earlier matching steps.
                if parameter_updates is None:
                    parameter_updates = {}
                parameter_updates[step_id] = reference_genome_source_cond_dict
                outputfh.write("Updated step id %s with the following entry:\n%s\n" % (step_id, str(reference_genome_source_cond_dict)))
    return parameter_updates
|
|
264
|
|
265
|
|
def _poll_history_until_workflow_done(gi, history_id, outputfh):
    """Poll the history datasets until the running workflow has finished.

    The workflow itself simply schedules all of the jobs, so it cannot be
    checked for a state; instead the datasets' states are inspected.  The
    queue_genotype_workflow tool remains in the "running" state while we poll,
    so the workflow is complete when at most 1 dataset is running.  Returns
    True when no dataset ended in the "error" state.
    """
    while True:
        history_status_dict = get_history_status(gi, history_id)
        sd_dict = history_status_dict['state_details']
        outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
        if sd_dict['running'] <= 1:
            return sd_dict['error'] == 0
        outputfh.write("\nSleeping for 5 seconds...\n")
        time.sleep(5)


outputfh = open(args.output, "w")
config_defaults = get_config_settings(args.config_file)
# Fixed: the original leaked the open api_key file handle.
with open(args.api_key, 'r') as api_key_fh:
    user_api_key = api_key_fh.read()
admin_api_key = get_value_from_config(config_defaults, 'ADMIN_API_KEY')
galaxy_base_url = get_value_from_config(config_defaults, 'GALAXY_BASE_URL')
gi = galaxy.GalaxyInstance(url=galaxy_base_url, key=user_api_key)
ags_dataset_name = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_DATASET_NAME')
ags_library_name = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_LIBRARY_NAME')
ags_storage_dir = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_STORAGE_DIR')
coralsnp_workflow_name = get_value_from_config(config_defaults, 'CORALSNP_WORKFLOW_NAME')
es_workflow_name = get_value_from_config(config_defaults, 'ENSURE_SYNCED_WORKFLOW_NAME')
vam_workflow_name = get_value_from_config(config_defaults, 'VALIDATE_AFFY_METADATA_WORKFLOW_NAME')

datasets_have_queued = False
stag_database_updated = False

lock = threading.Lock()
lock.acquire(True)
try:
    try:
        # Get the current history datasets.  At this point, the history will
        # ideally contain only the datasets to be used as inputs to the 3
        # workflows: EnsureSynced, ValidateAffyMetadata and CoralSNP.
        history_datasets = get_history_datasets(gi, args.history_id)

        # Get the All Genotyped Samples data library and the public
        # all_genotyped_samples.vcf library dataset id.
        ags_data_library_dict = get_data_library_dict(gi, ags_library_name, outputfh)
        ags_library_id = ags_data_library_dict['id']
        ags_ldda_id = get_library_dataset_id_by_name(gi, ags_library_id, ags_dataset_name, outputfh)

        # Import the public all_genotyped_samples dataset from the data
        # library to the current history.
        history_datasets = add_library_dataset_to_history(gi, args.history_id, ags_ldda_id, history_datasets, outputfh)
        outputfh.write("\nSleeping for 5 seconds...\n")
        time.sleep(5)

        # Run the EnsureSynced workflow to confirm that the
        # all_genotyped_samples.vcf file is in sync with the stag database.
        es_workflow_id, es_workflow_dict = get_workflow(gi, es_workflow_name, outputfh)
        outputfh.write("\nEnsureSynced workflow id: %s\n" % str(es_workflow_id))
        es_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, es_workflow_name, es_workflow_dict, outputfh)
        start_workflow(gi, es_workflow_id, es_workflow_name, es_workflow_input_datasets, None, args.history_id, outputfh)
        outputfh.write("\nSleeping for 15 seconds...\n")
        time.sleep(15)
        synced = _poll_history_until_workflow_done(gi, args.history_id, outputfh)
        if not synced:
            outputfh.write("\nProcessing ended in error...\n")
            sys.exit(1)

        # Run the ValidateAffyMetadata workflow.
        vam_workflow_id, vam_workflow_dict = get_workflow(gi, vam_workflow_name, outputfh)
        outputfh.write("\nValidateAffyMetadata workflow id: %s\n" % str(vam_workflow_id))
        vam_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, vam_workflow_name, vam_workflow_dict, outputfh)
        start_workflow(gi, vam_workflow_id, vam_workflow_name, vam_workflow_input_datasets, None, args.history_id, outputfh)
        outputfh.write("\nSleeping for 15 seconds...\n")
        time.sleep(15)
        affy_metadata_is_valid = _poll_history_until_workflow_done(gi, args.history_id, outputfh)
        if not affy_metadata_is_valid:
            outputfh.write("\nProcessing ended in error...\n")
            sys.exit(1)

        # Run the CoralSNP workflow.
        coralsnp_workflow_id, coralsnp_workflow_dict = get_workflow(gi, coralsnp_workflow_name, outputfh)
        outputfh.write("\nCoralSNP workflow id: %s\n" % str(coralsnp_workflow_id))
        coralsnp_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, coralsnp_workflow_name, coralsnp_workflow_dict, outputfh)
        outputfh.write("\nCoralSNP workflow input datasets: %s\n" % str(coralsnp_workflow_input_datasets))
        # Get the CoralSNP workflow params that could be updated.
        coralsnp_params = update_workflow_params(coralsnp_workflow_dict, args.dbkey, outputfh)
        outputfh.write("\nCoralSNP params: %s\n" % str(coralsnp_params))
        start_workflow(gi, coralsnp_workflow_id, coralsnp_workflow_name, coralsnp_workflow_input_datasets, coralsnp_params, args.history_id, outputfh)
        outputfh.write("\nSleeping for 15 seconds...\n")
        time.sleep(15)
        # Poll the history datasets until the CoralSNP workflow is finished.
        # It is complete when no datasets are in the "new" or "queued" state
        # and only 1 dataset is "running".  We cannot filter on the "paused"
        # state because datasets downstream from one in an "error" state are
        # automatically paused; we always break if any dataset errors.  At
        # least one dataset must have reached the "queued" state before the
        # workflow can be considered complete.
        while True:
            history_status_dict = get_history_status(gi, args.history_id)
            sd_dict = history_status_dict['state_details']
            outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
            if not datasets_have_queued and sd_dict['queued'] > 0:
                datasets_have_queued = True
            if sd_dict['error'] != 0:
                break
            if datasets_have_queued and sd_dict['queued'] == 0 and sd_dict['new'] == 0 and sd_dict['running'] <= 1:
                # The stag database has been updated.
                stag_database_updated = True
                break
            outputfh.write("\nSleeping for 5 seconds...\n")
            time.sleep(5)
        if not stag_database_updated:
            outputfh.write("\nProcessing ended in error...\n")
            sys.exit(1)

        # Get the id of the "bcftools merge" dataset in the current history.
        bcftools_merge_dataset_id = get_history_dataset_id_by_name(gi, args.history_id, "bcftools merge", outputfh)
        # Create a new dataset in the All Genotyped Samples data library by
        # importing the "bcftools merge" dataset from the current history.
        # We'll do this as the coral admin user.
        admin_gi = galaxy.GalaxyInstance(url=galaxy_base_url, key=admin_api_key)
        new_ags_dataset_dict = copy_history_dataset_to_library(admin_gi, ags_library_id, bcftools_merge_dataset_id, outputfh)
        # Rename the ldda to be all_genotyped_samples.vcf.
        new_ags_ldda_id = new_ags_dataset_dict['id']
        renamed_ags_dataset_dict = rename_library_dataset(admin_gi, new_ags_ldda_id, ags_dataset_name, outputfh)
        # Get the full path of the all_genotyped_samples.vcf library dataset.
        ags_ldda_file_path = get_library_dataset_file_path(gi, ags_library_id, ags_ldda_id, outputfh)
        # Copy the all_genotyped_samples.vcf dataset to storage.  We only keep
        # a single copy of this file since this tool ends in an error before
        # the CoralSNP workflow is started if all_genotyped_samples.vcf is not
        # sync'd with the stag database.
        copy_dataset_to_storage(ags_ldda_file_path, ags_storage_dir, ags_dataset_name, outputfh)
        # Delete the original all_genotyped_samples library dataset.
        deleted_dataset_dict = delete_library_dataset(admin_gi, ags_library_id, ags_ldda_id, outputfh)
        # To save disk space, delete the all_genotyped_samples hda in the
        # current history to enable later purging by an admin.
        ags_hda_id = get_history_dataset_id_by_name(gi, args.history_id, "all_genotyped_samples", outputfh)
        delete_history_dataset(gi, args.history_id, ags_hda_id, outputfh)
    except Exception as e:
        outputfh.write("Exception preparing or executing either the ValidateAffyMetadata workflow or the CoralSNP workflow:\n%s\n" % str(e))
        outputfh.write("\nProcessing ended in error...\n")
        sys.exit(1)
    outputfh.write("\nFinished processing...\n")
finally:
    # Single cleanup point.  Fixed: the original released the lock on each
    # error path AND again in `finally`, which raised "release unlocked lock"
    # RuntimeErrors that masked the intended exit; sys.exit's SystemExit is
    # not caught by `except Exception`, so it reaches this finally cleanly.
    lock.release()
    outputfh.close()
|