"""
Stream parsing of VOTables.
This module builds on a shallow wrapping of expat in utils.iterparse.
There is an "almost-tight" parsing loop in the parse method. It
builds a stanxml tree (mainly through the _processNodeDefault function).
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
# To fiddle with the nodes as they are generated, define an
# _end_ELEMENTNAME method. If you do this, you will have to do
# any adding of children to parents yourself (it happens in
# _processNodeDefault, which is called when no custom handler is
# present).
import io
from gavo import utils
from gavo.utils import ElementTree
from gavo.votable import common
from gavo.votable import model
from gavo.votable import tableparser
# The watchset used by parse and parseBytes when the caller passes none.
# It is empty, so by default no element classes are watched; parse then
# only yields the tableparser.Rows objects it creates for DATA elements.
DEFAULT_WATCHSET = []
# We treat all VOTable versions as equal.
# Both the end processor table and the element table get one entry per
# namespace listed here in addition to the non-namespaced entry.
VOTABLE_NAMESPACES = [
"http://www.ivoa.net/xml/VOTable/v1.0",
"http://www.ivoa.net/xml/VOTable/v1.1",
"http://www.ivoa.net/xml/VOTable/v1.2",
"http://www.ivoa.net/xml/VOTable/v1.3",
]
class IGNORE(object):
    """a sentinel standing in for elements the parser does not know.

    parse pushes an instance of this on its element stack when an unknown
    tag is encountered and robust parsing was requested.

    These should not end up in a DOM, but if they do, they're silent.

    The class offers just enough of the stanxml Element interface for
    the parser to work with it; it cannot autoconstruct, though.
    """

    def __init__(self):
        """does nothing; the sentinel has no state."""

    def __call__(self, **kwargs):
        """accepts (and discards) attributes, returning the sentinel itself."""
        return self

    def __getitem__(self, item):
        """accepts (and discards) a child; nothing is returned."""
        return None

    def isEmpty(self):
        """returns True: there is never any content in here."""
        return True

    def shouldBeSkipped(self):
        """returns True: the sentinel always asks to be left out."""
        return True

    def apply(self, func):
        """ignores func and returns None."""
        return None
def _processNodeDefault(text, child, parent):
"""the default node processor: Append child to parent, return child.
"""
assert not (text and text.strip()), (
"Content '%s' in must-empty VOTable element %s"%(text, repr(child)))
parent[child]
return child
def _processNodeWithContent(text, child, parent):
"""the node processor for nodes with text content.
"""
if text and text.strip():
child[text] # Attention: mixed content not supported
parent[child]
return child
# End processors for elements that may carry text content.  The names
# matter: _computeEndProcessorsImpl picks up every module global starting
# with _end_ and uses the rest of the name as the tag name.
_end_DESCRIPTION = _processNodeWithContent
_end_INFO = _processNodeWithContent
_end_MODEL = _processNodeWithContent
_end_URL = _processNodeWithContent
_end_LITERAL = _processNodeWithContent
_end_NAME = _processNodeWithContent
# STREAMs and TABLEDATA should ordinarily be processed by the table
# iterator, so this really is only interesting for special applications:
_end_STREAM = _processNodeWithContent
_end_TD = _processNodeWithContent
# (a second, redundant binding of _end_LITERAL used to follow here)
_end_IDREF = _processNodeWithContent
def _end_VOTABLE(text, child, parent):
# VOTABLEs have no useful parents.
return child
def _computeEndProcessorsImpl():
    """returns a dictionary mapping tag names to end processors.

    The processors are the module globals named _end_XXXX.  Each of
    them is entered under the bare tag name XXXX and, additionally, once
    per namespace in VOTABLE_NAMESPACES under the key "namespace:XXXX".
    """
    # NOTE(review): the namespaced keys here are "ns:name" strings, whereas
    # _computeElementsImpl uses ElementTree.QName(ns, name) -- confirm which
    # form utils.iterparse actually delivers.
    prefix = "_end_"
    processors = {}

    for globalName, handler in globals().items():
        if not globalName.startswith(prefix):
            continue

        tagName = globalName[len(prefix):]
        processors[tagName] = handler
        for namespace in VOTABLE_NAMESPACES:
            processors["%s:%s"%(namespace, tagName)] = handler

    return processors


# parse obtains the processor table by calling this; presumably
# CachedGetter makes sure the table is only computed once -- see
# utils.CachedGetter.
computeEndProcessors = utils.CachedGetter(_computeEndProcessorsImpl)
def _computeElementsImpl():
    """returns a dictionary mapping tag names to the stanxml elements
    building them.

    Every attribute of model.VOTable with a name not starting with an
    underscore is entered once under its plain name and once per namespace
    in VOTABLE_NAMESPACES under ElementTree.QName(namespace, name).
    """
    elements = {}
    publicNames = [name for name in dir(model.VOTable)
        if not name.startswith("_")]

    for name in publicNames:
        factory = getattr(model.VOTable, name)
        elements[name] = factory
        for namespace in VOTABLE_NAMESPACES:
            elements[ElementTree.QName(namespace, name)] = factory

    return elements


# parse obtains the element table by calling this; presumably
# CachedGetter makes sure the table is only computed once -- see
# utils.CachedGetter.
computeElements = utils.CachedGetter(_computeElementsImpl)
def _cleanAttributes(attrDict, element, raiseOnInvalid):
"""returns a sanitised version of attDict for element.
We force attribute keys to be byte strings (since they're being used
as keyword arguments), and we drop everything that's namespace related
-- it's not necessary for VOTables and people mess it up anyway.
Also, we complain about or filter out attributes that element
cannot deal with.
"""
cleaned = {}
for key, value in attrDict.items():
if ":" in key or key=="xmlns":
continue
key = str(key.replace("-", "_"))
if not hasattr(element, "_a_"+key):
if raiseOnInvalid:
raise KeyError(key)
else:
continue
cleaned[key] = value
return cleaned
def parse(inFile, watchset=DEFAULT_WATCHSET, raiseOnInvalid=True):
    """returns an iterator yielding items of interest.

    inFile is something that supports read(bytes).

    watchset is a sequence of VOTable element classes you want yielded
    when their end tag is seen.  You may want to see INFO or PARAM of
    certain protocols.  With the default (empty) watchset, no elements
    are yielded this way.  Yielded elements have an idmap attribute, the
    (shared, still growing) dictionary mapping the IDs seen so far to
    their elements.

    Independently of watchset, a tableparser.Rows instance is yielded
    whenever a DATA element is opened; it is constructed with the element
    enclosing DATA and the event iterator.

    If raiseOnInvalid is true, unknown tags and attributes an element does
    not support raise the parse error the event iterator produces.
    Otherwise, unknown elements are replaced by IGNORE sentinels and
    invalid attributes are dropped.
    """
    # This parser has gotten a bit too fat.  Maybe move the whole thing
    # to a class?  All this isn't terribly critical to performance...
    watchset = set(watchset)
    idmap = {}
    processors = computeEndProcessors()
    elements = computeElements()
    elementStack = [None]  # None is VOTABLE's parent
    iterator = utils.iterparse(inFile, common.VOTableParseError)
    content = []

    # (the event type is called evType so the type builtin is not shadowed)
    for evType, tag, payload in iterator:
        if evType == "data":
            content.append(payload)

        elif evType == "start":
            # Element open: push new node on the stack...
            if tag not in elements:
                if raiseOnInvalid:
                    raise iterator.getParseError("Unknown tag: %s"%tag)
                else:
                    element = IGNORE()
            else:
                element = elements[tag]()

            if payload:
                try:
                    payload = _cleanAttributes(payload, element, raiseOnInvalid)
                except KeyError as msg:
                    # element cannot be an IGNORE here: KeyErrors are only
                    # raised with raiseOnInvalid, and then unknown tags have
                    # already bombed out above.
                    raise iterator.getParseError("Attribute %s invalid on %s"%(
                        str(msg), element.name_))
            elementStack.append(element(**payload))

            # ...prepare for new content,...
            content = []

            # ...add the node to the id map if it has an ID...
            elId = payload.get("ID")
            if elId is not None:
                idmap[elId] = elementStack[-1]

            # ...and pass control to special iterator if DATA is coming in.
            # elementStack[-2] is the element DATA was opened in.
            if tag == "DATA":
                yield tableparser.Rows(elementStack[-2], iterator)

        elif evType == "end":
            # Element close: process text content...
            if content:
                text = "".join(content)
                content = []
            else:
                text = None

            # ...see if we have any special processing to do for the node type...
            nodeProc = processors.get(tag, _processNodeDefault)
            preChild = elementStack.pop()
            if not isinstance(preChild, IGNORE):
                # ...call handler with the current node and its future parent...
                child = nodeProc(text, preChild, elementStack[-1])

                # ...and let user do something with the element if she ordered
                # it.  This must stay within the non-IGNORE branch: child is
                # only (re-)assigned here, and for ignored elements there is
                # nothing to hand out.
                if child is not None and child.__class__ in watchset:
                    child.idmap = idmap
                    yield child

        else:
            assert False, "Unexpected event type from iterparse: %r"%(evType,)
def readRaw(inFile):
    """returns a V.VOTABLE instance with filled-in data for the input from
    inFile.

    This runs parse watching for TABLE and VOTABLE elements.  Each
    tableparser.Rows object coming by is read out completely, and the
    resulting list is stored in the rows attribute of its tableDefinition.
    What is returned is the last item parse yielded.
    """
    watched = [model.VOTable.TABLE, model.VOTable.VOTABLE]

    for item in parse(inFile, watched):
        if not isinstance(item, tableparser.Rows):
            continue
        # Pull all rows now, while the parser is positioned within DATA.
        item.tableDefinition.rows = list(item)

    # The loop variable survives the loop and holds the last item yielded.
    return item
def parseBytes(string, watchset=DEFAULT_WATCHSET, raiseOnInvalid=True):
    """returns the iterator parse produces for a VOTable literal.

    string contains the VOTable literal as bytes; it is run through
    utils.bytify first (which is expected to fall back to utf-8-encoding
    strings if necessary) and then wrapped into a BytesIO.

    watchset and raiseOnInvalid are passed on to parse unchanged.
    """
    source = io.BytesIO(utils.bytify(string))
    return parse(source, watchset, raiseOnInvalid)