Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/schema_salad/python_codegen.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Python code generator for a given schema salad definition.""" from io import StringIO from typing import ( IO, Any, Dict, List, MutableMapping, MutableSequence, Optional, Set, Union, ) from pkg_resources import resource_stream from . import schema from .codegen_base import CodeGenBase, TypeDef from .exceptions import SchemaException from .schema import shortname prims = { "http://www.w3.org/2001/XMLSchema#string": TypeDef( "strtype", "_PrimitiveLoader((str, str))" ), "http://www.w3.org/2001/XMLSchema#int": TypeDef("inttype", "_PrimitiveLoader(int)"), "http://www.w3.org/2001/XMLSchema#long": TypeDef( "inttype", "_PrimitiveLoader(int)" ), "http://www.w3.org/2001/XMLSchema#float": TypeDef( "floattype", "_PrimitiveLoader(float)" ), "http://www.w3.org/2001/XMLSchema#double": TypeDef( "floattype", "_PrimitiveLoader(float)" ), "http://www.w3.org/2001/XMLSchema#boolean": TypeDef( "booltype", "_PrimitiveLoader(bool)" ), "https://w3id.org/cwl/salad#null": TypeDef( "None_type", "_PrimitiveLoader(type(None))" ), "https://w3id.org/cwl/salad#Any": TypeDef("Any_type", "_AnyLoader()"), } class PythonCodeGen(CodeGenBase): """Generation of Python code for a given Schema Salad definition.""" def __init__(self, out: IO[str], copyright: Optional[str]) -> None: super().__init__() self.out = out self.current_class_is_abstract = False self.serializer = StringIO() self.idfield = "" self.copyright = copyright @staticmethod def safe_name(name): # type: (str) -> str avn = schema.avro_name(name) if avn in ("class", "in"): # reserved words avn = avn + "_" return avn def prologue(self): # type: () -> None self.out.write( """# # This file was autogenerated using schema-salad-tool --codegen=python # The code itself is released under the Apache 2.0 license and the help text is # subject to the license of the original schema. """ ) if self.copyright: self.out.write( """ # # The original schema is {copyright}. """.format( copyright=self.copyright ) ) stream = resource_stream(__name__, "python_codegen_support.py") self.out.write(stream.read().decode("UTF-8")) stream.close() self.out.write("\n\n") for primative in prims.values(): self.declare_type(primative) def begin_class( self, # pylint: disable=too-many-arguments classname: str, extends: MutableSequence[str], doc: str, abstract: bool, field_names: MutableSequence[str], idfield: str, optional_fields: Set[str], ): # type: (...) -> None classname = self.safe_name(classname) if extends: ext = ", ".join(self.safe_name(e) for e in extends) else: ext = "Savable" self.out.write(f"class {classname}({ext}):\n") if doc: self.out.write(' """\n') self.out.write(str(doc)) self.out.write('\n """\n') self.serializer = StringIO() self.current_class_is_abstract = abstract if self.current_class_is_abstract: self.out.write(" pass\n\n\n") return required_field_names = [f for f in field_names if f not in optional_fields] optional_field_names = [f for f in field_names if f in optional_fields] safe_inits = [" self,"] # type: List[str] safe_inits.extend( [ " {}, # type: Any".format(self.safe_name(f)) for f in required_field_names if f != "class" ] ) safe_inits.extend( [ " {}=None, # type: Any".format(self.safe_name(f)) for f in optional_field_names if f != "class" ] ) self.out.write( " def __init__(\n" + "\n".join(safe_inits) + "\n extension_fields=None, " + "# type: Optional[Dict[str, Any]]" + "\n loadingOptions=None # type: Optional[LoadingOptions]" + "\n ): # type: (...) -> None\n" + """ if extension_fields: self.extension_fields = extension_fields else: self.extension_fields = yaml.comments.CommentedMap() if loadingOptions: self.loadingOptions = loadingOptions else: self.loadingOptions = LoadingOptions() """ ) field_inits = "" for name in field_names: if name == "class": field_inits += """ self.class_ = "{}" """.format( classname ) else: field_inits += """ self.{0} = {0} """.format( self.safe_name(name) ) self.out.write( field_inits + """ @classmethod def fromDoc(cls, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> {} _doc = copy.copy(doc) if hasattr(doc, 'lc'): _doc.lc.data = doc.lc.data _doc.lc.filename = doc.lc.filename _errors__ = [] """.format( classname ) ) self.idfield = idfield self.serializer.write( """ def save(self, top=False, base_url="", relative_uris=True): # type: (bool, str, bool) -> Dict[str, Any] r = yaml.comments.CommentedMap() # type: Dict[str, Any] for ef in self.extension_fields: r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef] """ ) if "class" in field_names: self.out.write( """ if _doc.get('class') != '{class_}': raise ValidationException("Not a {class_}") """.format( class_=classname ) ) self.serializer.write( """ r['class'] = '{class_}' """.format( class_=classname ) ) def end_class(self, classname, field_names): # type: (str, List[str]) -> None if self.current_class_is_abstract: return self.out.write( """ extension_fields = yaml.comments.CommentedMap() for k in _doc.keys(): if k not in cls.attrs: if ":" in k: ex = expand_url(k, "", loadingOptions, scoped_id=False, vocab_term=False) extension_fields[ex] = _doc[k] else: _errors__.append( ValidationException( "invalid field `{{}}`, expected one of: {attrstr}".format(k), SourceLine(_doc, k, str) ) ) break if _errors__: raise ValidationException(\"Trying '{class_}'\", None, _errors__) """.format( attrstr=", ".join([f"`{f}`" for f in field_names]), class_=self.safe_name(classname), ) ) self.serializer.write( """ # top refers to the directory level if top: if self.loadingOptions.namespaces: r["$namespaces"] = self.loadingOptions.namespaces if self.loadingOptions.schemas: r["$schemas"] = self.loadingOptions.schemas """ ) self.serializer.write(" return r\n\n") self.serializer.write(f" attrs = frozenset({field_names})\n") safe_init_fields = [ self.safe_name(f) for f in field_names if f != "class" ] # type: List[str] safe_inits = [f + "=" + f for f in safe_init_fields] safe_inits.extend( ["extension_fields=extension_fields", "loadingOptions=loadingOptions"] ) self.out.write(" return cls(" + ", ".join(safe_inits) + ")\n") self.out.write(str(self.serializer.getvalue())) self.out.write("\n\n") def type_loader(self, type_declaration): # type: (Union[List[Any], Dict[str, Any], str]) -> TypeDef if isinstance(type_declaration, MutableSequence): sub = [self.type_loader(i) for i in type_declaration] return self.declare_type( TypeDef( "union_of_{}".format("_or_".join(s.name for s in sub)), "_UnionLoader(({},))".format(", ".join(s.name for s in sub)), ) ) if isinstance(type_declaration, MutableMapping): if type_declaration["type"] in ( "array", "https://w3id.org/cwl/salad#array", ): i = self.type_loader(type_declaration["items"]) return self.declare_type( TypeDef(f"array_of_{i.name}", f"_ArrayLoader({i.name})") ) if type_declaration["type"] in ("enum", "https://w3id.org/cwl/salad#enum"): for sym in type_declaration["symbols"]: self.add_vocab(shortname(sym), sym) return self.declare_type( TypeDef( self.safe_name(type_declaration["name"]) + "Loader", '_EnumLoader(("{}",))'.format( '", "'.join( self.safe_name(sym) for sym in type_declaration["symbols"] ) ), ) ) if type_declaration["type"] in ( "record", "https://w3id.org/cwl/salad#record", ): return self.declare_type( TypeDef( self.safe_name(type_declaration["name"]) + "Loader", "_RecordLoader({})".format( self.safe_name(type_declaration["name"]) ), ) ) raise SchemaException("wft {}".format(type_declaration["type"])) if type_declaration in prims: return prims[type_declaration] if type_declaration in ("Expression", "https://w3id.org/cwl/cwl#Expression"): return self.declare_type( TypeDef( self.safe_name(type_declaration) + "Loader", "_ExpressionLoader(str)", ) ) return self.collected_types[self.safe_name(type_declaration) + "Loader"] def declare_id_field( self, name: str, fieldtype: TypeDef, doc: str, optional: bool, subscope: Optional[str], ) -> None: if self.current_class_is_abstract: return self.declare_field(name, fieldtype, doc, True) if optional: opt = """{safename} = "_:" + str(_uuid__.uuid4())""".format( safename=self.safe_name(name) ) else: opt = """raise ValidationException("Missing {fieldname}")""".format( fieldname=shortname(name) ) if subscope is not None: name = name + subscope self.out.write( """ if {safename} is None: if docRoot is not None: {safename} = docRoot else: {opt} baseuri = {safename} """.format( safename=self.safe_name(name), opt=opt ) ) def declare_field( self, name: str, fieldtype: TypeDef, doc: Optional[str], optional: bool ) -> None: if self.current_class_is_abstract: return if shortname(name) == "class": return if optional: self.out.write( " if '{fieldname}' in _doc:\n".format(fieldname=shortname(name)) ) spc = " " else: spc = "" self.out.write( """{spc} try: {spc} {safename} = load_field(_doc.get( {spc} '{fieldname}'), {fieldtype}, baseuri, loadingOptions) {spc} except ValidationException as e: {spc} _errors__.append( {spc} ValidationException( {spc} \"the `{fieldname}` field is not valid because:\", {spc} SourceLine(_doc, '{fieldname}', str), {spc} [e] {spc} ) {spc} ) """.format( safename=self.safe_name(name), fieldname=shortname(name), fieldtype=fieldtype.name, spc=spc, ) ) if optional: self.out.write( """ else: {safename} = None """.format( safename=self.safe_name(name) ) ) if name == self.idfield or not self.idfield: baseurl = "base_url" else: baseurl = "self.{}".format(self.safe_name(self.idfield)) if fieldtype.is_uri: self.serializer.write( """ if self.{safename} is not None: u = save_relative_uri( self.{safename}, {baseurl}, {scoped_id}, {ref_scope}, relative_uris) if u: r['{fieldname}'] = u """.format( safename=self.safe_name(name), fieldname=shortname(name).strip(), baseurl=baseurl, scoped_id=fieldtype.scoped_id, ref_scope=fieldtype.ref_scope, ) ) else: self.serializer.write( """ if self.{safename} is not None: r['{fieldname}'] = save( self.{safename}, top=False, base_url={baseurl}, relative_uris=relative_uris) """.format( safename=self.safe_name(name), fieldname=shortname(name), baseurl=baseurl, ) ) def uri_loader(self, inner, scoped_id, vocab_term, ref_scope): # type: (TypeDef, bool, bool, Union[int, None]) -> TypeDef return self.declare_type( TypeDef( f"uri_{inner.name}_{scoped_id}_{vocab_term}_{ref_scope}", "_URILoader({}, {}, {}, {})".format( inner.name, scoped_id, vocab_term, ref_scope ), is_uri=True, scoped_id=scoped_id, ref_scope=ref_scope, ) ) def idmap_loader(self, field, inner, map_subject, map_predicate): # type: (str, TypeDef, str, Union[str, None]) -> TypeDef return self.declare_type( TypeDef( "idmap_{}_{}".format(self.safe_name(field), inner.name), "_IdMapLoader({}, '{}', '{}')".format( inner.name, map_subject, map_predicate ), ) ) def typedsl_loader(self, inner, ref_scope): # type: (TypeDef, Union[int, None]) -> TypeDef return self.declare_type( TypeDef( f"typedsl_{inner.name}_{ref_scope}", f"_TypeDSLLoader({inner.name}, {ref_scope})", ) ) def secondaryfilesdsl_loader(self, inner): # type: (TypeDef) -> TypeDef return self.declare_type( TypeDef( f"secondaryfilesdsl_{inner.name}", f"_SecondaryDSLLoader({inner.name})", ) ) def epilogue(self, root_loader): # type: (TypeDef) -> None self.out.write("_vocab = {\n") for k in sorted(self.vocab.keys()): self.out.write(' "{}": "{}",\n'.format(k, self.vocab[k])) self.out.write("}\n") self.out.write("_rvocab = {\n") for k in sorted(self.vocab.keys()): self.out.write(' "{}": "{}",\n'.format(self.vocab[k], k)) self.out.write("}\n\n") for _, collected_type in self.collected_types.items(): self.out.write(f"{collected_type.name} = {collected_type.init}\n") self.out.write("\n") self.out.write( """ def load_document(doc, baseuri=None, loadingOptions=None): # type: (Any, Optional[str], Optional[LoadingOptions]) -> Any if baseuri is None: baseuri = file_uri(os.getcwd()) + "/" if loadingOptions is None: loadingOptions = LoadingOptions() return _document_load(%(name)s, doc, baseuri, loadingOptions) def load_document_by_string(string, uri, loadingOptions=None): # type: (Any, str, Optional[LoadingOptions]) -> Any result = yaml.main.round_trip_load(string, preserve_quotes=True) add_lc_filename(result, uri) if loadingOptions is None: loadingOptions = LoadingOptions(fileuri=uri) loadingOptions.idx[uri] = result return _document_load(%(name)s, result, uri, loadingOptions) def load_document_by_yaml(yaml, uri, loadingOptions=None): # type: (Any, str, Optional[LoadingOptions]) -> Any '''Shortcut to load via a YAML object. yaml: must be from ruamel.yaml.main.round_trip_load with preserve_quotes=True ''' add_lc_filename(yaml, uri) if loadingOptions is None: loadingOptions = LoadingOptions(fileuri=uri) loadingOptions.idx[uri] = yaml return _document_load(%(name)s, yaml, uri, loadingOptions) """ % dict(name=root_loader.name) )