diff env/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/bioblend/_tests/TestGalaxyObjects.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,845 +0,0 @@
-# pylint: disable=C0103,E1101
-import json
-import os
-import shutil
-import socket
-import sys
-import tarfile
-import tempfile
-import uuid
-from ssl import SSLError
-
-import six
-from six.moves.urllib.error import URLError
-from six.moves.urllib.request import urlopen
-
-import bioblend
-import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
-import bioblend.galaxy.objects.wrappers as wrappers
-from bioblend import ConnectionError
-from bioblend.galaxy import dataset_collections
-from . import test_util
-from .test_util import unittest
-
-bioblend.set_stream_logger('test', level='INFO')
-socket.setdefaulttimeout(10.0)
-SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
-SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
-FOO_DATA = 'foo\nbar\n'
-FOO_DATA_2 = 'foo2\nbar2\n'
-SAMPLE_WF_DICT = {
-    'deleted': False,
-    'id': '9005c5112febe774',
-    'inputs': {
-        '571': {'label': 'Input Dataset', 'value': ''},
-        '572': {'label': 'Input Dataset', 'value': ''},
-    },
-    'model_class': 'StoredWorkflow',
-    'name': 'paste_columns',
-    'published': False,
-    'steps': {
-        '571': {
-            'id': 571,
-            'input_steps': {},
-            'tool_id': None,
-            'tool_inputs': {'name': 'Input Dataset'},
-            'tool_version': None,
-            'type': 'data_input',
-        },
-        '572': {
-            'id': 572,
-            'input_steps': {},
-            'tool_id': None,
-            'tool_inputs': {'name': 'Input Dataset'},
-            'tool_version': None,
-            'type': 'data_input',
-        },
-        '573': {
-            'id': 573,
-            'input_steps': {
-                'input1': {'source_step': 571, 'step_output': 'output'},
-                'input2': {'source_step': 572, 'step_output': 'output'},
-            },
-            'tool_id': 'Paste1',
-            'tool_inputs': {
-                'delimiter': '"T"',
-                'input1': 'null',
-                'input2': 'null',
-            },
-            'tool_version': '1.0.0',
-            'type': 'tool',
-        }
-    },
-    'tags': [],
-    'url': '/api/workflows/9005c5112febe774',
-}
-
-
-def is_reachable(url):
-    res = None
-    try:
-        res = urlopen(url, timeout=5)
-    except (SSLError, URLError, socket.timeout):
-        return False
-    if res is not None:
-        res.close()
-    return True
-
-
-def upload_from_fs(lib, bnames, **kwargs):
-    tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
-    try:
-        fnames = [os.path.join(tempdir, _) for _ in bnames]
-        for fn in fnames:
-            with open(fn, 'w') as f:
-                f.write(FOO_DATA)
-        dss = lib.upload_from_galaxy_fs(fnames, **kwargs)
-    finally:
-        shutil.rmtree(tempdir)
-    return dss, fnames
-
-
-class MockWrapper(wrappers.Wrapper):
-    BASE_ATTRS = frozenset(['a', 'b'])
-
-    def __init__(self, *args, **kwargs):
-        super(MockWrapper, self).__init__(*args, **kwargs)
-
-    @property
-    def gi_module(self):
-        return super(MockWrapper, self).gi_module()
-
-
-class TestWrapper(unittest.TestCase):
-
-    def setUp(self):
-        self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
-        self.assertRaises(TypeError, wrappers.Wrapper, self.d)
-        self.w = MockWrapper(self.d)
-
-    def test_initialize(self):
-        for k in MockWrapper.BASE_ATTRS:
-            self.assertEqual(getattr(self.w, k), self.d[k])
-        self.w.a = 222
-        self.w.b[0] = 222
-        self.assertEqual(self.w.a, 222)
-        self.assertEqual(self.w.b[0], 222)
-        self.assertEqual(self.d['a'], 1)
-        self.assertEqual(self.d['b'][0], 2)
-        self.assertRaises(AttributeError, getattr, self.w, 'foo')
-        self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)
-
-    def test_taint(self):
-        self.assertFalse(self.w.is_modified)
-        self.w.a = 111  # pylint: disable=W0201
-        self.assertTrue(self.w.is_modified)
-
-    def test_serialize(self):
-        w = MockWrapper.from_json(self.w.to_json())
-        self.assertEqual(w.wrapped, self.w.wrapped)
-
-    def test_clone(self):
-        w = self.w.clone()
-        self.assertEqual(w.wrapped, self.w.wrapped)
-        w.b[0] = 111
-        self.assertEqual(self.w.b[0], 2)
-
-    def test_kwargs(self):
-        parent = MockWrapper({'a': 10})
-        w = MockWrapper(self.d, parent=parent)
-        self.assertIs(w.parent, parent)
-        self.assertRaises(AttributeError, setattr, w, 'parent', 0)
-
-
-class TestWorkflow(unittest.TestCase):
-
-    def setUp(self):
-        self.wf = wrappers.Workflow(SAMPLE_WF_DICT)
-
-    def test_initialize(self):
-        self.assertEqual(self.wf.id, '9005c5112febe774')
-        self.assertEqual(self.wf.name, 'paste_columns')
-        self.assertEqual(self.wf.deleted, False)
-        self.assertEqual(self.wf.published, False)
-        self.assertEqual(self.wf.tags, [])
-        self.assertEqual(
-            self.wf.input_labels_to_ids, {'Input Dataset': set(['571', '572'])})
-        self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': set(['573'])})
-        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
-        self.assertEqual(self.wf.source_ids, set(['571', '572']))
-        self.assertEqual(self.wf.sink_ids, set(['573']))
-
-    def test_dag(self):
-        inv_dag = {}
-        for h, tails in six.iteritems(self.wf.dag):
-            for t in tails:
-                inv_dag.setdefault(str(t), set()).add(h)
-        self.assertEqual(self.wf.inv_dag, inv_dag)
-        heads = set(self.wf.dag)
-        self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
-        tails = set(self.wf.inv_dag)
-        self.assertEqual(tails, set.union(*self.wf.dag.values()))
-        ids = self.wf.sorted_step_ids()
-        self.assertEqual(set(ids), heads | tails)
-        for h, tails in six.iteritems(self.wf.dag):
-            for t in tails:
-                self.assertLess(ids.index(h), ids.index(t))
-
-    def test_steps(self):
-        steps = SAMPLE_WF_DICT['steps']
-        for sid, s in six.iteritems(self.wf.steps):
-            self.assertIsInstance(s, wrappers.Step)
-            self.assertEqual(s.id, sid)
-            self.assertIn(sid, steps)
-            self.assertIs(s.parent, self.wf)
-        self.assertEqual(self.wf.data_input_ids, set(['571', '572']))
-        self.assertEqual(self.wf.tool_ids, set(['573']))
-
-    def test_taint(self):
-        self.assertFalse(self.wf.is_modified)
-        self.wf.steps['571'].tool_id = 'foo'
-        self.assertTrue(self.wf.is_modified)
-
-    def test_input_map(self):
-        class DummyLD(object):
-            SRC = 'ld'
-
-            def __init__(self, id_):
-                self.id = id_
-
-        label = 'Input Dataset'
-        self.assertEqual(self.wf.input_labels, set([label]))
-        input_map = self.wf.convert_input_map(
-            {label: [DummyLD('a'), DummyLD('b')]})
-        # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
-        # OR
-        # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
-        self.assertEqual(set(input_map), set(['571', '572']))
-        for d in six.itervalues(input_map):
-            self.assertEqual(set(d), set(['id', 'src']))
-            self.assertEqual(d['src'], 'ld')
-            self.assertIn(d['id'], 'ab')
-
-
-@test_util.skip_unless_galaxy()
-class GalaxyObjectsTestBase(unittest.TestCase):
-
-    def setUp(self):
-        galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
-        galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
-        self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)
-
-
-class TestGalaxyInstance(GalaxyObjectsTestBase):
-
-    def test_library(self):
-        name = 'test_%s' % uuid.uuid4().hex
-        description, synopsis = 'D', 'S'
-        lib = self.gi.libraries.create(
-            name, description=description, synopsis=synopsis)
-        self.assertEqual(lib.name, name)
-        self.assertEqual(lib.description, description)
-        self.assertEqual(lib.synopsis, synopsis)
-        self.assertEqual(len(lib.content_infos), 1)  # root folder
-        self.assertEqual(len(lib.folder_ids), 1)
-        self.assertEqual(len(lib.dataset_ids), 0)
-        self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
-        lib.delete()
-        self.assertFalse(lib.is_mapped)
-
-    def test_workflow_from_str(self):
-        with open(SAMPLE_FN) as f:
-            wf = self.gi.workflows.import_new(f.read())
-        self._check_and_del_workflow(wf)
-
-    def test_workflow_collections_from_str(self):
-        with open(SAMPLE_WF_COLL_FN) as f:
-            wf = self.gi.workflows.import_new(f.read())
-        self._check_and_del_workflow(wf)
-
-    def test_workflow_from_dict(self):
-        with open(SAMPLE_FN) as f:
-            wf = self.gi.workflows.import_new(json.load(f))
-        self._check_and_del_workflow(wf)
-
-    def test_workflow_publish_from_dict(self):
-        with open(SAMPLE_FN) as f:
-            wf = self.gi.workflows.import_new(json.load(f), publish=True)
-        self._check_and_del_workflow(wf, check_is_public=True)
-
-    def test_workflow_missing_tools(self):
-        with open(SAMPLE_FN) as f:
-            wf_dump = json.load(f)
-        wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
-        wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
-        for id_, step in six.iteritems(wf_dict['steps']):
-            if step['type'] == 'tool':
-                for k in 'tool_inputs', 'tool_version':
-                    wf_dict['steps'][id_][k] = None
-        wf = wrappers.Workflow(wf_dict, gi=self.gi)
-        self.assertFalse(wf.is_runnable)
-        self.assertRaises(RuntimeError, wf.run)
-        wf.delete()
-
-    def test_workflow_export(self):
-        with open(SAMPLE_FN) as f:
-            wf1 = self.gi.workflows.import_new(f.read())
-        wf2 = self.gi.workflows.import_new(wf1.export())
-        self.assertNotEqual(wf1.id, wf2.id)
-        for wf in wf1, wf2:
-            self._check_and_del_workflow(wf)
-
-    def _check_and_del_workflow(self, wf, check_is_public=False):
-        # Galaxy appends additional text to imported workflow names
-        self.assertTrue(wf.name.startswith('paste_columns'))
-        self.assertEqual(len(wf.steps), 3)
-        for step_id, step in six.iteritems(wf.steps):
-            self.assertIsInstance(step, wrappers.Step)
-            self.assertEqual(step_id, step.id)
-            self.assertIsInstance(step.tool_inputs, dict)
-            if step.type == 'tool':
-                self.assertIsNotNone(step.tool_id)
-                self.assertIsNotNone(step.tool_version)
-                self.assertIsInstance(step.input_steps, dict)
-            elif step.type in ('data_collection_input', 'data_input'):
-                self.assertIsNone(step.tool_id)
-                self.assertIsNone(step.tool_version)
-                self.assertEqual(step.input_steps, {})
-        wf_ids = set(_.id for _ in self.gi.workflows.list())
-        self.assertIn(wf.id, wf_ids)
-        if check_is_public:
-            self.assertTrue(wf.published)
-        wf.delete()
-
-    # not very accurate:
-    #   * we can't publish a wf from the API
-    #   * we can't directly get another user's wf
-    def test_workflow_from_shared(self):
-        all_prevs = dict(
-            (_.id, _) for _ in self.gi.workflows.get_previews(published=True)
-        )
-        pub_only_ids = set(all_prevs).difference(
-            _.id for _ in self.gi.workflows.get_previews())
-        if pub_only_ids:
-            wf_id = pub_only_ids.pop()
-            imported = self.gi.workflows.import_shared(wf_id)
-            self.assertIsInstance(imported, wrappers.Workflow)
-            imported.delete()
-        else:
-            self.skipTest('no published workflows, manually publish a workflow to run this test')
-
-    def test_get_libraries(self):
-        self._test_multi_get('library')
-
-    def test_get_histories(self):
-        self._test_multi_get('history')
-
-    def test_get_workflows(self):
-        self._test_multi_get('workflow')
-
-    def _normalized_functions(self, obj_type):
-        if obj_type == 'library':
-            create = self.gi.libraries.create
-            get_objs = self.gi.libraries.list
-            get_prevs = self.gi.libraries.get_previews
-            del_kwargs = {}
-        elif obj_type == 'history':
-            create = self.gi.histories.create
-            get_objs = self.gi.histories.list
-            get_prevs = self.gi.histories.get_previews
-            del_kwargs = {'purge': True}
-        elif obj_type == 'workflow':
-            def create(name):
-                with open(SAMPLE_FN) as f:
-                    d = json.load(f)
-                d['name'] = name
-                return self.gi.workflows.import_new(d)
-
-            get_objs = self.gi.workflows.list
-            get_prevs = self.gi.workflows.get_previews
-            del_kwargs = {}
-        return create, get_objs, get_prevs, del_kwargs
-
-    def _test_multi_get(self, obj_type):
-        create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
-            obj_type)
-
-        def ids(seq):
-            return set(_.id for _ in seq)
-
-        names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
-        objs = []
-        try:
-            objs = [create(_) for _ in names]
-            self.assertLessEqual(ids(objs), ids(get_objs()))
-            if obj_type != 'workflow':
-                filtered = get_objs(name=names[0])
-                self.assertEqual(len(filtered), 1)
-                self.assertEqual(filtered[0].id, objs[0].id)
-                del_id = objs[-1].id
-                objs.pop().delete(**del_kwargs)
-                self.assertIn(del_id, ids(get_prevs(deleted=True)))
-            else:
-                # Galaxy appends info strings to imported workflow names
-                prev = get_prevs()[0]
-                filtered = get_objs(name=prev.name)
-                self.assertEqual(len(filtered), 1)
-                self.assertEqual(filtered[0].id, prev.id)
-        finally:
-            for o in objs:
-                o.delete(**del_kwargs)
-
-    def test_delete_libraries_by_name(self):
-        self._test_delete_by_name('library')
-
-    def test_delete_histories_by_name(self):
-        self._test_delete_by_name('history')
-
-    def test_delete_workflows_by_name(self):
-        self._test_delete_by_name('workflow')
-
-    def _test_delete_by_name(self, obj_type):
-        create, _, get_prevs, del_kwargs = self._normalized_functions(
-            obj_type)
-        name = 'test_%s' % uuid.uuid4().hex
-        objs = [create(name) for _ in range(2)]  # noqa: F812
-        final_name = objs[0].name
-        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
-        self.assertEqual(len(prevs), len(objs))
-        del_kwargs['name'] = final_name
-        objs[0].gi_module.delete(**del_kwargs)
-        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
-        self.assertEqual(len(prevs), 0)
-
-
-class TestLibrary(GalaxyObjectsTestBase):
-    # just something that can be expected to be always up
-    DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'
-
-    def setUp(self):
-        super(TestLibrary, self).setUp()
-        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
-
-    def tearDown(self):
-        self.lib.delete()
-
-    def test_root_folder(self):
-        r = self.lib.root_folder
-        self.assertIsNone(r.parent)
-
-    def test_folder(self):
-        name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
-        folder = self.lib.create_folder(name, description=desc)
-        self.assertEqual(folder.name, name)
-        self.assertEqual(folder.description, desc)
-        self.assertIs(folder.container, self.lib)
-        self.assertEqual(folder.parent.id, self.lib.root_folder.id)
-        self.assertEqual(len(self.lib.content_infos), 2)
-        self.assertEqual(len(self.lib.folder_ids), 2)
-        self.assertIn(folder.id, self.lib.folder_ids)
-        retrieved = self.lib.get_folder(folder.id)
-        self.assertEqual(folder.id, retrieved.id)
-
-    def _check_datasets(self, dss):
-        self.assertEqual(len(dss), len(self.lib.dataset_ids))
-        self.assertEqual(set(_.id for _ in dss), set(self.lib.dataset_ids))
-        for ds in dss:
-            self.assertIs(ds.container, self.lib)
-
-    def test_dataset(self):
-        folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
-        ds = self.lib.upload_data(FOO_DATA, folder=folder)
-        self.assertEqual(len(self.lib.content_infos), 3)
-        self.assertEqual(len(self.lib.folder_ids), 2)
-        self._check_datasets([ds])
-
-    def test_dataset_from_url(self):
-        if is_reachable(self.DS_URL):
-            ds = self.lib.upload_from_url(self.DS_URL)
-            self._check_datasets([ds])
-        else:
-            self.skipTest('%s not reachable' % self.DS_URL)
-
-    def test_dataset_from_local(self):
-        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
-            f.write(FOO_DATA)
-            f.flush()
-            ds = self.lib.upload_from_local(f.name)
-        self._check_datasets([ds])
-
-    def test_datasets_from_fs(self):
-        bnames = ['f%d.txt' % i for i in range(2)]
-        dss, fnames = upload_from_fs(self.lib, bnames)
-        self._check_datasets(dss)
-        dss, fnames = upload_from_fs(
-            self.lib, bnames, link_data_only='link_to_files')
-        for ds, fn in zip(dss, fnames):
-            self.assertEqual(ds.file_name, fn)
-
-    def test_copy_from_dataset(self):
-        hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
-        try:
-            hda = hist.paste_content(FOO_DATA)
-            ds = self.lib.copy_from_dataset(hda)
-        finally:
-            hist.delete(purge=True)
-        self._check_datasets([ds])
-
-    def test_get_dataset(self):
-        ds = self.lib.upload_data(FOO_DATA)
-        retrieved = self.lib.get_dataset(ds.id)
-        self.assertEqual(ds.id, retrieved.id)
-
-    def test_get_datasets(self):
-        bnames = ['f%d.txt' % _ for _ in range(2)]
-        dss, _ = upload_from_fs(self.lib, bnames)
-        retrieved = self.lib.get_datasets()
-        self.assertEqual(len(dss), len(retrieved))
-        self.assertEqual(set(_.id for _ in dss), set(_.id for _ in retrieved))
-        name = '/%s' % bnames[0]
-        selected = self.lib.get_datasets(name=name)
-        self.assertEqual(len(selected), 1)
-        self.assertEqual(selected[0].name, bnames[0])
-
-
-class TestLDContents(GalaxyObjectsTestBase):
-
-    def setUp(self):
-        super(TestLDContents, self).setUp()
-        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
-        self.ds = self.lib.upload_data(FOO_DATA)
-        self.ds.wait()
-
-    def tearDown(self):
-        self.lib.delete()
-
-    def test_dataset_get_stream(self):
-        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
-            self.assertEqual(six.b(FOO_DATA[idx]), c)
-
-    def test_dataset_peek(self):
-        fetched_data = self.ds.peek(chunk_size=4)
-        self.assertEqual(six.b(FOO_DATA[0:4]), fetched_data)
-
-    def test_dataset_download(self):
-        with tempfile.TemporaryFile() as f:
-            self.ds.download(f)
-            f.seek(0)
-            self.assertEqual(six.b(FOO_DATA), f.read())
-
-    def test_dataset_get_contents(self):
-        self.assertEqual(six.b(FOO_DATA), self.ds.get_contents())
-
-    def test_dataset_delete(self):
-        self.ds.delete()
-        # Cannot test this yet because the 'deleted' attribute is not exported
-        # by the API at the moment
-        # self.assertTrue(self.ds.deleted)
-
-    @test_util.skip_unless_galaxy('release_17.09')
-    def test_dataset_update(self):
-        new_name = 'test_%s' % uuid.uuid4().hex
-        new_misc_info = 'Annotation for %s' % new_name
-        new_genome_build = 'hg19'
-        updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
-        self.assertEqual(self.ds.id, updated_ldda.id)
-        self.assertEqual(self.ds.name, new_name)
-        self.assertEqual(self.ds.misc_info, new_misc_info)
-        self.assertEqual(self.ds.genome_build, new_genome_build)
-
-
-class TestHistory(GalaxyObjectsTestBase):
-
-    def setUp(self):
-        super(TestHistory, self).setUp()
-        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
-
-    def tearDown(self):
-        self.hist.delete(purge=True)
-
-    def test_create_delete(self):
-        name = 'test_%s' % uuid.uuid4().hex
-        hist = self.gi.histories.create(name)
-        self.assertEqual(hist.name, name)
-        hist_id = hist.id
-        self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
-        hist.delete(purge=True)
-        self.assertFalse(hist.is_mapped)
-        try:
-            h = self.gi.histories.get(hist_id)
-            self.assertTrue(h.deleted)
-        except ConnectionError:
-            # Galaxy up to release_2015.01.13 gives a ConnectionError
-            pass
-
-    def _check_dataset(self, hda):
-        self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
-        self.assertIs(hda.container, self.hist)
-        self.assertEqual(len(self.hist.dataset_ids), 1)
-        self.assertEqual(self.hist.dataset_ids[0], hda.id)
-
-    def test_import_dataset(self):
-        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
-        lds = lib.upload_data(FOO_DATA)
-        self.assertEqual(len(self.hist.dataset_ids), 0)
-        hda = self.hist.import_dataset(lds)
-        lib.delete()
-        self._check_dataset(hda)
-
-    def test_upload_file(self):
-        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
-            f.write(FOO_DATA)
-            f.flush()
-            hda = self.hist.upload_file(f.name)
-        self._check_dataset(hda)
-
-    def test_paste_content(self):
-        hda = self.hist.paste_content(FOO_DATA)
-        self._check_dataset(hda)
-
-    def test_get_dataset(self):
-        hda = self.hist.paste_content(FOO_DATA)
-        retrieved = self.hist.get_dataset(hda.id)
-        self.assertEqual(hda.id, retrieved.id)
-
-    def test_get_datasets(self):
-        bnames = ['f%d.txt' % _ for _ in range(2)]
-        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
-        lds = upload_from_fs(lib, bnames)[0]
-        hdas = [self.hist.import_dataset(_) for _ in lds]
-        lib.delete()
-        retrieved = self.hist.get_datasets()
-        self.assertEqual(len(hdas), len(retrieved))
-        self.assertEqual(set(_.id for _ in hdas), set(_.id for _ in retrieved))
-        selected = self.hist.get_datasets(name=bnames[0])
-        self.assertEqual(len(selected), 1)
-        self.assertEqual(selected[0].name, bnames[0])
-
-    def test_export_and_download(self):
-        jeha_id = self.hist.export(wait=True, maxwait=60)
-        self.assertTrue(jeha_id)
-        tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
-        temp_fn = os.path.join(tempdir, 'export.tar.gz')
-        try:
-            with open(temp_fn, 'wb') as fo:
-                self.hist.download(jeha_id, fo)
-            self.assertTrue(tarfile.is_tarfile(temp_fn))
-        finally:
-            shutil.rmtree(tempdir)
-
-    def test_update(self):
-        new_name = 'test_%s' % uuid.uuid4().hex
-        new_annotation = 'Annotation for %s' % new_name
-        new_tags = ['tag1', 'tag2']
-        updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
-        self.assertEqual(self.hist.id, updated_hist.id)
-        self.assertEqual(self.hist.name, new_name)
-        self.assertEqual(self.hist.annotation, new_annotation)
-        self.assertEqual(self.hist.tags, new_tags)
-        updated_hist = self.hist.update(published=True)
-        self.assertEqual(self.hist.id, updated_hist.id)
-        self.assertTrue(self.hist.published)
-
-    def test_create_dataset_collection(self):
-        self._create_collection_description()
-        hdca = self.hist.create_dataset_collection(self.collection_description)
-        self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
-        self.assertEqual(hdca.collection_type, 'list')
-        self.assertIs(hdca.container, self.hist)
-        self.assertEqual(len(hdca.elements), 2)
-        self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
-        self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])
-
-    def test_delete_dataset_collection(self):
-        self._create_collection_description()
-        hdca = self.hist.create_dataset_collection(self.collection_description)
-        hdca.delete()
-        self.assertTrue(hdca.deleted)
-
-    def _create_collection_description(self):
-        self.dataset1 = self.hist.paste_content(FOO_DATA)
-        self.dataset2 = self.hist.paste_content(FOO_DATA_2)
-        self.collection_description = dataset_collections.CollectionDescription(
-            name="MyDatasetList",
-            elements=[
-                dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
-                dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
-            ]
-        )
-
-
-class TestHDAContents(GalaxyObjectsTestBase):
-
-    def setUp(self):
-        super(TestHDAContents, self).setUp()
-        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
-        self.ds = self.hist.paste_content(FOO_DATA)
-        self.ds.wait()
-
-    def tearDown(self):
-        self.hist.delete(purge=True)
-
-    def test_dataset_get_stream(self):
-        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
-            self.assertEqual(six.b(FOO_DATA[idx]), c)
-
-    def test_dataset_peek(self):
-        fetched_data = self.ds.peek(chunk_size=4)
-        self.assertEqual(six.b(FOO_DATA[0:4]), fetched_data)
-
-    def test_dataset_download(self):
-        with tempfile.TemporaryFile() as f:
-            self.ds.download(f)
-            f.seek(0)
-            self.assertEqual(six.b(FOO_DATA), f.read())
-
-    def test_dataset_get_contents(self):
-        self.assertEqual(six.b(FOO_DATA), self.ds.get_contents())
-
-    def test_dataset_update(self):
-        new_name = 'test_%s' % uuid.uuid4().hex
-        new_annotation = 'Annotation for %s' % new_name
-        new_genome_build = 'hg19'
-        updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
-        self.assertEqual(self.ds.id, updated_hda.id)
-        self.assertEqual(self.ds.name, new_name)
-        self.assertEqual(self.ds.annotation, new_annotation)
-        self.assertEqual(self.ds.genome_build, new_genome_build)
-
-    def test_dataset_delete(self):
-        self.ds.delete()
-        self.assertTrue(self.ds.deleted)
-        self.assertFalse(self.ds.purged)
-
-    @test_util.skip_unless_galaxy("release_17.05")
-    def test_dataset_purge(self):
-        self.ds.delete(purge=True)
-        # Galaxy from release_15.03 to release_17.01 wrongly reports ds.deleted as False, see https://github.com/galaxyproject/galaxy/issues/3548
-        self.assertTrue(self.ds.deleted)
-        self.assertTrue(self.ds.purged)
-
-
-class TestRunWorkflow(GalaxyObjectsTestBase):
-
-    def setUp(self):
-        super(TestRunWorkflow, self).setUp()
-        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
-        with open(SAMPLE_FN) as f:
-            self.wf = self.gi.workflows.import_new(f.read())
-        self.contents = ['one\ntwo\n', '1\n2\n']
-        self.inputs = [self.lib.upload_data(_) for _ in self.contents]
-
-    def tearDown(self):
-        self.wf.delete()
-        self.lib.delete()
-
-    def _test(self, existing_hist=False, params=False):
-        hist_name = 'test_%s' % uuid.uuid4().hex
-        if existing_hist:
-            hist = self.gi.histories.create(hist_name)
-        else:
-            hist = hist_name
-        if params:
-            params = {'Paste1': {'delimiter': 'U'}}
-            sep = '_'  # 'U' maps to '_' in the paste tool
-        else:
-            params = None
-            sep = '\t'  # default
-        input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
-        sys.stderr.write(os.linesep)
-        outputs, out_hist = self.wf.run(
-            input_map, hist, params=params, wait=True, polling_interval=1)
-        self.assertEqual(len(outputs), 1)
-        out_ds = outputs[0]
-        self.assertIn(out_ds.id, out_hist.dataset_ids)
-        res = out_ds.get_contents()
-        exp_rows = zip(*(_.splitlines() for _ in self.contents))
-        exp_res = six.b("\n".join(sep.join(t) for t in exp_rows) + "\n")
-        self.assertEqual(res, exp_res)
-        if existing_hist:
-            self.assertEqual(out_hist.id, hist.id)
-        out_hist.delete(purge=True)
-
-    def test_existing_history(self):
-        self._test(existing_hist=True)
-
-    def test_new_history(self):
-        self._test(existing_hist=False)
-
-    def test_params(self):
-        self._test(params=True)
-
-
-class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):
-
-    def setUp(self):
-        super(TestRunDatasetCollectionWorkflow, self).setUp()
-        with open(SAMPLE_WF_COLL_FN) as f:
-            self.wf = self.gi.workflows.import_new(f.read())
-        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
-
-    def tearDown(self):
-        self.wf.delete()
-        self.hist.delete(purge=True)
-
-    def test_run_workflow_with_dataset_collection(self):
-        dataset1 = self.hist.paste_content(FOO_DATA)
-        dataset2 = self.hist.paste_content(FOO_DATA_2)
-        collection_description = dataset_collections.CollectionDescription(
-            name="MyDatasetList",
-            elements=[
-                dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
-                dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
-            ]
-        )
-        dataset_collection = self.hist.create_dataset_collection(collection_description)
-        input_map = {"Input Dataset Collection": dataset_collection,
-                     "Input 2": dataset1}
-        outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
-        self.assertEqual(len(outputs), 1)
-        out_hdca = outputs[0]
-        self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
-        self.assertEqual(out_hdca.collection_type, 'list')
-        self.assertEqual(len(out_hdca.elements), 2)
-        self.assertEqual(out_hist.id, self.hist.id)
-
-
-class TestJob(GalaxyObjectsTestBase):
-
-    def setUp(self):
-        super(TestJob, self).setUp()
-
-    def test_get(self):
-        job_prevs = self.gi.jobs.get_previews()
-        if len(job_prevs) > 0:
-            job_prev = job_prevs[0]
-            self.assertIsInstance(job_prev, wrappers.JobPreview)
-            job = self.gi.jobs.get(job_prev.id)
-            self.assertIsInstance(job, wrappers.Job)
-            self.assertEqual(job.id, job_prev.id)
-        for job in self.gi.jobs.list():
-            self.assertIsInstance(job, wrappers.Job)
-
-
-def suite():
-    loader = unittest.TestLoader()
-    s = unittest.TestSuite()
-    s.addTests([loader.loadTestsFromTestCase(c) for c in (
-        TestWrapper,
-        TestWorkflow,
-        TestGalaxyInstance,
-        TestLibrary,
-        TestLDContents,
-        TestHistory,
-        TestHDAContents,
-        TestRunWorkflow,
-    )])
-    return s
-
-
-if __name__ == '__main__':
-    # By default, run all tests.  To run specific tests, do the following:
-    #   python -m unittest <module>.<class>.<test_method>
-    tests = suite()
-    RUNNER = unittest.TextTestRunner(verbosity=2)
-    RUNNER.run(tests)