Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/update.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import copy | |
2 from functools import partial | |
3 from typing import ( | |
4 Callable, | |
5 Dict, | |
6 MutableMapping, | |
7 MutableSequence, | |
8 Optional, | |
9 Tuple, | |
10 Union, | |
11 cast, | |
12 ) | |
13 | |
14 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
15 from schema_salad.exceptions import ValidationException | |
16 from schema_salad.ref_resolver import Loader | |
17 from schema_salad.sourceline import SourceLine | |
18 | |
19 from .loghandler import _logger | |
20 from .utils import CWLObjectType, CWLOutputType, aslist, visit_class, visit_field | |
21 | |
22 | |
def v1_1to1_2(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.1 document to v1.2.

    The caller's document is not modified: all work happens on a deep copy.
    The only change is that the ``cwlVersion`` key is removed from every
    process, i.e. from each item ``aslist`` yields for the ``$graph`` entry
    when the document has one, or for the document itself otherwise.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: the upgraded copy and the version string ``"v1.2"``
    """
    doc = copy.deepcopy(doc)

    if isinstance(doc, MutableMapping) and "$graph" in doc:
        processes = cast(CommentedMap, doc["$graph"])
    else:
        processes = doc

    for process in aslist(processes):
        if "cwlVersion" in process:
            del process["cwlVersion"]

    return doc, "v1.2"
37 | |
38 | |
def v1_0to1_1(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.0 document to v1.1.

    Works on a deep copy of ``doc`` and applies, in this order:

    * extension class URIs found in ``requirements`` and ``hints`` of
      CommandLineTools and Workflows (including workflow steps) are replaced
      by the plain class names listed in ``class_renames``;
    * every ``inputBinding`` field other than ``loadContents`` is removed,
      with a warning, from the inputs of ExpressionTools and Workflows;
    * ``secondaryFiles`` values that are not already mappings are wrapped
      in mappings with a ``pattern`` key;
    * each process gets a ``NetworkAccess`` hint (``networkAccess: True``)
      and a ``LoadListingRequirement`` hint (``loadListing: deep_listing``)
      put at the front of its hints, and loses its ``cwlVersion`` key.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: the upgraded copy and the version string ``"v1.1"``
    :raises ValidationException: if a requirements, hints or steps entry is
        not a mapping
    """
    doc = copy.deepcopy(doc)

    # Extension class URIs and the plain class names that replace them.
    class_renames = {
        "http://commonwl.org/cwltool#WorkReuse": "WorkReuse",
        "http://arvados.org/cwl#ReuseRequirement": "WorkReuse",
        "http://commonwl.org/cwltool#TimeLimit": "ToolTimeLimit",
        "http://commonwl.org/cwltool#NetworkAccess": "NetworkAccess",
        "http://commonwl.org/cwltool#InplaceUpdateRequirement": "InplaceUpdateRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement": "LoadListingRequirement",
    }

    def rewrite_requirements(t: CWLObjectType) -> None:
        """Rename extension classes in requirements and hints, then recurse into steps."""
        checked_fields = (
            ("requirements", "requirements entries must be dictionaries: {} {}."),
            ("hints", "hints entries must be dictionaries: {} {}."),
        )
        for field, complaint in checked_fields:
            if field not in t:
                continue
            for entry in cast(MutableSequence[CWLObjectType], t[field]):
                if not isinstance(entry, MutableMapping):
                    raise ValidationException(complaint.format(type(entry), entry))
                class_name = cast(str, entry["class"])
                if class_name in class_renames:
                    entry["class"] = class_renames[class_name]

        if "steps" not in t:
            return
        for step in cast(MutableSequence[CWLObjectType], t["steps"]):
            if not isinstance(step, MutableMapping):
                raise ValidationException(
                    "steps entries must be dictionaries: {} {}.".format(
                        type(step), step
                    )
                )
            rewrite_requirements(step)

    def update_secondaryFiles(
        t: CWLOutputType, top: bool = False
    ) -> Union[MutableSequence[MutableMapping[str, str]], MutableMapping[str, str]]:
        """Convert a secondaryFiles value to the mapping form.

        Sequences are converted element by element, mappings are returned
        unchanged, and any other value ``v`` becomes a mapping with the
        single key ``pattern`` set to ``v``.  With ``top`` true, that single
        mapping is additionally wrapped in a one-element sequence.
        """
        if isinstance(t, CommentedSeq):
            # Fill a deep copy instead of building a new sequence
            # (presumably so the ruamel metadata attached to the original
            # sequence is carried along -- TODO confirm).
            converted = copy.deepcopy(t)
            for position, item in enumerate(t):
                converted[position] = update_secondaryFiles(item)
            return converted
        if isinstance(t, MutableSequence):
            return CommentedSeq(
                [update_secondaryFiles(cast(CWLOutputType, item)) for item in t]
            )
        if isinstance(t, MutableMapping):
            return cast(MutableMapping[str, str], t)
        pattern = CommentedMap([("pattern", t)])
        return CommentedSeq([pattern]) if top else pattern

    def fix_inputBinding(t: CWLObjectType) -> None:
        """Drop every inputBinding field except ``loadContents``, warning for each one."""
        for param in cast(MutableSequence[CWLObjectType], t["inputs"]):
            if "inputBinding" not in param:
                continue
            binding = cast(CWLObjectType, param["inputBinding"])
            # Iterate over a snapshot of the keys: entries are deleted in the loop.
            for field in list(binding.keys()):
                if field == "loadContents":
                    continue
                _logger.warning(
                    SourceLine(binding, field).makeError(
                        "Will ignore field '{}' which is not valid in {} "
                        "inputBinding".format(field, t["class"])
                    )
                )
                del binding[field]

    visit_class(doc, ("CommandLineTool", "Workflow"), rewrite_requirements)
    visit_class(doc, ("ExpressionTool", "Workflow"), fix_inputBinding)
    visit_field(doc, "secondaryFiles", partial(update_secondaryFiles, top=True))

    if isinstance(doc, MutableMapping) and "$graph" in doc:
        processes = cast(CommentedMap, doc["$graph"])
    else:
        processes = doc

    for process in aslist(processes):
        process.setdefault("hints", CommentedSeq())
        hints = process["hints"]
        # Both hints are inserted at index 0, so LoadListingRequirement ends
        # up first and NetworkAccess second, ahead of any existing hints.
        hints.insert(
            0, CommentedMap([("class", "NetworkAccess"), ("networkAccess", True)])
        )
        hints.insert(
            0,
            CommentedMap(
                [("class", "LoadListingRequirement"), ("loadListing", "deep_listing")]
            ),
        )
        if "cwlVersion" in process:
            del process["cwlVersion"]

    return doc, "v1.1"
140 | |
141 | |
def v1_1_0dev1to1_1(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.1.0-dev1 document to v1.1.

    The document content is not changed: the very same ``doc`` object (not
    a copy) is handed back together with the new version string.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: ``doc`` itself and the version string ``"v1.1"``
    """
    upgraded_version = "v1.1"
    return doc, upgraded_version
147 | |
148 | |
def v1_2_0dev1todev2(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.2.0-dev1 document to v1.2.0-dev2.

    The document content is not changed: the very same ``doc`` object (not
    a copy) is handed back together with the new version string.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: ``doc`` itself and the version string ``"v1.2.0-dev2"``
    """
    upgraded_version = "v1.2.0-dev2"
    return doc, upgraded_version
154 | |
155 | |
def v1_2_0dev2todev3(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.2.0-dev2 document to v1.2.0-dev3.

    Works on a deep copy of ``doc``.  In every Workflow, step inputs whose
    ``pickValue`` is ``only_non_null`` are changed to ``the_only_non_null``;
    afterwards the ``cwlVersion`` key is removed from every process.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: the upgraded copy and the version string ``"v1.2.0-dev3"``
    """
    doc = copy.deepcopy(doc)

    def update_pickvalue(t: CWLObjectType) -> None:
        """Rename the ``only_non_null`` pickValue on the step inputs of one workflow."""
        for step in cast(MutableSequence[CWLObjectType], t["steps"]):
            for step_input in cast(MutableSequence[CWLObjectType], step["in"]):
                if (
                    "pickValue" in step_input
                    and step_input["pickValue"] == "only_non_null"
                ):
                    step_input["pickValue"] = "the_only_non_null"

    visit_class(doc, "Workflow", update_pickvalue)

    if isinstance(doc, MutableMapping) and "$graph" in doc:
        processes = cast(CommentedMap, doc["$graph"])
    else:
        processes = doc

    for process in aslist(processes):
        if "cwlVersion" in process:
            del process["cwlVersion"]

    return doc, "v1.2.0-dev3"
177 | |
178 | |
def v1_2_0dev3todev4(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.2.0-dev3 document to v1.2.0-dev4.

    The document content is not changed: the very same ``doc`` object (not
    a copy) is handed back together with the new version string.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: ``doc`` itself and the version string ``"v1.2.0-dev4"``
    """
    upgraded_version = "v1.2.0-dev4"
    return doc, upgraded_version
184 | |
185 | |
def v1_2_0dev4todev5(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.2.0-dev4 document to v1.2.0-dev5.

    The document content is not changed: the very same ``doc`` object (not
    a copy) is handed back together with the new version string.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: ``doc`` itself and the version string ``"v1.2.0-dev5"``
    """
    upgraded_version = "v1.2.0-dev5"
    return doc, upgraded_version
191 | |
192 | |
def v1_2_0dev5to1_2(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Upgrade a CWL v1.2.0-dev5 document to v1.2.

    The document content is not changed: the very same ``doc`` object (not
    a copy) is handed back together with the new version string.

    :param doc: the document to upgrade
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: ``doc`` itself and the version string ``"v1.2"``
    """
    upgraded_version = "v1.2"
    return doc, upgraded_version
198 | |
199 | |
# Known CWL version strings, in ascending order.
ORDERED_VERSIONS = [
    "v1.0",
    "v1.1.0-dev1",
    "v1.1",
    "v1.2.0-dev1",
    "v1.2.0-dev2",
    "v1.2.0-dev3",
    "v1.2.0-dev4",
    "v1.2.0-dev5",
    "v1.2",
]

# Stable versions mapped to the updater that upgrades a document *from* that
# version.  ``None`` means there is no further updater.
UPDATES: Dict[
    str, Optional[Callable[[CommentedMap, Loader, str], Tuple[CommentedMap, str]]]
] = {
    "v1.0": v1_0to1_1,
    "v1.1": v1_1to1_2,
    "v1.2": None,
}

# Same mapping for the development versions; checkversion() only accepts
# these when its ``enable_dev`` argument is true.
DEVUPDATES: Dict[
    str, Optional[Callable[[CommentedMap, Loader, str], Tuple[CommentedMap, str]]]
] = {
    "v1.1.0-dev1": v1_1_0dev1to1_1,
    "v1.2.0-dev1": v1_2_0dev1todev2,
    "v1.2.0-dev2": v1_2_0dev2todev3,
    "v1.2.0-dev3": v1_2_0dev3todev4,
    "v1.2.0-dev4": v1_2_0dev4todev5,
    "v1.2.0-dev5": v1_2_0dev5to1_2,
}


# Stable entries first, then the development ones.
ALLUPDATES = {**UPDATES, **DEVUPDATES}

# Version update() upgrades to when it is not given an explicit ``update_to``.
INTERNAL_VERSION = "v1.2"

# Key under which the cwlVersion a document had before updating is recorded.
ORIGINAL_CWLVERSION = "http://commonwl.org/cwltool#original_cwlVersion"
234 | |
235 | |
def identity(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Do-nothing CWL document upgrade function.

    :param doc: the document; returned as is (same object, not a copy)
    :param loader: not used; present to match the common updater signature
    :param baseuri: not used; present to match the common updater signature
    :return: ``doc`` and the value stored under its ``cwlVersion`` key
    :raises KeyError: if ``doc`` has no ``cwlVersion`` key
    """
    current_version = cast(str, doc["cwlVersion"])
    return doc, current_version
241 | |
242 | |
def checkversion(
    doc: Union[CommentedSeq, CommentedMap],
    metadata: CommentedMap,
    enable_dev: bool,
) -> Tuple[CommentedMap, str]:
    """Check the validity of the version of the given CWL document.

    A ``CommentedSeq`` document is wrapped: a deep copy of ``metadata`` (with
    ``lc.data`` and ``lc.filename`` carried over from the original) becomes
    the document and ``doc`` is stored under its ``$graph`` key.  A
    ``CommentedMap`` document is used as is.  Either way the value of
    ``metadata["cwlVersion"]`` is written into the document.

    :param doc: the loaded document, or the list of processes of a ``$graph``
    :param metadata: mapping that supplies ``cwlVersion``
    :param enable_dev: accept the versions listed in ``DEVUPDATES``
    :return: the document and the validated version string
    :raises Exception: if ``doc`` is neither a CommentedSeq nor a
        CommentedMap, or ``doc`` is a CommentedSeq and ``metadata`` is not a
        CommentedMap
    :raises ValidationException: if the version is unknown, is a development
        version while ``enable_dev`` is false, or the document is marked as
        already updated but its version is not ``INTERNAL_VERSION``
    """
    if not isinstance(doc, (CommentedSeq, CommentedMap)):
        raise Exception("Expected CommentedMap or CommentedSeq")

    cdoc: CommentedMap
    if isinstance(doc, CommentedSeq):
        if not isinstance(metadata, CommentedMap):
            raise Exception("Expected metadata to be CommentedMap")
        source_lc = metadata.lc
        metadata = copy.deepcopy(metadata)
        # Re-attach the original lc data and filename to the copy.
        metadata.lc.data = copy.copy(source_lc.data)
        metadata.lc.filename = source_lc.filename
        metadata["$graph"] = doc
        cdoc = metadata
    else:
        cdoc = doc

    version = metadata["cwlVersion"]
    cdoc["cwlVersion"] = version

    updated_from = metadata.get(ORIGINAL_CWLVERSION) or cdoc.get(ORIGINAL_CWLVERSION)

    if updated_from:
        # Already updated once: the version must be the internal one.
        if version != INTERNAL_VERSION:
            raise ValidationException(
                "original_cwlVersion is set (%s) but cwlVersion is '%s', expected '%s' "
                % (updated_from, version, INTERNAL_VERSION)
            )
        return cdoc, version

    if version in UPDATES:
        return cdoc, version

    if version not in DEVUPDATES:
        raise ValidationException("Unrecognized version %s" % version)

    if not enable_dev:
        stable_versions = sorted(UPDATES)
        raise ValidationException(
            "Version '%s' is a development or deprecated version.\n "
            "Update your document to a stable version (%s) or use "
            "--enable-dev to enable support for development and "
            "deprecated versions." % (version, ", ".join(stable_versions))
        )

    return cdoc, version
295 | |
296 | |
def update(
    doc: Union[CommentedSeq, CommentedMap],
    loader: Loader,
    baseuri: str,
    enable_dev: bool,
    metadata: CommentedMap,
    update_to: Optional[str] = None,
) -> CommentedMap:
    """Update a CWL document to 'update_to' (if provided) or INTERNAL_VERSION.

    The document is first validated with ``checkversion``.  Updaters are then
    applied one after another: the chain starts with ``identity`` (which
    reports the document's current version) and each following updater is
    looked up in ``ALLUPDATES`` under the version just reached.  The chain
    stops when ``update_to`` is reached or when no further updater is
    registered; in the latter case no error is raised and the document is
    returned at the last version reached.

    ``metadata`` is modified in place: its ``cwlVersion`` is set to the final
    version and the starting version is recorded under
    ``ORIGINAL_CWLVERSION``.  The same two keys are set on the returned
    document.

    :param doc: the loaded document, or the list of processes of a ``$graph``
    :param loader: passed through to every updater
    :param baseuri: passed through to every updater
    :param enable_dev: passed to ``checkversion`` to allow development versions
    :param metadata: mapping that supplies ``cwlVersion``; updated in place
    :param update_to: target version; defaults to ``INTERNAL_VERSION``
    :return: the updated document
    :raises ValidationException: propagated from ``checkversion``
    :raises KeyError: if an updater returns a version missing from ``ALLUPDATES``
    """
    if update_to is None:
        update_to = INTERNAL_VERSION

    (cdoc, version) = checkversion(doc, metadata, enable_dev)
    # Version strings are immutable, so a plain reference is enough to
    # remember where the document started.
    originalversion = version

    nextupdate: Optional[
        Callable[[CommentedMap, Loader, str], Tuple[CommentedMap, str]]
    ] = identity

    while version != update_to and nextupdate:
        (cdoc, version) = nextupdate(cdoc, loader, baseuri)
        nextupdate = ALLUPDATES[version]

    cdoc["cwlVersion"] = version
    metadata["cwlVersion"] = version
    # Use the shared constant (also read by checkversion) instead of
    # repeating the URI literal.
    metadata[ORIGINAL_CWLVERSION] = originalversion
    cdoc[ORIGINAL_CWLVERSION] = originalversion

    return cdoc