guerler/springsuite: comparison of planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyWorkflows.py @ 0:d30785e31577 (draft)
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date: Fri, 31 Jul 2020 00:18:57 -0400
parents: (none)
children: (none)
revisions compared: -1:000000000000 → 0:d30785e31577 (file added in this changeset)
import json
import os
import shutil
import tempfile
import time

from . import GalaxyTestBase, test_util


class TestGalaxyWorkflows(GalaxyTestBase.GalaxyTestBase):

    @test_util.skip_unless_tool("cat1")
    @test_util.skip_unless_tool("cat")
    def test_workflow_scheduling(self):
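        """Invoke a workflow with a pause step, resume the paused step via
        run_invocation_step_action() and poll until the invocation is scheduled."""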
        path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
        workflow = self.gi.workflows.import_workflow_from_local_path(path)
        workflow_id = workflow["id"]
        history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
        dataset1_id = self._test_dataset(history_id)

        invocations = self.gi.workflows.get_invocations(workflow_id)
        self.assertEqual(len(invocations), 0)

        invocation = self.gi.workflows.invoke_workflow(
            workflow["id"],
            inputs={"0": {"src": "hda", "id": dataset1_id}},
        )
        invocation_id = invocation["id"]
        invocations = self.gi.workflows.get_invocations(workflow_id)
        self.assertEqual(len(invocations), 1)
        self.assertEqual(invocations[0]["id"], invocation_id)

        def invocation_steps_by_order_index():
            invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
            return dict((s["order_index"], s) for s in invocation["steps"])

        # Poll until the invocation has reached the pause step (order_index 2).
        for _ in range(20):
            if 2 in invocation_steps_by_order_index():
                break
            time.sleep(.5)

        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        self.assertEqual(invocation['state'], "ready")

        steps = invocation_steps_by_order_index()
        pause_step = steps[2]
        self.assertIsNone(
            self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"])
        self.gi.workflows.run_invocation_step_action(workflow_id, invocation_id, pause_step["id"], action=True)
        self.assertTrue(self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"])
        # Poll until the invocation is fully scheduled after resuming the pause step.
        for _ in range(20):
            invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
            if invocation["state"] == "scheduled":
                break

            time.sleep(.5)

        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        self.assertEqual(invocation["state"], "scheduled")

    @test_util.skip_unless_tool("cat1")
    @test_util.skip_unless_tool("cat")
    def test_cancelling_workflow_scheduling(self):
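        """Invoke a workflow with a pause step and cancel the invocation before it completes."""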
        path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
        workflow = self.gi.workflows.import_workflow_from_local_path(path)
        workflow_id = workflow["id"]
        history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
        dataset1_id = self._test_dataset(history_id)

        invocations = self.gi.workflows.get_invocations(workflow_id)
        self.assertEqual(len(invocations), 0)

        invocation = self.gi.workflows.invoke_workflow(
            workflow["id"],
            inputs={"0": {"src": "hda", "id": dataset1_id}},
        )
        invocation_id = invocation["id"]
        invocations = self.gi.workflows.get_invocations(workflow_id)
        self.assertEqual(len(invocations), 1)
        self.assertEqual(invocations[0]["id"], invocation_id)

        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        self.assertIn(invocation['state'], ['new', 'ready'])

        self.gi.workflows.cancel_invocation(workflow_id, invocation_id)
        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        self.assertEqual(invocation['state'], 'cancelled')

    def test_import_export_workflow_from_local_path(self):
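        """Import a workflow from a local .ga file, export it back to a local
        directory and check that the exported file contains a JSON dictionary."""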
        with self.assertRaises(Exception):
            self.gi.workflows.import_workflow_from_local_path(None)
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        imported_wf = self.gi.workflows.import_workflow_from_local_path(path)
        self.assertIsInstance(imported_wf, dict)
        self.assertEqual(imported_wf['name'], 'paste_columns')
        self.assertTrue(imported_wf['url'].startswith('/api/workflows/'))
        self.assertFalse(imported_wf['deleted'])
        self.assertFalse(imported_wf['published'])
        with self.assertRaises(Exception):
            self.gi.workflows.export_workflow_to_local_path(None, None, None)
        export_dir = tempfile.mkdtemp(prefix='bioblend_test_')
        try:
            self.gi.workflows.export_workflow_to_local_path(imported_wf['id'], export_dir)
            dir_contents = os.listdir(export_dir)
            self.assertEqual(len(dir_contents), 1)
            export_path = os.path.join(export_dir, dir_contents[0])
            with open(export_path, 'r') as f:
                exported_wf_dict = json.load(f)
        finally:
            shutil.rmtree(export_dir)
        self.assertIsInstance(exported_wf_dict, dict)

    def test_import_publish_workflow_from_local_path(self):
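        """Import a workflow from a local .ga file with publish=True and check that it is published."""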
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        imported_wf = self.gi.workflows.import_workflow_from_local_path(path, publish=True)
        self.assertIsInstance(imported_wf, dict)
        self.assertFalse(imported_wf['deleted'])
        self.assertTrue(imported_wf['published'])

    def test_import_export_workflow_dict(self):
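        """Import a workflow from a dictionary and export it back as a dictionary."""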
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        with open(path, 'r') as f:
            wf_dict = json.load(f)
        imported_wf = self.gi.workflows.import_workflow_dict(wf_dict)
        self.assertIsInstance(imported_wf, dict)
        self.assertEqual(imported_wf['name'], 'paste_columns')
        self.assertTrue(imported_wf['url'].startswith('/api/workflows/'))
        self.assertFalse(imported_wf['deleted'])
        self.assertFalse(imported_wf['published'])
        exported_wf_dict = self.gi.workflows.export_workflow_dict(imported_wf['id'])
        self.assertIsInstance(exported_wf_dict, dict)

    def test_import_publish_workflow_dict(self):
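        """Import a workflow dictionary with publish=True and check that it is published."""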
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        with open(path, 'r') as f:
            wf_dict = json.load(f)
        imported_wf = self.gi.workflows.import_workflow_dict(wf_dict, publish=True)
        self.assertIsInstance(imported_wf, dict)
        self.assertFalse(imported_wf['deleted'])
        self.assertTrue(imported_wf['published'])

    def test_get_workflows(self):
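        """List workflows and filter them by workflow_id and by name."""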
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        all_wfs = self.gi.workflows.get_workflows()
        self.assertGreater(len(all_wfs), 0)
        wf_data = self.gi.workflows.get_workflows(workflow_id=wf['id'])[0]
        self.assertEqual(wf['id'], wf_data['id'])
        self.assertEqual(wf['name'], wf_data['name'])
        self.assertEqual(wf['url'], wf_data['url'])
        wf_data_list = self.gi.workflows.get_workflows(name=wf['name'])
        self.assertTrue(any(_['id'] == wf['id'] for _ in wf_data_list))

    def test_show_workflow(self):
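        """Show a workflow and check its id, name, url, steps and inputs."""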
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        wf_data = self.gi.workflows.show_workflow(wf['id'])
        self.assertEqual(wf_data['id'], wf['id'])
        self.assertEqual(wf_data['name'], wf['name'])
        self.assertEqual(wf_data['url'], wf['url'])
        self.assertEqual(len(wf_data['steps']), 3)
        self.assertIsNotNone(wf_data['inputs'])

    @test_util.skip_unless_galaxy('release_18.05')
    def test_update_workflow_name(self):
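        """Rename a workflow via update_workflow() and check the returned name."""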
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        new_name = 'new name'
        updated_wf = self.gi.workflows.update_workflow(wf['id'], name=new_name)
        self.assertEqual(updated_wf['name'], new_name)

    @test_util.skip_unless_galaxy('release_19.09')  # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014
    def test_show_workflow_versions(self):
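        """Check that updating a workflow bumps its version and that earlier versions stay retrievable."""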
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        wf_data = self.gi.workflows.show_workflow(wf['id'])
        self.assertEqual(wf_data['version'], 0)
        new_name = 'new name'
        self.gi.workflows.update_workflow(wf['id'], name=new_name)
        updated_wf = self.gi.workflows.show_workflow(wf['id'])
        self.assertEqual(updated_wf['name'], new_name)
        self.assertEqual(updated_wf['version'], 1)
        updated_wf = self.gi.workflows.show_workflow(wf['id'], version=0)
        self.assertEqual(updated_wf['name'], 'paste_columns')
        self.assertEqual(updated_wf['version'], 0)
        updated_wf = self.gi.workflows.show_workflow(wf['id'], version=1)
        self.assertEqual(updated_wf['name'], new_name)
        self.assertEqual(updated_wf['version'], 1)

    def test_run_workflow(self):
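        """Check that an invalid run_workflow() call with no inputs raises an exception."""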
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        # Try invalid run of workflow
        with self.assertRaises(Exception):
            self.gi.workflows.run_workflow(wf['id'], None)

    def test_invoke_workflow(self):
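        """Invoke a workflow mapping inputs by name and check that the invocation starts in the 'new' state."""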
        path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        history_id = self.gi.histories.create_history(name="test_wf_invocation")['id']
        dataset1_id = self._test_dataset(history_id)
        dataset = {'src': 'hda', 'id': dataset1_id}
        invoke_response = self.gi.workflows.invoke_workflow(
            wf['id'],
            inputs={'Input 1': dataset, 'Input 2': dataset},
            history_id=history_id,
            inputs_by='name',
        )
        assert invoke_response['state'] == 'new', invoke_response