comparison env/lib/python3.9/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # pylint: disable=C0103,E1101
2 import json
3 import os
4 import shutil
5 import socket
6 import sys
7 import tarfile
8 import tempfile
9 import uuid
10 from ssl import SSLError
11 from urllib.error import URLError
12 from urllib.request import urlopen
13
14 import bioblend
15 import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
16 import bioblend.galaxy.objects.wrappers as wrappers
17 from bioblend.galaxy import dataset_collections
18 from . import test_util
19 from .test_util import unittest
20
21 bioblend.set_stream_logger('test', level='INFO')
22 socket.setdefaulttimeout(10.0)
23 SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
24 SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
25 SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(os.path.join('data', 'workflow_with_parameter_input.ga'))
26 FOO_DATA = 'foo\nbar\n'
27 FOO_DATA_2 = 'foo2\nbar2\n'
28 SAMPLE_WF_DICT = {
29 'deleted': False,
30 'id': '9005c5112febe774',
31 'inputs': {
32 '571': {'label': 'Input Dataset', 'value': ''},
33 '572': {'label': 'Input Dataset', 'value': ''},
34 },
35 'model_class': 'StoredWorkflow',
36 'name': 'paste_columns',
37 'owner': 'user_foo',
38 'published': False,
39 'steps': {
40 '571': {
41 'id': 571,
42 'input_steps': {},
43 'tool_id': None,
44 'tool_inputs': {'name': 'Input Dataset'},
45 'tool_version': None,
46 'type': 'data_input',
47 },
48 '572': {
49 'id': 572,
50 'input_steps': {},
51 'tool_id': None,
52 'tool_inputs': {'name': 'Input Dataset'},
53 'tool_version': None,
54 'type': 'data_input',
55 },
56 '573': {
57 'id': 573,
58 'input_steps': {
59 'input1': {'source_step': 571, 'step_output': 'output'},
60 'input2': {'source_step': 572, 'step_output': 'output'},
61 },
62 'tool_id': 'Paste1',
63 'tool_inputs': {
64 'delimiter': '"T"',
65 'input1': 'null',
66 'input2': 'null',
67 },
68 'tool_version': '1.0.0',
69 'type': 'tool',
70 }
71 },
72 'tags': [],
73 'url': '/api/workflows/9005c5112febe774',
74 }
75
76
77 def is_reachable(url):
78 res = None
79 try:
80 res = urlopen(url, timeout=5)
81 except (SSLError, URLError, socket.timeout):
82 return False
83 if res is not None:
84 res.close()
85 return True
86
87
88 def upload_from_fs(lib, bnames, **kwargs):
89 tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
90 try:
91 fnames = [os.path.join(tempdir, _) for _ in bnames]
92 for fn in fnames:
93 with open(fn, 'w') as f:
94 f.write(FOO_DATA)
95 dss = lib.upload_from_galaxy_fs(fnames, **kwargs)
96 finally:
97 shutil.rmtree(tempdir)
98 return dss, fnames
99
100
101 class MockWrapper(wrappers.Wrapper):
102 BASE_ATTRS = frozenset(['a', 'b'])
103
104 def __init__(self, *args, **kwargs):
105 super().__init__(*args, **kwargs)
106
107 @property
108 def gi_module(self):
109 return super().gi_module()
110
111
112 class TestWrapper(unittest.TestCase):
113
114 def setUp(self):
115 self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
116 self.assertRaises(TypeError, wrappers.Wrapper, self.d)
117 self.w = MockWrapper(self.d)
118
119 def test_initialize(self):
120 for k in MockWrapper.BASE_ATTRS:
121 self.assertEqual(getattr(self.w, k), self.d[k])
122 self.w.a = 222
123 self.w.b[0] = 222
124 self.assertEqual(self.w.a, 222)
125 self.assertEqual(self.w.b[0], 222)
126 self.assertEqual(self.d['a'], 1)
127 self.assertEqual(self.d['b'][0], 2)
128 self.assertRaises(AttributeError, getattr, self.w, 'foo')
129 self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)
130
131 def test_taint(self):
132 self.assertFalse(self.w.is_modified)
133 self.w.a = 111 # pylint: disable=W0201
134 self.assertTrue(self.w.is_modified)
135
136 def test_serialize(self):
137 w = MockWrapper.from_json(self.w.to_json())
138 self.assertEqual(w.wrapped, self.w.wrapped)
139
140 def test_clone(self):
141 w = self.w.clone()
142 self.assertEqual(w.wrapped, self.w.wrapped)
143 w.b[0] = 111
144 self.assertEqual(self.w.b[0], 2)
145
146 def test_kwargs(self):
147 parent = MockWrapper({'a': 10})
148 w = MockWrapper(self.d, parent=parent)
149 self.assertIs(w.parent, parent)
150 self.assertRaises(AttributeError, setattr, w, 'parent', 0)
151
152
153 class TestWorkflow(unittest.TestCase):
154
155 def setUp(self):
156 self.wf = wrappers.Workflow(SAMPLE_WF_DICT)
157
158 def test_initialize(self):
159 self.assertEqual(self.wf.id, '9005c5112febe774')
160 self.assertEqual(self.wf.name, 'paste_columns')
161 self.assertEqual(self.wf.deleted, False)
162 self.assertEqual(self.wf.owner, 'user_foo')
163 self.assertEqual(self.wf.published, False)
164 self.assertEqual(self.wf.tags, [])
165 self.assertEqual(
166 self.wf.input_labels_to_ids, {'Input Dataset': {'571', '572'}})
167 self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': {'573'}})
168 self.assertEqual(self.wf.data_input_ids, {'571', '572'})
169 self.assertEqual(self.wf.source_ids, {'571', '572'})
170 self.assertEqual(self.wf.sink_ids, {'573'})
171
172 def test_dag(self):
173 inv_dag = {}
174 for h, tails in self.wf.dag.items():
175 for t in tails:
176 inv_dag.setdefault(str(t), set()).add(h)
177 self.assertEqual(self.wf.inv_dag, inv_dag)
178 heads = set(self.wf.dag)
179 self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
180 tails = set(self.wf.inv_dag)
181 self.assertEqual(tails, set.union(*self.wf.dag.values()))
182 ids = self.wf.sorted_step_ids()
183 self.assertEqual(set(ids), heads | tails)
184 for h, tails in self.wf.dag.items():
185 for t in tails:
186 self.assertLess(ids.index(h), ids.index(t))
187
188 def test_steps(self):
189 steps = SAMPLE_WF_DICT['steps']
190 for sid, s in self.wf.steps.items():
191 self.assertIsInstance(s, wrappers.Step)
192 self.assertEqual(s.id, sid)
193 self.assertIn(sid, steps)
194 self.assertIs(s.parent, self.wf)
195 self.assertEqual(self.wf.data_input_ids, {'571', '572'})
196 self.assertEqual(self.wf.tool_ids, {'573'})
197
198 def test_taint(self):
199 self.assertFalse(self.wf.is_modified)
200 self.wf.steps['571'].tool_id = 'foo'
201 self.assertTrue(self.wf.is_modified)
202
203 def test_input_map(self):
204 class DummyLD:
205 SRC = 'ld'
206
207 def __init__(self, id_):
208 self.id = id_
209
210 label = 'Input Dataset'
211 self.assertEqual(self.wf.input_labels, {label})
212 input_map = self.wf.convert_input_map(
213 {label: [DummyLD('a'), DummyLD('b')]})
214 # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
215 # OR
216 # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
217 self.assertEqual(set(input_map), {'571', '572'})
218 for d in input_map.values():
219 self.assertEqual(set(d), {'id', 'src'})
220 self.assertEqual(d['src'], 'ld')
221 self.assertIn(d['id'], 'ab')
222
223
224 @test_util.skip_unless_galaxy()
225 class GalaxyObjectsTestBase(unittest.TestCase):
226
227 def setUp(self):
228 galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
229 galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
230 self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)
231
232
233 class TestGalaxyInstance(GalaxyObjectsTestBase):
234
235 def test_library(self):
236 name = 'test_%s' % uuid.uuid4().hex
237 description, synopsis = 'D', 'S'
238 lib = self.gi.libraries.create(
239 name, description=description, synopsis=synopsis)
240 self.assertEqual(lib.name, name)
241 self.assertEqual(lib.description, description)
242 self.assertEqual(lib.synopsis, synopsis)
243 self.assertEqual(len(lib.content_infos), 1) # root folder
244 self.assertEqual(len(lib.folder_ids), 1)
245 self.assertEqual(len(lib.dataset_ids), 0)
246 self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
247 lib.delete()
248 self.assertFalse(lib.is_mapped)
249
250 def test_workflow_from_str(self):
251 with open(SAMPLE_FN) as f:
252 wf = self.gi.workflows.import_new(f.read())
253 self._check_and_del_workflow(wf)
254
255 def test_workflow_collections_from_str(self):
256 with open(SAMPLE_WF_COLL_FN) as f:
257 wf = self.gi.workflows.import_new(f.read())
258 self._check_and_del_workflow(wf)
259
260 @test_util.skip_unless_galaxy('release_19.01')
261 def test_workflow_parameter_input(self):
262 with open(SAMPLE_WF_PARAMETER_INPUT_FN) as f:
263 self.gi.workflows.import_new(f.read())
264
265 def test_workflow_from_dict(self):
266 with open(SAMPLE_FN) as f:
267 wf = self.gi.workflows.import_new(json.load(f))
268 self._check_and_del_workflow(wf)
269
270 def test_workflow_publish_from_dict(self):
271 with open(SAMPLE_FN) as f:
272 wf = self.gi.workflows.import_new(json.load(f), publish=True)
273 self._check_and_del_workflow(wf, check_is_public=True)
274
275 def test_workflow_missing_tools(self):
276 with open(SAMPLE_FN) as f:
277 wf_dump = json.load(f)
278 wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
279 wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
280 for id_, step in wf_dict['steps'].items():
281 if step['type'] == 'tool':
282 for k in 'tool_inputs', 'tool_version':
283 wf_dict['steps'][id_][k] = None
284 wf = wrappers.Workflow(wf_dict, gi=self.gi)
285 self.assertFalse(wf.is_runnable)
286 self.assertRaises(RuntimeError, wf.run)
287 wf.delete()
288
289 def test_workflow_export(self):
290 with open(SAMPLE_FN) as f:
291 wf1 = self.gi.workflows.import_new(f.read())
292 wf2 = self.gi.workflows.import_new(wf1.export())
293 self.assertNotEqual(wf1.id, wf2.id)
294 for wf in wf1, wf2:
295 self._check_and_del_workflow(wf)
296
297 def _check_and_del_workflow(self, wf, check_is_public=False):
298 # Galaxy appends additional text to imported workflow names
299 self.assertTrue(wf.name.startswith('paste_columns'))
300 self.assertEqual(type(wf.owner), str)
301 self.assertEqual(len(wf.steps), 3)
302 for step_id, step in wf.steps.items():
303 self.assertIsInstance(step, wrappers.Step)
304 self.assertEqual(step_id, step.id)
305 self.assertIsInstance(step.tool_inputs, dict)
306 if step.type == 'tool':
307 self.assertIsNotNone(step.tool_id)
308 self.assertIsNotNone(step.tool_version)
309 self.assertIsInstance(step.input_steps, dict)
310 elif step.type in ('data_collection_input', 'data_input'):
311 self.assertIsNone(step.tool_id)
312 self.assertIsNone(step.tool_version)
313 self.assertEqual(step.input_steps, {})
314 wf_ids = {_.id for _ in self.gi.workflows.list()}
315 self.assertIn(wf.id, wf_ids)
316 if check_is_public:
317 self.assertTrue(wf.published)
318 wf.delete()
319
320 # not very accurate:
321 # * we can't publish a wf from the API
322 # * we can't directly get another user's wf
323 def test_workflow_from_shared(self):
324 all_prevs = {
325 _.id: _ for _ in self.gi.workflows.get_previews(published=True)
326 }
327 pub_only_ids = set(all_prevs).difference(
328 _.id for _ in self.gi.workflows.get_previews())
329 if pub_only_ids:
330 wf_id = pub_only_ids.pop()
331 imported = self.gi.workflows.import_shared(wf_id)
332 self.assertIsInstance(imported, wrappers.Workflow)
333 imported.delete()
334 else:
335 self.skipTest('no published workflows, manually publish a workflow to run this test')
336
337 def test_get_libraries(self):
338 self._test_multi_get('library')
339
340 def test_get_histories(self):
341 self._test_multi_get('history')
342
343 def test_get_workflows(self):
344 self._test_multi_get('workflow')
345
346 def _normalized_functions(self, obj_type):
347 if obj_type == 'library':
348 create = self.gi.libraries.create
349 get_objs = self.gi.libraries.list
350 get_prevs = self.gi.libraries.get_previews
351 del_kwargs = {}
352 elif obj_type == 'history':
353 create = self.gi.histories.create
354 get_objs = self.gi.histories.list
355 get_prevs = self.gi.histories.get_previews
356 del_kwargs = {'purge': True}
357 elif obj_type == 'workflow':
358 def create(name):
359 with open(SAMPLE_FN) as f:
360 d = json.load(f)
361 d['name'] = name
362 return self.gi.workflows.import_new(d)
363
364 get_objs = self.gi.workflows.list
365 get_prevs = self.gi.workflows.get_previews
366 del_kwargs = {}
367 return create, get_objs, get_prevs, del_kwargs
368
369 def _test_multi_get(self, obj_type):
370 create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
371 obj_type)
372
373 def ids(seq):
374 return {_.id for _ in seq}
375
376 names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
377 objs = []
378 try:
379 objs = [create(_) for _ in names]
380 self.assertLessEqual(ids(objs), ids(get_objs()))
381 if obj_type != 'workflow':
382 filtered = get_objs(name=names[0])
383 self.assertEqual(len(filtered), 1)
384 self.assertEqual(filtered[0].id, objs[0].id)
385 del_id = objs[-1].id
386 objs.pop().delete(**del_kwargs)
387 self.assertIn(del_id, ids(get_prevs(deleted=True)))
388 else:
389 # Galaxy appends info strings to imported workflow names
390 prev = get_prevs()[0]
391 filtered = get_objs(name=prev.name)
392 self.assertEqual(len(filtered), 1)
393 self.assertEqual(filtered[0].id, prev.id)
394 finally:
395 for o in objs:
396 o.delete(**del_kwargs)
397
398 def test_delete_libraries_by_name(self):
399 self._test_delete_by_name('library')
400
401 def test_delete_histories_by_name(self):
402 self._test_delete_by_name('history')
403
404 def test_delete_workflows_by_name(self):
405 self._test_delete_by_name('workflow')
406
407 def _test_delete_by_name(self, obj_type):
408 create, _, get_prevs, del_kwargs = self._normalized_functions(
409 obj_type)
410 name = 'test_%s' % uuid.uuid4().hex
411 objs = [create(name) for _ in range(2)] # noqa: F812
412 final_name = objs[0].name
413 prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
414 self.assertEqual(len(prevs), len(objs))
415 del_kwargs['name'] = final_name
416 objs[0].gi_module.delete(**del_kwargs)
417 prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
418 self.assertEqual(len(prevs), 0)
419
420
421 class TestLibrary(GalaxyObjectsTestBase):
422 # just something that can be expected to be always up
423 DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'
424
425 def setUp(self):
426 super().setUp()
427 self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
428
429 def tearDown(self):
430 self.lib.delete()
431
432 def test_root_folder(self):
433 r = self.lib.root_folder
434 self.assertIsNone(r.parent)
435
436 def test_folder(self):
437 name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
438 folder = self.lib.create_folder(name, description=desc)
439 self.assertEqual(folder.name, name)
440 self.assertEqual(folder.description, desc)
441 self.assertIs(folder.container, self.lib)
442 self.assertEqual(folder.parent.id, self.lib.root_folder.id)
443 self.assertEqual(len(self.lib.content_infos), 2)
444 self.assertEqual(len(self.lib.folder_ids), 2)
445 self.assertIn(folder.id, self.lib.folder_ids)
446 retrieved = self.lib.get_folder(folder.id)
447 self.assertEqual(folder.id, retrieved.id)
448
449 def _check_datasets(self, dss):
450 self.assertEqual(len(dss), len(self.lib.dataset_ids))
451 self.assertEqual({_.id for _ in dss}, set(self.lib.dataset_ids))
452 for ds in dss:
453 self.assertIs(ds.container, self.lib)
454
455 def test_dataset(self):
456 folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
457 ds = self.lib.upload_data(FOO_DATA, folder=folder)
458 self.assertEqual(len(self.lib.content_infos), 3)
459 self.assertEqual(len(self.lib.folder_ids), 2)
460 self._check_datasets([ds])
461
462 def test_dataset_from_url(self):
463 if is_reachable(self.DS_URL):
464 ds = self.lib.upload_from_url(self.DS_URL)
465 self._check_datasets([ds])
466 else:
467 self.skipTest('%s not reachable' % self.DS_URL)
468
469 def test_dataset_from_local(self):
470 with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
471 f.write(FOO_DATA)
472 f.flush()
473 ds = self.lib.upload_from_local(f.name)
474 self._check_datasets([ds])
475
476 def test_datasets_from_fs(self):
477 bnames = ['f%d.txt' % i for i in range(2)]
478 dss, fnames = upload_from_fs(self.lib, bnames)
479 self._check_datasets(dss)
480 dss, fnames = upload_from_fs(
481 self.lib, bnames, link_data_only='link_to_files')
482 for ds, fn in zip(dss, fnames):
483 self.assertEqual(ds.file_name, fn)
484
485 def test_copy_from_dataset(self):
486 hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
487 try:
488 hda = hist.paste_content(FOO_DATA)
489 ds = self.lib.copy_from_dataset(hda)
490 finally:
491 hist.delete(purge=True)
492 self._check_datasets([ds])
493
494 def test_get_dataset(self):
495 ds = self.lib.upload_data(FOO_DATA)
496 retrieved = self.lib.get_dataset(ds.id)
497 self.assertEqual(ds.id, retrieved.id)
498
499 def test_get_datasets(self):
500 bnames = ['f%d.txt' % _ for _ in range(2)]
501 dss, _ = upload_from_fs(self.lib, bnames)
502 retrieved = self.lib.get_datasets()
503 self.assertEqual(len(dss), len(retrieved))
504 self.assertEqual({_.id for _ in dss}, {_.id for _ in retrieved})
505 name = '/%s' % bnames[0]
506 selected = self.lib.get_datasets(name=name)
507 self.assertEqual(len(selected), 1)
508 self.assertEqual(selected[0].name, bnames[0])
509
510
511 class TestLDContents(GalaxyObjectsTestBase):
512
513 def setUp(self):
514 super().setUp()
515 self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
516 self.ds = self.lib.upload_data(FOO_DATA)
517 self.ds.wait()
518
519 def tearDown(self):
520 self.lib.delete()
521
522 def test_dataset_get_stream(self):
523 for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
524 self.assertEqual(FOO_DATA[idx].encode(), c)
525
526 def test_dataset_peek(self):
527 fetched_data = self.ds.peek(chunk_size=4)
528 self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)
529
530 def test_dataset_download(self):
531 with tempfile.TemporaryFile() as f:
532 self.ds.download(f)
533 f.seek(0)
534 self.assertEqual(FOO_DATA.encode(), f.read())
535
536 def test_dataset_get_contents(self):
537 self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())
538
539 def test_dataset_delete(self):
540 self.ds.delete()
541 # Cannot test this yet because the 'deleted' attribute is not exported
542 # by the API at the moment
543 # self.assertTrue(self.ds.deleted)
544
545 def test_dataset_update(self):
546 new_name = 'test_%s' % uuid.uuid4().hex
547 new_misc_info = 'Annotation for %s' % new_name
548 new_genome_build = 'hg19'
549 updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
550 self.assertEqual(self.ds.id, updated_ldda.id)
551 self.assertEqual(self.ds.name, new_name)
552 self.assertEqual(self.ds.misc_info, new_misc_info)
553 self.assertEqual(self.ds.genome_build, new_genome_build)
554
555
556 class TestHistory(GalaxyObjectsTestBase):
557
558 def setUp(self):
559 super().setUp()
560 self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
561
562 def tearDown(self):
563 self.hist.delete(purge=True)
564
565 def test_create_delete(self):
566 name = 'test_%s' % uuid.uuid4().hex
567 hist = self.gi.histories.create(name)
568 self.assertEqual(hist.name, name)
569 hist_id = hist.id
570 self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
571 hist.delete(purge=True)
572 self.assertFalse(hist.is_mapped)
573 h = self.gi.histories.get(hist_id)
574 self.assertTrue(h.deleted)
575
576 def _check_dataset(self, hda):
577 self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
578 self.assertIs(hda.container, self.hist)
579 self.assertEqual(len(self.hist.dataset_ids), 1)
580 self.assertEqual(self.hist.dataset_ids[0], hda.id)
581
582 def test_import_dataset(self):
583 lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
584 lds = lib.upload_data(FOO_DATA)
585 self.assertEqual(len(self.hist.dataset_ids), 0)
586 hda = self.hist.import_dataset(lds)
587 lib.delete()
588 self._check_dataset(hda)
589
590 def test_upload_file(self):
591 with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
592 f.write(FOO_DATA)
593 f.flush()
594 hda = self.hist.upload_file(f.name)
595 self._check_dataset(hda)
596
597 def test_paste_content(self):
598 hda = self.hist.paste_content(FOO_DATA)
599 self._check_dataset(hda)
600
601 def test_get_dataset(self):
602 hda = self.hist.paste_content(FOO_DATA)
603 retrieved = self.hist.get_dataset(hda.id)
604 self.assertEqual(hda.id, retrieved.id)
605
606 def test_get_datasets(self):
607 bnames = ['f%d.txt' % _ for _ in range(2)]
608 lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
609 lds = upload_from_fs(lib, bnames)[0]
610 hdas = [self.hist.import_dataset(_) for _ in lds]
611 lib.delete()
612 retrieved = self.hist.get_datasets()
613 self.assertEqual(len(hdas), len(retrieved))
614 self.assertEqual({_.id for _ in hdas}, {_.id for _ in retrieved})
615 selected = self.hist.get_datasets(name=bnames[0])
616 self.assertEqual(len(selected), 1)
617 self.assertEqual(selected[0].name, bnames[0])
618
619 def test_export_and_download(self):
620 jeha_id = self.hist.export(wait=True, maxwait=60)
621 self.assertTrue(jeha_id)
622 tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
623 temp_fn = os.path.join(tempdir, 'export.tar.gz')
624 try:
625 with open(temp_fn, 'wb') as fo:
626 self.hist.download(jeha_id, fo)
627 self.assertTrue(tarfile.is_tarfile(temp_fn))
628 finally:
629 shutil.rmtree(tempdir)
630
631 def test_update(self):
632 new_name = 'test_%s' % uuid.uuid4().hex
633 new_annotation = 'Annotation for %s' % new_name
634 new_tags = ['tag1', 'tag2']
635 updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
636 self.assertEqual(self.hist.id, updated_hist.id)
637 self.assertEqual(self.hist.name, new_name)
638 self.assertEqual(self.hist.annotation, new_annotation)
639 self.assertEqual(self.hist.tags, new_tags)
640 updated_hist = self.hist.update(published=True)
641 self.assertEqual(self.hist.id, updated_hist.id)
642 self.assertTrue(self.hist.published)
643
644 def test_create_dataset_collection(self):
645 self._create_collection_description()
646 hdca = self.hist.create_dataset_collection(self.collection_description)
647 self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
648 self.assertEqual(hdca.collection_type, 'list')
649 self.assertIs(hdca.container, self.hist)
650 self.assertEqual(len(hdca.elements), 2)
651 self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
652 self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])
653
654 def test_delete_dataset_collection(self):
655 self._create_collection_description()
656 hdca = self.hist.create_dataset_collection(self.collection_description)
657 hdca.delete()
658 self.assertTrue(hdca.deleted)
659
660 def _create_collection_description(self):
661 self.dataset1 = self.hist.paste_content(FOO_DATA)
662 self.dataset2 = self.hist.paste_content(FOO_DATA_2)
663 self.collection_description = dataset_collections.CollectionDescription(
664 name="MyDatasetList",
665 elements=[
666 dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
667 dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
668 ]
669 )
670
671
672 class TestHDAContents(GalaxyObjectsTestBase):
673
674 def setUp(self):
675 super().setUp()
676 self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
677 self.ds = self.hist.paste_content(FOO_DATA)
678 self.ds.wait()
679
680 def tearDown(self):
681 self.hist.delete(purge=True)
682
683 def test_dataset_get_stream(self):
684 for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
685 self.assertEqual(FOO_DATA[idx].encode(), c)
686
687 def test_dataset_peek(self):
688 fetched_data = self.ds.peek(chunk_size=4)
689 self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)
690
691 def test_dataset_download(self):
692 with tempfile.TemporaryFile() as f:
693 self.ds.download(f)
694 f.seek(0)
695 self.assertEqual(FOO_DATA.encode(), f.read())
696
697 def test_dataset_get_contents(self):
698 self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())
699
700 def test_dataset_update(self):
701 new_name = 'test_%s' % uuid.uuid4().hex
702 new_annotation = 'Annotation for %s' % new_name
703 new_genome_build = 'hg19'
704 updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
705 self.assertEqual(self.ds.id, updated_hda.id)
706 self.assertEqual(self.ds.name, new_name)
707 self.assertEqual(self.ds.annotation, new_annotation)
708 self.assertEqual(self.ds.genome_build, new_genome_build)
709
710 def test_dataset_delete(self):
711 self.ds.delete()
712 self.assertTrue(self.ds.deleted)
713 self.assertFalse(self.ds.purged)
714
715 def test_dataset_purge(self):
716 self.ds.delete(purge=True)
717 self.assertTrue(self.ds.deleted)
718 self.assertTrue(self.ds.purged)
719
720
721 class TestRunWorkflow(GalaxyObjectsTestBase):
722
723 def setUp(self):
724 super().setUp()
725 self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
726 with open(SAMPLE_FN) as f:
727 self.wf = self.gi.workflows.import_new(f.read())
728 self.contents = ['one\ntwo\n', '1\n2\n']
729 self.inputs = [self.lib.upload_data(_) for _ in self.contents]
730
731 def tearDown(self):
732 self.wf.delete()
733 self.lib.delete()
734
735 def _test(self, existing_hist=False, params=False):
736 hist_name = 'test_%s' % uuid.uuid4().hex
737 if existing_hist:
738 hist = self.gi.histories.create(hist_name)
739 else:
740 hist = hist_name
741 if params:
742 params = {'Paste1': {'delimiter': 'U'}}
743 sep = '_' # 'U' maps to '_' in the paste tool
744 else:
745 params = None
746 sep = '\t' # default
747 input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
748 sys.stderr.write(os.linesep)
749 outputs, out_hist = self.wf.run(
750 input_map, hist, params=params, wait=True, polling_interval=1)
751 self.assertEqual(len(outputs), 1)
752 out_ds = outputs[0]
753 self.assertIn(out_ds.id, out_hist.dataset_ids)
754 res = out_ds.get_contents()
755 exp_rows = zip(*(_.splitlines() for _ in self.contents))
756 exp_res = ("\n".join(sep.join(t) for t in exp_rows) + "\n").encode()
757 self.assertEqual(res, exp_res)
758 if existing_hist:
759 self.assertEqual(out_hist.id, hist.id)
760 out_hist.delete(purge=True)
761
762 def test_existing_history(self):
763 self._test(existing_hist=True)
764
765 def test_new_history(self):
766 self._test(existing_hist=False)
767
768 def test_params(self):
769 self._test(params=True)
770
771
772 class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
773
774 def setUp(self):
775 super().setUp()
776 with open(SAMPLE_WF_COLL_FN) as f:
777 self.wf = self.gi.workflows.import_new(f.read())
778 self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
779
780 def tearDown(self):
781 self.wf.delete()
782 self.hist.delete(purge=True)
783
784 def test_run_workflow_with_dataset_collection(self):
785 dataset1 = self.hist.paste_content(FOO_DATA)
786 dataset2 = self.hist.paste_content(FOO_DATA_2)
787 collection_description = dataset_collections.CollectionDescription(
788 name="MyDatasetList",
789 elements=[
790 dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
791 dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
792 ]
793 )
794 dataset_collection = self.hist.create_dataset_collection(collection_description)
795 input_map = {"Input Dataset Collection": dataset_collection,
796 "Input 2": dataset1}
797 outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
798 self.assertEqual(len(outputs), 1)
799 out_hdca = outputs[0]
800 self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
801 self.assertEqual(out_hdca.collection_type, 'list')
802 self.assertEqual(len(out_hdca.elements), 2)
803 self.assertEqual(out_hist.id, self.hist.id)
804
805
806 class TestJob(GalaxyObjectsTestBase):
807
808 def test_get(self):
809 job_prevs = self.gi.jobs.get_previews()
810 if len(job_prevs) > 0:
811 job_prev = job_prevs[0]
812 self.assertIsInstance(job_prev, wrappers.JobPreview)
813 job = self.gi.jobs.get(job_prev.id)
814 self.assertIsInstance(job, wrappers.Job)
815 self.assertEqual(job.id, job_prev.id)
816 for job in self.gi.jobs.list():
817 self.assertIsInstance(job, wrappers.Job)
818
819
820 def suite():
821 loader = unittest.TestLoader()
822 s = unittest.TestSuite()
823 s.addTests([loader.loadTestsFromTestCase(c) for c in (
824 TestWrapper,
825 TestWorkflow,
826 TestGalaxyInstance,
827 TestLibrary,
828 TestLDContents,
829 TestHistory,
830 TestHDAContents,
831 TestRunWorkflow,
832 )])
833 return s
834
835
836 if __name__ == '__main__':
837 tests = suite()
838 RUNNER = unittest.TextTestRunner(verbosity=2)
839 RUNNER.run(tests)