"""
Attributes with structure (i.e., containing structures or more than one
atom).
These come with parsers of their own, in some way or other.
Structure attributes, which do not have string literals and have some sort
of internal structure, add methods
- create(instance, ctx, name) -> structure -- creates a new object suitable
as attribute value and returns it (for structures, instance becomes the
parent of the new structure as a side effect of this operation). This
is what should later be fed to feedObject. It must work as a parser,
i.e., have a feedEvent method. The name argument gives the name of
the element that caused the create call, allowing for polymorphic attrs.
- replace(instance, oldVal, newVal) -> None -- replaces oldVal with newVal; this
works like feedObject, except that an old value is overwritten.
- iterEvents(instance) -> events -- yields events to recreate its value
on another instance.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo.base import attrdef
from gavo.base import common
from gavo.base import literals
from gavo.utils import excs
__docformat__ = "restructuredtext en"
class CollOfAtomsAttribute(attrdef.AtomicAttribute):
"""A base class for simple collections of atomic attributes.
"""
def __init__(self, name, default=[],
itemAttD=attrdef.UnicodeAttribute("listItem"),
**kwargs):
attrdef.AttributeDef.__init__(self, name,
default=attrdef.Computed, **kwargs)
self.xmlName_ = itemAttD.name_
self.itemAttD = itemAttD
self.realDefault = default
def iterEvents(self, instance):
for item in getattr(instance, self.name_):
yield ("start", self.xmlName_, None)
yield ("value", "content_", self.itemAttD.unparse(item))
yield ("end", self.xmlName_, None)
[docs]class ListOfAtomsAttribute(CollOfAtomsAttribute):
"""is an attribute definition for an item containing many elements
of the same type.
It is constructed with an AttributeDef for the items. Note that it's
safe to pass in lists as defaults since they are copied before being
added to the instances, so you won't (and can't) have aliasing here.
"""
@property
def default_(self):
return self.realDefault[:]
@property
def typeDesc_(self):
return "Zero or more %s-typed *%s* elements"%(
self.itemAttD.typeDesc_,
self.itemAttD.name_)
[docs] def feed(self, ctx, instance, value):
getattr(instance, self.name_).append(self.itemAttD.parse(value))
[docs] def feedObject(self, instance, value):
if isinstance(value, list):
for item in value:
self.feedObject(instance, item)
else:
getattr(instance, self.name_).append(value)
self.doCallbacks(instance, value)
[docs] def getCopy(self, instance, newParent, ctx):
return getattr(instance, self.name_)[:]
[docs] def unparse(self, value):
return str(value)
[docs]class SetOfAtomsAttribute(CollOfAtomsAttribute):
"""is an attribute definition for an item containing many elements
of the same type, when order doesn't matter but lookup times do.
It is constructed with an AttributeDef for the items. Note that it's
safe to pass in lists as defaults since they are copied before being
added to the instances, so you won't (and can't) have aliasing here.
"""
@property
def default_(self):
return set(self.realDefault)
@property
def typeDesc_(self):
return "Set of %ss"%self.itemAttD.typeDesc_
[docs] def feed(self, ctx, instance, value):
getattr(instance, self.name_).add(self.itemAttD.parse(value))
[docs] def feedObject(self, instance, value):
if isinstance(value, set):
for item in value:
self.feedObject(instance, value)
else:
getattr(instance, self.name_).add(value)
self.doCallbacks(instance, value)
[docs] def getCopy(self, instance, newParent, ctx):
return set(getattr(instance, self.name_))
class _DictAttributeParser(common.Parser):
"""a parser for DictAttributes.
These need a custom parser since they accept some exotic features, as
discussed in DictAttribute's docstring.
The parser keeps state in the _key and _adding attributes and needs to
be _reset after use.
"""
def __init__(self,
dict,
nextParser,
parseValue,
keyName,
inverted=False,
expander=None):
self.dict, self.nextParser, self.parseValue = (
dict, nextParser, parseValue)
self.keyName, self.inverted, self.expander = (
keyName, inverted, expander)
self._reset()
def _reset(self):
self._key, self._adding = attrdef.Undefined, False
def addPair(self, key, value):
if self.inverted:
key, value = value, key
if self._adding:
self.dict[key] = self.dict.get(key, "")+value
else:
self.dict[key] = value
def value_(self, ctx, name, value):
if name=="key" or name==self.keyName:
self._key = value
elif name=="cumulate":
self._adding = literals.parseBooleanLiteral(value)
elif name=="content_":
if self._key is attrdef.Undefined:
raise common.StructureError("Content '%s' has no %s attribute"%(
value, self.keyName))
value = self.parseValue(value)
if (self.expander
and isinstance(value, str)
and "\\" in value):
value = self.expander.expand(value)
self.addPair(self._key, value)
self._reset()
else:
raise common.StructureError('No "%s" attributes on mappings'%name)
return self
def start_(self, ctx, name, value):
raise common.StructureError("No %s elements in mappings"%name)
def end_(self, ctx, name, value):
if self._key is not attrdef.Undefined:
raise common.StructureError("Empty mappings not allowed")
return self.nextParser
[docs]class DictAttribute(attrdef.AttributeDef):
"""an attribute containing a mapping.
DictAttributes are fairly complex beasts supporting a number of input
forms.
The input to those looks like <d key="foo">abc</d>; they are constructed
with an itemAttD (like StructAttributes), but the name on those
is ignored; they are just used for parsing from the strings in the
element bodies, which means that itemAttDs must be derived from
AtomicAttribute.
You can give a different keyNames; the key attribute is always
accepted, though.
For sufficiently exotic situations, you can construct DictAttributes
with inverted=True; the resulting dictionary will then have the keys as
values and vice versa (this is a doubtful feature; let us know when
you use it).
You can also add to existing values using the cumulate XML attribute;
<d key="s">a</d><d key="s" cumulate="True">bc</a> will leave
abc in s.
"""
def __init__(self, name, description="Undocumented",
itemAttD=attrdef.UnicodeAttribute("value"),
keyName="key",
inverted=False, expandMacros=False, **kwargs):
attrdef.AttributeDef.__init__(self, name,
attrdef.Computed, description, **kwargs)
self.xmlName_ = itemAttD.name_
self.itemAttD = itemAttD
self.keyName = keyName
self.inverted = inverted
self.expandMacros = expandMacros
@property
def typeDesc_(self):
return "Dict mapping strings to %s"%self.itemAttD.typeDesc_
@property
def default_(self):
return {}
[docs] def feedObject(self, instance, value):
setattr(instance, self.name_, value)
self.doCallbacks(instance, value)
[docs] def create(self, parent, ctx, name):
expander = None
if self.expandMacros and hasattr(parent, "expand"):
expander = parent
return _DictAttributeParser(getattr(parent, self.name_),
parent, self.itemAttD.parse, keyName=self.keyName,
inverted=self.inverted,
expander=expander)
[docs] def iterEvents(self, instance):
for key, value in getattr(instance, self.name_).items():
yield ("start", self.xmlName_, None)
yield ("value", "key", key)
yield ("value", "content_", self.itemAttD.unparse(value))
yield ("end", self.xmlName_, None)
[docs] def getCopy(self, instance, newParent, ctx):
return getattr(instance, self.name_).copy()
[docs] def makeUserDoc(self):
if self.inverted:
expl = ("the key is the element content, the value is in the 'key'"
" (or, equivalently, %s) attribute"%self.keyName)
else:
expl = ("the value is the element content, the key is in the 'key'"
" (or, equivalently, %s) attribute"%self.keyName)
return "**%s** (mapping; %s) -- %s"%(
self.xmlName_, expl, self.description_)
[docs]class PropertyAttribute(DictAttribute):
"""adds the property protocol to the parent instance.
The property protocol consists of the methods
- setProperty(name, value),
- getProperty(name, default=Undefined)
- clearProperty(name)
- hasProperty(name)
getProperty works like dict.get, except it will raise a KeyError
without a default.
This is provided for user information and, to some extent, some
DC-internal purposes.
"""
def __init__(self, description="Properties (i.e., user-defined"
" key-value pairs) for the element.", **kwargs):
DictAttribute.__init__(self, "properties", description=description,
keyName="name", expandMacros=True, **kwargs)
self.xmlName_ = "property"
[docs] def iterParentMethods(self):
def setProperty(self, name, value):
self.properties[name] = value
yield "setProperty", setProperty
def getProperty(self, name, default=attrdef.Undefined):
if default is attrdef.Undefined:
try:
return self.properties[name]
except KeyError:
raise excs.NotFoundError(name,
"property",
repr(self))
else:
return self.properties.get(name, default)
yield "getProperty", getProperty
def clearProperty(self, name):
if name in self.properties:
del self.properties[name]
yield "clearProperty", clearProperty
def hasProperty(self, name):
return name in self.properties
yield "hasProperty", hasProperty
[docs] def makeUserDoc(self):
return ("**property** (mapping of user-defined keywords in the"
" name attribute to string values) -- %s"%self.description_)
[docs]class StructAttribute(attrdef.AttributeDef):
"""describes an attribute containing a Structure
These are constructed with a childFactory that must have a feedEvent
method. Otherwise, they are normal structs, i.e., the receive a
parent as the first argument and keyword arguments for values.
In addition, you can pass a onParentComplete callback that
are collected in the completedCallback list by the struct decorator.
ParseableStruct instances call these when they receive their end
event during XML deserialization.
"""
def __init__(self, name, childFactory, default=attrdef.Undefined,
description="Undocumented", **kwargs):
xmlName = kwargs.pop("xmlName", None)
attrdef.AttributeDef.__init__(self, name, default, description, **kwargs)
self.childFactory = childFactory
if xmlName is not None:
self.xmlName_ = xmlName
elif self.childFactory is not None:
self.xmlName_ = self.childFactory.name_
if getattr(self.childFactory, "aliases", None):
if self.aliases:
self.aliases.extend(self.childFactory.aliases)
else:
self.aliases = self.childFactory.aliases[:]
@property
def typeDesc_(self):
return getattr(self.childFactory, "docName_", self.childFactory.name_)
[docs] def feedObject(self, instance, value):
if value and value.parent is None: # adopt if necessary
value.parent = instance
setattr(instance, self.name_, value)
self.doCallbacks(instance, value)
[docs] def feed(self, ctx, instance, value):
# if the child factory actually admits content_ (and needs nothing
# else), allow attributes to be fed in, too.
if "content_" in self.childFactory.managedAttrs:
child = self.childFactory(instance, content_=value).finishElement(ctx)
return self.feedObject(instance, child)
raise common.LiteralParseError(self.name_,
value, hint="These attributes have no literals at all, i.e.,"
" they are for internal use only.")
[docs] def create(self, structure, ctx, name):
if self.childFactory is attrdef.Recursive:
res = structure.__class__(structure)
else:
res = self.childFactory(structure)
ctx.setPositionOn(res)
return res
[docs] def getCopy(self, instance, newParent, ctx):
val = getattr(instance, self.name_)
if val is not None:
return val.copy(newParent, ctx=ctx)
[docs] def replace(self, instance, oldStruct, newStruct):
setattr(instance, self.name_, newStruct)
[docs] def iterEvents(self, instance):
val = getattr(instance, self.name_)
if val is common.NotGiven:
return
if val is None:
return
yield ("start", val.name_, None)
for ev in val.iterEvents():
yield ev
yield ("end", val.name_, None)
[docs] def iterChildren(self, instance):
if getattr(instance, self.name_) is not None:
yield getattr(instance, self.name_)
[docs] def remove(self, child):
setattr(child.parent, self.name_, self.default)
[docs] def onParentComplete(self, val):
if hasattr(val, "onParentComplete"):
val.onParentComplete()
[docs] def makeUserDoc(self):
if self.childFactory is attrdef.Recursive:
contains = "(contains an instance of the embedding element)"
else:
contains = "(contains `Element %s`_)"%self.typeDesc_
return "%s %s -- %s"%(
self.name_, contains, self.description_)
[docs]class MultiStructAttribute(StructAttribute):
"""describes an attribute containing one of a class of Structures.
This is to support things like grammars or cores -- these can
be of many types.
This works like StructAttribute, except that childFactory now is
a *function* returning elements (i.e., it's a childFactoryFactory).
"""
def __init__(self, name, childFactory, childNames, **kwargs):
StructAttribute.__init__(self, name, None, **kwargs)
self.childFactory = childFactory
self.aliases = childNames
@property
def typeDesc_(self):
return ("one of %s"%", ".join(self.aliases))
[docs] def create(self, structure, ctx, name):
res = self.childFactory(name)(structure)
ctx.setPositionOn(res)
return res
[docs] def makeUserDoc(self):
return "%s (contains one of %s) -- %s"%(
self.name_, ", ".join(self.aliases), self.description_)
[docs]class StructListAttribute(StructAttribute):
"""describes an attribute containing a homogeneous list of structures.
"""
def __init__(self, name, childFactory, description="Undocumented",
**kwargs):
StructAttribute.__init__(self, name, childFactory, attrdef.Computed,
description, **kwargs)
@property
def default_(self):
return []
@property
def typeDesc_(self):
if self.childFactory is attrdef.Recursive:
return "Recursive element list"
else:
return "List of %s"%self.childFactory.name_
[docs] def addStruct(self, instance, value, destIndex=None):
"""adds a structure to the attribute's value.
Do *not* directly add to the list, always go through this
method; derived classes override it for special behaviour.
Also, this is where callbacks are called.
Use destIndex to overwrite an (existing!) struct; default is appending.
"""
if value.parent is None: # adopt if necessary
value.parent = instance
if destIndex is None:
getattr(instance, self.name_).append(value)
else:
getattr(instance, self.name_)[destIndex] = value
self.doCallbacks(instance, value)
[docs] def feedObject(self, instance, value):
if isinstance(value, list):
for item in value:
self.feedObject(instance, item)
else:
self.addStruct(instance, value)
[docs] def getCopy(self, instance, newParent, ctx):
res = [c.copy(newParent, ctx=ctx)
for c in getattr(instance, self.name_)]
return res
[docs] def replace(self, instance, oldStruct, newStruct):
# This will only replace the first occurrence of oldStruct if
# multiple identical items are in the list. Any other behaviour
# would be about as useful, so let's leave it at this for now.
ind = getattr(instance, self.name_).index(oldStruct)
self.addStruct(instance, newStruct, ind)
[docs] def iterEvents(self, instance):
for val in getattr(instance, self.name_):
yield ("start", val.name_, None)
for ev in val.iterEvents():
yield ev
yield ("end", val.name_, None)
[docs] def iterChildren(self, instance):
return iter(getattr(instance, self.name_))
[docs] def remove(self, child):
getattr(child.parent, self.name_).remove(child)
[docs] def onParentComplete(self, val):
if val:
for item in val:
if hasattr(item, "onParentComplete"):
item.onParentComplete()
[docs] def makeUserDoc(self):
if self.childFactory is attrdef.Recursive:
contains = "(contains an instance of the embedding element"
else:
contains = "(contains `Element %s`_"%self.childFactory.name_
return ("%s %s and may be repeated zero or more"
" times) -- %s")%(self.name_, contains, self.description_)
[docs]class UniquedStructListAttribute(StructListAttribute):
"""A StructListAttribute that will only admit one child per value
of uniqueAttribute, overwriting existing entries if existing.
Actually, you can pass a policy="drop" argument to just keep
an existing element and drop the new one.
"""
def __init__(self, name, childFactory, uniqueAttribute,
policy="overwrite", **kwargs):
self.uniqueAttribute = uniqueAttribute
if policy not in ["overwrite", "drop"]:
raise common.StructureError("UniquedStructListAttribute policy"
" must be either overwrite or drop")
self.policy = policy
StructListAttribute.__init__(self, name, childFactory, **kwargs)
@property
def typeDesc_(self):
return "List of %s, uniqued on %s's value"%(
self.childFactory.name_, self.uniqueAttribute)
[docs] def addStruct(self, instance, value):
# we expect lists will not get so long as to make a linear search
# actually expensive. Linear searching, on the other hand, saves
# us from having to maintain and index (in the presence of
# possible deletions!)
uniqueOn = getattr(value, self.uniqueAttribute)
for index, item in enumerate(iter(getattr(instance, self.name_))):
if getattr(item, self.uniqueAttribute)==uniqueOn:
if self.policy=="overwrite":
StructListAttribute.addStruct(self, instance, value, index)
break
else:
StructListAttribute.addStruct(self, instance, value)
# Ok, so the inheritance here is evil. I'll fix it if it needs more work.
[docs]class MultiStructListAttribute(StructListAttribute, MultiStructAttribute):
"""describes a list of polymorphous children.
See rscdesc cores as to why one could want this; the arguments are
as for MultiStructAttribute.
"""
def __init__(self, name, childFactory, childNames, **kwargs):
StructListAttribute.__init__(self, name, None, **kwargs)
self.childFactory = childFactory
self.aliases = childNames
@property
def typeDesc_(self):
return "List of any of %s"%(", ".join(self.aliases))
[docs] def create(self, structure, ctx, name):
res = MultiStructAttribute.create(self, structure, ctx, name)
ctx.setPositionOn(res)
return res
[docs] def makeUserDoc(self):
if self.childFactory is attrdef.Recursive:
contains = "(contains an instance of the embedding element"
else:
contains = "(contains any of %s"%",".join(self.aliases)
return ("%s %s and may be repeated zero or more"
" times) -- %s")%(self.name_, contains, self.description_)
__all__ = ["ListOfAtomsAttribute", "DictAttribute", "StructAttribute",
"MultiStructAttribute", "StructListAttribute", "MultiStructListAttribute",
"UniquedStructListAttribute", "SetOfAtomsAttribute", "PropertyAttribute"]