Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/galaxy/util/dbkeys.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| field    | value                           |
|----------|---------------------------------|
| author   | shellac                         |
| date     | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents  | (none)                          |
| children | (none)                          |
line wrap: on
line diff
"""
Functionality for dealing with dbkeys.
"""
import logging
import os.path
import re
from json import loads

from galaxy.util import (
    galaxy_directory,
    unicodify,
)
from galaxy.util.object_wrapper import sanitize_lists_to_string

log = logging.getLogger(__name__)

# Trailing digits of a UCSC build name, e.g. the "19" in "hg19".
# Compiled once at module level instead of once per input line.
_BUILD_REV_PATTERN = re.compile(r'\d+$')


def read_dbnames(filename):
    """Read build names from a UCSC-style ``builds.txt`` file.

    :param filename: path to the tab-separated builds file, or ``None`` to
        fall back to the bundled ``builds.txt.sample``.
    :returns: list of ``(dbkey, display_name)`` tuples ordered as: the
        unspecified build (``?``) first, then UCSC builds grouped per
        species with the newest revision first, then manually-curated
        (integer-keyed) builds. Returns an empty list if the file cannot
        be read.
    """
    db_names = []
    try:
        ucsc_builds = {}
        man_builds = []  # assume these are integers
        name_to_db_base = {}
        if filename is None:
            # Should only be happening with the galaxy.tools.parameters.basic:GenomeBuildParameter docstring unit test
            filename = os.path.join(galaxy_directory(), 'tool-data', 'shared', 'ucsc', 'builds.txt.sample')
        # Context manager ensures the handle is closed; the previous
        # implementation leaked the open file.
        with open(filename) as fh:
            for line in fh:
                try:
                    if line[0:1] == "#":
                        continue
                    fields = line.replace("\r", "").replace("\n", "").split("\t")
                    # Special case of unspecified build is at top of list
                    if fields[0] == "?":
                        db_names.insert(0, (fields[0], fields[1]))
                        continue
                    try:  # manual build (i.e. microbes)
                        int(fields[0])
                        man_builds.append((fields[1], fields[0]))
                    except ValueError:  # not an integer, so a UCSC build
                        db_base = fields[0].rstrip('0123456789')
                        if db_base not in ucsc_builds:
                            ucsc_builds[db_base] = []
                            name_to_db_base[fields[1]] = db_base
                        # we want to sort within a species numerically by revision number
                        try:
                            build_rev = int(_BUILD_REV_PATTERN.findall(fields[0])[0])
                        except IndexError:
                            # no trailing digits at all
                            build_rev = 0
                        ucsc_builds[db_base].append((build_rev, fields[0], fields[1]))
                except Exception:
                    # Skip malformed lines (e.g. too few fields) entirely.
                    continue
        sort_names = sorted(name_to_db_base.keys())
        for name in sort_names:
            db_base = name_to_db_base[name]
            # Newest revision first within each species.
            ucsc_builds[db_base].sort()
            ucsc_builds[db_base].reverse()
            ucsc_builds[db_base] = [(build, name) for _, build, name in ucsc_builds[db_base]]
            db_names = list(db_names + ucsc_builds[db_base])
        if len(db_names) > 1 and len(man_builds) > 0:
            # Visual separator between UCSC builds and manual builds.
            db_names.append((GenomeBuilds.default_value, '----- Additional Species Are Below -----'))
        man_builds.sort()
        man_builds = [(build, name) for name, build in man_builds]
        db_names = list(db_names + man_builds)
    except Exception as e:
        log.error("ERROR: Unable to read builds file: %s", unicodify(e))
    return db_names


class GenomeBuilds:
    """Registry of known genome builds (dbkeys) and their chrom-info files.

    Merges builds from the static ``builds.txt`` file, the ``__dbkeys__``
    tool data table, and (per request) user- and history-defined custom
    builds.
    """
    # dbkey / display name used when no build is specified.
    default_value = "?"
    default_name = "unspecified (?)"

    def __init__(self, app, data_table_name="__dbkeys__", load_old_style=True):
        """
        :param app: Galaxy application object (provides config, model,
            and tool data tables).
        :param data_table_name: name of the tool data table holding dbkeys.
        :param load_old_style: when True, also load static builds from the
            legacy ``builds.txt`` file.
        """
        self._app = app
        self._data_table_name = data_table_name
        self._static_chrom_info_path = app.config.len_file_path
        # A dbkey can be listed multiple times, but with different names, so we can't use dictionaries for lookups
        if load_old_style:
            self._static_dbkeys = list(read_dbnames(app.config.builds_file_path))
        else:
            self._static_dbkeys = []

    def get_genome_build_names(self, trans=None):
        """Return all known builds as ``(dbkey, display_name)`` tuples.

        Order: default build, history ``.len`` datasets, user custom
        builds, static builds, then data-table builds.
        """
        # FIXME: how to deal with key duplicates?
        rval = [(self.default_value, self.default_name)]
        # load user custom genome builds
        if trans is not None:
            if trans.history:
                # This is a little bit odd. We are adding every .len file in the current history to dbkey list,
                # but this is previous behavior from trans.db_names, so we'll continue to do it.
                # It does allow one-off, history specific dbkeys to be created by a user. But we are not filtering,
                # so a len file will be listed twice (as the build name and again as dataset name),
                # if custom dbkey creation/conversion occurred within the current history.
                datasets = trans.sa_session.query(self._app.model.HistoryDatasetAssociation) \
                    .filter_by(deleted=False, history_id=trans.history.id, extension="len")
                for dataset in datasets:
                    rval.append((dataset.dbkey, f"{dataset.name} ({dataset.dbkey}) [History]"))
            user = trans.user
            if user and hasattr(user, 'preferences') and 'dbkeys' in user.preferences:
                user_keys = loads(user.preferences['dbkeys'])
                for key, chrom_dict in user_keys.items():
                    rval.append((key, "{} ({}) [Custom]".format(chrom_dict['name'], key)))
        # Load old builds.txt static keys
        rval.extend(self._static_dbkeys)
        # load dbkeys from dbkey data table
        dbkey_table = self._app.tool_data_tables.get(self._data_table_name, None)
        if dbkey_table is not None:
            for field_dict in dbkey_table.get_named_fields_list():
                rval.append((field_dict['value'], field_dict['name']))
        return rval

    def get_chrom_info(self, dbkey, trans=None, custom_build_hack_get_len_from_fasta_conversion=True):
        """Resolve the chrom-info (``.len``) file for *dbkey*.

        Lookup order: custom-build dataset in the request context, the
        dbkey data table, then the configured static ``.len`` directory.

        :returns: tuple ``(chrom_info_path, db_dataset)`` where
            ``db_dataset`` is the custom-build dataset if one was used,
            else ``None``.
        """
        # FIXME: flag to turn off custom_build_hack_get_len_from_fasta_conversion should not be required
        chrom_info = None
        db_dataset = None
        # Collect chromInfo from custom builds
        if trans:
            db_dataset = trans.db_dataset_for(dbkey)
            if db_dataset:
                chrom_info = db_dataset.file_name
            else:
                # Do Custom Build handling
                if trans.user and ('dbkeys' in trans.user.preferences):
                    # Parse the preferences JSON once (was parsed twice).
                    user_dbkeys = loads(trans.user.preferences['dbkeys'])
                    if dbkey in user_dbkeys:
                        custom_build_dict = user_dbkeys[dbkey]
                        # HACK: the attempt to get chrom_info below will trigger the
                        # fasta-to-len converter if the dataset is not available or,
                        # which will in turn create a recursive loop when
                        # running the fasta-to-len tool. So, use a hack in the second
                        # condition below to avoid getting chrom_info when running the
                        # fasta-to-len converter.
                        if 'fasta' in custom_build_dict and custom_build_hack_get_len_from_fasta_conversion:
                            # Build is defined by fasta; get len file, which is obtained from converting fasta.
                            build_fasta_dataset = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(custom_build_dict['fasta'])
                            chrom_info = build_fasta_dataset.get_converted_dataset(trans, 'len').file_name
                        elif 'len' in custom_build_dict:
                            # Build is defined by len file, so use it.
                            chrom_info = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(custom_build_dict['len']).file_name
        # Check Data table
        if not chrom_info:
            dbkey_table = self._app.tool_data_tables.get(self._data_table_name, None)
            if dbkey_table is not None:
                chrom_info = dbkey_table.get_entry('value', dbkey, 'len_path', default=None)
        # use configured server len path
        if not chrom_info:
            # Default to built-in build.
            # Since we are using an unverified dbkey, we will sanitize the dbkey before use
            chrom_info = os.path.join(self._static_chrom_info_path, "%s.len" % sanitize_lists_to_string(dbkey))
        chrom_info = os.path.abspath(chrom_info)
        return (chrom_info, db_dataset)