"""
Coding and decoding from tabledata.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import re #noflake: used by generated code
from gavo import utils #noflake: used by generated code
from gavo.utils import parseDefaultDatetime, parseDefaultDate #noflake: used by generated code
from gavo.utils import pgsphere #noflake: used by generated code
from gavo.votable import coding
from gavo.votable import common
from gavo.votable.model import VOTable
try:
from gavo import stc #noflake: used by generated code
except ImportError:
# see modelgroups
pass
# Maps the (lower-cased) boolean literals accepted in TABLEDATA to python
# values; None stands for NULL.
TDENCBOOL = {
    literal: value
    for value, literals in (
        (True, ('t', '1', 'true')),
        (False, ('f', '0', 'false')),
        (None, ('?', '')))
    for literal in literals}
def tokenizeComplexArr(val):
    """yields "real imag" literals built from consecutive pairs of
    whitespace-separated tokens in val.

    A dangling last token (odd number of tokens) is yielded on its own.
    A val of None yields nothing.
    """
    if val is None:
        return

    tokens = iter(val.split())
    for realPart in tokens:
        # pull the partner token from the same iterator; None marks a
        # missing imaginary part (split() never produces None itself)
        imagPart = next(tokens, None)
        if imagPart is None:
            yield realPart
        else:
            yield "%s %s"%(realPart, imagPart)
def tokenizeBitArr(val):
    """yields each 0 or 1 found in val; all other items are skipped.

    A val of None yields nothing.
    """
    if val is not None:
        yield from (bit for bit in val if bit in "01")
def tokenizeNormalArr(val):
    """yields the whitespace-separated tokens of val in order.

    A val of None yields nothing.
    """
    if val is None:
        return
    # str.split() without arguments never returns empty tokens
    yield from val.split()
def _addNullvalueCode(field, src, validator):
    """returns src, wrapped into a test for field's null value if it
    declares one.

    The null value is obtained through coding.getNullvalue, which is handed
    validator.  If there is none, src is returned unchanged; otherwise,
    the returned lines append None for the null literal and run src in the
    else branch.
    """
    nullvalue = coding.getNullvalue(field, validator)
    if nullvalue is None:
        return src

    guarded = [
        'if val=="%s":'%nullvalue,
        ' row.append(None)',
        'else:']
    return guarded+common.indentList(src, " ")
def _makeFloatDecoder(field):
    """returns source lines decoding a floating point literal for field.

    In the generated code, empty literals and the literal NaN become None,
    everything else goes through float().  A null value declared by field
    is handled through _addNullvalueCode.
    """
    decoderLines = []
    decoderLines.append('if not val or val=="NaN":')
    decoderLines.append(' row.append(None)')
    decoderLines.append('else:')
    decoderLines.append(' row.append(float(val))')
    return _addNullvalueCode(field, decoderLines, float)
def _makeComplexDecoder(field):
    """returns source lines decoding a complex literal for field.

    In the generated code, an empty literal becomes None.  A literal with
    two whitespace-separated tokens is taken as real and imaginary part,
    a single token as a real part with imaginary part 0; anything else
    makes float() raise a ValueError.  If either part is NaN, None is
    appended instead of a complex number.  A null value declared by field
    is handled through _addNullvalueCode.
    """
    src = [
        'if not val:',
        ' row.append(None)',
        'else:',
        ' parts = val.split()',
        # Convert to float *before* the NaN test; x!=x is only true for
        # float NaN, never for the string tokens split() returns.
        ' r, i = ((float(parts[0]), float(parts[1]))'
            ' if len(parts)==2 else (float(val), 0.0))',
        # The generated lines are kept at a single nesting level so they
        # compile regardless of the indentation unit used around them.
        ' row.append(None if r!=r or i!=i else complex(r, i))',]
    return _addNullvalueCode(field, src, common.validateTDComplex)
def _makeIntDecoder(field, maxInt):
    """returns source lines decoding an integer literal for field.

    maxInt is the largest value that is still taken as positive when it
    comes as a hex literal (0x...); larger hex values are interpreted as
    negative numbers by subtracting 2*(maxInt+1).

    In the generated code, empty literals become None, and non-hex
    literals go through int().  A null value declared by field is handled
    through _addNullvalueCode.
    """
    src = [
        'if not val:',
        ' row.append(None)',
        'elif val.startswith("0x"):',
        ' unsigned = int(val[2:], 16)',
        # Python hex parsing is unsigned, fix manually based on maxInt.
        # The comparison must be strict: maxInt itself (e.g., 0x7fff for
        # short) is still a valid positive number.  This is a single
        # expression so the generated code needs no nested indentation.
        ' row.append(unsigned-%d if unsigned>%d else unsigned)'%(
            (maxInt+1)*2, maxInt),
        'else:',
        ' row.append(int(val))']
    return _addNullvalueCode(field, src, common.validateVOTInt)
def _makeCharDecoder(field, emptyIsNull=True, fallbackEncoding="ascii"):
    """returns source lines decoding a char literal for field.

    With emptyIsNull, the generated code turns empty literals into None;
    without it, a None literal is turned into an empty string instead.

    A non-empty null value declared by field is mapped to None, multi-
    dimensional fields are passed through coding.unravelArray, and any
    code common.getXtypeDecoderCode returns for field is added before the
    value is appended to the row.

    fallbackEncoding is accepted but not used in this function.
    """
    # Elementtree makes sure we're only seeing unicode strings here
    # We simply pass these on -- while this makes us accept more than
    # we should, in practice returning bytes here would cause a lot
    # more headache. Ask astropy users.
    if emptyIsNull:
        lines = [
            'if not val:',
            ' val = None',]
    else:
        lines = [
            'if val is None:',
            ' val = ""']

    declaredNull = coding.getNullvalue(field, str, "")
    if declaredNull:
        lines += [
            'if val==%r:'%(declaredNull,),
            ' val = None',]

    if field.isMultiDim():
        lines.append(
            "val = coding.unravelArray(%r, val)"%(field.arraysize,))

    xtypeLines = common.getXtypeDecoderCode(field)
    if xtypeLines:
        lines += xtypeLines

    lines.append("row.append(val)")
    return lines
def _makeUnicodeDecoder(field, emptyIsNull=True):
    """returns source lines decoding a unicodeChar literal for field.

    This is _makeCharDecoder called with fallbackEncoding set to None.
    """
    return _makeCharDecoder(
        field,
        emptyIsNull=emptyIsNull,
        fallbackEncoding=None)
def _makeBooleanDecoder(field):
    """returns source lines decoding a boolean literal.

    The generated code looks up the stripped, lower-cased literal in
    TDENCBOOL; unknown literals raise a KeyError there.  A literal of None
    is treated like an empty string and hence decodes to None, too.

    field is not inspected.
    """
    # "val or ''" keeps a None literal from failing on .strip()
    return ['row.append(TDENCBOOL[(val or "").strip().lower()])']
def _makeBitDecoder(field):
    """returns source lines decoding a single bit literal.

    The generated code appends int(val) to the row; field is not
    inspected.
    """
    decoderLine = 'row.append(int(val))'
    return [decoderLine]
def _intDecoderMaker(maxInt):
    """returns a function making integer decoder lines for a field, with
    maxInt bound as _makeIntDecoder's second argument.
    """
    def makeDecoder(field):
        return _makeIntDecoder(field, maxInt)
    return makeDecoder


# Maps VOTable datatype names to pairs of (function returning decoder source
# lines for a field, name of the list factory used as utils.<name> when
# decoding arrays of that type).
_decoders = {
    'boolean': (_makeBooleanDecoder, 'list'),
    'bit': (_makeBitDecoder, 'list'),
    'unsignedByte': (_intDecoderMaker(256), 'bytelist'),
    'char': (_makeCharDecoder, 'list'),
    'unicodeChar': (_makeUnicodeDecoder, 'list'),
    'short': (_intDecoderMaker(32767), 'intlist'),
    'int': (_intDecoderMaker(2147483647), 'intlist'),
    'long': (_intDecoderMaker(9223372036854775807), 'intlist'),
    'float': (_makeFloatDecoder, 'floatlist'),
    'double': (_makeFloatDecoder, 'floatlist'),
    'floatComplex': (_makeComplexDecoder, 'complexlist'),
    'doubleComplex': (_makeComplexDecoder, 'complexlist'),
}
def _getArrayDecoderLines(field):
    """returns source lines decoding an array literal for field.

    char and unicodeChar arrays are handed to the respective string
    decoders.  For all other types, the generated code splits the literal
    using the tokenizer matching the datatype, decodes the tokens one by
    one into a temporary row, turns that into a utils list of the proper
    kind, and passes it through coding.unravelArray and the field's xtype
    decoder code.

    The spec is rather difficult here.  As per VOTable 1.3, empty strings
    are translated to None; since empty and NULL arrays are not
    distinguished, the generated code still returns empty arrays as empty
    arrays where it can.
    """
    # TODO: this now contains enough generic code to marry it with the
    # respective function in dec_binary (and move the result to coding.py).
    datatype = field.datatype
    if datatype=='char':
        return _makeCharDecoder(field, emptyIsNull=True)
    if datatype=='unicodeChar':
        return _makeUnicodeDecoder(field, emptyIsNull=True)

    makeItemDecoder, listFactory = _decoders[datatype]

    if datatype in ('floatComplex', 'doubleComplex'):
        tokenizer = "tokenizeComplexArr"
    elif datatype=='bit':
        tokenizer = "tokenizeBitArr"
    else:
        tokenizer = "tokenizeNormalArr"

    # The generated code is inlined rather than calling a per-array
    # function: the scalar decoder lines append to row, so row is
    # temporarily swapped for a scratch list and restored afterwards.
    body = [
        'arrayLiteral = val',
        'fullRow, row = row, []',
        "for val in %s(arrayLiteral):"%tokenizer]
    body += common.indentList(makeItemDecoder(field), " ")
    body += coding.makeShapeValidator(field)
    body.append("val = utils.%s(row)"%listFactory)
    body.append("row = fullRow")
    body.append("val = coding.unravelArray(%r, val)"%(field.arraysize,))
    body += common.getXtypeDecoderCode(field)
    body.append("row.append(val)")

    emptyGuard = [
        "if val=='':",
        " row.append(None)",
        "else:"]
    return emptyGuard+common.indentList(body, " ")
def getLinesFor(field):
    """returns a sequence of python source lines decoding
    TABLEDATA-encoded values for field.

    Scalar fields use the decoder registered for their datatype in
    _decoders, everything else goes through _getArrayDecoderLines.
    """
    if not field.isScalar():
        return _getArrayDecoderLines(field)

    makeDecoder, _ = _decoders[field.datatype]
    return makeDecoder(field)
def getRowDecoderSource(tableDefinition):
    """returns the source for a function deserializing rows of
    tableDefinition in TABLEDATA.

    tableDefinition is a VOTable.TABLE instance.

    The returned string defines a function codec(rawRow) returning the
    list of decoded values, one per FIELD child of tableDefinition.  In
    the generated function, common.VOTableError is passed through
    unchanged, while any other exception raised while decoding a value is
    turned into a common.BadVOTableLiteral.
    """
    source = ["def codec(rawRow):", "  row = []"]

    for index, field in enumerate(
            tableDefinition.iterChildrenOfType(VOTable.FIELD)):
        # The body of the generated try block must be indented deeper than
        # the try itself, which in turn sits inside the function body.
        source.append("  try:")
        source.append("    val = rawRow[%d]"%index)
        source.extend(common.indentList(getLinesFor(field), "    "))
        source.extend([
            "  except common.VOTableError:",
            "    raise",
            "  except Exception as ex:",
            # for debugging, generating
            # "    import traceback; traceback.print_exc()"
            # here may help.
            "    raise common.BadVOTableLiteral('%s', val, ex, name=%r)"%(
                field.datatype, field.name)])

    source.append("  return row")
    return "\n".join(source)
def getGlobals(tableDefinition):
    """returns this module's global namespace.

    tableDefinition is ignored.
    """
    namespace = globals()
    return namespace