"""
Building ASTs from STC-X trees.
The idea here is to run buildTree on an ElementTree of the STC-X input.
buildTree has a dictionary mapping element names to handlers. This dictionary
is built with a modicum of metaprogramming within _getHandlers.
Each handler receives the ElementTree node it is to operate on, the current
buildArgs (i.e., a dictionary containing the arguments for building instances,
collected by buildTree while walking the tree), and a context object.
Handlers yield keyword-value pairs that are added to the buildArgs. If
the value is a tuple or list, it will be appended to the current value
for that keyword, otherwise it will fill this keyword. Overwrites are
not allowed.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
# TODO: throw out all XML namespacing and only operate on local names
# throughout (well, actually: throw out the whole thing:-)
from gavo import utils
from gavo.stc import common
from gavo.stc import dm
from gavo.stc import syslib
from gavo.stc import times
from gavo.stc import units
# Keyword names of the "wiggle" attributes of coordinates; _fixWiggles and
# _fixSpatialUnits iterate over these in this order.
WIGGLE_TYPES = ["error", "resolution", "size", "pixSize"]
class SIBLING_ASTRO_SYSTEM:
    """A sentinel class used in place of a coordinate system id.

    It tells the id resolver to use the sibling AstroCoordSys element.
    """
####################### Helpers
def STCElement(name):
    """returns an ElementTree QName for name within the STC namespace.
    """
    qualified = utils.ElementTree.QName(common.STCNamespace, name)
    return qualified


# short alias, used a lot when building the handler tables
_n = STCElement
def _localname(qName):
"""hacks the local tag name from a {ns}-serialized qName.
"""
qName = str(qName)
return qName[qName.find("}")+1:]
def _passthrough(node, buildArgs, context):
"""yields the items of buildArgs.
This can be used for "no-op" elements.
"""
return iter(buildArgs.items())
def _noIter(ign, ored):
if False: yield None
def _buildTuple(val):
return (val,)
def _makeUnitYielder(unitKeys, prefix="", tuplify=False):
    """returns a function that yields unit information from an elementTree
    node.

    The returned function takes (node, buildArgs).  For each key in unitKeys,
    it yields (prefix+key, value), where value is taken from node's
    attribute of that name if present (wrapped into a 1-tuple if tuplify is
    true), and otherwise unchanged from buildArgs if the key is there.
    """
    mkRes = _buildTuple if tuplify else utils.identity

    def yieldUnits(node, buildArgs):
        for unitKey in unitKeys:
            if unitKey in node.attrib:
                # a unit on the element itself takes precedence
                yield prefix+unitKey, mkRes(node.get(unitKey))
            elif unitKey in buildArgs:
                yield prefix+unitKey, buildArgs[unitKey]

    return yieldUnits
def _makeKeywordBuilder(kw):
"""returns a builder that returns the node's text content under kw.
"""
def buildKeyword(node, buildArgs, context):
yield kw, node.text
return buildKeyword
def _assembleVals(buildArgs, context):
"""takes C1, C2, C3 from buildArgs to add a vals key to buildArgs.
If vals already is there, nothing is changed.
Calling this is necessary for vector-valued values. buildArgs is changed
in place.
"""
if "vals" in buildArgs:
return
vals = []
for cooInd in range(context.peekDim()):
vals.append(buildArgs.pop("C%d"%(cooInd+1), None))
if set(vals)==frozenset([None]):
# no coordinates given, don't bother
pass
else:
buildArgs['vals'] = tuple(vals)
def _makeKwValuesBuilder(kwName, tuplify=False, units=_noIter):
    """returns a builder yielding the vals from buildArgs, wrapped into
    a 1-tuple, under kwName.

    With tuplify, vals is turned into a tuple before being wrapped.  After
    the values, whatever units(node, buildArgs) yields is passed on.

    The vals key is left by builders like _buildVector.
    """
    def buildNode(node, buildArgs, context):
        _assembleVals(buildArgs, context)
        vals = buildArgs["vals"]
        if tuplify:
            vals = tuple(vals)
        yield kwName, (vals,)
        yield from units(node, buildArgs)

    return buildNode
def _makeKwValueBuilder(kwName, tuplify=False, units=_noIter):
    """returns a builder yielding the vals from buildArgs as a single
    value under kwName.

    With tuplify, a tuple is yielded (empty when there are no vals),
    otherwise vals itself (None if missing).  After the value, whatever
    units(node, buildArgs) yields is passed on.

    The vals key is left by builders like _buildVector.
    """
    if tuplify:
        def extract(buildArgs):
            return tuple(buildArgs.get("vals", ()))
    else:
        def extract(buildArgs):
            return buildArgs.get("vals", None)

    def buildNode(node, buildArgs, context):
        _assembleVals(buildArgs, context)
        yield kwName, extract(buildArgs)
        yield from units(node, buildArgs)

    return buildNode
def _makeKwFloatBuilder(kwName, multiple=True, units=_noIter):
    """returns a builder yielding float(node.text) under kwName.

    Column references (common.ColRef instances) are passed through
    unchanged; empty or whitespace-only texts yield no value at all.
    The builder will also yield whatever units(node, buildArgs) produces.

    If multiple is True, the values will be returned in 1-tuples, else as
    simple values.
    """
    if multiple:
        def wrap(value):
            return (value,)
    else:
        def wrap(value):
            return value

    def buildNode(node, buildArgs, context):
        text = node.text
        if isinstance(text, common.ColRef):
            yield kwName, wrap(text)
        elif text and text.strip():
            yield kwName, wrap(float(text))
        yield from units(node, buildArgs)

    return buildNode
def _makeNodeBuilder(kwName, astObject):
"""returns a builder that makes astObject with the current buildArgs
and returns the thing under kwName.
"""
def buildNode(node, buildArgs, context):
buildArgs["id"] = node.get("id", None)
yield kwName, astObject(**buildArgs)
return buildNode
def _fixSpectralUnits(node, buildArgs, context):
unit = None
if "unit" in node.attrib:
unit = node.get("unit")
if "unit" in buildArgs:
unit = buildArgs["unit"]
if "spectral_unit" in buildArgs:
unit = buildArgs["spectral_unit"]
del buildArgs["spectral_unit"]
buildArgs["unit"] = unit
def _fixTimeUnits(node, buildArgs, context):
unit = None
if "unit" in node.attrib:
unit = node.get("unit")
if "unit" in buildArgs:
unit = buildArgs["unit"]
if "time_unit" in buildArgs:
unit = buildArgs["time_unit"]
del buildArgs["time_unit"]
buildArgs["unit"] = unit
def _fixRedshiftUnits(node, buildArgs, context):
sUnit = node.get("unit")
if "unit" in buildArgs:
sUnit = buildArgs["unit"]
if "pos_unit" in buildArgs:
sUnit = buildArgs["pos_unit"]
del buildArgs["pos_unit"]
vUnit = node.get("vel_time_unit")
if "vel_time_unit" in buildArgs:
vUnit = buildArgs["vel_time_unit"]
del buildArgs["vel_time_unit"]
buildArgs["unit"] = sUnit
buildArgs["velTimeUnit"] = vUnit
def _makeSpatialUnits(nDim, *unitSources):
"""returns a units value from unitSources.
The tuple has length nDim, unitSources are arguments that are either
None, strings, or tuples. The first non-None-one wins, strings and 1-tuples
are expanded to length nDim.
"""
for unit in unitSources:
if not unit:
continue
if isinstance(unit, (tuple, list)):
if len(unit)==1:
return tuple(unit*nDim)
elif len(unit)==nDim:
return tuple(unit)
else:
raise common.STCValueError("Cannot construct %d-dimensional units from"
" %s."%(nDim, repr(unit)))
else: # a string or something similar
return (unit,)*nDim
return None
def _fixSpatialUnits(node, buildArgs, context):
    """sets unit (and, where given, velTimeUnit) in buildArgs for spatial
    coordinates.

    Units already in buildArgs win over units given in node's attributes.
    The results are tuples with one item per dimension (as given by
    context.peekDim()).  If no unit can be found at all, the unit key is
    removed from buildArgs.
    """
    nDim = context.peekDim()

    # buildArgs["unit"] may have been left in buildArgs from upstream
    inherited = buildArgs.pop("unit", None)
    buildArgs["unit"] = _makeSpatialUnits(nDim, inherited,
        node.get("unit", "").split())

    # This only kicks in for velocities
    velTimeUnit = _makeSpatialUnits(nDim,
        buildArgs.pop("vel_time_unit", ()),
        node.get("vel_time_unit", "").split())
    if velTimeUnit:
        buildArgs["velTimeUnit"] = velTimeUnit
    else:
        buildArgs.pop("velTimeUnit", None)

    if not buildArgs["unit"]:
        # it's actually legal to have no unit on the position but on some
        # wiggle (sigh).  Adopt the first one we find if that's true.
        # NOTE(review): the extracted source lost its indentation; this
        # assumes the original else belonged to the for loop (i.e., unit is
        # only dropped when no wiggle provides one).
        wiggleUnits = (buildArgs[wiggleType].origUnit
            for wiggleType in WIGGLE_TYPES
            if wiggleType in buildArgs)
        adopted = next((unit for unit in wiggleUnits if unit), None)
        if adopted is None:
            del buildArgs["unit"]
        else:
            buildArgs["unit"] = adopted

    # sometimes people give position1d for nDim=2... *Presumably*
    # actual units are the same on both axes then.  Sigh
    mainUnit = buildArgs.get("unit")
    if mainUnit and len(mainUnit)==2 and mainUnit[1] is None:
        buildArgs["unit"] = (mainUnit[0], mainUnit[0])
# Maps frame attribute names to the functions fixing up the unit keys in
# buildArgs for coordinates in frames of that kind; used by _fixUnits.
_unitFixers = {
    "spectralFrame": _fixSpectralUnits,
    "redshiftFrame": _fixRedshiftUnits,
    "timeFrame": _fixTimeUnits,
    "spaceFrame": _fixSpatialUnits,
}
def _fixUnits(frameName, node, buildArgs, context):
    """changes the keys in buildArgs to match the requirements of node.

    This dispatches on frameName to the frame type-specific helper
    functions in _unitFixers; unknown frame names raise a KeyError.  The
    principle is: Attributes inherited from lower-level items (i.e. the
    specific values) override a unit specification on node.
    """
    fixer = _unitFixers[frameName]
    return fixer(node, buildArgs, context)
def _iterCooMeta(node, context, frameName):
    """yields various meta information for coordinate-like objects.

    This yields (key, value) pairs for:

    * frame -- an IdProxy for the coordinate's reference frame,
    * fillFactor -- float of node's fill_factor attribute, if non-empty,
    * id -- node's id attribute, if non-empty.

    Rules for inferring the frame:

    If there's a frame_id on node, use it directly.

    Else see if there's a coord_system_id on node.  If it's missing, take
    the system id from the top of context.sysIdStack.  In both of these
    cases the proxy refers to the frameName attribute of the referenced
    system.
    """
    if "frame_id" in node.attrib:
        yield "frame", IdProxy(idref=node.get("frame_id"))
    elif "coord_system_id" in node.attrib:
        # this used to read frame_id here, which is known to be absent in
        # this branch and thus always produced a proxy with idref=None
        yield "frame", IdProxy(idref=node.get("coord_system_id"),
            useAttr=frameName)
    else:
        yield "frame", IdProxy(idref=context.sysIdStack[-1],
            useAttr=frameName)

    if "fill_factor" in node.attrib and node.get("fill_factor"):
        yield "fillFactor", float(node.get("fill_factor"))
    if "id" in node.attrib and node.get("id"):
        yield "id", node.get("id")
# A dictionary mapping STC-X element names to the dimensionality of
# coordinates within them. You only need to give them here if _guessNDim
# doesn't otherwise get it right.
_dimExceptions = {
}
def _guessNDim(kw):
"""guesses the number of dimensions that should be present under the STC-X
element named kw by inspecting the name.
"""
if kw in _dimExceptions:
return _dimExceptions[kw]
if "3" in kw:
return 3
elif "2" in kw:
return 2
else:
return 1
def _makeIntervalBuilder(kwName, astClass, frameName, tuplify=False):
    """returns a builder that makes an astClass instance from the current
    buildArgs and fixes its frame reference.

    The builder reduces lowerLimit and upperLimit to their first items
    (wrapped into 1-tuples if tuplify is true and they are not sequences
    already), fixes units according to frameName and moves unit and
    velTimeUnit into an origUnit pair.  The result is yielded in a 1-tuple
    under kwName.
    """
    def mkVal(v):
        if tuplify and not isinstance(v, (tuple, list)):
            return (v,)
        return v

    def buildNode(node, buildArgs, context):
        buildArgs.update(_iterCooMeta(node, context, frameName))

        for limitKey in ("lowerLimit", "upperLimit"):
            if limitKey in buildArgs:
                buildArgs[limitKey] = mkVal(buildArgs[limitKey][0])

        _fixUnits(frameName, node, buildArgs, context)
        unit = buildArgs.pop("unit", None)
        velTimeUnit = buildArgs.pop("velTimeUnit", None)
        buildArgs["origUnit"] = (unit, velTimeUnit)

        yield kwName, (astClass(**buildArgs),)

    return buildNode
def _fixWiggles(buildArgs):
    """modifies buildArgs so all wiggles are properly wrapped in their
    classes.

    For each wiggle type in WIGGLE_TYPES, plain values become a
    dm.CooWiggle, <type>Radius values a dm.RadiusWiggle, and <type>Matrix
    values a dm.MatrixWiggle, stored under the wiggle type's name.  Unit
    keys prefixed with the wiggle type are removed from buildArgs and, where
    present, handed to the wiggle as origUnit.
    """
    for wiggleType in WIGGLE_TYPES:
        localArgs = {}

        # pop any units destined for us from buildArgs -- boy, this whole
        # units stuff is messy.  How the heck was that meant to work?
        velTimeUnit = buildArgs.pop(wiggleType+"vel_time_unit", None)
        for unitSuffix in ("unit", "pos_unit"):
            unitKey = wiggleType+unitSuffix
            if unitKey in buildArgs:
                # if both are given, pos_unit wins since it comes last
                localArgs["origUnit"] = (buildArgs.pop(unitKey, None),
                    velTimeUnit)

        if wiggleType in buildArgs:
            localArgs["values"] = tuple(buildArgs.pop(wiggleType))
            wigClass = dm.CooWiggle
        elif wiggleType+"Radius" in buildArgs:
            wigClass = dm.RadiusWiggle
            localArgs["radii"] = buildArgs.pop(wiggleType+"Radius")
        elif wiggleType+"Matrix" in buildArgs:
            localArgs["matrices"] = buildArgs.pop(wiggleType+"Matrix")
            wigClass = dm.MatrixWiggle
        else:
            # no wiggle of this type given
            continue

        buildArgs[wiggleType] = wigClass(**localArgs)
def _makePositionBuilder(kw, astClass, frameName, tuplify=False):
    """returns a builder for a coordinate of astClass to be added with kw.

    The builder turns the first item of vals into the value argument
    (wrapped into a 1-tuple if tuplify is true and it is not a sequence
    already), adds frame and other metadata, wraps wiggles, and fixes
    units according to frameName.
    """
    def buildPosition(node, buildArgs, context):
        _assembleVals(buildArgs, context)
        if buildArgs.get("vals"):
            # NOTE(review): the extracted source lost its indentation; this
            # assumes vals is only removed when it is non-empty.
            value = buildArgs.pop("vals")[0]
            # Fix 1D space coordinates
            if tuplify and not isinstance(value, (list, tuple)):
                value = (value,)
            buildArgs["value"] = value

        buildArgs.update(_iterCooMeta(node, context, frameName))
        _fixWiggles(buildArgs)
        _fixUnits(frameName, node, buildArgs, context)
        yield kw, astClass(**buildArgs)

    return buildPosition
class ContextActions:
    """A specification of context actions for certain elements.

    The context calls start before and stop after an element registered
    for these actions is processed.  You will want to override both start
    and stop.  The methods should not change node.
    """

    def start(self, context, node):
        """does nothing; override to act when processing of node begins.
        """
        return None

    def stop(self, context, node):
        """does nothing; override to act when processing of node ends.
        """
        return None
################ Coordinate systems
_xlinkHref = utils.ElementTree.QName(common.XlinkNamespace, "href")


def _buildAstroCoordSystem(node, buildArgs, context):
    """yields a coordinate system under astroSystem.

    If node has an xlink:href attribute, the system is a library system
    obtained from syslib, changed with buildArgs; otherwise, a dm.CoordSys
    is built from buildArgs.  Systems ending up without an id get
    a generated one.
    """
    buildArgs["id"] = node.get("id", None)

    # allow non-qnamed href, too, mainly for accommodating our
    # namespace-eating relational resource importer
    hrefVal = None
    for attName in (_xlinkHref, "xlink:href"):
        if attName in node.attrib:
            hrefVal = node.attrib[attName]
            break

    if hrefVal is None:
        newEl = dm.CoordSys(**buildArgs)
    else:
        newEl = syslib.getLibrarySystem(hrefVal).change(**buildArgs)

    # Hack -- make sure we have a good id here, even when this means
    # a violation of our non-mutability ideology.
    if newEl.id is None:
        newEl.id = utils.intToFunnyWord(id(newEl))

    yield "astroSystem", newEl
def _buildPlanetaryEphem(node, buildArgs, context):
res = node.text.strip()
if res:
yield 'planetaryEphemeris', res
def _buildRefpos(node, buildArgs, context):
    """yields a dm.RefPos under refPos.

    The standard origin is the local name of node's tag, except that
    UNKNOWNRefPos is turned into None.  buildArgs are passed on to RefPos as
    further keyword arguments.
    """
    refposName = _localname(node.tag)
    standardOrigin = None if refposName=="UNKNOWNRefPos" else refposName
    yield 'refPos', dm.RefPos(standardOrigin=standardOrigin, **buildArgs)
def _buildFlavor(node, buildArgs, context):
    """yields the local name of node's tag under flavor and, if node has
    a coord_naxes attribute, its integer value under nDim.
    """
    yield 'flavor', _localname(node.tag)

    naxes = node.get("coord_naxes")
    if naxes is None:
        return
    yield 'nDim', int(naxes)
def _buildRefFrame(node, buildArgs, context):
    """yields the local name of node's tag under refFrame (None for
    UNKNOWNFrame), followed by all items of buildArgs.
    """
    frameName = _localname(node.tag)
    yield 'refFrame', None if frameName=="UNKNOWNFrame" else frameName
    yield from buildArgs.items()
def _makeFrameBuilder(attName, frameObj, **defaults):
"""returns a function yielding keywords for frames.
You can pass additional defaults.
"""
def buildFrame(node, buildArgs, context):
if "value_type" in node.attrib: # for redshifts
buildArgs["type"] = node.get("value_type")
buildArgs["id"] = node.get("id")
for key, val in defaults.items():
if key not in buildArgs:
buildArgs[key] = val
yield attName, frameObj(**buildArgs)
return buildFrame
################# Coordinates
class CooSysActions(ContextActions):
    """Actions for containers of coordinates.

    The actions push and pop the system ids of the coordinate containers
    so leaves can build their frame proxies from them.

    If a container has no coord_system_id, the SIBLING_ASTRO_SYSTEM sentinel
    is pushed, which is to be understood as "use any ol' AstroCoordSystem
    you can find".
    """

    def start(self, context, node):
        """pushes node's coordinate system id onto context.sysIdStack.
        """
        sysId = node.get("coord_system_id", SIBLING_ASTRO_SYSTEM)
        context.sysIdStack.append(sysId)

    def stop(self, context, node):
        """removes the system id pushed by start from context.sysIdStack.
        """
        context.sysIdStack.pop()
def _buildTime(node, buildArgs, context):
    """yields vals from the time node.

    node's tag is inspected to figure out what kind of time we're talking
    about (ISOTime, JDTime, or MJDTime); the text is then converted by the
    matching function from the times module.  Column references are passed
    through unchanged.  In either case, the value comes in a 1-tuple.
    """
    if isinstance(node.text, common.ColRef):
        yield "vals", (node.text,)
        return

    parsers = {
        "ISOTime": times.parseISODT,
        "JDTime": lambda v: times.jdnToDateTime(float(v)),
        "MJDTime": lambda v: times.mjdToDateTime(float(v)),
    }
    parse = parsers[_localname(node.tag)]
    yield "vals", (parse(node.text),)
# unit attributes/keys the generic unit yielders look for
_handledUnits = ("unit", "vel_time_unit", "pos_unit")

# a builder yielding float values in 1-tuples under vals, plus units
_buildFloat = _makeKwFloatBuilder("vals",
    units=_makeUnitYielder(_handledUnits, tuplify=True))

# unit keys _buildVector passes on under their own names...
_unitKeys = ("unit", "vel_time_unit")
# ...and unit keys _buildVector passes on as plain "unit"
_genUnitKeys = ("pos_unit", "time_unit", "spectral_unit", "angle_unit",
    "gen_unit")
def _buildVector(node, buildArgs, context):
    """yields vector values and units collected from child elements.

    The values (cut to the current dimension) are yielded as a tuple within
    a 1-tuple under vals.  Units found under the keys in _unitKeys are
    yielded as tuples under the same key, units found under the keys in
    _genUnitKeys as tuples under unit.
    """
    _assembleVals(buildArgs, context)
    if 'vals' in buildArgs:
        nDim = context.peekDim()
        yield 'vals', (tuple(buildArgs["vals"][:nDim]),)

    for unitKey in _unitKeys:
        if unitKey in buildArgs:
            yield unitKey, tuple(buildArgs[unitKey])

    for genericKey in _genUnitKeys:
        if genericKey in buildArgs:
            yield "unit", tuple(buildArgs[genericKey])
def _buildElnameFloat(node, buildArgs, context,
        units=_makeUnitYielder(_handledUnits, tuplify=True)):
    """yields float(node.text) under the local name of node's tag.

    An empty text is equivalent to a None value.  Non-empty texts that are
    not strings are passed through unchanged.  After the value, whatever
    units(node, buildArgs) yields is passed on.
    """
    tagName = _localname(node.tag)
    text = node.text

    if not text:
        value = None
    elif isinstance(text, str):
        value = float(text)
    else:
        value = text

    yield tagName, value
    yield from units(node, buildArgs)
def _buildEpoch(node, buildArgs, context):
yield "yearDef", node.get("yearDef", "J")
yield "epoch", float(node.text)
################# Geometries
class BoxActions(ContextActions):
    """Context actions for Boxes: register a special handler for Size.

    While a box is being processed, Size elements are handled by a builder
    yielding boxsize (and sizeunit, where a unit is given).
    """
    boxHandlers = {
        _n("Size"): _makeKwValueBuilder("boxsize", tuplify=True,
            units=_makeUnitYielder(("unit",), "size")),
    }

    def start(self, context, node):
        """pushes the box handlers onto context's special handler stack.
        """
        context.specialHandlerStack.append(self.boxHandlers)

    def stop(self, context, node):
        """removes the handlers pushed by start again.
        """
        context.specialHandlerStack.pop()
def _buildHalfspace(node, buildArgs, context):
yield "vectors", (tuple(buildArgs["vector"])+tuple(buildArgs["offset"]),)
def _adaptCircleUnits(buildArgs):
buildArgs["unit"] = buildArgs.pop("unit", ("deg", "deg"))
if "radiuspos_unit" in buildArgs:
buildArgs["radius"] = units.getBasicConverter(
buildArgs.pop("radiuspos_unit"), buildArgs["unit"][0])(
buildArgs["radius"])
def _adaptEllipseUnits(buildArgs):
buildArgs["unit"] = buildArgs.pop("unit", ("deg", "deg"))
if "smajAxispos_unit" in buildArgs:
buildArgs["smajAxis"] = units.getBasicConverter(
buildArgs.pop("smajAxispos_unit"), buildArgs["unit"][0])(
buildArgs["smajAxis"])
if "sminAxispos_unit" in buildArgs:
buildArgs["sminAxis"] = units.getBasicConverter(
buildArgs.pop("sminAxispos_unit"), buildArgs["unit"][0])(
buildArgs["sminAxis"])
if "posAngleunit" in buildArgs:
buildArgs["posAngle"] = units.getBasicConverter(
buildArgs.pop("posAngleunit"), "deg")(buildArgs["posAngle"])
def _adaptBoxUnits(buildArgs):
buildArgs["unit"] = buildArgs.get("unit", ("deg", "deg"))
if "sizeunit" in buildArgs:
su = buildArgs.pop("sizeunit")
if isinstance(su, str):
su = (su, su)
buildArgs["boxsize"] = units.getVectorConverter(su,
buildArgs["unit"])(buildArgs["boxsize"])
def _makeGeometryBuilder(astClass, adaptDepUnits=None):
    """returns a builder for geometries of astClass.

    The builder adds frame and other metadata, fixes the spatial units,
    and moves the unit into the first item of an origUnit pair.  If
    adaptDepUnits is given, it is called with buildArgs after the units
    are fixed so dependent quantities (radii and such) can be converted.
    Column references in node's text become a geoColRef argument.  The
    geometry is yielded in a 1-tuple under areas.
    """
    def buildGeo(node, buildArgs, context):
        buildArgs.update(_iterCooMeta(node, context, "spaceFrame"))
        _fixSpatialUnits(node, buildArgs, context)
        if adaptDepUnits:
            adaptDepUnits(buildArgs)

        unit = buildArgs.pop("unit", None)
        buildArgs["origUnit"] = (unit, None)

        text = node.text
        if isinstance(text, common.ColRef):
            buildArgs["geoColRef"] = common.GeometryColRef(str(text))

        yield 'areas', (astClass(**buildArgs),)

    return buildGeo
def _validateCompoundChildren(buildArgs):
"""makes sure that all children of a future compound geometry agree in
units and propagates units as necessary.
origUnit attributes of children are nulled out in the process. Sorry
'bout ignoring immutability.
"""
children = buildArgs.pop("areas")
cUnits, selfUnit = [], buildArgs.pop("unit", None)
for c in children:
if c.origUnit!=(None,None):
cUnits.append(c.origUnit)
c.origUnit = None
if len(set(cUnits))>1:
raise common.STCNotImplementedError(
"Different units within compound children are not supported")
elif len(set(cUnits))==1:
ou = (selfUnit, None)
if selfUnit is not None and ou!=cUnits[0]:
raise common.STCNotImplementedError(
"Different units on compound and compound children are not supported")
buildArgs["origUnit"] = cUnits[0]
else:
if selfUnit is not None:
buildArgs["origUnit"] = (selfUnit, None)
buildArgs["children"] = children
def _makeCompoundBuilder(astClass):
    """returns a builder for compound geometries (unions and the like) of
    astClass.

    The builder validates the child geometries' units, adds frame and
    other metadata, and yields the compound in a 1-tuple under areas.
    """
    def buildCompound(node, buildArgs, context):
        _validateCompoundChildren(buildArgs)
        for key, value in _iterCooMeta(node, context, "spaceFrame"):
            buildArgs[key] = value
        yield "areas", (astClass(**buildArgs),)

    return buildCompound
################# Toplevel
_areasAndPositions = [("timeAs", "time"), ("areas", "place"),
("freqAs", "freq"), ("redshiftAs", "redshift"),
("velocityAs", "velocity")]
def _addPositionsForAreas(buildArgs):
"""adds positions for areas defined by buildArgs.
This only happens if no position is given so far. The function is a
helper for _adaptAreaUnits. BuildArgs is changed in place.
"""
for areaAtt, posAtt in _areasAndPositions:
if buildArgs.get(areaAtt) and not buildArgs.get(posAtt):
areas = buildArgs[areaAtt]
posAttrs = {}
for area in areas:
if area.origUnit is not None:
posAttrs["unit"] = area.origUnit[0]
if area.origUnit[1]:
posAttrs["velTimeUnit"] = area.origUnit[1]
area.origUnit = None
break
# if no spatial area had a unit, it's probably something like AllSky,
# but we still must have *something*, anything
if posAtt=="place" and areas and posAttrs["unit"] is None:
posAttrs["unit"] = ("deg", "deg")
buildArgs[posAtt] = area.getPosition(posAttrs)
def _adaptAreaUnits(buildArgs):
    """changes area's units in buildArgs to match the positions's units.

    When areas without positions are present, synthesize the appropriate
    positions to hold units.

    After this, all area attributes are present in buildArgs as (possibly
    empty) tuples.
    """
    _addPositionsForAreas(buildArgs)

    for areaAtt, posAtt in _areasAndPositions:
        adapted = [
            area.adaptValuesWith(
                buildArgs[posAtt].getUnitConverter(area.origUnit))
            if area.origUnit is not None and area.origUnit[0] is not None
            else area
            for area in buildArgs.get(areaAtt, ())]
        buildArgs[areaAtt] = tuple(adapted)
def _buildToplevel(node, buildArgs, context):
    """yields a pair of node's tag and a dm.STCSpec built from buildArgs,
    within a 1-tuple, under stcSpec.

    Area units are adapted to the positions before building.
    """
    _adaptAreaUnits(buildArgs)

    if "astroSystem" not in buildArgs:
        # even for a disastrous STC-X, make sure there's a AstroCoords
        # with rudimentary space and time frames
        fallbackTime = dm.TimeFrame(refPos=dm.RefPos())
        fallbackSpace = dm.SpaceFrame(refPos=dm.RefPos(),
            flavor="SPHERICAL", nDim=2)
        buildArgs["astroSystem"] = dm.CoordSys(
            timeFrame=fallbackTime,
            spaceFrame=fallbackSpace)

    yield 'stcSpec', ((node.tag, dm.STCSpec(**buildArgs)),)
class IdProxy(common.ASTNode):
    """A stand-in for a coordinate system (or one of its frames) during
    parsing.

    We do this to not depend on ids being located before positions.  STC
    should have that in general, but let's be defensive here.
    """
    _a_idref = None
    _a_useAttr = None

    def resolve(self, idMap):
        """returns the object idMap has for our idref or, if useAttr is
        set, that object's attribute of that name.
        """
        target = idMap[self.idref]
        if not self.useAttr:
            return target
        return getattr(target, self.useAttr)
def resolveProxies(forest):
    """replaces IdProxies in the AST sequence forest with actual references.

    forest is a sequence of (root tag, AST) pairs.  Ids are collected over
    all of its ASTs first; SIBLING_ASTRO_SYSTEM references are resolved to
    the astroSystem of the AST they are found in.  The nodes are changed
    in place.
    """
    idMap = {}
    for _, ast in forest:
        ast.buildIdMap()
        idMap.update(ast.idMap)

    for _, ast in forest:
        idMap[SIBLING_ASTRO_SYSTEM] = ast.astroSystem
        for node in ast.iterNodes():
            for attName, value in node.iterAttributes(skipEmpty=True):
                if isinstance(value, IdProxy):
                    setattr(node, attName, value.resolve(idMap))
# Local names of STC-X elements listed as 2-vector tags
# (presumably what the context's vecTags mapping is built from -- the code
# doing that is not in view here; TODO confirm)
_2VEC_TAGS = [
    "Value2", "Position2D", "Resolution2", "PixSize2", "Error2",
    "Size2", "HiLimit2Vec", "LoLimit2Vec", "Position2VecInterval",
    "Velocity2D",
    "Velocity2VecInterval", "Circle", "Circle2", "Box", "Box2",
    "Polygon", "Polygon2", "Union", "PositionInterval", "Ellipse",
    "Union", "Intersection", "Difference", "Negation", "AllSky"]

# Local names of STC-X elements listed as 3-vector tags
_3VEC_TAGS = [
    "Value3", "Position3D", "Resolution3", "PixSize3", "Error3",
    "Size3", "HiLimit3Vec", "LoLimit3Vec", "Position3VecInterval",
    "Velocity3D", "Velocity3VecInterval", "Convex"]
class STCXContext:
    """A parse context containing handlers, stacks, etc.

    A special feature is that there are "context-active" tags.  For those
    the context gets notified by buildTree when their processing is started
    or ended.  We use this to note the active coordinate systems during, e.g.,
    AstroCoords parsing.
    """

    # NOTE(review): getVecTags is not defined in the part of the file
    # visible here; presumably it returns a mapping from element QNames to
    # vector dimensions (cf. _2VEC_TAGS and _3VEC_TAGS) -- verify.
    def __init__(self,
            elementHandlers, activeTags, vecTags=getVecTags(), **kwargs):
        self.elementHandlers = elementHandlers
        self.activeTags = activeTags
        self.vecTags = vecTags
        self.sysIdStack = []
        self.nDimStack = []
        self.specialHandlerStack = [{}]
        # any further keyword arguments just become attributes
        for attName, attValue in kwargs.items():
            setattr(self, attName, attValue)

    def getHandler(self, elementName):
        """returns a builder for the qName elementName, and the expected
        dimension of child vectors.

        Handlers on top of the special handler stack take precedence over
        the normal element handlers.  If no handler exists, the first item
        of the pair is None; the dimension is None unless elementName is
        in vecTags.
        """
        nD = self.vecTags.get(elementName)
        special = self.specialHandlerStack[-1]
        if elementName in special:
            return special[elementName], nD
        return self.elementHandlers.get(elementName), nD

    def startTag(self, node):
        """runs the start action registered for node's tag.
        """
        self.activeTags[node.tag].start(self, node)

    def endTag(self, node):
        """runs the stop action registered for node's tag.
        """
        self.activeTags[node.tag].stop(self, node)

    def pushDim(self, nDim):
        """makes nDim the current vector dimension.
        """
        self.nDimStack.append(nDim)

    def popDim(self):
        """removes and returns the current vector dimension.
        """
        return self.nDimStack.pop()

    def peekDim(self):
        """returns the current vector dimension.

        If we're not processing vectors, we're processing scalars, and
        hence 1 is returned for an empty dimension stack.
        """
        return self.nDimStack[-1] if self.nDimStack else 1
# Unit yielders for the wiggle types; the prefixes match what _fixWiggles
# looks for in buildArgs.
_yieldErrUnits = _makeUnitYielder(_handledUnits, "error")
_yieldPSUnits = _makeUnitYielder(_handledUnits, "pixSize")
_yieldResUnits = _makeUnitYielder(_handledUnits, "resolution")
_yieldSzUnits = _makeUnitYielder(_handledUnits, "size")

# A sequence of tuples (dict builder, [stcxElementNames]) to handle
# STC-X elements by calling functions
_stcBuilders = [
    (_buildElnameFloat, ["C1", "C2", "C3"]),
    (_buildTime, ["ISOTime", "JDTime", "MJDTime"]),
    (_buildVector, ["Value2", "Value3"]),
    (_buildRefpos, common.stcRefPositions),
    (_buildFlavor, common.stcCoordFlavors),
    (_buildRefFrame, common.stcSpaceRefFrames),

    (_makePositionBuilder('place', dm.SpaceCoo, "spaceFrame", tuplify=True),
        ["Position1D", "Position3D", "Position2D"]),
    (_makePositionBuilder('velocity', dm.VelocityCoo, "spaceFrame",
            tuplify=True),
        ["Velocity1D", "Velocity3D", "Velocity2D"]),

    (_makeKwValuesBuilder("resolution", tuplify=True, units=_yieldResUnits),
        ["Resolution2", "Resolution3"]),
    (_makeKwValuesBuilder("pixSize", tuplify=True, units=_yieldPSUnits),
        ["PixSize2", "PixSize3"]),
    (_makeKwValuesBuilder("error", tuplify=True, units=_yieldErrUnits),
        ["Error2", "Error3"]),
    (_makeKwValuesBuilder("size", tuplify=True, units=_yieldSzUnits),
        ["Size2", "Size3"]),

    (_makeKwFloatBuilder("resolutionRadius", units=_yieldResUnits),
        ["Resolution2Radius", "Resolution3Radius"]),
    (_makeKwFloatBuilder("pixSizeRadius", units=_yieldPSUnits),
        ["PixSize2Radius", "PixSize3Radius"]),
    (_makeKwFloatBuilder("errorRadius", units=_yieldErrUnits),
        ["Error2Radius", "Error3Radius"]),
    (_makeKwFloatBuilder("sizeRadius", units=_yieldSzUnits),
        ["Size2Radius", "Size3Radius"]),

    (_makeKwValuesBuilder("resolutionMatrix", units=_yieldResUnits),
        ["Resolution2Matrix", "Resolution3Matrix"]),
    # pixSize matrices need the pixSize-prefixed units; with the
    # size-prefixed yielder used here before, _fixWiggles never found
    # their units (and they clashed with those of Size matrices)
    (_makeKwValuesBuilder("pixSizeMatrix", units=_yieldPSUnits),
        ["PixSize2Matrix", "PixSize3Matrix"]),
    (_makeKwValuesBuilder("errorMatrix", units=_yieldErrUnits),
        ["Error2Matrix", "Error3Matrix"]),
    (_makeKwValuesBuilder("sizeMatrix", units=_yieldSzUnits),
        ["Size2Matrix", "Size3Matrix"]),

    (_makeKwValuesBuilder("upperLimit", tuplify=True),
        ["HiLimit2Vec", "HiLimit3Vec"]),
    (_makeKwValuesBuilder("lowerLimit", tuplify=True),
        ["LoLimit2Vec", "LoLimit3Vec"]),

    (_makeIntervalBuilder("areas", dm.SpaceInterval, "spaceFrame", tuplify=True),
        ["PositionScalarInterval", "Position2VecInterval",
            "Position3VecInterval"]),
    (_makeIntervalBuilder("velocityAs", dm.VelocityInterval, "spaceFrame",
            tuplify=True),
        ["VelocityScalarInterval", "Velocity2VecInterval",
            "Velocity3VecInterval"]),

    (_makeGeometryBuilder(dm.AllSky), ["AllSky", "AllSky2"]),
    (_makeGeometryBuilder(dm.Circle, _adaptCircleUnits), ["Circle", "Circle2"]),
    (_makeGeometryBuilder(dm.Ellipse, _adaptEllipseUnits), [
        "Ellipse", "Ellipse2"]),
    (_makeGeometryBuilder(dm.Box, _adaptBoxUnits), ["Box", "Box2"]),
    (_makeGeometryBuilder(dm.Polygon), ["Polygon", "Polygon2"]),
    (_makeGeometryBuilder(dm.Convex), ["Convex", "Convex2"]),

    (_makeCompoundBuilder(dm.Union), ["Union", "Union2"]),
    (_makeCompoundBuilder(dm.Intersection), ["Intersection", "Intersection2"]),
    (_makeCompoundBuilder(dm.Difference), ["Difference", "Difference2"]),
    (_makeCompoundBuilder(dm.Not), ["Negation", "Negation2"]),

    (_buildToplevel, ["ObservatoryLocation", "ObservationLocation",
        "STCResourceProfile", "STCSpec"]),
    (_passthrough, ["ObsDataLocation", "AstroCoords", "TimeInstant",
        "AstroCoordArea", "Position"]),
]
def _getHandlers():
    """returns a dictionary mapping STC-X element QNames to their handlers.

    The handlers come from the table within this function and from
    _stcBuilders; on conflicts, _stcBuilders wins.
    """
    individualHandlers = [
        ("AstroCoordSystem", _buildAstroCoordSystem),
        ("PlanetaryEphem", _buildPlanetaryEphem),
        ("Error", _makeKwFloatBuilder("error", units=_yieldErrUnits)),
        ("PixSize", _makeKwFloatBuilder("pixSize", units=_yieldPSUnits)),
        ("Resolution",
            _makeKwFloatBuilder("resolution", units=_yieldResUnits)),
        ("Size", _makeKwFloatBuilder("size", units=_yieldSzUnits)),

        ("Redshift", _makePositionBuilder('redshift', dm.RedshiftCoo,
            "redshiftFrame")),
        ("Spectral", _makePositionBuilder('freq',
            dm.SpectralCoo, "spectralFrame")),
        ("StartTime", _makeKwValueBuilder("lowerLimit")),
        ("StopTime", _makeKwValueBuilder("upperLimit")),
        ("LoLimit", _makeKwFloatBuilder("lowerLimit")),
        ("HiLimit", _makeKwFloatBuilder("upperLimit")),
        ("Time", _makePositionBuilder('time', dm.TimeCoo, "timeFrame")),
        ("Timescale", _makeKeywordBuilder("timeScale")),
        ("TimeScale", _makeKeywordBuilder("timeScale")),
        ("Equinox", _makeKeywordBuilder("equinox")),
        ("Value", _makeKwFloatBuilder("vals")),
        ("Epoch", _buildEpoch),

        ("Radius", _makeKwFloatBuilder("radius", multiple=False,
            units=_makeUnitYielder(("pos_unit",), "radius"))),
        ("Center", _makeKwValueBuilder("center", tuplify=True,
            units=_makeUnitYielder(("unit",)))),
        ("SemiMajorAxis", _makeKwFloatBuilder("smajAxis", multiple=False,
            units=_makeUnitYielder(("pos_unit",), "smajAxis"))),
        ("SemiMinorAxis", _makeKwFloatBuilder("sminAxis", multiple=False,
            units=_makeUnitYielder(("pos_unit",), "sminAxis"))),
        ("PosAngle", _makeKwFloatBuilder("posAngle", multiple=False,
            units=_makeUnitYielder(("unit",), "posAngle"))),
        ("Vertex", _makeKwValuesBuilder("vertices", tuplify=True)),
        ("Vector", _makeKwValueBuilder("vector", tuplify=True)),
        ("Offset", _makeKwFloatBuilder("offset")),
        ("Halfspace", _buildHalfspace),

        ('TimeFrame', _makeFrameBuilder('timeFrame', dm.TimeFrame,
            timeScale="TT")),
        ('SpaceFrame', _makeFrameBuilder('spaceFrame', dm.SpaceFrame)),
        ('SpectralFrame',
            _makeFrameBuilder('spectralFrame', dm.SpectralFrame)),
        ('RedshiftFrame',
            _makeFrameBuilder('redshiftFrame', dm.RedshiftFrame)),

        ("DopplerDefinition", _makeKeywordBuilder("dopplerDef")),
        ("TimeInterval", _makeIntervalBuilder("timeAs", dm.TimeInterval,
            "timeFrame")),
        ("SpectralInterval", _makeIntervalBuilder("freqAs",
            dm.SpectralInterval, "spectralFrame")),
        ("RedshiftInterval", _makeIntervalBuilder("redshiftAs",
            dm.RedshiftInterval, "redshiftFrame")),
    ]

    handlers = dict(
        (_n(elName), builder)
        for elName, builder in individualHandlers)
    # builders shared between several elements come last so they override
    # individual entries of the same name
    handlers.update(
        (_n(elName), builder)
        for builder, stcEls in _stcBuilders
        for elName in stcEls)
    return handlers


getHandlers = utils.CachedGetter(_getHandlers)
def _getActiveTags():
    """returns a dictionary mapping the QNames of context-active elements
    to (separate) instances of their context actions.
    """
    tagActions = (
        ("AstroCoords", CooSysActions),
        ("AstroCoordArea", CooSysActions),
        ("Box", BoxActions),
        ("Box2", BoxActions),
    )
    return {_n(elName): actionClass() for elName, actionClass in tagActions}


getActiveTags = utils.CachedGetter(_getActiveTags)
def buildTree(csNode, context):
    """traverses the ElementTree node csNode, trying handler functions for
    each node.

    The handler functions are obtained from context.getHandler for each
    node's tag.  They are called as handler(node, buildArgs, context) and
    must return iterators over key-value pairs; buildTree is a generator
    passing on these pairs.  buildArgs is the dictionary built from the
    pairs the node's children produced: tuple or list values are appended to
    what is already there for their key, any other value may only be given
    once per key (else an STCInternalError is raised).

    Unknown nodes are simply ignored.  If you need to bail out on certain
    nodes, raise explicit exceptions in handlers.

    This also manages the expected number of dimensions in vectors by
    pushing and popping from the context's dimension stack.  The actual
    wisdom on what elements set which dimensions is kept in the context
    object as well.
    """
    resDict = {}
    handler, nDim = context.getHandler(csNode.tag)

    # Elements with no handlers are ignored (add option to fail on these?)
    if handler is None:
        return

    hasDim = nDim is not None
    if hasDim:
        context.pushDim(nDim)
    if csNode.tag in context.activeTags:
        context.startTag(csNode)

    # collect constructor keywords from child nodes
    for child in csNode:
        for res in buildTree(child, context):
            if res is None:  # ignored child
                continue

            key, value = res
            if isinstance(value, (tuple, list)):
                resDict[key] = resDict.get(key, ())+value
                continue

            if key in resDict:
                raise common.STCInternalError("Attempt to overwrite key '%s', old"
                    " value %s, new value %s (this should probably have been"
                    " a tuple)"%(key, resDict[key], value))
            resDict[key] = value

    # collect constructor keywords from this node's handler
    yield from handler(csNode, resDict, context)

    if csNode.tag in context.activeTags:
        context.endTag(csNode)
    if hasDim:
        context.popDim()
def parseFromETree(eTree):
    """returns a sequence of pairs (root element, AST) for eTree containing
    parsed STC-X.

    An STCXBadError is raised if no STC specification is found in eTree.
    """
    context = STCXContext(
        elementHandlers=getHandlers(),
        activeTags=getActiveTags())
    parsed = dict(buildTree(eTree, context))

    if "stcSpec" in parsed:
        forest = parsed["stcSpec"]
        resolveProxies(forest)
        return [(rootTag, ast.polish()) for rootTag, ast in forest]

    raise common.STCXBadError("No STC-X found in or below %r"%eTree)
def parseSTCX(stcxLiteral):
    """returns a sequence of pairs (root element, AST) for the STC-X
    specifications in stcxLiteral.

    stcxLiteral is parsed into an ElementTree first and then handed to
    parseFromETree.
    """
    eTree = utils.ElementTree.fromstring(stcxLiteral)
    return parseFromETree(eTree)