annotate queue_genotype_workflow.py @ 12:0882b7bb3dfc draft

Uploaded
author greg
date Thu, 15 Jul 2021 20:39:52 +0000
parents 41477ff2ce8e
children f2c12986e5d4
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
1 #!/usr/bin/env python
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
2 import argparse
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
3 import os
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
4 import shutil
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
5 import sys
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
6 import threading
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
7 import time
10
41477ff2ce8e Uploaded
greg
parents: 6
diff changeset
8 from datetime import datetime
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
9
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
10 from bioblend import galaxy
10
41477ff2ce8e Uploaded
greg
parents: 6
diff changeset
11
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
12 from six.moves import configparser
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
13
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
# Command-line interface. Every input the Galaxy tool passes to this
# script arrives as a named argument; all values are optional strings.
parser = argparse.ArgumentParser()
parser.add_argument('--affy_metadata', dest='affy_metadata', help='Input Affymetrix 96 well plate metadata file')
parser.add_argument('--annot', dest='annot', help='Probeset annotation file')
parser.add_argument('--api_key', dest='api_key', help='Current user API key')
parser.add_argument('--calls', dest='calls', help='Apt-probeset genotype calls file')
parser.add_argument('--confidences', dest='confidences', help='Apt-probeset genotype confidences file')
parser.add_argument('--config_file', dest='config_file', help='qgw_config.ini')
parser.add_argument('--dbkey', dest='dbkey', help='Reference genome dbkey')
parser.add_argument('--reference_genome', dest='reference_genome', help='Reference genome')
parser.add_argument('--history_id', dest='history_id', help='Encoded id of current history')
parser.add_argument('--output', dest='output', help='Output dataset')
parser.add_argument('--output_nj_phylogeny_tree', dest='output_nj_phylogeny_tree', help='Flag to plot neighbor-joining phylogeny tree')
parser.add_argument('--report', dest='report', help='Apt-probeset genotype report file')
parser.add_argument('--sample_attributes', dest='sample_attributes', help='Sample attributes tabular file')
# Bug fix: dest was 'snp-posteriors', which is not a valid Python
# identifier, so the value could never be read as args.snp_posteriors
# (only via getattr with the hyphenated key). The CLI flag is unchanged.
parser.add_argument('--snp-posteriors', dest='snp_posteriors', help='Apt-probeset genotype snp-posteriors file')
parser.add_argument('--summary', dest='summary', help='Apt-probeset genotype summary file')
args = parser.parse_args()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
31
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
32
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def add_library_dataset_to_history(gi, history_id, dataset_id, history_datasets, outputfh):
    """Import a data library dataset into a history.

    The new HDA dict returned by the Galaxy API is recorded in the
    history_datasets mapping under its dataset name, and the updated
    mapping is returned.
    """
    outputfh.write('\nImporting dataset into current history.\n')
    imported_hda = gi.histories.upload_dataset_from_library(history_id, dataset_id)
    history_datasets[imported_hda['name']] = imported_hda
    return history_datasets
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
40
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
41
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def copy_history_dataset_to_library(gi, library_id, dataset_id, outputfh):
    """Copy a history dataset into a data library.

    Returns the dict describing the new library dataset.
    """
    message = '\nCopying history dataset with id %s to data library with id %s.\n' % (str(dataset_id), str(library_id))
    outputfh.write(message)
    return gi.libraries.copy_from_dataset(library_id, dataset_id)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
47
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
48
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def copy_dataset_to_storage(src_path, dst_base_path, dataset_name, output_fh):
    """Copy a dataset file into a dated storage directory on disk.

    The destination directory under dst_base_path is named for today's
    date, so at most one copy of each file is kept per day (multiple
    runs in one day overwrite the earlier file).
    """
    date_str = datetime.now().strftime("%Y_%m_%d")
    dst_dir = os.path.join(dst_base_path, date_str)
    if not os.path.isdir(dst_dir):
        os.makedirs(dst_dir)
    dst_path = os.path.join(dst_dir, dataset_name)
    shutil.copyfile(src_path, dst_path)
    # Bug fix: the original wrote to the undefined name "outputfh",
    # raising NameError at runtime; the parameter is "output_fh".
    output_fh.write("Copied %s to storage.\n" % dataset_name)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
60
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
61
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def delete_history_dataset(gi, history_id, dataset_id, outputfh, purge=False):
    """Delete (and optionally purge) a single dataset from a history."""
    outputfh.write("\nDeleting history dataset with id %s.\n" % dataset_id)
    gi.histories.delete_dataset(history_id, dataset_id, purge=purge)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
66
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
67
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def delete_library_dataset(gi, library_id, dataset_id, outputfh, purged=False):
    """Delete a dataset from a data library.

    Returns the dict describing the deleted dataset.
    """
    outputfh.write("\nDeleting library dataset with id %s.\n" % dataset_id)
    return gi.libraries.delete_library_dataset(library_id, dataset_id, purged=purged)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
73
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
74
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_config_settings(config_file, section='defaults'):
    """Read one section of an ini-style config file into a dict.

    Keys read from the 'defaults' section are upper-cased; keys from
    any other section are returned unchanged.
    """
    settings = {}
    config_parser = configparser.ConfigParser()
    config_parser.read(config_file)
    for key, value in config_parser.items(section):
        settings[key.upper() if section == 'defaults' else key] = value
    return settings
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
87
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
88
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_data_library_dict(gi, name, outputfh):
    """Find the non-deleted data library with the given name.

    Returns the library's dict, or None when no match is found.
    """
    outputfh.write("\nSearching for data library named %s.\n" % name)
    # The deleted=False filter is not reliably applied server-side, so
    # each candidate's deleted flag is re-checked below.
    candidates = gi.libraries.get_libraries(library_id=None, name=name, deleted=False)
    for candidate in candidates:
        if candidate['name'] != name:
            continue
        if candidate['deleted'] in [True, 'true', 'True']:
            continue
        outputfh.write("Found data library named %s.\n" % name)
        outputfh.write("%s\n" % str(candidate))
        return candidate
    return None
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
100
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
101
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_history_status(gi, history_id):
    """Return the status dict for the given history."""
    status = gi.histories.get_status(history_id)
    return status
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
104
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
105
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_history_dataset_id_by_name(gi, history_id, dataset_name, outputfh):
    """Return the id of the first history dataset whose name starts
    with dataset_name (case-insensitive), or None when no dataset
    matches.
    """
    outputfh.write("\nSearching for history dataset named %s.\n" % str(dataset_name))
    wanted_prefix = dataset_name.lower()
    for hda_name, hda_dict in list(get_history_datasets(gi, history_id).items()):
        if hda_name.lower().startswith(wanted_prefix):
            outputfh.write("Found dataset named %s.\n" % str(dataset_name))
            return hda_dict['id']
    return None
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
117
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
118
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
119 def get_history_datasets(gi, history_id):
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
120 history_datasets = {}
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
121 history_dict = gi.histories.show_history(history_id, contents=True, deleted='false', details=None)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
122 for contents_dict in history_dict:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
123 if contents_dict['history_content_type'] == 'dataset':
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
124 dataset_name = contents_dict['name']
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
125 # Don't include the "Queue genotype workflow" dataset.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
126 if dataset_name.startswith("Queue genotype workflow"):
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
127 continue
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
128 history_datasets[dataset_name] = contents_dict
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
129 return history_datasets
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
130
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
131
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_library_dataset_file_path(gi, library_id, dataset_id, outputfh):
    """Return the on-disk file path of a library dataset.

    Returns None when the API response has no 'file_name' entry.
    """
    dataset_dict = gi.libraries.show_dataset(library_id, dataset_id)
    outputfh.write("\nReturning file path of library dataset.\n")
    return dataset_dict.get('file_name', None)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
136
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
137
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_library_dataset_id_by_name(gi, data_lib_id, dataset_name, outputfh):
    """Return the id of the first root-folder library file whose name
    starts with dataset_name (case-insensitive), or None.

    Used to locate e.g. the all_genotyped_samples.vcf dataset, which is
    assumed to live in the library's root folder.
    """
    outputfh.write("\nSearching for library dataset named %s.\n" % str(dataset_name))
    wanted_prefix = dataset_name.lower()
    lib_item_dicts = gi.libraries.show_library(data_lib_id, contents=True)
    for lib_item_dict in lib_item_dicts:
        if lib_item_dict['type'] == 'file':
            # Bug fix: the original rebound "dataset_name" here, so the
            # subsequent startswith() compared the item's name to itself
            # and always matched the first file in the library.
            item_name = lib_item_dict['name'].lstrip('/').lower()
            if item_name.startswith(wanted_prefix):
                outputfh.write("Found dataset named %s.\n" % str(dataset_name))
                return lib_item_dict['id']
    return None
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
150
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
151
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_value_from_config(config_defaults, value):
    """Look up a key in the config defaults dict; None when absent."""
    if value in config_defaults:
        return config_defaults[value]
    return None
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
154
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
155
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_workflow(gi, name, outputfh, galaxy_base_url=None, api_key=None):
    """Find the first published workflow with the given name.

    Returns (workflow_id, workflow_dict), or (None, None) when no
    workflow matches.
    """
    outputfh.write("\nSearching for workflow named %s\n" % name)
    matches = gi.workflows.get_workflows(name=name, published=True)
    if not matches:
        return None, None
    workflow_id = matches[0]['id']
    # Fetch the full workflow description, not just the summary entry.
    workflow_dict = gi.workflows.show_workflow(workflow_id)
    outputfh.write("Found workflow named %s.\n" % name)
    return workflow_id, workflow_dict
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
167
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
168
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def get_workflow_input_datasets(gi, history_datasets, workflow_name, workflow_dict, outputfh):
    """Match history datasets to the workflow's data_input steps.

    Each input step's annotation is expected to be a keyword (e.g.
    "report", "metadata", "attributes", "annotation", "summary",
    "snp-posteriors", "confidences", "calls", "all_genotyped_samples")
    that appears somewhere in the name of the history dataset it should
    receive (e.g. "axiongt1_report.txt"); matching is case-insensitive
    and the first matching dataset wins.

    Returns {step_index: {'src': 'hda', 'id': <dataset id>}}.
    """
    workflow_inputs = {}
    outputfh.write("\nMapping datasets from history to workflow %s.\n" % workflow_name)
    steps_dict = workflow_dict.get('steps', None)
    if steps_dict is None:
        return workflow_inputs
    for step_index, step_dict in list(steps_dict.items()):
        # Input-dataset steps have no tool_id, a 'data_input' type and
        # (by convention in these workflows) a keyword annotation.
        tool_id = step_dict.get('tool_id', None)
        tool_type = step_dict.get('type', None)
        annotation = step_dict.get('annotation', None)
        if tool_id is not None or tool_type != 'data_input' or annotation is None:
            continue
        annotation_check = annotation.lower()
        for hda_name, hda_dict in list(history_datasets.items()):
            if annotation_check in hda_name.lower():
                workflow_inputs[step_index] = {'src': 'hda', 'id': hda_dict['id']}
                outputfh.write(" - Mapped dataset %s from history to workflow input dataset with annotation %s.\n" % (hda_name, annotation))
                break
    return workflow_inputs
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
222
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
223
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def start_workflow(gi, workflow_id, workflow_name, inputs, params, history_id, outputfh):
    """Invoke the workflow and log the server's invocation response."""
    outputfh.write("\nExecuting workflow %s.\n" % workflow_name)
    invocation = gi.workflows.invoke_workflow(workflow_id, inputs=inputs, params=params, history_id=history_id)
    outputfh.write("Response from executing workflow %s:\n" % workflow_name)
    outputfh.write("%s\n" % str(invocation))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
229
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
230
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
def rename_library_dataset(gi, dataset_id, name, outputfh):
    """Rename a library dataset; return the updated dataset dict."""
    outputfh.write("\nRenaming library dataset with id %s to be named %s.\n" % (str(dataset_id), str(name)))
    return gi.libraries.update_library_dataset(dataset_id, name=name)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
235
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
236
6
b7212fe2d597 Uploaded
greg
parents: 4
diff changeset
def update_workflow_params(workflow_dict, dbkey, output_nj_phylogeny_tree, outputfh):
    """Build the per-step parameter overrides for invoking the workflow.

    Two kinds of overrides are produced:
    * affy2vcf steps whose locally cached reference genome differs from
      dbkey are pointed at dbkey (the step's tool_inputs dict is also
      updated in place).  The workflow is defined to use a locally
      cached genome, so reference_genome_source is always 'cached' and
      only locally_cached_item can need changing.
    * when output_nj_phylogeny_tree == 'yes', the
      coral_multilocus_genotype step has its default 'no' overridden.

    Returns the {step_id: params} mapping, or None when no step needs
    updating.
    """
    parameter_updates = None
    name = workflow_dict['name']
    outputfh.write("\nChecking for tool parameter updates for workflow %s using dbkey %s.\n" % (name, dbkey))
    step_dicts = workflow_dict.get('steps', None)
    for step_id, step_dict in list(step_dicts.items()):
        tool_id = step_dict['tool_id']
        if tool_id is None:
            # Input-dataset steps carry no tool parameters.
            continue
        if tool_id.find('affy2vcf') > 0:
            ref_source_cond = step_dict['tool_inputs']['reference_genome_source_cond']
            if dbkey != ref_source_cond['locally_cached_item']:
                ref_source_cond['locally_cached_item'] = dbkey
                if parameter_updates is None:
                    parameter_updates = {}
                parameter_updates[step_id] = ref_source_cond
                outputfh.write("Updated step id %s with the following entry:\n%s\n" % (step_id, str(ref_source_cond)))
        if tool_id.find('coral_multilocus_genotype') > 0 and output_nj_phylogeny_tree == 'yes':
            if parameter_updates is None:
                parameter_updates = {}
            tree_update = {'output_nj_phylogeny_tree': 'yes'}
            parameter_updates[step_id] = tree_update
            outputfh.write("Updated step id %s with the following entry:\n%s\n" % (step_id, str(tree_update)))
    return parameter_updates
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
273
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
274
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
275 outputfh = open(args.output, "w")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
276 config_defaults = get_config_settings(args.config_file)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
277 user_api_key = open(args.api_key, 'r').read()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
278 admin_api_key = get_value_from_config(config_defaults, 'ADMIN_API_KEY')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
279 galaxy_base_url = get_value_from_config(config_defaults, 'GALAXY_BASE_URL')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
280 gi = galaxy.GalaxyInstance(url=galaxy_base_url, key=user_api_key)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
281 ags_dataset_name = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_DATASET_NAME')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
282 ags_library_name = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_LIBRARY_NAME')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
283 ags_storage_dir = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_STORAGE_DIR')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
284 coralsnp_workflow_name = get_value_from_config(config_defaults, 'CORALSNP_WORKFLOW_NAME')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
285 es_workflow_name = get_value_from_config(config_defaults, 'ENSURE_SYNCED_WORKFLOW_NAME')
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
286 fags_workflow_name = get_value_from_config(config_defaults, 'FILTER_ALL_GENOTYPED_SAMPLES_WORKFLOW_NAME')
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
287 vam_workflow_name = get_value_from_config(config_defaults, 'VALIDATE_AFFY_METADATA_WORKFLOW_NAME')
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
288
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
289 affy_metadata_is_valid = False
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
290 datasets_have_queued = False
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
291 filtered = False
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
292 stag_database_updated = False
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
293 synced = False
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
294 lock = threading.Lock()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
295 lock.acquire(True)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
296 try:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
297 # Get the current history datasets. At this point, the
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
298 # history will ideally contain only the datasets to be
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
299 # used as inputs to the 3 workflows, EnsureSynced,
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
300 # ValidateAffyMetadata and CoralSNP.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
301 history_datasets = get_history_datasets(gi, args.history_id)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
302
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
303 # Get the All Genotyped Samples data library.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
304 ags_data_library_dict = get_data_library_dict(gi, ags_library_name, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
305 ags_library_id = ags_data_library_dict['id']
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
306 # Get the public all_genotyped_samples.vcf library dataset id.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
307 ags_ldda_id = get_library_dataset_id_by_name(gi, ags_library_id, ags_dataset_name, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
308
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
309 # Import the public all_genotyped_samples dataset from
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
310 # the data library to the current history.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
311 history_datasets = add_library_dataset_to_history(gi, args.history_id, ags_ldda_id, history_datasets, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
312 outputfh.write("\nSleeping for 5 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
313 time.sleep(5)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
314
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
315 # Get the EnsureSynced workflow
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
316 es_workflow_id, es_workflow_dict = get_workflow(gi, es_workflow_name, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
317 outputfh.write("\nEnsureSynced workflow id: %s\n" % str(es_workflow_id))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
318 # Map the history datasets to the input datasets for
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
319 # the EnsureSynced workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
320 es_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, es_workflow_name, es_workflow_dict, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
321 # Start the EnsureSynced workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
322 start_workflow(gi, es_workflow_id, es_workflow_name, es_workflow_input_datasets, None, args.history_id, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
323 outputfh.write("\nSleeping for 15 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
324 time.sleep(15)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
325 # Poll the history datasets, checking the statuses, and wait until
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
326 # the workflow is finished. The workflow itself simply schedules
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
327 # all of the jobs, so it cannot be checked for a state.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
328 while True:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
329 history_status_dict = get_history_status(gi, args.history_id)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
330 sd_dict = history_status_dict['state_details']
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
331 outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
332 # The queue_genotype_workflow tool will continue to be in a
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
333 # "running" state while inside this for loop, so we know that
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
334 # the workflow has completed if only 1 dataset has this state.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
335 if sd_dict['running'] <= 1:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
336 if sd_dict['error'] == 0:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
337 # The all_genotyped_samples.vcf file is
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
338 # in sync with the stag database.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
339 synced = True
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
340 break
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
341 outputfh.write("\nSleeping for 5 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
342 time.sleep(5)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
343
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
344 if synced:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
345 # Get the ValidateAffyMetadata workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
346 vam_workflow_id, vam_workflow_dict = get_workflow(gi, vam_workflow_name, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
347 outputfh.write("\nValidateAffyMetadata workflow id: %s\n" % str(vam_workflow_id))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
348 # Map the history datasets to the input datasets for
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
349 # the ValidateAffyMetadata workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
350 vam_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, vam_workflow_name, vam_workflow_dict, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
351 # Start the ValidateAffyMetadata workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
352 start_workflow(gi, vam_workflow_id, vam_workflow_name, vam_workflow_input_datasets, None, args.history_id, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
353 outputfh.write("\nSleeping for 15 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
354 time.sleep(15)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
355 # Poll the history datasets, checking the statuses, and wait until
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
356 # the workflow is finished.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
357 while True:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
358 history_status_dict = get_history_status(gi, args.history_id)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
359 sd_dict = history_status_dict['state_details']
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
360 outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
361 # The queue_genotype_workflow tool will continue to be in a
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
362 # "running" state while inside this for loop, so we know that
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
363 # the workflow has completed if only 1 dataset has this state.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
364 if sd_dict['running'] <= 1:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
365 if sd_dict['error'] == 0:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
366 # The metadata is valid.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
367 affy_metadata_is_valid = True
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
368 break
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
369 outputfh.write("\nSleeping for 5 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
370 time.sleep(5)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
371 else:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
372 outputfh.write("\nProcessing ended in error...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
373 outputfh.close()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
374 lock.release()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
375 sys.exit(1)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
376
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
377 if affy_metadata_is_valid:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
378 # Get the CoralSNP workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
379 coralsnp_workflow_id, coralsnp_workflow_dict = get_workflow(gi, coralsnp_workflow_name, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
380 outputfh.write("\nCoralSNP workflow id: %s\n" % str(coralsnp_workflow_id))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
381 # Map the history datasets to the input datasets for
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
382 # the CoralSNP workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
383 coralsnp_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, coralsnp_workflow_name, coralsnp_workflow_dict, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
384 outputfh.write("\nCoralSNP workflow input datasets: %s\n" % str(coralsnp_workflow_input_datasets))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
385 # Get the CoralSNP workflow params that could be updated.
6
b7212fe2d597 Uploaded
greg
parents: 4
diff changeset
386 coralsnp_params = update_workflow_params(coralsnp_workflow_dict, args.dbkey, args.output_nj_phylogeny_tree, outputfh)
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
387 outputfh.write("\nCoralSNP params: %s\n" % str(coralsnp_params))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
388 # Start the CoralSNP workflow.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
389 start_workflow(gi, coralsnp_workflow_id, coralsnp_workflow_name, coralsnp_workflow_input_datasets, coralsnp_params, args.history_id, outputfh)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
390 outputfh.write("\nSleeping for 15 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
391 time.sleep(15)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
392 # Poll the history datasets, checking the statuses, and wait until
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
393 # the workflow is finished. The workflow itself simply schedules
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
394 # all of the jobs, so it cannot be checked for a state.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
395 while True:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
396 history_status_dict = get_history_status(gi, args.history_id)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
397 sd_dict = history_status_dict['state_details']
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
398 outputfh.write("\ndatasets_have_queued: %s\n" % str(datasets_have_queued))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
399 outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
400 # The queue_genotype_workflow tool will continue to be in a
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
401 # "running" state while inside this for loop, so we know that
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
402 # the workflow has completed if no datasets are in the "new" or
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
403 # "queued" state and there is only 1 dataset in the "running"
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
404 # state. We cannot filter on datasets in the "paused" state
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
405 # because any datasets downstream from one in an "error" state
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
406 # will automatically be given a "paused" state. Of course, we'll
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
407 # always break if any datasets are in the "error" state. At
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
408 # least one dataset must have reached the "queued" state before
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
409 # the workflow is complete.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
410 if not datasets_have_queued:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
411 if sd_dict['queued'] > 0:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
412 datasets_have_queued = True
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
413 if sd_dict['error'] != 0:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
414 break
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
415 if datasets_have_queued and sd_dict['queued'] == 0 and sd_dict['new'] == 0 and sd_dict['running'] <= 1:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
416 # The stag database has been updated.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
417 stag_database_updated = True
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
418 break
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
419 outputfh.write("\nSleeping for 5 seconds...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
420 time.sleep(5)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
421 outputfh.write("\nstag_database_updated: %s\n" % str(stag_database_updated))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
422 if stag_database_updated:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
423 # Get the id of the "bcftools merge" dataset in the current history.
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
424 bcftools_merge = get_history_dataset_id_by_name(gi, args.history_id, "bcftools merge", outputfh)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
425 # Get the FilterAllGenotypedSamples workflow
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
426 fags_workflow_id, fags_workflow_dict = get_workflow(gi, fags_workflow_name, outputfh)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
427 outputfh.write("\nFilterAllGenotypedSamples workflow id: %s\n" % str(fags_workflow_id))
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
428 # Map the history datasets to the input datasets for
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
429 # the FilterAllGenotypedSamples workflow.
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
430 fags_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, fags_workflow_name, fags_workflow_dict, outputfh)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
431 # Start the FilterAllGenotypedSamples workflow.
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
432 start_workflow(gi, fags_workflow_id, fags_workflow_name, fags_workflow_input_datasets, None, args.history_id, outputfh)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
433 outputfh.write("\nSleeping for 15 seconds...\n")
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
434 time.sleep(15)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
435 # Poll the history datasets, checking the statuses, and wait until
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
436 # the workflow is finished. The workflow itself simply schedules
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
437 # all of the jobs, so it cannot be checked for a state.
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
438 while True:
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
439 history_status_dict = get_history_status(gi, args.history_id)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
440 sd_dict = history_status_dict['state_details']
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
441 outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
442 # The queue_genotype_workflow tool will continue to be in a
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
443 # "running" state while inside this for loop, so we know that
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
444 # the workflow has completed if only 1 dataset has this state.
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
445 if sd_dict['running'] <= 1:
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
446 if sd_dict['error'] == 0:
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
447 # The all_genotyped_samples.vcf file is filtered.
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
448 filtered = True
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
449 break
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
450 outputfh.write("\nSleeping for 5 seconds...\n")
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
451 time.sleep(5)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
452 outputfh.write("\nfiltered: %s\n" % str(filtered))
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
453 if filtered:
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
454 # Get the id of the "bcftools view" dataset in the current history.
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
455 bcftools_view = get_history_dataset_id_by_name(gi, args.history_id, "bcftools view", outputfh)
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
456 # Create a new dataset in the All Genotyped Samples data library by
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
457 # importing the "bcftools view" dataset from the current history.
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
458 # We'll do this as the coraldmin user.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
459 admin_gi = galaxy.GalaxyInstance(url=galaxy_base_url, key=admin_api_key)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
460 new_ags_dataset_dict = copy_history_dataset_to_library(admin_gi, ags_library_id, bcftools_view, outputfh)
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
461 outputfh.write("\nnew_ags_dataset_dict: %s\n" % str(new_ags_dataset_dict))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
462 # Rename the ldda to be all_genotyped_samples.vcf.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
463 new_ags_ldda_id = new_ags_dataset_dict['id']
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
464 outputfh.write("\nnew_ags_ldda_id: %s\n" % str(new_ags_ldda_id))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
465 renamed_ags_dataset_dict = rename_library_dataset(admin_gi, new_ags_ldda_id, ags_dataset_name, outputfh)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
466 outputfh.write("\nrenamed_ags_dataset_dict: %s\n" % str(renamed_ags_dataset_dict))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
467 # Get the full path of the all_genotyped_samples.vcf library dataset.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
468 ags_ldda_file_path = get_library_dataset_file_path(gi, ags_library_id, ags_ldda_id, outputfh)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
469 outputfh.write("\nags_ldda_file_path: %s\n" % str(ags_ldda_file_path))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
470 # Copy the all_genotyped_samples.vcf dataset to storage. We
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
471 # will only keep a single copy of this file since this tool
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
472 # will end in an error before the CoralSNP workflow is started
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
473 # if the all_genotyped_samples.vcf file is not sync'd with the
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
474 # stag database.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
475 copy_dataset_to_storage(ags_ldda_file_path, ags_storage_dir, ags_dataset_name, outputfh)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
476 outputfh.write("\naCopied gs_ldda_file_path: %s to ags_storage_dir %s\n" % (str(ags_ldda_file_path), str(ags_storage_dir)))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
477 # Delete the original all_genotyped_samples library dataset.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
478 deleted_dataset_dict = delete_library_dataset(admin_gi, ags_library_id, ags_ldda_id, outputfh)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
479 outputfh.write("\ndeleted_dataset_dict: %s\n" % str(deleted_dataset_dict))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
480 # To save disk space, delete the all_genotyped_samples hda
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
481 # in the current history to enable later purging by an admin.
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
482 ags_hda_id = get_history_dataset_id_by_name(gi, args.history_id, "all_genotyped_samples", outputfh)
12
0882b7bb3dfc Uploaded
greg
parents: 10
diff changeset
483 outputfh.write("\nags_hda_id: %s\n" % str(ags_hda_id))
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
484 delete_history_dataset(gi, args.history_id, ags_hda_id, outputfh)
1
d00c4cc7e8c2 Uploaded
greg
parents: 0
diff changeset
485 else:
d00c4cc7e8c2 Uploaded
greg
parents: 0
diff changeset
486 outputfh.write("\nProcessing ended in error...\n")
d00c4cc7e8c2 Uploaded
greg
parents: 0
diff changeset
487 outputfh.close()
d00c4cc7e8c2 Uploaded
greg
parents: 0
diff changeset
488 lock.release()
d00c4cc7e8c2 Uploaded
greg
parents: 0
diff changeset
489 sys.exit(1)
0
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
490 else:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
491 outputfh.write("\nProcessing ended in error...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
492 outputfh.close()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
493 lock.release()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
494 sys.exit(1)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
495 except Exception as e:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
496 outputfh.write("Exception preparing or executing either the ValidateAffyMetadata workflow or the CoralSNP workflow:\n%s\n" % str(e))
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
497 outputfh.write("\nProcessing ended in error...\n")
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
498 outputfh.close()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
499 lock.release()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
500 sys.exit(1)
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
501 finally:
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
502 lock.release()
c80fae8c94c1 Uploaded
greg
parents:
diff changeset
# All workflow processing completed without a fatal error; record the
# final status and release the output log file handle.
print("\nFinished processing...", file=outputfh)
outputfh.close()