Parsing and generating STC-S
The general plan is to parse STC-S into some sort of tree (dictionaries
with list values, possibly containing more such dictionaries). These
trees can then be processed into something roughly resembling the data
model, furnished with defaults, and processed by what essentially is
user code.
Extensions to what the note says:
- After flavor, you can add an epoch using something like "Epoch J2000.0".
- After the FK5, FK4 and ECLIPTIC frame specs, you can add an optional
astroYear (Bnnnn, Jnnnn) designating a custom equinox.
- There is a system subphrase that lets you specify a system from the
STC library (without the ivo:// decoration). It starts with System
and is specified last. It will override all other system specifications.
- If enabled, you can use identifiers in double quotes wherever values
are allowed; this will generate column references.
- After the reference position, you can optionally mention the planetary
ephemeris used; currently, only JPL-DE200 and JPL-DE405 are allowed.
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import itertools
from gavo import utils
from gavo.stc import common
from gavo.stc import stcsdefaults
from gavo.stc import times
from gavo.utils.parsetricks import (
Word, Literal, Optional, alphas, CaselessKeyword,
ZeroOrMore, OneOrMore, StringEnd,
Suppress, Forward,
Regex, alphanums,
ParseException, ParseResults,
class AComputedDefault(object):
    """A sentinel for computed default values.

    The class carries no state or behaviour of its own.
    """
# STC-S spatial flavors, with dimensions and stc flavors
# NOTE(review): only the Cartesian flavors are visible in this view of the
# file; confirm against upstream whether further flavors belong here.
stcsFlavors = {
    "CART1": (1, "CARTESIAN"),
    "CART2": (2, "CARTESIAN"),
    "CART3": (3, "CARTESIAN"),
}
# Unit names the grammar accepts after the "unit" keyword, grouped by the
# kind of coordinate they may qualify.
spatialUnits = {
    "deg", "arcmin", "arcsec", "rad",
    "m", "mm", "km", "AU", "pc", "kpc", "Mpc",
}

temporalUnits = {"yr", "cy", "s", "d", "a"}

spectralUnits = {
    "Hz", "MHz", "GHz",
    "m", "mm", "um", "nm", "Angstrom",
    "eV", "keV", "MeV",
}
def _assertGrammar(cond, msg, pos):
if not cond:
raise common.STCSParseError(msg, pos)
def _iterDictNode(node, path):
"""does iterNode's work for dict nodes.
for k, v in node.items():
if isinstance(v, list):
subIter = _iterListNode(v, path+(k,))
elif isinstance(v, dict):
subIter = _iterDictNode(v, path+(k,))
continue # content does not contain a subtree
for res in subIter:
yield res
yield path, node
def _iterListNode(node, path):
"""does iterNode's work for list nodes.
for subNode in node:
if isinstance(subNode, dict):
for res in _iterDictNode(subNode, path):
yield res
def iterNodes(tree):
    """traverses the concrete syntax tree in postorder, returning pairs of
    paths and nodes.

    A node returned here is always a dictionary.  The path consists of the
    keys leading to the node in a tuple.

    tree must be a list or a dictionary; anything else raises an
    STCInternalError.
    """
    if isinstance(tree, list):
        return _iterListNode(tree, ())
    if isinstance(tree, dict):
        return _iterDictNode(tree, ())
    # wrap tree in a one-tuple so a tuple-valued tree is formatted rather
    # than being taken as the argument list of the % operator
    raise common.STCInternalError("Bad node in tree %s"%(tree,))
def addDefaults(tree):
    """adds defaults for missing values for a concrete syntax tree.

    The tree is changed in place and returned.  For details, see
    stcsdefaults.
    """
    for path, node in iterNodes(tree):
        if path and path[-1] in stcsdefaults.defaultingFunctions:
            # NOTE(review): the statement guarded by this condition is
            # missing in this view of the file; calling the function
            # registered for the last path element on the node is the
            # presumed original behaviour -- confirm against upstream.
            stcsdefaults.defaultingFunctions[path[-1]](node)
    return tree
def removeDefaults(tree):
    """removes defaults from a concrete syntax tree.

    The tree is changed in place and returned.  For details, see
    stcsdefaults.
    """
    for path, node in iterNodes(tree):
        if path and path[-1] in stcsdefaults.undefaultingFunctions:
            # NOTE(review): the statement guarded by this condition is
            # missing in this view of the file; calling the function
            # registered for the last path element on the node is the
            # presumed original behaviour -- confirm against upstream.
            stcsdefaults.undefaultingFunctions[path[-1]](node)
    return tree
def makeTree(parseResult):
    """returns the pyparsing parseResult as a data structure consisting
    of simple python dicts and lists.

    The "tree" has two kinds of nodes: Dictionaries having lists as
    values, and lists containing (as a rule) literals or (for more deeply
    nested constructs, which are rare in STC-S) other dictionaries of
    this kind.

    A parse node becomes a dict node if it has named children.  The root
    always is a dict.

    Note that unnamed children of nodes becoming dicts will be lost in
    the result.

    Empty parse results are returned as None.
    """
    if not len(parseResult):
        # empty parse results become Nones
        return None

    names = list(parseResult.keys())
    if names:
        # named children, generate a dict
        res = {}
        for name in names:
            child = parseResult[name]
            if isinstance(child, ParseResults):
                child = makeTree(child)
            if child is not None:  # discard empty branches
                res[name] = child
        return res

    # no named children, generate a list; whether to recurse is decided
    # by looking at the first item only
    if isinstance(parseResult[0], ParseResults):
        return [makeTree(child) for child in parseResult]
    return list(parseResult)
def _reFromKeys(iterable):
"""returns a regular expression matching any of the strings in iterable.
The trick is that the longest keys must come first.
return "|".join(sorted(iterable, key=lambda x:-len(x)))
def _makeSymDict(locals, exportAll):
"""returns a dictionary of pyparsing symbols defined in the locals.
locals would be the value locals() as a rule.
syms = dict((n, v) for n, v in locals.items()
if hasattr(v, "setName"))
if not exportAll:
syms = dict((n, v) for n, v in syms.items()
if not n.startswith("_"))
return syms
def _stringifyBlank(s, p, t):
"""a parse action joining items in parse results with blanks.
return " ".join(t)
def _stringify(s, p, t):
"""a parse action joining items in parse results.
return "".join(t)
def _makeSingle(s, p, t):
"""a parse action that returns the first item of the tokens.
You typically want this when you know there's only one token, e.g.,
on Disjunctions or such
return t[0]
def _unpackTime(s, p, t):
"""unravels on level of list from t["coos"] or t["pos"].
We need this because of the way setResultsName works since pyparsing 2.4.
if "coos" in t:
coos = t.pop("coos")
for r in itertools.chain(*coos):
t["coos"] = r
if "pos" in t:
coos = t.pop("pos")
for r in itertools.chain(*coos):
t["pos"] = r
return None # (changes in place)
def _getSTCSGrammar(numberLiteral,
# NOTE(review): the parameter list is cut off in this view.  The body also
# reads timeLiteral, _exportAll, _addGeoReferences and _astroYearOverride,
# so these presumably are further parameters -- confirm against upstream.
"""returns a dictionary of symbols for a grammar parsing STC-S into
a concrete syntax tree.
numberLiteral and timeLiteral are pyparsing symbols for numbers and
datetimes, respectively.
_addGeoReferences lets you write quoted references to vectors
(like Circle "center" 20.).
The result is built by _makeSymDict from this function's locals, i.e.,
it maps the names of the local variables below to the symbols.
"""
with pyparsingWhitechars("\n\t\r "):
number = numberLiteral
# drop the parameter name from the locals so it is not exported as a symbol
del numberLiteral
# units
_unitOpener = Suppress( CaselessKeyword("unit") )
_spaceUnitWord = Regex(_reFromKeys(spatialUnits))
_timeUnitWord = Regex(_reFromKeys(temporalUnits))
# NOTE(review): the next statement is cut off in this view (the argument of
# addParseAction and the result name are missing).
spaceUnit = _unitOpener - OneOrMore( _spaceUnitWord ).addParseAction(
timeUnit = _unitOpener - _timeUnitWord("unit")
spectralUnit = _unitOpener - Regex(_reFromKeys(spectralUnits))("unit")
redshiftUnit = _unitOpener - (
(_spaceUnitWord + "/" + _timeUnitWord).addParseAction(_stringify)
| CaselessKeyword("nil"))("unit")
# NOTE(review): the velocityUnit statement is cut off in this view (its
# closing part is missing).
velocityUnit = _unitOpener - (OneOrMore(
(_spaceUnitWord + "/" + _timeUnitWord).addParseAction(_stringify)
# basic productions common to most STC-S subphrases
literalAstroYear = Regex("[BJ][0-9]+([.][0-9]*)?")
# NOTE(review): an "else:" presumably separated the two astroYear
# assignments below; as shown, the override would always be overwritten.
if _astroYearOverride:
astroYear = _astroYearOverride
astroYear = literalAstroYear
fillfactor = (Suppress( CaselessKeyword("fillfactor")
) + number("fillfactor"))
# frames that do not take an equinox
noEqFrame = (CaselessKeyword("J2000")
| CaselessKeyword("B1950")
| CaselessKeyword("ICRS")
| CaselessKeyword("GALACTIC")
| CaselessKeyword("GALACTIC_I")
| CaselessKeyword("GALACTIC_II")
| CaselessKeyword("SUPER_GALACTIC")
| CaselessKeyword("GEO_C")
| CaselessKeyword("GEO_D")
| CaselessKeyword("HPR")
| CaselessKeyword("HGS")
| CaselessKeyword("HGC")
| CaselessKeyword("HPC")
| CaselessKeyword("UNKNOWNFrame"))("frame")
# frames that may be followed by a literal astroYear as equinox
eqFrameName = (CaselessKeyword("FK5")
| CaselessKeyword("FK4")
| CaselessKeyword("ECLIPTIC"))("frame")
eqFrame = eqFrameName + Optional( literalAstroYear("equinox") )
frame = eqFrame | noEqFrame
plEphemeris = CaselessKeyword("JPL-DE200") | CaselessKeyword("JPL-DE405")
refpos = ((Regex(_reFromKeys(common.stcRefPositions)))("refpos")
+ Optional( plEphemeris("plEphemeris") ))
flavor = (Regex(_reFromKeys(stcsFlavors)))("flavor")
# properties of coordinates
error = Suppress( CaselessKeyword("Error") ) + OneOrMore( number )
resolution = Suppress( CaselessKeyword("Resolution")
) + OneOrMore( number )
size = Suppress( CaselessKeyword("Size") ) + OneOrMore(number)
pixSize = Suppress( CaselessKeyword("PixSize") ) + OneOrMore(number)
# all four properties are optional but have to come in this order
cooProps = (Optional(error("error"))
+ Optional(resolution("resolution"))
+ Optional(size("size"))
+ Optional(pixSize("pixSize")))
# properties of most spatial specs
_coos = ZeroOrMore( number )("coos")
_pos = Optional( ZeroOrMore( number )("pos") )
if _addGeoReferences: # include references to vectors, for getColrefSymbols
# a geometry reference is an identifier in square brackets; the brackets
# are stripped before the name is passed to common.GeometryColRef
# NOTE(review): the docstring's example uses double quotes rather than
# brackets -- confirm which one is intended.
complexColRef = Regex(r'\[[A-Za-z_][A-Za-z_0-9]*\]').addParseAction(
lambda s,p,toks: common.GeometryColRef(toks[0][1:-1]))
_coos = complexColRef("coos") | _coos
_pos = complexColRef("pos") | _pos
positionSpec = Suppress( CaselessKeyword("Position") ) + _pos
epochSpec = Suppress( CaselessKeyword("Epoch") ) - astroYear
_spatialProps = Optional(spaceUnit) + cooProps
velocitySpec = (CaselessKeyword("Velocity")("type")
+ OneOrMore( number )("pos"))
# NOTE(review): the beginning of the velocityInterval definition appears to
# be missing in this view.
velocityInterval = (
+ Optional( fillfactor )
+ _coos )
+ Optional(velocitySpec)
+ Optional(velocityUnit)
+ cooProps).addParseAction(makeTree)
_spatialTail = (_spatialProps +
Optional( velocityInterval)("velocity"))
_regionTail = Optional( positionSpec ) + _spatialTail
_commonSpaceItems = ( frame + Optional( refpos ) +
Optional( flavor ) + Optional(
epochSpec("epoch").addParseAction(lambda s,p,t: t[0])))
_commonRegionItems = Optional( fillfactor ) + _commonSpaceItems
# times and time intervals
timescale = (Regex("|".join(common.stcTimeScales)))("timescale")
timephrase = Suppress( CaselessKeyword("Time") ) + timeLiteral
_commonTimeItems = Optional(timeUnit) + cooProps
_intervalOpener = ( Optional( fillfactor ) +
Optional( timescale("timescale") ) +
Optional( refpos ) )
_intervalCloser = (Optional(timephrase.setResultsName("pos", True))
+ _commonTimeItems)
timeInterval = (CaselessKeyword("TimeInterval")("type") +
_intervalOpener + ZeroOrMore(timeLiteral.setResultsName("coos", True))
+ _intervalCloser).addParseAction(_unpackTime)
# NOTE(review): the startTime and stopTime statements are cut off in this
# view (their closing parts are missing).
startTime = (CaselessKeyword("StartTime")("type") + _intervalOpener +
timeLiteral.setResultsName("coos", True) + _intervalCloser
stopTime = (CaselessKeyword("StopTime")("type") + _intervalOpener +
timeLiteral.setResultsName("coos", True) + _intervalCloser
time = (CaselessKeyword("Time")("type")
+ Optional(timescale("timescale"))
+ Optional(refpos) + Optional(
timeLiteral.setResultsName("pos", True).addParseAction(_unpackTime))
+ _commonTimeItems)
timeSubPhrase = (timeInterval
| startTime
| stopTime
| time).addParseAction(makeTree)
# atomic "geometries"; I do not bother to specify their actual
# arguments since, without knowing the frame, they may be basically
# anything. Also, I want to allow geometry column references.
_atomicGeometryKey = ( CaselessKeyword("AllSky").setName("sub-geometry")
| CaselessKeyword("Circle")
| CaselessKeyword("Ellipse")
| CaselessKeyword("Box")
| CaselessKeyword("Polygon")
| CaselessKeyword("Convex")
| CaselessKeyword("PositionInterval") )
atomicGeometry = ( _atomicGeometryKey("type")
+ _commonRegionItems
+ _coos
+ _regionTail )
# compound "geometries"
# _compoundGeoExpression is recursive, hence the Forward declaration that
# is filled in below
_compoundGeoExpression = Forward()
_compoundGeoOperand = (( _atomicGeometryKey("subtype") + _coos )
| _compoundGeoExpression ).addParseAction(lambda s,p,t: dict(t))
# Not takes exactly one operand
_compoundGeoOperatorUnary = CaselessKeyword("Not")
_compoundGeoOperandsUnary = ( Suppress( '(' )
+ _compoundGeoOperand + Suppress( ')' ) )
_compoundGeoExprUnary = ( _compoundGeoOperatorUnary("subtype")
+ _compoundGeoOperandsUnary("children") )
# Difference takes exactly two operands
_compoundGeoOperatorBinary = CaselessKeyword("Difference")
_compoundGeoOperandsBinary = ( Suppress( '(' )
+ _compoundGeoOperand + _compoundGeoOperand + Suppress( ')' ) )
_compoundGeoExprBinary = ( _compoundGeoOperatorBinary("subtype")
+ _compoundGeoOperandsBinary("children") )
# Union and Intersection take two or more operands
_compoundGeoOperatorNary = ( CaselessKeyword("Union")
| CaselessKeyword("Intersection") )
_compoundGeoOperandsNary = ( Suppress( '(' )
+ _compoundGeoOperand + _compoundGeoOperand
+ ZeroOrMore( _compoundGeoOperand ) + Suppress( ')' ) )
_compoundGeoExprNary = ( _compoundGeoOperatorNary("subtype")
+ _compoundGeoOperandsNary("children") )
_compoundGeoExpression << ( _compoundGeoExprUnary
| _compoundGeoExprBinary
| _compoundGeoExprNary )
# at the top level, the operator is stored as "type" (nested expressions
# use "subtype") and is followed by the common region items
compoundGeoPhrase = ( _compoundGeoOperatorUnary("type")
+ _commonRegionItems
+ _compoundGeoOperandsUnary("children") + _regionTail
| _compoundGeoOperatorBinary("type")
+ _commonRegionItems
+ _compoundGeoOperandsBinary("children") + _regionTail
| _compoundGeoOperatorNary("type")
+ _commonRegionItems
- _compoundGeoOperandsNary("children") + _regionTail )
# space subphrase
positionInterval = ( CaselessKeyword("PositionInterval")("type")
+ _commonRegionItems
+ _coos
+ _regionTail )
position = ( CaselessKeyword("Position")("type")
+ _commonSpaceItems
+ _pos
+ _spatialTail )
spaceSubPhrase = ( positionInterval
| position
| atomicGeometry
| compoundGeoPhrase ).addParseAction(makeTree)
# spectral subphrase
spectralSpec = (Suppress( CaselessKeyword("Spectral") )
+ number)("pos")
_spectralTail = Optional(spectralUnit) + cooProps
spectralInterval = (CaselessKeyword("SpectralInterval")("type")
+ Optional( fillfactor )
+ Optional( refpos )
+ _coos
+ Optional( spectralSpec )
+ _spectralTail)
spectral = (CaselessKeyword("Spectral")("type")
+ Optional( refpos )
+ _pos
+ _spectralTail)
# NOTE(review): the argument of addParseAction is missing in this view.
spectralSubPhrase = (spectralInterval | spectral ).addParseAction(
# redshift subphrase
redshiftType = Regex("VELOCITY|REDSHIFT")("redshiftType")
redshiftSpec = (Suppress( CaselessKeyword("Redshift") ) + number)("pos")
dopplerdef = Regex("OPTICAL|RADIO|RELATIVISTIC")("dopplerdef")
_redshiftTail = Optional(redshiftUnit) + cooProps
redshiftInterval = (CaselessKeyword("RedshiftInterval")("type")
+ Optional(fillfactor)
+ Optional(refpos)
+ Optional(redshiftType)
+ Optional(dopplerdef)
+ _coos
+ Optional(redshiftSpec)
+ _redshiftTail)
redshift = (CaselessKeyword("Redshift")("type")
+ Optional(refpos)
+ Optional(redshiftType)
+ Optional(dopplerdef)
+ _pos
+ _redshiftTail)
# NOTE(review): the argument of addParseAction is missing in this view.
redshiftSubPhrase = (redshiftInterval | redshift).addParseAction(
# system subphrase (extension, see docs)
# ids match Name from XML spec; we're not doing char refs and similar here
xmlName = Word(alphas+"_:", alphanums+'.-_:').addParseAction(_stringify)
systemDefinition = (Suppress( CaselessKeyword("System") )
+ xmlName("libSystem"))
# top level
# all subphrases are optional but have to come in this order; StringEnd
# makes sure the complete input is consumed
stcsPhrase = ( #noflake: stcsPhrase is returned through locals()
Optional( timeSubPhrase )("time") +
Optional( spaceSubPhrase )("space") +
Optional( spectralSubPhrase )("spectral") +
Optional( redshiftSubPhrase )("redshift") +
Optional( systemDefinition ) ) + StringEnd()
return _makeSymDict(locals(), _exportAll)
[docs]def getSymbols(_exportAll=False, _colrefLiteral=None,
# NOTE(review): the parameter list is cut off in this view;
# getColrefSymbols also passes _addGeoReferences -- confirm against
# upstream.
"""returns an STC-S grammar with terminal values.
The result is the symbol dictionary from _getSTCSGrammar, updated with
the literal symbols defined here.  If _colrefLiteral is given, it is
accepted as an alternative wherever a number, a time, or an astroYear
may be written.
"""
with pyparsingWhitechars("\n\t\r "):
# plain decimal numbers, with an optional sign and without an exponent
_exactNumericRE = r"[+-]?\d+(\.(\d+)?)?|[+-]?\.\d+"
exactNumericLiteral = Regex(_exactNumericRE)
# numbers with an optional exponent; the matched text is turned into a float
numberLiteral = Regex(r"(?i)(%s)(E[+-]?\d+)?"%_exactNumericRE
).addParseAction(lambda s,p,toks: float(toks[0]))
# times written as JD <number> or MJD <number> are converted through
# gavo.stc.times; the JD/MJD keyword itself is suppressed
jdLiteral = (Suppress( Literal("JD") ) + exactNumericLiteral
).addParseAction(lambda s,p,toks: times.jdnToDateTime(float(toks[0])))
mjdLiteral = (Suppress( Literal("MJD") ) + exactNumericLiteral
).addParseAction(lambda s,p,toks: times.mjdToDateTime(float(toks[0])))
# ISO-like timestamps; the separators and the time part are optional
isoTimeLiteral = Regex(r"\d\d\d\d-?\d\d-?\d\d(T\d\d:?\d\d:?\d\d(\.\d*)?Z?)?"
).addParseAction(lambda s,p,toks: times.parseISODT(toks[0]))
timeLiteral = (isoTimeLiteral | jdLiteral | mjdLiteral)
astroYear = Regex("[BJ][0-9]+([.][0-9]*)?")
if _colrefLiteral:
# allow a column reference wherever one of these literals may appear
numberLiteral = _colrefLiteral ^ numberLiteral
timeLiteral = _colrefLiteral ^ timeLiteral
astroYear = _colrefLiteral ^ astroYear
# NOTE(review): the argument list of the call below is cut off in this view.
res = _getSTCSGrammar(numberLiteral,
# add the symbols defined locally in this function to the result
res.update(_makeSymDict(locals(), _exportAll))
return res
def getColrefSymbols():
    """returns an STC-S grammar with column references as values.

    The column references used here have the form "<colref>" to cut down
    on ambiguities.  We only accept simple identifiers (i.e., not quoted in
    the SQL sense), though.

    In the parse result, a column reference is a common.ColRef made from
    the identifier without the enclosing double quotes.
    """
    def makeColRef(s, p, toks):
        # strip the enclosing double quotes from the matched text
        return common.ColRef(toks[0][1:-1])

    with pyparsingWhitechars("\n\t\r "):
        atomicColRef = Regex('"[A-Za-z_][A-Za-z_0-9]*"').addParseAction(
            makeColRef)
    return getSymbols(_colrefLiteral=atomicColRef, _addGeoReferences=True)
def enableDebug(syms, debugNames=None):
    """switches on pyparsing debugging for symbols from syms.

    syms is a dictionary mapping names to pyparsing symbols.  debugNames
    is an iterable of the keys of the symbols to debug; if it is empty or
    None, all symbols in syms are affected.

    NOTE(review): the statements acting on the symbols are missing in the
    view of the file this was written from; setting the debug flag and
    naming each symbol after its key is the presumed original behaviour.
    """
    if not debugNames:
        debugNames = syms
    for name in debugNames:
        ob = syms[name]
        ob.setDebug(True)
        ob.setName(name)
# Accessors for the plain and the column-reference grammars, each wrapped
# in utils.CachedGetter.
# NOTE(review): presumably the wrapped function is only run on the first
# call and its result is reused afterwards -- confirm in gavo.utils.
getGrammar = utils.CachedGetter(getSymbols)
getColrefGrammar = utils.CachedGetter(getColrefSymbols)
def getCST(literal, grammarFactory=None):
    """returns a CST for an STC-S expression.

    grammarFactory is a function returning the grammar, i.e., a
    dictionary of symbols having an stcsPhrase entry; getGrammar gets
    used if the argument is left out.

    Empty or whitespace-only input yields an empty dictionary.  Input
    the grammar rejects raises an STCSParseError carrying the original
    expression and the position of the failure.
    """
    # special case: the empty input yields an empty CST
    if not literal.strip():
        return {}

    if grammarFactory is None:
        grammarFactory = getGrammar
    try:
        tree = makeTree(utils.pyparseString(
            grammarFactory()["stcsPhrase"], literal))
    except (ParseException, ParseSyntaxException) as ex:
        raise common.STCSParseError(
            "Invalid STCS expression (%s at %s)"%(ex.msg, ex.loc),
            expr=literal, pos=ex.loc) from ex
    return tree
if __name__=="__main__":
# Ad-hoc manual test run when the module is executed as a script; it works
# on the column-reference variant of the grammar.
import pprint
syms = getColrefSymbols()
# NOTE(review): the beginning of the statement the following two lines
# belong to is missing in this view; judging from parseAll=True and the
# pprint import, it presumably parses the sample phrase and pretty-prints
# the result -- confirm against upstream.
"Position ICRS Epoch J2000.0 20 21"
, parseAll=True)))