"""
Binary VOTable encoding.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime #noflake: used in generated code
import struct
from gavo import utils #noflake: used by generated code
from gavo.utils import pgsphere #noflake: used by generated code
from gavo.votable import coding #noflake: used by generated code
from gavo.votable import common
floatNaN = struct.pack("!f", common.NaN)
doubleNaN = struct.pack("!d", common.NaN)
def _getArrayShapingCode(field, padder):
"""returns common code for almost all array serialization.
Field must describe an array (as opposed to a single value).
padder must be python-source for whatever is used to pad
arrays that are too short.
"""
base = [
"if val is None: val = []"]
if field.isMultiDim():
# it's an n-d array (n>1): flatten out all values
base.append("val = coding.ravel(val)")
if field.hasVarLength():
return base+["tokens.append(struct.pack('!i', len(val)))"]
else:
return base+["val = coding.trim(val, %s, %s)"%(
field.getLength(), padder)]
def _addNullvalueCode(field, nullvalue, src):
"""adds code to let null values kick in a necessary.
nullvalue here has to be a ready-made *python* literal. Take care
when passing in user supplied values here.
"""
if nullvalue is None:
action = (" raise common.BadVOTableData('None passed for field"
" that has no NULL value', None, '%s', hint='Integers in VOTable"
" have no natural serializations for missing values. You need to"
" define one using values null to allow for NULL in integer columns')"
)%field.getDesignation()
else:
action = " tokens.append(%s)"%nullvalue
return [
"if val is None:",
action,
"else:"
]+common.indentList(src, " ")
def _makeBooleanEncoder(field):
return [
"if val is None:",
" tokens.append(b'?')",
"elif val:",
" tokens.append(b'1')",
"else:",
" tokens.append(b'0')",
]
def _makeBitEncoder(field, allowNULL=False):
# bits and bit arrays are just (possibly long) integers
# length may be None for var length.
length = field.getLength()
if allowNULL:
src = [
"if val is None:"
" tokens.append(b'\\0\\0\\0\\0')",]
else:
src = [
"if val is None:",
" raise common.BadVOTableData('Bits have no NULL value', None,",
" '%s')"%field.getDesignation(),]
src.extend([
"else:",
" tmp = []",
" curByte, rest = val%256, val//256",
" while curByte:",
" tmp.append(curByte)",
" curByte, rest = rest%256, rest//256",
" if not tmp:", # make sure we leave something even for 0
" tmp.append(0)",
" tmp.reverse()",])
if length!=1: # this not just a single bit
if length is None: # variable length: dump number of bits
src.extend([
" tokens.append(struct.pack('!i', len(tmp)*8))"])
else: # crop/expand as necessary
numBytes = int(length)//8+(not not int(length)%8)
src.extend([
" if len(tmp)<%d: tmp = [0]*(%d-len(tmp))+tmp"%(
numBytes, numBytes),
" if len(tmp)>%d: tmp = tmp[-%d:]"%(numBytes, numBytes)])
src.extend([
" tokens.append(bytes(tmp))"])
return src
def _generateFloatEncoderMaker(fmtCode, nullName):
def makeFloatEncoder(field):
return [
"if val is None:",
" tokens.append(%s)"%nullName,
"else:",
" tokens.append(struct.pack('%s', val))"%fmtCode]
return makeFloatEncoder
def _generateComplexEncoderMaker(fmtCode, singleNull):
def makeComplexEncoder(field):
return [
"if val is None:",
" tokens.append(%s+%s)"%(singleNull, singleNull),
"else:",
" tokens.append(struct.pack('%s', val.real, val.imag))"%fmtCode]
return makeComplexEncoder
def _generateIntEncoderMaker(fmtCode):
def makeIntEncoder(field):
nullvalue = coding.getNullvalue(field, int)
if nullvalue is not None:
nullvalue = repr(struct.pack(fmtCode, int(nullvalue)))
return _addNullvalueCode(field, nullvalue,[
"tokens.append(struct.pack('%s', val))"%fmtCode])
return makeIntEncoder
def _makeUnsignedByteEncoder(field):
# allow these to come from strings, too (db type bytea)
nullvalue = coding.getNullvalue(field, int)
if nullvalue is not None:
nullvalue = repr(struct.pack("B", int(nullvalue)))
return _addNullvalueCode(field, nullvalue, [
"if isinstance(val, int):",
" tokens.append(struct.pack('B', val))",
"else:",
" tokens.append(struct.pack('c', val[:1]))"])
def _makeCharEncoder(field):
nullvalue = coding.getNullvalue(field, lambda _: True)
if nullvalue is not None:
nullvalue = repr(struct.pack("c", utils.bytify(nullvalue)))
return ["if isinstance(val, str): val = val.encode('ascii')",
]+_addNullvalueCode(field, nullvalue, [
"tokens.append(bytes(val))"])
def _makeUnicodeCharEncoder(field):
nullvalue = coding.getNullvalue(field, lambda _: True)
if nullvalue is not None:
coded = nullvalue.encode("utf-16be")
nullvalue = repr(struct.pack("%ds"%len(coded), coded))
return _addNullvalueCode(field, nullvalue, [
"coded = val.encode('utf-16be')",
"tokens.append(struct.pack('%ds'%len(coded), coded))"])
def _makeCharArrayEncoder(field):
# special handling for character arrays, since we don't want to treat
# those as character arrays in python.
nullvalue = coding.getNullvalue(field, lambda _: True, default="")
src = []
src.extend(common.getXtypeEncoderCode(field))
src.append("val = coding.trimString(val, %s)"%repr(field.arraysize))
if field.hasVarLength():
src.append("tokens.append(struct.pack('!i', len(val)))")
if nullvalue is None:
nullvalue = repr('\0\0\0\0')
else:
# The bytes in the next line allows nullvalue to be str (containing
# ascii, of course)
nullvalue = repr(struct.pack("!i%ds"%len(nullvalue),
len(nullvalue), nullvalue.encode("utf-8")))
else:
if nullvalue is not None:
nullvalue = repr(struct.pack("%ds"%field.getLength(),
coding.trimString(nullvalue, field.arraysize).encode("utf-8")))
# no predefined nullvalue for constant-length strings
if field.datatype=="unicodeChar":
src.append("val = val.encode('utf-16be')")
elif field.datatype=="char":
src.extend([
'if isinstance(val, str):',
' val = val.encode("ascii", "replace")'])
src.append("tokens.append(struct.pack('%ds'%len(val), val))")
return _addNullvalueCode(field, nullvalue, src)
_encoders = {
"boolean": _makeBooleanEncoder,
"bit": _makeBitEncoder,
"unsignedByte": _makeUnsignedByteEncoder,
"short": _generateIntEncoderMaker('!h'),
"int": _generateIntEncoderMaker('!i'),
"long": _generateIntEncoderMaker('!q'),
"char": _makeCharEncoder,
"unicodeChar": _makeUnicodeCharEncoder,
"double": _generateFloatEncoderMaker("!d", "doubleNaN"),
"float": _generateFloatEncoderMaker("!f", "floatNaN"),
"doubleComplex": _generateComplexEncoderMaker("!dd", "doubleNaN"),
"floatComplex": _generateComplexEncoderMaker("!ff", "floatNaN"),
}
def _getArrayEncoderLines(field):
"""returns python lines to encode array values of field.
"""
type = field.datatype
# bit array literals are integers, same as bits
if type=="bit":
return _makeBitEncoder(field)
if type=="char" or type=="unicodeChar":
return _makeCharArrayEncoder(field)
# Everything else can use some common array shaping code since value comes in
# some kind of sequence.
padder = '[None]'
src = [ # Painful name juggling to avoid having to call functions.
"fullTokens = tokens",
"tokens = []",
"if val is None:",
" arr = []",
"else:",
" arr = val",
"for val in arr:"
]+common.indentList(_encoders[field.datatype](field), " ")
src.extend([
"fullTokens.append(b''.join(tokens))",
"tokens = fullTokens"])
return (common.getXtypeEncoderCode(field)
+ _getArrayShapingCode(field, padder)
+ src)
[docs]def getLinesFor(field):
"""returns a sequence of python source lines to encode values described
by field into tabledata.
"""
if field.isScalar():
return _encoders[field.datatype](field)
else:
return _getArrayEncoderLines(field)
[docs]def getPostamble(tableDefinition):
return [
"return b''.join(tokens)"]
[docs]def getGlobals(tableDefinition):
return globals()