"""
xmlstan elements of VOTable.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import base64
import functools
from gavo import utils
from gavo.utils import ElementTree
from gavo.utils.stanxml import (
Element, registerPrefix, getPrefixInfo, schemaURL, escapePCDATA)
from gavo.votable import common
# Maps a VOTable version string to the XML namespace URI used for documents
# of that version.  Used by voTag below to build qualified names.
# NOTE: the "1.4" entry pointing at the v1.3 URI looks intentional -- the
# VOTable 1.4 recommendation keeps the v1.3 namespace, and the "vot" prefix
# registration below pairs the 1.3 namespace with the 1.4 schema file in the
# same way.
NAMESPACES = {
"1.4": "http://www.ivoa.net/xml/VOTable/v1.3",
"1.3": "http://www.ivoa.net/xml/VOTable/v1.3",
"1.2": "http://www.ivoa.net/xml/VOTable/v1.2",
"1.1": "http://www.ivoa.net/xml/VOTable/v1.1",
}
# WARNING: if you change the default version of the VOTables generated,
# you'll probably have to adapt them in resources/xsl, too.
# Each call binds a prefix to (namespace URI, schema location); the element
# classes in this module look these up again through getPrefixInfo(prefix).
# "vot" is the current default, "vot2" and "vot1" serve the legacy
# VOTable 1.2 and 1.1 top-level elements.
registerPrefix("vot", NAMESPACES["1.3"], schemaURL("VOTable-1.4.xsd"))
registerPrefix("vot2", NAMESPACES["1.2"], schemaURL("VOTable-1.2.xsd"))
registerPrefix("vot1", NAMESPACES["1.1"], schemaURL("VOTable-1.1.xsd"))
# Prefix for the MIVOT (data model annotation) elements.
registerPrefix("mivot",
"http://www.ivoa.net/xml/mivot", schemaURL("mivot.xsd"))
class _MIVOTElements:
    """elements from MIVOT, i.e., data model annotation

    This class only serves as a namespace: every nested class declares one
    element living in the "mivot" prefix.  VOTable inherits from it so the
    MIVOT elements are available next to the VOTable ones.

    Presumably (stanxml convention -- confirm against gavo.utils.stanxml)
    each ``_a_<name>`` class attribute declares an XML attribute together
    with its default, ``_childSequence`` lists the child element names in
    serialisation order, and ``_mayBeEmpty`` keeps an element in the
    output even when it has no content.
    """

    class _MIVOTElement(Element):
        # common base: all MIVOT elements are written with the mivot prefix
        # and may carry an ID
        _prefix = "mivot"
        _local = False
        _a_ID = None

    class VODML(_MIVOTElement):
        # root of an annotation block
        _childSequence = ["REPORT", "MODEL", "GLOBALS", "TEMPLATES"]

    class REPORT(_MIVOTElement):
        _mayBeEmpty = True
        _a_status = None

    class MODEL(_MIVOTElement):
        _mayBeEmpty = True
        _a_name = None
        _a_url = None

    class GLOBALS(_MIVOTElement):
        _childSequence = ["INSTANCE", "COLLECTION"]

    class TEMPLATES(_MIVOTElement):
        _a_tableref = None
        _childSequence = ["INSTANCE"]

    class COLLECTION(_MIVOTElement):
        _a_dmid = None
        _a_dmrole = None
        _a_dmtype = None
        _childSequence = ["ATTRIBUTE", "REFERENCE", "INSTANCE", "COLLECTION"]

    class INSTANCE(_MIVOTElement):
        _a_dmtype = None
        _childSequence = ["REFERENCE", "ATTRIBUTE"]

    class ATTRIBUTE(_MIVOTElement):
        _mayBeEmpty = True
        _a_dmrole = None
        _a_ref = None
        _a_dmtype = None
        _a_value = None
        _a_type = None
        _a_arrayindex = None
        _childSequence = ["COLLECTION", "INSTANCE", "REFERENCE"]

    class REFERENCE(_MIVOTElement):
        _mayBeEmpty = True
        _a_dmrole = None
        _a_sourceref = None
        _a_dmref = None
class VOTable(_MIVOTElements):
    """The container for VOTable elements.

    This class is a namespace only; the nested classes are the xmlstan
    element classes (VOTable.TABLE, VOTable.FIELD, ...).  The MIVOT
    annotation elements are inherited from _MIVOTElements.

    Presumably (stanxml convention -- confirm against gavo.utils.stanxml)
    ``_a_<name>`` class attributes declare XML attributes and their
    defaults, ``_name_a_<name>`` overrides the serialised attribute name,
    ``_childSequence`` gives the child element names in serialisation
    order, and ``_mayBeEmpty`` keeps otherwise empty elements in the output.
    """

    class _VOTElement(Element):
        # base class of everything in the VOTable namespace
        _prefix = "vot"
        _local = True

    class _DescribedElement(_VOTElement):
        """An element that can have a name, an ID, and a DESCRIPTION child.
        """
        _a_ID = None
        _a_ref = None
        _a_name = None
        _a_ucd = None
        _a_utype = None
        _mayBeEmpty = True

        def getDesignation(self):
            """returns some name-like thing for a FIELD or PARAM.

            This is the name if there is one, else the ID, else a string
            made up from the class name and the object's id().
            """
            if self.name:
                res = self.name
            elif self.ID:
                res = self.ID
            else:
                res = "%s_%s"%(self.__class__.__name__, "%x"%id(self))
            return res

        def getDescription(self):
            """returns the description for this element, or an empty string.

            Only the first DESCRIPTION child is considered.
            """
            try:
                return next(self.iterChildrenOfType(VOTable.DESCRIPTION)).text_
            except StopIteration:
                return ""

    class _ValuedElement(_DescribedElement):
        _a_unit = None
        _a_xtype = None

    class _TypedElement(_ValuedElement):
        """The common base of FIELD and PARAM.
        """
        _a_ref = None
        _a_arraysize = None
        _a_datatype = None
        _a_precision = None
        _a_type = None
        _a_width = None
        _a_format = None

        def isScalar(self):
            """returns true if arraysize is missing or the literal '1'.
            """
            return self.arraysize is None or self.arraysize=='1'

        def isMultiDim(self):
            """returns what common.isMultiDim says about our arraysize.
            """
            return common.isMultiDim(self.arraysize)

        def hasVarLength(self):
            """returns what common.hasVarLength says about our arraysize.
            """
            return common.hasVarLength(self.arraysize)

        def getLength(self):
            """returns the number of items one should expect in value, or
            None for variable-length arrays.
            """
            return common.getLength(self.arraysize)

        def getShape(self):
            """returns common.getShape for our datatype and arraysize.
            """
            return common.getShape(self.datatype, self.arraysize)

        def _setNULLValue(self, val):
            """sets the null literal of self to val.

            An existing VALUES child is updated (only the first one is
            touched); if there is none, a new VALUES child is added.
            """
            valEls = list(self.iterChildrenWithName("VALUES"))
            if valEls:
                # calling an element with keyword arguments presumably
                # sets attributes on it (stanxml idiom)
                valEls[0](null=val)
            else:
                # indexing with an element presumably adds it as a child
                # (stanxml idiom)
                self[VOTable.VALUES(null=val)]

    class _RefElement(_ValuedElement):
        """The common base of FIELDref and PARAMref.
        """
        _a_ref = None
        _a_ucd = None
        _a_utype = None
        # no children are declared for reference elements; this uses the
        # same _childSequence attribute as all other elements here.
        _childSequence = []

    class _ContentElement(_VOTElement):
        """An element containing tabular data.

        These are usually serialized using some kind of streaming.

        See votable.tablewriter for details.
        """
        def write(self, file):
            raise NotImplementedError("This _ContentElement cannot write yet")

    class _BinaryDataElement(_ContentElement):
        """a base class for both BINARY and BINARY2.
        """
        _childSequence = ["STREAM"]
        encoding = "base64"

        def write(self, file):
            """writes the base64-encoded serialised data to file.

            file must accept bytes.  The opening tags are written first;
            the closing tags (and whatever data is still buffered) are
            written even if iterSerialized raises, so the element is
            always closed.
            """
            # To be able to write incrementally, encode chunks of multiples
            # of base64's block size until the stream is finished.
            # Since 57 is a multiple of 3, no base64 padding ends up in the
            # middle of the stream.
            blockSize = 57
            buf, bufFil, flushThreshold = [], 0, blockSize*20
            file.write(
                utils.bytify(
                    '<%s>'%self.name_+
                    '<STREAM encoding="base64">'))
            try:
                for data in self.iterSerialized():
                    buf.append(data)
                    bufFil += len(data)
                    if bufFil>flushThreshold:
                        curData = b''.join(buf)
                        curBlockLen = (len(curData)//blockSize)*blockSize
                        file.write(base64.b64encode(curData[:curBlockLen]))
                        remainder = curData[curBlockLen:]
                        buf = [remainder]
                        # restart the fill count from what is left over;
                        # otherwise every later item would force a flush.
                        bufFil = len(remainder)
            finally:
                file.write(base64.b64encode(b"".join(buf)))
                file.write(utils.bytify("</STREAM></%s>"%self.name_))

    class BINARY(_BinaryDataElement):
        pass

    class BINARY2(_BinaryDataElement):
        pass

    class COOSYS(_VOTElement):
        _mayBeEmpty = True
        _a_ID = None
        _a_epoch = None
        _a_equinox = None
        _a_system = None
        _a_refposition = None

    class TIMESYS(_VOTElement):
        _mayBeEmpty = True
        _a_ID = None
        _a_timescale = "UNKNOWN"
        _a_refposition = "UNKNOWN"
        _a_timeorigin = None

    class TYPE(_VOTElement):
        pass

    class ROLE(_VOTElement):
        pass

    class DATA(_VOTElement):
        _childSequence = ["INFO", "TABLEDATA", "BINARY", "BINARY2", "FITS"]

    class DEFINITIONS(_VOTElement):
        pass

    class DESCRIPTION(_VOTElement):
        _childSequence = [None]

    class FIELD(_TypedElement):
        _childSequence = ["DESCRIPTION", "VALUES", "LINK"]

    class FIELDref(_RefElement):
        pass

    class FITS(_VOTElement):
        _childSequence = ["STREAM"]

    class GROUP(_DescribedElement):
        _mayBeEmpty = True
        _a_ref = None
        _childSequence = ["DESCRIPTION", "PARAM", "FIELDref",
            "PARAMref", "GROUP"]

    class INFO(_ValuedElement):
        _a_ref = None
        _a_value = None
        _childSequence = [None]

        def isEmpty(self):
            """returns true if no value attribute has been set.
            """
            return self.value is None

    class INFO_atend(INFO):
        # a bad hack; TAP mandates INFO items below table, and this is
        # the least complicated way to force this.
        name_ = "INFO"

    class LINK(_VOTElement):
        _a_ID = None
        _a_action = None
        _a_content_role = None
        _name_a_content_role = "content-role"
        _a_content_type = None
        _name_a_content_type = "content-type"
        _a_gref = None
        _a_href = None
        _a_name = None
        _a_value = None
        _a_title = None
        _childSequence = []
        _mayBeEmpty = True

    class MAX(_VOTElement):
        _a_inclusive = None
        _a_value = None
        _childSequence = []

        def isEmpty(self):
            """returns true if no value attribute has been set.
            """
            return self.value is None

    class MIN(_VOTElement):
        _a_inclusive = None
        _a_value = None
        _childSequence = []

        def isEmpty(self):
            """returns true if no value attribute has been set.
            """
            return self.value is None

    class OPTION(_VOTElement):
        _a_name = None
        _a_value = ""  # as with PARAM below
        _childSequence = ["OPTION"]
        _mayBeEmpty = True

    class PARAM(_TypedElement):
        _mayBeEmpty = True
        # The empty string is supposed to mean "ah, somewhat null".
        # Needs to be cared for in client code.
        _a_value = ""
        _childSequence = ["DESCRIPTION", "VALUES", "LINK"]

    class PARAMref(_RefElement):
        pass

    class RESOURCE(_VOTElement):
        _a_ID = None
        _a_name = None
        _a_type = None
        _a_utype = None
        # (stub for delayed overflow warnings and such)
        _childSequence = ["DESCRIPTION", "VODML", "DEFINITIONS",
            "INFO", "COOSYS", "TIMESYS", "GROUP",
            "PARAM", "LINK", "TABLE", "INFO_atend", "RESOURCE", "stub"]

        def writeErrorElement(self, outputFile, exception):
            """writes an INFO element reporting exception to outputFile.

            outputFile must accept bytes; the message is escaped for use
            as PCDATA.
            """
            outputFile.write(
                utils.bytify(
                    """<INFO name="QUERY_STATUS" value="ERROR">%s</INFO>"""%
                    escapePCDATA("Error while serializing VOTable,"
                        " content is probably incomplete: %s"%
                        utils.safe_str(exception))))

    class STREAM(_VOTElement):
        _a_actuate = None
        _a_encoding = None
        _a_expires = None
        _a_href = None
        _a_rights = None
        _a_type = None
        _childSequence = [None]

    class TABLE(_DescribedElement):
        """A TABLE element.

        If you want to access fields by name (getFieldForName), make sure
        name and ids are unique.
        """
        _a_nrows = None
        # (stub for delayed overflow warnings and such)
        _childSequence = ["DESCRIPTION", "INFO", "GROUP", "FIELD", "PARAM",
            "LINK", "DATA", "stub"]

        # lazily built by _getFieldIndex
        _fieldIndex = None

        # NOTE(review): lru_cache on a method keys on self, keeps the
        # cached instance alive, and will not notice FIELDs added after
        # the first call.
        @functools.lru_cache(1)
        def getFields(self):
            """returns a list of the FIELD children of this table.
            """
            return list(self.iterChildrenOfType(VOTable.FIELD))

        def _getFieldIndex(self):
            """returns a dict mapping both names and IDs to FIELD elements.

            The index is built on first use and then kept on the instance.
            """
            if self._fieldIndex is None:
                index = {}
                for child in self.getFields():
                    if child.name:
                        index[child.name] = child
                    if child.ID:
                        index[child.ID] = child
                self._fieldIndex = index
            return self._fieldIndex

        def getFieldForName(self, name):
            """returns the FIELD having a name or id of name.

            A KeyError is raised when the field does not exist; if names are
            not unique, the last column with the name specified is returned.
            """
            return self._getFieldIndex()[name]

    class TABLEDATA(_ContentElement):
        _childSequence = ["TR"]

        def write(self, file):
            """writes the serialised rows to file, utf-8 encoded.

            The closing tag is written even if iterSerialized raises.
            """
            file.write(b"<TABLEDATA>")
            try:
                for row in self.iterSerialized():
                    file.write(row.encode("utf-8"))
            finally:
                file.write(b"</TABLEDATA>")

    class TD(_VOTElement):
        _a_encoding = None
        _childSequence = [None]
        _mayBeEmpty = True

    class TR(_VOTElement):
        _a_ID = None
        _childSequence = ["TD"]

    class VALUES(_VOTElement):
        _a_ID = None
        _a_null = None
        _a_ref = None
        _a_type = None

        def isEmpty(self):
            """returns true if there is neither a null literal nor
            anything Element.isEmpty would count as content.
            """
            return self.null is None and Element.isEmpty(self)

    class VOTABLE(_VOTElement):
        _a_ID = None
        _a_version = "1.4"
        _prefix = "vot"
        _supressedPrefix = "vot"
        _mayBeEmpty = True
        # The following is for when the xmlstan tree is processed by
        # tablewriter.write rather than asETree
        _fixedTagMaterial = ('xmlns="%s" xmlns:xsi="%s"'
                ' xsi:schemaLocation="%s %s"')%((
                    getPrefixInfo("vot")[0],
                    getPrefixInfo("xsi")[0])
                +getPrefixInfo("vot"))
        _childSequence = ["DESCRIPTION", "VODML", "DEFINITIONS", "INFO",
            "COOSYS", "TIMESYS", "GROUP", "PARAM", "RESOURCE"]

    class VOTABLE11(VOTABLE):
        # An incredibly nasty hack that kinda works due to the fact that
        # all elements here are local -- make this your top-level element
        # and only use what's legal in VOTable 1.1, and you get a VOTable1.1
        # conforming document
        name_ = "VOTABLE"
        _a_version = "1.1"
        _prefix = "vot1"
        _supressedPrefix = "vot1"
        # The following is for when the xmlstan tree is processed by
        # tablewriter.write rather than asETree
        _fixedTagMaterial = ('xmlns="%s" xmlns:xsi="%s"'
                ' xsi:schemaLocation="%s %s"')%((
                    getPrefixInfo("vot1")[0],
                    getPrefixInfo("xsi")[0])
                +getPrefixInfo("vot1"))

    class VOTABLE12(VOTABLE):
        # see VOTABLE11
        name_ = "VOTABLE"
        _a_version = "1.2"
        _prefix = "vot2"
        _supressedPrefix = "vot2"
        # The following is for when the xmlstan tree is processed by
        # tablewriter.write rather than asETree.  Both the default
        # namespace and the schema location must come from the vot2
        # prefix so the declared namespace matches the 1.2 schema.
        _fixedTagMaterial = ('xmlns="%s" xmlns:xsi="%s"'
                ' xsi:schemaLocation="%s %s"')%((
                    getPrefixInfo("vot2")[0],
                    getPrefixInfo("xsi")[0])
                +getPrefixInfo("vot2"))
def voTag(tagName, version="1.4"):
    """returns the VOTable QName for tagName.

    version selects the namespace through NAMESPACES; a version not listed
    there raises a KeyError.

    You only need this if you want to search in ElementTrees.
    """
    namespaceURI = NAMESPACES[version]
    return ElementTree.QName(namespaceURI, tagName)