Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/galaxy/util/properties.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """ Module used to blend ini, environment, and explicit dictionary properties | |
2 to determine application configuration. Some hard coded defaults for Galaxy but | |
3 this should be reusable by tool shed and pulsar as well. | |
4 """ | |
5 import os | |
6 import os.path | |
7 import sys | |
8 from configparser import ConfigParser | |
9 from functools import partial | |
10 from itertools import product, starmap | |
11 | |
12 import yaml | |
13 | |
14 from galaxy.exceptions import InvalidFileFormatError | |
15 from galaxy.util.path import extensions, has_ext, joinext | |
16 | |
17 | |
def find_config_file(names, exts=None, dirs=None, include_samples=False):
    """Return the first configuration file that matches, or ``None``.

    Candidate file names are built by combining ``names`` with ``exts`` and
    are looked up in every directory of ``dirs`` in order; all candidates are
    checked in one directory before moving on to the next one.

    :param names: a base name, or a list of base names, without extension
    :param exts: extensions to try; when empty, the ``'yaml'`` extensions
        followed by the ``'ini'`` extensions from ``galaxy.util.path`` are used
    :param dirs: directories to search; when empty, the current working
        directory and its ``config`` subdirectory are used
    :param include_samples: when no regular file is found, also accept the
        same candidates with a ``sample`` extension appended
    :returns: path of the first match, or ``None`` when nothing was found

    >>> from shutil import rmtree
    >>> from tempfile import mkdtemp
    >>> def touch(d, f):
    ...     open(os.path.join(d, f), 'w').close()
    >>> def _find_config_file(*args, **kwargs):
    ...     return find_config_file(*args, **kwargs).replace(d, '')
    >>> d = mkdtemp()
    >>> d1 = os.path.join(d, 'd1')
    >>> d2 = os.path.join(d, 'd2')
    >>> os.makedirs(d1)
    >>> os.makedirs(d2)
    >>> touch(d1, 'foo.ini')
    >>> touch(d1, 'foo.bar')
    >>> touch(d1, 'baz.ini.sample')
    >>> touch(d2, 'foo.yaml')
    >>> touch(d2, 'baz.yml')
    >>> _find_config_file('foo', dirs=(d1, d2))
    '/d1/foo.ini'
    >>> _find_config_file('baz', dirs=(d1, d2))
    '/d2/baz.yml'
    >>> _find_config_file('baz', dirs=(d1, d2), include_samples=True)
    '/d2/baz.yml'
    >>> _find_config_file('baz', dirs=(d1,), include_samples=True)
    '/d1/baz.ini.sample'
    >>> _find_config_file('foo', dirs=(d2, d1))
    '/d2/foo.yaml'
    >>> find_config_file('quux', dirs=(d,))
    >>> _find_config_file('foo', exts=('bar', 'ini'), dirs=(d1,))
    '/d1/foo.bar'
    >>> rmtree(d)
    """
    search_exts = exts or extensions['yaml'] + extensions['ini']
    if dirs:
        search_dirs = dirs
    else:
        cwd = os.getcwd()
        search_dirs = [cwd, os.path.join(cwd, 'config')]

    matches = __find_config_files(
        names,
        exts=search_exts,
        dirs=search_dirs,
        include_samples=include_samples,
    )
    # Logging does not really belong here, but finding more than one config
    # arguably deserves a warning of some kind; for now the first match wins.
    return matches[0] if matches else None
63 | |
64 | |
def load_app_properties(
    kwds=None,
    ini_file=None,
    ini_section=None,
    config_file=None,
    config_section=None,
    config_prefix="GALAXY_CONFIG_"
):
    """Build the application properties from a file, ``kwds`` and the environment.

    Sources are applied in this order:

    1. the configuration file, if one is given (``ini_file``/``ini_section``
       are used when ``config_file`` is ``None``); without a file the result
       starts as ``{'__file__': None}``
    2. ``kwds``, which overwrite values read from the file
    3. environment variables: ``<config_prefix>OVERRIDE_<NAME>`` always sets
       the lower-cased ``<name>``, while ``<config_prefix><NAME>`` only fills
       in ``<name>`` when it is not set yet

    :returns: the resulting properties dictionary
    """
    if config_file is None:
        config_file, config_section = ini_file, ini_section

    # Start from the file contents, or from an empty property set.
    if config_file:
        properties = read_properties_from_file(config_file, config_section)
    else:
        properties = {'__file__': None}

    # Explicit keyword properties take precedence over the file.
    if kwds:
        properties.update(kwds)

    # Finally blend in the environment.
    override_prefix = f"{config_prefix}OVERRIDE_"
    for env_name, env_value in os.environ.items():
        if env_name.startswith(override_prefix):
            option = env_name[len(override_prefix):].lower()
            properties[option] = env_value
            continue
        if not env_name.startswith(config_prefix):
            continue
        option = env_name[len(config_prefix):].lower()
        if option not in properties:
            properties[option] = env_value

    return properties
99 | |
100 | |
def read_properties_from_file(config_file, config_section=None):
    """Read one section of a YAML or INI configuration file into a dict.

    For YAML files the section defaults to ``galaxy``; the result contains
    the ``here`` and ``__file__`` defaults plus the keys of that section.
    For INI files the section defaults to ``app:main``; when the section does
    not exist, the parser defaults are returned instead.

    :raises InvalidFileFormatError: if the file has neither a YAML nor an INI
        extension (a ``sample`` extension is ignored for this check)
    """
    if has_ext(config_file, 'yaml', aliases=True, ignore='sample'):
        section = "galaxy" if config_section is None else config_section
        properties = {}
        properties.update(__default_properties(config_file))
        document = _read_from_yaml_file(config_file)
        if document:
            # A missing or empty section contributes nothing.
            properties.update(document.get(section) or {})
        return properties

    if has_ext(config_file, 'ini', aliases=True, ignore='sample'):
        section = "app:main" if config_section is None else config_section
        # The parser is created with the default properties already loaded.
        parser = nice_config_parser(config_file)
        if parser.has_section(section):
            return dict(parser.items(section))
        return dict(parser.defaults())

    raise InvalidFileFormatError("File '%s' doesn't have a supported extension" % config_file)
121 | |
122 | |
def _read_from_yaml_file(path):
    """Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the result."""
    with open(path) as stream:
        document = yaml.safe_load(stream)
    return document
126 | |
127 | |
def nice_config_parser(path):
    """Create a ``NicerConfigParser`` for ``path`` and load the file into it.

    The parser is seeded with the ``here``/``__file__`` default properties of
    ``path`` and keeps option names in their original case.
    """
    defaults = __default_properties(path)
    parser = NicerConfigParser(path, defaults=defaults)
    # Don't lower-case keys read from the file.
    parser.optionxform = str
    with open(path) as config_stream:
        parser.read_file(config_stream)
    return parser
134 | |
135 | |
def _prefix_error_with_filename(error, filename):
    """Rewrite ``error`` in place so that its message names the config file.

    The first element of ``error.args`` is replaced by the prefixed message
    (or the message becomes the only argument when ``args`` is empty), and
    ``error.message`` is set to the same text.
    """
    message = f'Error in file {filename}: {error}'
    args = list(error.args)
    if args:
        args[0] = message
    else:
        # An exception raised without arguments must not turn into an
        # IndexError that hides the original problem.
        args = [message]
    error.args = tuple(args)
    error.message = message


class NicerConfigParser(ConfigParser):
    """``ConfigParser`` that reports the file name in interpolation errors.

    ``filename`` is stored on the instance and is prepended to the message of
    any exception raised while interpolating a value. ``read_file`` is
    inherited unchanged from ``ConfigParser``.
    """

    def __init__(self, filename, *args, **kw):
        ConfigParser.__init__(self, *args, **kw)
        self.filename = filename
        if hasattr(self, '_interpolation'):
            # Wrap the interpolation object so its errors mention the file.
            self._interpolation = self.InterpolateWrapper(self._interpolation)

    def defaults(self):
        """Return the defaults, with their values interpolated (with the
        defaults dict itself)

        Mainly to support defaults using values such as %(here)s
        """
        defaults = ConfigParser.defaults(self).copy()
        for key, val in defaults.items():
            # Fall back to the raw value when interpolation yields nothing.
            defaults[key] = self.get('DEFAULT', key) or val
        return defaults

    def _interpolate(self, section, option, rawval, vars):
        # Hook used by Python < 3.2; newer versions go through
        # InterpolateWrapper.before_get instead.
        try:
            return ConfigParser._interpolate(
                self, section, option, rawval, vars)
        except Exception as e:
            _prefix_error_with_filename(e, self.filename)
            raise

    class InterpolateWrapper:
        """Proxy for an interpolation object (Python >= 3.2) that adds the
        parser's file name to errors raised by ``before_get``."""

        def __init__(self, original):
            self._original = original

        def __getattr__(self, name):
            # Everything except before_get is delegated untouched.
            return getattr(self._original, name)

        def before_get(self, parser, section, option, value, defaults):
            try:
                return self._original.before_get(parser, section, option,
                                                 value, defaults)
            except Exception as e:
                _prefix_error_with_filename(e, parser.filename)
                raise
190 | |
191 | |
def _running_from_source():
    """Return ``True`` when all marker files of a source checkout exist.

    The markers are relative paths, so they are resolved against the current
    working directory.
    """
    markers = ('run.sh', 'lib/galaxy/__init__.py', 'scripts/common_startup.sh')
    return all(os.path.exists(marker) for marker in markers)
195 | |
196 | |
# Evaluated once, when this module is imported; the marker files are looked
# up relative to the working directory at that moment.
running_from_source = _running_from_source()
198 | |
199 | |
def get_data_dir(properties):
    """Return the data directory for the given application properties.

    Resolution order:

    1. ``properties['data_dir']`` when it is set
    2. ``'./database'`` when running from a source checkout
    3. ``<config_dir>/data``, where ``config_dir`` is
       ``properties['config_dir']`` or, when that is not set, the directory
       containing ``properties['__file__']``

    :param properties: mapping of application properties
    :raises KeyError: if neither ``config_dir`` nor ``__file__`` is present
        when step 3 is reached
    :raises TypeError: if ``__file__`` is ``None`` and is needed in step 3
    """
    data_dir = properties.get('data_dir')
    if data_dir is not None:
        return data_dir
    if running_from_source:
        return './database'

    config_dir = properties.get('config_dir')
    if config_dir is None:
        # Only look at ``__file__`` when it is really needed: it is ``None``
        # (or absent) when no config file was loaded, and evaluating it
        # eagerly would fail even though ``config_dir`` was supplied.
        config_dir = os.path.dirname(properties['__file__'])
    return os.path.join(config_dir, 'data')
209 | |
210 | |
def __get_all_configs(dirs, names):
    """Return every existing ``dir/name`` path, grouped by directory.

    All ``names`` are tried in the first directory before the next directory
    is considered; ``names`` is consumed only once, so it may be an iterator.
    """
    candidates = (os.path.join(directory, name) for directory, name in product(dirs, names))
    return [candidate for candidate in candidates if os.path.exists(candidate)]
213 | |
214 | |
def __find_config_files(names, exts=None, dirs=None, include_samples=False):
    """Return all existing config files for ``names``, or their sample variants.

    :param names: a base name or an iterable of base names
    :param exts: when given, every name is combined with every extension
    :param dirs: directories to search; defaults to the current working directory
    :param include_samples: when no regular file exists, look for the same
        names with a ``sample`` extension appended
    :returns: list of existing paths (possibly empty)
    """
    if isinstance(names, str):
        names = [names]
    search_dirs = dirs if dirs else [os.getcwd()]
    if exts:
        # Materialised as a list: it is small and may be iterated twice.
        names = [joinext(name, ext) for name, ext in product(names, exts)]
    if include_samples:
        # Lazy on purpose: only evaluated when no regular config was found.
        sample_names = (joinext(name, ext='sample') for name in names)
    else:
        sample_names = []
    # All names are checked in one directory before moving to the next one.
    return __get_all_configs(search_dirs, names) or __get_all_configs(search_dirs, sample_names)
229 | |
230 | |
def __default_properties(path):
    """Return the properties every config file gets for free.

    ``here`` is the absolute directory containing ``path`` and ``__file__``
    is the absolute path itself.
    """
    absolute_path = os.path.abspath(path)
    return {
        'here': os.path.dirname(absolute_path),
        '__file__': absolute_path,
    }
236 | |
237 | |
# Public names exported by a star-import of this module.
__all__ = ('find_config_file', 'get_data_dir', 'load_app_properties', 'NicerConfigParser', 'running_from_source')