comparison env/lib/python3.9/site-packages/galaxy/tool_util/loader_directory.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Utilities for loading and reasoning about unparsed tools in directories."""
2 import fnmatch
3 import glob
4 import logging
5 import os
6 import re
7 import sys
8
9 import yaml
10
11 from galaxy.tool_util import loader
12 from galaxy.tool_util.parser import get_tool_source
13 from galaxy.util import checkers
14
# Module-level logger used by the default load exception handler.
log = logging.getLogger(__name__)

# --- Message templates -------------------------------------------------------
# The two templates containing ``%s`` are meant to be interpolated with a path.
PATH_DOES_NOT_EXIST_ERROR = "Could not load tools from path [%s] - this path does not exist."
PATH_AND_RECURSIVE_ERROR = "Cannot specify a single file and recursive."
LOAD_FAILURE_ERROR = "Failed to load tool with path %s."

# Unique sentinel stored in place of a loaded tool when loading failed;
# compare against it by identity (see is_tool_load_error).
TOOL_LOAD_ERROR = object()

# --- Content sniffing patterns -----------------------------------------------
# Searched for in the leading portion of an XML file by looks_like_xml.
TOOL_REGEX = re.compile(r"<tool\s")
DATA_MANAGER_REGEX = re.compile(r"\stool_type=\"manage_data\"")

# --- File name filters -------------------------------------------------------
YAML_EXTENSIONS = [".yaml", ".yml", ".json"]
# CWL artifacts may use any of the YAML extensions in addition to ``.cwl``.
CWL_EXTENSIONS = [*YAML_EXTENSIONS, ".cwl"]
# Directory names that are pruned while walking a tree recursively.
EXCLUDE_WALK_DIRS = [".hg", ".git", ".venv"]
27
28
def load_exception_handler(path, exc_info):
    """Log a warning for a tool file that could not be loaded.

    This is the default handler used by ``load_tool_sources_from_path`` and
    ``load_tool_elements_from_path``.

    :param path: path of the tool file whose load failed.
    :param exc_info: exception information forwarded to the logger so the
        failure details are attached to the warning.
    """
    message = LOAD_FAILURE_ERROR % path
    log.warning(message, exc_info=exc_info)
32
33
def find_possible_tools_from_path(
    path,
    recursive=False,
    enable_beta_formats=False,
):
    """Collect the files under ``path`` that look like they may be tools.

    :param path: file, directory or URI-like string to inspect.
    :param recursive: descend into sub-directories when true.
    :param enable_beta_formats: also consider the beta (non-XML) tool formats.
    :returns: list of candidate tool files that pass ``looks_like_a_tool``.
    """
    candidates = _find_tool_files(
        path,
        recursive=recursive,
        enable_beta_formats=enable_beta_formats,
    )

    tool_files = []
    for candidate in candidates:
        try:
            is_candidate_tool = looks_like_a_tool(
                candidate,
                enable_beta_formats=enable_beta_formats,
            )
        except OSError:
            # The file could not be read - treat it as "not a tool" and move on.
            is_candidate_tool = False

        if is_candidate_tool:
            tool_files.append(candidate)

    return tool_files
58
59
def load_tool_sources_from_path(
    path,
    load_exception_handler=load_exception_handler,
    recursive=False,
    register_load_errors=False,
):
    """Walk a directory and load ToolSource objects.

    Candidate files are loaded with ``get_tool_source`` and beta formats are
    always enabled.

    :param path: file or directory to load from.
    :param load_exception_handler: callable invoked with ``(path, exc_info)``
        for every file that fails to load.
    :param recursive: descend into sub-directories when true.
    :param register_load_errors: when true, failed files are kept in the
        result paired with the ``TOOL_LOAD_ERROR`` sentinel.
    :returns: list of ``(path, loaded object)`` tuples.
    """
    load_options = dict(
        load_exception_handler=load_exception_handler,
        recursive=recursive,
        register_load_errors=register_load_errors,
        loader_func=get_tool_source,
        enable_beta_formats=True,
    )
    return _load_tools_from_path(path, **load_options)
75
76
def load_tool_elements_from_path(
    path,
    load_exception_handler=load_exception_handler,
    recursive=False,
    register_load_errors=False,
):
    """Walk a directory and load tool XML elements.

    Candidate files are loaded with ``loader.load_tool``; beta formats are
    disabled, so only XML files are considered.

    :param path: file or directory to load from.
    :param load_exception_handler: callable invoked with ``(path, exc_info)``
        for every file that fails to load.
    :param recursive: descend into sub-directories when true.
    :param register_load_errors: when true, failed files are kept in the
        result paired with the ``TOOL_LOAD_ERROR`` sentinel.
    :returns: list of ``(path, loaded object)`` tuples.
    """
    load_options = dict(
        load_exception_handler=load_exception_handler,
        recursive=recursive,
        register_load_errors=register_load_errors,
        loader_func=loader.load_tool,
        enable_beta_formats=False,
    )
    return _load_tools_from_path(path, **load_options)
92
93
def _load_tools_from_path(
    path,
    load_exception_handler,
    recursive,
    register_load_errors,
    loader_func,
    enable_beta_formats,
):
    """Load every possible tool file under ``path`` with ``loader_func``.

    :param path: file or directory to load from.
    :param load_exception_handler: callable invoked with ``(path, exc_info)``
        whenever ``loader_func`` raises.
    :param recursive: descend into sub-directories when true.
    :param register_load_errors: when true, a failed file is recorded in the
        result as ``(path, TOOL_LOAD_ERROR)`` instead of being dropped.
    :param loader_func: callable taking a file path and returning the loaded
        object.
    :param enable_beta_formats: also consider the beta (non-XML) tool formats.
    :returns: list of ``(path, loaded object)`` tuples.
    """
    results = []
    tool_paths = find_possible_tools_from_path(
        path,
        recursive=recursive,
        enable_beta_formats=enable_beta_formats,
    )
    for tool_path in tool_paths:
        try:
            loaded = loader_func(tool_path)
        except Exception:
            # Loading is best-effort: report the failure and keep going.
            load_exception_handler(tool_path, sys.exc_info())
            if register_load_errors:
                results.append((tool_path, TOOL_LOAD_ERROR))
        else:
            results.append((tool_path, loaded))
    return results
117
118
def is_tool_load_error(obj):
    """Return whether ``obj`` is the ``TOOL_LOAD_ERROR`` sentinel.

    The comparison is by identity, so only the sentinel object itself matches.
    """
    return TOOL_LOAD_ERROR is obj
122
123
def looks_like_a_tool(path_or_uri_like, invalid_names=None, enable_beta_formats=False):
    """Cheaply decide whether a file might be a tool file.

    This is intended as a filter rather than a validator: every valid tool
    should pass it, but passing it does not guarantee the file is a valid
    tool.

    :param path_or_uri_like: local path or URI-like string to check. URIs
        that cannot be resolved to a local path are assumed to be tools.
    :param invalid_names: optional collection of file base names to reject
        outright (e.g. well-known tool shed XML files).
    :param enable_beta_formats: also try the checkers registered in
        ``BETA_TOOL_CHECKERS`` when the file is not a tool XML.
    :returns: ``True`` if the file looks like a tool, ``False`` otherwise.
    """
    local_path = resolved_path(path_or_uri_like)
    if local_path is UNRESOLVED_URI:
        # Nothing local to inspect - assume the URI maps to a real tool.
        return True

    rejected_names = invalid_names or []
    if os.path.basename(local_path) in rejected_names:
        return False

    if looks_like_a_tool_xml(local_path):
        return True

    if not enable_beta_formats:
        return False

    # Stops at the first beta checker that accepts the file.
    return any(checker(local_path) for checker in BETA_TOOL_CHECKERS.values())
156
157
def looks_like_xml(path, regex=TOOL_REGEX):
    """Check whether ``path`` is an ``.xml`` file whose start matches ``regex``.

    The file is rejected when it does not end in ``.xml``, is empty, is
    flagged by any of the ``checkers`` probes used below, or cannot be
    decoded as UTF-8. Otherwise ``regex`` is searched for in the first
    5 * 1024 characters.

    :param path: path of the file to inspect.
    :param regex: compiled pattern to search for; defaults to ``TOOL_REGEX``.
    :returns: ``True`` if the pattern was found, ``False`` otherwise.
    """
    absolute_path = os.path.abspath(path)

    if not absolute_path.endswith(".xml") or not os.path.getsize(absolute_path):
        return False

    # Probes are tried in order and evaluation stops at the first that fires.
    content_probes = (
        checkers.check_binary,
        checkers.check_image,
        checkers.is_gzip,
        checkers.is_bz2,
        checkers.is_zip,
    )
    if any(probe(absolute_path) for probe in content_probes):
        return False

    with open(path, encoding='utf-8') as handle:
        try:
            head = handle.read(5 * 1024)
        except UnicodeDecodeError:
            return False

    return bool(regex.search(head))
183
184
def looks_like_a_tool_xml(path):
    """Return whether ``path`` looks like a Galaxy XML tool file.

    Delegates to ``looks_like_xml`` using ``TOOL_REGEX`` as the pattern.
    """
    return looks_like_xml(path, TOOL_REGEX)
188
189
def looks_like_a_data_manager_xml(path):
    """Return whether ``path`` looks like a Galaxy data manager XML file.

    Delegates to ``looks_like_xml`` using ``DATA_MANAGER_REGEX`` as the pattern.
    """
    return looks_like_xml(path, DATA_MANAGER_REGEX)
193
194
def is_a_yaml_with_class(path, classes):
    """Check whether ``path`` is a YAML mapping whose ``class`` is in ``classes``.

    :param path: path of the file to inspect; it must end with one of
        ``YAML_EXTENSIONS``.
    :param classes: collection of accepted ``class`` values.
    :returns: ``True`` only if the file parses to a dict whose ``class``
        entry is one of ``classes``; ``False`` otherwise.
    """
    if not _has_extension(path, YAML_EXTENSIONS):
        return False

    # The open() call stays outside the try block on purpose: failures to
    # open the file propagate, only parsing problems are turned into False.
    with open(path) as stream:
        try:
            document = yaml.safe_load(stream)
        except Exception:
            return False

    return isinstance(document, dict) and document.get("class", None) in classes
211
212
def looks_like_a_tool_yaml(path):
    """Return whether ``path`` looks like a Galaxy YAML tool file.

    A file qualifies when ``is_a_yaml_with_class`` finds the class
    ``GalaxyTool`` in it.
    """
    accepted_classes = ["GalaxyTool"]
    return is_a_yaml_with_class(path, accepted_classes)
216
217
def looks_like_a_cwl_artifact(path, classes=None):
    """Check whether ``path`` looks like a CWL artifact.

    :param path: path of the file to inspect; it must end with one of
        ``CWL_EXTENSIONS``.
    :param classes: optional collection of accepted ``class`` values; when
        ``None`` the ``class`` entry is not checked.
    :returns: ``True`` if the file parses to a dict that satisfies the
        ``classes`` restriction (if any) and has a ``cwlVersion`` entry that
        is not ``None``; ``False`` otherwise.
    """
    if not _has_extension(path, CWL_EXTENSIONS):
        return False

    # The open() call stays outside the try block on purpose: failures to
    # open the file propagate, only parsing problems are turned into False.
    with open(path) as stream:
        try:
            document = yaml.safe_load(stream)
        except Exception:
            return False

    if not isinstance(document, dict):
        return False

    class_is_restricted = classes is not None
    if class_is_restricted and document.get("class", None) not in classes:
        return False

    return document.get("cwlVersion", None) is not None
238
239
def looks_like_a_tool_cwl(path):
    """Return whether ``path`` looks like a CWL tool.

    Only CWL artifacts whose class is ``CommandLineTool`` or
    ``ExpressionTool`` are accepted.
    """
    tool_classes = ["CommandLineTool", "ExpressionTool"]
    return looks_like_a_cwl_artifact(path, classes=tool_classes)
243
244
def _find_tool_files(path_or_uri_like, recursive, enable_beta_formats):
    """List the candidate files for a path, directory or URI-like string.

    :param path_or_uri_like: local path, ``file://`` URI or other URI.
    :param recursive: descend into sub-directories; only valid for
        directories.
    :param enable_beta_formats: when true every file is a candidate,
        otherwise only ``*.xml`` files are.
    :returns: list of absolute file paths, or a single-element list holding
        the unchanged input when it is a URI that cannot be resolved to a
        local path.
    :raises Exception: if the path does not exist, or if ``recursive`` is
        requested for a single file.
    """
    path = resolved_path(path_or_uri_like)
    if path is UNRESOLVED_URI:
        # Pass the URI through and assume it maps to a real tool.
        return [path_or_uri_like]

    if not os.path.exists(path):
        # Interpolate the offending path; the template contains a ``%s``.
        raise Exception(PATH_DOES_NOT_EXIST_ERROR % path)

    is_file = not os.path.isdir(path)
    if is_file and recursive:
        raise Exception(PATH_AND_RECURSIVE_ERROR)
    if is_file:
        return [os.path.abspath(path)]

    pattern = "*" if enable_beta_formats else "*.xml"
    if recursive:
        files = _find_files(path, pattern)
    else:
        # Escape the directory part so that glob metacharacters in its name
        # (``[``, ``*``, ``?``) are matched literally.
        files = glob.glob(glob.escape(path) + "/" + pattern)
    return [os.path.abspath(found) for found in files]
270
271
def _has_extension(path, extensions):
    """Return whether ``path`` ends with any of the given ``extensions``."""
    # str.endswith accepts a tuple of suffixes and checks them all in one call.
    return path.endswith(tuple(extensions))
274
275
def _find_files(directory, pattern='*'):
    """Recursively collect files under ``directory`` matching ``pattern``.

    The ``fnmatch``-style pattern is applied to each file's path as built
    from the walk (root joined with the file name), not just its base name.
    Directories listed in ``EXCLUDE_WALK_DIRS`` are not traversed.

    :param directory: root directory of the walk.
    :param pattern: ``fnmatch`` pattern to match paths against.
    :returns: list of matching paths.
    :raises ValueError: if ``directory`` does not exist.
    """
    if not os.path.exists(directory):
        raise ValueError(f"Directory not found {directory}")

    found = []
    for root, dirnames, filenames in os.walk(directory):
        # Prune in place so os.walk does not descend into excluded
        # directories (like .hg).
        dirnames[:] = [name for name in dirnames if name not in EXCLUDE_WALK_DIRS]
        for filename in filenames:
            candidate = os.path.join(root, filename)
            if fnmatch.fnmatch(candidate, pattern):
                found.append(candidate)
    return found
289
290
# Sentinel returned by resolved_path for URIs that do not map to a local
# path; compare against it by identity.
UNRESOLVED_URI = object()


def resolved_path(path_or_uri_like):
    """Return the local path for a path or ``file://`` URI, else ``UNRESOLVED_URI``.

    Strings without ``://`` are returned unchanged, ``file://`` URIs have
    that prefix stripped, and every other URI yields the ``UNRESOLVED_URI``
    sentinel.
    """
    file_scheme = "file://"
    if path_or_uri_like.startswith(file_scheme):
        return path_or_uri_like[len(file_scheme):]
    if "://" in path_or_uri_like:
        return UNRESOLVED_URI
    return path_or_uri_like
302
303
# Checkers tried by looks_like_a_tool when beta formats are enabled, keyed by
# format name; each takes a file path and returns a truthy value on a match.
BETA_TOOL_CHECKERS = dict(
    yaml=looks_like_a_tool_yaml,
    cwl=looks_like_a_tool_cwl,
)

# Names exported by ``from <module> import *``.
__all__ = (
    "find_possible_tools_from_path",
    "is_a_yaml_with_class",
    "is_tool_load_error",
    "load_tool_elements_from_path",
    "load_tool_sources_from_path",
    "looks_like_a_cwl_artifact",
    "looks_like_a_tool_cwl",
    "looks_like_a_tool_xml",
    "looks_like_a_tool_yaml",
)