Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyWorkflows.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 import json | |
| 2 import os | |
| 3 import shutil | |
| 4 import tempfile | |
| 5 import time | |
| 6 | |
| 7 from . import GalaxyTestBase, test_util | |
| 8 | |
| 9 | |
| 10 class TestGalaxyWorkflows(GalaxyTestBase.GalaxyTestBase): | |
| 11 | |
| 12 @test_util.skip_unless_tool("cat1") | |
| 13 @test_util.skip_unless_tool("cat") | |
| 14 def test_workflow_scheduling(self): | |
| 15 path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga')) | |
| 16 workflow = self.gi.workflows.import_workflow_from_local_path(path) | |
| 17 workflow_id = workflow["id"] | |
| 18 history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"] | |
| 19 dataset1_id = self._test_dataset(history_id) | |
| 20 | |
| 21 invocations = self.gi.workflows.get_invocations(workflow_id) | |
| 22 self.assertEqual(len(invocations), 0) | |
| 23 | |
| 24 invocation = self.gi.workflows.invoke_workflow( | |
| 25 workflow["id"], | |
| 26 inputs={"0": {"src": "hda", "id": dataset1_id}}, | |
| 27 ) | |
| 28 invocation_id = invocation["id"] | |
| 29 invocations = self.gi.workflows.get_invocations(workflow_id) | |
| 30 self.assertEqual(len(invocations), 1) | |
| 31 self.assertEqual(invocations[0]["id"], invocation_id) | |
| 32 | |
| 33 def invocation_steps_by_order_index(): | |
| 34 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id) | |
| 35 return dict((s["order_index"], s) for s in invocation["steps"]) | |
| 36 | |
| 37 for _ in range(20): | |
| 38 if 2 in invocation_steps_by_order_index(): | |
| 39 break | |
| 40 time.sleep(.5) | |
| 41 | |
| 42 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id) | |
| 43 self.assertEqual(invocation['state'], "ready") | |
| 44 | |
| 45 steps = invocation_steps_by_order_index() | |
| 46 pause_step = steps[2] | |
| 47 self.assertIsNone( | |
| 48 self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"]) | |
| 49 self.gi.workflows.run_invocation_step_action(workflow_id, invocation_id, pause_step["id"], action=True) | |
| 50 self.assertTrue(self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"]) | |
| 51 for _ in range(20): | |
| 52 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id) | |
| 53 if invocation["state"] == "scheduled": | |
| 54 break | |
| 55 | |
| 56 time.sleep(.5) | |
| 57 | |
| 58 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id) | |
| 59 self.assertEqual(invocation["state"], "scheduled") | |
| 60 | |
| 61 @test_util.skip_unless_tool("cat1") | |
| 62 @test_util.skip_unless_tool("cat") | |
| 63 def test_cancelling_workflow_scheduling(self): | |
| 64 path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga')) | |
| 65 workflow = self.gi.workflows.import_workflow_from_local_path(path) | |
| 66 workflow_id = workflow["id"] | |
| 67 history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"] | |
| 68 dataset1_id = self._test_dataset(history_id) | |
| 69 | |
| 70 invocations = self.gi.workflows.get_invocations(workflow_id) | |
| 71 self.assertEqual(len(invocations), 0) | |
| 72 | |
| 73 invocation = self.gi.workflows.invoke_workflow( | |
| 74 workflow["id"], | |
| 75 inputs={"0": {"src": "hda", "id": dataset1_id}}, | |
| 76 ) | |
| 77 invocation_id = invocation["id"] | |
| 78 invocations = self.gi.workflows.get_invocations(workflow_id) | |
| 79 self.assertEqual(len(invocations), 1) | |
| 80 self.assertEqual(invocations[0]["id"], invocation_id) | |
| 81 | |
| 82 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id) | |
| 83 self.assertIn(invocation['state'], ['new', 'ready']) | |
| 84 | |
| 85 self.gi.workflows.cancel_invocation(workflow_id, invocation_id) | |
| 86 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id) | |
| 87 self.assertEqual(invocation['state'], 'cancelled') | |
| 88 | |
| 89 def test_import_export_workflow_from_local_path(self): | |
| 90 with self.assertRaises(Exception): | |
| 91 self.gi.workflows.import_workflow_from_local_path(None) | |
| 92 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 93 imported_wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 94 self.assertIsInstance(imported_wf, dict) | |
| 95 self.assertEqual(imported_wf['name'], 'paste_columns') | |
| 96 self.assertTrue(imported_wf['url'].startswith('/api/workflows/')) | |
| 97 self.assertFalse(imported_wf['deleted']) | |
| 98 self.assertFalse(imported_wf['published']) | |
| 99 with self.assertRaises(Exception): | |
| 100 self.gi.workflows.export_workflow_to_local_path(None, None, None) | |
| 101 export_dir = tempfile.mkdtemp(prefix='bioblend_test_') | |
| 102 try: | |
| 103 self.gi.workflows.export_workflow_to_local_path(imported_wf['id'], export_dir) | |
| 104 dir_contents = os.listdir(export_dir) | |
| 105 self.assertEqual(len(dir_contents), 1) | |
| 106 export_path = os.path.join(export_dir, dir_contents[0]) | |
| 107 with open(export_path, 'r') as f: | |
| 108 exported_wf_dict = json.load(f) | |
| 109 finally: | |
| 110 shutil.rmtree(export_dir) | |
| 111 self.assertIsInstance(exported_wf_dict, dict) | |
| 112 | |
| 113 def test_import_publish_workflow_from_local_path(self): | |
| 114 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 115 imported_wf = self.gi.workflows.import_workflow_from_local_path(path, publish=True) | |
| 116 self.assertIsInstance(imported_wf, dict) | |
| 117 self.assertFalse(imported_wf['deleted']) | |
| 118 self.assertTrue(imported_wf['published']) | |
| 119 | |
| 120 def test_import_export_workflow_dict(self): | |
| 121 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 122 with open(path, 'r') as f: | |
| 123 wf_dict = json.load(f) | |
| 124 imported_wf = self.gi.workflows.import_workflow_dict(wf_dict) | |
| 125 self.assertIsInstance(imported_wf, dict) | |
| 126 self.assertEqual(imported_wf['name'], 'paste_columns') | |
| 127 self.assertTrue(imported_wf['url'].startswith('/api/workflows/')) | |
| 128 self.assertFalse(imported_wf['deleted']) | |
| 129 self.assertFalse(imported_wf['published']) | |
| 130 exported_wf_dict = self.gi.workflows.export_workflow_dict(imported_wf['id']) | |
| 131 self.assertIsInstance(exported_wf_dict, dict) | |
| 132 | |
| 133 def test_import_publish_workflow_dict(self): | |
| 134 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 135 with open(path, 'r') as f: | |
| 136 wf_dict = json.load(f) | |
| 137 imported_wf = self.gi.workflows.import_workflow_dict(wf_dict, publish=True) | |
| 138 self.assertIsInstance(imported_wf, dict) | |
| 139 self.assertFalse(imported_wf['deleted']) | |
| 140 self.assertTrue(imported_wf['published']) | |
| 141 | |
| 142 def test_get_workflows(self): | |
| 143 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 144 wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 145 all_wfs = self.gi.workflows.get_workflows() | |
| 146 self.assertGreater(len(all_wfs), 0) | |
| 147 wf_data = self.gi.workflows.get_workflows(workflow_id=wf['id'])[0] | |
| 148 self.assertEqual(wf['id'], wf_data['id']) | |
| 149 self.assertEqual(wf['name'], wf_data['name']) | |
| 150 self.assertEqual(wf['url'], wf_data['url']) | |
| 151 wf_data_list = self.gi.workflows.get_workflows(name=wf['name']) | |
| 152 self.assertTrue(any(_['id'] == wf['id'] for _ in wf_data_list)) | |
| 153 | |
| 154 def test_show_workflow(self): | |
| 155 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 156 wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 157 wf_data = self.gi.workflows.show_workflow(wf['id']) | |
| 158 self.assertEqual(wf_data['id'], wf['id']) | |
| 159 self.assertEqual(wf_data['name'], wf['name']) | |
| 160 self.assertEqual(wf_data['url'], wf['url']) | |
| 161 self.assertEqual(len(wf_data['steps']), 3) | |
| 162 self.assertIsNotNone(wf_data['inputs']) | |
| 163 | |
| 164 @test_util.skip_unless_galaxy('release_18.05') | |
| 165 def test_update_workflow_name(self): | |
| 166 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 167 wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 168 new_name = 'new name' | |
| 169 updated_wf = self.gi.workflows.update_workflow(wf['id'], name=new_name) | |
| 170 self.assertEqual(updated_wf['name'], new_name) | |
| 171 | |
| 172 @test_util.skip_unless_galaxy('release_19.09') # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014 | |
| 173 def test_show_workflow_versions(self): | |
| 174 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 175 wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 176 wf_data = self.gi.workflows.show_workflow(wf['id']) | |
| 177 self.assertEqual(wf_data['version'], 0) | |
| 178 new_name = 'new name' | |
| 179 self.gi.workflows.update_workflow(wf['id'], name=new_name) | |
| 180 updated_wf = self.gi.workflows.show_workflow(wf['id']) | |
| 181 self.assertEqual(updated_wf['name'], new_name) | |
| 182 self.assertEqual(updated_wf['version'], 1) | |
| 183 updated_wf = self.gi.workflows.show_workflow(wf['id'], version=0) | |
| 184 self.assertEqual(updated_wf['name'], 'paste_columns') | |
| 185 self.assertEqual(updated_wf['version'], 0) | |
| 186 updated_wf = self.gi.workflows.show_workflow(wf['id'], version=1) | |
| 187 self.assertEqual(updated_wf['name'], new_name) | |
| 188 self.assertEqual(updated_wf['version'], 1) | |
| 189 | |
| 190 def test_run_workflow(self): | |
| 191 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 192 wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 193 # Try invalid run of workflow | |
| 194 with self.assertRaises(Exception): | |
| 195 self.gi.workflows.run_workflow(wf['id'], None) | |
| 196 | |
| 197 def test_invoke_workflow(self): | |
| 198 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga')) | |
| 199 wf = self.gi.workflows.import_workflow_from_local_path(path) | |
| 200 history_id = self.gi.histories.create_history(name="test_wf_invocation")['id'] | |
| 201 dataset1_id = self._test_dataset(history_id) | |
| 202 dataset = {'src': 'hda', 'id': dataset1_id} | |
| 203 invoke_response = self.gi.workflows.invoke_workflow( | |
| 204 wf['id'], | |
| 205 inputs={'Input 1': dataset, 'Input 2': dataset}, | |
| 206 history_id=history_id, | |
| 207 inputs_by='name', | |
| 208 ) | |
| 209 assert invoke_response['state'] == 'new', invoke_response |
