"""
Parsing and translating VOTables to internal data structures.
This is glue code to the more generic votable library. In general, you
should access this module through formats.votable.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import gzip
from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import utils
from gavo import votable
from gavo.base import valuemappers
from gavo.grammars import votablegrammar
from gavo.votable import V
from gavo.votable import modelgroups
# MS is a short alias for base.makeStruct; the functions below use it to
# build rscdef structures (columns, params, table definitions, makes, DDs).
MS = base.makeStruct
class QuotedNameMaker(object):
    """A name maker for makeTableDefForVOTable implementing TAP's
    requirements.

    Every field name is handed back wrapped in utils.QuotedName.  Fields
    without a name and names this maker has already handed out are
    rejected with a base.ValidationError.
    """

    def __init__(self):
        # index counts makeName calls; seenNames holds the names handed out
        self.index = 0
        self.seenNames = set()

    def makeName(self, field):
        """returns a utils.QuotedName for field's name.

        A base.ValidationError is raised if field has no name (or its
        name is None) and if the name was already seen by this maker.
        """
        self.index += 1
        fieldName = getattr(field, "name", None)

        if fieldName is None:
            raise base.ValidationError("Field without name in upload.",
                "UPLOAD")

        if fieldName in self.seenNames:
            raise base.ValidationError("Duplicate column name illegal in"
                " uploaded tables (%s)"%fieldName, "UPLOAD")

        self.seenNames.add(fieldName)
        return utils.QuotedName(fieldName)
# Column names AutoQuotedNameMaker will not hand out unchanged; it appends
# underscores to them instead.
_PG_RESERVED_COLUMN_NAMES = {
    "oid", "tableoid", "xmin", "cmin", "xmax", "cmax", "ctid"}
class _ChangedName(str):
    """A str subclass used as a sentinel.

    Returning an instance of this (rather than a plain str) tells upstream
    code that the name was altered in a way that has to be reflected in a
    query.
    """
class AutoQuotedNameMaker(object):
    """A name maker for makeTableDefForVOTable quoting names as necessary.

    This is for PostgreSQL; it also steers clear of PG's reserved column
    names (oid and friends), so this is the name maker to use when
    VOTables are to be put into postgres tables.

    Duplicate names currently raise a base.ValidationError.  Reserved
    names are not rejected but renamed by appending underscores until the
    result is unused.
    """

    def __init__(self, forRowmaker=False):
        # forRowmaker is accepted but not evaluated in this class
        self.seenNames = set()

    def makeName(self, field):
        """returns a column name for field.

        The result is a utils.QuotedName if valuemappers.needsQuoting
        says the name needs quoting, a _ChangedName if the name had to be
        moved away from a reserved column name, and the unchanged name
        otherwise.

        A base.ValidationError is raised for fields without a name and
        for duplicate names.
        """
        rawName = getattr(field, "name", None)
        if rawName is None:
            raise base.ValidationError("Field without name in upload.",
                "UPLOAD")

        if valuemappers.needsQuoting(rawName):
            # names needing quotes are tracked verbatim
            if rawName in self.seenNames:
                raise base.ValidationError("Duplicate column name illegal in"
                    " uploaded tables (%s)"%rawName, "UPLOAD")
            self.seenNames.add(rawName)
            return utils.QuotedName(rawName)

        folded = rawName.lower()

        if folded in _PG_RESERVED_COLUMN_NAMES:
            # rename rather than fail: pad with underscores until free
            candidate = folded+"_"
            while candidate in self.seenNames:
                candidate = candidate+"_"
            self.seenNames.add(candidate)
            return _ChangedName(candidate)

        # other names are tracked lowercased but returned as given
        if folded in self.seenNames:
            raise base.ValidationError("Duplicate column name illegal in"
                " uploaded tables (%s)"%rawName, "UPLOAD")
        self.seenNames.add(folded)
        return rawName
def _getValuesFromField(votField):
    """returns an rscdef.Values instance for the VALUES children of
    votField, or None if they yield nothing.

    From each VALUES child, this picks up the null attribute (as
    nullLiteral), the value of MIN and MAX children, and OPTION children
    (as rscdef.Option structs).  Later items overwrite earlier ones.
    """
    def makeOption(optionEl):
        # We don't support nested options in rscdef.
        optArgs = {"content_": optionEl.value}
        if optionEl.name:
            optArgs["title"] = optionEl.name
        return base.makeStruct(rscdef.Option, **optArgs)

    collected = {}
    for valuesEl in votField.iterChildrenOfType(V.VALUES):
        if valuesEl.null is not None:
            collected["nullLiteral"] = valuesEl.null

        for lowerEl in valuesEl.iterChildrenOfType(V.MIN):
            collected["min"] = lowerEl.value

        for upperEl in valuesEl.iterChildrenOfType(V.MAX):
            collected["max"] = upperEl.value

        options = [makeOption(optionEl)
            for optionEl in valuesEl.iterChildrenOfType(V.OPTION)]
        if options:
            collected["options"] = options

    if not collected:
        return None
    return base.makeStruct(rscdef.Values, **collected)
def _getColArgs(votInstance, name):
    """returns constructor arguments for an RD column or param from
    a VOTable FIELD or PARAM.

    name is used for both the name and (capitalised) the tablehead.  The
    type comes from base.voTableToSQLType.  ucd, unit, xtype, value (as
    content_), values, and description are only included when votInstance
    provides them.
    """
    colArgs = {
        "name": name,
        "tablehead": name.capitalize(),
        "id": getattr(votInstance, "ID", None),
        "type": base.voTableToSQLType(
            votInstance.datatype, votInstance.arraysize, votInstance.xtype),
    }

    # plain attributes that are copied over if they are set
    for attName in ["ucd", "unit", "xtype"]:
        attValue = getattr(votInstance, attName, None)
        if attValue is not None:
            colArgs[attName] = attValue

    literal = getattr(votInstance, "value", None)
    if literal is not None:
        colArgs["content_"] = literal

    values = _getValuesFromField(votInstance)
    if values:
        colArgs["values"] = values

    # with several DESCRIPTION children, the last one wins
    for descEl in votInstance.iterChildrenOfType(V.DESCRIPTION):
        colArgs["description"] = descEl.text_

    return colArgs
def makeTableDefForVOTable(tableId, votTable, nameMaker=None, rd=None,
        **moreArgs):
    """returns a TableDef for a Table element parsed from a VOTable.

    Pass additional constructor arguments for the table in moreArgs; rd
    becomes the parent of the new table definition.

    nameMaker is an optional argument; if given, it must be an object
    having a makeName(field) -> string or utils.QuotedName method.
    It must return unique objects from VOTable fields and do that
    reproducibly, i.e., for a given field the same name is returned.

    The default is valuemappers.VOTNameMaker.  When building TDs for
    Postgres, use AutoQuotedNameMaker to generate valid column names.

    As an extra service, in particular for ADQL name resolving, columns
    for which the name maker returned a _ChangedName get an attribute
    originalName containing whatever was originally in the FIELD's @name.

    PARAMs that cannot be turned into rscdef.Params are reported through
    base.ui.notifyError and otherwise ignored.  STC information obtained
    from modelgroups.unmarshal_STC is attached to the columns it refers
    to by id (attributes stc and stcUtype); references to unknown columns
    are silently skipped.
    """
    if nameMaker is None:
        nameMaker = valuemappers.VOTNameMaker()

    # columns come from FIELDs
    columns = []
    for field in votTable.iterChildrenOfType(V.FIELD):
        colName = nameMaker.makeName(field)
        column = MS(rscdef.Column, **_getColArgs(field, colName))
        columns.append(column)
        # tell the ADQL machinery if we've significantly modified the
        # name (i.e., more than just quoting).
        if isinstance(colName, _ChangedName):
            column.originalName = field.name

    # params come from PARAMs
    params = []
    for paramEl in votTable.iterChildrenOfType(V.PARAM):
        try:
            param = MS(rscdef.Param, **_getColArgs(paramEl, paramEl.name))
        except Exception as ex: # never die because of failing params
            base.ui.notifyError("Unsupported PARAM ignored (%s)"%ex)
        else:
            params.append(param)

    tableDef = MS(rscdef.TableDef, id=tableId, columns=columns,
        params=params, parent_=rd, **moreArgs)

    # attach STC info to the columns it references
    for colInfo, ast in modelgroups.unmarshal_STC(votTable):
        for colId, utype in colInfo.items():
            try:
                target = tableDef.getColumnById(colId)
                target.stcUtype = utype
                target.stc = ast
            except utils.NotFoundError: # ignore broken STC
                continue

    return tableDef
def makeDDForVOTable(tableId, vot, gunzip=False, rd=None, **moreArgs):
    """returns a DD suitable for uploadVOTable.

    moreArgs are additional keywords for the construction of the target
    table; rd is passed on to makeTableDefForVOTable, gunzip to the
    VOTableGrammar.

    Only the first table of the first resource is turned into a make.
    If that resource contains no table (or there is no resource at all),
    the DD has no makes.
    """
    tableDefs = []
    firstResource = next(iter(vot.iterChildrenOfType(V.RESOURCE)), None)
    if firstResource is not None:
        firstTable = next(
            iter(firstResource.iterChildrenOfType(V.TABLE)), None)
        if firstTable is not None:
            tableDefs.append(
                makeTableDefForVOTable(tableId, firstTable, rd=rd, **moreArgs))

    makes = [MS(rscdef.Make, table=td) for td in tableDefs[:1]]

    return MS(rscdef.DataDescriptor,
        grammar=MS(votablegrammar.VOTableGrammar, gunzip=gunzip),
        makes=makes)
def _getRowMaker(table):
    """returns a function turning a VOTable tuple to a database row
    for table.

    The function is generated code: it returns a dictionary mapping each
    column key of table.tableDef to the item at the column's position in
    the tuple.  The parsers imported below are handed to the generated
    code through locals(); the code generated here does not reference
    them.
    """
    from gavo.base.literals import parseDefaultDatetime #noflake: code gen
    from gavo.stc import parseSimpleSTCS, simpleSTCSToPolygon #noflake: code gen

    # one "key: row[index]" item per column, in table order
    parts = [
        "%s: %s"%(repr(col.key), "row[%d]"%colInd)
        for colInd, col in enumerate(table.tableDef)]

    return utils.compileFunction(
        "def makeRow(row):\n return {%s}"%(", ".join(parts)),
        "makeRow",
        locals())
def uploadVOTable(tableId, srcFile, connection, gunzip=False,
        rd=None, **tableArgs):
    """creates a temporary table with tableId containing the first
    table in the VOTable in srcFile.

    The function returns the table built by rsc.TableForDef, filled with
    the rows from the VOTable.

    srcFile must be an open file object (or some similar object); with
    gunzip, it is wrapped into a gzip.GzipFile before parsing.

    tableArgs are passed on to makeTableDefForVOTable; they override the
    defaults onDisk=True and temporary=True.

    A ValueError is raised if votable.parse yields no table.
    """
    if gunzip:
        srcFile = gzip.GzipFile(fileobj=srcFile, mode="r")

    try:
        rows = next(votable.parse(srcFile, raiseOnInvalid=False))
    except StopIteration: # no table contained; bomb out
        raise ValueError("Cannot parse VOTable (or no table contained)")

    # caller-supplied arguments win over the defaults
    tdArgs = dict({"onDisk": True, "temporary": True}, **tableArgs)
    td = makeTableDefForVOTable(tableId, rows.tableDefinition,
        rd=rd, **tdArgs)

    table = rsc.TableForDef(td, connection=connection, create=True)
    makeRow = _getRowMaker(table)

    with table.getFeeder() as feeder:
        for row in rows:
            feeder.add(makeRow(row))

    return table