"""
Helpers for generating boosters for HDF5 data.
HDF5 is fairly complex, and directgrammar is too long as is. Also, I don't
want to require h5py as a fixed dependency of DaCHS; if it's not
there, you should still be able to use other sorts of direct grammars.
TODO: Most of the HDF5 interface functions signal failure by returning
a negative value; we'd like a macro to catch these.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import h5py
from gavo import base
from gavo import utils
# See directgrammar.HDF5vaexCodeGenerator for how this import works
from gavo.grammars import directgrammar
class BaseHDF5CodeGenerator(directgrammar._NumpyMetaCodeGenerator):
"""An abstract base for generating boosters for various sorts of
HDF5 files.
Our strategy when parsing them is to read CHUNK_SIZE items at a time
into the corresponding arrays and then iterate over these chunks,
building the records.
The difference between the various flavours is where the column
metadata is taken from and where the data is then pulled from.
"""
_h5typemap = {
"byte": "H5T_NATIVE_CHAR",
"int8": "H5T_NATIVE_CHAR",
"ubyte": "H5T_NATIVE_UCHAR",
"uint8": "H5T_NATIVE_UCHAR",
"short": "H5T_NATIVE_SHORT",
"int16": "H5T_NATIVE_SHORT",
"ushort": "H5T_NATIVE_USHORT",
"uint16": "H5T_NATIVE_USHORT",
"cint": "H5T_NATIVE_INT",
"int32": "H5T_NATIVE_INT",
"uint": "H5T_NATIVE_UINT",
"uintc": "H5T_NATIVE_UINT",
"int64": "H5T_NATIVE_LLONG",
"uint64": "H5T_NATIVE_ULLONG",
"ulonglong": "H5T_NATIVE_ULLONG",
"single": "H5T_NATIVE_FLOAT",
"float16": "H5T_NATIVE_FLOAT",
"float32": "H5T_NATIVE_FLOAT",
"double": "H5T_NATIVE_DOUBLE",
"float64": "H5T_NATIVE_DOUBLE",
}
def __init__(self, grammar, tableDef):
directgrammar._NumpyMetaCodeGenerator.__init__(self, grammar, tableDef)
if self.grammar.parent.sources is None:
raise base.StructureError("Cannot make HDF5 vaex booster without"
" a sources element on the embedding data.")
try:
self.sampleSource = next(self.grammar.parent.sources.iterSources())
hdf = h5py.File(self.sampleSource, "r")
except StopIteration:
raise base.StructureError("Building an HDF5 booster requires"
" at least one matching source.")
cols = self.getColumns(hdf)
self.inputColumns = []
for name, numpyType in cols:
if numpyType.ndim!=0:
raise NotImplementedError("Cannot handle arrays in vaex"
" HDF5 yet. Please contact the authors.")
try:
cType = self.numpyTypes[str(numpyType)]
except KeyError:
raise NotImplementedError("Cannot handle the numpy type"
f" {numpyType} yet. Please contact the authors.")
self.inputColumns.append((name, numpyType, cType))
hdf.close()
nameMap = {}
if self.grammar.mapKeys:
nameMap = self.grammar.mapKeys.maps
self.byName = utils.CaseSemisensitiveDict(
(nameMap.get(t[0], t[0]), t)
for t in self.inputColumns)
def _getStructDefinition(self):
"""returns a definition for a C structure for holding one record.
"""
res = ["typedef struct InRec_s {"]
for name, _, cType in self.inputColumns:
res.append(f" {cType} {name};")
res.append("} InRec;")
return "\n".join(res)
def _getArrayDecls(self):
"""returns the declarations for the buffer arrays.
"""
return "\n".join(
f" {cType} arr_{name}[CHUNK_SIZE];"
for name, _, cType in self.inputColumns)
def _getRefillCode(self):
"""returns code to (up to) CHUNK_SIZE items into the column
arrays.
"""
res = [
"offsets[0] = total_read;",
"if (total_read+chunk_size[0]>nrecs) {",
" chunk_size[0] = nrecs-total_read;",
" HDFGUARD(H5Sselect_hyperslab(memspace, H5S_SELECT_SET,"
" null_offset, NULL, chunk_size, NULL));",
"}",
"HDFGUARD(H5Sselect_hyperslab(colspace, H5S_SELECT_SET,"
" offsets, NULL, chunk_size, NULL));",]
for index, (name, npType, cType) in enumerate(self.inputColumns):
memTypeId = self._h5typemap[str(npType)]
res.append(
f"HDFGUARD(H5Dread(datasets[{index}], {memTypeId}, memspace,"
f" colspace, H5P_DEFAULT, arr_{name}));")
return "\n".join(res)
def _getTupleBuilder(self):
"""returns code to build the InRec struct passed to getTuple.
"""
res = []
for name, _, _ in self.inputColumns:
res.append(f"cur_rec.{name} = arr_{name}[index_in_chunk];")
return "\n".join(res)
def getPreamble(self):
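# On top of the numpy preamble, this adds the HDF5 include, the chunking
# constants, the InRec struct, and the HDFGUARD error-checking macro;
# the tail of the preamble would read roughly like this (illustrative,
# assuming two columns and the default chunkSize):
#   #define N_COLS 2
#   #define CHUNK_SIZE 5000
#   typedef struct InRec_s {...} InRec;
#   #define HDFGUARD(x) if ((x)<0) abort();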
n_cols = len(self.inputColumns)
return directgrammar._NumpyMetaCodeGenerator.getPreamble(self)+[
"#include <inttypes.h>",
"#include <stdlib.h>",
"#include <hdf5.h>",
f"#define N_COLS {n_cols}",
# how many rows to process in one go?
"#define CHUNK_SIZE {}".format(
self.grammar.getProperty("chunkSize", 5000)),
self._getStructDefinition(),
"#define HDFGUARD(x) if ((x)<0) abort();",]
def getPrototype(self):
return "Field *getTuple(InRec *data, int rowIndex)"
def getItemParser(self, item, index):
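# For a table column that has a matching HDF5 column, this yields a
# default mapping like the following (illustrative; the actual macro and
# field names are whatever directgrammar._getMakeMacro and getNameForItem
# return for the column):
#   /* right ascension (double precision) */
#   MAKE_DOUBLE(fi_ra, ((double)(data->ra)));
# For columns without a counterpart in the file, a MAKE_NULL plus a
# commented-out template is emitted instead.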
nameForItem = directgrammar.getNameForItem(item)
typeMacro = directgrammar._getMakeMacro(item)
res = [
f"/* {item.description} ({item.type}) */",]
if item.name not in self.byName:
res.append(
f"MAKE_NULL({nameForItem});"
f"/* {typeMacro}({nameForItem}, FILL IN VALUE); */")
else:
origName, _, castTo = self.byName[item.name]
# there is a source column in the HDF5, make a default map
res.append(
f"{typeMacro}({nameForItem}, (({castTo})(data->{origName})));")
return res
def getColumns(self, hdf):
"""has to return a sequence of (name, numpy dtype) pairs for the columns
of the relation encoded in hdf (an open h5py.File).
"""
raise NotImplementedError("HDF5 getColumns")
def _getDatasetsCode(self):
"""has to return C code to fill datasets[0...input_columns] with
the values the C code should to process.
"""
raise NotImplementedError("HDF5 _getDatasetsCode")
class HDF5vaexCodeGenerator(BaseHDF5CodeGenerator):
"""A code generator for boosters importing HDF5 files in vaex convention.
These have one group per column below a "columns" group; each column's
values are in a "data" array within that group.
Our strategy when parsing them is to read CHUNK_SIZE items at a time
into the corresponding arrays and then iterate over these chunks,
building the records.
"""
def getColumns(self, hdf):
# SIDE EFFECT: self.dsName is the dataset name to read from after
# this function.
try:
self.dsName = self.grammar.getProperty("dataset", "table")
cols = hdf[self.dsName]["columns"]
except KeyError:
raise base.StructureError(f"Cannot access dataset {self.dsName} in"
f" {self.sampleSource}. Override the grammar's dataset property"
" to point it to the right dataset.")
return [(name, col["data"].dtype) for name, col in cols.items()]
def _getDatasetsCode(self):
"""returns code to create the datasets for the columns; the
result is kept in a datasets array.
"""
res = []
for index, (name, _, _) in enumerate(self.inputColumns):
col_ds_name = "/".join([self.dsName, "columns", name, "data"])
res.extend([
f" datasets[{index}] = H5Dopen2("
f'input_id, "{col_ds_name}", H5P_DEFAULT);',
f" if (datasets[{index}]<0) abort();"])
return "\n".join(res)
class HDF5rootarrCodeGenerator(BaseHDF5CodeGenerator):
"""A code generator for boosters importing HDF5 files having
their columns in arrays in the root dataset.
"""
def getColumns(self, hdf):
return [(name, col.dtype) for name, col in hdf.items()]
def _getDatasetsCode(self):
"""returns code to create the datasets for the columns; the
result is kept in a datasets array.
"""
res = []
for index, (name, _, _) in enumerate(self.inputColumns):
res.extend([
f" datasets[{index}] = H5Dopen2("
f'input_id, "{name}", H5P_DEFAULT);',
f" if (datasets[{index}]<0) abort();"])
return "\n".join(res)