Source code for gavo.votable.model

"""
xmlstan elements of VOTable.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import base64
import functools

from gavo import utils
from gavo.utils import ElementTree
from gavo.utils.stanxml import (
	Element, registerPrefix, getPrefixInfo, schemaURL, escapePCDATA)
from gavo.votable import common


NAMESPACES = {
	"1.4": "http://www.ivoa.net/xml/VOTable/v1.3",
	"1.3": "http://www.ivoa.net/xml/VOTable/v1.3",
	"1.2": "http://www.ivoa.net/xml/VOTable/v1.2",
	"1.1": "http://www.ivoa.net/xml/VOTable/v1.1",
}

# WARNING: if you change the default version of the VOTables generated,
# you'll probably have to adapt them in resources/xsl, too.
registerPrefix("vot", NAMESPACES["1.3"], schemaURL("VOTable-1.4.xsd"))
registerPrefix("vot2", NAMESPACES["1.2"], schemaURL("VOTable-1.2.xsd"))
registerPrefix("vot1", NAMESPACES["1.1"], schemaURL("VOTable-1.1.xsd"))

registerPrefix("mivot",
	"http://www.ivoa.net/xml/mivot", schemaURL("mivot.xsd"))


class _MIVOTElements(object):
	"""elements from MIVOT, i.e., data model annotation
	"""
	class _MIVOTElement(Element):
		_prefix = "mivot"
		_local = False
		_a_ID = None

	class VODML(_MIVOTElement):
		_childSequence = ["REPORT", "MODEL", "GLOBALS", "TEMPLATES"]

	class REPORT(_MIVOTElement):
		_mayBeEmpty = True
		_a_status = None

	class MODEL(_MIVOTElement):
		_mayBeEmpty = True
		_a_name = None
		_a_url = None

	class GLOBALS(_MIVOTElement):
		_childSequence = ["INSTANCE", "COLLECTION"]

	class TEMPLATES(_MIVOTElement):
		_a_tableref = None
		_childSequence = ["INSTANCE"]

	class COLLECTION(_MIVOTElement):
		_a_dmid = None
		_a_dmrole = None
		_a_dmtype = None
		_childSequence = ["ATTRIBUTE", "REFERENCE", "INSTANCE", "COLLECTION"]

	class INSTANCE(_MIVOTElement):
		_a_dmtype = None
		_childSequence = ["REFERENCE", "ATTRIBUTE"]

	class ATTRIBUTE(_MIVOTElement):
		_mayBeEmpty = True
		_a_dmrole = None
		_a_ref = None
		_a_dmtype = None
		_a_value = None
		_a_type = None
		_a_arrayindex = None
		_childSequence = [
			"COLLECTION", "INSTANCE", "REFERENCE"]

	class REFERENCE(_MIVOTElement):
		_mayBeEmpty = True
		_a_dmrole = None
		_a_sourceref = None
		_a_dmref = None


[docs]class VOTable(_MIVOTElements): """The container for VOTable elements. """ class _VOTElement(Element): _prefix = "vot" _local = True class _DescribedElement(_VOTElement): _a_ID = None _a_ref = None _a_name = None _a_ucd = None _a_utype = None _mayBeEmpty = True def getDesignation(self): """returns some name-like thing for a FIELD or PARAM. """ if self.name: res = self.name elif self.ID: res = self.ID else: res = "%s_%s"%(self.__class__.__name__, "%x"%id(self)) return res def getDescription(self): """returns the description for this element, or an empty string. """ try: return next(self.iterChildrenOfType(VOTable.DESCRIPTION)).text_ except StopIteration: return "" class _ValuedElement(_DescribedElement): _a_unit = None _a_xtype = None class _TypedElement(_ValuedElement): _a_ref = None _a_arraysize = None _a_datatype = None _a_precision = None _a_ref = None _a_type = None _a_width = None _a_format = None def isScalar(self): return self.arraysize is None or self.arraysize=='1' def isMultiDim(self): return common.isMultiDim(self.arraysize) def hasVarLength(self): return common.hasVarLength(self.arraysize) def getLength(self): """returns the number of items one should expect in value, or None for variable-length arrays. """ return common.getLength(self.arraysize) def getShape(self): return common.getShape(self.datatype, self.arraysize) def _setNULLValue(self, val): """sets the null literal of self to val. """ valEls = list(self.iterChildrenWithName("VALUES")) if valEls: valEls[0](null=val) else: self[VOTable.VALUES(null=val)] class _RefElement(_ValuedElement): _a_ref = None _a_ucd = None _a_utype = None childSequence = [] class _ContentElement(_VOTElement): """An element containing tabular data. These are usually serialized using some kind of streaming. See votable.tablewriter for details. """ def write(self, file): raise NotImplementedError("This _ContentElement cannot write yet") class _BinaryDataElement(_ContentElement): """a base class for both BINARY and BINARY2. """ _childSequence = ["STREAM"] encoding = "base64" def write(self, file): # To be able to write incrementally, encode chunks of multiples # of base64's block size until the stream is finished. blockSize = 57 buf, bufFil, flushThreshold = [], 0, blockSize*20 file.write( utils.bytify( '<%s>'%self.name_+ '<STREAM encoding="base64">')) try: for data in self.iterSerialized(): buf.append(data) bufFil += len(data) if bufFil>flushThreshold: curData = b''.join(buf) curBlockLen = (len(curData)//blockSize)*blockSize file.write(base64.b64encode(curData[:curBlockLen])) buf = [curData[curBlockLen:]] finally: file.write(base64.b64encode(b"".join(buf))) file.write(utils.bytify("</STREAM></%s>"%self.name_))
[docs] class BINARY(_BinaryDataElement): pass
[docs] class BINARY2(_BinaryDataElement): pass
[docs] class COOSYS(_VOTElement): _mayBeEmpty = True _a_ID = None _a_epoch = None _a_equinox = None _a_system = None _a_refposition = None
[docs] class TIMESYS(_VOTElement): _mayBeEmpty = True _a_ID = None _a_timescale = "UNKNOWN" _a_refposition = "UNKNOWN" _a_timeorigin = None
[docs] class TYPE(_VOTElement): pass
[docs] class ROLE(_VOTElement): pass
[docs] class DATA(_VOTElement): _childSequence = ["INFO", "TABLEDATA", "BINARY", "BINARY2", "FITS"]
[docs] class DEFINITIONS(_VOTElement): pass
[docs] class DESCRIPTION(_VOTElement): _childSequence = [None]
[docs] class FIELD(_TypedElement): _childSequence = ["DESCRIPTION", "VALUES", "LINK"]
[docs] class FIELDref(_RefElement): pass
[docs] class FITS(_VOTElement): _childSequence = ["STREAM"]
[docs] class GROUP(_DescribedElement): _mayBeEmpty = True _a_ref = None _childSequence = ["DESCRIPTION", "PARAM", "FIELDref", "PARAMref", "GROUP"]
[docs] class INFO(_ValuedElement): _a_ref = None _a_value = None _childSequence = [None]
[docs] def isEmpty(self): return self.value is None
[docs] class INFO_atend(INFO): # a bad hack; TAP mandates INFO items below table, and this is # the least complicated way to force this. name_ = "INFO"
[docs] class MAX(_VOTElement): _a_inclusive = None _a_value = None _childSequence = []
[docs] def isEmpty(self): return self.value is None
[docs] class MIN(_VOTElement): _a_inclusive = None _a_value = None _childSequence = []
[docs] def isEmpty(self): return self.value is None
[docs] class OPTION(_VOTElement): _a_name = None _a_value = "" # as with PARAM below _childSequence = ["OPTION"] _mayBeEmpty = True
[docs] class PARAM(_TypedElement): _mayBeEmpty = True _a_value = "" # supposed to mean "ah, somewhat null" # Needs to be cared for in client code. _childSequence = ["DESCRIPTION", "VALUES", "LINK"]
[docs] class PARAMref(_RefElement): pass
[docs] class RESOURCE(_VOTElement): _a_ID = None _a_name = None _a_type = None _a_utype = None _childSequence = ["DESCRIPTION", "VODML", "DEFINITIONS", "INFO", "COOSYS", "TIMESYS", "GROUP", "PARAM", "LINK", "TABLE", "INFO_atend", "RESOURCE", "stub"] # (stub for delayed overflow warnings and such)
[docs] def writeErrorElement(self, outputFile, exception): outputFile.write( utils.bytify( """<INFO name="QUERY_STATUS" value="ERROR">%s</INFO>"""% escapePCDATA("Error while serializing VOTable," " content is probably incomplete: %s"% utils.safe_str(exception))))
[docs] class STREAM(_VOTElement): _a_actuate = None _a_encoding = None _a_expires = None _a_href = None _a_rights = None _a_type = None _childSequence = [None]
[docs] class TABLE(_DescribedElement): """A TABLE element. If you want to access fields by name (getFieldForName), make sure name and ids are unique. """ _a_nrows = None _childSequence = ["DESCRIPTION", "INFO", "GROUP", "FIELD", "PARAM", "LINK", "DATA", "stub"] # (stub for delayed overflow warnings and such) _fieldIndex = None
[docs] @functools.lru_cache(1) def getFields(self): return list(self.iterChildrenOfType(VOTable.FIELD))
def _getFieldIndex(self): if self._fieldIndex is None: index = {} for child in self.getFields(): if child.name: index[child.name] = child if child.ID: index[child.ID] = child self._fieldIndex = index return self._fieldIndex
[docs] def getFieldForName(self, name): """returns the FIELD having a name or id of name. A KeyError is raised when the field does not exist; if names are not unique, the last column with the name specified is returned. """ return self._getFieldIndex()["name"]
[docs] class TABLEDATA(_ContentElement): _childSequence = ["TR"]
[docs] def write(self, file): file.write(b"<TABLEDATA>") try: for row in self.iterSerialized(): file.write(row.encode("utf-8")) finally: file.write(b"</TABLEDATA>")
[docs] class TD(_VOTElement): _a_encoding = None _childSequence = [None] _mayBeEmpty = True
[docs] class TR(_VOTElement): _a_ID = None _childSequence = ["TD"]
[docs] class VALUES(_VOTElement): _a_ID = None _a_null = None _a_ref = None _a_type = None
[docs] def isEmpty(self): return self.null is None and Element.isEmpty(self)
[docs] class VOTABLE(_VOTElement): _a_ID = None _a_version = "1.4" _prefix = "vot" _supressedPrefix = "vot" _mayBeEmpty = True # The following is for when the xmlstan tree is processed by # tablewriter.write rather than asETree _fixedTagMaterial = ('xmlns="%s" xmlns:xsi="%s"' ' xsi:schemaLocation="%s %s"')%(( getPrefixInfo("vot")[0], getPrefixInfo("xsi")[0]) +getPrefixInfo("vot")) _childSequence = ["DESCRIPTION", "VODML", "DEFINITIONS", "INFO", "COOSYS", "TIMESYS", "GROUP", "PARAM", "RESOURCE"]
[docs] class VOTABLE11(VOTABLE): # An incredibly nasty hack that kinda works due to the fact that # all elements here are local -- make this your top-level element # and only use what's legal in VOTable 1.1, and you get a VOTable1.1 # conforming document name_ = "VOTABLE" _a_version = "1.1" _prefix = "vot1" _supressedPrefix = "vot1" # The following is for when the xmlstan tree is processed by # tablewriter.write rather than asETree _fixedTagMaterial = ('xmlns="%s" xmlns:xsi="%s"' ' xsi:schemaLocation="%s %s"')%(( getPrefixInfo("vot1")[0], getPrefixInfo("xsi")[0]) +getPrefixInfo("vot1"))
[docs] class VOTABLE12(VOTABLE): # see VOTABLE11 name_ = "VOTABLE" _a_version = "1.2" _prefix = "vot2" _supressedPrefix = "vot2" # The following is for when the xmlstan tree is processed by # tablewriter.write rather than asETree _fixedTagMaterial = ('xmlns="%s" xmlns:xsi="%s"' ' xsi:schemaLocation="%s %s"')%(( getPrefixInfo("vot1")[0], getPrefixInfo("xsi")[0]) +getPrefixInfo("vot2"))
[docs]def voTag(tagName, version="1.4"): """returns the VOTable QName for tagName. You only need this if you want to search in ElementTrees. """ return ElementTree.QName(NAMESPACES[version], tagName)