view env/lib/python3.9/site-packages/bioblend/_tests/TestGalaxyObjects.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

# pylint: disable=C0103,E1101
import json
import os
import shutil
import socket
import sys
import tarfile
import tempfile
import uuid
from ssl import SSLError
from urllib.error import URLError
from urllib.request import urlopen

import bioblend
import bioblend.galaxy.objects.galaxy_instance as galaxy_instance
import bioblend.galaxy.objects.wrappers as wrappers
from bioblend.galaxy import dataset_collections
from . import test_util
from .test_util import unittest

bioblend.set_stream_logger('test', level='INFO')
socket.setdefaulttimeout(10.0)
SAMPLE_FN = test_util.get_abspath(os.path.join('data', 'paste_columns.ga'))
SAMPLE_WF_COLL_FN = test_util.get_abspath(os.path.join('data', 'paste_columns_collections.ga'))
SAMPLE_WF_PARAMETER_INPUT_FN = test_util.get_abspath(os.path.join('data', 'workflow_with_parameter_input.ga'))
FOO_DATA = 'foo\nbar\n'
FOO_DATA_2 = 'foo2\nbar2\n'
SAMPLE_WF_DICT = {
    'deleted': False,
    'id': '9005c5112febe774',
    'inputs': {
        '571': {'label': 'Input Dataset', 'value': ''},
        '572': {'label': 'Input Dataset', 'value': ''},
    },
    'model_class': 'StoredWorkflow',
    'name': 'paste_columns',
    'owner': 'user_foo',
    'published': False,
    'steps': {
        '571': {
            'id': 571,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        '572': {
            'id': 572,
            'input_steps': {},
            'tool_id': None,
            'tool_inputs': {'name': 'Input Dataset'},
            'tool_version': None,
            'type': 'data_input',
        },
        '573': {
            'id': 573,
            'input_steps': {
                'input1': {'source_step': 571, 'step_output': 'output'},
                'input2': {'source_step': 572, 'step_output': 'output'},
            },
            'tool_id': 'Paste1',
            'tool_inputs': {
                'delimiter': '"T"',
                'input1': 'null',
                'input2': 'null',
            },
            'tool_version': '1.0.0',
            'type': 'tool',
        }
    },
    'tags': [],
    'url': '/api/workflows/9005c5112febe774',
}


def is_reachable(url):
    res = None
    try:
        res = urlopen(url, timeout=5)
    except (SSLError, URLError, socket.timeout):
        return False
    if res is not None:
        res.close()
    return True


def upload_from_fs(lib, bnames, **kwargs):
    tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
    try:
        fnames = [os.path.join(tempdir, _) for _ in bnames]
        for fn in fnames:
            with open(fn, 'w') as f:
                f.write(FOO_DATA)
        dss = lib.upload_from_galaxy_fs(fnames, **kwargs)
    finally:
        shutil.rmtree(tempdir)
    return dss, fnames


class MockWrapper(wrappers.Wrapper):
    BASE_ATTRS = frozenset(['a', 'b'])

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @property
    def gi_module(self):
        return super().gi_module()


class TestWrapper(unittest.TestCase):

    def setUp(self):
        self.d = {'a': 1, 'b': [2, 3], 'c': {'x': 4}}
        self.assertRaises(TypeError, wrappers.Wrapper, self.d)
        self.w = MockWrapper(self.d)

    def test_initialize(self):
        for k in MockWrapper.BASE_ATTRS:
            self.assertEqual(getattr(self.w, k), self.d[k])
        self.w.a = 222
        self.w.b[0] = 222
        self.assertEqual(self.w.a, 222)
        self.assertEqual(self.w.b[0], 222)
        self.assertEqual(self.d['a'], 1)
        self.assertEqual(self.d['b'][0], 2)
        self.assertRaises(AttributeError, getattr, self.w, 'foo')
        self.assertRaises(AttributeError, setattr, self.w, 'foo', 0)

    def test_taint(self):
        self.assertFalse(self.w.is_modified)
        self.w.a = 111  # pylint: disable=W0201
        self.assertTrue(self.w.is_modified)

    def test_serialize(self):
        w = MockWrapper.from_json(self.w.to_json())
        self.assertEqual(w.wrapped, self.w.wrapped)

    def test_clone(self):
        w = self.w.clone()
        self.assertEqual(w.wrapped, self.w.wrapped)
        w.b[0] = 111
        self.assertEqual(self.w.b[0], 2)

    def test_kwargs(self):
        parent = MockWrapper({'a': 10})
        w = MockWrapper(self.d, parent=parent)
        self.assertIs(w.parent, parent)
        self.assertRaises(AttributeError, setattr, w, 'parent', 0)


class TestWorkflow(unittest.TestCase):

    def setUp(self):
        self.wf = wrappers.Workflow(SAMPLE_WF_DICT)

    def test_initialize(self):
        self.assertEqual(self.wf.id, '9005c5112febe774')
        self.assertEqual(self.wf.name, 'paste_columns')
        self.assertEqual(self.wf.deleted, False)
        self.assertEqual(self.wf.owner, 'user_foo')
        self.assertEqual(self.wf.published, False)
        self.assertEqual(self.wf.tags, [])
        self.assertEqual(
            self.wf.input_labels_to_ids, {'Input Dataset': {'571', '572'}})
        self.assertEqual(self.wf.tool_labels_to_ids, {'Paste1': {'573'}})
        self.assertEqual(self.wf.data_input_ids, {'571', '572'})
        self.assertEqual(self.wf.source_ids, {'571', '572'})
        self.assertEqual(self.wf.sink_ids, {'573'})

    def test_dag(self):
        inv_dag = {}
        for h, tails in self.wf.dag.items():
            for t in tails:
                inv_dag.setdefault(str(t), set()).add(h)
        self.assertEqual(self.wf.inv_dag, inv_dag)
        heads = set(self.wf.dag)
        self.assertEqual(heads, set.union(*self.wf.inv_dag.values()))
        tails = set(self.wf.inv_dag)
        self.assertEqual(tails, set.union(*self.wf.dag.values()))
        ids = self.wf.sorted_step_ids()
        self.assertEqual(set(ids), heads | tails)
        for h, tails in self.wf.dag.items():
            for t in tails:
                self.assertLess(ids.index(h), ids.index(t))

    def test_steps(self):
        steps = SAMPLE_WF_DICT['steps']
        for sid, s in self.wf.steps.items():
            self.assertIsInstance(s, wrappers.Step)
            self.assertEqual(s.id, sid)
            self.assertIn(sid, steps)
            self.assertIs(s.parent, self.wf)
        self.assertEqual(self.wf.data_input_ids, {'571', '572'})
        self.assertEqual(self.wf.tool_ids, {'573'})

    def test_taint(self):
        self.assertFalse(self.wf.is_modified)
        self.wf.steps['571'].tool_id = 'foo'
        self.assertTrue(self.wf.is_modified)

    def test_input_map(self):
        class DummyLD:
            SRC = 'ld'

            def __init__(self, id_):
                self.id = id_

        label = 'Input Dataset'
        self.assertEqual(self.wf.input_labels, {label})
        input_map = self.wf.convert_input_map(
            {label: [DummyLD('a'), DummyLD('b')]})
        # {'571': {'id': 'a', 'src': 'ld'}, '572': {'id': 'b', 'src': 'ld'}}
        # OR
        # {'571': {'id': 'b', 'src': 'ld'}, '572': {'id': 'a', 'src': 'ld'}}
        self.assertEqual(set(input_map), {'571', '572'})
        for d in input_map.values():
            self.assertEqual(set(d), {'id', 'src'})
            self.assertEqual(d['src'], 'ld')
            self.assertIn(d['id'], 'ab')


@test_util.skip_unless_galaxy()
class GalaxyObjectsTestBase(unittest.TestCase):

    def setUp(self):
        galaxy_key = os.environ['BIOBLEND_GALAXY_API_KEY']
        galaxy_url = os.environ['BIOBLEND_GALAXY_URL']
        self.gi = galaxy_instance.GalaxyInstance(galaxy_url, galaxy_key)


class TestGalaxyInstance(GalaxyObjectsTestBase):

    def test_library(self):
        name = 'test_%s' % uuid.uuid4().hex
        description, synopsis = 'D', 'S'
        lib = self.gi.libraries.create(
            name, description=description, synopsis=synopsis)
        self.assertEqual(lib.name, name)
        self.assertEqual(lib.description, description)
        self.assertEqual(lib.synopsis, synopsis)
        self.assertEqual(len(lib.content_infos), 1)  # root folder
        self.assertEqual(len(lib.folder_ids), 1)
        self.assertEqual(len(lib.dataset_ids), 0)
        self.assertIn(lib.id, [_.id for _ in self.gi.libraries.list()])
        lib.delete()
        self.assertFalse(lib.is_mapped)

    def test_workflow_from_str(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    def test_workflow_collections_from_str(self):
        with open(SAMPLE_WF_COLL_FN) as f:
            wf = self.gi.workflows.import_new(f.read())
        self._check_and_del_workflow(wf)

    @test_util.skip_unless_galaxy('release_19.01')
    def test_workflow_parameter_input(self):
        with open(SAMPLE_WF_PARAMETER_INPUT_FN) as f:
            self.gi.workflows.import_new(f.read())

    def test_workflow_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f))
        self._check_and_del_workflow(wf)

    def test_workflow_publish_from_dict(self):
        with open(SAMPLE_FN) as f:
            wf = self.gi.workflows.import_new(json.load(f), publish=True)
        self._check_and_del_workflow(wf, check_is_public=True)

    def test_workflow_missing_tools(self):
        with open(SAMPLE_FN) as f:
            wf_dump = json.load(f)
        wf_info = self.gi.gi.workflows.import_workflow_dict(wf_dump)
        wf_dict = self.gi.gi.workflows.show_workflow(wf_info['id'])
        for id_, step in wf_dict['steps'].items():
            if step['type'] == 'tool':
                for k in 'tool_inputs', 'tool_version':
                    wf_dict['steps'][id_][k] = None
        wf = wrappers.Workflow(wf_dict, gi=self.gi)
        self.assertFalse(wf.is_runnable)
        self.assertRaises(RuntimeError, wf.run)
        wf.delete()

    def test_workflow_export(self):
        with open(SAMPLE_FN) as f:
            wf1 = self.gi.workflows.import_new(f.read())
        wf2 = self.gi.workflows.import_new(wf1.export())
        self.assertNotEqual(wf1.id, wf2.id)
        for wf in wf1, wf2:
            self._check_and_del_workflow(wf)

    def _check_and_del_workflow(self, wf, check_is_public=False):
        # Galaxy appends additional text to imported workflow names
        self.assertTrue(wf.name.startswith('paste_columns'))
        self.assertEqual(type(wf.owner), str)
        self.assertEqual(len(wf.steps), 3)
        for step_id, step in wf.steps.items():
            self.assertIsInstance(step, wrappers.Step)
            self.assertEqual(step_id, step.id)
            self.assertIsInstance(step.tool_inputs, dict)
            if step.type == 'tool':
                self.assertIsNotNone(step.tool_id)
                self.assertIsNotNone(step.tool_version)
                self.assertIsInstance(step.input_steps, dict)
            elif step.type in ('data_collection_input', 'data_input'):
                self.assertIsNone(step.tool_id)
                self.assertIsNone(step.tool_version)
                self.assertEqual(step.input_steps, {})
        wf_ids = {_.id for _ in self.gi.workflows.list()}
        self.assertIn(wf.id, wf_ids)
        if check_is_public:
            self.assertTrue(wf.published)
        wf.delete()

    # not very accurate:
    #   * we can't publish a wf from the API
    #   * we can't directly get another user's wf
    def test_workflow_from_shared(self):
        all_prevs = {
            _.id: _ for _ in self.gi.workflows.get_previews(published=True)
        }
        pub_only_ids = set(all_prevs).difference(
            _.id for _ in self.gi.workflows.get_previews())
        if pub_only_ids:
            wf_id = pub_only_ids.pop()
            imported = self.gi.workflows.import_shared(wf_id)
            self.assertIsInstance(imported, wrappers.Workflow)
            imported.delete()
        else:
            self.skipTest('no published workflows, manually publish a workflow to run this test')

    def test_get_libraries(self):
        self._test_multi_get('library')

    def test_get_histories(self):
        self._test_multi_get('history')

    def test_get_workflows(self):
        self._test_multi_get('workflow')

    def _normalized_functions(self, obj_type):
        if obj_type == 'library':
            create = self.gi.libraries.create
            get_objs = self.gi.libraries.list
            get_prevs = self.gi.libraries.get_previews
            del_kwargs = {}
        elif obj_type == 'history':
            create = self.gi.histories.create
            get_objs = self.gi.histories.list
            get_prevs = self.gi.histories.get_previews
            del_kwargs = {'purge': True}
        elif obj_type == 'workflow':
            def create(name):
                with open(SAMPLE_FN) as f:
                    d = json.load(f)
                d['name'] = name
                return self.gi.workflows.import_new(d)

            get_objs = self.gi.workflows.list
            get_prevs = self.gi.workflows.get_previews
            del_kwargs = {}
        return create, get_objs, get_prevs, del_kwargs

    def _test_multi_get(self, obj_type):
        create, get_objs, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)

        def ids(seq):
            return {_.id for _ in seq}

        names = ['test_%s' % uuid.uuid4().hex for _ in range(2)]
        objs = []
        try:
            objs = [create(_) for _ in names]
            self.assertLessEqual(ids(objs), ids(get_objs()))
            if obj_type != 'workflow':
                filtered = get_objs(name=names[0])
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, objs[0].id)
                del_id = objs[-1].id
                objs.pop().delete(**del_kwargs)
                self.assertIn(del_id, ids(get_prevs(deleted=True)))
            else:
                # Galaxy appends info strings to imported workflow names
                prev = get_prevs()[0]
                filtered = get_objs(name=prev.name)
                self.assertEqual(len(filtered), 1)
                self.assertEqual(filtered[0].id, prev.id)
        finally:
            for o in objs:
                o.delete(**del_kwargs)

    def test_delete_libraries_by_name(self):
        self._test_delete_by_name('library')

    def test_delete_histories_by_name(self):
        self._test_delete_by_name('history')

    def test_delete_workflows_by_name(self):
        self._test_delete_by_name('workflow')

    def _test_delete_by_name(self, obj_type):
        create, _, get_prevs, del_kwargs = self._normalized_functions(
            obj_type)
        name = 'test_%s' % uuid.uuid4().hex
        objs = [create(name) for _ in range(2)]  # noqa: F812
        final_name = objs[0].name
        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
        self.assertEqual(len(prevs), len(objs))
        del_kwargs['name'] = final_name
        objs[0].gi_module.delete(**del_kwargs)
        prevs = [_ for _ in get_prevs(name=final_name) if not _.deleted]
        self.assertEqual(len(prevs), 0)


class TestLibrary(GalaxyObjectsTestBase):
    # just something that can be expected to be always up
    DS_URL = 'https://tools.ietf.org/rfc/rfc1866.txt'

    def setUp(self):
        super().setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.lib.delete()

    def test_root_folder(self):
        r = self.lib.root_folder
        self.assertIsNone(r.parent)

    def test_folder(self):
        name, desc = 'test_%s' % uuid.uuid4().hex, 'D'
        folder = self.lib.create_folder(name, description=desc)
        self.assertEqual(folder.name, name)
        self.assertEqual(folder.description, desc)
        self.assertIs(folder.container, self.lib)
        self.assertEqual(folder.parent.id, self.lib.root_folder.id)
        self.assertEqual(len(self.lib.content_infos), 2)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self.assertIn(folder.id, self.lib.folder_ids)
        retrieved = self.lib.get_folder(folder.id)
        self.assertEqual(folder.id, retrieved.id)

    def _check_datasets(self, dss):
        self.assertEqual(len(dss), len(self.lib.dataset_ids))
        self.assertEqual({_.id for _ in dss}, set(self.lib.dataset_ids))
        for ds in dss:
            self.assertIs(ds.container, self.lib)

    def test_dataset(self):
        folder = self.lib.create_folder('test_%s' % uuid.uuid4().hex)
        ds = self.lib.upload_data(FOO_DATA, folder=folder)
        self.assertEqual(len(self.lib.content_infos), 3)
        self.assertEqual(len(self.lib.folder_ids), 2)
        self._check_datasets([ds])

    def test_dataset_from_url(self):
        if is_reachable(self.DS_URL):
            ds = self.lib.upload_from_url(self.DS_URL)
            self._check_datasets([ds])
        else:
            self.skipTest('%s not reachable' % self.DS_URL)

    def test_dataset_from_local(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            ds = self.lib.upload_from_local(f.name)
        self._check_datasets([ds])

    def test_datasets_from_fs(self):
        bnames = ['f%d.txt' % i for i in range(2)]
        dss, fnames = upload_from_fs(self.lib, bnames)
        self._check_datasets(dss)
        dss, fnames = upload_from_fs(
            self.lib, bnames, link_data_only='link_to_files')
        for ds, fn in zip(dss, fnames):
            self.assertEqual(ds.file_name, fn)

    def test_copy_from_dataset(self):
        hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
        try:
            hda = hist.paste_content(FOO_DATA)
            ds = self.lib.copy_from_dataset(hda)
        finally:
            hist.delete(purge=True)
        self._check_datasets([ds])

    def test_get_dataset(self):
        ds = self.lib.upload_data(FOO_DATA)
        retrieved = self.lib.get_dataset(ds.id)
        self.assertEqual(ds.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        dss, _ = upload_from_fs(self.lib, bnames)
        retrieved = self.lib.get_datasets()
        self.assertEqual(len(dss), len(retrieved))
        self.assertEqual({_.id for _ in dss}, {_.id for _ in retrieved})
        name = '/%s' % bnames[0]
        selected = self.lib.get_datasets(name=name)
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])


class TestLDContents(GalaxyObjectsTestBase):

    def setUp(self):
        super().setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        self.ds = self.lib.upload_data(FOO_DATA)
        self.ds.wait()

    def tearDown(self):
        self.lib.delete()

    def test_dataset_get_stream(self):
        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
            self.assertEqual(FOO_DATA[idx].encode(), c)

    def test_dataset_peek(self):
        fetched_data = self.ds.peek(chunk_size=4)
        self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as f:
            self.ds.download(f)
            f.seek(0)
            self.assertEqual(FOO_DATA.encode(), f.read())

    def test_dataset_get_contents(self):
        self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())

    def test_dataset_delete(self):
        self.ds.delete()
        # Cannot test this yet because the 'deleted' attribute is not exported
        # by the API at the moment
        # self.assertTrue(self.ds.deleted)

    def test_dataset_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_misc_info = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_ldda = self.ds.update(name=new_name, misc_info=new_misc_info, genome_build=new_genome_build)
        self.assertEqual(self.ds.id, updated_ldda.id)
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.misc_info, new_misc_info)
        self.assertEqual(self.ds.genome_build, new_genome_build)


class TestHistory(GalaxyObjectsTestBase):

    def setUp(self):
        super().setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_create_delete(self):
        name = 'test_%s' % uuid.uuid4().hex
        hist = self.gi.histories.create(name)
        self.assertEqual(hist.name, name)
        hist_id = hist.id
        self.assertIn(hist_id, [_.id for _ in self.gi.histories.list()])
        hist.delete(purge=True)
        self.assertFalse(hist.is_mapped)
        h = self.gi.histories.get(hist_id)
        self.assertTrue(h.deleted)

    def _check_dataset(self, hda):
        self.assertIsInstance(hda, wrappers.HistoryDatasetAssociation)
        self.assertIs(hda.container, self.hist)
        self.assertEqual(len(self.hist.dataset_ids), 1)
        self.assertEqual(self.hist.dataset_ids[0], hda.id)

    def test_import_dataset(self):
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        lds = lib.upload_data(FOO_DATA)
        self.assertEqual(len(self.hist.dataset_ids), 0)
        hda = self.hist.import_dataset(lds)
        lib.delete()
        self._check_dataset(hda)

    def test_upload_file(self):
        with tempfile.NamedTemporaryFile(mode='w', prefix='bioblend_test_') as f:
            f.write(FOO_DATA)
            f.flush()
            hda = self.hist.upload_file(f.name)
        self._check_dataset(hda)

    def test_paste_content(self):
        hda = self.hist.paste_content(FOO_DATA)
        self._check_dataset(hda)

    def test_get_dataset(self):
        hda = self.hist.paste_content(FOO_DATA)
        retrieved = self.hist.get_dataset(hda.id)
        self.assertEqual(hda.id, retrieved.id)

    def test_get_datasets(self):
        bnames = ['f%d.txt' % _ for _ in range(2)]
        lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        lds = upload_from_fs(lib, bnames)[0]
        hdas = [self.hist.import_dataset(_) for _ in lds]
        lib.delete()
        retrieved = self.hist.get_datasets()
        self.assertEqual(len(hdas), len(retrieved))
        self.assertEqual({_.id for _ in hdas}, {_.id for _ in retrieved})
        selected = self.hist.get_datasets(name=bnames[0])
        self.assertEqual(len(selected), 1)
        self.assertEqual(selected[0].name, bnames[0])

    def test_export_and_download(self):
        jeha_id = self.hist.export(wait=True, maxwait=60)
        self.assertTrue(jeha_id)
        tempdir = tempfile.mkdtemp(prefix='bioblend_test_')
        temp_fn = os.path.join(tempdir, 'export.tar.gz')
        try:
            with open(temp_fn, 'wb') as fo:
                self.hist.download(jeha_id, fo)
            self.assertTrue(tarfile.is_tarfile(temp_fn))
        finally:
            shutil.rmtree(tempdir)

    def test_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_tags = ['tag1', 'tag2']
        updated_hist = self.hist.update(name=new_name, annotation=new_annotation, tags=new_tags)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertEqual(self.hist.name, new_name)
        self.assertEqual(self.hist.annotation, new_annotation)
        self.assertEqual(self.hist.tags, new_tags)
        updated_hist = self.hist.update(published=True)
        self.assertEqual(self.hist.id, updated_hist.id)
        self.assertTrue(self.hist.published)

    def test_create_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        self.assertIsInstance(hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(hdca.collection_type, 'list')
        self.assertIs(hdca.container, self.hist)
        self.assertEqual(len(hdca.elements), 2)
        self.assertEqual(self.dataset1.id, hdca.elements[0]['object']['id'])
        self.assertEqual(self.dataset2.id, hdca.elements[1]['object']['id'])

    def test_delete_dataset_collection(self):
        self._create_collection_description()
        hdca = self.hist.create_dataset_collection(self.collection_description)
        hdca.delete()
        self.assertTrue(hdca.deleted)

    def _create_collection_description(self):
        self.dataset1 = self.hist.paste_content(FOO_DATA)
        self.dataset2 = self.hist.paste_content(FOO_DATA_2)
        self.collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=self.dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=self.dataset2.id),
            ]
        )


class TestHDAContents(GalaxyObjectsTestBase):

    def setUp(self):
        super().setUp()
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)
        self.ds = self.hist.paste_content(FOO_DATA)
        self.ds.wait()

    def tearDown(self):
        self.hist.delete(purge=True)

    def test_dataset_get_stream(self):
        for idx, c in enumerate(self.ds.get_stream(chunk_size=1)):
            self.assertEqual(FOO_DATA[idx].encode(), c)

    def test_dataset_peek(self):
        fetched_data = self.ds.peek(chunk_size=4)
        self.assertEqual(FOO_DATA[0:4].encode(), fetched_data)

    def test_dataset_download(self):
        with tempfile.TemporaryFile() as f:
            self.ds.download(f)
            f.seek(0)
            self.assertEqual(FOO_DATA.encode(), f.read())

    def test_dataset_get_contents(self):
        self.assertEqual(FOO_DATA.encode(), self.ds.get_contents())

    def test_dataset_update(self):
        new_name = 'test_%s' % uuid.uuid4().hex
        new_annotation = 'Annotation for %s' % new_name
        new_genome_build = 'hg19'
        updated_hda = self.ds.update(name=new_name, annotation=new_annotation, genome_build=new_genome_build)
        self.assertEqual(self.ds.id, updated_hda.id)
        self.assertEqual(self.ds.name, new_name)
        self.assertEqual(self.ds.annotation, new_annotation)
        self.assertEqual(self.ds.genome_build, new_genome_build)

    def test_dataset_delete(self):
        self.ds.delete()
        self.assertTrue(self.ds.deleted)
        self.assertFalse(self.ds.purged)

    def test_dataset_purge(self):
        self.ds.delete(purge=True)
        self.assertTrue(self.ds.deleted)
        self.assertTrue(self.ds.purged)


class TestRunWorkflow(GalaxyObjectsTestBase):

    def setUp(self):
        super().setUp()
        self.lib = self.gi.libraries.create('test_%s' % uuid.uuid4().hex)
        with open(SAMPLE_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        self.contents = ['one\ntwo\n', '1\n2\n']
        self.inputs = [self.lib.upload_data(_) for _ in self.contents]

    def tearDown(self):
        self.wf.delete()
        self.lib.delete()

    def _test(self, existing_hist=False, params=False):
        hist_name = 'test_%s' % uuid.uuid4().hex
        if existing_hist:
            hist = self.gi.histories.create(hist_name)
        else:
            hist = hist_name
        if params:
            params = {'Paste1': {'delimiter': 'U'}}
            sep = '_'  # 'U' maps to '_' in the paste tool
        else:
            params = None
            sep = '\t'  # default
        input_map = {'Input 1': self.inputs[0], 'Input 2': self.inputs[1]}
        sys.stderr.write(os.linesep)
        outputs, out_hist = self.wf.run(
            input_map, hist, params=params, wait=True, polling_interval=1)
        self.assertEqual(len(outputs), 1)
        out_ds = outputs[0]
        self.assertIn(out_ds.id, out_hist.dataset_ids)
        res = out_ds.get_contents()
        exp_rows = zip(*(_.splitlines() for _ in self.contents))
        exp_res = ("\n".join(sep.join(t) for t in exp_rows) + "\n").encode()
        self.assertEqual(res, exp_res)
        if existing_hist:
            self.assertEqual(out_hist.id, hist.id)
        out_hist.delete(purge=True)

    def test_existing_history(self):
        self._test(existing_hist=True)

    def test_new_history(self):
        self._test(existing_hist=False)

    def test_params(self):
        self._test(params=True)


class TestRunDatasetCollectionWorkflow(GalaxyObjectsTestBase):

    def setUp(self):
        super().setUp()
        with open(SAMPLE_WF_COLL_FN) as f:
            self.wf = self.gi.workflows.import_new(f.read())
        self.hist = self.gi.histories.create('test_%s' % uuid.uuid4().hex)

    def tearDown(self):
        self.wf.delete()
        self.hist.delete(purge=True)

    def test_run_workflow_with_dataset_collection(self):
        dataset1 = self.hist.paste_content(FOO_DATA)
        dataset2 = self.hist.paste_content(FOO_DATA_2)
        collection_description = dataset_collections.CollectionDescription(
            name="MyDatasetList",
            elements=[
                dataset_collections.HistoryDatasetElement(name="sample1", id=dataset1.id),
                dataset_collections.HistoryDatasetElement(name="sample2", id=dataset2.id),
            ]
        )
        dataset_collection = self.hist.create_dataset_collection(collection_description)
        input_map = {"Input Dataset Collection": dataset_collection,
                     "Input 2": dataset1}
        outputs, out_hist = self.wf.run(input_map, self.hist, wait=True)
        self.assertEqual(len(outputs), 1)
        out_hdca = outputs[0]
        self.assertIsInstance(out_hdca, wrappers.HistoryDatasetCollectionAssociation)
        self.assertEqual(out_hdca.collection_type, 'list')
        self.assertEqual(len(out_hdca.elements), 2)
        self.assertEqual(out_hist.id, self.hist.id)


class TestJob(GalaxyObjectsTestBase):

    def test_get(self):
        job_prevs = self.gi.jobs.get_previews()
        if len(job_prevs) > 0:
            job_prev = job_prevs[0]
            self.assertIsInstance(job_prev, wrappers.JobPreview)
            job = self.gi.jobs.get(job_prev.id)
            self.assertIsInstance(job, wrappers.Job)
            self.assertEqual(job.id, job_prev.id)
        for job in self.gi.jobs.list():
            self.assertIsInstance(job, wrappers.Job)


def suite():
    loader = unittest.TestLoader()
    s = unittest.TestSuite()
    s.addTests([loader.loadTestsFromTestCase(c) for c in (
        TestWrapper,
        TestWorkflow,
        TestGalaxyInstance,
        TestLibrary,
        TestLDContents,
        TestHistory,
        TestHDAContents,
        TestRunWorkflow,
    )])
    return s


if __name__ == '__main__':
    tests = suite()
    RUNNER = unittest.TextTestRunner(verbosity=2)
    RUNNER.run(tests)