"""
Tables on disk
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import sys
from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.base import sqlsupport
from gavo.rsc import common
from gavo.rsc import table
# assumed import location for MetaTableMixin, which DBTable below mixes in
from gavo.rsc.metatable import MetaTableMixin
class _Feeder(table._Feeder):
"""A context manager for feeding data into a table.
This feeder hands through batchSize items at a time to the database.
	After the feeder has exited, the instance has an nAffected attribute
	that says how many rows were processed by the database through this
	feeder.
A feeder is constructed with a parent table (that also provides
the connection), an insert command, and potentially some options.
Note that the table feeder does *not* do any connection management.
You have to commit or rollback yourself (or do it properly and go
through data, which can do connection management).
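
	A typical use might look like this (a sketch; ``t`` stands for a
	DBTable and ``rows`` for an iterable of rowdicts)::

		with t.getFeeder() as feeder:
			for row in rows:
				feeder.add(row)
		print(feeder.getAffected())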
"""
def __init__(self, parent, insertCommand, batchSize=2000, notify=True):
self.nAffected, self.notify = 0, notify
table._Feeder.__init__(self, parent)
self.feedCommand, self.batchSize = insertCommand, batchSize
self.batchCache = []
def shipout(self):
if self.batchCache:
try:
self.cursor.executemany(self.feedCommand, self.batchCache)
except sqlsupport.IntegrityError:
base.ui.notifyInfo("One or more of the following rows clashed: "+
str(self.batchCache))
raise
except sqlsupport.DataError:
base.ui.notifyInfo("Bad input. Run with -b1 to pin down offending"
" record. First rec: %s"%self.batchCache[0])
raise
self.nAffected += self.cursor.rowcount
if self.notify:
base.ui.notifyShipout(len(self.batchCache))
self.batchCache = []
def add(self, data):
self._assertActive()
if self.table.validateRows:
try:
self.table.tableDef.validateRow(data)
except rscdef.IgnoreThisRow:
return
self.batchCache.append(data)
if len(self.batchCache)>=self.batchSize:
self.shipout()
def flush(self):
self._assertActive()
self.shipout()
def reset(self):
self._assertActive()
self.batchCache = []
def __enter__(self):
self.cursor = self.table.connection.cursor()
return table._Feeder.__enter__(self)
def __exit__(self, *args):
if not args or args[0] is None: # regular exit, ship out
try:
self.shipout()
# The following sucks, but rowcount seems to be always 1 on insert operations.
# However, we at least want a chance to catch update operations matching
# nothing. So, if rowcount is 0, it's a sign something went wrong, and
# we want to override our initial guess.
if self.cursor.rowcount==0:
self.nAffected = 0
self.cursor.close()
except:
del self.cursor
table._Feeder.__exit__(self, *sys.exc_info())
raise
if hasattr(self, "cursor"):
del self.cursor
table._Feeder.__exit__(self, *args)
return False
def getAffected(self):
return self.nAffected
class _RaisingFeeder(_Feeder):
"""is a feeder that will bomb on any attempt to feed data to it.
It is useful for tables that can't be written, specifically, views.
"""
def add(self, data):
raise base.DataError("Attempt to feed to a read-only table")
class DBMethodsMixin(sqlsupport.QuerierMixin):
"""is a mixin for on-disk tables.
	The parent must have tableDef, tableName (from tableDef.getQName()),
and connection attributes.
"""
scripts = None # set by data on import, defined by make
def _definePrimaryKey(self):
if self.tableDef.primary and not self.hasIndex(self.tableName,
self.getPrimaryIndexName(self.tableDef.id)):
if not self.tableDef.system:
base.ui.notifyIndexCreation("Primary key on %s"%self.tableName)
try:
self.connection.execute("ALTER TABLE %s ADD PRIMARY KEY (%s)"%(
self.tableName, ", ".join(self.tableDef.primary)))
except sqlsupport.DBError as msg:
raise base.ui.logOldExc(
common.DBTableError("Primary key %s could not be added (%s)"%(
self.tableDef.primary, repr(str(msg))), self.tableName,
hint="The howDoI documentation text may contain help on"
" how to find the offending elements."))
def _dropPrimaryKey(self):
"""drops a primary key if it exists.
*** Postgres specific ***
"""
constraintName = str(self.getPrimaryIndexName(self.tableDef.id))
if self.tableDef.primary and self.hasIndex(
self.tableName, constraintName):
self.connection.execute("ALTER TABLE %s DROP CONSTRAINT %s"%(
self.tableName, constraintName))
def _addForeignKeys(self):
"""adds foreign key constraints if necessary.
"""
for fk in self.tableDef.foreignKeys:
if not self.tableDef.system:
base.ui.notifyIndexCreation(
self.tableDef.expand(fk.getDescription()))
fk.create(self)
def _dropForeignKeys(self):
"""drops foreign key constraints if necessary.
"""
for fk in self.tableDef.foreignKeys:
fk.delete(self)
	def dropIndices(self):
if not self.exists():
return
self._dropForeignKeys()
# Don't drop the primary key for now -- foreign key relationships may
# depend on it, and postgres doesn't seem to mind if we just create it later.
# This is presumably still bad if someone changed the primary key. Let's
# worry about it then.
# self._dropPrimaryKey()
for index in reversed(self.tableDef.indices):
index.drop(self)
return self
	def makeIndices(self):
"""creates all indices on the table, including any definition of
a primary key.
"""
self.connection.execute("SET maintenance_work_mem=%s000"%
base.getConfig("db", "indexworkmem"))
if self.suppressIndex or not self.exists():
return
if self.tableDef.primary:
self._definePrimaryKey()
for index in self.tableDef.indices:
index.create(self)
self._addForeignKeys()
return self
	def getDeleteQuery(self, matchCondition, pars={}):
return "DELETE FROM %s WHERE %s"%(
self.tableName, matchCondition), pars
	def deleteMatching(self, matchCondition, pars={}):
"""deletes all rows matching matchCondition.
		For now, matchCondition is a boolean SQL expression; all rows
		matching it will be deleted.
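
		For instance (a sketch; column and parameter names are
		illustrative)::

			t.deleteMatching("flagged=%(flagVal)s", {"flagVal": True})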
"""
self.connection.execute(*self.getDeleteQuery(matchCondition, pars))
	def copyIn(self, inFile, binary=True):
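		"""bulk-copies the rows in inFile into this table using Postgres' COPY.
		By default, inFile must contain data in Postgres' binary COPY
		format; pass binary=False to use the text format instead.
		"""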
fmt = " WITH BINARY" if binary else ""
cursor = self.connection.cursor()
cursor.copy_expert("COPY %s FROM STDIN %s"%(self.tableName, fmt), inFile)
cursor.close()
return self
	def copyOut(self, outFile, binary=True):
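		"""bulk-copies this table's rows to outFile using Postgres' COPY.
		By default, the data is written in Postgres' binary COPY format;
		pass binary=False to use the text format instead.
		"""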
fmt = " WITH BINARY" if binary else ""
cursor = self.connection.cursor()
cursor.copy_expert("COPY %s TO STDOUT %s"%(self.tableName, fmt), outFile)
cursor.close()
return self
	def ensureSchema(self):
"""creates self's schema if necessary.
"""
if self.tableDef.temporary: # these never are in a schema
return
schemaName = self.tableDef.rd.schema
if not self.schemaExists(schemaName):
self.connection.execute("CREATE SCHEMA %(schemaName)s"%locals())
self.setSchemaPrivileges(self.tableDef.rd)
return self
	def ensureOnDiskMatches(self):
"""raises a DataError if the on-disk structure of a table doesn't
match DaCHS' idea of it.
If the table doesn't exist on disk, this will raise a NotFoundError.
"""
dbCols = self.getColumnsFromDB(self.tableDef.getQName())
mismatches = []
if len(self.tableDef.columns)<len(dbCols):
mismatches.append("extra columns in DB (%s)"%", ".join(
name for name, _ in dbCols[len(self.tableDef.columns):]))
for index, col in enumerate(self.tableDef):
try:
name, type = dbCols[index]
except IndexError:
mismatches.append("from column %s on: No matches in DB"%col.name)
break
if isinstance(col.name, utils.QuotedName):
if col.name.name!=name:
mismatches.append("mismatching delimited name of %s (DB: %s)"%(
col.name, name))
elif col.name.lower()!=name:
mismatches.append("mismatching name of %s (DB: %s)"%(col.name, name))
continue
try:
base.sqltypeToPgValidator(col.type)(type)
except TypeError as ex:
mismatches.append("type mismatch in column %s (%s)"%(
col.name, utils.safe_str(ex)))
if mismatches:
raise base.DataError("Table %s: %s"%(
self.tableDef.getQName(), "; ".join(mismatches)))
class DBTable(DBMethodsMixin, table.BaseTable, MetaTableMixin):
"""An interface to a table in the database.
These are usually created using ``api.TableForDef(tableDef)`` with a
table definition obtained, e.g., from an RD, saying ``onDisk=True``.
When constructing a DBTable, it will be created if necessary (unless
	``create=False`` is passed), but indices or primary keys will only be
created on a call to ``importFinished``.
The constructor does not check if the schema of the table on disk matches
the tableDef. If the two diverge, all kinds of failures are conceivable;
	use ``dachs val -c`` to make sure the on-disk structure matches the RDs.
You can pass a ``nometa`` boolean kw argument to suppress entering the table
into the ``dc_tables`` table.
You can pass an exclusive boolean kw argument; if you do, the
``iterQuery`` (and possibly similar methods in the future) method
will block concurrent writes to the selected rows ("FOR UPDATE")
as long as the transaction is active.
	DBTables will run preCreation, preIndex, postCreation, and beforeDrop
scripts, both from the table definition and the make they are being
created from. No scripts except beforeDrop are run when an existing table is
operated on from an updating dd.
The main attributes (with API guarantees) include:
* tableDef -- the defining tableDef
	* getFeeder() -- returns a feeder object; call its add(rowdict) method
	to insert rows into the table.
* importFinished(nImported) -- must be called after you've fed all rows when
importing data; pass the number of rows fed in.
* drop() -- drops the table in the database
* recreate() -- drops the table and generates a new empty one.
* getTableForQuery(...) -- returns a Table instance built from a query
	over this table (you probably want to use ``conn.query*`` and
``td.getSimpleQuery`` instead).
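
	A minimal usage sketch (assuming ``td`` is a table definition with
	``onDisk=True`` and ``conn`` an open database connection; the column
	names are illustrative)::

		t = api.TableForDef(td, connection=conn, create=True)
		with t.getFeeder() as feeder:
			feeder.add({"id": 1, "name": "first row"})
		t.importFinished(feeder.getAffected())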
"""
def __init__(self, tableDef, **kwargs):
self.connection = kwargs.pop("connection", None)
if self.connection is None:
raise base.ReportableError("DBTable built without connection.",
hint="In pre-1.0 DaCHS, database tables could automatically"
" open and manage connections. This turned out to be much"
" more trouble than it was worth. See develNotes for how"
" to do things today.")
self.suppressIndex = kwargs.pop("suppressIndex", False)
self.tableUpdates = kwargs.pop("tableUpdates", False)
self.exclusive = kwargs.pop("exclusive", False)
self.updating = kwargs.pop("updating", False)
self.commitAfterMeta = kwargs.pop("commitAfterMeta", False)
table.BaseTable.__init__(self, tableDef, **kwargs)
if self.tableDef.rd is None and not self.tableDef.temporary:
raise base.ReportableError("TableDefs without resource descriptor"
" cannot be used to access database tables")
self.tableName = self.tableDef.getQName()
self.nometa = (kwargs.get("nometa", False)
or self.tableDef.temporary or tableDef.rd.schema=="dc")
self.newlyCreated = False
if kwargs.get("create", False):
self.createIfNecessary()
if self.tableUpdates:
self.addCommand = "UPDATE %s SET %s WHERE %s"%(
self.tableName,
", ".join("%s=%%(%s)s"%(f.name, f.key)
for f in self.tableDef),
" AND ".join("%s=%%(%s)s"%(n, n) for n in self.tableDef.primary))
else:
self.addCommand = ("INSERT INTO %s (%s) VALUES (%s)"%(
self.tableName,
", ".join([str(c.name) for c in self.tableDef.columns]),
", ".join(["%%(%s)s"%c.key for c in self.tableDef.columns])))
if "rows" in kwargs:
self.feedRows(kwargs["rows"])
def __iter__(self):
# Do we want named cursors by default here?
cursor = self.connection.cursor()
cursor.execute("SELECT * FROM %s"%self.tableName)
for row in cursor:
yield self.tableDef.makeRowFromTuple(row)
cursor.close()
def __len__(self):
return list(self.connection.query("select count(*) from %s"%
self.tableDef.getQName()))[0][0]
	def exists(self):
return self.getTableType(self.tableDef.getQName()) is not None
	def getFeeder(self, **kwargs):
if "notify" not in kwargs:
kwargs["notify"] = not self.tableDef.system or not self.tableDef.onDisk
return _Feeder(self, self.addCommand, **kwargs)
	def importFinished(self, nAffected):
if self.newlyCreated:
self.runScripts("preIndex")
self.makeIndices()
self.runScripts("postCreation")
else:
base.ui.notifyDBTableModified(self.tableName)
if nAffected>100:
			# don't run ANALYZE when only a few records changed,
			# as when uploading to products.
self.connection.execute("ANALYZE %s"%self.tableName)
return self
	def importFailed(self, *excInfo):
# rollback is handled by the feeder.
return False
	def feedRows(self, rows):
"""Feeds a sequence of rows to the table.
The method returns the number of rows affected. Exceptions are
handed through upstream, but the connection is rolled back.
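
		For instance (a sketch; the column names are illustrative)::

			t.feedRows([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])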
"""
with self.getFeeder() as feeder:
for r in rows:
feeder.add(r)
return feeder.nAffected
	def addRow(self, row):
"""adds a row to the table.
Use this only to add one or two rows, otherwise go for getFeeder.
"""
try:
self.connection.execute(self.addCommand, row)
except sqlsupport.IntegrityError:
raise base.ui.logOldExc(
base.ValidationError("Row %s cannot be added since it clashes"
" with an existing record on the primary key"%row, row=row,
colName="unknown"))
	def getRow(self, *key):
"""returns the row with the primary key key from the table.
This will raise a DataError on tables without primaries.
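		For a table with a one-column primary key, a call would look like
		``t.getRow("someKeyValue")`` (a sketch; the key value is illustrative).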
"""
if not self.tableDef.primary:
raise base.DataError("Table %s has no primary key and thus does"
" not support getRow"%self.tableName)
res = list(self.iterQuery(self.tableDef,
" AND ".join("%s=%%(%s)s"%(n,n) for n in self.tableDef.primary),
pars=dict(list(zip(self.tableDef.primary, key)))))
if not res:
raise KeyError(key)
return res[0]
	def createUniquenessRules(self):
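		"""installs the dupe policy trigger for this table if its definition
		asks for one.
		Depending on tableDef.dupePolicy (drop, check, dropOld, or
		overwrite), this (re-)creates a plpgsql trigger function deciding
		what happens when a row with an already existing primary key is
		inserted.  Triggers and rules left over from older DaCHS versions
		are dropped first.
		"""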
# these two drop rules and triggers created by old DaCHS
# versions. I'd say they can be removed ~ 2025, when all
		# tables with such triggers/rules have been removed.
self.connection.execute('DROP TRIGGER IF EXISTS "dropOld_%s"'
' ON %s'%(self.tableName, self.tableName))
self.connection.execute('DROP RULE IF EXISTS updatePolicy'
' ON %s'%(self.tableName))
# this is the new trigger used by all policies
self.connection.execute('DROP TRIGGER IF EXISTS "dupe_policy_%s"'
' ON %s'%(self.tableName, self.tableName))
if not self.tableDef.dupePolicy:
return
def getMatchCondition():
return " AND ".join("%s=new.%s"%(n,n) for n in self.tableDef.primary)
# uniqueness checking becomes really slow if primary key definition
# is delayed until index creation (as usual), so let's do it now.
self._definePrimaryKey()
args = {
"table": self.tableName,
"matchCond": getMatchCondition(),
"colNames": ",".join(str(c.name) for c in self.tableDef),
"mungedColNames": ",".join(
"munged_vals."+str(c.name) for c in self.tableDef)}
if self.tableDef.dupePolicy=="drop":
self.connection.execute("""
CREATE OR REPLACE FUNCTION
"handle_dupe_%(table)s"()
RETURNS trigger AS $body$
BEGIN
IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN
RETURN NULL;
END IF;
RETURN NEW;
END
$body$ LANGUAGE plpgsql"""%args)
elif self.tableDef.dupePolicy=="check":
# This one is tricky: if the inserted column is *different*,
# the rule does not fire and we get a pkey violation.
# Furthermore, special NULL handling is required -- we
# do not check columns that have NULLs in new or old.
self.connection.execute("""
CREATE OR REPLACE FUNCTION
"handle_dupe_%(table)s"()
RETURNS trigger AS $body$
BEGIN
SELECT * INTO OLD FROM %(table)s WHERE %(matchCond)s;
IF NOT FOUND THEN
RETURN NEW;
ELSEIF (OLD IS NOT DISTINCT FROM NEW) THEN
RETURN NULL;
ELSE
RAISE 'Overwrite attempt with different tuple with check policy %%%% %%%%.', OLD, NEW;
END IF;
RETURN NEW;
END
$body$ LANGUAGE plpgsql"""%args)
elif self.tableDef.dupePolicy=="dropOld":
self.connection.execute("""
CREATE OR REPLACE FUNCTION "handle_dupe_%(table)s"()
RETURNS trigger AS $body$
BEGIN
IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN
DELETE FROM %(table)s WHERE %(matchCond)s;
END IF;
RETURN NEW;
END $body$ LANGUAGE plpgsql"""%args)
elif self.tableDef.dupePolicy=="overwrite":
self.connection.execute("""
CREATE OR REPLACE FUNCTION "handle_dupe_%(table)s"()
RETURNS trigger AS $body$
DECLARE
munged_vals RECORD;
BEGIN
munged_vals = NEW;
IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN
UPDATE %(table)s SET (%(colNames)s)=(%(mungedColNames)s)
WHERE %(matchCond)s;
RETURN NULL;
END IF;
RETURN NEW;
END $body$ LANGUAGE plpgsql"""%args)
else:
raise base.DataError("Invalid dupePolicy: %s"%self.tableDef.dupePolicy)
self.connection.execute(
'CREATE TRIGGER "dupe_policy_%(table)s" BEFORE INSERT'
' ON %(table)s FOR EACH ROW EXECUTE'
' PROCEDURE "handle_dupe_%(table)s"()'%args)
	def setStatisticsTargets(self):
for col in self.tableDef:
if col.hasProperty("statisticsTarget"):
target = int(col.getProperty("statisticsTarget"))
self.connection.execute("ALTER TABLE %s ALTER COLUMN %s"
" SET STATISTICS %d"%(self.tableName, col.name, target))
	def create(self):
base.ui.notifyDebug("Create DB Table %s"%self.tableName)
self.ensureSchema()
self.newlyCreated = True
self.runScripts("preCreation")
self.connection.execute(self.tableDef.getDDL())
return self.updateMeta()
	def createIfNecessary(self):
if not self.exists():
self.create()
return self
def _iterDerivedViews(self):
"""yields view that contain the current table.
This is in particular used when dropping the table (rather than rely on
cascade) in order to give view's dropping scripts a chance to run.
"""
depNames = set(r[0] for r in self.connection.query("""
SELECT DISTINCT v.oid::regclass as table_name
FROM pg_depend AS d
JOIN pg_rewrite AS r ON r.oid = d.objid
JOIN pg_class AS v ON v.oid = r.ev_class
WHERE v.relkind = 'v'
AND d.classid = 'pg_rewrite'::regclass
AND d.refclassid = 'pg_class'::regclass
AND d.deptype = 'n'
AND refobjid=%(qname)s::regclass""",
{"qname": self.tableDef.getQName()}))
if not depNames:
return
myId = self.tableDef.getFullId()
for depId in [r[1]+"#"+r[0].split(".")[-1]
for r in self.connection.query("""SELECT tablename, sourcerd
FROM dc.tablemeta
WHERE tablename IN %(depNames)s""",
locals())]:
if depId==myId:
# can't be bothered to figure out why a view apparently
# depends on itself by the logic above
continue
try:
yield base.resolveCrossId(depId)
except base.NotFoundError:
base.ui.notifyError("Would have liked to drop %s before"
" dropping %s, but found its definition gone."%(
depId, self.tableDef.getQName()))
	def drop(self, seenIds=()):
"""drops the table.
This will recurse into dependent objects.
"""
self.runScripts("beforeDrop")
if self.exists():
if seenIds==():
seenIds = set()
# if there are any dependent views (known), drop them in a controlled
			# fashion before CASCADE hits
for viewDef in self._iterDerivedViews():
if id(viewDef) in seenIds:
continue
seenIds.add(id(viewDef))
View(
viewDef,
connection=self.connection,
create=False).drop(seenIds)
self.dropTable(self.tableDef.getQName(), cascade=True)
if not self.nometa:
self.cleanFromMeta()
return self
	def recreate(self):
self.drop()
self.create()
return self
	def query(self, query, data={}):
"""runs query within this table's connection.
query is macro-expanded within the table definition (i.e., you can,
e.g., write \qName to obtain the table's qualified name).
Don't use this in new code; use t.connection.query or execute
as required.
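
		For instance (a sketch)::

			rowcount = list(t.query("SELECT count(*) FROM \qName"))[0][0]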
"""
return DBMethodsMixin.query(self, self.expand(query), data)
	def getSelectClause(self, resultTableDef):
"""returns the select clause to come up with resultTableDef.
"""
parts = []
for of in resultTableDef:
select = getattr(of, "select", None)
if select:
parts.append("%s AS %s"%(select, of.name))
else:
parts.append(of.name)
return ", ".join(parts)
	def getQuery(self, resultTableDef, fragment, pars=None,
distinct=False, limits=None, groupBy=None, samplePercent=None):
"""returns a result table definition, query string and a parameters
dictionary for a query against this table.
See getTableForQuery for the meaning of the arguments.
"""
if pars is None:
pars = {}
if not isinstance(resultTableDef, rscdef.TableDef):
resultTableDef = base.makeStruct(rscdef.TableDef,
id="iterQuery", columns=resultTableDef)
query = ["SELECT "]
if distinct:
query.append("DISTINCT ")
query.append(self.getSelectClause(resultTableDef)+" ")
query.append("FROM %s "%self.tableName)
if samplePercent:
query.append("TABLESAMPLE SYSTEM (%f)"%samplePercent)
if fragment and fragment.strip():
query.append("WHERE %s "%fragment)
if groupBy:
query.append("GROUP BY %s "%groupBy)
if limits:
query.append(limits[0]+" ")
pars.update(limits[1])
if self.exclusive:
query.append("FOR UPDATE ")
return resultTableDef, "".join(query), pars
	def iterQuery(self, resultTableDef=None, fragment="", pars=None,
distinct=False, limits=None, groupBy=None):
"""like getTableForQuery, except that an iterator over the
result rows is returned.
		(there is no advantage in using this, as we will pull the entire
		thing into memory anyway; use qtables if you need streaming).
"""
for row in self.getTableForQuery(resultTableDef, fragment,
pars, distinct, limits, groupBy).rows:
yield row
	def getTableForQuery(self, resultTableDef=None, fragment="", pars=None,
distinct=False, limits=None, groupBy=None, samplePercent=None):
"""returns a Table instance for a query on this table.
		resultTableDef is a TableDef with svc.OutputField columns
		(rscdef.Column instances will do), or possibly just a list of
		Columns. Fragment is empty or an SQL where-clause with dictionary
		placeholders, pars is the dictionary filling fragment, distinct,
		if True, adds a distinct clause, and limits, if given, is a pair
		of an SQL string to be appended to the SELECT clause and parameters
		filling it. queryMeta.asSQL returns what you need here.
pars may be mutated in the process.
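
		A typical call might look like this (a sketch; column and
		parameter names are illustrative)::

			res = t.getTableForQuery(
				[t.tableDef.getColumnByName("ra")],
				"ra BETWEEN %(raMin)s AND %(raMax)s",
				pars={"raMin": 10, "raMax": 20})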
"""
if resultTableDef is None:
resultTableDef = self.tableDef.copy(None)
resultTableDef, query, pars = self.getQuery(
resultTableDef,
fragment,
pars=pars,
distinct=distinct,
limits=limits,
groupBy=groupBy,
samplePercent=samplePercent)
return table.InMemoryTable(resultTableDef,
rows=[resultTableDef.makeRowFromTuple(tupRow)
for tupRow in self.connection.query(query, pars)])
	def runScripts(self, phase, **kwargs):
"""runs scripts from both the tableDef and the make.
		The reason there's not a single place is mainly historical; on the
		other hand, postCreation scripts that only run when a table is
		created in a special way might one day come in handy, so I'll not
		take away table scripts from makes.
"""
if self.updating:
if (not self.newlyCreated
and phase in {"preImport", "preIndex", "postCreation"}):
return
self.tableDef.getRunner()(self, phase, **kwargs)
if self.makeRunning:
self.makeRunning.getRunner()(self, phase, **kwargs)
class View(DBTable):
"""is a view, i.e., a table in the database you can't add to.
Strictly, I should derive both View and DBTable from a common
base, but that's currently not worth the effort.
Technically, Views are DBTables with a non-None viewStatement
(this is what TableForDef checks for when deciding whether to
construct a DBTable or a View). You can get a feeder for them,
but trying to actually feed anything will raise a DataError.
On import, views only run postCreation scripts; we assume everything
else (preIndex, postIndex, preImport, newSource, etc) has run
in the contributing tables. Materialised views *will* create indices,
however.
"""
def __init__(self, *args, **kwargs):
DBTable.__init__(self, *args, **kwargs)
del self.addCommand
	def addRow(self, row):
raise base.DataError("You cannot add data to views")
feedRows = addRow
	def setStatisticsTargets(self):
# no statistics on views
# (we don't even warn since we've probably got this from the table
# where it's going to happen)
pass
	def getFeeder(self, **kwargs):
# all kwargs ignored since the feeder will raise an exception on any
# attempts to feed anyway.
return _RaisingFeeder(self, None)
	def create(self):
base.ui.notifyDebug("Create DB View %s"%self.tableName)
self.newlyCreated = True
self.ensureSchema()
try:
self.connection.execute(
self.tableDef.expand(self.tableDef.viewStatement))
return self.updateMeta()
except base.DBError as msg:
raise base.ReportableError("View statement of table at %s bad."
" Postgres error message: %s"%(
self.tableDef.getSourcePosition(),
msg))
	def makeIndices(self):
if self.getTableType(self.tableDef.getQName())=="MATERIALIZED VIEW":
super().makeIndices()
	def importFinished(self, nImported):
self.makeIndices()
self.runScripts("postCreation")
return self