comparison env/lib/python3.9/site-packages/galaxy/util/properties.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """ Module used to blend ini, environment, and explicit dictionary properties
2 to determine application configuration. Some hard coded defaults for Galaxy but
3 this should be reusable by tool shed and pulsar as well.
4 """
5 import os
6 import os.path
7 import sys
8 from configparser import ConfigParser
9 from functools import partial
10 from itertools import product, starmap
11
12 import yaml
13
14 from galaxy.exceptions import InvalidFileFormatError
15 from galaxy.util.path import extensions, has_ext, joinext
16
17
18 def find_config_file(names, exts=None, dirs=None, include_samples=False):
19 """Locate a config file in multiple directories, with multiple extensions.
20
21 >>> from shutil import rmtree
22 >>> from tempfile import mkdtemp
23 >>> def touch(d, f):
24 ... open(os.path.join(d, f), 'w').close()
25 >>> def _find_config_file(*args, **kwargs):
26 ... return find_config_file(*args, **kwargs).replace(d, '')
27 >>> d = mkdtemp()
28 >>> d1 = os.path.join(d, 'd1')
29 >>> d2 = os.path.join(d, 'd2')
30 >>> os.makedirs(d1)
31 >>> os.makedirs(d2)
32 >>> touch(d1, 'foo.ini')
33 >>> touch(d1, 'foo.bar')
34 >>> touch(d1, 'baz.ini.sample')
35 >>> touch(d2, 'foo.yaml')
36 >>> touch(d2, 'baz.yml')
37 >>> _find_config_file('foo', dirs=(d1, d2))
38 '/d1/foo.ini'
39 >>> _find_config_file('baz', dirs=(d1, d2))
40 '/d2/baz.yml'
41 >>> _find_config_file('baz', dirs=(d1, d2), include_samples=True)
42 '/d2/baz.yml'
43 >>> _find_config_file('baz', dirs=(d1,), include_samples=True)
44 '/d1/baz.ini.sample'
45 >>> _find_config_file('foo', dirs=(d2, d1))
46 '/d2/foo.yaml'
47 >>> find_config_file('quux', dirs=(d,))
48 >>> _find_config_file('foo', exts=('bar', 'ini'), dirs=(d1,))
49 '/d1/foo.bar'
50 >>> rmtree(d)
51 """
52 found = __find_config_files(
53 names,
54 exts=exts or extensions['yaml'] + extensions['ini'],
55 dirs=dirs or [os.getcwd(), os.path.join(os.getcwd(), 'config')],
56 include_samples=include_samples,
57 )
58 if not found:
59 return None
60 # doesn't really make sense to log here but we should probably generate a warning of some kind if more than one
61 # config is found.
62 return found[0]
63
64
65 def load_app_properties(
66 kwds=None,
67 ini_file=None,
68 ini_section=None,
69 config_file=None,
70 config_section=None,
71 config_prefix="GALAXY_CONFIG_"
72 ):
73 if config_file is None:
74 config_file = ini_file
75 config_section = ini_section
76
77 # read from file or init w/no file
78 if config_file:
79 properties = read_properties_from_file(config_file, config_section)
80 else:
81 properties = {'__file__': None}
82
83 # update from kwds
84 if kwds:
85 properties.update(kwds)
86
87 # update from env
88 override_prefix = "%sOVERRIDE_" % config_prefix
89 for key in os.environ:
90 if key.startswith(override_prefix):
91 config_key = key[len(override_prefix):].lower()
92 properties[config_key] = os.environ[key]
93 elif key.startswith(config_prefix):
94 config_key = key[len(config_prefix):].lower()
95 if config_key not in properties:
96 properties[config_key] = os.environ[key]
97
98 return properties
99
100
101 def read_properties_from_file(config_file, config_section=None):
102 properties = {}
103 if has_ext(config_file, 'yaml', aliases=True, ignore='sample'):
104 if config_section is None:
105 config_section = "galaxy"
106 properties.update(__default_properties(config_file))
107 raw_properties = _read_from_yaml_file(config_file)
108 if raw_properties:
109 properties.update(raw_properties.get(config_section) or {})
110 elif has_ext(config_file, 'ini', aliases=True, ignore='sample'):
111 if config_section is None:
112 config_section = "app:main"
113 parser = nice_config_parser(config_file) # default properties loaded w/parser
114 if parser.has_section(config_section):
115 properties.update(dict(parser.items(config_section)))
116 else:
117 properties.update(parser.defaults())
118 else:
119 raise InvalidFileFormatError("File '%s' doesn't have a supported extension" % config_file)
120 return properties
121
122
123 def _read_from_yaml_file(path):
124 with open(path) as f:
125 return yaml.safe_load(f)
126
127
128 def nice_config_parser(path):
129 parser = NicerConfigParser(path, defaults=__default_properties(path))
130 parser.optionxform = str # Don't lower-case keys
131 with open(path) as f:
132 parser.read_file(f)
133 return parser
134
135
136 class NicerConfigParser(ConfigParser):
137
138 def __init__(self, filename, *args, **kw):
139 ConfigParser.__init__(self, *args, **kw)
140 self.filename = filename
141 if hasattr(self, '_interpolation'):
142 self._interpolation = self.InterpolateWrapper(self._interpolation)
143
144 read_file = getattr(ConfigParser, 'read_file', ConfigParser.readfp)
145 read_file.__doc__ = ""
146
147 def defaults(self):
148 """Return the defaults, with their values interpolated (with the
149 defaults dict itself)
150
151 Mainly to support defaults using values such as %(here)s
152 """
153 defaults = ConfigParser.defaults(self).copy()
154 for key, val in defaults.items():
155 defaults[key] = self.get('DEFAULT', key) or val
156 return defaults
157
158 def _interpolate(self, section, option, rawval, vars):
159 # Python < 3.2
160 try:
161 return ConfigParser._interpolate(
162 self, section, option, rawval, vars)
163 except Exception:
164 e = sys.exc_info()[1]
165 args = list(e.args)
166 args[0] = f'Error in file {self.filename}: {e}'
167 e.args = tuple(args)
168 e.message = args[0]
169 raise
170
171 class InterpolateWrapper:
172 # Python >= 3.2
173 def __init__(self, original):
174 self._original = original
175
176 def __getattr__(self, name):
177 return getattr(self._original, name)
178
179 def before_get(self, parser, section, option, value, defaults):
180 try:
181 return self._original.before_get(parser, section, option,
182 value, defaults)
183 except Exception:
184 e = sys.exc_info()[1]
185 args = list(e.args)
186 args[0] = f'Error in file {parser.filename}: {e}'
187 e.args = tuple(args)
188 e.message = args[0]
189 raise
190
191
192 def _running_from_source():
193 paths = ['run.sh', 'lib/galaxy/__init__.py', 'scripts/common_startup.sh']
194 return all(map(os.path.exists, paths))
195
196
197 running_from_source = _running_from_source()
198
199
200 def get_data_dir(properties):
201 data_dir = properties.get('data_dir', None)
202 if data_dir is None:
203 if running_from_source:
204 data_dir = './database'
205 else:
206 config_dir = properties.get('config_dir', os.path.dirname(properties['__file__']))
207 data_dir = os.path.join(config_dir, 'data')
208 return data_dir
209
210
211 def __get_all_configs(dirs, names):
212 return list(filter(os.path.exists, starmap(os.path.join, product(dirs, names))))
213
214
215 def __find_config_files(names, exts=None, dirs=None, include_samples=False):
216 sample_names = []
217 if isinstance(names, str):
218 names = [names]
219 if not dirs:
220 dirs = [os.getcwd()]
221 if exts:
222 # add exts to names, converts back into a list because it's going to be small and we might consume names twice
223 names = list(starmap(joinext, product(names, exts)))
224 if include_samples:
225 sample_names = map(partial(joinext, ext='sample'), names)
226 # check for all names in each dir before moving to the next dir. could do it the other way around but that makes
227 # less sense to me.
228 return __get_all_configs(dirs, names) or __get_all_configs(dirs, sample_names)
229
230
231 def __default_properties(path):
232 return {
233 'here': os.path.dirname(os.path.abspath(path)),
234 '__file__': os.path.abspath(path)
235 }
236
237
238 __all__ = ('find_config_file', 'get_data_dir', 'load_app_properties', 'NicerConfigParser', 'running_from_source')