"""
Serialisation of python values to VOTable PARAM values.
This has two aspects:
- Guessing proper VOTable type descriptors for python values
(use guessParamAttrsForValue)
- Serialising the python values to strings suitable for the PARAM.
(use serializeToParam)
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import functools
import datetime
from gavo import utils
from gavo.utils import pgsphere
from gavo.utils import serializers
from gavo.votable import coding
from gavo.votable import common
from gavo.votable import enc_tabledata
from gavo.votable import dec_tabledata
from gavo.votable.model import VOTable as V
_SEQUENCE_TYPES = (tuple, list)
_ATOMIC_TYPES = [
(int, {"datatype": "int"}),
(str, {"datatype": "char", "arraysize": "*"}),
(str, {"datatype": "unicodeChar", "arraysize": "*"}),
(float, {"datatype": "double"}),
(type(None), {"datatype": "double"}),
(complex, {"datatype": "doubleComplex"}),
(datetime.datetime, {"datatype": "char",
"arraysize": "20",
"xtype": "timestamp"}),
(datetime.date, {"datatype": "char",
"arraysize": "20",
"xtype": "dachs:DATE"}),
(pgsphere.SPoint, {"datatype": "double precision",
"arraysize": "2",
"xtype": "point"}),
(pgsphere.SCircle, {"datatype": "double precision",
"arraysize": "3",
"xtype": "circle"}),
(pgsphere.SPoly, {"datatype": "double precision",
"arraysize": "*",
"xtype": "polygon"}),]
def _combineArraysize(arraysize, attrs):
"""makes an arraysize attribute for a value with attrs.
This will in particular check that any existing arraysize in
attrs does not end with a star (as variable length is only allowed
in the slowest coordinate).
attrs is changed in place.
"""
if "arraysize" in attrs:
if attrs["arraysize"].endswith("*"):
raise ValueError("Arrays of variable-length arrays are not allowed.")
attrs["arraysize"] = "%sx%s"%(attrs["arraysize"], arraysize)
else:
attrs["arraysize"] = arraysize
def _guessParamAttrsForSequence(pythonVal):
"""helps guessParamAttrsForValue when the value is a sequence.
"""
arraysize = str(len(pythonVal))
if len(pythonVal)==0:
return {
"datatype": "char",
"arraysize": "0"}
elementVal = pythonVal[0]
if isinstance(elementVal, str):
# special case as this may become common
attrs = {
"arraysize": "%sx%s"%(
max(len(s) for s in pythonVal), arraysize),
"datatype": "char"}
elif isinstance(elementVal, _SEQUENCE_TYPES):
attrs = _guessParamAttrsForSequence(elementVal)
_combineArraysize(arraysize, attrs)
else:
attrs = _guessParamAttrsForAtom(elementVal)
_combineArraysize(arraysize, attrs)
return attrs
def _guessParamAttrsForAtom(pythonVal):
"""helps guessParamAttrsForValue when the value is atomic.
(where "atomic" includes string, and other things that actually
have non-1 arraysize).
"""
retval = None
for type, attrs in _ATOMIC_TYPES:
if isinstance(pythonVal, type):
retval = attrs.copy()
break
if retval is None:
raise utils.NotFoundError(repr(pythonVal),
"VOTable type code for", "paramval.py predefined types")
if retval.get("datatype")=="int":
if abs(pythonVal)<32768:
retval["datatype"] = "short"
elif abs(pythonVal)>2147483647:
retval["datatype"] = "long"
return retval
[docs]def guessParamAttrsForValue(pythonVal):
"""returns a dict of proposed attributes for a PARAM to keep pythonVal.
There is, of course, quite a bit of heuristics involved. For instance,
we assume sequences are homogeneous.
"""
if isinstance(pythonVal, _SEQUENCE_TYPES):
return _guessParamAttrsForSequence(pythonVal)
else:
return _guessParamAttrsForAtom(pythonVal)
def _setNULLValue(param, val):
"""sets the null literal of param to val.
"""
valEls = list(param.iterChildrenWithName("VALUES"))
if valEls:
valEls[0](null=val)
else:
param[V.VALUES(null=val)]
def _serializeNULL(param):
"""changes the VOTable PARAM param so it evaluates to NULL.
"""
if param.datatype in ["float", "double"]:
element = "NaN "
elif param.datatype in ["unsignedByte", "short", "int", "long"]:
element = "99 "
_setNULLValue(param, element)
elif param.datatype in ["char", "unicodeChar"]:
element = "x"
_setNULLValue(param, element)
else:
raise ValueError("No recipe for %s null values"%param.datatype)
if param.isScalar():
param.value = element.strip()
elif param.hasVarLength():
param.value = ""
else:
param.value = (element*param.getLength()).strip()
[docs]class PrimitiveAnnotatedColumn(dict):
"""A stand-in for serializers.AnnotatedColumn.
We don't want to use the full thing as it's too fat here, and
getVOTSerializer doesn't have the original param anyway (as
it shouldn't, as that would break memoization).
"""
[docs] class original(object):
stc = None
xtype = None
def __init__(self, datatype, arraysize, xtype):
dict.__init__(self, {
"nullvalue": "",
"name": "anonymous",
"dbtype": None,
"displayHint": {},
"note": None,
"ucd": None,
"utype": None,
"unit": None,
"description": None,
"id": None,
"datatype": datatype,
"arraysize": arraysize,
"xtype": xtype})
[docs]@functools.lru_cache(None)
def getVOTSerializer(datatype, arraysize, xtype):
"""returns a function serializing for values of params with the
attributes given.
"""
lines = "\n".join([
"def codec(val):"]+
common.indentList([
"val = mapper(val)",
"tokens = []"]+
enc_tabledata.getLinesFor(V.PARAM(**locals()))+[
"return tokens[0]"], " "))
mapper = serializers.defaultMFRegistry.getMapper(
PrimitiveAnnotatedColumn(datatype, arraysize, xtype))
env = enc_tabledata.getGlobals(None).copy()
env["mapper"] = mapper
return coding.buildCodec(lines, env)
[docs]def serializeToParam(param, val):
"""changes the VOTable PARAM param such that val is represented.
This may involve adding a null value.
"""
if val is None:
_serializeNULL(param)
else:
param.value = getVOTSerializer(
param.datatype, param.arraysize, param.xtype)(val)
[docs]@functools.lru_cache(None)
def getVOTParser(datatype, arraysize, xtype):
"""returns a function deserializing values in a param with datatype,
arraysize, and xtype.
"""
p = V.PARAM(name="anonymous", datatype=datatype, arraysize=arraysize,
xtype=xtype)
lines = "\n".join([
"def codec(val):"]
+common.indentList([
"row = []"]
+dec_tabledata.getLinesFor(p)
+[
"return row[0]"], " "))
return coding.buildCodec(lines, dec_tabledata.getGlobals(None))