comparison env/lib/python3.9/site-packages/galaxy/tool_util/provided_metadata.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import json
2 import logging
3 import os
4 import re
5
6 from galaxy.util import stringify_dictionary_keys
7
8 log = logging.getLogger(__name__)
9
10
def parse_tool_provided_metadata(meta_file, provided_metadata_style=None, job_wrapper=None):
    """Build the appropriate ToolProvidedMetadata implementation for a path.

    Returns a NullToolProvidedMetadata when meta_file does not exist.  When
    provided_metadata_style is None, the style ("legacy" or "default") is
    guessed from the contents of meta_file.
    """
    if not os.path.exists(meta_file):
        return NullToolProvidedMetadata()

    style = provided_metadata_style
    if style is None:
        style = _guess_tool_provided_metadata_style(meta_file)

    assert style in ["legacy", "default"]

    if style == "legacy":
        return LegacyToolProvidedMetadata(meta_file, job_wrapper=job_wrapper)
    # Only remaining possibility after the assert above.
    return ToolProvidedMetadata(meta_file)
28
29
30 def _guess_tool_provided_metadata_style(path):
31 try:
32 with open(path) as f:
33 metadata = json.load(f)
34 metadata_type = metadata.get("type", None)
35 return "legacy" if metadata_type in ["dataset", "new_primary_dataset"] else "default"
36 except ValueError:
37 # Either empty or multiple JSON lines, either way we can safely treat
38 # it as legacy style.
39 return "legacy"
40
41
class BaseToolProvidedMetadata:
    """Shared no-op defaults for tool provided metadata implementations.

    Subclasses override whichever hooks apply to the metadata style they
    understand; every method here is a safe, empty default.
    """

    def get_new_datasets(self, output_name):
        """Return a list of new datasets to discover for output_name.

        Only consulted while discovering datasets for tools that declare
        discover_via="tool_provided_metadata".
        """
        return []

    def has_failed_outputs(self):
        """Return True when generation of any output failed.

        Implementations returning True should also log details about at
        least the first failed output.
        """
        return False

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        """Return the metadata entry for a discovered dataset.

        Whether the dataset was discovered through an explicit listing
        (get_new_datasets) or a file regex, the file's basename keys the
        lookup of its metadata entry.
        """
        return {}

    def get_unnamed_outputs(self):
        """Return unnamed outputs dataset introduced for upload 2.0.

        Needs more formal specification but see output_collect for how
        destinations, types, elements, etc... are consumed.
        """
        return []

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        """Return primary dataset metadata for the specified output."""
        return {}

    def rewrite(self):
        """Persist metadata back to the file system.

        Implementations may skip re-writing the file when no mutable output
        metadata actually changed.
        """
        return None

    def get_new_datasets_for_metadata_collection(self):
        """Return all tracked datasets that are not explicit primary outputs."""
        return []
95
96
class NullToolProvidedMetadata(BaseToolProvidedMetadata):
    """Stand-in used when no metadata file exists.

    Inherits every empty default from BaseToolProvidedMetadata unchanged.
    """
99
100
class LegacyToolProvidedMetadata(BaseToolProvidedMetadata):
    """Tool provided metadata parsed from a line-per-entry JSON file.

    Each line of meta_file is an independent JSON object carrying a required
    "type" key; entries of type "dataset" describe primary outputs and
    entries of type "new_primary_dataset" describe discovered datasets.
    """

    def __init__(self, meta_file, job_wrapper=None):
        # Path retained so rewrite() can persist entries back to disk.
        self.meta_file = meta_file
        # Parsed entries, one dict per well-formed JSON line of meta_file.
        self.tool_provided_job_metadata = []

        with open(meta_file) as f:
            for line in f:
                try:
                    line = stringify_dictionary_keys(json.loads(line))
                    assert 'type' in line
                except Exception:
                    # Malformed lines are logged and skipped; the rest of the
                    # file is still processed.
                    log.exception('(%s) Got JSON data from tool, but data is improperly formatted or no "type" key in data' % getattr(job_wrapper, "job_id", None))
                    log.debug('Offending data was: %s' % line)
                    continue
                # Set the dataset id if it's a dataset entry and isn't set.
                # This isn't insecure. We loop the job's output datasets in
                # the finish method, so if a tool writes out metadata for a
                # dataset id that it doesn't own, it'll just be ignored.
                dataset_id_not_specified = line['type'] == 'dataset' and 'dataset_id' not in line
                if dataset_id_not_specified:
                    dataset_basename = line['dataset']
                    if job_wrapper:
                        try:
                            line['dataset_id'] = job_wrapper.get_output_file_id(dataset_basename)
                        except KeyError:
                            # Entry references a basename the job doesn't know
                            # about - drop the entry entirely.
                            log.warning('(%s) Tool provided job dataset-specific metadata without specifying a dataset' % job_wrapper.job_id)
                            continue
                    else:
                        # No job wrapper available - recover the id (or uuid)
                        # from the conventional dataset file name instead.
                        match = re.match(r'(galaxy_)?dataset_(.*)\.dat', dataset_basename)
                        if match is None:
                            raise Exception("processing tool_provided_metadata (e.g. galaxy.json) entry with invalid dataset name [%s]" % dataset_basename)
                        dataset_id = match.group(2)
                        if dataset_id.isdigit():
                            line['dataset_id'] = dataset_id
                        else:
                            # Non-numeric captures are treated as dataset UUIDs.
                            line['dataset_uuid'] = dataset_id

                self.tool_provided_job_metadata.append(line)

    def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
        """Return the "dataset" entry matching dataset_id or dataset_uuid.

        output_name is unused by the legacy format; an empty dict is returned
        when no entry matches.
        """
        for meta in self.tool_provided_job_metadata:
            if meta['type'] == 'dataset' and 'dataset_id' in meta and int(meta['dataset_id']) == dataset_id:
                return meta
            if meta['type'] == 'dataset' and 'dataset_uuid' in meta and meta['dataset_uuid'] == dataset_uuid:
                return meta
        return {}

    def get_new_dataset_meta_by_basename(self, output_name, basename):
        """Return the "new_primary_dataset" entry whose filename is basename.

        Implicitly returns None when no entry matches.
        """
        for meta in self.tool_provided_job_metadata:
            if meta['type'] == 'new_primary_dataset' and meta['filename'] == basename:
                return meta

    def get_new_datasets(self, output_name):
        """Unimplemented for the legacy format - always returns []."""
        log.warning("Called get_new_datasets with legacy tool metadata provider - that is unimplemented.")
        return []

    def has_failed_outputs(self):
        """Return True if any entry is flagged "failed", logging each one."""
        found_failed = False
        for meta in self.tool_provided_job_metadata:
            if meta.get("failed", False):
                log.info("One or more tool outputs is marked as failed (%s)." % meta)
                found_failed = True

        return found_failed

    def get_unnamed_outputs(self):
        """The legacy format has no unnamed outputs - always returns []."""
        return []

    def rewrite(self):
        """Unconditionally write entries back to meta_file, one JSON object per line."""
        with open(self.meta_file, 'wt') as job_metadata_fh:
            for meta in self.tool_provided_job_metadata:
                job_metadata_fh.write("%s\n" % (json.dumps(meta)))

    def get_new_datasets_for_metadata_collection(self):
        """Yield every "new_primary_dataset" entry."""
        for meta in self.tool_provided_job_metadata:
            if meta['type'] == 'new_primary_dataset':
                yield meta
179
180
181 class ToolProvidedMetadata(BaseToolProvidedMetadata):
182
183 def __init__(self, meta_file):
184 self.meta_file = meta_file
185 with open(meta_file) as f:
186 self.tool_provided_job_metadata = json.load(f)
187
188 def get_dataset_meta(self, output_name, dataset_id, dataset_uuid):
189 return self.tool_provided_job_metadata.get(output_name, {})
190
191 def get_new_dataset_meta_by_basename(self, output_name, basename):
192 datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
193 for meta in datasets:
194 if meta['filename'] == basename:
195 return meta
196
197 def get_new_datasets(self, output_name):
198 datasets = self.tool_provided_job_metadata.get(output_name, {}).get("datasets", [])
199 if not datasets:
200 elements = self.tool_provided_job_metadata.get(output_name, {}).get("elements", [])
201 if elements:
202 datasets = self._elements_to_datasets(elements)
203 return datasets
204
205 def _elements_to_datasets(self, elements, level=0):
206 for element in elements:
207 extra_kwds = {"identifier_%d" % level: element["name"]}
208 if "elements" in element:
209 for inner_element in self._elements_to_datasets(element["elements"], level=level + 1):
210 dataset = extra_kwds.copy()
211 dataset.update(inner_element)
212 yield dataset
213 else:
214 dataset = extra_kwds
215 extra_kwds.update(element)
216 yield extra_kwds
217
218 def has_failed_outputs(self):
219 found_failed = False
220 for output_name, meta in self.tool_provided_job_metadata.items():
221 if output_name == "__unnamed_outputs":
222 continue
223
224 if meta.get("failed", False):
225 log.info("One or more tool outputs is marked as failed (%s)." % meta)
226 found_failed = True
227
228 return found_failed
229
230 def get_unnamed_outputs(self):
231 log.debug("unnamed outputs [%s]" % self.tool_provided_job_metadata)
232 return self.tool_provided_job_metadata.get("__unnamed_outputs", [])
233
234 def rewrite(self):
235 with open(self.meta_file, 'wt') as job_metadata_fh:
236 json.dump(self.tool_provided_job_metadata, job_metadata_fh)