"""
Basic handling for embedded procedures.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import functools
from gavo import base
from gavo import utils
from gavo.rscdef import common
from gavo.rscdef import rmkfuncs
# Move this one to utils?
def unionByKey(*sequences):
    """returns the items of all sequences, made unique by their key attribute.

    Whenever several items share a key, the one from the latest sequence
    (and, within a sequence, the latest position) is the one returned.
    Callers should not rely on the order of the returned list.
    """
    # Later assignments to the same key replace earlier ones, which
    # implements the "later sequences override" rule.
    byKey = {
        element.key: element
        for sequence in sequences
        for element in sequence}
    return list(byKey.values())
class RDParameter(base.Structure):
    """A base class for parameters.

    A parameter has a key (the attribute can also be written as "name"),
    an optional description, and an optional default given as the
    element's content.
    """
    _name = base.UnicodeAttribute("key", default=base.Undefined,
        description="The name of the parameter", copyable=True, strip=True,
        aliases=["name"])
    _descr = base.NWUnicodeAttribute("description", default=None,
        description="Some human-readable description of what the"
        " parameter is about", copyable=True, strip=True)
    _expr = base.DataContent(description="The default for the parameter."
        " The special value __NULL__ indicates a NULL (python None) as usual."
        " An empty content means a non-preset parameter, which must be filled"
        " in applications. The magic value __EMPTY__ allows presetting an"
        " empty string.",
        copyable=True, strip=True, default=base.NotGiven)
    _late = base.BooleanAttribute("late", default=False,
        description="Bind the name not at setup time but at applying"
        " time. In rowmaker procedures, for example, this allows you to"
        " refer to variables like vars or rowIter in the bindings.")

    def isDefaulted(self):
        """returns True if some content (i.e., a default) was given for
        this parameter.
        """
        return self.content_ is not base.NotGiven

    def validate(self):
        """validates the structure and makes sure the key matches
        utils.identifierPattern.

        Raises a base.LiteralParseError if the key does not match.
        """
        super().validate()
        if not utils.identifierPattern.match(self.key):
            # The key ends up as the target of an assignment in generated
            # python code, so the hint has to talk about identifiers.
            raise base.LiteralParseError("name", self.key, hint=
                "Parameter names must be valid identifiers, as they are"
                " used as python variable names.")

    def completeElement(self, ctx):
        """replaces the magic content __EMPTY__ with an empty string
        before completing the element.
        """
        if self.content_=="__EMPTY__":
            self.content_ = ""
        super().completeElement(ctx)
class AliasableRDParameter(RDParameter):
    """An RDParameter that can, in addition, declare an alias, i.e.,
    a deprecated second name for the parameter.
    """
    _alias = base.UnicodeAttribute(
        "alias",
        default=None,
        description="A deprecated name for the parameter",
        copyable=True,
        strip=True)
class ProcPar(AliasableRDParameter):
    """A parameter of a procedure definition.

    Bodies of ProcPars are interpreted as python expressions, in
    which macros are expanded in the context of the procedure application's
    parent. If a body is empty, the parameter has no default and has
    to be filled by the procedure application.
    """
    name_ = "par"

    def validate(self):
        """validates the parameter and, unless the body is empty or
        contains a backslash, checks the body with utils.ensureExpression.
        """
        super().validate()
        body = self.content_
        # Nothing to check without a body; bodies containing a backslash
        # look like macro calls and may legitimately be non-python.
        if not body or "\\" in body:
            return
        utils.ensureExpression(common.replaceProcDefAt(body), self.key)
class Binding(ProcPar):
    """A binding of a procedure definition parameter to a concrete value.

    The value to set is given in the body of the binding as a python
    expression. Empty bodies are rejected.
    """
    name_ = "bind"

    def validate(self):
        """validates as a ProcPar and raises a base.StructureError for
        empty or whitespace-only bodies.
        """
        super().validate()
        body = self.content_
        if not (body and body.strip()):
            raise base.StructureError("Binding bodies must not be empty.")
class ProcSetup(base.Structure):
    """Prescriptions for setting up a namespace for a procedure application.

    You can add names to this namespace using par(ameter)s.
    If a parameter has no default and a procedure application does
    not provide it, an error is raised.

    You can also add names by providing a code attribute containing
    a python function body in code. Within, the parameters are
    available. The procedure application's parent can be accessed
    as parent. All names you define in the code are available as
    globals to the procedure body.

    Caution: Macros are expanded within the code; this means you
    need double backslashes if you want a single backslash in python
    code.
    """
    name_ = "setup"

    _code = base.ListOfAtomsAttribute("codeFrags",
        description="Python function bodies setting globals for the function"
        " application. Macros are expanded in the context"
        " of the procedure's parent.",
        itemAttD=base.UnicodeAttribute("code", description="Python function"
            " bodies setting globals for the function application. Macros"
            " are expanded in the context of the procedure's parent.",
            copyable=True),
        copyable=True)
    _pars = base.StructListAttribute("pars", ProcPar,
        description="Names to add to the procedure's global namespace.",
        copyable=True)
    _imports = base.UnicodeAttribute("imports",
        description="A list of comma-separated imports to put into the"
        " code's namespace. Dottesd specs like a.b.c are converted to"
        " ``from a.b import c``",
        default=base.NotGiven,
        copyable=True)
    _original = base.OriginalAttribute()

    def _getParSettingCode(self, useLate, indent, bindings):
        """returns code that sets our parameters.

        If useLate is true, generate for late bindings. Indent the
        code by indent. Bindings is a dictionary overriding
        the defaults or setting parameter values.

        The result consists of one "key = value" line per parameter
        handled, joined by newlines.
        """
        parCode = []
        for p in self.pars:
            if p.late!=useLate:
                continue
            val = bindings.get(p.key, base.NotGiven)
            if val is base.NotGiven:
                val = p.content_
            if val is base.NotGiven:
                # val must be bound somewhere else (since _ensureParsBound
                # succeeded); don't emit any code for this par in this setup
                # and hope for the best.
                continue
            parCode.append("%s%s = %s"%(indent, p.key, val))
        return "\n".join(parCode)

    def getParCode(self, bindings):
        """returns code doing setup bindings un-indented.
        """
        return self._getParSettingCode(False, "", bindings)

    def getLateCode(self, bindings):
        """returns code doing late (in-function) bindings, indented for
        use within a function body.

        The indent passed here has to agree with the one ProcDef.getCode
        applies, as ProcApp joins both into a single function body.
        """
        return self._getParSettingCode(True, " ", bindings)

    def getImportsCode(self):
        """yields one python import statement per spec in the imports
        attribute.

        Dotted specs a.b.c become "from a.b import c", undotted ones
        "import a". Empty specs (as resulting from trailing or doubled
        commas) are skipped rather than turned into a broken "import "
        statement. Nothing is yielded if imports is not given.
        """
        if not self.imports:
            return

        for spec in self.imports.split(","):
            spec = spec.strip()
            if not spec:
                continue
            parts = spec.split(".")
            if len(parts)>1:
                yield "from %s import %s"%(".".join(parts[:-1]), parts[-1])
            else:
                yield "import "+parts[0]

    def getBodyCode(self):
        """returns the body code un-indented.

        This is the import statements followed by all code fragments,
        each passed through utils.fixIndentation. If that raises a
        base.LiteralParseError, the error is re-raised with its pos
        set to our source position.
        """
        collectedCode = list(self.getImportsCode())
        for frag in self.codeFrags:
            try:
                collectedCode.append(
                    utils.fixIndentation(frag, "", governingLine=1))
            except base.LiteralParseError as ex:
                ex.pos = self.getSourcePosition()
                raise
        return "\n".join(collectedCode)
class ProcDef(base.Structure, base.RestrictionMixin):
    """An embedded procedure.

    Embedded procedures are python code fragments with some interface defined
    by their type. They can occur at various places (which is called procedure
    application generically), e.g., as row generators in grammars, as apply-s in
    rowmakers, or as SQL phrase makers in condDescs.

    They consist of the actual code and, optionally, definitions like
    the namespace setup, configuration parameters, or a documentation.

    The procedure applications compile into python functions with special
    global namespaces. The signatures of the functions are determined by
    the type attribute.

    ProcDefs are referred to by procedure applications using their id.
    """
    name_ = "procDef"

    _code = base.UnicodeAttribute("code", default=base.NotGiven,
        copyable=True, description="A python function body.")
    _setup = base.StructListAttribute("setups", ProcSetup,
        description="Setup of the namespace the function will run in",
        copyable=True)
    _doc = base.UnicodeAttribute("doc", default="", description=
        "Human-readable docs for this proc (may be interpreted as restructured"
        " text).", copyable=True)
    _type = base.EnumeratedUnicodeAttribute("type", default=None, description=
        "The type of the procedure definition. The procedure applications"
        " will in general require certain types of definitions.",
        validValues=["t_t", "apply", "rowfilter", "sourceFields", "mixinProc",
            "phraseMaker", "descriptorGenerator", "dataFunction", "dataFormatter",
            "metaMaker", "regTest", "iterator", "pargetter"],
        copyable=True,
        strip=True)
    _deprecated = base.UnicodeAttribute("deprecated", default=None,
        copyable=True, description="A deprecation message. This will"
        " be shown if this procDef is being compiled.")
    _original = base.OriginalAttribute()

    def getCode(self):
        """returns the body code indented for use as a function body, or
        an empty string if no code was given.

        The indent used here has to agree with the one ProcSetup.getLateCode
        applies, as ProcApp joins both into a single function body.

        A base.LiteralParseError from utils.fixIndentation is re-raised
        with its pos set to our source position.
        """
        if self.code is base.NotGiven:
            return ""

        try:
            return utils.fixIndentation(self.code, " ", governingLine=1)
        except base.LiteralParseError as ex:
            ex.pos = self.getSourcePosition()
            raise

    @functools.lru_cache(1)
    def getSetupPars(self):
        """returns all parameters used by setup items; for parameters
        sharing a name, the one from the lexically latest setup wins.
        """
        parLists = [setup.pars for setup in self.setups]
        return unionByKey(*parLists)

    def getLateSetupCode(self, boundNames):
        """returns the late-binding code of all setups, joined by newlines.
        """
        fragments = [setup.getLateCode(boundNames) for setup in self.setups]
        return "\n".join(fragments)

    def getParSetupCode(self, boundNames):
        """returns the setup-time parameter code of all setups, joined
        by newlines.
        """
        fragments = [setup.getParCode(boundNames) for setup in self.setups]
        return "\n".join(fragments)

    def getBodySetupCode(self, boundNames):
        """returns the body code of all setups, joined by newlines.

        boundNames is not used here; it is accepted so that all
        get*SetupCode methods share one signature.
        """
        fragments = [setup.getBodyCode() for setup in self.setups]
        return "\n".join(fragments)
class ProcApp(ProcDef):
    """An abstract base for procedure applications.

    Deriving classes need to provide:

    - a requiredType attribute specifying what ProcDefs can be applied.
    - a formalArgs attribute containing a (python) formal argument list
    - of course, a name_ for XML purposes.

    They can, in addition, give a class attribute additionalNamesForProcs,
    which is a dictionary that is joined into the global namespace during
    procedure compilation.
    """
    _procDef = base.ReferenceAttribute("procDef", forceType=ProcDef,
        default=base.NotGiven, description="Reference to the procedure"
        " definition to apply", copyable=True)
    _bindings = base.StructListAttribute("bindings", description=
        "Values for parameters of the procedure definition",
        childFactory=Binding, copyable=True)
    _name = base.UnicodeAttribute("name", default=base.NotGiven,
        description="A name of the proc. ProcApps compute their (python)"
        " names to be somewhat random strings. Set a name manually to"
        " receive more easily decipherable error messages. If you do that,"
        " you have to care about name clashes yourself, though.", strip=True)

    requiredType = None

    additionalNamesForProcs = {}

    def validate(self):
        """validates the application against its procDef.

        This raises a base.StructureError if the procDef's type does not
        match requiredType, emits a warning for deprecated procDefs, and
        finally makes sure all parameters are properly bound.
        """
        if self.procDef and self.procDef.type and self.requiredType:
            if self.procDef.type!=self.requiredType:
                raise base.StructureError("The procDef %s has type %s, but"
                    " here %s procDefs are required."%(self.procDef.id,
                        self.procDef.type, self.requiredType))

        if self.procDef:
            if self.procDef.deprecated:
                if self.getSourcePosition()!="<internally built>":
                    # for now, don't warn about these; they typically
                    # originate when copying/adapting cores and will just
                    # confuse operators
                    procId = "unnamed procApp"
                    if self.name:
                        procId = "procApp %s"%self.name

                    base.ui.notifyWarning("%s, %s: %s"%(
                        self.getSourcePosition(),
                        procId,
                        utils.fixIndentation(self.procDef.deprecated, "")))

        super().validate()
        self._ensureParsBound()

    def completeElement(self, ctx):
        """completes the element, making up a name from id(self) if none
        was given.
        """
        super().completeElement(ctx)
        if self.name is base.NotGiven:  # make up a name from self's id
            self.name = ("proc%x"%id(self)).replace("-", "")

    @functools.lru_cache(1)
    def getSetupPars(self):
        """returns the setup parameters for the proc app, where procDef
        parameters may be overridden by self's parameters.
        """
        allSetups = []
        if self.procDef is not base.NotGiven:
            allSetups.extend(self.procDef.setups)
        allSetups.extend(self.setups)
        return unionByKey(*[s.pars for s in allSetups])

    def _ensureParsBound(self):
        """raises an error if non-defaulted pars of procDef are not filled
        by the bindings.

        Bindings made through a parameter's alias are renamed to the
        canonical key on the way. A base.StructureError is also raised
        if both alias and canonical name are bound, or if there are
        bindings for parameters that do not exist.
        """
        bindNames = dict((b.key, b) for b in self.bindings)

        for p in self.getSetupPars():
            if p.alias and p.alias in bindNames:
                # move the binding over to the canonical name
                binding = bindNames.pop(p.alias)
                binding.key = p.key
                if p.key in bindNames:
                    raise base.StructureError("Both canonical name and alias bound:"
                        " %s, %s"%(p.key, p.alias))
                else:
                    bindNames[p.key] = binding

            if not p.isDefaulted():
                if not p.key in bindNames:
                    raise base.StructureError("Parameter %s is not defaulted in"
                        " %s and thus must be bound."%(p.key, self.name))

            # mark the binding as consumed
            if p.key in bindNames:
                bindNames.pop(p.key)

        # whatever is left over refers to parameters nobody defined
        if bindNames:
            raise base.StructureError("May not bind non-existing parameter(s)"
                " %s."%(", ".join(bindNames)))

    def onElementComplete(self):
        """records the contents of all bindings by key in _boundNames.
        """
        super().onElementComplete()
        self._boundNames = dict((b.key, b.content_) for b in self.bindings)

    def _combineWithProcDef(self, methodName, boundNames):
        # A slightly tricky helper method for the implementation of get*SetupCode:
        # this combines the results of calling methodName on a procDef
        # (where applicable) with calling it on ProcDef for self.
        parts = []
        if self.procDef is not base.NotGiven:
            parts.append(getattr(self.procDef, methodName)(boundNames))

        # go through ProcDef explicitly: self's methods of that name are
        # the ones calling us.
        parts.append(getattr(ProcDef, methodName)(self, boundNames))
        return "\n".join(parts)

    def getLateSetupCode(self, boundNames):
        """returns the late setup code of the procDef (if any) followed
        by our own.
        """
        return self._combineWithProcDef("getLateSetupCode", boundNames)

    def getParSetupCode(self, boundNames):
        """returns the parameter setup code of the procDef (if any)
        followed by our own.
        """
        return self._combineWithProcDef("getParSetupCode", boundNames)

    def getBodySetupCode(self, boundNames):
        """returns the body setup code of the procDef (if any) followed
        by our own.
        """
        return self._combineWithProcDef("getBodySetupCode", boundNames)

    def getSetupCode(self):
        """returns the complete setup code (parameter settings, then
        setup bodies).

        If the code contains a backslash, it is passed through the
        expand method of self.parent.
        """
        code = "\n".join((
            self.getParSetupCode(self._boundNames),
            self.getBodySetupCode(self._boundNames)))
        if "\\" in code:
            code = self.parent.expand(code)
        return code

    def _getFunctionDefinition(self, mainSource):
        """returns mainSource in a function definition with proper
        signature including setup of late code.

        Bodies that would be empty are replaced by a pass statement.
        """
        parts = [self.getLateSetupCode(self._boundNames)]
        parts.append(mainSource)
        body = "\n".join(parts)
        if not body.strip():
            body = " pass"
        return "def %s(%s):\n%s"%(self.name, self.formalArgs,
            body)

    def getFuncCode(self):
        """returns a function definition for this proc application.

        This includes bindings of late parameters.

        Locally defined code overrides code defined in a procDef.

        If the result contains a backslash, it is passed through the
        expand method of self.parent.
        """
        mainCode = ""
        if self.code is base.NotGiven:
            if self.procDef is not base.NotGiven:
                mainCode = self.procDef.getCode()
        else:
            mainCode = self.getCode()
        code = self._getFunctionDefinition(mainCode)
        if "\\" in code:
            code = self.parent.expand(code)
        return code

    def _compileForParent(self, parent):
        """helps compile.

        This looks for an rd attribute on parent and its ancestors and
        hands it (or None, if there is none) to rmkfuncs.makeProc together
        with our function and setup code.
        """
        # go get the RD for parent; it's always handy in this kind
        # of code
        curEl = parent
        while curEl is not None and not hasattr(curEl, "rd"):
            # Elements that are unrooted (no or empty parent) or that do
            # not have a parent attribute at all end the search without
            # an RD rather than failing with an AttributeError.
            curEl = getattr(curEl, "parent", None) or None
        rd = getattr(curEl, "rd", None)

        return rmkfuncs.makeProc(
            self.name, self.getFuncCode(),
            self.getSetupCode(), parent,
            rd=rd,
            procDef=self,
            **self.additionalNamesForProcs)

    def breakCircles(self):
        # overridden to undo additional memoization
        ProcDef.breakCircles(self)
        utils.forgetMemoized(self)

    def compile(self, parent=None):
        """returns a callable for this procedure application.

        You can pass a different parent; it is handed on to
        rmkfuncs.makeProc. If you do not give it, the embedding structure
        will be used.

        NOTE(review): getSetupCode and getFuncCode expand macros using
        self.parent, not the parent passed here -- confirm that this is
        intended.

        The result is obtained through utils.memoizeOn (presumably cached
        on parent, keyed by self -- verify against utils).
        """
        if parent is None:
            parent = self.parent
        return utils.memoizeOn(parent, self, self._compileForParent, parent)