"""
Coding and decoding from binary.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import re #noflake: used by generated code
import struct #noflake: used by generated code
from gavo import stc #noflake: used by generated code
from gavo import utils #noflake: used by generated code
from gavo.utils import parseDefaultDatetime, parseDefaultDate #noflake: used by generated code
from gavo.utils import pgsphere #noflake: used by generated code
from gavo.votable import coding
from gavo.votable import common #noflake: used by generated code
from gavo.votable.model import VOTable
# Literals for BINARY-encoded booleans, keyed by the single byte read from
# the stream.  Besides "?", the VOTable specification also admits a blank
# and a NUL byte as representations of an unknown (null) boolean; without
# them, such streams would fail with a KeyError in the generated decoder.
BINENCBOOL = {
    b't': True,
    b'T': True,
    b'1': True,
    b'f': False,
    b'F': False,
    b'0': False,
    b'?': None,
    b' ': None,
    b'\x00': None,
}
def _addNullvalueCode(field, src, validator):
    """appends to src the lines that store val in row, honoring nullvalues.

    When field declares a nullvalue, the appended code compares val against
    that literal and stores None on a match; otherwise val is stored
    unconditionally.

    validator must be a function returning a valid python literal for
    a nullvalue attribute or raise an exception.  This is security
    critical since whatever validator returns gets embedded into
    natively-run source code.

    src is modified in place and returned for convenience.
    """
    nullvalue = coding.getNullvalue(field, validator)

    if not nullvalue:
        # nothing to compare against: store the value as it is
        src.append('row.append(val)')
        return src

    src.extend((
        'if val==%s:'%validator(nullvalue),
        ' row.append(None)',
        'else:',
        ' row.append(val)'))
    return src
def _makeFloatDecoder(field):
    """returns source lines decoding a single float or double.

    Size and struct code are taken from _typemap according to
    field.datatype.  The generated code stores None for NaN, which it
    recognises as the value that is unequal to itself.
    """
    numBytes, structCode, _ = _typemap[field.datatype]
    readLine = 'val = struct.unpack("!%s", inF.read(%d))[0]'%(
        structCode, numBytes)
    return [
        readLine,
        'if val!=val:',
        ' row.append(None)',
        'else:',
        ' row.append(val)']
def _makeComplexDecoder(field, numBytes, structCode):
    """returns source lines decoding a complex number.

    structCode is the full struct format for the real and imaginary
    parts, numBytes the number of bytes both take up together.  The
    generated code stores None if either part is NaN.  field is not
    inspected; it is accepted so all decoder makers can be called alike.
    """
    unpackLine = '_re, _im = struct.unpack(%r, inF.read(%d))'%(
        structCode, numBytes)
    storeLines = [
        'if _re!=_re or _im!=_im:',
        ' row.append(None)',
        'else:',
        ' row.append(_re+1j*_im)']
    return [unpackLine]+storeLines
def _makeIntDecoder(field):
    """returns source lines decoding a single integer of field's datatype.

    Size and struct code come from _typemap.  A nullvalue declared on field
    is validated with int before it is embedded into the generated code.
    """
    numBytes, structCode, _ = _typemap[field.datatype]
    readLine = 'val = struct.unpack("!%s", inF.read(%d))[0]'%(
        structCode, numBytes)
    return _addNullvalueCode(field, [readLine], int)
def _makeBooleanDecoder(field):
    """returns source lines decoding a single boolean.

    The generated code reads one byte and looks it up in BINENCBOOL.
    field is not inspected.
    """
    lookupLine = 'row.append(BINENCBOOL[inF.read(1)])'
    return [lookupLine]
def _getArraysizeCode(field):
    """returns a one-element list with a source line defining arraysize.

    For variable-length fields, the generated code reads the length as a
    4-byte big-endian integer from the stream.  Otherwise, the length is
    what field.getLength() returns, or 1 if that raises a ValueError.
    """
    if field.hasVarLength():
        return ['arraysize = struct.unpack("!i", inF.read(4))[0]']

    try:
        return ["arraysize = %d"%field.getLength()]
    except ValueError:
        return ["arraysize = 1"]
def _makeBitDecoder(field):
    """returns source lines decoding a bit or bit array into an integer.

    Bits and bit arrays are just dumped bits, so scalars and arrays are
    handled alike: the generated code reads (arraysize+7)//8 bytes, masks
    the padding off the first byte and accumulates everything into one
    integer.  For arraysize 0, an empty list is stored instead.
    """
    src = _getArraysizeCode(field)
    src.extend([
        'if arraysize==0:',
        '  res = []',
        'else:',
        '  numBytes = (arraysize+7)//8',
        # mask for payload in topmost byte
        '  topMask = (1<<(arraysize%8))-1',
        # a multiple of eight bits fills the topmost byte completely
        '  if topMask==0: topMask = 0xff',
        # not called "bytes": the name would become local to the entire
        # generated function and hide the builtin from other decoders
        '  octets = struct.unpack("%dB"%numBytes, inF.read(numBytes))',
        '  res = octets[0]&topMask',
        '  for b in octets[1:]:',
        # the loop body must be indented deeper than its for header
        '    res = (res<<8)+b',
        'row.append(res)'])
    return src
def _makeString(field, customSrc):
    """returns source lines decoding a string-like field.

    customSrc are the lines that read the raw value into val once arraysize
    is defined; they follow the arraysize code.  After them come, in this
    order: unravelling for multi-dimensional fields, truncation at the
    first NUL character, xtype decoding, and nullvalue handling (the
    nullvalue being embedded through repr).
    """
    lines = [*_getArraysizeCode(field), *customSrc]

    if field.isMultiDim():
        lines.append(
            "val = coding.unravelArray(%s, val)"%repr(field.arraysize))

    # anything from the first NUL on is padding
    lines.append("if '\\0' in val: val = val[:val.index('\\0')]")
    lines.extend(common.getXtypeDecoderCode(field))

    _addNullvalueCode(field, lines, repr)
    return lines
def _makeCharDecoder(field):
    """returns source lines decoding a char field or char array.

    The generated code reads arraysize bytes and decodes them as ASCII
    using the "qmreplace" error handler.
    """
    readLine = (
        'val = struct.unpack("%ds"%arraysize, inF.read(arraysize))[0]'
        '.decode("ascii", "qmreplace")')
    return _makeString(field, [readLine])
def _makeUnicodeCharDecoder(field):
    """returns source lines decoding a unicodeChar field or array.

    The generated code reads two bytes per array element and decodes the
    result as big-endian UTF-16.
    """
    # XXX BUG (original author's note): Anything outside the BMP will
    # kill this
    readLine = (
        'val = struct.unpack("%ds"%(2*arraysize), inF.read(2*arraysize)'
        ')[0].decode("utf-16be")')
    return _makeString(field, [readLine])
# Properties of the plain numeric datatypes, keyed by VOTable datatype name.
# Each value is a triple of
#   * the number of bytes one item occupies in the stream,
#   * the struct format character used to unpack it,
#   * the name of the function in utils that array values are passed to.
_typemap = dict(
    unsignedByte=(1, 'B', "bytelist"),
    short=(2, 'h', "intlist"),
    int=(4, 'i', "intlist"),
    long=(8, 'q', "intlist"),
    float=(4, 'f', "floatlist"),
    double=(8, 'd', "floatlist"),
)
# Dispatch table from VOTable datatype names to the functions producing
# the source lines that decode one value of that type.  Every entry is
# called with the field as its sole argument; the complex types share one
# maker and only differ in byte count and struct format.
_decoders = dict(
    boolean=_makeBooleanDecoder,
    bit=_makeBitDecoder,
    char=_makeCharDecoder,
    unicodeChar=_makeUnicodeCharDecoder,
    unsignedByte=_makeIntDecoder,
    short=_makeIntDecoder,
    int=_makeIntDecoder,
    long=_makeIntDecoder,
    float=_makeFloatDecoder,
    double=_makeFloatDecoder,
    floatComplex=lambda v: _makeComplexDecoder(v, 8, '!ff'),
    doubleComplex=lambda v: _makeComplexDecoder(v, 16, '!dd'),
)
def _makeShortcutCode(field, type):
    """returns None or code to quickly decode a field array.

    Fast decoding unpacks all items with a single struct call.  It is
    offered for whatever is mentioned in _typemap, provided no nullvalue
    is defined for field.  Multi-dimensional fields are left to the
    generic array decoder, since that one also validates their shape.

    None is returned when no shortcut is available; the caller then has
    to fall back to element-wise decoding.
    """
    if type not in _typemap:
        return None
    if coding.getNullvalue(field, str) is not None:
        return None
    if field.isMultiDim():
        return None
    numBytes, typecode, listtype = _typemap[type]

    src = _getArraysizeCode(field)
    src.append(
        'vals = struct.unpack("!%%d%s"%%arraysize, inF.read(arraysize*%d))'%(
            typecode, numBytes))
    if type in ('float', 'double'):
        # NaN (the only value unequal to itself) stands for a null.  This
        # needs a conditional expression: "v!=v and None or v" yields v
        # for NaN as well because None is falsy.
        src.append(
            'val = utils.%s(None if v!=v else v for v in vals)'%listtype)
    else:
        src.append(
            'val = utils.%s(vals)'%listtype)

    # Apply the same post-processing the element-wise array decoder uses,
    # so both paths deliver the same kind of value.
    src.append("val = coding.unravelArray(%s, val)"%repr(field.arraysize))
    src.extend(common.getXtypeDecoderCode(field))
    src.append('row.append(val)')

    # Without this return the function always yielded None and the
    # shortcut was never taken.
    return src
def _getArrayDecoderLines(field):
    """returns lines that decode arrays of literals.

    Bit, char and unicodeChar arrays are delegated to their own makers.
    For the rest, the shortcut code is used when _makeShortcutCode offers
    some.  Otherwise the generated code decodes item by item into a
    temporary row, validates the shape, converts the items into the
    sequence type for the datatype, unravels them according to
    field.arraysize, applies xtype decoding and appends the result to the
    actual row.
    """
    datatype = field.datatype

    # Weird things: these types bring their own array handling
    specialMakers = {
        "bit": _makeBitDecoder,
        "char": _makeCharDecoder,
        "unicodeChar": _makeUnicodeCharDecoder,
    }
    if datatype in specialMakers:
        return specialMakers[datatype](field)

    # Fast array decoding for fields without null values
    shortcut = _makeShortcutCode(field, datatype)
    if shortcut is not None:
        return shortcut

    # types unknown to _typemap end up in plain lists
    listtype = _typemap.get(datatype, (None, None, 'list'))[2]

    # default processing; the scalar decoders append to row, so the real
    # row is set aside while the items are collected (this avoids calling
    # functions in the generated code)
    lines = ['fullRow, row = row, []']
    lines += _getArraysizeCode(field)
    lines.append("for i in range(arraysize):")
    lines += common.indentList(_decoders[datatype](field), " ")
    lines += coding.makeShapeValidator(field)
    lines += [
        "val = utils.%s(row)"%listtype,
        "row = fullRow",
        "val = coding.unravelArray(%s, val)"%repr(field.arraysize)]
    lines += common.getXtypeDecoderCode(field)
    lines.append("row.append(val)")
    return lines
def getLinesFor(field):
    """returns a sequence of python source lines to decode BINARY-encoded
    values for field.

    Scalar fields are dispatched through _decoders on field.datatype (an
    unknown datatype raises a KeyError), everything else is handed to
    _getArrayDecoderLines.
    """
    if field.isScalar():
        return _decoders[field.datatype](field)
    return _getArrayDecoderLines(field)
def getRowDecoderSource(tableDefinition):
    """returns the source for a function deserializing a BINARY stream.

    tableDefinition is a VOTable.TABLE instance.  The function returned
    expects a file-like object.

    The source defines a function codec(inF) that decodes one row.  The
    code for each FIELD is wrapped in its own try block: an IOError when
    inF.atEnd() is true and nothing has been decoded yet makes codec
    return None, VOTableErrors pass through unchanged, and any other
    exception is turned into a BadVOTableLiteral naming the field.
    """
    source = ["def codec(inF):", "  row = []"]

    for field in tableDefinition.iterChildrenOfType(VOTable.FIELD):
        # Generate the field code first; this also makes sure the datatype
        # is one we know before it is embedded into the handler below.
        fieldLines = common.indentList(getLinesFor(field), "    ")

        source.append("  try:")
        source.extend(fieldLines)
        source.extend([
            "  except IOError:",  # EOF on empty row is ok.
            "    if inF.atEnd() and row==[]:",
            "      return None",
            "    raise",
            "  except common.VOTableError:",
            "    raise",
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not reported as bad literals.
            "  except Exception:",
            # For debugging, put "import traceback; traceback.print_exc()"
            # in front of the raise.  Both values are embedded with %r so
            # they always come out as valid string literals.
            "    raise common.BadVOTableLiteral(%r, repr(inF.lastRes), name=%r)"%(
                field.datatype, field.name)])

    source.append("  return row")
    return "\n".join(source)
def getGlobals(tableDefinition):
    """returns the namespace the generated decoder source is to be run in.

    This is this module's global dictionary, which is why names only used
    by generated code are imported at the top of the module.
    tableDefinition is not inspected.
    """
    return globals()