"""
Common definitions for the GAVO VOTable modules.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import codecs
import functools
from gavo import utils
# Module-level float not-a-number value.
NaN = float("NaN")
class VOTableError(utils.Error):
    """Root of the exception hierarchy for VOTable processing.

    The more specific VOTable errors in this module are derived from
    this class.
    """
class BadVOTableLiteral(VOTableError):
    """Raised when a literal in a VOTable is invalid.

    Constructor arguments:

    * type -- the name of the VOTable type the literal was supposed
      to belong to (the name shadows the builtin, but it is part of the
      established call signature).
    * literal -- the offending literal.
    * hint -- optional hint passed on to the base class.
    * originalException -- optionally, the exception that led to this
      error; it is merely stored.
    * name -- the name of the field concerned; "<Unknown>" is used
      if it is not given.

    type, literal, name, and originalException are available as attributes
    of the same names.
    """
    def __init__(self,
            type,
            literal,
            hint=None,
            originalException=None,
            name=None):
        if name is None:
            name = "<Unknown>"
        # repr() already quotes string literals, so the message must not
        # add quotes of its own; this keeps the stored message identical
        # to what __str__ returns.
        VOTableError.__init__(self,
            "Invalid literal for %s (field %s): %r"%(type, name, literal),
            hint=hint)
        self.type, self.literal, self.name = type, literal, name
        self.originalException = originalException

    def __str__(self):
        return "Invalid literal for %s (field %s): %s"%(
            self.type, self.name, repr(self.literal))
class BadVOTableData(VOTableError):
    """Raised when a value that is to be written into a VOTable
    cannot be used.

    The exception carries the name of the field concerned in fieldName
    and the repr of the offending value (not the value itself) in val.
    """
    def __init__(self, msg, val, fieldName, hint=None):
        VOTableError.__init__(self, msg, hint=hint)
        self.fieldName = fieldName
        # only the repr is kept, not the original object
        self.val = repr(val)

    def __getstate__(self):
        # self.msg is presumably set by the base class constructor
        return {key: getattr(self, key)
            for key in ("msg", "val", "fieldName")}

    def __str__(self):
        parts = (self.fieldName, self.val, self.msg)
        return "Field '%s', value %s: %s"%parts
class VOTableParseError(VOTableError):
    """Raised when the structure of a VOTable document is badly broken.

    The message already includes line and position information.  It
    would be nicer to have these in attributes of their own, but the
    expat library delivers them mashed into the message.

    When you have no positional information, iterparse.getParseError is
    the canonical way to obtain these.
    """
def qmreplace(exc):
    """a simple-minded codec error handler.

    It works like python's "replace" handler, except that the replacement
    is always a question mark and never ufffd.  The latter is fine
    in a unicode environment, but for VOTable chars, it is just a
    nuisance.

    exc is the UnicodeError passed in by the codec machinery; the
    function returns a pair of replacement string and the position at
    which to resume, which is one item after the start of the bad input.
    """
    resumeAt = exc.start+1
    return "?", resumeAt


# make the handler available as errors="qmreplace"
codecs.register_error("qmreplace", qmreplace)
def validateTDComplex(val):
    """raises a ValueError if val is not a literal for a complex number.

    A valid literal consists of exactly two whitespace-separated float
    literals (real and imaginary part).  Nothing is returned for valid
    literals.
    """
    components = [float(token) for token in val.split()]
    # the unpacking enforces that there are exactly two components
    realPart, imagPart = components
def validateVOTInt(val):
    """raises a ValueError if val is not a legal int literal for VOTables.

    Actually, this is for tabledata, and after the relaxed 1.3 rules, we
    allow the empty string ("NULL"), too.

    Legal literals are whatever python's int accepts as a decimal integer
    and hexadecimal literals with a 0x prefix (the prefix is matched
    case-insensitively).  Nothing is returned for legal literals.
    """
    if val=="":
        return

    # Only interpret the literal as hex when it really has the hex
    # prefix; unconditionally cutting off the first two characters would
    # let through things like "12ab" or "zz1f".
    if val[:2].lower()=="0x":
        int(val[2:], 16)
    else:
        int(val)
def indentList(lines, indent):
    """returns a new list in which indent is prepended to each element
    of lines.
    """
    indented = []
    for line in lines:
        indented.append(indent+line)
    return indented
def getLoopifier(field):
    """returns a function wrapping code such that it is applied to
    each element of an array.

    This is used by the ``*Xtype*Code`` functions in this module.  For now,
    it only deals with 1D arrays of xtyped things, which right now means
    2D arrays of votable arrays.

    What is returned is a callable accepting a list of source lines (the
    xtype code for a single elementary thing) and returning a list of
    source lines, or None if the array is too complex.  For fields that are
    not multi-dimensional, the callable returns its argument unchanged.
    """
    # All xtyped things are 1D arrays so far, so only multi-dimensional
    # fields need a loop at all.
    if not field.isMultiDim():
        return lambda x: x

    if field.arraysize.count("x")!=1:
        # just forget it; if there are native objects in the value, it's
        # fail, but since the decoder operate the same way, roundtrip will
        # work.
        return None

    # 1-d array of xtyped thing; handle it
    def loopify(code):
        prologue = [
            "seq, arr = val, []",
            "for val in seq:"]
        epilogue = [
            " arr.append(val)",
            "val = arr"]
        return prologue+indentList(code, " ")+epilogue

    return loopify
def getXtypeEncoderCode(field):
    """returns source lines converting the special internal representations
    of xtyped fields to what is serialised in VOTables.

    The code returned expects the value in a local variable val and leaves
    the transformed value there.  For None or unknown xtypes, an empty list
    is returned.

    This is currently only called for char and float arrays, as no
    xtypes are defined for other types.  If that changes, you'll have
    to change the ``*_enc`` modules.

    1D arrays of xtyped things are handled, but nothing more deeply nested.
    More deeply nested structures are left alone (which will only work
    under very special conditions and yield ugly error messages otherwise);
    an empty list is returned for them.
    """
    loopify = getLoopifier(field)
    if loopify is None:
        return []

    # pairs of (xtypes handled, code for one element); the first match wins
    encoders = (
        (("adql:TIMESTAMP",  # legacy, delete ~ 2024
                "timestamp",
                "timestamp-interval"),  # local addition
            [
                "if isinstance(val, datetime.datetime):",
                " val = utils.formatISODT(val)"]),
        (("dachs:DATE",),
            [
                "if isinstance(val, datetime.date):",
                " val = val.isoformat()"]),
        (("adql:POINT", "adql:REGION"),
            [
                "if isinstance(val, pgsphere.PgSAdapter):",
                " val = val.asSTCS('UNKNOWNFrame')"]),
        (("point", "circle", "polygon", "moc", "x-box"),
            [
                "if isinstance(val, pgsphere.PgSAdapter):",
                " val = val.asDALI()"]),
    )

    xtype = field.xtype
    for handledXtypes, elementCode in encoders:
        if xtype in handledXtypes:
            return loopify(elementCode)

    return []
def getXtypeDecoderCode(field):
    """returns source lines converting generic VOTable arrays into special
    internal representations, depending on the field's xtype.

    The result is a list of lines; it is empty if field has no xtype or an
    unknown one.  The code is executed with the unpacked array in val, and
    it sets val to the special representation (or to None for empty
    values).

    1D arrays of xtyped things are handled, but nothing more deeply nested.
    More deeply nested structures are left alone (which is ok for
    round-tripping but probably will fail when DaCHS components want to
    process stuff).
    """
    xtype = field.xtype
    if not xtype:
        return []

    loopify = getLoopifier(field)
    if loopify is None:
        return []

    # the line doing the actual conversion, by xtype
    conversions = {
        "adql:POINT": " val = stc.parseSimpleSTCS(val)",
        "adql:REGION": " val = stc.simpleSTCSToPolygon(val)",
        "point": " val = pgsphere.SPoint.fromDALI(val)",
        "circle": " val = pgsphere.SCircle.fromDALI(val)",
        "polygon": " val = pgsphere.SPoly.fromDALI(val)",
        "moc": " val = pgsphere.SMoc.fromDALI(val)",
        "x-box": " val = pgsphere.SBox.fromDALI(val)",
        "adql:TIMESTAMP": " val = parseDefaultDatetime(val)",
        "timestamp": " val = parseDefaultDatetime(val)",
        "timestamp-interval": " val = parseDefaultDatetime(val)",
        # GAVO-specific extension for consistency in our type systems
        "dachs:DATE": " val = parseDefaultDate(val)",
    }

    conversion = conversions.get(xtype)
    if conversion is None:
        # unknown xtype; ignore it and process stuff as usual
        return []

    return loopify([
        # the val.strip() is a workaround for a TOPCAT bug that would
        # sometimes turn empty strings into single blanks.
        "if not val or (isinstance(val, str) and not val.strip()):",
        " val = None",
        "else:",
        conversion])
class NULLFlags(object):
    """an interface to the BINARY2 NULL flags.

    Construct it with the number of fields, then use serialize or
    serializeFromRow to produce the flag bytes for a row, and deserialize
    or getFromFile to turn flag bytes back into a list of booleans.

    The flags are packed eight to a byte, with the first field in the
    most significant bit of the first byte; unused bits in the last byte
    are zero.

    Attributes: nFields (the number of fields per row) and nBytes (the
    number of bytes the flags for one row occupy).
    """
    # bit masks for the eight fields within a flag byte, first field first
    masks = [0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01]

    def __init__(self, nFields):
        self.nFields = nFields
        self.nBytes = (self.nFields+7)//8

    def serialize(self, nullMap):
        """returns null bytes for nullMap, which is a sequence of booleans
        with Trues where the field is NULL.

        It is an error to pass in nullMaps with lengths!=nFields.
        """
        assert len(nullMap)==self.nFields
        mapBytes = bytearray(self.nBytes)
        for index, isNull in enumerate(nullMap):
            if isNull:
                mapBytes[index//8] |= self.masks[index%8]
        return bytes(mapBytes)

    def serializeFromRow(self, row):
        """returns null bytes for a row, which is a sequence of values.

        Everything that's None is flagged as NULL.
        """
        return self.serialize([v is None for v in row])

    def deserialize(self, toDecode):
        """returns a list of booleans giving for each element in a row
        if there's a NULL there.

        toDecode is a bytes object as returned by serialize.  At most
        nFields flags are returned, even when toDecode contains more bytes
        than necessary; when toDecode is too short, fewer flags come back.
        """
        nulls = []
        for byte in toDecode:
            for mask in self.masks:
                # leave both loops once all fields are covered so padding
                # bits and surplus bytes never show up in the result
                if len(nulls)==self.nFields:
                    return nulls
                nulls.append(bool(mask&byte))
        return nulls

    def getFromFile(self, file):
        """returns a list of booleans giving for each element in a row
        if there's a NULL there.

        This reads nBytes bytes from file (which needs a read method
        returning bytes) and deserializes them.
        """
        return self.deserialize(file.read(self.nBytes))
def isMultiDim(arraysize):
    """returns True if arraysize (a VOTable arraysize attribute value or
    None) describes an array with more than one dimension.

    That is the case exactly when arraysize contains an "x".
    """
    if arraysize is None:
        return False
    return "x" in arraysize
def hasVarLength(arraysize):
    """returns True if the VOTable arraysize denotes a variable-length array.

    This is, of course, False for None (and empty) arraysizes.  The result
    is always a proper boolean.
    """
    # bool() makes sure None or the empty string do not leak out as the
    # result.
    return bool(arraysize) and arraysize.endswith("*")
def getLength(arraysize):
    """returns the total number of elements of an array with the
    VOTable arraysize attribute arraysize.

    Scalars and 1-element arrays cannot be told apart here; 1 is returned
    for both.  None is returned for variable-length arrays.

    Bad arraysize specs will give ValueErrors (perhaps not always with the
    most helpful messages).

    >>> getLength(None)
    1
    >>> getLength("*")
    >>> getLength("5")
    5
    >>> getLength("5x*")
    >>> getLength("5x6*")
    >>> getLength("7x5x6")
    210
    >>> getLength("7*x5x6")
    Traceback (most recent call last):
    ValueError: invalid literal for int() with base 10: '7*'
    """
    if arraysize is None:
        return 1

    if arraysize.endswith("*"):
        return None

    if isMultiDim(arraysize):
        # the total length is the product of all dimensions
        nElements = 1
        for dim in arraysize.split("x"):
            nElements *= int(dim)
        return nElements

    try:
        nElements = int(arraysize)
    except ValueError:
        nElements = None

    # raise outside of the except clause so the int() failure is not
    # chained to the error raised here
    if nElements is None:
        raise ValueError("Invalid arraysize specification: %s"%arraysize)
    return nElements
def getShape(datatype, arraysize):
    """returns a numpy-compatible shape (a tuple of ints) for a VOTable
    datatype/arraysize combination.

    None is returned when there is no arraysize, for 1D char arrays (which
    are just scalar strings), and for 1D arrays of entirely variable
    length ("*").  In 2+D arrays, a variable last dimension is currently
    replaced by 1.  Which doesn't sound smart.

    Non-integer dimensions result in ValueErrors.
    """
    if arraysize is None:
        return None

    isCharString = datatype=="char" and "x" not in arraysize
    # What should we really return for "*"?
    if isCharString or arraysize=="*":
        return None

    spec = arraysize.replace("*", "")
    if "x" not in spec:
        return (int(spec),)

    if spec.endswith("x"):
        # variable last dimension
        spec += "1"
    return tuple(map(int, spec.split("x")))
if __name__=="__main__":  # pragma: no cover
    # when executed as a script, run the doctests embedded in the
    # docstrings of this module
    import doctest
    doctest.testmod()