comparison env/lib/python3.9/site-packages/planemo/database/postgres.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Module describes a :class:`DatabaseSource` for local postgres databases."""
2
3 import subprocess
4
5 from galaxy.util import unicodify
6
7 from planemo.io import communicate
8 from .interface import DatabaseSource
9
10
class ExecutesPostgresSqlMixin:
    """Mixin running database management operations through ``psql``.

    The host class supplies ``_psql_command_builder(*args)``, which must
    return an object exposing the argv list as ``.command``.
    """

    def list_databases(self):
        """Return the non-empty first-column values printed by `psql --list`."""
        listing = unicodify(self._communicate(self._psql_command_builder("--list")))
        # Columns are separated by "|"; the identifier is the first column.
        first_columns = (row.split("|")[0].strip() for row in listing.splitlines())
        # Rows whose first column is blank carry no identifier and are dropped.
        return [name for name in first_columns if name]

    def create_database(self, identifier):
        """Issue `create database <identifier>;` via `psql --command`."""
        self._run_sql_command("create database %s;" % identifier)

    def delete_database(self, identifier):
        """Issue `drop database <identifier>;` via `psql --command`."""
        self._run_sql_command("drop database %s;" % identifier)

    def _run_sql_command(self, sql):
        """Run a single SQL statement by handing it to `psql --command`."""
        # The statement travels as one argv element; the formatting step
        # below leaves a string statement unchanged.
        statement = '%s' % sql
        self._communicate(self._psql_command_builder("--command", statement))

    def _communicate(self, command_builder):
        """Execute the built command and return what it wrote to stdout."""
        captured_stdout, _captured_stderr = communicate(
            command_builder.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return captured_stdout
47
48
class LocalPostgresDatabaseSource(ExecutesPostgresSqlMixin, DatabaseSource):
    """Local postgres database source managed through psql application.

    Connection settings (psql executable, user, host, port) are taken from
    the keyword arguments given to the constructor and are used both to
    build ``psql`` command lines and to build SQLAlchemy URLs.
    """

    def __init__(self, **kwds):
        """Construct a postgres database source from planemo configuration.

        Recognized keys are ``postgres_psql_path`` (falls back to ``psql``
        when missing or empty), ``postgres_database_user``,
        ``postgres_database_host`` and ``postgres_database_port`` (each
        ``None`` when missing). The full keyword dict is kept on
        ``self._kwds``.
        """
        self.psql_path = kwds.get("postgres_psql_path", None) or 'psql'
        self.database_user = kwds.get("postgres_database_user", None)
        self.database_host = kwds.get("postgres_database_host", None)
        self.database_port = kwds.get("postgres_database_port", None)
        self._kwds = kwds

    def sqlalchemy_url(self, identifier):
        """Return a URL of the form postgresql://username@host[:port]/identifier.

        The host defaults to ``localhost`` and the port is appended only
        when one is configured. When no user is configured the
        ``username@`` part is left out entirely.
        """
        hostname = self.database_host or "localhost"
        if self.database_port:
            hostname += ":%s" % self.database_port
        if not self.database_user:
            # Without this guard an unset user would be formatted into the
            # URL as the literal text "None". Omitting it mirrors
            # _psql_command_builder, which skips --username in this case.
            return "postgresql://%s/%s" % (hostname, identifier)
        return "postgresql://%s@%s/%s" % (
            self.database_user,
            hostname,
            identifier,
        )

    def _psql_command_builder(self, *args):
        """Return a command builder for ``psql`` with connection options set.

        ``args`` are appended after the connection and formatting options.
        """
        command_builder = _CommandBuilder(self.psql_path)
        # Print only tuples so output is easier to parse
        command_builder.append_command("--tuples-only")

        # Specify connection information
        if self.database_user:
            command_builder.append_command("--username", self.database_user)
        if self.database_host:
            command_builder.append_command("--host", self.database_host)
        if self.database_port:
            # Command-line arguments must be strings. The port may arrive
            # as an int (presumably when read from a parsed config file),
            # so coerce it here.
            command_builder.append_command("--port", str(self.database_port))
        command_builder.append_command("-P", "pager=off")
        command_builder.extend_command(args)
        return command_builder
86
87
88 class _CommandBuilder(object):
89
90 def __init__(self, *args):
91 self.command = list(args)
92
93 def append_command(self, *args_or_none):
94 args_or_none = args_or_none or []
95 for arg_or_none in args_or_none:
96 if arg_or_none is not None:
97 self.command.append(arg_or_none)
98
99 def extend_command(self, args):
100 for arg in (args or []):
101 self.append_command(arg)
102
103
# Public API of this module: only the database source class is exported.
__all__ = ("LocalPostgresDatabaseSource",)