comparison planemo/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyWorkflows.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 import json
2 import os
3 import shutil
4 import tempfile
5 import time
6
7 from . import GalaxyTestBase, test_util
8
9
10 class TestGalaxyWorkflows(GalaxyTestBase.GalaxyTestBase):
11
12 @test_util.skip_unless_tool("cat1")
13 @test_util.skip_unless_tool("cat")
14 def test_workflow_scheduling(self):
15 path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
16 workflow = self.gi.workflows.import_workflow_from_local_path(path)
17 workflow_id = workflow["id"]
18 history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
19 dataset1_id = self._test_dataset(history_id)
20
21 invocations = self.gi.workflows.get_invocations(workflow_id)
22 self.assertEqual(len(invocations), 0)
23
24 invocation = self.gi.workflows.invoke_workflow(
25 workflow["id"],
26 inputs={"0": {"src": "hda", "id": dataset1_id}},
27 )
28 invocation_id = invocation["id"]
29 invocations = self.gi.workflows.get_invocations(workflow_id)
30 self.assertEqual(len(invocations), 1)
31 self.assertEqual(invocations[0]["id"], invocation_id)
32
33 def invocation_steps_by_order_index():
34 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
35 return dict((s["order_index"], s) for s in invocation["steps"])
36
37 for _ in range(20):
38 if 2 in invocation_steps_by_order_index():
39 break
40 time.sleep(.5)
41
42 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
43 self.assertEqual(invocation['state'], "ready")
44
45 steps = invocation_steps_by_order_index()
46 pause_step = steps[2]
47 self.assertIsNone(
48 self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"])
49 self.gi.workflows.run_invocation_step_action(workflow_id, invocation_id, pause_step["id"], action=True)
50 self.assertTrue(self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"])
51 for _ in range(20):
52 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
53 if invocation["state"] == "scheduled":
54 break
55
56 time.sleep(.5)
57
58 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
59 self.assertEqual(invocation["state"], "scheduled")
60
61 @test_util.skip_unless_tool("cat1")
62 @test_util.skip_unless_tool("cat")
63 def test_cancelling_workflow_scheduling(self):
64 path = test_util.get_abspath(os.path.join('data', 'test_workflow_pause.ga'))
65 workflow = self.gi.workflows.import_workflow_from_local_path(path)
66 workflow_id = workflow["id"]
67 history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
68 dataset1_id = self._test_dataset(history_id)
69
70 invocations = self.gi.workflows.get_invocations(workflow_id)
71 self.assertEqual(len(invocations), 0)
72
73 invocation = self.gi.workflows.invoke_workflow(
74 workflow["id"],
75 inputs={"0": {"src": "hda", "id": dataset1_id}},
76 )
77 invocation_id = invocation["id"]
78 invocations = self.gi.workflows.get_invocations(workflow_id)
79 self.assertEqual(len(invocations), 1)
80 self.assertEqual(invocations[0]["id"], invocation_id)
81
82 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
83 self.assertIn(invocation['state'], ['new', 'ready'])
84
85 self.gi.workflows.cancel_invocation(workflow_id, invocation_id)
86 invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
87 self.assertEqual(invocation['state'], 'cancelled')
88
89 def test_import_export_workflow_from_local_path(self):
90 with self.assertRaises(Exception):
91 self.gi.workflows.import_workflow_from_local_path(None)
92 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
93 imported_wf = self.gi.workflows.import_workflow_from_local_path(path)
94 self.assertIsInstance(imported_wf, dict)
95 self.assertEqual(imported_wf['name'], 'paste_columns')
96 self.assertTrue(imported_wf['url'].startswith('/api/workflows/'))
97 self.assertFalse(imported_wf['deleted'])
98 self.assertFalse(imported_wf['published'])
99 with self.assertRaises(Exception):
100 self.gi.workflows.export_workflow_to_local_path(None, None, None)
101 export_dir = tempfile.mkdtemp(prefix='bioblend_test_')
102 try:
103 self.gi.workflows.export_workflow_to_local_path(imported_wf['id'], export_dir)
104 dir_contents = os.listdir(export_dir)
105 self.assertEqual(len(dir_contents), 1)
106 export_path = os.path.join(export_dir, dir_contents[0])
107 with open(export_path, 'r') as f:
108 exported_wf_dict = json.load(f)
109 finally:
110 shutil.rmtree(export_dir)
111 self.assertIsInstance(exported_wf_dict, dict)
112
113 def test_import_publish_workflow_from_local_path(self):
114 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
115 imported_wf = self.gi.workflows.import_workflow_from_local_path(path, publish=True)
116 self.assertIsInstance(imported_wf, dict)
117 self.assertFalse(imported_wf['deleted'])
118 self.assertTrue(imported_wf['published'])
119
120 def test_import_export_workflow_dict(self):
121 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
122 with open(path, 'r') as f:
123 wf_dict = json.load(f)
124 imported_wf = self.gi.workflows.import_workflow_dict(wf_dict)
125 self.assertIsInstance(imported_wf, dict)
126 self.assertEqual(imported_wf['name'], 'paste_columns')
127 self.assertTrue(imported_wf['url'].startswith('/api/workflows/'))
128 self.assertFalse(imported_wf['deleted'])
129 self.assertFalse(imported_wf['published'])
130 exported_wf_dict = self.gi.workflows.export_workflow_dict(imported_wf['id'])
131 self.assertIsInstance(exported_wf_dict, dict)
132
133 def test_import_publish_workflow_dict(self):
134 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
135 with open(path, 'r') as f:
136 wf_dict = json.load(f)
137 imported_wf = self.gi.workflows.import_workflow_dict(wf_dict, publish=True)
138 self.assertIsInstance(imported_wf, dict)
139 self.assertFalse(imported_wf['deleted'])
140 self.assertTrue(imported_wf['published'])
141
142 def test_get_workflows(self):
143 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
144 wf = self.gi.workflows.import_workflow_from_local_path(path)
145 all_wfs = self.gi.workflows.get_workflows()
146 self.assertGreater(len(all_wfs), 0)
147 wf_data = self.gi.workflows.get_workflows(workflow_id=wf['id'])[0]
148 self.assertEqual(wf['id'], wf_data['id'])
149 self.assertEqual(wf['name'], wf_data['name'])
150 self.assertEqual(wf['url'], wf_data['url'])
151 wf_data_list = self.gi.workflows.get_workflows(name=wf['name'])
152 self.assertTrue(any(_['id'] == wf['id'] for _ in wf_data_list))
153
154 def test_show_workflow(self):
155 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
156 wf = self.gi.workflows.import_workflow_from_local_path(path)
157 wf_data = self.gi.workflows.show_workflow(wf['id'])
158 self.assertEqual(wf_data['id'], wf['id'])
159 self.assertEqual(wf_data['name'], wf['name'])
160 self.assertEqual(wf_data['url'], wf['url'])
161 self.assertEqual(len(wf_data['steps']), 3)
162 self.assertIsNotNone(wf_data['inputs'])
163
164 @test_util.skip_unless_galaxy('release_18.05')
165 def test_update_workflow_name(self):
166 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
167 wf = self.gi.workflows.import_workflow_from_local_path(path)
168 new_name = 'new name'
169 updated_wf = self.gi.workflows.update_workflow(wf['id'], name=new_name)
170 self.assertEqual(updated_wf['name'], new_name)
171
172 @test_util.skip_unless_galaxy('release_19.09') # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014
173 def test_show_workflow_versions(self):
174 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
175 wf = self.gi.workflows.import_workflow_from_local_path(path)
176 wf_data = self.gi.workflows.show_workflow(wf['id'])
177 self.assertEqual(wf_data['version'], 0)
178 new_name = 'new name'
179 self.gi.workflows.update_workflow(wf['id'], name=new_name)
180 updated_wf = self.gi.workflows.show_workflow(wf['id'])
181 self.assertEqual(updated_wf['name'], new_name)
182 self.assertEqual(updated_wf['version'], 1)
183 updated_wf = self.gi.workflows.show_workflow(wf['id'], version=0)
184 self.assertEqual(updated_wf['name'], 'paste_columns')
185 self.assertEqual(updated_wf['version'], 0)
186 updated_wf = self.gi.workflows.show_workflow(wf['id'], version=1)
187 self.assertEqual(updated_wf['name'], new_name)
188 self.assertEqual(updated_wf['version'], 1)
189
190 def test_run_workflow(self):
191 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
192 wf = self.gi.workflows.import_workflow_from_local_path(path)
193 # Try invalid run of workflow
194 with self.assertRaises(Exception):
195 self.gi.workflows.run_workflow(wf['id'], None)
196
197 def test_invoke_workflow(self):
198 path = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
199 wf = self.gi.workflows.import_workflow_from_local_path(path)
200 history_id = self.gi.histories.create_history(name="test_wf_invocation")['id']
201 dataset1_id = self._test_dataset(history_id)
202 dataset = {'src': 'hda', 'id': dataset1_id}
203 invoke_response = self.gi.workflows.invoke_workflow(
204 wf['id'],
205 inputs={'Input 1': dataset, 'Input 2': dataset},
206 history_id=history_id,
207 inputs_by='name',
208 )
209 assert invoke_response['state'] == 'new', invoke_response