"""
Representation of structured data deserializable from XML.
We want all the managed attribute stuff since the main user input comes
from resource descriptors, and we want relatively strong input validation
here. Also, lots of fancy copying and crazy cross-referencing is
going on in our resource definitions, so we want a certain amount of
rigorous structure. Finally, a monolithic parser for that stuff
becomes *really* huge and tedious, so I want to keep the XML parsing
information in the constructed objects themselves.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo import utils
from gavo.base import attrdef
from gavo.base import common
from gavo.base import parsecontext
def sortAttrs(attrSeq):
    """evaluates the before attributes on the AttributeDefs in attrSeq
    and returns a sequence satisfying them.

    If no attribute declares a ``before`` constraint, attrSeq itself is
    returned unchanged.  Otherwise, a new list is returned: first the
    ``meta_`` attribute (if there is one), then the attributes taking part
    in before-constraints in the order given by utils.topoSort, and finally
    all remaining attributes in the order they had in attrSeq.

    Every attribute definition occurs at most once in the result.

    A KeyError is raised if a constraint mentions a name for which there is
    no attribute in attrSeq.
    """
    beforeGraph = [(att.name_, att.before) for att in attrSeq if att.before]
    if not beforeGraph:
        return attrSeq

    attDict = {att.name_: att for att in attrSeq}
    sortedNames = list(utils.topoSort(beforeGraph))

    # Hack: metadata always comes first
    if "meta_" in attDict:
        sortedNames.insert(0, "meta_")

    sortedAtts = [attDict[n] for n in sortedNames]

    # Append whatever did not take part in the sorting, keeping the input
    # order (going through a set here would make the order depend on
    # object hashes).  De-duplicating by identity also keeps meta_ from
    # showing up twice when it is part of the before-graph.
    result, seen = [], set()
    for att in sortedAtts + list(attrSeq):
        if id(att) not in seen:
            seen.add(id(att))
            result.append(att)
    return result
class StructType(type):
    """A metaclass for the representation of structured data.

    Classes using this metaclass are called structures within the DC
    software.

    Structures do quite a bit of managed attribute handling in order to
    meaningfully catch bad user input.

    Basically, you give a Structure class attributes (preferably with
    underscores in front) specifying the attributes the instances
    should have and how they should be handled.

    Structures must be constructed with a parent (for the root
    element, this is None).  All other arguments should be keyword
    arguments.  If given, they have to refer to existing attributes,
    and their values will directly give the values of the
    attribute (i.e., parsed values).

    Structures should always inherit from StructureBase below and
    arrange for its constructor to be called, since, e.g., default
    processing happens there.

    Structures have a managedAttrs dictionary containing names and
    attrdef.AttributeDef objects for the defined attributes.  In addition,
    this metaclass sets attrSeq (the attribute definitions as ordered
    by sortAttrs) and completedCallbacks (an empty list) on each class.
    """

    def __init__(cls, name, bases, dict):
        type.__init__(cls, name, bases, dict)
        cls._collectManagedAttrs()
        cls._insertAttrMethods()

    def _collectManagedAttrs(cls):
        """collects a dictionary of managed attributes in managedAttrs.

        Each attribute definition is registered under its name_, its
        xmlName_ (if it has one), and all its aliases.
        """
        defsByName, foundDefs = {}, []

        for attName in dir(cls):
            # skip names dir() reports but that cannot actually be read
            if not hasattr(cls, attName):
                continue

            candidate = getattr(cls, attName)
            if not isinstance(candidate, attrdef.AttributeDef):
                continue

            defsByName[candidate.name_] = candidate
            foundDefs.append(candidate)
            if hasattr(candidate, "xmlName_"):
                defsByName[candidate.xmlName_] = candidate
            for alias in candidate.aliases or ():
                defsByName[alias] = candidate

        cls.attrSeq = sortAttrs(foundDefs)
        cls.managedAttrs = defsByName
        cls.completedCallbacks = []

    def _insertAttrMethods(cls):
        """adds methods defined by cls's managedAttrs for the parent to
        cls.
        """
        # managedAttrs has one entry per name/alias, hence the set
        for attDef in set(cls.managedAttrs.values()):
            for methName, method in attDef.iterParentMethods():
                setattr(cls, methName, method)
class DataContent(attrdef.UnicodeAttribute):
    """A magic attribute that allows character content to be added to
    a structure.

    It is always named ``content_``; other than that, you can configure it
    with all the arguments available for UnicodeAttribute.

    Since parsers may call characters with an empty string for
    empty elements, the empty string will not be fed (i.e., the default
    will be preserved).  This makes setting an empty string as an element
    content impossible (you could use DataContent with strip=True, though),
    but that's probably not a problem.
    """
    typeDesc_ = "string"

    def __init__(self, default="", description="Undocumented", **kwargs):
        attrdef.UnicodeAttribute.__init__(
            self,
            "content_",
            default=default,
            description=description,
            **kwargs)

    def feed(self, ctx, instance, value):
        """feeds value to instance unless value is the empty string (in
        which case nothing happens and None is returned).
        """
        if value != '':
            return attrdef.UnicodeAttribute.feed(self, ctx, instance, value)
        return None

    def makeUserDoc(self):
        """returns a one-line description of the element content built from
        the default and the description.
        """
        template = "Character content of the element (defaulting to %s) -- %s"
        return template % (repr(self.default_), self.description_)
class StructureBase(object, metaclass=StructType):
    """is a base class for all structures.

    You must arrange for calling its constructor from classes inheriting
    this.

    The constructor receives a parent (another structure, or None)
    and keyword arguments containing values for actual attributes
    (which will be set without any intervening consultation of the
    AttributeDef).

    The attribute definitions talking about structures let you
    set parent to None when constructing default values; they will
    then insert the actual parent.

    Note that change (and hence copy) relies on feedObject and
    finishElement methods, which are not defined in this class itself
    (ParseableStructure provides them).
    """
    name_ = attrdef.Undefined

    _id = parsecontext.IdAttribute("id",
        description="Node identity for referencing")

    # the following is managed by setPosition/getSourcePosition
    __fName = __lineNumber = None

    def __init__(self, parent, **kwargs):
        """sets the parent, initialises all attributes to their defaults
        and then feeds in the values from kwargs.

        A StructureError is raised for keyword arguments that do not name
        a managed attribute.  Values for attributes having a computed_
        attribute are silently ignored.
        """
        self.setParent(parent)

        # set defaults
        for val in self.attrSeq:
            try:
                if not hasattr(self, val.name_):  # don't clobber properties
                        # set up by attributes.
                    setattr(self, val.name_, val.default_)
            except AttributeError:  # default on property given
                raise utils.logOldExc(common.StructureError(
                    "%s attributes on %s have builtin defaults only."%(
                    val.name_, self.name_)))

        # set keyword arguments
        for name, val in kwargs.items():
            if name in self.managedAttrs:
                if not hasattr(self.managedAttrs[name], "computed_"):
                    self.managedAttrs[name].feedObject(self, val)
            else:
                raise common.StructureError("%s objects have no attribute %s"%(
                    self.__class__.__name__, name))

    def _nop(self, *args, **kwargs):
        pass

    def setParent(self, parent):
        """sets the parent of a Structure.

        This is a method mainly to let individual elements override the
        behaviour.
        """
        self.parent = parent
        # NOTE(review): this up-call needs a class later in the MRO that
        # defines setParent (presumably common.StructCallbacks -- confirm);
        # on a class deriving from StructureBase alone it would raise
        # an AttributeError.
        super().setParent(parent)

    def setPosition(self, fName, lineNumber):
        """should be called by parsers to record from what file and what
        line the serialisation came.
        """
        self.__fName, self.__lineNumber = fName, lineNumber

    def getSourcePosition(self):
        """returns a string representation of where the struct was parsed
        from.
        """
        if self.__fName is None:
            return "<internally built>"
        else:
            return "%s, line %s"%(self.__fName, self.__lineNumber)

    def getAttributes(self, attDefsFrom=None):
        """returns a dict of the current attributes, suitable for making
        a shallow copy of self.

        Struct attributes will not be reparented, so there are limits to
        what you can do with such shallow copies.

        If attDefsFrom is given, the attribute definitions are taken from
        its managedAttrs rather than from self's.  A StructureError is
        raised if self lacks one of the attributes asked for.
        """
        if attDefsFrom is None:
            attrs = set(self.managedAttrs.values())
        else:
            attrs = set(attDefsFrom.managedAttrs.values())

        try:
            return {att.name_: getattr(self, att.name_) for att in attrs}
        except AttributeError as msg:
            # use utils.logOldExc as __init__ does
            raise utils.logOldExc(common.StructureError(
                "Attempt to copy from invalid source: %s"%str(msg)))

    # the ignoreKeys default is only used for membership tests and never
    # modified, so sharing it between calls is harmless.
    def getCopyableAttributes(self, ignoreKeys=set(), ctx=None, newParent=None):
        """returns a dictionary mapping attribute names to copyable children.

        ignoreKeys can be a set or dict of additional attribute names to ignore.
        The copies are obtained from the attribute definitions' getCopy
        methods, which are passed self, newParent, and ctx.
        """
        return {att.name_: att.getCopy(self, newParent, ctx)
            for att in self.attrSeq
            if att.copyable and att.name_ not in ignoreKeys}

    def change(self, **kwargs):
        """returns a copy of self with all attributes in kwargs overridden with
        the passed values.

        Two keys in kwargs are special: ``parent_`` gives the parent of
        the new instance (default: self's parent), and ``ctx`` a parse
        context.  When no ctx is passed in, a new ParseContext is made, and
        its exit functions are run after the new instance is finished.
        """
        parent = kwargs.pop("parent_", self.parent)
        runExits, ctx = False, kwargs.pop("ctx", None)
        if ctx is None:
            runExits, ctx = True, parsecontext.ParseContext()

        newInstance = self.__class__(parent)
        # attributes overridden through kwargs are not copied at all
        for attName, attValue in self.getCopyableAttributes(
                kwargs, ctx, newInstance).items():
            newInstance.feedObject(attName, attValue)
        for attName, attValue in kwargs.items():
            newInstance.feedObject(attName, attValue)

        newInstance.finishElement(ctx)
        if runExits:
            ctx.runExitFuncs(newInstance)
        return newInstance

    def copy(self, parent, ctx=None):
        """returns a deep copy of self, reparented to parent.

        This is a shallow wrapper around change, present for backward
        compatibility.
        """
        return self.change(parent_=parent, ctx=ctx)

    def adopt(self, struct):
        """makes self the parent of struct and returns struct.
        """
        struct.setParent(self)
        return struct

    def iterChildren(self):
        """iterates over structure children of self.

        To make this work, attributes containing structs must define
        iterChildren methods (and the others must not).
        """
        for att in self.attrSeq:
            if hasattr(att, "iterChildren"):
                yield from att.iterChildren(self)

    @classmethod
    def fromStructure(cls, newParent, oldStructure):
        """returns a cls instance with parent newParent, constructed from
        the current values of all attributes in oldStructure's attrSeq.
        """
        consArgs = {att.name_: getattr(oldStructure, att.name_)
            for att in oldStructure.attrSeq}
        return cls(newParent, **consArgs)

    def breakCircles(self):
        """removes the parent attributes from all child structures recursively.

        The struct will probably be broken after this, but this is sometimes
        necessary to help the python garbage collector.

        In case you're asking: parent cannot be a weak reference with the current
        parse architecture, as it usually is the only reference to the embedding
        object.  Yes, we should probably change that.
        """
        for child in self.iterChildren():
            # we don't want to touch structs that aren't our children
            if hasattr(child, "parent") and child.parent is self:
                if hasattr(child, "breakCircles"):
                    child.breakCircles()
                delattr(child, "parent")
class ParseableStructure(StructureBase, common.StructCallbacks, common.Parser):
    """is a base class for Structures parseable from EventProcessors (and
    thus XML).

    This is still abstract in that you need at least a name_ attribute.
    But it knows how to be fed from a parser, plus you have feed and feedObject
    methods that look up the attribute names and call the methods on the
    respective attribute definitions.
    """
    # becomes False on the instance once a value has been fed through value_
    _pristine = True

    def __init__(self, parent, **kwargs):
        StructureBase.__init__(self, parent, **kwargs)

    def finishElement(self, ctx):
        """is a hook called when the element is complete; here, it just
        returns self.
        """
        return self

    def getAttribute(self, name):
        """Returns an attribute instance from name.

        This function will raise a StructureError if no matching attribute
        definition is found.
        """
        try:
            return self.managedAttrs[name]
        except KeyError:
            # fall through so the StructureErrors below are raised outside
            # of the KeyError's handler
            pass

        if name == "content_":
            raise common.StructureError("%s elements must not have character data"
                " content."%(self.name_))
        raise common.StructureError(
            "%s elements have no %s attributes or children."%(self.name_, name))

    def end_(self, ctx, name, value):
        """handles the end of the element: finishes it, feeds the result
        to the parent, and returns the parent.

        finishElement may raise common.Replace to have a different object
        (optionally under a different name) fed to the parent, or
        common.Ignore to have nothing fed at all.
        """
        try:
            self.finishElement(ctx)
        except common.Replace as ex:
            if ex.newName is not None:
                name = ex.newName
            replacement = ex.newOb
            if replacement.id is not None:
                ctx.registerId(replacement.id, replacement)
            self.parent.feedObject(name, replacement)
        except common.Ignore:
            pass
        else:
            if self.parent:
                self.parent.feedObject(name, self)
        # del self.feedEvent (at some point we might selectively reclaim parsers)
        return self.parent

    def value_(self, ctx, name, value):
        """feeds the literal value to the attribute name and returns self.

        If the attribute raises common.Replace, the replacement object is
        returned instead.
        """
        attDef = self.getAttribute(name)
        try:
            attDef.feed(ctx, self, value)
        except common.Replace as ex:
            return ex.newOb
        self._pristine = False
        return self

    def start_(self, ctx, name, value):
        """handles the start of a child element name.

        If the attribute definition for name has a create method, what it
        returns is returned; otherwise, name is returned.
        """
        attDef = self.getAttribute(name)
        if not hasattr(attDef, "create"):
            return name
        return attDef.create(self, ctx, name)

    def feed(self, name, literal, ctx=None):
        """feeds the literal to the attribute name.

        If you do not have a proper parse context ctx, there
        may be restrictions on what literals can be fed.
        """
        attDef = self.managedAttrs[name]
        attDef.feed(ctx, self, literal)

    def feedObject(self, name, ob):
        """feeds the object ob to the attribute name.
        """
        attDef = self.managedAttrs[name]
        attDef.feedObject(self, ob)

    def iterEvents(self):
        """yields an event sequence that transfers the copyable information
        from self to something receiving the events.

        If something is not copyable, it is ignored (i.e., keeps its default
        on the target object).  Attributes without an iterEvents method of
        their own yield a single value event, and only if their current value
        differs from the default.
        """
        for att in self.attrSeq:
            if not att.copyable:
                continue

            if hasattr(att, "iterEvents"):
                yield from att.iterEvents(self)
                continue

            current = getattr(self, att.name_)
            if current != att.default_:
                yield ("value", att.name_, att.unparse(current))

    def feedFrom(self, other, ctx=None, suppress=set()):
        """feeds parsed objects from another structure.

        This only works if the other structure is of the same class as self
        or of a superclass of self's class.

        A new ParseContext is made if ctx is not passed in.  The suppress
        argument is currently not evaluated.
        """
        from gavo.base import xmlstruct

        if ctx is None:
            ctx = parsecontext.ParseContext()

        processor = xmlstruct.EventProcessor(None, ctx)
        processor.setRoot(self)
        for event in other.iterEvents():
            processor.feed(*event)
class Structure(ParseableStructure):
    """is the base class for user-defined structures.

    It will do some basic validation and will call hooks to complete elements
    and compute computed attributes, based on ParseableStructure's finishElement
    hook.

    Also, it supports onParentComplete callbacks; this works by checking
    if any managedAttr has a onParentComplete method and calling it
    with the current value of that attribute if necessary.
    """

    def callCompletedCallbacks(self):
        """calls onParentComplete on all attribute definitions having it
        with the current value of the attribute, provided that value is
        different from the default.
        """
        # managedAttrs contains an attribute definition once for each of
        # its names (name_, xmlName_, aliases); make sure each callback
        # is only run once per attribute.
        seen = set()
        for attType in self.managedAttrs.values():
            if id(attType) in seen:
                continue
            seen.add(id(attType))

            if hasattr(attType, "onParentComplete"):
                attVal = getattr(self, attType.name_)
                if attVal!=attType.default_:
                    attType.onParentComplete(attVal)

    def finishElement(self, ctx=None):
        """runs completeElement, validate, onElementComplete, and the
        onParentComplete callbacks (in that order) and returns self.
        """
        self.completeElement(ctx)
        self.validate()
        self.onElementComplete()
        self.callCompletedCallbacks()
        return self

    def validate(self):
        """raises a StructureError if an attribute still is
        attrdef.Undefined and runs the validate methods of the attribute
        definitions that have one before up-calling.
        """
        for val in set(self.managedAttrs.values()):
            if getattr(self, val.name_) is attrdef.Undefined:
                raise common.StructureError("You must set %s on %s elements"%(
                    val.name_, self.name_))
            if hasattr(val, "validate"):
                val.validate(self)
        super().validate()

    def onElementComplete(self):
        """is a hook run after validation; here, it only up-calls.
        """
        super().onElementComplete()

    def completeElement(self, ctx):
        """is a hook run before validation; here, it only up-calls.
        """
        super().completeElement(ctx)
class RestrictionMixin(common.StructCallbacks):
    """A mixin for structure classes not allowed in untrusted RDs.

    Completing such an element raises common.RestrictedElement when the
    parse context has a true ``restricted`` attribute.
    """

    def completeElement(self, ctx):
        # a missing restricted attribute (including ctx being None)
        # counts as unrestricted
        isRestricted = getattr(ctx, "restricted", False)
        if isRestricted:
            raise common.RestrictedElement(self.name_)
        super().completeElement(ctx)
def makeStruct(structClass, **kwargs):
    """creates a parentless instance of structClass with ``**kwargs``.

    You can pass in a ``parent_`` kwarg to force a parent, and a ``ctx_``
    if you need a parse context.  Both are removed from kwargs before the
    remaining items are passed to the constructor.

    This is the preferred way to create struct instances in DaCHS, as it
    will cause the sequence of completers and validators run.  Use it like
    this::

        makeStruct(rscdef.Column, name="ra", type="double precision")

    What is returned is the result of the new instance's finishElement.
    """
    context = kwargs.pop("ctx_", None)
    owner = kwargs.pop("parent_", None)
    instance = structClass(owner, **kwargs)
    return instance.finishElement(context)