"""
Helpers for morphing modules
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import contextlib
from gavo.adql import nodes
[docs]class State(object):
"""is a scratchpad for morphers to communicate state among
themselves.
Append to warnings a necessary. Also, traverse keeps an attribute
nodeStack in here letting elements look up its parent chain.
"""
def __init__(self):
self.warnings = []
self.nodeStack = []
[docs] @contextlib.contextmanager
def onNodeStack(self, node):
self.nodeStack.append(node)
try:
yield
finally:
popped = self.nodeStack.pop()
assert popped==node, "ADQL morphing node stack corruption"
_BOOLEANIZER_TABLE = {
('=', '0'): "NOT",
('!=', '1'): "NOT",
('=', '1'): "",
('!=', '0'): "",}
[docs]def addNotToBooleanized(expr, operator, operand):
"""prepends a NOT to expr if operator and operand suggest there should
be one for ADQL integerized boolean expressions.
The function will return None for unknown combinations of operator and
operand, and it will simply hand through Nones for expr, so calling
functions can just return addNotToBooleanized(...).
"""
if expr is None:
return expr
prefix = _BOOLEANIZER_TABLE.get((operator, operand), None)
if prefix is None:
# weird non-boolean-looking condition
return None
elif prefix:
return nodes.TransparentNode(children=[prefix, '(', expr, ')'])
else:
return expr
[docs]def analyzeFuncComp(node, acceptableOperators=["=", "!="]):
"""returns the (function, other operator) for a comparisonPredicate
This is regardless of the order of the comparison.
This will return None, None if
* node isn't a comparisonPredicate
* the operator is not symmetric
* none of the operators is a FunctionNode
"""
if (node.type!="comparisonPredicate"
or node.opr not in acceptableOperators):
return None, None
if isinstance(node.op1, nodes.FunctionNode):
return node.op1, node.op2
elif isinstance(node.op2, nodes.FunctionNode):
return node.op2, node.op1
else:
# no function call, leave things alone
return None, None
# Handler functions for booleanizeComparisons
_BOOLEANOID_FUNCTIONS = {}
[docs]def registerBooleanizer(funcName, handler):
"""registers handler as a booleanizer for ADQL functions called
funcName.
funcName must be all-uppercase for this to work. handler(node,
operand, operator) is a function that receives a function node
and the operand and operator of the comparison and either returns
None to say it can't handle it, or something else; that something
else is what the entire comparison node is morphed into.
You can call multiple booleanizers for the same function; they will
be tried in sequence. Hence, make sure you get your import sequences
right if you do this.
"""
_BOOLEANOID_FUNCTIONS.setdefault(funcName, []).append(handler)
[docs]def booleanizeComparisons(node, state):
"""turns a comparison expression that's really a boolean
expression into a boolean expression.
Actual morphers shouldn't use that but rather get their parent
from the stack and use its OVERRIDE_RESULT attribute. See the
DISTANCE morpher for an example.
For several reasons, ufuncs like ivo_hasword can't really do this.
Instead, they call registerBooleanizer with the function name
and callable that receives the function node, the operator, and
the operand. If that function returns non-None, that result is
used instead of the current node.
"""
fCall, opd = analyzeFuncComp(node)
if fCall is None:
# node is not a comparison with a function; this is probably not
# a good sign, as we shouldn't end up here in that case, but
# let's hope for the best and fall through to SQL.
return node
opd = nodes.flatten(opd)
for morpher in _BOOLEANOID_FUNCTIONS.get(fCall.funName, []):
res = morpher(fCall, node.opr, opd)
if res is not None:
node = res
break
return node
[docs]class Morpher(object):
"""A class managing the process of morphing an ADQL expression.
It is constructed with a a dictionary of morphers; the keys are node
types, the values morphing functions.
Morphing functions have the signature m(node, state) -> node. They
should return the node if they do not with to change it.
state is a State instance.
The main entry point is morph(origTree) -> state, tree. origTree is not
modified, the return value can be flattened but can otherwise be severely
damaged.
For special effects, there's also earlyMorphers. These will be called
when traversal reaches the node for the first time. If these return
None, traversal continues as usual, if not, their result will be
added to the tree and *not* further traversed. TODO: We don't currently
have anything requiring earlyMorphers. Do we want to drop the feature?
"""
def __init__(self, morphers, earlyMorphers={}):
self.morphers = morphers
self.earlyMorphers = earlyMorphers
def _getChangedForSeq(self, value, state):
newVal, changed = [], False
for child in value:
if isinstance(child, nodes.ADQLNode):
newVal.append(self._traverse(child, state))
else:
newVal.append(child)
if newVal[-1]!=child:
changed = True
if changed:
return tuple(newVal)
def _getChangedForNode(self, value, state):
newVal = self._traverse(value, state)
if not newVal is value:
return newVal
def _getChanges(self, name, value, state):
"""iterates over key/value pairs changed by morphing value under
the key name.
"""
if isinstance(value, (list, tuple)):
meth = self._getChangedForSeq
elif isinstance(value, nodes.ADQLNode):
meth = self._getChangedForNode
else:
return
newVal = meth(value, state)
if newVal is not None:
yield name, newVal
def _traverse(self, node, state):
if node.type in self.earlyMorphers:
res = self.earlyMorphers[node.type](node, state)
if res is not None:
return res
with state.onNodeStack(node):
changes = []
for name, value in node.iterAttributes():
changes.extend(self._getChanges(name, value, state))
# let handlers down the tree determine the total result (this
# is mainly for when comparisons become boolean function calls,
# but who knows?)
if getattr(node, "OVERRIDE_RESULT", None) is not None:
newNode = node.OVERRIDE_RESULT
elif changes:
newNode = node.change(**dict(changes))
newNode.original = node
else:
newNode = node
curType = getattr(newNode, "type", None)
if curType in self.morphers:
handlerResult = self.morphers[curType](newNode, state)
assert handlerResult is not None,\
"ADQL morph handler for %s returned None"%curType
return handlerResult
return newNode
[docs] def morph(self, tree):
state = State()
res = self._traverse(tree, state)
return state, res