comparison env/lib/python3.9/site-packages/cwltool/pack.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Reformat a CWL document and all its references to be a single stream."""
2
3 import copy
4 import urllib
5 from typing import (
6 Any,
7 Callable,
8 Dict,
9 MutableMapping,
10 MutableSequence,
11 Optional,
12 Set,
13 Union,
14 cast,
15 )
16
17 from ruamel.yaml.comments import CommentedMap, CommentedSeq
18 from schema_salad.ref_resolver import Loader, SubLoader
19 from schema_salad.utils import ResolveType
20
21 from .context import LoadingContext
22 from .load_tool import fetch_document, resolve_and_validate_document
23 from .process import shortname, uniquename
24 from .update import ORDERED_VERSIONS, update
25 from .utils import CWLObjectType, CWLOutputType
26
# Signature of the reference-loading callback handed to ``find_run``: it is
# called as ``loadref(base, uri)`` where ``base`` may be ``None`` and it
# returns the resolved object for ``uri``.
LoadRefType = Callable[[Optional[str], str], ResolveType]
28
29
def find_run(
    d: Union[CWLObjectType, ResolveType],
    loadref: LoadRefType,
    runs: Set[str],
) -> None:
    """Gather every string ``run`` reference reachable from *d* into *runs*.

    Sequences and mappings are walked recursively.  Whenever a mapping has a
    ``run`` entry that is a string not yet present in *runs*, the string is
    recorded and the object returned by ``loadref(None, <that string>)`` is
    walked as well.  Anything that is neither a mutable sequence nor a
    mutable mapping is ignored.

    :param d: the (sub)document to scan.
    :param loadref: callback used to load a referenced document.
    :param runs: set of references seen so far; updated in place.
    """
    if isinstance(d, MutableSequence):
        for item in d:
            find_run(item, loadref, runs)
        return

    if not isinstance(d, MutableMapping):
        return

    target = d.get("run")
    # Only string references are followed; each one is loaded at most once
    # because it is added to ``runs`` before recursing.
    if isinstance(target, str) and target not in runs:
        runs.add(target)
        find_run(loadref(None, target), loadref, runs)

    for child in d.values():
        find_run(child, loadref, runs)
45
46
def find_ids(
    d: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType], None],
    ids: Set[str],
) -> None:
    """Collect all string ``id`` and ``name`` values found anywhere in *d*.

    Sequences and mappings are walked recursively; any other value
    (including ``None``) is ignored.

    :param d: the (sub)document to scan.
    :param ids: set receiving the identifiers; updated in place.
    """
    if isinstance(d, MutableSequence):
        for element in d:
            find_ids(cast(CWLObjectType, element), ids)
    elif isinstance(d, MutableMapping):
        # Non-string "id"/"name" entries are not identifiers and are skipped.
        ids.update(
            cast(str, d[field])
            for field in ("id", "name")
            if field in d and isinstance(d[field], str)
        )
        for child in d.values():
            find_ids(cast(CWLOutputType, child), ids)
60
61
def _rewrite_ref(
    ref: str, rewrite: Dict[str, str], stem: str, newstem: str
) -> Optional[str]:
    """Return the replacement for *ref*, or ``None`` when it is left as is.

    A reference already present in *rewrite* maps to its recorded value.
    Otherwise, a reference starting with *stem* has that prefix replaced and
    the new mapping is recorded in *rewrite*.
    """
    if ref in rewrite:
        return rewrite[ref]
    if not ref.startswith(stem):
        return None
    id_ = ref[len(stem) :]
    # prevent appending newstems if tool is already packed
    if id_.startswith(newstem.strip("#")):
        replacement = "#" + id_
    else:
        replacement = newstem + id_
    rewrite[ref] = replacement
    return replacement


def replace_refs(d: Any, rewrite: Dict[str, str], stem: str, newstem: str) -> None:
    """Rewrite, in place, every string reference found inside *d*.

    Strings that are list items or mapping values are replaced when they are
    a key of *rewrite* or start with *stem*; nested sequences and mappings
    are processed recursively.  List items and mapping values follow the
    same rule, including the guard that avoids prefixing *newstem* a second
    time onto an identifier that already carries it.

    :param d: document fragment to update; non-container values are ignored.
    :param rewrite: known ``old -> new`` mappings; extended in place with
        every stem-based rewrite that is performed.
    :param stem: prefix identifying references that must be rewritten.
    :param newstem: prefix substituted for *stem*.
    """
    if isinstance(d, MutableSequence):
        for index, item in enumerate(d):
            if isinstance(item, str):
                replacement = _rewrite_ref(item, rewrite, stem, newstem)
                if replacement is not None:
                    d[index] = replacement
            else:
                replace_refs(item, rewrite, stem, newstem)
    elif isinstance(d, MutableMapping):
        # Iterate over a snapshot because values are reassigned while looping.
        for key, val in list(d.items()):
            if isinstance(val, str):
                replacement = _rewrite_ref(val, rewrite, stem, newstem)
                if replacement is not None:
                    d[key] = replacement
            else:
                replace_refs(val, rewrite, stem, newstem)
87
88
def import_embed(
    d: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType],
    seen: Set[str],
) -> None:
    """Replace repeated embedded objects with ``$import`` stubs, in place.

    The first mapping encountered with a given string ``id`` (or, failing
    that, ``name``) is kept and its identifier is recorded in *seen*.  Any
    later mapping carrying an identifier already in *seen* is emptied and
    turned into ``{"$import": <identifier>}``.  Mapping entries are visited
    in sorted key order.

    :param d: document fragment to process; non-container values are ignored.
    :param seen: identifiers already encountered; updated in place.
    """
    if isinstance(d, MutableSequence):
        for element in d:
            import_embed(cast(CWLOutputType, element), seen)
        return

    if not isinstance(d, MutableMapping):
        return

    for field in ("id", "name"):
        ident = d.get(field)
        if not isinstance(ident, str):
            continue
        if ident not in seen:
            # First occurrence: remember it and keep the object intact.
            seen.add(cast(str, ident))
            break
        # Duplicate: collapse the whole mapping to a reference.
        d.clear()
        d["$import"] = ident

    for key in sorted(d):
        import_embed(cast(CWLOutputType, d[key]), seen)
112
113
def pack(
    loadingContext: LoadingContext,
    uri: str,
    rewrite_out: Optional[Dict[str, str]] = None,
    loader: Optional[Loader] = None,
) -> CWLObjectType:
    """Combine the document at *uri* and everything it runs into one object.

    The document is reloaded without updating, every ``run`` reference is
    followed, all processes are updated to the highest ``cwlVersion`` found,
    identifiers are rewritten to ``#``-fragments and the processes are
    collected under a ``$graph`` list.

    :param loadingContext: loading configuration; a copy is modified, the
        caller's object is not reassigned.
    :param uri: location of the document to pack.
    :param rewrite_out: optional dictionary that receives the mapping from
        original identifiers to rewritten ones (updated in place).
    :param loader: optional loader used in preference to the one carried by
        *loadingContext*.
    :returns: a mapping with ``$graph`` and ``cwlVersion`` entries, plus
        ``$schemas`` when any were found.
    :raises Exception: if no loader is available after loading a document.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urldefrag

    # The workflow document we have in memory right now may have been
    # updated to the internal CWL version.  We need to reload the
    # document to go back to its original version.
    #
    # What's going on here is that the updater replaces the
    # documents/fragments in the index with updated ones, the
    # index is also used as a cache, so we need to go through the
    # loading process with an empty index and updating turned off
    # so we have the original un-updated documents.
    #
    loadingContext = loadingContext.copy()
    document_loader = SubLoader(loader or loadingContext.loader or Loader({}))
    loadingContext.do_update = False
    loadingContext.loader = document_loader
    loadingContext.loader.idx = {}
    loadingContext.metadata = {}
    loadingContext, docobj, uri = fetch_document(uri, loadingContext)
    loadingContext, fileuri = resolve_and_validate_document(
        loadingContext, docobj, uri, preprocess_only=True
    )
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be none")
    processobj, metadata = loadingContext.loader.resolve_ref(uri)
    document_loader = loadingContext.loader

    # Store copies of the resolved objects in the loader index.
    if isinstance(processobj, MutableMapping):
        document_loader.idx[processobj["id"]] = CommentedMap(processobj.items())
    elif isinstance(processobj, MutableSequence):
        _, frag = urldefrag(uri)
        for po in processobj:
            # Without an explicit fragment, the entry whose id ends in
            # "#main" becomes the document being packed.
            if not frag:
                if po["id"].endswith("#main"):
                    uri = po["id"]
            document_loader.idx[po["id"]] = CommentedMap(po.items())
        document_loader.idx[metadata["id"]] = CommentedMap(metadata.items())

    found_versions = {
        cast(str, loadingContext.metadata["cwlVersion"])
    }  # type: Set[str]

    def loadref(base: Optional[str], lr_uri: str) -> ResolveType:
        """Load *lr_uri*, record its cwlVersion and return the resolved object."""
        lr_loadingContext = loadingContext.copy()
        lr_loadingContext.metadata = {}
        lr_loadingContext, lr_workflowobj, lr_uri = fetch_document(
            lr_uri, lr_loadingContext
        )
        lr_loadingContext, lr_uri = resolve_and_validate_document(
            lr_loadingContext, lr_workflowobj, lr_uri
        )
        found_versions.add(cast(str, lr_loadingContext.metadata["cwlVersion"]))
        if lr_loadingContext.loader is None:
            raise Exception("loader should not be None")
        return lr_loadingContext.loader.resolve_ref(lr_uri, base_url=base)[0]

    ids = set()  # type: Set[str]
    find_ids(processobj, ids)

    runs = {uri}
    find_run(processobj, loadref, runs)

    # Figure out the highest version, everything needs to be updated
    # to it.
    update_to_version = ORDERED_VERSIONS[
        max((ORDERED_VERSIONS.index(fv) for fv in found_versions), default=0)
    ]

    for f in runs:
        find_ids(document_loader.resolve_ref(f)[0], ids)

    names = set()  # type: Set[str]
    if rewrite_out is None:
        rewrite = {}  # type: Dict[str, str]
    else:
        rewrite = rewrite_out

    mainpath, _ = urldefrag(uri)

    def rewrite_id(r: str, mainuri: str) -> None:
        """Record in ``rewrite`` the packed identifier for *r*."""
        if r == mainuri:
            rewrite[r] = "#main"
        elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"):
            if r[len(mainuri) :].startswith("#main/"):
                rewrite[r] = "#" + uniquename(r[len(mainuri) + 1 :], names)
            else:
                rewrite[r] = "#" + uniquename("main/" + r[len(mainuri) + 1 :], names)
        else:
            path, frag = urldefrag(r)
            if path == mainpath:
                rewrite[r] = "#" + uniquename(frag, names)
            else:
                # Identifiers from other files are rewritten per file path.
                if path not in rewrite:
                    rewrite[path] = "#" + uniquename(shortname(path), names)

    sortedids = sorted(ids)

    for r in sortedids:
        rewrite_id(r, uri)

    packed = CommentedMap(
        (("$graph", CommentedSeq()), ("cwlVersion", update_to_version))
    )
    # Captured now because ``metadata`` is rebound inside the loop below.
    namespaces = metadata.get("$namespaces", None)

    schemas = set()  # type: Set[str]
    if "$schemas" in metadata:
        for each_schema in metadata["$schemas"]:
            schemas.add(each_schema)
    for r in sorted(runs):
        dcr, metadata = document_loader.resolve_ref(r)
        if isinstance(dcr, CommentedSeq):
            dcr = dcr[0]
            dcr = cast(CommentedMap, dcr)
        if not isinstance(dcr, MutableMapping):
            continue

        dcr = update(
            dcr,
            document_loader,
            r,
            loadingContext.enable_dev,
            metadata,
            update_to_version,
        )

        if "http://commonwl.org/cwltool#original_cwlVersion" in metadata:
            del metadata["http://commonwl.org/cwltool#original_cwlVersion"]
        if "http://commonwl.org/cwltool#original_cwlVersion" in dcr:
            del dcr["http://commonwl.org/cwltool#original_cwlVersion"]

        if "$schemas" in metadata:
            for s in metadata["$schemas"]:
                schemas.add(s)
        if dcr.get("class") not in ("Workflow", "CommandLineTool", "ExpressionTool"):
            continue
        dc = cast(Dict[str, Any], copy.deepcopy(dcr))
        v = rewrite[r]
        dc["id"] = v
        # These fields are not kept on individual $graph entries.
        for n in ("name", "cwlVersion", "$namespaces", "$schemas"):
            if n in dc:
                del dc[n]
        packed["$graph"].append(dc)

    # Sorted so that the emitted list does not depend on set iteration order.
    schema_list = sorted(schemas)

    if schema_list:
        packed["$schemas"] = list(schema_list)

    # ``replace_refs`` may add entries to ``rewrite``; iterate over a snapshot.
    for r in list(rewrite.keys()):
        v = rewrite[r]
        replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/")

    import_embed(packed, set())

    if len(packed["$graph"]) == 1:
        # duplicate 'cwlVersion' and $schemas inside $graph when there is only
        # a single item because we will print the contents inside '$graph'
        # rather than whole dict
        packed["$graph"][0]["cwlVersion"] = packed["cwlVersion"]
        if schema_list:
            packed["$graph"][0]["$schemas"] = list(schema_list)
    # always include $namespaces in the #main
    # NOTE(review): the namespaces are attached to the first $graph entry;
    # confirm that this entry is always "#main".  The emptiness check avoids
    # an IndexError when no process was added to $graph.
    if namespaces and packed["$graph"]:
        packed["$graph"][0]["$namespaces"] = namespaces

    return packed