"""
Making data out of descriptors and sources.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import itertools
import operator
import sys
from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rsc import common
from gavo.rsc import dbtable
from gavo.rsc import table
from gavo.rsc import tables
from functools import reduce
MS = base.makeStruct
class _DataFeeder(table._Feeder):
"""is a feeder for data (i.e., table collections).
This is basically a collection of all feeders of the tables belonging
to data, except it will also call the table's mappers, i.e., add
expects source rows from data's grammars.
Feeders can be dispatched; this only works if the grammar returns
pairs of role and row rather than only the row. Dispatched
feeders only pass rows to the makes corresponding to the role.
If you pass in a connection, the data feeder will manage it (i.e.
commit if all went well, rollback otherwise).
"""
def __init__(self, data, batchSize=1024, dispatched=False,
runCommit=True, connection=None, dumpIngestees=False):
self.data, self.batchSize = data, batchSize
self.runCommit = runCommit
self.nAffected = 0
self.connection = connection
self.dumpIngestees = dumpIngestees
if dispatched:
makeAdders = self._makeFeedsDispatched
else:
makeAdders = self._makeFeedsNonDispatched
addersDict, parAddersDict, self.feeders = self._getAdders()
self.add, self.addParameters = makeAdders(addersDict, parAddersDict)
def _getAdders(self):
"""returns a triple of (rowAdders, parAdders, feeds) for the data we
feed to.
rowAdders contains functions to add raw rows returned from a grammar,
parAdders the same for parameters returned by the grammar, and
feeds is a list containing all feeds the adders add to (this
is necessary to let us exit all of them.
"""
adders, parAdders, feeders = {}, {}, []
for make in self.data.makes:
table = self.data.tables[make.table.id]
feeder = table.getFeeder(batchSize=self.batchSize)
makeRow = make.rowmaker.compileForTableDef(table.tableDef)
def addRow(srcRow, feeder=feeder, makeRow=makeRow, table=table):
try:
procRow = makeRow(srcRow, table)
if self.dumpIngestees:
print("PROCESSED ROW:", procRow)
feeder.add(procRow)
except rscdef.IgnoreThisRow:
pass
if make.rowSource=="parameters":
parAdders.setdefault(make.role, []).append(addRow)
else:
adders.setdefault(make.role, []).append(addRow)
if make.parmaker:
parAdders.setdefault(make.role, []).append(
lambda row, m=make, t=table: m.runParmakerFor(row, t))
feeders.append(feeder)
return adders, parAdders, feeders
def _makeFeedsNonDispatched(self, addersDict, parAddersDict):
adders = reduce(operator.add, list(addersDict.values()), [])
parAdders = reduce(operator.add, list(parAddersDict.values()), [])
def add(row):
for adder in adders:
adder(row)
def addParameters(row):
for adder in parAdders:
adder(row)
return add, addParameters
def _makeFeedsDispatched(self, addersDict, parAddersDict):
def add(roleRow):
role, row = roleRow
if role not in addersDict:
raise base.ReportableError("Grammar tries to feed to role '%s',"
" but there is no corresponding make"%role)
for adder in addersDict[role]:
adder(row)
# for parameters, allow broadcast
def addParameters(roleRow):
try:
role, row = roleRow
except ValueError:
# assume we only got a row, broadcast it
for adder in itertools.chain(*list(parAddersDict.values())):
adder(roleRow)
else:
for adder in parAddersDict[role]:
adder(row)
return add, addParameters
def flush(self):
for feeder in self.feeders:
feeder.flush()
def reset(self):
for feeder in self.feeders:
feeder.reset()
def __enter__(self):
for feeder in self.feeders:
feeder.__enter__()
return self
def _breakCycles(self):
del self.feeders
del self.add
del self.addParameters
def _exitFailing(self, *excInfo):
"""calls all subordinate exit methods when there was an error in
the controlled block.
This ignores any additional exceptions that might come out of
the exit methods.
The connection is rolled back, and we unconditionally propagate
the exception.
"""
for feeder in self.feeders:
try:
feeder.__exit__(*excInfo)
except:
base.ui.notifyError("Ignored exception while exiting data feeder"
" on error.")
if self.connection and self.runCommit:
self.connection.rollback()
self._breakCycles()
def _exitSuccess(self):
"""calls all subordinate exit methods when the controlled block
exited successfully.
If one of the exit methods fails, we run _exitFailing and re-raise
the exception.
If all went well and we control a connection, we commit it (unless
clients explicitly forbid it).
"""
affected = []
for feeder in self.feeders:
try:
feeder.__exit__(None, None, None)
except:
self._exitFailing(*sys.exc_info())
raise
affected.append(feeder.getAffected())
if self.connection and self.runCommit:
self.connection.commit()
self._breakCycles()
if affected:
self.nAffected = max(affected)
def __exit__(self, *excInfo):
if excInfo and excInfo[0]:
return self._exitFailing(*excInfo)
else:
self._exitSuccess()
def getAffected(self):
return self.nAffected
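# A minimal sketch of dispatched feeding (myData and the roles/rows are
# illustrative, not part of this module): with a dispatching grammar,
# add() expects (role, row) pairs and routes each row to the makes
# registered for that role.
#
#   feeder = myData.getFeeder(dispatched=True)
#   with feeder:
#       feeder.add(("primary", {"ra": 10.0, "dec": -12.5}))
#       feeder.add(("aux", {"key": "value"}))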
class Data(base.MetaMixin, common.ParamMixin):
"""A collection of tables.
``Data``, in essence, is the instantiation of a ``DataDescriptor``.
It is what ``makeData`` returns. In typical one-table situations,
you just want to call the ``getPrimaryTable()`` method to obtain the
table built.
These also have an attribute contributingMetaCarriers, a list of
base.MetaCarrier-s used by votablewrite to create Data Origin INFO-s. By
default, that's the first table. You can add to that attribute
"""
def __init__(self,
dd,
tables,
parseOptions=common.parseNonValidating,
overrideMakes=None):
base.MetaMixin.__init__(self) # we're not a structure
self.dd, self.parseOptions = dd, parseOptions
self.makes = overrideMakes or self.dd.makes
self.tables = tables
self.contributingMetaCarriers = []
self.setMetaParent(self.dd)
self._initParams(self.dd)
def __iter__(self):
for make in self.makes:
yield self.tables[make.table.id]
@classmethod
def create(cls, dd, parseOptions=common.parseNonValidating,
connection=None):
"""returns a new data instance for dd.
Existing tables on the database are not touched. To actually
re-create them, call recrateTables.
"""
controlledTables = {}
res = cls(dd, controlledTables, parseOptions)
for make in dd.makes:
controlledTables[make.table.id
] = make.create(connection,
parseOptions,
tables.TableForDef,
parent=res)
return res
@classmethod
def drop(cls, dd, parseOptions=common.parseNonValidating, connection=None):
"""drops all tables made by dd if necessary.
"""
controlledTables = {}
for make in dd.makes:
controlledTables[make.table.id
] = tables.TableForDef(
make.table,
create=False,
connection=connection,
make=make)
data = cls(dd, controlledTables, parseOptions)
data.dropTables(parseOptions)
return data
@classmethod
def createWithTable(cls,
dd,
tableDef,
parseOptions=common.parseNonValidating):
"""builds a table for tableDef with this data item.
This is for when there are many rather similar table structures
that can all be built with the same data item.
This can only work if dd only has one make.
"""
if len(dd.makes)!=1:
raise base.ReportableError("Data.createWithTable is only"
" allowed on one-make DDs")
if tableDef.onDisk:
raise base.ReportableError("Data.createWithTable so far"
" doesn't allow onDisk tables (this is because nobody"
" has thought about it; it would probably be harmless).")
controlledTables = {tableDef.id: tables.TableForDef(tableDef)}
res = cls(dd, controlledTables, parseOptions,
overrideMakes=[dd.makes[0].change(table=tableDef)])
res.contributingMetaCarriers.append(tableDef)
return res
def dbCatalogChanged(self):
"""returns True if a database table has been newly created while
making this data.
"""
return any([
isinstance(t, dbtable.DBTable)
and t.newlyCreated for t in list(self.tables.values())])
def validateParams(self):
"""raises a ValidationError if any required parameters within
this data's tables are still None.
"""
for t in self:
t.validateParams()
def dropTables(self, parseOptions):
for t in self:
if t.tableDef.onDisk:
if not parseOptions.systemImport and t.tableDef.system:
continue
t.drop()
def recreateTables(self, connection):
"""drops and recreates all tables that are onDisk.

System tables are only recreated when the systemImport parseOption
is true.
"""
if self.dd.updating:
if self.parseOptions.dropIndices:
for t in self:
if t.tableDef.onDisk:
t.dropIndices()
for t in self:
if t.newlyCreated:
t.runScripts("preImport")
return
for t in self:
if t.tableDef.system and not self.parseOptions.systemImport:
continue
if t.tableDef.onDisk and not t.newlyCreated:
t.recreate()
t.runScripts("preImport")
def getParam(self, paramName, default=base.NotGiven):
"""returns self's parameter of paramName, or, failing that, paramName
from self's primaryTable.
"""
try:
return common.ParamMixin.getParam(self, paramName, default)
except base.NotFoundError:
try:
return self.getPrimaryTable().getParam(paramName, default)
except (base.DataError, base.NotFoundError):
# param not in primary table, or no primary table:
# raise first exception
pass
raise
def getPrimaryTable(self):
"""returns the table contained if there is only one, or the one
with the role "primary" otherwise.

If no matching table can be found, this raises a DataError.
"""
if len(self.tables)==1:
return list(self.tables.values())[0]
try:
return self.tables[self.dd.getPrimary().id]
except (KeyError, base.StructureError):
raise base.DataError(
"No primary table in this data")
def getTableWithRole(self, role):
try:
return self.tables[self.dd.getTableDefWithRole(role).id]
except (KeyError, base.StructureError):
raise base.DataError(
"No table with role %s known here"%repr(role))
def getFeeder(self, **kwargs):
return _DataFeeder(self, **kwargs)
def runScripts(self, phase, **kwargs):
for make in self.makes:
make.getRunner()(self.tables[make.table.id], phase, **kwargs)
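# A quick, hedged sketch of working with a Data instance (dd and
# "someParam" are placeholders):
#
#   myData = makeData(dd)                  # dd: an rscdef.DataDescriptor
#   tbl = myData.getPrimaryTable()
#   val = myData.getParam("someParam", 0)  # falls back to the primary table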
class _EnoughRows(base.ExecutiveAction):
"""is an internal exception that allows processSource to tell makeData
to stop handling more sources.
"""
def _pipeRows(srcIter, feeder, opts):
pars = srcIter.getParameters()
if opts.dumpIngestees:
print("PROCESSED PARAMS:", pars)
feeder.addParameters(pars)
for srcRow in srcIter:
if srcRow is common.FLUSH:
feeder.flush()
continue
if srcIter.notify:
base.ui.notifyIncomingRow(srcRow)
if opts.dumpRows:
print(srcRow)
feeder.add(srcRow)
if opts.maxRows:
if base.ui.totalRead>=opts.maxRows:
raise _EnoughRows
def _processSourceReal(data, source, feeder, opts):
"""helps processSource.
"""
if data.dd.grammar is None:
raise base.ReportableError("The data descriptor %s cannot be used"
" to make data since it has no defined grammar."%data.dd.id)
data.runScripts("newSource", sourceToken=source)
srcIter = data.dd.grammar.parse(source, data)
if hasattr(srcIter, "getParameters"): # is a "normal" grammar
try:
_pipeRows(srcIter, feeder, opts)
except (base.Error,base.ExecutiveAction):
raise
except Exception as msg:
raise base.ui.logOldExc(
base.SourceParseError(repr(msg),
source=utils.makeLeftEllipsis(repr(source), 80),
location=srcIter.getLocator()))
else: # magic grammars (like those of boosters) return a callable
srcIter(data)
data.runScripts("sourceDone", sourceToken=source, feeder=feeder)
def processSource(data, source, feeder, opts, connection=None):
"""ingests source into the Data instance data.

If this builds database tables, you must pass in a connection object.

If opts.keepGoing is True, the system will continue importing
even if a particular source has caused an error.  In that case,
everything contributed by the bad source is rolled back (this will
only work when filling database tables).
"""
if not opts.keepGoing:
# simple shortcut if we don't want to recover from bad sources
_processSourceReal(data, source, feeder, opts)
else: # recover from bad sources, be more careful
if connection is None:
raise base.ReportableError("Can only ignore source errors"
" when filling database tables.",
hint="The -c flag on dachs imp and its friends builds on database"
" savepoints. You can thus only meaningfully use it when your"
" table has onDisk='True'.")
try:
with connection.savepoint():
_processSourceReal(data, source, feeder, opts)
feeder.flush()
except base.ExecutiveAction:
raise
except Exception as ex:
feeder.reset()
base.ui.notifyError("Error while importing source %s; changes from"
" this source will be rolled back, processing will continue."
" (%s)"%(
utils.makeSourceEllipsis(source),
utils.safe_str(ex)))
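# Driving processSource by hand (normally makeData does this); a sketch
# assuming data came from Data.create and conn is a database connection:
#
#   with data.getFeeder(connection=conn) as feeder:
#       for source in data.dd.iterSources(conn):
#           processSource(data, source, feeder,
#               common.parseNonValidating, connection=conn)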
class _TableCornucopeia(object):
"""a scaffolding class instances of which return something (eventually
table-like) for all keys it is asked for.
"""
def __getitem__(self, key):
return None
def __len__(self):
# this is mainly a signal to getPrimaryTable to try a bit harder
# when we stand in for data.tables.
return 0
class MultiForcedSources:
"""This lets you pass in arbitrary sequences as forceSource in makeData.

Without this, the sequence would be interpreted as a single source.
"""
def __init__(self, seq):
self.seq = seq
def iterSources(self, connection):
return iter(self.seq)
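# Sketch: ingesting a fixed set of files rather than what dd.sources
# yields (the file names are made up):
#
#   makeData(dd, forceSource=MultiForcedSources(
#       ["data/a.fits", "data/b.fits"]), connection=conn)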
def makeData(dd, parseOptions=common.parseNonValidating,
forceSource=None, connection=None, data=None, runCommit=True):
"""returns a data instance built from ``dd``.

It will arrange for the parsing of all tables generated from dd's grammar.

If database tables are being made, you *must* pass in a connection.
The entire operation will then run within a single transaction within
this connection (except for building dependents; they will be built
in separate transactions).

The connection will be rolled back or committed depending on the
success of the operation (unless you pass ``runCommit=False``, in
which case even a successful import will not be committed).

You can pass in a data instance created by yourself in data.  This
makes sense if you want to, e.g., add some meta information up front.

makeData will usually iterate over the sources given in dd.  You
can override this with forceSource, which can contain a single
source passed to a grammar.  If you need to pass in multiple
sources, use a MultiForcedSources object (or anything that has
an iterSources(dbConnection) method).
"""
# Some proc setup does expensive things like actually building data.
# We don't want that when validating, so we return an empty data
# stand-in instead.
if getattr(base, "VALIDATING", False):
return Data(dd, _TableCornucopeia())
if data is None:
res = Data.create(dd, parseOptions, connection=connection)
else:
res = data
res.recreateTables(connection)
feederOpts = {"batchSize": parseOptions.batchSize, "runCommit": runCommit,
"dumpIngestees": parseOptions.dumpIngestees}
if dd.grammar and dd.grammar.isDispatching:
feederOpts["dispatched"] = True
with res.getFeeder(connection=connection, **feederOpts) as feeder:
if forceSource is None:
sources = dd.iterSources(connection)
else:
if hasattr(forceSource, "iterSources"):
sources = forceSource.iterSources(connection)
else:
sources = [forceSource]
for source in sources:
try:
processSource(res, source, feeder, parseOptions, connection)
except _EnoughRows:
base.ui.notifyWarning("Source hit import limit, import aborted.")
break
except base.SkipThis:
continue
res.validateParams()
res.nAffected = feeder.getAffected()
if parseOptions.buildDependencies:
makeDependentsFor(
[dd],
parseOptions,
connection,
res.dbCatalogChanged())
return res
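# Typical use, as a sketch (the RD and DD ids are placeholders; obtain
# the database connection in whatever way fits your code):
#
#   dd = base.caches.getRD("myres/q").getById("import")
#   data = makeData(dd, connection=conn)
#   print(data.nAffected)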
class DDDependencyGraph(object):
"""a graph giving the dependency structure between DDs.

This is constructed with a list of DDs.  From it, you can get a
build sequence (least-depending things built first) or a destroy
sequence (most-depending things destroyed first).

If you pass spanRDs=False, only DDs residing within the first DD's RD
are considered.
"""
def __init__(self, dds, spanRDs=True):
self.limitToRD = None
if not spanRDs and dds:
self.limitToRD = dds[0].rd
self._edges, self._seen = set(), set()
self._gather(dds)
def _gatherOne(self, dd):
for dependentId in dd.dependents:
try:
dependentDD = base.resolveId(dd.rd, dependentId)
if self.limitToRD and self.limitToRD!=dependentDD.rd:
continue
self._edges.add((dd, dependentDD))
if dependentDD not in self._seen:
self._seen.add(dependentDD)
self._gatherOne(dependentDD)
except (base.StructureError, base.NotFoundError) as msg:
base.ui.notifyWarning("Ignoring dependent %s of %s (%s)"%(
dependentId, dd.getFullId(), str(msg)))
def _gather(self, dds):
for dd in dds:
self._gatherOne(dd)
def getBuildSequence(self):
return utils.topoSort(self._edges)
def getDestroySequence(self):
inverted = [(b,a) for a, b in self._edges]
return utils.topoSort(inverted)
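# Sketch: rebuilding a few DDs in dependency order (dd1, dd2 are
# placeholders for DataDescriptor instances):
#
#   graph = DDDependencyGraph([dd1, dd2])
#   for dd in graph.getBuildSequence():
#       makeData(dd, connection=conn)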
def makeDependentsFor(dds, parseOptions, connection, sysCatChanged):
"""rebuilds all data dependent on one of the DDs in the dds sequence.
"""
if parseOptions.buildDependencies:
parseOptions = parseOptions.change(buildDependencies=False)
try:
buildSequence = DDDependencyGraph(dds).getBuildSequence()
except ValueError as ex:
raise utils.logOldExc(base.ReportableError("Could not sort"
" dependent DDs topologically (use --hints to learn more).",
hint="This is most likely because there's a cyclic dependency."
" Please check your dependency structure. The original message"
" is: %s"%utils.safe_str(ex)))
# remove DDs passed in from the build sequence, as long as nothing
# is built in between (which might necessitate a re-build of something
# we already did)
for dd in buildSequence[:]:
if dd in dds:
buildSequence.pop(0)
else:
break
if parseOptions.metaOnly:
# TODO: Is metaOnly just a special case of sysCatChange=False?
if buildSequence:
base.ui.notifyWarning("Only importing metadata, not rebuilding"
" dependencies. Depending on your changes, it may be"
" necessary to manually re-make one of these: %s"%
", ".join(dd.getFullId() for dd in buildSequence))
else:
for dd in buildSequence:
if sysCatChanged or dd.remakeOnDataChange:
base.ui.notifyInfo("Making dependent %s"%dd.getFullId())
makeData(dd, parseOptions=parseOptions, connection=connection)
def makeDataById(ddId, parseOptions=common.parseNonValidating,
connection=None, inRD=None):
"""returns the data set built from the DD with ddId (which must be
fully qualified).
"""
dd = base.resolveId(inRD, ddId)
return makeData(dd, parseOptions=parseOptions, connection=connection)
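# Sketch (the id is a placeholder; fully qualified ids take the form
# rd-id#dd-id):
#
#   makeDataById("myres/q#import", connection=conn)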
def wrapTable(table, rdSource=None, resTypeDefault="results"):
"""returns a Data instance containing only table (or table itself if
it's already a data instance).

If table has no rd, you must pass rdSource, which must be an object
having an rd attribute (RDs, table defs, etc. work).

resTypeDefault will be used as the new data item's _type meta.  If you
want to override that later, use setMeta("_type"...) rather than addMeta.

This will grab the info meta from the table.
"""
if hasattr(table, "dd"):
# we trust it's already a Data instance (don't want to use isinstance
# here since people may pass in fakes).
return table
if rdSource is None:
rd = table.tableDef.rd
elif hasattr(rdSource, "rd"):
rd = rdSource.rd
else:
raise TypeError("Invalid RD source: %s"%rdSource)
newDD = MS(rscdef.DataDescriptor, makes=[
MS(rscdef.Make, table=table.tableDef, rowmaker=None)], parent_=rd)
if rdSource:
newDD.adopt(table.tableDef)
newDD.setMetaParent(table.tableDef.rd)
res = Data(newDD, tables={table.tableDef.id: table})
res.contributingMetaCarriers.append(table)
res.setMeta("_type", resTypeDefault)
for infoMeta in table.iterMeta("info"):
res.addMeta("info", infoMeta)
for mi in table.iterMeta("_votableRootAttributes", propagate=False):
res.addMeta("_votableRootAttributes", mi)
return res
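# Sketch: wrapping an in-memory table so serialisers (e.g., votablewrite)
# see a full Data instance (t stands in for an existing rsc table):
#
#   data = wrapTable(t)
#   data.setMeta("_type", "meta")  # override the default _type if desired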
def makeCombinedData(baseDD, tablesForRoles):
"""returns a Data instance containing all of tablesForRoles.

A new DD is generated based on baseDD; if baseDD has any makes
(tables), they are discarded.

tablesForRoles is a mapping from strings (one of which should be
"primary") to tables; the strings end up as the makes' roles.
"""
newDD = baseDD.change(
makes=[MS(rscdef.Make, table=t.tableDef, rowmaker=None, role=role)
for role, t in tablesForRoles.items()])
newDD.meta_ = baseDD._metaAttr.getCopy(baseDD, newDD, None)
return Data(newDD, tables=dict((t.tableDef.id, t)
for t in list(tablesForRoles.values())))
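# Sketch: combining two existing tables under one DD (baseDD, mainTable,
# and auxTable are placeholders):
#
#   combined = makeCombinedData(baseDD, {
#       "primary": mainTable,
#       "aux": auxTable})
#   combined.getTableWithRole("aux")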