Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/galaxy/util/properties.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """ Module used to blend ini, environment, and explicit dictionary properties | |
| 2 to determine application configuration. Some hard coded defaults for Galaxy but | |
| 3 this should be reusable by tool shed and pulsar as well. | |
| 4 """ | |
| 5 import os | |
| 6 import os.path | |
| 7 import sys | |
| 8 from configparser import ConfigParser | |
| 9 from functools import partial | |
| 10 from itertools import product, starmap | |
| 11 | |
| 12 import yaml | |
| 13 | |
| 14 from galaxy.exceptions import InvalidFileFormatError | |
| 15 from galaxy.util.path import extensions, has_ext, joinext | |
| 16 | |
| 17 | |
| 18 def find_config_file(names, exts=None, dirs=None, include_samples=False): | |
| 19 """Locate a config file in multiple directories, with multiple extensions. | |
| 20 | |
| 21 >>> from shutil import rmtree | |
| 22 >>> from tempfile import mkdtemp | |
| 23 >>> def touch(d, f): | |
| 24 ... open(os.path.join(d, f), 'w').close() | |
| 25 >>> def _find_config_file(*args, **kwargs): | |
| 26 ... return find_config_file(*args, **kwargs).replace(d, '') | |
| 27 >>> d = mkdtemp() | |
| 28 >>> d1 = os.path.join(d, 'd1') | |
| 29 >>> d2 = os.path.join(d, 'd2') | |
| 30 >>> os.makedirs(d1) | |
| 31 >>> os.makedirs(d2) | |
| 32 >>> touch(d1, 'foo.ini') | |
| 33 >>> touch(d1, 'foo.bar') | |
| 34 >>> touch(d1, 'baz.ini.sample') | |
| 35 >>> touch(d2, 'foo.yaml') | |
| 36 >>> touch(d2, 'baz.yml') | |
| 37 >>> _find_config_file('foo', dirs=(d1, d2)) | |
| 38 '/d1/foo.ini' | |
| 39 >>> _find_config_file('baz', dirs=(d1, d2)) | |
| 40 '/d2/baz.yml' | |
| 41 >>> _find_config_file('baz', dirs=(d1, d2), include_samples=True) | |
| 42 '/d2/baz.yml' | |
| 43 >>> _find_config_file('baz', dirs=(d1,), include_samples=True) | |
| 44 '/d1/baz.ini.sample' | |
| 45 >>> _find_config_file('foo', dirs=(d2, d1)) | |
| 46 '/d2/foo.yaml' | |
| 47 >>> find_config_file('quux', dirs=(d,)) | |
| 48 >>> _find_config_file('foo', exts=('bar', 'ini'), dirs=(d1,)) | |
| 49 '/d1/foo.bar' | |
| 50 >>> rmtree(d) | |
| 51 """ | |
| 52 found = __find_config_files( | |
| 53 names, | |
| 54 exts=exts or extensions['yaml'] + extensions['ini'], | |
| 55 dirs=dirs or [os.getcwd(), os.path.join(os.getcwd(), 'config')], | |
| 56 include_samples=include_samples, | |
| 57 ) | |
| 58 if not found: | |
| 59 return None | |
| 60 # doesn't really make sense to log here but we should probably generate a warning of some kind if more than one | |
| 61 # config is found. | |
| 62 return found[0] | |
| 63 | |
| 64 | |
| 65 def load_app_properties( | |
| 66 kwds=None, | |
| 67 ini_file=None, | |
| 68 ini_section=None, | |
| 69 config_file=None, | |
| 70 config_section=None, | |
| 71 config_prefix="GALAXY_CONFIG_" | |
| 72 ): | |
| 73 if config_file is None: | |
| 74 config_file = ini_file | |
| 75 config_section = ini_section | |
| 76 | |
| 77 # read from file or init w/no file | |
| 78 if config_file: | |
| 79 properties = read_properties_from_file(config_file, config_section) | |
| 80 else: | |
| 81 properties = {'__file__': None} | |
| 82 | |
| 83 # update from kwds | |
| 84 if kwds: | |
| 85 properties.update(kwds) | |
| 86 | |
| 87 # update from env | |
| 88 override_prefix = "%sOVERRIDE_" % config_prefix | |
| 89 for key in os.environ: | |
| 90 if key.startswith(override_prefix): | |
| 91 config_key = key[len(override_prefix):].lower() | |
| 92 properties[config_key] = os.environ[key] | |
| 93 elif key.startswith(config_prefix): | |
| 94 config_key = key[len(config_prefix):].lower() | |
| 95 if config_key not in properties: | |
| 96 properties[config_key] = os.environ[key] | |
| 97 | |
| 98 return properties | |
| 99 | |
| 100 | |
| 101 def read_properties_from_file(config_file, config_section=None): | |
| 102 properties = {} | |
| 103 if has_ext(config_file, 'yaml', aliases=True, ignore='sample'): | |
| 104 if config_section is None: | |
| 105 config_section = "galaxy" | |
| 106 properties.update(__default_properties(config_file)) | |
| 107 raw_properties = _read_from_yaml_file(config_file) | |
| 108 if raw_properties: | |
| 109 properties.update(raw_properties.get(config_section) or {}) | |
| 110 elif has_ext(config_file, 'ini', aliases=True, ignore='sample'): | |
| 111 if config_section is None: | |
| 112 config_section = "app:main" | |
| 113 parser = nice_config_parser(config_file) # default properties loaded w/parser | |
| 114 if parser.has_section(config_section): | |
| 115 properties.update(dict(parser.items(config_section))) | |
| 116 else: | |
| 117 properties.update(parser.defaults()) | |
| 118 else: | |
| 119 raise InvalidFileFormatError("File '%s' doesn't have a supported extension" % config_file) | |
| 120 return properties | |
| 121 | |
| 122 | |
| 123 def _read_from_yaml_file(path): | |
| 124 with open(path) as f: | |
| 125 return yaml.safe_load(f) | |
| 126 | |
| 127 | |
| 128 def nice_config_parser(path): | |
| 129 parser = NicerConfigParser(path, defaults=__default_properties(path)) | |
| 130 parser.optionxform = str # Don't lower-case keys | |
| 131 with open(path) as f: | |
| 132 parser.read_file(f) | |
| 133 return parser | |
| 134 | |
| 135 | |
| 136 class NicerConfigParser(ConfigParser): | |
| 137 | |
| 138 def __init__(self, filename, *args, **kw): | |
| 139 ConfigParser.__init__(self, *args, **kw) | |
| 140 self.filename = filename | |
| 141 if hasattr(self, '_interpolation'): | |
| 142 self._interpolation = self.InterpolateWrapper(self._interpolation) | |
| 143 | |
| 144 read_file = getattr(ConfigParser, 'read_file', ConfigParser.readfp) | |
| 145 read_file.__doc__ = "" | |
| 146 | |
| 147 def defaults(self): | |
| 148 """Return the defaults, with their values interpolated (with the | |
| 149 defaults dict itself) | |
| 150 | |
| 151 Mainly to support defaults using values such as %(here)s | |
| 152 """ | |
| 153 defaults = ConfigParser.defaults(self).copy() | |
| 154 for key, val in defaults.items(): | |
| 155 defaults[key] = self.get('DEFAULT', key) or val | |
| 156 return defaults | |
| 157 | |
| 158 def _interpolate(self, section, option, rawval, vars): | |
| 159 # Python < 3.2 | |
| 160 try: | |
| 161 return ConfigParser._interpolate( | |
| 162 self, section, option, rawval, vars) | |
| 163 except Exception: | |
| 164 e = sys.exc_info()[1] | |
| 165 args = list(e.args) | |
| 166 args[0] = f'Error in file {self.filename}: {e}' | |
| 167 e.args = tuple(args) | |
| 168 e.message = args[0] | |
| 169 raise | |
| 170 | |
| 171 class InterpolateWrapper: | |
| 172 # Python >= 3.2 | |
| 173 def __init__(self, original): | |
| 174 self._original = original | |
| 175 | |
| 176 def __getattr__(self, name): | |
| 177 return getattr(self._original, name) | |
| 178 | |
| 179 def before_get(self, parser, section, option, value, defaults): | |
| 180 try: | |
| 181 return self._original.before_get(parser, section, option, | |
| 182 value, defaults) | |
| 183 except Exception: | |
| 184 e = sys.exc_info()[1] | |
| 185 args = list(e.args) | |
| 186 args[0] = f'Error in file {parser.filename}: {e}' | |
| 187 e.args = tuple(args) | |
| 188 e.message = args[0] | |
| 189 raise | |
| 190 | |
| 191 | |
| 192 def _running_from_source(): | |
| 193 paths = ['run.sh', 'lib/galaxy/__init__.py', 'scripts/common_startup.sh'] | |
| 194 return all(map(os.path.exists, paths)) | |
| 195 | |
| 196 | |
| 197 running_from_source = _running_from_source() | |
| 198 | |
| 199 | |
| 200 def get_data_dir(properties): | |
| 201 data_dir = properties.get('data_dir', None) | |
| 202 if data_dir is None: | |
| 203 if running_from_source: | |
| 204 data_dir = './database' | |
| 205 else: | |
| 206 config_dir = properties.get('config_dir', os.path.dirname(properties['__file__'])) | |
| 207 data_dir = os.path.join(config_dir, 'data') | |
| 208 return data_dir | |
| 209 | |
| 210 | |
| 211 def __get_all_configs(dirs, names): | |
| 212 return list(filter(os.path.exists, starmap(os.path.join, product(dirs, names)))) | |
| 213 | |
| 214 | |
| 215 def __find_config_files(names, exts=None, dirs=None, include_samples=False): | |
| 216 sample_names = [] | |
| 217 if isinstance(names, str): | |
| 218 names = [names] | |
| 219 if not dirs: | |
| 220 dirs = [os.getcwd()] | |
| 221 if exts: | |
| 222 # add exts to names, converts back into a list because it's going to be small and we might consume names twice | |
| 223 names = list(starmap(joinext, product(names, exts))) | |
| 224 if include_samples: | |
| 225 sample_names = map(partial(joinext, ext='sample'), names) | |
| 226 # check for all names in each dir before moving to the next dir. could do it the other way around but that makes | |
| 227 # less sense to me. | |
| 228 return __get_all_configs(dirs, names) or __get_all_configs(dirs, sample_names) | |
| 229 | |
| 230 | |
| 231 def __default_properties(path): | |
| 232 return { | |
| 233 'here': os.path.dirname(os.path.abspath(path)), | |
| 234 '__file__': os.path.abspath(path) | |
| 235 } | |
| 236 | |
| 237 | |
| 238 __all__ = ('find_config_file', 'get_data_dir', 'load_app_properties', 'NicerConfigParser', 'running_from_source') |
