"""
Definition of rowmakers.
rowmakers are objects that take a dictionary of some kind and emit
a row suitable for inclusion into a table.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import bisect
import fnmatch
import re
import sys
import traceback
from gavo import base
from gavo import utils
from gavo.rscdef import common
from gavo.rscdef import column
from gavo.rscdef import procdef
from gavo.rscdef import rmkfuncs
from gavo.rscdef import rowtriggers
__docformat__ = "restructuredtext en"
class Error(base.Error):
    """An error type for rowmakers, derived from base.Error."""
class MappedExpression(base.Structure):
    """a base class for map and var.

    You must give a destDict class attribute to make these work; it is
    the name of the dictionary the generated code assigns into.  Concrete
    classes also have to override getAutoMapper.
    """
    destDict = None
    restrictedMode = False

    _dest = column.ColumnNameAttribute("key",
        default=base.Undefined,
        description="Name of the column the value is to end up in.",
        copyable=True,
        strip=True,
        aliases=["dest", "name"])

    _src = base.UnicodeAttribute("source",
        default=None,
        description="Source key name to convert to column value (either a grammar"
        " key or a var).",
        copyable=True,
        strip=True,
        aliases=["src"])

    _nullExcs = base.UnicodeAttribute("nullExcs",
        default=base.NotGiven,
        description="Exceptions that should be caught and"
        " cause the value to be NULL, separated by commas.")

    _expr = base.DataContent(
        description="A python expression giving the value for key.",
        copyable=True,
        strip=True)

    _nullExpr = base.UnicodeAttribute("nullExpr",
        default=base.NotGiven,
        description="A python expression for a value that is mapped to"
        " NULL (None). Equality is checked after building the value, so"
        " this expression has to be of the column type. Use map with"
        " the parseWithNull function to catch null values before type"
        " conversion.")

    def completeElement(self, ctx):
        """fills in defaults and enforces restricted mode.

        When ctx has a true restricted attribute, elements with content
        or a nullExpr raise a base.RestrictedElement.  If neither content
        nor source is given, source defaults to key.  Content containing
        a backslash is macro-expanded through the parent.
        """
        # NOTE(review): unlike RowmakerDef.completeElement, this does not
        # chain to super().completeElement -- confirm that is intended.
        self.restrictedMode = getattr(ctx, "restricted", False)

        # nullExpr is compared against NotGiven as in validate and getCode
        # rather than relying on the sentinel's truthiness.  No nullValue
        # attribute is declared on this class, hence the getattr.
        hasNullExpr = (
            self.nullExpr is not base.NotGiven and self.nullExpr)
        # NOTE(review): nullExcs is spliced verbatim into the generated
        # except clause in getCode, too, but is not rejected here.
        if self.restrictedMode and (
                self.content_
                or hasNullExpr
                or getattr(self, "nullValue", None)):
            raise base.RestrictedElement("map", hint="In restricted mode, only"
                " maps with a source attribute are allowed; nullExpr or nullValue"
                " are out, too, since they can be used to inject raw code.")

        if not self.content_ and not self.source:
            self.source = self.key

        if self.content_ and "\\" in self.content_:
            self.content_ = self.parent.expand(self.content_)

    def validate(self):
        """checks that exactly one of source and content is given and that
        content, nullExpr and nullExcs are parseable python expressions.

        A base.StructureError is raised when both or neither of source and
        content are present.
        """
        super().validate()
        if (self.content_ and self.source) or not (self.content_ or self.source):
            raise base.StructureError("Map must have exactly one of source attribute"
                " or element content")

        if self.nullExpr is not base.NotGiven:
            utils.ensureExpression(self.nullExpr)

        if self.content_:
            utils.ensureExpression(common.replaceProcDefAt(self.content_), self.name_)

        if self.nullExcs is not base.NotGiven:
            utils.ensureExpression(self.nullExcs, "%s.nullExcs"%(self.name_))

    def getAutoMapper(self, columns=None):
        """returns an expression literal for turning what is self.source to a
        value suitable for self.key.

        This must be defined for concrete classes derived from this.  The
        columns argument is what getCode passes in.
        """
        raise NotImplementedError("No automatic mappers defined here")

    def getCode(self, columns):
        """returns python source code for this map.

        The code assigns to destDict[key]; the value is the element content
        if present, else whatever getAutoMapper(columns) returns.  A nullExpr
        adds a line turning matching values into None, and nullExcs wraps
        everything into a try block setting the value to None on the
        exceptions listed.
        """
        if isinstance(self.key, utils.QuotedName):
            # escape backslashes before quotes so the key survives as a
            # python string literal
            destIndex = '"%s"'%(
                self.key.name.replace("\\", r"\\").replace('"', '\\"'))
        else:
            destIndex = '"%s"'%self.key
        target = "%s[%s]"%(self.destDict, destIndex)

        if self.content_:
            value = self.content_
        else:
            value = self.getAutoMapper(columns)
        code = "%s = %s"%(target, value)

        if self.nullExpr is not base.NotGiven:
            code += '\nif %s==%s: %s = None'%(
                target, self.nullExpr, target)

        if self.nullExcs is not base.NotGiven:
            # indent every line of what we have so far into the try body
            code = 'try:\n%s\nexcept (%s): %s = None'%(
                re.sub("(?m)^", " ", code),
                self.nullExcs,
                target)
        return code
class MapRule(MappedExpression):
    """A mapping rule.

    To specify the source of a mapping, you can either

    - grab a value from what's emitted by the grammar or defined using var via
      the source attribute.  The value given for source is converted to a
      python value and stored.
    - or give a python expression in the body.  In that case, no further
      type conversion will be attempted.

    If neither source or a body is given, map uses the key attribute as its
    source attribute.

    The map rule generates a key/value pair in the result record.
    """
    name_ = "map"
    destDict = "result"

    def getAutoMapper(self, columns):
        """returns an expression to automatically map self.source to
        a column in the destination table.

        The column is looked up by self.key in columns; its type selects
        the conversion code.  A LiteralParseError is raised if no
        conversion is known for that type.
        """
        colDef = columns.getColumnByName(self.key)
        try:
            converter = base.sqltypeToPythonCode(colDef.type)
            # backslashes first, then quotes, to get a valid string literal
            escapedSource = self.source.replace("\\", r"\\").replace('"', '\\"')
            return converter%('vars["{}"]'.format(escapedSource))
        except base.ConversionError:
            raise base.ui.logOldExc(base.LiteralParseError("map", colDef.type,
                hint="Auto-mapping to %s is impossible since"
                " no default map for %s is known"%(self.key, colDef.type)))
class VarDef(MappedExpression):
    """A definition of a rowmaker variable.

    It consists of a name and a python expression, including function
    calls.  The variables are entered into the input row coming from
    the grammar.

    var elements are evaluated before apply elements, in the sequence
    they are in the RD.  You can refer to keys defined by vars already
    evaluated in the usual @key manner.
    """
    name_ = "var"
    destDict = "vars"

    def getAutoMapper(self, columns):
        """returns vars[self.source].

        Having source with var doesn't make a lot of sense, but it's
        a nifty way to introduce None-s for missing keys if one wants.
        And it should do *something*.
        """
        # backslashes first, then quotes, to get a valid string literal
        escapedSource = self.source.replace("\\", r"\\").replace('"', '\\"')
        return 'vars["{}"]'.format(escapedSource)
class ApplyDef(procdef.ProcApp):
    """A code fragment to manipulate the result row (and possibly more).

    Apply elements allow embedding python code in rowmakers.

    The current input fields from the grammar (including the rowmaker's vars)
    are available in the vars dictionary and can be changed there.  You can
    also add new keys.

    You can add new keys for shipping out in the result dictionary.

    The active rowmaker is available as parent.  It is also used to
    expand macros.

    The table that the rowmaker feeds to can be accessed as targetTable.
    You probably only want to change meta information here (e.g., warnings
    or infos).

    As always in procApps, you can get the embedding RD as rd; this is
    useful to, e.g., resolve references using rd.getByRD, and specify
    resdir-relative file names using rd.getAbsPath.
    """
    name_ = "apply"
    requiredType = "apply"
    formalArgs = "vars, result, targetTable, _self"

    def getFuncCode(self):
        """returns ProcApp's function code run through
        common.replaceProcDefAt.
        """
        rawCode = procdef.ProcApp.getFuncCode(self)
        return common.replaceProcDefAt(rawCode)
class RowmakerMacroMixin(base.StandardMacroMixin):
    """A collection of macros available to rowmakers.

    NOTE: All macros should return only one single physical python line,
    or they will mess up the calculation of what constructs caused errors.
    """

    def macro_standardPubDID(self):
        r"""returns the "standard publisher DID" for the current product.

        The publisher dataset identifier (PubDID) is important in protocols like
        SSAP and obscore.  If you use this macro, the PubDID will be your
        authority, the path component ~, and the current value of @prodtblAccref.
        It thus will only work where products#define (or a replacement) is in
        action.  If it isn't, a normal function call
        getStandardPubDID(\\inputRelativePath) would be an obvious alternative.

        You *can* of course define your PubDIDs in a different way.
        """
        return ('getStandardPubDID(vars["prodtblAccref"])')

    def macro_fullPath(self):
        """returns an expression expanding to the full path of the current
        input file.
        """
        return 'vars["parser_"].sourceToken'

    def macro_rowsProcessed(self):
        """returns an expression giving the number of records already
        delivered by the grammar.
        """
        return 'vars["parser_"].recNo'

    def macro_rowsMade(self):
        """returns an expression giving the number of records already
        returned by this row maker.

        This number excludes failed and skipped rows.
        """
        return '_self.rowsMade'

    def macro_property(self, propName):
        """returns an expression giving the value of the property propName
        on the current DD.
        """
        return 'curDD_.getProperty("%s")'%propName

    def macro_sourceDate(self):
        """returns an expression giving the timestamp of the current source.

        This is a timestamp of the modification date; use dateTimeToJdn or
        dateTimeToMJD to turn this into JD or MJD (which is usually preferred
        in database tables).  See also the sourceCDate macro.
        """
        return ('datetime.datetime.utcfromtimestamp('
            'os.path.getmtime(vars["parser_"].sourceToken))')

    def macro_sourceCDate(self):
        """returns an expression giving the timestamp for the create
        date of the current source.

        Use dateTimeToJdn or dateTimeToMJD to turn this into JD or MJD (which
        is usually preferred in database tables).  See also the sourceDate macro.
        """
        return ('datetime.datetime.utcfromtimestamp('
            'os.path.getctime(vars["parser_"].sourceToken))')

    def macro_srcstem(self):
        """returns python code for the stem of the source file currently parsed in a rowmaker.

        Example: if you're currently parsing /tmp/foo.bar.gz, the stem is foo.
        """
        return ('getFileStem(vars["parser_"].sourceToken)')

    def macro_lastSourceElements(self, numElements):
        """returns an expression calling rmkfuncs.lastSourceElements on
        the current input path.
        """
        return 'lastSourceElements(vars["parser_"].sourceToken, %d)'%(
            int(numElements))

    def macro_rootlessPath(self):
        """returns an expression giving the current source's path with
        the resource descriptor's root removed.
        """
        return 'utils.getRelativePath(vars["parser_"].sourceToken, rd_.resdir)'

    def macro_docField(self, name):
        """returns an expression giving the value of the column name in the
        document row.
        """
        # The name is embedded as a repr literal, which keeps the expansion
        # on a single physical line.  The parser is reached through
        # vars["parser_"] as in the other macros here.
        # NOTE(review): this assumes that object offers getParameters --
        # confirm against the grammar in use.
        return 'vars["parser_"].getParameters()[%r]'%name

    def macro_qName(self):
        """returns the qName of the table we are currently parsing into.
        """
        return "tableDef_.getQName()"
class RowmakerDef(base.Structure, RowmakerMacroMixin):
    """A definition of the mapping between grammar input and finished rows
    ready for shipout.

    Rowmakers consist of variables, procedures and mappings.  They
    result in a python callable doing the mapping.  In python code
    within rowmaker elements, you can use a large number of functions.
    See `Functions available for row makers`_ in the reference documentation.

    RowmakerDefs double as macro packages for the expansion of various
    macros.  The standard macros will need to be quoted, the rowmaker macros
    above yield python expressions.

    Within map and var bodies as well as late apply pars and apply bodies,
    you can refer to the grammar input as vars["name"] or, shorter @name.

    To add output keys, use map or, in apply bodies, add keys to the
    ``result`` dictionary.
    """
    name_ = "rowmaker"

    _maps = base.StructListAttribute("maps", childFactory=MapRule,
        description="Mapping rules.", copyable=True)

    _vars = base.StructListAttribute("vars", childFactory=VarDef,
        description="Definitions of intermediate variables.",
        copyable=True)

    _apps = base.StructListAttribute("apps",
        childFactory=ApplyDef, description="Procedure applications.",
        copyable=True)

    _rd = common.RDAttribute()

    _idmaps = base.StringListAttribute("idmaps", description="List of"
        ' column names that are just "mapped through" (like map with key'
        " only); you can use shell patterns to select multiple columns at once.",
        copyable=True)

    _simplemaps = base.IdMapAttribute("simplemaps", description=
        "Abbreviated notation for <map source>; each pair is destination:source",
        copyable=True)

    _ignoreOn = base.StructAttribute("ignoreOn", default=None,
        childFactory=rowtriggers.IgnoreOn, description="Conditions on the"
        " input record coming from the grammar to cause the input"
        " record to be dropped by the rowmaker, i.e., for this specific"
        " table. If you need to drop a row for all tables being fed,"
        " use a trigger on the grammar.", copyable=True)

    _original = base.OriginalAttribute()

    @classmethod
    def makeIdentityFromTable(cls, table, **kwargs):
        """returns a rowmaker that just maps input names to column names.

        Unless an id is passed in kwargs, one mentioning table.id is
        generated.
        """
        if "id" not in kwargs:
            kwargs["id"] = "autogenerated rowmaker for table %s"%table.id
        return base.makeStruct(cls, idmaps=[c.key for c in table], **kwargs)

    @classmethod
    def makeTransparentFromTable(cls, table, **kwargs):
        """returns a rowmaker that maps input names to column names without
        touching them.

        This is for crazy cases in which the source actually provides
        pre-parsed data that any treatment would actually ruin.
        """
        if "id" not in kwargs:
            kwargs["id"] = "autogenerated rowmaker for table %s"%table.id
        return base.makeStruct(cls, maps=[
                base.makeStruct(MapRule, key=c.name, content_="vars[%s]"%repr(c.name))
                for c in table],
            **kwargs)

    def completeElement(self, ctx):
        """turns simplemaps entries into proper map rules.

        A source given with a leading @ has the @ removed and gets
        nullExcs set to "KeyError,".
        """
        if self.simplemaps:
            for k, v in self.simplemaps.items():
                nullExcs = base.NotGiven
                if v.startswith("@"):
                    v = v[1:]
                    nullExcs = "KeyError,"
                self.feedObject("maps", base.makeStruct(MapRule,
                    key=k, source=v, nullExcs=nullExcs))
        super().completeElement(ctx)

    def _getSourceFromColset(self, columns):
        """returns the source code for a mapper to a column set.

        What's returned is a pair of the source and a dictionary mapping
        the (1-based) number of the first line of each generated fragment
        to a description of what that fragment does.
        """
        lineMap, line = {}, 0
        source = []

        def appendToSource(srcLine, line, lineMarker):
            # line is the number of the last line emitted so far; the
            # fragment thus starts on the next line and may span several.
            source.append(srcLine)
            line += 1
            lineMap[line] = lineMarker
            return line+srcLine.count("\n")

        if self.ignoreOn:
            line = appendToSource("if checkTrigger(vars):\n"
                " raise IgnoreThisRow(vars)",
                line, "Checking ignore")
        for v in self.vars:
            line = appendToSource(v.getCode(columns), line, "assigning "+str(v.key))
        for a in self.apps:
            line = appendToSource(
                "%s(vars, result, targetTable, _self)"%a.name,
                line, "executing "+a.name)
        for m in self.maps:
            line = appendToSource(m.getCode(columns), line, "building "+str(m.key))
        return "\n".join(source), lineMap

    def _getSource(self, tableDef):
        """returns the source code for a mapper to tableDef's columns.
        """
        return self._getSourceFromColset(tableDef.columns)

    def _getGlobals(self, tableDef):
        """returns the global namespace for the generated mapper code.

        It contains the compiled apps under their names, checkTrigger if
        there is an ignoreOn, and tableDef_, rd_, and curDD_.
        """
        namespace = {}
        for a in self.apps:
            namespace[a.name] = a.compile()
        if self.ignoreOn:
            namespace["checkTrigger"] = self.ignoreOn
        namespace["tableDef_"] = tableDef
        namespace["rd_"] = tableDef.rd
        namespace["curDD_"] = tableDef.parent
        return namespace

    def _resolveIdmaps(self, columns):
        """adds mappings for self's idmap within column set.

        Columns already having a map are left alone.  A base.NotFoundError
        is raised if a pattern matches no column.  idmaps is empty
        afterwards.
        """
        existingMaps = set(m.key for m in self.maps)
        baseNames = [c.key for c in columns]
        for colName in self.idmaps:
            matching = fnmatch.filter(baseNames, colName)
            if not matching:
                raise base.NotFoundError(colName, "columns matching", "unknown")
            for dest in matching:
                if dest not in existingMaps:
                    self.maps.append(MapRule(self, key=dest).finishElement(None))
                    # remember what we have added so overlapping patterns
                    # do not yield several maps for one column
                    existingMaps.add(dest)
        self.idmaps = []

    def _checkTable(self, columns, id):
        """raises a LiteralParseError if we try to map to non-existing
        columns.
        """
        for mapRule in self.maps:
            try:
                columns.getColumnByName(mapRule.key)
            except KeyError:
                raise base.ui.logOldExc(base.LiteralParseError(self.name_, mapRule.key,
                    "Cannot map to '%s' since it does not exist in %s"%(
                        mapRule.key, id)))

    def _buildForTable(self, tableDef):
        """returns a RowmakerDef with everything expanded and checked for
        tableDef.

        This may raise LiteralParseErrors if self's output is incompatible
        with tableDef.
        """
        res = self.copyShallowly()
        try:
            res._resolveIdmaps(tableDef.columns)
            res._checkTable(tableDef.columns, tableDef.id)
        except base.NotFoundError as ex:
            ex.within = "table %s's columns"%tableDef.id
            raise
        return res

    def _realCompileForTableDef(self, tableDef):
        """helps compileForTableDef.

        This does the actual work of expanding self for tableDef and
        building a Rowmaker from the resulting source.
        """
        rmk = self._buildForTable(tableDef)
        source, lineMap = rmk._getSource(tableDef)
        namespace = rmk._getGlobals(tableDef)
        return Rowmaker(common.replaceProcDefAt(source),
            self.id or "<rowmaker without id>",
            namespace, tableDef.getDefaults(), lineMap)

    def compileForTableDef(self, tableDef):
        """returns a function receiving a dictionary of raw values and
        returning a row ready for adding to a tableDef'd table.

        To do this, we first make a rowmaker instance with idmaps resolved
        and then check if the rowmaker result and the table structure
        are compatible.
        """
        return utils.memoizeOn(tableDef, self, self._realCompileForTableDef,
            tableDef)

    def copyShallowly(self):
        """returns a new instance of self's class sharing self's maps, vars,
        apps (in fresh lists), idmaps, and ignoreOn.
        """
        return base.makeStruct(self.__class__, maps=self.maps[:],
            vars=self.vars[:], idmaps=self.idmaps[:],
            apps=self.apps[:], ignoreOn=self.ignoreOn)
class ParmakerDef(RowmakerDef):
    """A rowmaker definition that works on a table definition's params
    rather than on its columns.
    """
    name_ = "parmaker"

    def _buildForTable(self, tableDef):
        """returns a copy of self with idmaps resolved against and maps
        checked for tableDef's params.
        """
        expanded = self.copyShallowly()
        try:
            expanded._resolveIdmaps(tableDef.params)
            expanded._checkTable(tableDef.params, tableDef.id)
        except base.NotFoundError as ex:
            # say where we were looking before passing the error on
            ex.within = "table %s's params"%tableDef.id
            raise
        return expanded

    def _getSource(self, tableDef):
        """returns the source code for a mapper to tableDef's params.
        """
        params = tableDef.params
        return self._getSourceFromColset(params)
# A ready-made rowmaker definition whose idmaps is the shell pattern "*";
# when idmaps are resolved (via fnmatch), that pattern selects every name.
identityRowmaker = base.makeStruct(RowmakerDef, idmaps="*")
class Rowmaker(object):
    """A callable that arranges for the mapping of key/value pairs to
    other key/value pairs.

    Within DaCHS, Rowmakers generate database rows (and parameter dictionaries)
    from the output of grammars.

    They are constructed with the source of the mapping function, a dictionary of
    globals the function should see, a dictionary of defaults, giving keys to be
    inserted into the incoming rowdict before the mapping function is called, and
    a map of line numbers to names handled in that line.

    It is called with a dictionary of locals for the functions (i.e.,
    usually the result of a grammar iterRows).
    """

    def __init__(self, source, name, globals, defaults, lineMap):
        try:
            self.code = compile(source, "generated mapper code", "exec")
        except SyntaxError as msg:
            raise base.ui.logOldExc(
                base.BadCode(source, "rowmaker", msg))
        self.source, self.name = source, name
        # note that this changes the dictionary passed in
        globals.update(rmkfuncs.__dict__)
        self.globals, self.defaults = globals, defaults
        self.keySet = set(self.defaults)
        # sorted (line number, marker) pairs, for bisecting
        self.lineMap = sorted(lineMap.items())
        self.rowsMade = 0

    def _findLineMarker(self, excLine):
        """returns the lineMap marker for the fragment containing excLine.

        lineMap has the first line of each fragment, so we want the last
        entry starting at or before excLine.  Lines before the first entry
        are attributed to the first entry, lines after the last to the
        last.  With an empty lineMap, a generic marker is returned.
        """
        if not self.lineMap:
            return "in unknown location (empty line map)"

        destInd = min(len(self.lineMap)-1,
            bisect.bisect_left(self.lineMap, (excLine, "")))
        # If we're between lineMap entries, the one before the guessed one
        # is the one we want
        if self.lineMap[destInd][0]>excLine and destInd:
            destInd -= 1
        return self.lineMap[destInd][1]

    def _guessExSourceName(self, tb):
        """returns an educated guess as to which mapping should have
        caused that traceback in tb.

        This is done by inspecting the second-topmost stackframe.  It
        must hold the generated line that, possibly indirectly, caused
        the exception.  This line should be in the lineMap generated by
        RowmakerDef._getSource.
        """
        if not tb.tb_next:  # toplevel failure, internal
            return "in toplevel (internal failure)"

        excLine = tb.tb_next.tb_lineno
        base.ui.notifyDebug(
            "Here's the traceback:\n%s"%"".join(traceback.format_tb(tb)))
        return self._findLineMarker(excLine)

    def _guessError(self, ex, rowdict, tb):
        """tries to shoehorn a ValidationError out of ex.

        This always raises; the ValidationError's column name is the last
        word of the marker guessed for tb.
        """
        base.ui.notifyDebug("Rowmaker failed. Exception below. Failing source"
            " is:\n%s"%self.source)
        destName = self._guessExSourceName(tb)
        if isinstance(ex, KeyError):
            msg = "Key %s not found in a mapping."%str(ex)
            hint = ("This probably means that your grammar did not yield the"
                " field asked for. Alternatively, bugs in procs might also"
                " cause this.")
        else:
            msg = str(ex)
            hint = ("This is a failure in more-or-less user-provided code."
                " If you run again with the global --debug flag, the source of"
                " the failing code should be in the logs/dcInfos (but make"
                " sure it's the source the error is reported for; with procs,"
                " this might not be the case).")
        raise base.ui.logOldExc(base.ValidationError("While %s in %s: %s"%(
            destName, self.name, msg), destName.split()[-1], rowdict,
            hint=hint))

    def __call__(self, vars, table):
        """returns the result dictionary made from the input row vars.

        Keys from defaults missing in vars are added to vars before the
        mapper code runs.  ExecutiveActions and ValidationErrors are passed
        on, all other exceptions are turned into ValidationErrors.
        """
        namespace = {
            "vars": vars,
            "result": {},
            "_self": self,
            "targetTable": table
        }
        try:
            for k in self.keySet-set(vars):
                vars[k] = self.defaults[k]
            exec(self.code, self.globals, namespace)
            self.rowsMade += 1
            return namespace["result"]
        except base.ExecutiveAction:  # pass these on
            raise
        except base.ValidationError:  # hopefully downstream knows better than we
            raise
        except Exception as ex:
            self._guessError(ex, namespace["vars"], sys.exc_info()[2])