"""
Code to parse structures from XML sources.
The purpose of much of the mess here is to symmetrize XML attributes
and values. Basically, we want start, value, end events regardless of
whether a piece of data comes in an element with a certain tag name or
via a named attribute.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import io
import re
from gavo import utils
from gavo.base import activetags
from gavo.base import common
from gavo.base import parsecontext
# Matches strings that consist of whitespace only (the empty string included).
# This is a raw string so the \s escape is passed to the regex engine rather
# than being interpreted (and warned about) as a string escape.
ALL_WHITESPACE = re.compile(r"\s*$")
class EventProcessor(object):
    """A dispatcher routing parse events to the structures being built.

    It is constructed with the root structure of the result tree (given
    either as a type or as an instance) and a parse context.

    Afterwards, events are pushed in through the feed method, which passes
    them on to whatever object is currently in charge.
    """
    # Dispatch state lives in two attributes:
    #
    # * curParser is the object whose feedEvent method receives events;
    #   initially that is the processor itself (see feedEvent).
    # * next is None while structured data is being parsed.  When a parser
    #   returns a string from feedEvent, that string is stored in next;
    #   content arriving after that is handed to curParser as a "value"
    #   event of that name, until an "end" event resets next to None.
    debug = False

    def __init__(self, rootStruct, ctx):
        self.rootStruct = rootStruct
        self.ctx = ctx
        self.curParser = self
        self.next = None
        self.result = None

    def _feedToAtom(self, type, name, value):
        """handles an event arriving while the atom named in self.next is open.

        Atoms accept nothing but content (value or parsedvalue events
        named content_); start events and other names raise a
        StructureError.  An end event closes the atom.
        """
        if type == 'end':
            self.next = None
            return

        if type == 'start':
            raise common.StructureError("%s elements cannot have %s children"%(
                self.next, name))

        if type in ('value', 'parsedvalue'):
            # Atoms have no attributes, so content_ is all that can be set.
            if name != "content_":
                raise common.StructureError("%s is atomic and thus has"
                    " no attribute %s."%(self.next, name))
            self.curParser.feedEvent(self.ctx, 'value', self.next, value)

    def _feedToStructured(self, type, name, value):
        """passes an event to the current parser and updates the dispatch state.

        A string coming back from the parser names an atom that is now
        open; anything else becomes the new current parser.
        """
        outcome = self.curParser.feedEvent(self.ctx, type, name, value)
        if not isinstance(outcome, str):
            self.curParser = outcome
            return
        self.next = outcome

    def feed(self, type, name, value=None):
        """feeds an event.

        This is the main entry point for user calls.
        """
        # Active tags may turn up anywhere, which is why they are handled
        # here rather than by the element parsers.  A parser can opt out
        # of this (and hence see active tag events itself) by having an
        # ACTIVE_NOEXPAND attribute.
        if type == "start" and activetags.isActive(name):
            if not hasattr(self.curParser, "ACTIVE_NOEXPAND"):
                self.curParser = activetags.getActiveTag(name)(self.curParser)
                return

        if self.next is None:
            handler = self._feedToStructured
        else:
            handler = self._feedToAtom
        handler(type, name, value)

    def feedEvent(self, ctx, evType, name, value):
        """dispatches an event to the root structure.

        Do not call this yourself unless you know what you're doing.  The
        method to feed "real" events to is feed.

        Only a start event for the root structure's name is accepted; it
        instantiates the root structure if that was passed in as a type.
        Everything else raises a StructureError.
        """
        if name != self.rootStruct.name_:
            raise common.StructureError("Expected root element %s, found %s"%(
                self.rootStruct.name_, name))

        if evType != "start":
            raise common.StructureError("Bad document structure")

        if isinstance(self.rootStruct, type):
            root = self.rootStruct(None)
        else:
            root = self.rootStruct
        self.result = root
        self.result.idmap = ctx.idmap
        ctx.setPositionOn(self.result)
        return self.result

    def setRoot(self, root):
        """artificially inserts an instantiated root element.

        In particular, this bypasses any checks that the incoming event
        stream is actually destined for root.  Use this for replay-type
        things (feedFrom, active tags) exclusively.
        """
        self.curParser = root
        self.result = root
        root.idmap = self.ctx.idmap

    def clone(self):
        """returns a fresh EventProcessor for the same root structure and context.
        """
        return EventProcessor(self.rootStruct, self.ctx)
def _synthesizeAttributeEvents(evProc, context, attrs):
"""generates value events for the attributes in attrs.
"""
# original attributes must be fed first since they will usually
# yield a different target object
original = attrs.pop("original", None)
if original:
evProc.feed("value", "original", original)
# mixins must be fed last as they might depend on stuff set
# in other attributes
mixin = attrs.pop("mixin", None)
for key, val in attrs.items():
evProc.feed("value", key, val)
if mixin:
evProc.feed("value", "mixin", mixin)
def feedTo(rootStruct, eventSource, context, feedInto=False):
    """feeds events from eventSource to rootStruct.

    A new event processor is used for feeding.  No context
    exit functions are run.

    eventSource must iterate over (type, name, payload) triples.
    Consecutive "data" events are joined, and, unless the result is
    whitespace only, fed as a single content_ value event before the
    next non-data event.  For start events with a non-empty payload, value
    events for the attributes in the payload are synthesized.

    The processed root structure is returned.

    if feedInto is true, the event creating the root structure is not
    expected (TODO: this is crap; fix it so that this is always the
    case when rootStruct is an instance).

    Exceptions raised while feeding are re-raised; where they do not
    already carry position information, the event source's pos (if it
    has one) is attached to them as pos.
    """
    evProc = EventProcessor(rootStruct, context)
    if feedInto:
        evProc.setRoot(rootStruct)

    pendingText = []
    try:
        for evType, name, payload in eventSource:
            # character data is only collected until something else comes in
            if evType == "data":
                pendingText.append(payload)
                continue

            if pendingText:
                text = "".join(pendingText)
                pendingText = []
                if not ALL_WHITESPACE.match(text):
                    evProc.feed("value", "content_", text)

            # "normal" event feed
            evProc.feed(evType, name, payload)

            # start event: Synthesize value events for attributes.
            if evType == "start" and payload:
                _synthesizeAttributeEvents(evProc, context, payload)

    except Exception as ex:
        if (not getattr(ex, "posInMsg", False)
                and getattr(ex, "pos", None) is None):
            # only add pos when the message string does not already have it.
            # The event source may be something without a pos attribute;
            # failing on that here would mask the actual error.
            ex.pos = getattr(eventSource, "pos", None)
        raise

    return evProc.result
def parseFromStream(rootStruct, inputStream, context=None):
    """parses a tree rooted in rootStruct from some file-like object inputStream.

    The root element of the resulting tree is returned.  When rootStruct
    is a type, it is instantiated to create the root element; when it is
    an instance, that instance becomes the root.

    If no context is passed in, a new ParseContext is created.  After
    feeding, the context's exit functions are run on the result.
    """
    events = utils.iterparse(inputStream)
    ctx = parsecontext.ParseContext() if context is None else context
    ctx.setEventSource(events)

    root = feedTo(rootStruct, events, ctx)
    ctx.runExitFuncs(root)
    return root
def parseFromString(rootStruct, inputString, context=None):
    """parses a DaCHS RD tree rooted in ``rootStruct`` from a string.

    This wraps inputString in an in-memory text stream and leaves the rest
    to parseFromStream; the root element of the resulting tree is
    returned.  Usage is like this::

        parseFromString(rscdef.Column, "<column name='foo'/>")
    """
    stream = io.StringIO(inputString)
    return parseFromStream(rootStruct, stream, context)