"""
BINARY2 VOTable encoding.
BINARY2 is like BINARY, except every record is preceded by a mask which
columns are NULL.
We do not determine any nullvalues any more here.
Sorry for gratuituously peeking into the guts of enc_binary here. But well,
it's family.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime #noflake: used by generated code
import struct
from gavo import utils #noflake: used by generated code
from gavo.utils import pgsphere #noflake: used by generated code
from gavo.votable import coding #noflake: used by generated code
from gavo.votable import common
from gavo.votable import enc_binary
floatNaN = struct.pack("!f", common.NaN)
doubleNaN = struct.pack("!d", common.NaN)
def _makeBitEncoder(field):
return enc_binary._makeBitEncoder(field, allowNULL=True)
def _generateIntEncoderMaker(fmtCode):
def makeIntEncoder(field):
return [
"if val is None:",
" val = 0",
"tokens.append(struct.pack('%s', val))"%fmtCode]
return makeIntEncoder
def _makeCharEncoder(field):
return [
"if val is None:",
" val = 0",
"else:"
" val = ord(val)",
"tokens.append(bytes([val]))"]
def _makeUnicodeCharEncoder(field):
return [
"if val is None:",
r" tokens.append(b'\x00\x00')",
"else:",
" coded = val.encode('utf-16be')",
" tokens.append(struct.pack('%ds'%len(coded), coded))"]
def _makeUnsignedByteEncoder(field):
return [
"if isinstance(val, int):",
" tokens.append(struct.pack('B', val))",
"elif val is None:",
r" tokens.append(b'\xff')",
"else:",
" tokens.append(struct.pack('c', val))"]
_encoders = {
"boolean": enc_binary._makeBooleanEncoder,
"bit": enc_binary._makeBitEncoder,
"unsignedByte": _makeUnsignedByteEncoder,
"short": _generateIntEncoderMaker('!h'),
"int": _generateIntEncoderMaker('!i'),
"long": _generateIntEncoderMaker('!q'),
"char": _makeCharEncoder,
"unicodeChar": _makeUnicodeCharEncoder,
"double": enc_binary._generateFloatEncoderMaker("!d", "doubleNaN"),
"float": enc_binary._generateFloatEncoderMaker("!f", "floatNaN"),
"doubleComplex": enc_binary._generateComplexEncoderMaker(
"!dd", "doubleNaN"),
"floatComplex": enc_binary._generateComplexEncoderMaker("!ff", "floatNaN"),
}
def _makeCharArrayEncoder(field):
# special handling for character arrays, since we don't want to treat
# those as character arrays in python; while we're processing this,
# they're python strings, although we accept byte strings as long
# as they are ASCII
src = []
src.extend([
'if isinstance(val, bytes):',
' val = val.decode("ascii", "replace")'])
src.extend(common.getXtypeEncoderCode(field))
src.append(
r"val = coding.trimString(val, %s, '\0')"%repr(field.arraysize))
if field.hasVarLength():
src.extend([
"tokens.append(struct.pack('!i', len(val)))",
])
encoding = "utf-16be" if field.datatype=="unicodeChar" else "ascii"
src.append("val = val.encode('%s', 'replace')"%encoding)
src.append("tokens.append(val)")
return src
def _getArrayEncoderLines(field):
"""returns python lines to encode array values of field.
"""
type = field.datatype
# bit array literals are integers, same as bits
if type=="bit":
return _makeBitEncoder(field)
if type=="char" or type=="unicodeChar":
return _makeCharArrayEncoder(field)
# Everything else can use some common array shaping code since value comes in
# some kind of sequence.
padder = '[None]'
src = [ # Painful name juggling to avoid having to call functions.
"fullTokens = tokens",
"tokens = []",
"if val is None:",
" arr = []",
"else:",
" arr = val",
"for val in arr:"
]+common.indentList(_encoders[field.datatype](field), " ")
src.extend([
"fullTokens.append(b''.join(tokens))",
"tokens = fullTokens"])
return (common.getXtypeEncoderCode(field)
+ enc_binary._getArrayShapingCode(field, padder)
+ src)
[docs]def getLinesFor(field):
"""returns a sequence of python source lines to encode values described
by field into tabledata.
"""
if field.isScalar():
return _encoders[field.datatype](field)
else:
return _getArrayEncoderLines(field)
[docs]def getPreamble(tableDefinition):
return [
"tokens.append(nullFlags.serializeFromRow(tableRow))"]
[docs]def getPostamble(tableDefinition):
return [
"return b''.join(tokens)"]
[docs]def getGlobals(tableDefinition):
vars = globals().copy()
vars["nullFlags"] = common.NULLFlags(len(tableDefinition.getFields()))
return vars