Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/planemo/database/postgres.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 (2021-03-22) |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/planemo/database/postgres.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,106 @@ +"""Module describes a :class:`DatabaseSource` for local postgres databases.""" + +import subprocess + +from galaxy.util import unicodify + +from planemo.io import communicate +from .interface import DatabaseSource + + +class ExecutesPostgresSqlMixin: + + def list_databases(self): + """Use `psql --list` to generate a list of identifiers.""" + command_builder = self._psql_command_builder("--list") + stdout = unicodify(self._communicate(command_builder)) + output_lines = stdout.splitlines() + identifiers = [] + for line in output_lines: + identifiers.append(line.split("|")[0].strip()) + return [i for i in identifiers if i] + + def create_database(self, identifier): + """Use `psql -c "create database"` to create a database.""" + sql = "create database %s;" % identifier + self._run_sql_command(sql) + + def delete_database(self, identifier): + """Use `psql -c "drop database"` to delete a database.""" + sql = "drop database %s;" % identifier + self._run_sql_command(sql) + + def _run_sql_command(self, sql): + # communicate is just joining commands so we need to modify the + # sql as an argument - it shouldn't do this. + sql_arg = '%s' % sql + command_builder = self._psql_command_builder("--command", sql_arg) + self._communicate(command_builder) + + def _communicate(self, command_builder): + stdout, _ = communicate( + command_builder.command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + return stdout + + +class LocalPostgresDatabaseSource(ExecutesPostgresSqlMixin, DatabaseSource): + """Local postgres database source managed through psql application.""" + + def __init__(self, **kwds): + """Construct a postgres database source from planemo configuration.""" + self.psql_path = kwds.get("postgres_psql_path", None) or 'psql' + self.database_user = kwds.get("postgres_database_user", None) + self.database_host = kwds.get("postgres_database_host", None) + self.database_port = kwds.get("postgres_database_port", None) + self._kwds = kwds + + def sqlalchemy_url(self, identifier): + """Return URL or form postgresql://username:password@localhost/mydatabase.""" + hostname = self.database_host or "localhost" + if self.database_port: + hostname += ":%s" % self.database_port + return "postgresql://%s@%s/%s" % ( + self.database_user, + hostname, + identifier + ) + + def _psql_command_builder(self, *args): + command_builder = _CommandBuilder(self.psql_path) + # Print only tuples so output is easier to parse + command_builder.append_command("--tuples-only") + + # Specify connection information + if self.database_user: + command_builder.append_command("--username", self.database_user) + if self.database_host: + command_builder.append_command("--host", self.database_host) + if self.database_port: + command_builder.append_command("--port", self.database_port) + command_builder.append_command("-P", "pager=off") + command_builder.extend_command(args) + return command_builder + + +class _CommandBuilder(object): + + def __init__(self, *args): + self.command = list(args) + + def append_command(self, *args_or_none): + args_or_none = args_or_none or [] + for arg_or_none in args_or_none: + if arg_or_none is not None: + self.command.append(arg_or_none) + + def extend_command(self, args): + for arg in (args or []): + self.append_command(arg) + + +__all__ = ( + "LocalPostgresDatabaseSource", +)