"""
Transformation of STC-S CSTs to STC ASTs.
The central function here is buildTree; the rest of the module basically
provides the handler functions. All this is tied together in parseSTCS.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo.stc import common
from gavo.stc import dm
from gavo.stc import stcs
from gavo.stc import syslib
[docs]def buildTree(tree, context, pathFunctions={}, nameFunctions={},
typeFunctions={}):
"""traverses tree, calling functions on nodes.
pathFunctions is a dictionary mapping complete paths (i.e., tuples
of node labels) to handler functions, nameFunctions name a single
label and are called for nodes that don't match a pathFunction if
the last item of their paths is the label. Both of these currently
are not used. Instead, everything hinges on the fallback, which is
a node's type value (generated from the key words), matched against
typeFunctions.
The handler functions must be iterators. If they yield anything,
it must be key-value pairs.
All key-value pairs are collected in a dictionary that is then
returned. If value is a tuple, it is appended to the current value
for the key.
Context is an arbitrary object containing ancillary information for
building nodes. What's in there and what's not is up to the functions
and their callers.
"""
resDict = {}
for path, node in stcs.iterNodes(tree):
if path in pathFunctions:
handler = pathFunctions[path]
elif path and path[-1] in nameFunctions:
handler = nameFunctions[path[-1]]
elif node.get("type") in typeFunctions:
handler = typeFunctions[node["type"]]
else: # No handler, ignore this node
continue
for res in handler(node, context):
k, v = res
if isinstance(v, tuple):
resDict.setdefault(k, []).extend(v)
else:
if k in resDict:
raise common.STCInternalError("Attempt to overwrite key '%s', old"
" value %s, new value %s (this should probably have been"
" a tuple)"%(k, resDict[k], v))
resDict[k] = v
return resDict
[docs]class GenericContext(object):
"""is an object that can be used for context.
It simply exposes all its constructor arguments as attributes.
"""
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
############## Coordinate systems
def _makeRefpos(node):
refposName = node.get("refpos")
if refposName=="UNKNOWNRefPos":
refposName = None
return dm.RefPos(standardOrigin=refposName,
planetaryEphemeris=node.get("plEphemeris"))
def _buildRedshiftFrame(node, context):
yield "redshiftFrame", dm.RedshiftFrame(dopplerDef=node["dopplerdef"],
type=node["redshiftType"], refPos=_makeRefpos(node))
# Simple translations of reference frame names from STC-S to STC-X
_frameTrans = {
"GALACTIC": "GALACTIC_II",
"UNKNOWNFrame": None,}
def _buildSpaceFrame(node, context):
nDim, flavor = stcs.stcsFlavors[node["flavor"]]
frame = node["frame"]
frame = _frameTrans.get(frame, frame)
equinox = None
if node.get("equinox"):
if "." in node["equinox"]:
equinox = node["equinox"]
else: # allow J2000 and expand it to J2000.0
equinox = node["equinox"]+".0"
yield "spaceFrame", dm.SpaceFrame(refPos=_makeRefpos(node),
flavor=flavor, nDim=nDim, refFrame=frame, equinox=equinox)
def _buildSpectralFrame(node, context):
yield "spectralFrame", dm.SpectralFrame(
refPos=_makeRefpos(node))
def _buildTimeFrame(node, context):
ts = node.get("timescale")
if ts=="nil":
ts = None
yield "timeFrame", dm.TimeFrame(refPos=_makeRefpos(node),
timeScale=ts)
[docs]def getCoordSys(cst):
"""returns constructor arguments for a CoordSys from an STC-S CST.
"""
args = buildTree(cst, None, nameFunctions={
'redshift': _buildRedshiftFrame,
'space': _buildSpaceFrame,
'spectral': _buildSpectralFrame,
'time': _buildTimeFrame,
})
return "system", dm.CoordSys(**args)
############## Coordinates and their intervals
[docs]def iterVectors(values, dim, spatial):
"""iterates over dim-dimensional vectors made of values.
The function does not check if the last vector is actually complete.
"""
if isinstance(values, common.ColRef):
yield values
return
if dim==1 and not spatial:
for v in values:
yield v
else:
for index in range(0, len(values), dim):
yield tuple(values[index:index+dim])
def _iterIntervals(coos, dim, spatial=False):
"""iterates over pairs dim-dimensional vectors.
It will always return at least one empty (i.e., None, None) pair.
The last pair returned may be incomplete (specifying a start
value only, supposedly) but not empty.
"""
first, startValue = True, None
for item in iterVectors(coos, dim, spatial):
if startValue is None:
if first:
first = False
startValue = item
else:
yield (startValue, item)
startValue = None
if startValue is None:
if first:
yield (None, None)
else:
yield (startValue, None)
def _makeWiggleValues(nDim, val, minItems=None, maxItems=None, spatial=False):
if val is None:
return
values = _makeCooValues(nDim, val, minItems, maxItems, spatial)
if not values:
return
if nDim>1: # might be error radii if all values are equal
if set([1])==set(len(set(v)) for v in values):
return dm.RadiusWiggle(radii=tuple(v[0] for v in values))
return dm.CooWiggle(values=values)
def _validateCoos(values, nDim, minItems, maxItems):
"""makes sure values is valid a source of between minItems and maxItems
nDim-dimensional tuples.
minItems and maxItems may both be None to signify no limit.
"""
if isinstance(values, common.GeometryColRef):
values.expectedLength = nDim
numItems = len(values)//nDim
if numItems*nDim!=len(values):
# special case: a *single* ColRef is good for anything (could be
# an array or something)
if len(values)==1 and isinstance(values[0], common.ColRef):
return
raise common.STCSParseError("%s is not valid input to create %d-dimensional"
" coordinates"%(values, nDim))
if minItems is not None and numItems<minItems:
raise common.STCSParseError("Expected at least %d coordinates in %s."%(
minItems, values))
if maxItems is not None and numItems>maxItems:
raise common.STCValueError(
"Expected not more than %d coordinates in %s."%(maxItems, values))
def _makeCooValues(nDim, values, minItems=None, maxItems=None, spatial=False):
"""returns a list of nDim-Tuples made up of values.
If values does not contain an integral multiple of nDim items,
the function will raise an STCSParseError. You can also optionally
give a minimally or maximally expected number of tuples. If the
constraints are violated, again an STCSParseError is raised.
If spatial is true, tuples will be returned even for 1D data.
"""
if values is None:
if minItems:
raise common.STCSParseError("Expected at least %s coordinate items but"
" found none."%minItems)
else:
return
_validateCoos(values, nDim, minItems, maxItems)
return tuple(v for v in iterVectors(values, nDim, spatial))
def _addUnitPlain(args, node, frame):
args["unit"] = node.get("unit")
def _addUnitRedshift(args, node, frame):
unit = node.get("unit", [None])
assert isinstance(unit, list), "not a list"
unit = unit[0]
if unit=="nil":
args["unit"] = ""
args["velTimeUnit"] = None
elif unit:
parts = unit.split("/")
if len(parts)!=2:
raise common.STCSParseError("'%s' is not a valid unit for redshifts"%unit)
args["unit"] = parts[0]
args["velTimeUnit"] = parts[1]
def _mogrifySpaceUnit(unit, nDim):
if unit:
parts = unit.split()
if len(parts)==nDim:
return tuple(parts)
elif len(parts)==1:
return (unit,)*nDim
else:
raise common.STCSParseError("'%s' is not a valid for unit %d-dimensional"
" spatial coordinates"%(unit, nDim))
def _addUnitSpatial(args, node, frame):
unit, nDim = node.get("unit"), frame.nDim
args["unit"] = _mogrifySpaceUnit(unit, nDim)
def _addUnitVelocity(args, node, frame):
unit, nDim = node.get("unit"), frame.nDim
if unit:
su, vu = [], []
parts = unit.split()
for uS in parts:
up = uS.split("/")
if len(up)!=2:
raise common.STCSParseError(
"'%s' is not a valid unit for velocities."%uS)
su.append(up[0])
vu.append(up[1])
args["unit"] = _mogrifySpaceUnit(" ".join(su), nDim)
args["velTimeUnit"] = _mogrifySpaceUnit(" ".join(vu), nDim)
_unitMakers = {
dm.SpectralType: _addUnitPlain,
dm.TimeType: _addUnitPlain,
dm.SpaceType: _addUnitSpatial,
dm.RedshiftType: _addUnitRedshift,
dm.VelocityType: _addUnitVelocity,
}
def _makeBasicCooArgs(node, frame, posClass, spatial=False):
"""returns a dictionary containing constructor arguments common to
all items dealing with coordinates.
"""
nDim = frame.nDim
args = {
"error": _makeWiggleValues(nDim, node.get("error"), maxItems=2,
spatial=spatial),
"resolution": _makeWiggleValues(nDim, node.get("resolution"), maxItems=2,
spatial=spatial),
"pixSize": _makeWiggleValues(nDim, node.get("pixSize"), maxItems=2,
spatial=spatial),
"size": _makeWiggleValues(nDim, node.get("size"), maxItems=2,
spatial=spatial),
"frame": frame,
}
if spatial and node.get("epoch"):
if isinstance(node["epoch"], common.ColRef):
args["epoch"] = node["epoch"]
args["yearDef"] = "J"
else:
args["epoch"] = float(node["epoch"][1:])
args["yearDef"] = node["epoch"][0]
_unitMakers[posClass.cType](args, node, frame)
return args
def _makeCooBuilder(frameName, intervalClass, intervalKey,
posClass, posKey, iterIntervKeys, spatial=False):
"""returns a function(node, context) -> ASTNode for building a
coordinate-like AST node.
frameName is the name of the coordinate frame within
context.system,
(interval|pos)(Class|Key) are the class (key) to be used
(returned) for the interval/geometry and simple coordinate found
in the phrase. If intervalClass is None, no interval/geometry
will be built.
iterIntervKeys is an iterator that yields key/value pairs for intervals
or geometries embedded.
Single positions are always expected under the coo key.
"""
positionExclusiveKeys = ["error", "resolution", "pixSize", "value",
"size", "unit", "velTimeUnit", "epoch", "yearDef"]
def builder(node, context):
frame = getattr(context.system, frameName)
nDim = frame.nDim
args = _makeBasicCooArgs(node, frame, posClass, spatial)
# Yield a coordinate
if "pos" in node:
args["value"] = _makeCooValues(nDim, node["pos"],
minItems=1, maxItems=1, spatial=spatial)[0]
else:
args["value"] = None
yield posKey, posClass(**args)
# Yield an area if defined in this phrase and non-empty
if intervalClass is None:
return
for key in positionExclusiveKeys:
if key in args:
del args[key]
for k, v in iterIntervKeys(node, nDim, spatial=spatial):
args[k] = v
if "fillfactor" in node:
args["fillFactor"] = node["fillfactor"]
if len(set(args))>1: # don't yield intervals that just define a frame
yield intervalKey, (intervalClass(**args),)
return builder
def _makeIntervalKeyIterator(preferUpper=False):
"""returns a function yielding ASTNode constructor keys for intervals.
"""
def iterKeys(node, nDim, spatial=False):
res, coos = {}, node.get("coos", ())
_validateCoos(coos, nDim, None, None)
for interval in _iterIntervals(coos, nDim, spatial):
if preferUpper:
res["upperLimit"], res["lowerLimit"] = interval
else:
res["lowerLimit"], res["upperLimit"] = interval
if res["upperLimit"]:
yield "upperLimit", res["upperLimit"]
if res["lowerLimit"]:
yield "lowerLimit", res["lowerLimit"]
return iterKeys
###################### Geometries
def _makeGeometryKeyIterator(argDesc, clsName):
"""returns a key iterator for use with _makeCooBuilder that yields
the keys particular to certain geometries.
ArgDesc describes what keys should be parsed from the node's coos key.
It consists for tuples of name and type code, where type code is one of:
- r -- a single real value.
- v -- a vector of dimensionality given by the system (i.e., nDim).
- rv -- a sequence of v items of arbitrary length.
- cv -- a sequence of "Convex" vectors (dim 4) of arbitrary length.
rv may only occur at the end of argDesc since it will consume all
remaining coordinates.
"""
parseLines = [
"def iterKeys(node, nDim, spatial=True):",
' if False: yield', # make sure the thing is an iterator
' coos = node.get("coos", ())',
' yield "origUnit",'
' _mogrifySpaceUnit(node.get("unit"), nDim)',
" try:",
" pass"]
# Everything below here just coordinates
parseLines.extend([
' if isinstance(coos, common.GeometryColRef):',
' yield "geoColRef", coos',
' return'])
for name, code in argDesc:
if code=="r":
parseLines.append(' yield "%s", coos.pop(0)'%name)
elif code=="v":
parseLines.append(' vec = coos[:nDim]')
parseLines.append(' coos = coos[nDim:]')
parseLines.append(' _validateCoos(vec, nDim, 1, 1)')
parseLines.append(' yield "%s", tuple(vec)'%name)
elif code=="rv":
parseLines.append(' yield "%s", _makeCooValues(nDim, coos)'%name)
parseLines.append(' coos = []')
elif code=="cv":
parseLines.append(' yield "%s", _makeCooValues(4, coos)'%name)
parseLines.append(' coos = []')
parseLines.append(' except IndexError:')
parseLines.append(' raise common.STCSParseError("Not enough coordinates'
' while parsing %s")'%clsName)
parseLines.append(
' if coos: raise common.STCSParseError("Too many coordinates'
' while building %s, remaining: %%s"%%coos)'%clsName)
exec("\n".join(parseLines))
return locals()["iterKeys"]
def _makeGeometryKeyIterators():
return dict(
(clsName, _makeGeometryKeyIterator(argDesc, clsName))
for clsName, argDesc in [
("AllSky", []),
("Circle", [('center', 'v'), ('radius', 'r')]),
("Ellipse", [('center', 'v'), ('smajAxis', 'r'), ('sminAxis', 'r'),
('posAngle', 'r')]),
("Box", [('center', 'v'), ('boxsize', 'v')]),
("Polygon", [("vertices", "rv")]),
("Convex", [("vectors", "cv")]),
("PositionInterval", [("lowerLimit", "v"), ("upperLimit", "v")]),
])
_geometryKeyIterators = _makeGeometryKeyIterators()
def _makeGeometryBuilder(cls):
"""returns a builder for Geometries.
See _makeGeometryKeyIterator for the meaning of the arguments.
"""
return _makeCooBuilder("spaceFrame", cls, "areas", dm.SpaceCoo,
"place", _geometryKeyIterators[cls.__name__], spatial=True)
def _compoundGeometryKeyIterator(node, nDim, spatial):
"""yields keys to configure compound geometries.
"""
children = []
for c in node["children"]:
childType = c["subtype"]
destCls = getattr(dm, childType)
if childType in _geometryKeyIterators:
children.append(destCls(**dict(
_geometryKeyIterators[childType](c, nDim, True))))
else: # child is another compound geometry
children.append(destCls(**dict(
_compoundGeometryKeyIterator(c, nDim, True))))
yield "children", children
def _makeCompoundGeometryBuilder(cls):
"""returns a builder for compound geometries.
"""
return _makeCooBuilder("spaceFrame", cls, "areas", dm.SpaceCoo,
"place", _compoundGeometryKeyIterator, spatial=True)
###################### Top level
[docs]def getCoords(cst, system):
"""returns an argument dict for constructing STCSpecs for plain coordinates.
"""
context = GenericContext(system=system)
return buildTree(cst, context, typeFunctions = {
"Time": _makeCooBuilder("timeFrame", None, None,
dm.TimeCoo, "time", None),
"StartTime": _makeCooBuilder("timeFrame", dm.TimeInterval, "timeAs",
dm.TimeCoo, "time", _makeIntervalKeyIterator()),
"StopTime": _makeCooBuilder("timeFrame", dm.TimeInterval, "timeAs",
dm.TimeCoo, "time", _makeIntervalKeyIterator(preferUpper=True)),
"TimeInterval": _makeCooBuilder("timeFrame", dm.TimeInterval, "timeAs",
dm.TimeCoo, "time", _makeIntervalKeyIterator()),
"Position": _makeCooBuilder("spaceFrame", None, None, dm.SpaceCoo,
"place", None, spatial=True),
"PositionInterval": _makeCooBuilder("spaceFrame",
dm.SpaceInterval, "areas", dm.SpaceCoo, "place",
_makeIntervalKeyIterator(), spatial=True),
"Velocity": _makeCooBuilder("spaceFrame",
dm.VelocityInterval, "velocityAs", dm.VelocityCoo, "velocity",
_makeIntervalKeyIterator(), spatial=True),
"VelocityInterval": _makeCooBuilder("spaceFrame",
dm.VelocityInterval, "velocityAs", dm.VelocityCoo, "velocity",
_makeIntervalKeyIterator(), spatial=True),
"AllSky": _makeGeometryBuilder(dm.AllSky),
"Circle": _makeGeometryBuilder(dm.Circle),
"Ellipse": _makeGeometryBuilder(dm.Ellipse),
"Box": _makeGeometryBuilder(dm.Box),
"Polygon": _makeGeometryBuilder(dm.Polygon),
"Convex": _makeGeometryBuilder(dm.Convex),
"Union": _makeCompoundGeometryBuilder(dm.Union),
"Intersection": _makeCompoundGeometryBuilder(dm.Intersection),
"Difference": _makeCompoundGeometryBuilder(dm.Difference),
"Not": _makeCompoundGeometryBuilder(dm.Not),
"Spectral": _makeCooBuilder("spectralFrame", None, None,
dm.SpectralCoo, "freq", None),
"SpectralInterval": _makeCooBuilder("spectralFrame",
dm.SpectralInterval, "freqAs", dm.SpectralCoo, "freq",
_makeIntervalKeyIterator()),
"Redshift": _makeCooBuilder("redshiftFrame", None, None,
dm.RedshiftCoo, "redshift", None),
"RedshiftInterval": _makeCooBuilder("redshiftFrame",
dm.RedshiftInterval, "redshiftAs", dm.RedshiftCoo, "redshift",
_makeIntervalKeyIterator()),
})
[docs]def parseSTCS(literal, grammarFactory=None):
"""returns an STC AST for an STC-S expression.
"""
cst = stcs.getCST(literal, grammarFactory)
if "libSystem" in cst:
system = syslib.getLibrarySystem(cst["libSystem"])
else:
system = getCoordSys(cst)[1]
args = {"astroSystem": system}
args.update(getCoords(cst, system))
return dm.STCSpec(**args).polish()
[docs]def parseQSTCS(literal):
"""returns an STC AST for an STC-S expression with identifiers instead of
values.
The identifiers are denoted in double-quoted strings. Legal identifiers
follow the python syntax (i.e., these are *not* SQL quoted identifiers).
"""
return parseSTCS(literal, grammarFactory=stcs.getColrefGrammar)
if __name__=="__main__":
print(parseSTCS("Union FK5 (Box 12 -13 2 2 Not (Circle 14 -13.5 3))"))