"""
Common code for coding and decoding VOTable data.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo import utils
from gavo.votable import common
from gavo.votable.model import VOTable
from functools import reduce
def getRowEncoderSource(tableDefinition, encoderModule):
    """returns the source for a function encoding rows of tableDefinition
    in the format implied by encoderModule.

    tableDefinition is a VOTable.TABLE instance, encoderModule
    is one of the enc_whatever modules (this function needs getLinesFor
    and getPostamble from them; getPreamble is used if present).

    The source defines a function codec(tableRow).  For each FIELD child
    of tableDefinition, the corresponding element of tableRow is bound to
    val and the lines from encoderModule.getLinesFor(field) are run inside
    a try block.  common.VOTableError passes through unchanged; any other
    exception is re-raised as common.BadVOTableData carrying the offending
    value and the field's designation.
    """
    source = [
        "def codec(tableRow):",
        "  tokens = []",
        "  val = None"]

    # getPreamble is optional on encoder modules; default to no preamble.
    makePreamble = getattr(encoderModule, "getPreamble", lambda td: [])
    source.extend(
        common.indentList(makePreamble(tableDefinition), "  "))

    for index, field in enumerate(
            tableDefinition.iterChildrenOfType(VOTable.FIELD)):
        source.extend([
            "  try:",
            "    val = tableRow[%d]"%index])
        # the per-field lines live inside the try block, hence the
        # deeper indentation.
        source.extend(
            common.indentList(encoderModule.getLinesFor(field), "    "))
        source.extend([
            "  except common.VOTableError:",
            "    raise",
            "  except Exception as ex:",
#           "    import traceback; traceback.print_exc()",
            # %r rather than '%s': the designation ends up in generated
            # source, and a quote or backslash in a field name must not
            # be able to break (or inject into) that source.
            "    raise common.BadVOTableData(str(ex), repr(val), %r)"%
                field.getDesignation()])

    source.extend(common.indentList(
        encoderModule.getPostamble(tableDefinition), "  "))
    return "\n".join(source)
def buildCodec(source, env):
    """returns the function called codec compiled from source.

    source is python source text defining a function named codec (for
    instance, what getRowEncoderSource returns).  env is a mapping of
    names that should be visible to that function, typically the result
    of a getGlobals() on the codec module.

    env itself is not handed on; the code is compiled against a copy.
    """
    # copy, so the caller's mapping is not what the compiled code gets
    # as its globals.
    namespace = dict(env)
# debugging aid: dump the generated source
#   with open("codec.py", "wb") as f: f.write(source.encode("utf-8"))
    return utils.compileFunction(source, "codec", useGlobals=namespace)
def buildEncoder(tableDefinition, encoderModule):
    """returns a compiled row encoder for tableDefinition.

    The source comes from getRowEncoderSource, the names it runs with
    from encoderModule.getGlobals(tableDefinition).
    """
    encoderSource = getRowEncoderSource(tableDefinition, encoderModule)
    encoderEnv = encoderModule.getGlobals(tableDefinition)
    return buildCodec(encoderSource, encoderEnv)
def buildDecoder(tableDefinition, decoderModule):
    """returns a compiled row decoder for tableDefinition.

    Other than with encoders, the decoder module itself furnishes the
    source (through its getRowDecoderSource); the names the code runs with
    come from decoderModule.getGlobals(tableDefinition).
    """
    decoderSource = decoderModule.getRowDecoderSource(tableDefinition)
    decoderEnv = decoderModule.getGlobals(tableDefinition)
    return buildCodec(decoderSource, decoderEnv)
def getNullvalue(field, validator, default=None):
    """returns the nullvalue declared for field, or default if there is
    none.

    The nullvalue is taken from the null attribute of field's VALUES
    children; if several of them declare one, the last declaration wins.
    An empty nullvalue counts as no nullvalue.

    validator is a function that raises some exception if the nullvalue
    is inappropriate.  It should do so in particular on everything that
    contains quotes and such; the nullvalues are included in source code
    and thus might be used to inject code if not validated.  It is only
    called when there actually is a nullvalue (i.e., never on default).
    """
    declared = None
    for valuesEl in field.iterChildrenOfType(VOTable.VALUES):
        candidate = valuesEl.null
        if candidate is not None:
            declared = candidate

    if declared is None or declared=='':
        return default

    validator(declared)
    return declared
def unravelArray(arraysize, seq):
    """turns a flat sequence into an n-dim array as specified by the votable
    arraysize spec arraysize.

    arraysize is <int>{"x"<int>}*?|*.  The last dimension is never looked
    at; for one-dimensional specs, seq is returned unchanged.

    No padding or cropping will take place.  This means that the last
    row(s) may have improper sizes if seq is incompatible with arraysize.

    >>> unravelArray("2x3", "012345")
    ['01', '23', '45']
    >>> unravelArray("2x*", "012345")
    ['01', '23', '45']
    >>> unravelArray("3x2x*", "012345012345")
    [['012', '345'], ['012', '345']]
    """
    dims = arraysize.split("x")
    if len(dims)<2:
        return seq

    # list subclasses (utils.intlist and friends) are preserved on the
    # containers built here; anything else ends up in plain lists.
    if isinstance(seq, list):
        makeRows = seq.__class__
    else:
        makeRows = list

    # group by each dimension but the last, innermost first.
    for dim in dims[:-1]:
        width = int(dim)
        seq = makeRows(
            seq[start:start+width] for start in range(0, len(seq), width))
    return seq
def parseVOTableArraysizeEl(spec, fieldName):
    """parses a single VOTable arraysize number to (flexible, length).

    This will accept single numbers (returns False, number),
    number* (returns True, number) and just * (returns True, 0).

    This is used to parse the last part of an n-d array spec.  Everything
    before that must be an integer only.

    Anything else raises a common.VOTableError mentioning spec and
    fieldName.
    """
    flexible = spec.endswith("*")
    if flexible:
        digits = spec[:-1]
    else:
        digits = spec

    # a lone asterisk: variable length without an upper limit
    if flexible and not digits:
        return True, 0

    try:
        return flexible, int(digits)
    except ValueError:
        raise common.VOTableError("Invalid arraysize fragment '%s' in"
            " field or param name '%s'"%(spec, fieldName))
def makeShapeValidator(field):
    """returns code lines to validate an array shape against a flat
    sequence in row.

    The lines raise a common.BadVOTableLiteral if len(row) does not fit
    field.arraysize.  No lines are returned when there is nothing to check
    (no arraysize, or a variable-length last dimension with a trivial
    stride).

    A malformed arraysize yields a common.VOTableError right away.

    This is used by the array decoders.
    """
    arraysize = field.arraysize
    if not arraysize:
        return []

    *leadingDims, lastDim = arraysize.strip().split("x")

    # all dimensions except the last must be integers; their product
    # is the number of tokens per element of the last dimension.
    stride = 1
    try:
        for dim in leadingDims:
            stride *= int(dim)
    except ValueError:
        raise common.VOTableError("Invalid arraysize '%s' specified in"
            " field or param name '%s'"%(
                field.arraysize, field.name))

    flexible, length = parseVOTableArraysizeEl(lastDim, field.name)

    if flexible:
        # 0..n; all we have to do is check that the length is a multiple of
        # stride, if that's non-trivial.
        # TODO: enforce length limits?  By error or by cropping?
        if stride<=1:
            # nothing to validate
            return []
        condition = "if len(row) %% %d:"%stride
    else:
        # exact size specification
        condition = "if len(row)!=%d:"%(length*stride)

    return [
        condition,
        " raise common.BadVOTableLiteral('%s[%s]',"
        " '<%%d token(s)>'%%(len(row)), name=%r)"%(
            field.datatype, field.arraysize, field.name)]
def ravel(seq):
    """flattens out any sub-sequences (lists or tuples) in seq
    recursively and returns the result as a list.

    Only lists and tuples are expanded; other iterables (strings, in
    particular) are kept as items.

    This is used by the array encoders.
    """
    flat = []
    exhausted = object()
    # explicit stack of iterators rather than recursion; the top of
    # the stack is the sub-sequence currently being walked.
    pending = [iter(seq)]
    while pending:
        item = next(pending[-1], exhausted)
        if item is exhausted:
            # done with this level, resume the enclosing one
            pending.pop()
        elif isinstance(item, (list, tuple)):
            pending.append(iter(item))
        else:
            flat.append(item)
    return flat
def trim(seq, arraysize, padder):
    """returns seq flattened and with length arraysize.

    arraysize is an int; you should just use field.getLength() when
    trimming VOTable arraysizes since the arraysize attribute is rather
    complex.  Arraysize may be None for convenience; no padding or
    cropping takes place then (seq is still flattened through ravel).

    If seq is shorter, padder*missing will be appended (so padder should
    be a list, as in [None]), if it is longer, seq will be shortened from
    the end.

    This is intended as a helper for array encoders.
    """
    flat = ravel(seq)
    if arraysize is None:
        return flat

    missing = arraysize-len(flat)
    if missing>0:
        return flat+padder*missing
    if missing<0:
        return flat[:arraysize]
    return flat
def trimString(val, arraysize, padChar=" "):
    """returns val flattened and padded with padChar/cropped to length.

    arraysize is a VOTable arraysize specification as a string (or None);
    it determines the lengths val is padded or cropped to.

    val can also be a sequence (list or tuple) of strings (or nested more
    deeply).  In that case, trimString will flatten the value(s), padding
    and cropping as necessary.

    If val is None, then as many padChars will be returned as arraysize
    wants (which is 0 for variable-length fields).

    trimString expects to deal with strings.  It will ascii-decode
    bytes if it sees them, though.

    For chars, arraysize None is equivalent to arraysize 1.

    >>> trimString("abc", "4")
    'abc '
    >>> trimString(["abc", "de", "f"], "2x*")
    'abdef '
    >>> trimString([["abc", "cd", "e"], ["", "fgh", "i"]], "2x4x3")
    'abcde     fgi           '
    >>> trimString(("ab", "cd"), "2x2")
    'abcd'
    >>> trimString(None, "4x2", 'z')
    'zzzzzzzz'
    >>> trimString(None, "4x2*", 'z')
    ''
    >>> trimString("abc", None)
    'a'
    >>> trimString(b"abc", "5", "x")
    'abcxx'
    """
    if arraysize is None:
        arraysize = "1"

    if val is None:
        expected = common.getLength(arraysize)
        if expected:
            return padChar*expected
        else:
            return ""

    if isinstance(val, bytes):
        val = val.decode("ascii")

    if "x" in arraysize:
        # multi-dimensional: deal with the last (slowest-varying)
        # dimension here and leave the rest to the recursion.
        rest, destLength = arraysize.rsplit("x", 1)

        if not destLength.endswith('*'):
            destLength = int(destLength)
            if isinstance(val, tuple):
                # tuple+list is a TypeError, so normalise before padding.
                val = list(val)
            # missing items are None, which the recursion turns into
            # padding of the appropriate length.
            val = val[:destLength]+[None]*max(0, destLength-len(val))

        return "".join(trimString(item, rest, padChar) for item in val)

    else:
        if arraysize.endswith('*'):
            # variable length: nothing to pad or crop
            return val
        else:
            destLength = int(arraysize)
            return val[:destLength]+padChar*max(0, destLength-len(val))
if __name__ == "__main__":  # pragma: no cover
    # when run as a script, execute the doctests embedded in the
    # docstrings of this module.
    import doctest
    doctest.testmod()