"""
Some XML hacks.
StartEndHandler simplifies the creation of SAX parsers, intended for
client code or non-DC XML parsing.
iterparse is an elementtree-inspired thin expat layer; both VOTable
and base.structure parsing build on it.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import collections
import weakref
import xml.sax
from xml.parsers import expat
from xml.sax.handler import ContentHandler
from gavo.utils import excs
from gavo.utils import misctricks
from gavo.utils import texttricks
class ErrorPosition(object):
    """A wrapper for an error position.

    Construct it with file name, line number, and column.  Use None
    for missing or unknown values.

    The string form is "<fName>, (<line>, <col>)", or just
    "(<line>, <col>)" when no file name is known; unknown line or
    column values are shown as '?'.
    """
    fName = None

    def __init__(self, fName, line, column):
        # Any falsy line (None, 0) is shown as unknown...
        self.line = line or '?'
        # ...whereas a column of 0 is kept; only None becomes '?'.
        self.col = column
        if self.col is None:
            self.col = '?'
        self.fName = fName

    def __str__(self):
        if self.fName:
            return "%s, (%s, %s)"%(self.fName, self.line, self.col)
        else:
            return "(%s, %s)"%(self.line, self.col)
class iterparse(object):
    """iterates over start, data, and end events in source.

    Events are triples (type, name, payload):

    * ("start", name, attrs) for an opening tag,
    * ("end", name, None) for a closing tag,
    * ("data", None, text) for character data.

    To keep things simple downstream, we swallow all namespace prefixes,
    if present.

    iterparse is constructed with a source (anything with a read(size)
    method) and optionally a custom error class.  This error class needs to
    have the message as the first argument.  Since expat error messages
    usually contain line number and column in them, no extra pos attribute
    is supported.

    Since the parser typically is far ahead of the events seen, we
    do our own bookkeeping by storing the parser position with each
    event.  The *end* of the construct that caused an event can
    be retrieved using pos.

    Once the source is exhausted (or close() has been called), the
    remaining buffered events are delivered and every further next()
    raises StopIteration.
    """
    chunkSize = 2**20
    "The number of bytes handed to expat from iterparse at one go."

    def __init__(self, source, parseErrorClass=excs.StructureError):
        self.source = source
        self.parseErrorClass = parseErrorClass

        # Work out a human-readable name of the input for error positions.
        if hasattr(source, "name"):
            self.inputName = source.name
        elif hasattr(source, "getvalue"):
            self.inputName = texttricks.makeEllipsis("IO:'"
                +texttricks.safe_str(source.getvalue()))+"'"
        else:
            self.inputName = texttricks.makeSourceEllipsis(source)

        self.parser = expat.ParserCreate()
        self.parser.buffer_text = True
        self.lastLine, self.lastColumn = 1, 0
        # queue of (event, position-or-None) pairs filled by the handlers
        self.evBuf = collections.deque()
        # set by close(); expat must not be fed after the final Parse call
        self._finished = False
        self.parser.StartElementHandler = self._startElement
        self.parser.EndElementHandler = self._endElement
        self.parser.CharacterDataHandler = self._characters

    def __iter__(self):
        return self

    def _startElement(self, name, attrs):
        self.evBuf.append(
            (("start", name.split(":")[-1], attrs),
            (self.parser.CurrentLineNumber, self.parser.CurrentColumnNumber)))

    def _endElement(self, name):
        self.evBuf.append((("end", name.split(":")[-1], None),
            (self.parser.CurrentLineNumber, self.parser.CurrentColumnNumber)))

    def _characters(self, data):
        # no position is recorded for character data
        self.evBuf.append((("data", None, data), None))

    def pushBack(self, type, name, payload):
        """puts an event at the front of the queue so that it is the
        next one returned.

        Pushed-back events carry no position, so they do not change pos.
        """
        self.evBuf.appendleft(((type, name, payload), None))

    def __next__(self):
        """returns the next (type, name, payload) event.

        Raises StopIteration when the input is exhausted, and an instance
        of parseErrorClass when expat rejects the input.
        """
        # Feed expat until it has produced at least one event or the
        # input is used up.
        while not self.evBuf and not self._finished:
            try:
                nextChunk = self.source.read(self.chunkSize)
                if nextChunk:
                    self.parser.Parse(nextChunk)
                else:
                    # the final Parse call in close() may still emit events
                    self.close()
            except expat.ExpatError as ex:
                srcDesc = getattr(self.source, "name", "(internal source)")
                newEx = self.parseErrorClass(srcDesc+" "+str(ex))
                newEx.posInMsg = True  # see base.xmlstruct
                newEx.inFile = srcDesc
                raise misctricks.logOldExc(newEx)

        if not self.evBuf:
            raise StopIteration("End of Input")

        event, pos = self.evBuf.popleft()
        if pos is not None:
            self.lastLine, self.lastColumn = pos
        return event

    def close(self):
        """tells expat the document is complete and detaches our handlers.

        This is safe to call more than once; only the first call talks
        to expat.  Errors from the final parse step propagate as
        expat.ExpatError.
        """
        if self._finished:
            return
        self._finished = True
        try:
            self.parser.Parse("", True)
        finally:
            # Dropping the bound methods breaks the reference cycle
            # between the parser and this object.
            self.parser.StartElementHandler =\
            self.parser.EndElementHandler = \
            self.parser.CharacterDataHandler = None

    @property
    def pos(self):
        """an ErrorPosition for the last positioned event returned."""
        return ErrorPosition(self.inputName, self.lastLine, self.lastColumn)

    def getParseError(self, msg):
        """returns (not raises) a parseErrorClass instance for msg,
        prefixed with the current position.
        """
        res = self.parseErrorClass("At %s: %s"%(self.pos, msg))
        res.posInMsg = True  # see base.xmlstruct
        return res
class StartEndHandler(ContentHandler):
    """This class provides startElement, endElement and characters
    methods that translate events into method calls.

    When an opening tag is seen, we look for a _start_<element name>
    method and, if present, call it with the name and the attributes.
    When a closing tag is seen, we try to call _end_<element name> with
    name, attributes and contents.  If the _end_xxx method returns a
    string, this value will be added to the content of the
    enclosing element.

    If no specific method exists, _defaultStart(name, attrs) or
    _defaultEnd(name, attrs, contents) are called when defined.

    Rather than overriding __init__, you probably want to override
    the _initialize() method to create the data structures you want
    to fill from XML.

    StartEndHandlers clean element names from namespace prefixes, and
    they ignore them in every other way.  If you need namespaces, use
    a different interface.
    """
    def __init__(self):
        ContentHandler.__init__(self)
        # _start_*/_end_* methods are looked up on realHandler; using a
        # weak proxy rather than self avoids a self-referencing cycle.
        self.realHandler = weakref.proxy(self)
        # (name, attrs) of all currently open elements, innermost last
        self.elementStack = []
        # one list of text fragments per open element; the outermost
        # list collects whatever the root element's end handler returns
        self.contentsStack = [[]]
        self._initialize()

    def _initialize(self):
        """override this to set up state before parsing."""
        pass

    def processingInstruction(self, target, data):
        """adds the processing instruction's data to the current content."""
        self.contentsStack[-1].append(data)

    def cleanupName(self, name):
        """returns name without namespace prefix and with dashes turned
        into underscores (so it can be part of a method name).
        """
        return name.split(":")[-1].replace("-", "_")

    def startElementNS(self, namePair, qName, attrs):
        """handles a namespace-aware start event by delegating to
        startElement with the local name.

        Attributes without a namespace keep their plain name, all others
        are keyed as "{namespace}name".
        """
        newAttrs = {}
        for (ns, name), value in attrs.items():
            if ns is None:
                newAttrs[name] = value
            else:
                newAttrs["{%s}%s"%(ns, name)] = value
        self.startElement(namePair[1], newAttrs)

    def startElement(self, name, attrs):
        """opens a new element and dispatches to its start method."""
        self.contentsStack.append([])
        name = self.cleanupName(name)
        self.elementStack.append((name, attrs))
        if hasattr(self.realHandler, "_start_%s"%name):
            getattr(self.realHandler, "_start_%s"%name)(name, attrs)
        elif hasattr(self, "_defaultStart"):
            self._defaultStart(name, attrs)

    def endElementNS(self, namePair, qName):
        """handles a namespace-aware end event via endElement."""
        self.endElement(namePair[1])

    def endElement(self, name, suppress=False):
        """closes the current element and dispatches to its end method.

        A string returned from the end method is added to the content
        of the enclosing element unless suppress is true.
        """
        contents = "".join(self.contentsStack.pop())
        name = self.cleanupName(name)
        # the element is taken off the stack *before* the handler runs
        _, attrs = self.elementStack.pop()
        res = None
        if hasattr(self.realHandler, "_end_%s"%name):
            res = getattr(self.realHandler,
                "_end_%s"%name)(name, attrs, contents)
        elif hasattr(self, "_defaultEnd"):
            res = self._defaultEnd(name, attrs, contents)
        if isinstance(res, str) and not suppress:
            self.contentsStack[-1].append(res)

    def characters(self, chars):
        """adds character data to the content of the current element."""
        self.contentsStack[-1].append(chars)

    def getResult(self):
        """returns the first item collected at the top level.

        Raises an IndexError if nothing has been collected there.
        """
        return self.contentsStack[0][0]

    def getParentTag(self, depth=1):
        """Returns the name of the parent element.

        This only works as written here in end handlers.  In start handlers,
        you have to pass depth=2 (since their tag already is on the stack).

        None is returned when there is no such ancestor, for instance
        for the root element.
        """
        if self.elementStack and depth<=len(self.elementStack):
            return self.elementStack[-depth][0]
        return None

    def parse(self, stream):
        """parses XML from stream into this handler and returns self."""
        xml.sax.parse(stream, self)
        return self

    def parseString(self, string):
        """parses XML from string into this handler and returns self."""
        xml.sax.parseString(string, self)
        return self

    # xml.sax is smart enough to do the right thing when it gets passed bytes.
    parseBytes = parseString

    def getAttrsAsDict(self, attrs):
        """returns attrs as received from SAX as a dictionary.

        The main selling point is that any namespace prefixes are removed from
        the attribute names.  The attribute values are passed through
        unchanged, so any prefixes in them remain.
        """
        return {k.split(":")[-1]: v for k, v in attrs.items()}

    def setDocumentLocator(self, locator):
        """stores the locator passed in by the SAX parser as self.locator."""
        self.locator = locator
def traverseETree(eTree):
    """iterates the elements of an elementTree in postorder.

    eTree is an element; all its descendants are yielded, children before
    their parents, and eTree itself comes last.
    """
    for child in eTree:
        yield from traverseETree(child)
    yield eTree