"""
Generating VOTables from internal data representations.
This is glue code to the more generic GAVO votable library. In particular,
it governs the application of base.SerManagers and their column descriptions
(which are what is passed around as colDescs in this module to come up with
VOTable FIELDs and the corresponding values.
You should access this module through formats.votable.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import contextlib
import datetime
import functools
import io
import itertools
from gavo import base
from gavo import dm
from gavo import rsc
from gavo import stc
from gavo import utils
from gavo import votable
from gavo.base import meta
from gavo.base import valuemappers
from gavo.formats import common
from gavo.votable import V
from gavo.votable import modelgroups
[docs]class Error(base.Error):
pass
tableEncoders = {
"td": V.TABLEDATA,
"binary": V.BINARY,
"binary2": V.BINARY2,
}
[docs]class VOTableContext(utils.IdManagerMixin):
"""A context object for writing VOTables.
The constructor arguments work as keyword arguments to ``getAsVOTable``.
Some other high-level functions accept finished contexts.
This class provides management for unique ID attributes, the value mapper
registry, and possibly additional services for writing VOTables.
VOTableContexts optionally take
- a value mapper registry (by default, valuemappers.defaultMFRegistry)
- the tablecoding (currently, td, binary, or binary2)
- version=(1,1) to order a 1.1-version VOTable, (1,2) for 1.2.
(default is now 1.4).
- acquireSamples=False to suppress reading some rows to get
samples for each column
- suppressNamespace=False to leave out a namespace declaration
(mostly convenient for debugging)
- overflowElement (see votable.tablewriter.OverflowElement)
There's also an attribute produceVODML that will automatically be
set for VOTable 1.5; you can set it to true manually, but the
resulting VOTables will probably be invalid.
If VO-DML processing is enabled, the context also manages models declared;
that's the modelsUsed dictionary, mapping prefix -> dm.Model instances
"""
def __init__(self, mfRegistry=valuemappers.defaultMFRegistry,
tablecoding='binary', version=None, acquireSamples=True,
suppressNamespace=False, overflowElement=None):
self.mfRegistry = mfRegistry
self.tablecoding = tablecoding
self.version = version or (1,4)
self.acquireSamples = acquireSamples
self.suppressNamespace = suppressNamespace
self.overflowElement = overflowElement
self._containerStack = []
self._tableStack = []
self._dmAttrStack = []
self._pushedRefs = {}
# state for VO-DML serialisation
self.produceVODML = self.version[0]>1 or self.version[1]>4
# group-serialising annotations must enter their python ids when they
# end up in the tree so they get only included once when
# referenced by multiple other annotations.
self.groupIdsInTree = set()
self.modelsDeclared = set()
self.rootVODML = V.VODML()
self.vodmlTemplates = V.TEMPLATES()
self.rootVODML[self.vodmlTemplates]
# While we're producing both note-style STC1 utypes and ad-hoc
# COOSYS refs from <stc>, make them share COOSYS elements based
# on the id of the underlying STC ast. Remove this when we've
# dumped STC1 utypes.
self.coosysByAST = {}
[docs] def addVODMLPrefix(self, prefix):
"""arranges the DM with prefix to be included in modelsUsed.
"""
if prefix not in self.modelsDeclared:
self.rootVODML[dm.getModelForPrefix(prefix).getVOT(self, None)]
self.modelsDeclared.add(prefix)
[docs] def addVODMLMaterial(self, stuff):
"""adds VODML annotation to this VOTable.
Note that it will only be rendered if produceVODML is true
(in general, for target versions >1.4).
"""
self.vodmlTemplates[stuff]
[docs] def makeTable(self, table):
"""returns xmlstan for a table.
This is exposed as a method of context as the dm subpackage
needs it, but I don't want to import formats there (yet).
This may go away as I fix the interdependence of dm, votable, and
format.
"""
return makeTable(self, table)
[docs] def getEnclosingTable(self):
"""returns the xmlstan element of the table currently built.
This returns a ValueError if the context isn't aware of a table
being built.
(This depends builders using activeContainer)
"""
for el in reversed(self._containerStack):
if el.name_=="TABLE":
return el
raise ValueError("Not currently building a table.")
[docs] def getEnclosingResource(self):
"""returns the xmlstan element of the resource currently built.
This returns a ValueError if the context isn't aware of a resource
being built.
(This depends builders using activeContainer)
"""
for el in reversed(self._containerStack):
if el.name_=="RESOURCE":
return el
raise ValueError("Not currently building a table.")
[docs] def getEnclosingContainer(self):
"""returns the innermost container element the builders have declared.
"""
return self._containerStack[-1]
@property
def currentTable(self):
"""the DaCHS table object from which things are currently built.
If no builder has declared a table being built (using buildingFromTable),
it's a value error.
"""
if not self._tableStack:
raise ValueError("No table being processed.")
return self._tableStack[-1]
[docs] @contextlib.contextmanager
def activeContainer(self, container):
"""a context manager to be called by VOTable builders when
they open a new TABLE or RESOURCE.
"""
self._containerStack.append(container)
try:
yield
finally:
self._containerStack.pop()
[docs] @contextlib.contextmanager
def buildingFromTable(self, table):
"""a context manager to control code that works on a DaCHS table.
"""
self._tableStack.append(table)
try:
yield
finally:
self._tableStack.pop()
[docs] @contextlib.contextmanager
def settingAttribute(self, dmAttr):
"""a context manager controlling the curDMAttr attribute.
This is used by the DM annotators that sometimes need to directly
manipulate the ATTRIBUTE element.
"""
self._dmAttrStack.append(dmAttr)
try:
yield dmAttr
finally:
self._dmAttrStack.pop()
@property
def curDMAttr(self):
"""returns the V.ATTRIBUTE element currently worked on.
This will raise an IndexError if not ATTRIBUTE is being built at the
moment.
"""
return self._dmAttrStack[-1]
[docs] def pushRefFor(self, rdEl, refVal):
"""orders refVal to be set as ref on rdEl's VOTable representation
if such a thing is being serialised.
This currently is more a hack for PARAMs with COOSYS than something
that should be really used; if this were to become a general pattern,
we should work out a way to assign the ref if rdEl's representation
already is in the tree...
"""
self._pushedRefs[id(rdEl)] = refVal
[docs] def addID(self, rdEl, votEl):
"""adds an ID attribute to votEl if rdEl has an id managed by self.
Also, if a ref has been noted for rdEl, a ref attribute is being
added, too. This is a special hack for params and coosys; and I suspect
we shouldn't go beyond that.
"""
try:
votEl.ID = self.getIdFor(rdEl)
except base.NotFoundError:
# the param is not referenced and thus needs no ID
pass
if id(rdEl) in self._pushedRefs:
votEl.ref = self._pushedRefs[id(rdEl)]
return votEl
################# Turning simple metadata into VOTable elements.
def _iterInfoInfos(dataSet):
"""returns a sequence of V.INFO items from the info meta of dataSet.
"""
for infoItem in dataSet.getMeta("info", default=[]):
name, value, id = infoItem.infoName, infoItem.infoValue, infoItem.infoId
yield V.INFO(name=name, value=value, ID=id)[infoItem.getContent()]
def _iterWarningInfos(dataSet):
"""yields INFO items containing warnings from the tables in dataSet.
"""
for table in list(dataSet.tables.values()):
for warning in table.getMeta("_warning", propagate=False, default=[]):
yield V.INFO(name="warning", value="In table %s: %s"%(
table.tableDef.id, warning.getContent("text", macroPackage=table)))
def _iterDatalinkResources(ctx, dataSet):
"""yields RESOURCE elements for datalink services defined for tables
we have.
This needs to be called before the tables are serialised because we
put ids on fields.
"""
for table in list(dataSet.tables.values()):
for svcMeta in table.iterMeta("_associatedDatalinkService"):
try:
service = base.resolveId(table.tableDef.rd,
base.getMetaText(svcMeta, "serviceId"))
# datalink cannot be globally imported, as we can't depend on
# anything in protocols. Importing it here is ok, though;
# we'll just not produce datalink declarations if the server
# can't do it.
from gavo.protocols import datalink
yield datalink.makeDatalinkServiceDescriptor(
ctx, service, table.tableDef,
base.getMetaText(svcMeta, "idColumn"))
# Temporary hack: for now, if something wants to have immediate
# soda resources on results (so far, only SSAP does), they add
# generating functions to data.sodaGenerators. This needs
# to be replaced with something better when we understand what
# we actually want.
while getattr(dataSet, "sodaGenerators", []):
yield dataSet.sodaGenerators.pop()(ctx)
except Exception as ex:
base.ui.notifyWarning("RD %s: request for datalink service"
" could not be satisfied (%s)"%(
getattr(table.tableDef.rd, "sourceId", "<internal>"),
ex))
def _iterUniqueInfosFromMeta(
ctx, metaCarriers, metaKey, infoName, ucd, infoText):
"""yields standard INFOs from meta items on a sequence of metaCarriers.
metaKey references the meta items to make INFOs from. InfoName and
ucd are set on the INFO elements, infoText will be formatted
into the body text, where this functions locals are template arguments.
That includes value as the INFO's value.
For when the UCD depends on the INFO's value, ucd can also be
a callable, which receives the value and must return the UCD.
The function makes sure that no item is returned more than once.
"""
seenItems = set()
if ctx.version<(1,2):
# No UCD on INFO for ancient VOTables
ucd = None
for mc in metaCarriers:
for metaValue in mc.iterMeta(metaKey, propagate=True):
value = metaValue.getContent("text", macroPackage=mc)
if value in seenItems:
continue
seenItems.add(value)
if callable(ucd):
thisUCD = ucd(value)
else:
thisUCD = ucd
yield V.INFO(name=infoName, value=value, ucd=thisUCD)[
infoText.format(**locals())]
def _iterResourceMeta(ctx, dataSet):
"""adds Data Origin metadata to the RESOURCE parent.
At this point, we don't know the creating service any more, and depending
on the sort of thing we are doing here, that's where we should take
the metadata from. We therefore inspect the contributingMetaCarriers
attribute on the data item coming in and integrate the metadata
from everything that's in there (usually a service, but in the TAP
case perhaps also joined tables).
If this kind of things becomes more common, we should think of a
less hacky way to smuggle extra meta sources in here.
"""
yield V.DESCRIPTION[base.getMetaText(dataSet, "description",
macroPackage=dataSet.dd.rd, propagate=False)]
for el in itertools.chain(
_iterInfoInfos(dataSet), _iterWarningInfos(dataSet)):
yield el
yield V.INFO(name="server_software", value=base.SERVER_SOFTWARE)[
"Software that produced this VOTable"]
yield V.INFO(name="server", value=base.getConfig("web", "serverURL"))[
"Base URI of the server"]
sources = dataSet.contributingMetaCarriers
yield from _iterUniqueInfosFromMeta(ctx, sources,
"howtociteLink",
"citation",
"",
"Advice on citing this resource")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"source",
"publication_id",
lambda value: "meta.bib.bibcode"
if utils.couldBeABibcode(value)
else "meta.bib",
"A bibliographic source citable for (parts of) this data")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"published_identifier",
"ivoid",
"meta.ref.ivoid",
"Originating VO resource")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"publisher",
"publisher",
None,
"Data centre that has delivered the data")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"rights",
"rights",
None,
"Legal conditions applicable to (parts of) the data contained")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"rights.rightsURI",
"rights_uri",
None,
"URI of legal conditions applicable to (parts of) the data contained")
yield V.INFO(name="request_date", ucd="time.creation",
value=utils.formatISODT(datetime.datetime.utcnow()))
yield from _iterUniqueInfosFromMeta(ctx, sources,
"contact.email",
"contact",
"meta.email",
"Contact option")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"referenceURL",
"reference_url",
"meta.ref.url",
"More information on the data Source")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"version",
"resource_version",
"meta.version",
"Version of a contributing resource.")
yield from _iterUniqueInfosFromMeta(ctx, sources,
"creator.name",
"creator",
"meta.bib.author",
"Name of a person or entity that produced a contributing resource")
def _iterToplevelMeta(ctx, dataSet):
"""yields meta elements for the entire VOTABLE from dataSet's RD.
"""
rd = dataSet.dd.rd
if rd is None:
return
yield V.DESCRIPTION[base.getMetaText(rd, "description",
macroPackage=dataSet.dd.rd)]
for infoItem in rd.iterMeta("copyright"):
yield V.INFO(name="legal", value=infoItem.getContent("text",
macroPackage=dataSet.dd.rd))
# link elements may be defined using the votlink meta on RESOURCE, TABLE,
# GROUP, FIELD, or PARAM; within in the DC, GROUPs have no meta structure,
# so we don't run _linkBuilder on them.
def _makeLinkForMeta(args, localattrs=None):
localattrs.update({"href": args[0]})
return V.LINK(**localattrs)
_linkBuilder = meta.ModelBasedBuilder([
('votlink', _makeLinkForMeta, (), {
"href": "href",
"content_role": "role",
"content_type": "contentType",
"name": "linkname",})])
################# Generating FIELD and PARAM elements.
def _makeValuesForColDesc(colDesc):
"""returns a VALUES element for a column description.
This just stringifies whatever is in colDesc's respective columns,
so for anything fancy pass in byte strings to begin with.
"""
valEl = V.VALUES()
if colDesc.get("min") is None:
colDesc["min"] = getattr(colDesc.original.values, "min", None)
if colDesc.get("max") is None:
colDesc["max"] = getattr(colDesc.original.values, "max", None)
if colDesc["max"] is utils.Infimum:
colDesc["max"] = None
if colDesc["min"] is utils.Supremum:
colDesc["min"] = None
if colDesc["min"] is not None:
valEl[V.MIN(value=str(colDesc["min"]))]
if colDesc["max"] is not None:
valEl[V.MAX(value=str(colDesc["max"]))]
if colDesc["nullvalue"] is not None:
valEl(null=colDesc["nullvalue"])
for option in getattr(colDesc.original.values, "options", []):
valEl[V.OPTION(value=option.content_ or "", name=option.title)]
return valEl
# keys copied from colDescs to FIELDs in _getFieldFor
_voFieldCopyKeys = ["name", "datatype", "ucd", "utype", "ref"]
[docs]def defineField(ctx, element, colDesc):
"""adds attributes and children to element from colDesc.
element can be a V.FIELD or a V.PARAM *instance* and is changed in place.
This function returns None to remind people we're changing in place
here.
"""
# bomb if you got an Element rather than an instance -- with an
# Element, things would appear to work, but changes are lost when
# this function ends.
assert not isinstance(element, type), ("Got FIELD/PARAM element"
" instead of instance in VOTable defineField")
if colDesc["arraysize"]!='1':
element(arraysize=colDesc["arraysize"])
# (for char, keep arraysize='1' to keep topcat happy)
if colDesc["datatype"]=='char' and colDesc["arraysize"]=='1':
element(arraysize='1')
if colDesc["unit"]:
element(unit=colDesc["unit"])
element(ID=colDesc["id"])
# don't include xtype if writing 1.1
xtype = colDesc.get("xtype")
if ctx.version>(1,1):
element(xtype=xtype)
if isinstance(element, V.PARAM):
if hasattr(colDesc.original, "getStringValue"):
try:
element(value=str(colDesc.original.getStringValue()))
except:
# there's too much that can legitimately go wrong here to bother:
pass
if colDesc.original:
rscCol = colDesc.original
if rscCol.hasProperty("targetType"):
element[V.LINK(
content_type=rscCol.getProperty("targetType"),
title=rscCol.getProperty("targetTitle", "Link"))]
element(**dict((key, colDesc.get(key) or None)
for key in _voFieldCopyKeys))[
V.DESCRIPTION[colDesc["description"]],
_makeValuesForColDesc(colDesc),
_linkBuilder.build(colDesc.original)
]
[docs]def makeFieldFromColumn(ctx, colType, rscCol):
"""returns a VOTable colType for a rscdef column-type thing.
This function lets you make PARAM and FIELD elements (colType) from
column or param instances.
"""
instance = colType()
defineField(ctx, instance, valuemappers.AnnotatedColumn(rscCol))
return instance
def _iterFields(ctx, serManager):
"""iterates over V.FIELDs based on serManger's columns.
"""
for colDesc in serManager:
el = V.FIELD()
defineField(ctx, el, colDesc)
yield el
def _makeVOTParam(ctx, param):
"""returns VOTable stan for param.
"""
# note that we're usually accessing the content, i.e., the string
# serialization we got. The only exception is when we're seeing
# nulls or null-equivalents.
if param.content_ is base.NotGiven or param.value is None:
content = None
else:
content = param.content_
el = V.PARAM()
defineField(ctx, el, valuemappers.AnnotatedColumn(param))
# id management is a particular pain for params at this moment,
# because DM annotation uses the tableDef's params, whereas
# we see the copied params here. We should stop copying
# the params in rsc.common.ParamMixin.
try:
el.ID = ctx.getIdFor(param.originalParam)
except (base.NotFoundError, AttributeError):
try:
el.ID = ctx.getIdFor(param)
except base.NotFoundError:
pass
if content is None:
el.value = ""
else:
el.value = content
return el
def _iterTableParams(ctx, serManager):
"""iterates over V.PARAMs based on the table's param elements.
"""
for param in serManager.table.iterParams():
votEl = _makeVOTParam(ctx, param)
if votEl is not None:
ctx.addID(param, votEl)
yield votEl
####################### Tables and Resources
# an ad-hoc mapping of what STC1 has in frame to what VOTable 1.1 has for
# COOSYS/@system
STC_FRAMES_TO_COOSYS = {
'ICRS': 'ICRS',
'FK5': 'eq_FK5',
'FK4': 'eq_FK4',
'ECLIPTIC': 'ecl_FK5', # neglecting the odds there's actually ecl_FK4 data
'GALACTIC_II': 'galactic',
'SUPERGALACTIC': 'supergalactic',
None: None}
# these are utypes for which column refs should be ref-ed to the COOSYS.
COLUMN_REF_UTYPES = [
"stc:astrocoords.position2d.value2.c1",
"stc:astrocoords.position2d.value2.c2",
"stc:astrocoords.position2d.error2.c1",
"stc:astrocoords.position2d.error2.c2",
"stc:astrocoords.velocity2d.value2.c1",
"stc:astrocoords.velocity2d.value2.c2",
"stc:astrocoords.velocity2d.error2.c1",
"stc:astrocoords.velocity2d.error2.c2",
"stc:astrocoords.redshift.value",
"stc:astrocoords.position3d.value3.c1",
"stc:astrocoords.position3d.value3.c2",
"stc:astrocoords.position3d.value3.c3",
"stc:astrocoords.time.timeinstant",
"stc:astrocoords.velocity3d.value3.c1",
"stc:astrocoords.velocity3d.value3.c2",
"stc:astrocoords.velocity3d.value3.c3",
"stc:astrocoords.position2d.epoch",
"stc:astrocoords.position3d.epoch",
]
def _addEpochToCoosys(coosys, utypeMap):
"""tries to locate epoch information utypeMap and adds it to the
V.COOSYS instance coosys if found.
"""
for epochBase in [
"stc:astrocoords.position2d.", "stc:astrocoords.position3d."]:
if epochBase+"epoch" in utypeMap:
break
else:
return
epVal = utypeMap[epochBase+"epoch"]
if not isinstance(epVal, stc.ColRef):
# If the epoch is not a column reference, inline it.
# (the code below will ignore it in that case)
coosys(epoch="%s%s"%(
utypeMap.get(epochBase+"epoch.yeardef", "J"),
epVal))
def _makeCOOSYSFromSTC1(utypeMap, serManager):
"""returns a VOTable 1.1 COOSYS element inferred from a map of stc utypes
to STC1 values.
We let through coordinate frames not defined in VOTable 1.1 at the
expense of making the VOTables XSD-invalid with such frames; this
seems preferable to not declaring anything.
As a side effect, this will change column/@ref attributes (possibly also
param/@ref). If a column is part of two STC structures, the first
one will win. Yeah, that spec sucks.
"""
coosys = V.COOSYS()
sysId = serManager.makeIdFor(coosys, "system")
coosys(ID=sysId)
_addEpochToCoosys(coosys, utypeMap)
stcFrame = utypeMap.get(
"stc:astrocoordsystem.spaceframe.coordrefframe", None)
coosys(system=STC_FRAMES_TO_COOSYS.get(stcFrame, stcFrame))
if "references-from-coosys" in base.getConfig("future"):
coosys(refposition=utypeMap.get(
"stc:astrocoordsystem.spaceframe.referenceposition", None))
for utype in COLUMN_REF_UTYPES:
if utype in utypeMap:
val = utypeMap[utype]
if isinstance(val, stc.ColRef):
col = serManager.getColumnByName(str(val))
if not col.get("ref"):
col["ref"] = sysId
return coosys
def _iterSTC(ctx, tableDef, serManager):
"""adds STC groups for the systems to votTable fetching data from
tableDef.
"""
def getColumnFor(colRef):
try:
return serManager.getColumnByName(colRef.dest)
except KeyError:
# in ADQL processing, names are lower-cased, and there's not
# terribly much we can do about it without breaking other things.
# Hence, let's try and see whether our target is there with
# case normalization:
return serManager.getColumnByName(colRef.dest.lower())
def getIdFor(colRef):
return getColumnFor(colRef)["id"]
for ast in tableDef.getSTCDefs():
container, utypeMap = modelgroups.marshal_STC(ast, getIdFor)
if ctx.version>(1,1):
# "Note-style" STC only supported in 1.2 and higher
yield container
# legacy COOSYS specification supported everywhere
coosysEl = _makeCOOSYSFromSTC1(utypeMap, serManager)
ctx.coosysByAST[str(id(ast))] = coosysEl
ctx.getEnclosingResource()[coosysEl]
def _addStupidRefByAnnotation(ctx, annotation, serManager, destId):
"""adds a @ref attribute to destId to a PARAM or FIELD that
annotation references.
This is for legacy TIMESYS or COOSYS, and the born-legacy PHOTCAL
group. Sigh.
"""
if isinstance(annotation, dm.ColumnAnnotation):
col = serManager.getColumnByName(annotation.value.name)
if not col.get("ref"):
col["ref"] = destId
elif isinstance(annotation, dm.ParamAnnotation):
ctx.pushRefFor(
serManager.table.getParamByName(annotation.value.name),
destId)
# mapping from our votable-stc role names to COOSYS utypes
_STC_UTYPE_MAPPING = {
"longitude": "votable:LonLatPoint-lon",
"latitude": "votable:LonLatPoint-lat",
"pm_longitude": "votable:ProperMotion-lon",
"pm_latitude": "votable:ProperMotion-lat",
"distance": "votable:LonLatPoint-dist",
"rv": "votable:ProperMotion-rv",
}
if "references-from-coosys" in base.getConfig("future"):
def _makeLegacyRefForAnn(ctx, annotation, utype):
"""returns a PARAMref or FIELDref for annotation with the utype
passed.
The target of annotation will receive an id in ctx. If the
"""
valId = ctx.getOrMakeIdFor(
annotation.value, suggestion=annotation.value.name)
if isinstance(annotation, dm.ColumnAnnotation):
return V.FIELDref(utype=utype, ref=valId)
elif isinstance(annotation, dm.ParamAnnotation):
return V.PARAMref(utype=utype, ref=valId)
else:
base.ui.notifyWarning(f"Unknown annotation {annotation}")
# fall through to returning None, which the serialiser will swallow
else:
def _makeLegacyRefForAnn(ctx, annotation, utype):
return None
def _guessTimeOrigin(frame, timeCol):
"""returns a plausible value for a TIMESYS' timeorigin attribute
from a frame definition and a time column.
This is a depressing mess; thanks, DM WG.
"""
if "time0" in frame:
# that's the only sane option
return frame.get("time0")
if not isinstance(timeCol, dm.ParamLikeAnnotation):
# it's a literal and we have no further metadata.
return
timeCol = timeCol.weakref()
# let's guess time origin based on units and such.
if timeCol.unit=="d":
# assuming JD or MJD. Since people didn't give a time0, try to guess.
if "mjd" in timeCol.name:
return "MJD-origin"
# JD is a silly default, but what can I do? SIGH!
return "JD-origin"
if timeCol.unit=="s":
# if that's really not a unix timestamp, people need to
# give a time0.
return "2440587.5"
# we ought to assert unit="yr" here
return None
def _addLegacySYSFromVOTableSTC(ctx, tableDef, serManager):
"""adds COOSYS and TIMESYS elements from votable-stc annotation to
the enclosing RESOURCE.
"""
for ann in tableDef.iterAnnotationsOfType("votable:Coords"):
if "time" in ann:
timeFrame = ann["time"].get("frame", {})
timesysEl = V.TIMESYS(
timescale=timeFrame.get("timescale", "UNKNOWN"),
refposition=timeFrame.get("refPosition", "UNKNOWN"),
timeorigin=_guessTimeOrigin(timeFrame, ann["time"].get("location")))
timesysId = serManager.makeIdFor(timesysEl, "ts")
timesysEl(ID=timesysId)
ctx.getEnclosingResource()[
timesysEl]
if "location" in ann["time"]:
_addStupidRefByAnnotation(
ctx,
ann["time"]["location"],
serManager,
timesysId)
if "space" in ann:
spaceFrame = ann["space"]["frame"]
coosys = None
if "astid" in ann:
# the annotation was built from an <stc> declaration, and
# _iterSTC may already have created the coosys element, in
# which case we want to re-use it.
coosys = ctx.coosysByAST.get(str(ann["astid"]))
if coosys is None:
# no such coosys created yet
coosys = V.COOSYS()
ctx.getEnclosingResource()[coosys]
sysId = serManager.getOrMakeIdFor(coosys, "system")
# we probably should be more careful with literals vs. references
# for the orientation, too
coosys(ID=sysId,
system=STC_FRAMES_TO_COOSYS[spaceFrame.get("orientation")],
refposition=spaceFrame.get("refPosition"))
epVal = spaceFrame.get("epoch")
if isinstance(epVal, str):
coosys(epoch=epVal)
elif epVal:
coosys[_makeLegacyRefForAnn(
ctx, epVal, "votable:CustomRefLocation-epoch")]
for child in ann["space"].iterChildRoles():
_addStupidRefByAnnotation(ctx, child, serManager, sysId)
if (child.name in _STC_UTYPE_MAPPING and
isinstance(child, dm.ParamLikeAnnotation)):
coosys[_makeLegacyRefForAnn(
ctx, child, _STC_UTYPE_MAPPING[child.name])]
_PHOT_CAL_PARAMS = {
"filterIdentifier": dict(name="filterIdentifier", datatype="char",
ucd="meta.id;instr.filter", arraysize="*",
utype="photDM:PhotometryFilter.identifier"),
"zeroPointFlux": dict(name="zeroPointFlux",
ucd="phot.mag;arith.zp", utype="photDM:PhotCal.zeroPoint.flux.value",
unit="Jy", datatype="double"),
"magnitudeSystem": dict(name="magnitudeSystem", ucd="meta.code",
utype="photDM:PhotCal.magnitudeSystem.type", datatype="char",
arraysize="*"),
"effectiveWavelength": dict(name="effectiveWavelength",
ucd="em.wl.effective", unit="m", datatype="double",
utype="photDM:PhotometryFilter.spectralLocation.value"),
}
def _addLegacyPhotCal(ctx, tableDef, serManager):
"""adds PhotCal groups to the enclosing RESOURCE.
"""
for ann in tableDef.iterAnnotationsOfType("phot:PhotCal"):
photGroup = V.GROUP(name="photcal")
photGroup(ID=serManager.makeIdFor(photGroup, "phot_def"))
for child in ann.iterChildRoles():
if child.name=="value":
# DaCHS extension: reference the column/param this is for
photGroup[V.FIELDref(utype="adhoc:location",
ref=ctx.getOrMakeIdFor(child.value))]
_addStupidRefByAnnotation(ctx, child, serManager, photGroup.ID)
elif child.name in _PHOT_CAL_PARAMS:
photGroup[V.PARAM(**_PHOT_CAL_PARAMS[child.name])(
value=str(child.value))]
else:
# just ignore items we don't understand
pass
ctx.getEnclosingResource()[photGroup]
def _iterNotes(serManager):
"""yields GROUPs for table notes.
The idea is that the note is in the group's description, and the FIELDrefs
give the columns that the note applies to.
"""
# add notes as a group with FIELDrefs, but don't fail on them
for key, note in serManager.notes.items():
noteId = serManager.getOrMakeIdFor(note)
noteGroup = V.GROUP(name="note-%s"%key, ID=noteId)[
V.DESCRIPTION[note.getContent(targetFormat="text")]]
for col in serManager:
if col["note"] is note:
noteGroup[V.FIELDref(ref=col["id"])]
yield noteGroup
def _makeRef(baseType, ref, container, serManager):
"""returns a new node of baseType reflecting the group.TypedRef
instance ref.
container is the destination of the reference. For columns, that's
the table definition, but for parameters, this must be the table
itself rather than its definition because it's the table's
params that are embedded in the VOTable.
"""
return baseType(
ref=serManager.getOrMakeIdFor(ref.resolve(container)),
utype=ref.utype,
ucd=ref.ucd)
def _iterGroups(ctx, container, serManager):
"""yields GROUPs for the RD groups within container, taking params and
fields from serManager's table.
container can be a tableDef or a group.
"""
for group in container.groups:
votGroup = V.GROUP(ucd=group.ucd, utype=group.utype, name=group.name)
votGroup[V.DESCRIPTION[group.description]]
for ref in group.columnRefs:
votGroup[_makeRef(V.FIELDref, ref,
serManager.table.tableDef, serManager)]
for ref in group.paramRefs:
votGroup[_makeRef(V.PARAMref, ref,
serManager.table, serManager)]
for param in group.params:
votGroup[_makeVOTParam(ctx, param)]
for subgroup in _iterGroups(ctx, group, serManager):
votGroup[subgroup]
yield votGroup
[docs]def makeTable(ctx, table, isMeta=False):
"""returns a Table node for the table.Table instance table.
"""
sm = valuemappers.SerManager(table, mfRegistry=ctx.mfRegistry,
idManager=ctx, acquireSamples=ctx.acquireSamples)
# this must happen before FIELDs and such are serialised to ensure
# referenced things have IDs.
result = V.TABLE()
with ctx.activeContainer(result):
# start out with VO-DML annotation so everything that needs
# an id has one.
if ctx.produceVODML:
for ann in table.tableDef.annotations:
try:
ctx.addVODMLMaterial(ann.getVOT(ctx, table))
except Exception as msg:
# never fail just because stupid DM annotation doesn't work out
base.ui.notifyError("%s-typed DM annotation failed: %s"%(
ann.type, msg))
# iterate STC before serialising the columns so the columns
# have the stupid ref to COOSYS
result[_iterSTC(ctx, table.tableDef, sm)]
# same for votable-STC (except here the FIELDs need reliable IDs)
try:
_addLegacySYSFromVOTableSTC(ctx, table.tableDef, sm)
except Exception as ex:
base.ui.notifyError(f"Could not serialise VOTable COOSYS/TIMESYS: {ex}")
# photometry hack
_addLegacyPhotCal(ctx, table.tableDef, sm)
result(
name=base.getMetaText(table, "name", table.tableDef.id),
utype=base.getMetaText(table, "utype", macroPackage=table.tableDef,
propagate=False))[
# _iterGroups must run before _iterFields and _iterParams since it
# may need to add ids to the respective items. XSD-correct ordering of
# the elements is done by xmlstan.
V.DESCRIPTION[base.getMetaText(table, "description",
macroPackage=table.tableDef, propagate=False)],
_iterGroups(ctx, table.tableDef, sm),
_iterFields(ctx, sm),
_iterTableParams(ctx, sm),
_iterNotes(sm),
_linkBuilder.build(table.tableDef),
]
if isMeta:
# we take the "should not contain data" from the spec as "will not".
# That is, we will simply drop any data on resources declared as meta
return result
else:
return votable.DelayedTable(result,
sm.getMappedTuples(),
tableEncoders[ctx.tablecoding],
overflowElement=ctx.overflowElement)
def _makeResource(ctx, data):
"""returns a Resource node for the rsc.Data instance data.
"""
resType = base.getMetaText(data, "_type")
res = V.RESOURCE()
# For now, DaCHS will only have one RESOURCE that can contain VODML
# annotation. In case that ever changes, ctx will have to keep track
# of what "top-level" RESOURCE we're annotating (unless MIVOT gets
# sanitised).
if ctx.produceVODML:
res[V.RESOURCE(type="meta")[ctx.rootVODML]]
with ctx.activeContainer(res):
res(type=resType,
utype=base.getMetaText(data, "utype"))[
_iterResourceMeta(ctx, data), [
_makeVOTParam(ctx, param) for param in data.iterParams()],
_linkBuilder.build(data.dd),
]
for table in data:
with ctx.buildingFromTable(table):
res[makeTable(ctx, table, isMeta=resType=="meta")]
res[ctx.overflowElement]
return res
############################# Toplevel/User-exposed code
makeResource = _makeResource
[docs]def makeVOTable(data, ctx=None, **kwargs):
"""returns a votable.V.VOTABLE object representing data.
data can be an rsc.Data or an rsc.Table. data can be a data or a table
instance, tablecoding any key in votable.tableEncoders.
You may pass a VOTableContext object; if you don't a context
with all defaults will be used.
A deprecated alternative is to directly pass VOTableContext constructor
arguments as additional keyword arguments. Don't do this, though,
we'll probably remove the option to do so at some point.
You will usually pass the result to votable.write. The object returned
contains DelayedTables, i.e., most of the content will only be realized at
render time.
"""
ctx = ctx or VOTableContext(**kwargs)
data = rsc.wrapTable(data)
if ctx.version==(1,1):
vot = V.VOTABLE11()
elif ctx.version==(1,2):
vot = V.VOTABLE12()
elif ctx.version==(1,3):
raise votable.VOTableError("Cannot write VOTable 1.3 any more"
" (and you shouldn't have reason to).")
elif ctx.version==(1,4):
vot = V.VOTABLE()
elif ctx.version==(1,5):
vot = V.VOTABLE()
else:
raise votable.VOTableError("No toplevel element for VOTable version %s"%
repr(ctx.version))
dlResources = list(_iterDatalinkResources(ctx, data))
vot[_iterToplevelMeta(ctx, data)]
vot[_makeResource(ctx, data)]
vot[dlResources]
if ctx.produceVODML:
vot._fixedTagMaterial += ' xmlns:mivot="{}"'.format(
utils.getPrefixInfo("mivot")[0])
if ctx.suppressNamespace:
# use this for "simple" table with nice element names
vot._fixedTagMaterial = ""
# What follows is a hack around the insanity of stuffing
# unused namespaces and similar detritus into VOTable's roots.
rootAttrs = data.getMeta("_votableRootAttributes")
if rootAttrs:
rootHacks = [vot._fixedTagMaterial]+[
item.getContent() for item in rootAttrs]
vot._fixedTagMaterial = " ".join(s for s in rootHacks if s)
return vot
[docs]def writeAsVOTable(data, outputFile, ctx=None, **kwargs):
"""writes ``data`` to the ``outputFile``.
data can be a table or ``Data`` item.
``ctx`` can be a ``VOTableContext`` instance; alternatively,
``VOTableContext`` constructor arguments can be passed in as
``kwargs``.
"""
ctx = ctx or VOTableContext(**kwargs)
vot = makeVOTable(data, ctx)
votable.write(vot, outputFile)
[docs]def getAsVOTable(data, ctx=None, **kwargs):
"""returns a string containing a VOTable representation of data.
``kwargs`` can be constructor arguments for VOTableContext.
"""
ctx = ctx or VOTableContext(**kwargs)
dest = io.BytesIO()
writeAsVOTable(data, dest, ctx)
return dest.getvalue()
common.registerDataWriter("votable", format,
base.votableType, "Default VOTable", ".vot",
tapId="ivo://ivoa.net/std/TAPRegExt#output-votable-binary")
common.registerDataWriter("votableb2", functools.partial(
format, tablecoding="binary2"),
"application/x-votable+xml;serialization=BINARY2",
"Binary2 VOTable",
".votb2",
"votable/b2",
tapId="ivo://ivoa.net/std/TAPRegExt#output-votable-binary2")
common.registerDataWriter("votabletd", functools.partial(
format, tablecoding="td"),
"application/x-votable+xml;serialization=TABLEDATA", "Tabledata VOTable",
".vottd",
"text/xml",
"votable/td",
tapId="ivo://ivoa.net/std/TAPRegExt#output-votable-td")
common.registerDataWriter("votabletd1.1", functools.partial(
format, tablecoding="td", version=(1,1)),
"application/x-votable+xml;serialization=TABLEDATA;version=1.1",
"Tabledata VOTable version 1.1",
".vot1",
"text/xml")
common.registerDataWriter("votable1.1", functools.partial(
format, tablecoding="binary", version=(1,1)),
"application/x-votable+xml;version=1.1",
"Tabledata VOTable version 1.1",
".vot1",
"text/xml")
common.registerDataWriter("votabletd1.2", functools.partial(
format, tablecoding="td", version=(1,2)),
"application/x-votable+xml;serialization=TABLEDATA;version=1.2",
"Tabledata VOTable version 1.2",
".vot2",
"text/xml")
common.registerDataWriter("vodml", functools.partial(
format, tablecoding="td", version=(1,5)),
"application/x-votable+xml;serialization=TABLEDATA;version=1.5",
"VOTable version 1.5, tabledata",
".vot5")
common.registerDataWriter("vodmlb", functools.partial(
format, version=(1,5)),
"application/x-votable+xml;version=1.5",
"VOTable version 1.5",
".vot5")