changeset 12:0882b7bb3dfc draft

Uploaded
author greg
date Thu, 15 Jul 2021 20:39:52 +0000
parents 560dcf3f9f3d
children b5ca9d62c7bb
files queue_genotype_workflow.py
diffstat 1 files changed, 45 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/queue_genotype_workflow.py	Wed Nov 11 18:21:36 2020 +0000
+++ b/queue_genotype_workflow.py	Thu Jul 15 20:39:52 2021 +0000
@@ -283,10 +283,12 @@
 ags_storage_dir = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_STORAGE_DIR')
 coralsnp_workflow_name = get_value_from_config(config_defaults, 'CORALSNP_WORKFLOW_NAME')
 es_workflow_name = get_value_from_config(config_defaults, 'ENSURE_SYNCED_WORKFLOW_NAME')
+fags_workflow_name = get_value_from_config(config_defaults, 'FILTER_ALL_GENOTYPED_SAMPLES_WORKFLOW_NAME')
 vam_workflow_name = get_value_from_config(config_defaults, 'VALIDATE_AFFY_METADATA_WORKFLOW_NAME')
 
 affy_metadata_is_valid = False
 datasets_have_queued = False
+filtered = False
 stag_database_updated = False
 synced = False
 lock = threading.Lock()
@@ -393,6 +395,7 @@
         while True:
             history_status_dict = get_history_status(gi, args.history_id)
             sd_dict = history_status_dict['state_details']
+            outputfh.write("\ndatasets_have_queued: %s\n" % str(datasets_have_queued))
             outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
             # The queue_genotype_workflow tool will continue to be in a
             # "running" state while inside this for loop, so  we know that
@@ -415,30 +418,69 @@
                 break
             outputfh.write("\nSleeping for 5 seconds...\n")
             time.sleep(5)
+        outputfh.write("\nstag_database_updated: %s\n" % str(stag_database_updated))
         if stag_database_updated:
             # Get the id of the "bcftools merge" dataset in the current history.
-            bcftools_merge_dataset_id = get_history_dataset_id_by_name(gi, args.history_id, "bcftools merge", outputfh)
+            bcftools_merge = get_history_dataset_id_by_name(gi, args.history_id, "bcftools merge", outputfh)
+            # Get the FilterAllGenotypedSamples workflow
+            fags_workflow_id, fags_workflow_dict = get_workflow(gi, fags_workflow_name, outputfh)
+            outputfh.write("\nFilterAllGenotypedSamples workflow id: %s\n" % str(fags_workflow_id))
+            # Map the history datasets to the input datasets for
+            # the FilterAllGenotypedSamples workflow.
+            fags_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, fags_workflow_name, fags_workflow_dict, outputfh)
+            # Start the FilterAllGenotypedSamples workflow.
+            start_workflow(gi, fags_workflow_id, fags_workflow_name, fags_workflow_input_datasets, None, args.history_id, outputfh)
+            outputfh.write("\nSleeping for 15 seconds...\n")
+            time.sleep(15)
+            # Poll the history datasets, checking the statuses, and wait until
+            # the workflow is finished.  The workflow itself simply schedules
+            # all of the jobs, so it cannot be checked for a state.
+            while True:
+                history_status_dict = get_history_status(gi, args.history_id)
+                sd_dict = history_status_dict['state_details']
+                outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
+                # The queue_genotype_workflow tool will continue to be in a
+                # "running" state while inside this for loop, so  we know that
+                # the workflow has completed if only 1 dataset has this state.
+                if sd_dict['running'] <= 1:
+                    if sd_dict['error'] == 0:
+                        # The all_genotyped_samples.vcf file is filtered.
+                        filtered = True
+                        break
+                outputfh.write("\nSleeping for 5 seconds...\n")
+                time.sleep(5)
+        outputfh.write("\nfiltered: %s\n" % str(filtered))
+        if filtered:
+            # Get the id of the "bcftools view" dataset in the current history.
+            bcftools_view = get_history_dataset_id_by_name(gi, args.history_id, "bcftools view", outputfh)
             # Create a new dataset in the All Genotyped Samples data library by
-            # importing the "bcftools merge" dataset from the current history.
+            # importing the "bcftools view" dataset from the current history.
             # We'll do this as the coraldmin user.
             admin_gi = galaxy.GalaxyInstance(url=galaxy_base_url, key=admin_api_key)
-            new_ags_dataset_dict = copy_history_dataset_to_library(admin_gi, ags_library_id, bcftools_merge_dataset_id, outputfh)
+            new_ags_dataset_dict = copy_history_dataset_to_library(admin_gi, ags_library_id, bcftools_view, outputfh)
+            outputfh.write("\nnew_ags_dataset_dict: %s\n" % str(new_ags_dataset_dict))
             # Rename the ldda to be all_genotyped_samples.vcf.
             new_ags_ldda_id = new_ags_dataset_dict['id']
+            outputfh.write("\nnew_ags_ldda_id: %s\n" % str(new_ags_ldda_id))
             renamed_ags_dataset_dict = rename_library_dataset(admin_gi, new_ags_ldda_id, ags_dataset_name, outputfh)
+            outputfh.write("\nrenamed_ags_dataset_dict: %s\n" % str(renamed_ags_dataset_dict))
             # Get the full path of the all_genotyped_samples.vcf library dataset.
             ags_ldda_file_path = get_library_dataset_file_path(gi, ags_library_id, ags_ldda_id, outputfh)
+            outputfh.write("\nags_ldda_file_path: %s\n" % str(ags_ldda_file_path))
             # Copy the all_genotyped_samples.vcf dataset to storage.  We
             # will only keep a single copy of this file since this tool
             # will end in an error before the CoralSNP workflow is started
             # if the all_genotyped_samples.vcf file is not sync'd with the
             # stag database.
             copy_dataset_to_storage(ags_ldda_file_path, ags_storage_dir, ags_dataset_name, outputfh)
+            outputfh.write("\naCopied gs_ldda_file_path: %s to ags_storage_dir %s\n" % (str(ags_ldda_file_path), str(ags_storage_dir)))
             # Delete the original all_genotyped_samples library dataset.
             deleted_dataset_dict = delete_library_dataset(admin_gi, ags_library_id, ags_ldda_id, outputfh)
+            outputfh.write("\ndeleted_dataset_dict: %s\n" % str(deleted_dataset_dict))
             # To save disk space, delete the all_genotyped_samples hda
             # in the current history to enable later purging by an admin.
             ags_hda_id = get_history_dataset_id_by_name(gi, args.history_id, "all_genotyped_samples", outputfh)
+            outputfh.write("\nags_hda_id: %s\n" % str(ags_hda_id))
             delete_history_dataset(gi, args.history_id, ags_hda_id, outputfh)
         else:
             outputfh.write("\nProcessing ended in error...\n")