Source code for gavo.rsc.qtable
"""
A table representing a query.
This is mainly for streaming applications. The table represents
a DB query result. All you can do with the data itself is iterate over
the rows. The metadata is usable as with any other table.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo import base
from gavo import rscdef
from gavo.rsc import dbtable
from gavo.rsc import table
class QueryTable(table.BaseTable, dbtable.DBMethodsMixin):
    """QueryTables are constructed with a table definition and a DB query
    feeding this table definition.

    A QueryTable must be constructed with a transactional connection (in
    sqlsupport terms: Writable).  If you pass autoClose=True, it will close
    this connection after the data is delivered.

    This funky semantics is for the benefit of taprunner; it needs a
    connection up front for uploads.

    There's an alternative constructor allowing "quick" construction of
    the result table (fromColumns).

    The rows can be iterated over exactly once; after that (or after
    getPlan or cleanup), the table has let go of its connection and
    further attempts to iterate raise a ReportableError.
    """
    # Class-level default so cleanup() (and hence __del__) is safe even on
    # instances whose __init__ never got around to setting a connection.
    connection = None

    def __init__(self, tableDef, query, connection, **kwargs):
        """sets up the table; the query is not executed here.

        tableDef is the table definition describing the result rows,
        query is the SQL text to execute on iteration, connection is the
        database connection to run it on.  The only keyword argument
        interpreted here is autoClose (default False); everything else is
        passed on to BaseTable's constructor, except that passing rows
        raises a ReportableError.
        """
        self.connection = connection
        self.autoClose = kwargs.pop("autoClose", False)
        if "rows" in kwargs:
            raise base.ReportableError("QueryTables cannot be constructed"
                " with rows.")
        self.query = query
        table.BaseTable.__init__(self, tableDef, connection=connection,
            **kwargs)
        self._parametersToRestore = []

    @classmethod
    def fromColumns(cls, colSpec, query, connection, **kwargs):
        """returns a QueryTable object for query, where the result table is
        inferred from colSpec.

        colSpec is a sequence consisting of either dictionaries with
        constructor arguments to rscdef.Column or complete objects suitable
        as rscdef.Column objects; further kwargs are passed on to the
        QueryTable's constructor.
        """
        columns = []
        for c in colSpec:
            if isinstance(c, dict):
                columns.append(base.makeStruct(rscdef.Column, **c))
            else:
                columns.append(c)
        return cls(base.makeStruct(rscdef.TableDef, columns=columns),
            query, connection=connection, **kwargs)

    def __iter__(self):
        """actually runs the query and returns rows (dictionaries).

        You can only iterate once.  When the iteration ends -- by
        exhaustion, by an error (including a failing query), or because
        the generator is discarded early -- cleanup() is run, which closes
        the connection if autoClose was requested and otherwise restores
        any saved connection parameters.

        Raises a ReportableError if the table has already been exhausted.
        """
        if self.connection is None:
            raise base.ReportableError("QueryTable already exhausted.")

        nRows = 0

        # We want to enable parallel execution for queries where it's
        # worth it, but we'd like to stream your giant select *-type
        # queries.  Both criteria are deliberately gung-ho substring
        # tests on the lowercased query:
        #   1. there's a GROUP (sc. BY) in the query,
        #   2. it is an obscore query.
        # Those get a plain cursor; everything else gets a named cursor.
        loQ = self.query.lower()

        # Cursor creation and execution happen inside the try block so
        # that a failing query still triggers cleanup() right away rather
        # than whenever the object happens to be garbage collected.
        try:
            if "group" in loQ or "ivoa.obscore" in loQ:
                cursor = self.connection.cursor()
            else:
                cursor = self.connection.cursor("cursor"+hex(id(self)))
            cursor.execute(self.query)

            # pull the result in batches so we never hold all of it
            while True:
                nextRows = cursor.fetchmany(100000)
                if not nextRows:
                    break
                for row in nextRows:
                    nRows += 1
                    yield self.tableDef.makeRowFromTuple(row)
            cursor.close()

            # overflowLimit is usually set by the TAP machinery; we don't
            # want to depend on it, though.
            if getattr(self.tableDef, "overflowLimit", None)==nRows:
                self.setMeta("_queryStatus", "OVERFLOW")
        finally:
            self.cleanup()

    def __len__(self):
        """returns None, as the number of rows is not known without
        running the query.

        Note that, as a consequence, the built-in len() raises a TypeError
        for QueryTables.
        """
        return None

    def _restoreParameters(self):
        """helps configureOnClose.

        This re-applies the settings saved in _parametersToRestore to the
        connection, if there are any and the connection is still around.
        """
        # getattr: this can be reached through __del__ on an instance
        # whose __init__ failed before _parametersToRestore was set.
        toRestore = getattr(self, "_parametersToRestore", None)
        if toRestore and self.connection is not None:
            try:
                self.connection.configure(toRestore)
            except base.DBError:
                # the connection might be in an error state. Roll back and
                # try again.  If that fails again, things are really bad
                # and nobody will be interested in the fact that parameter
                # restoration has failed, so fail silently.
                try:
                    self.connection.rollback()
                    self.connection.configure(toRestore)
                except base.DBError:  # see above
                    pass

    def cleanup(self):
        """releases the connection.

        With autoClose, the connection is closed (DBErrors while doing
        that are ignored); otherwise, saved connection parameters are
        restored.  In either case, the table forgets its connection
        afterwards, which marks it as exhausted.  Calling this more than
        once is harmless.
        """
        if getattr(self, "connection", None) is not None:
            if self.autoClose:
                try:
                    self.connection.close()
                except base.DBError:
                    # Connection already closed or similarly ignorable
                    pass
            else:
                self._restoreParameters()
            self.connection = None

    def getPlan(self):
        """returns the query plan for the current query as a string.

        This is the first column of the rows the database returns for
        EXPLAIN <query>, joined by newlines.

        After you use this method, the iterator is exhausted and cleanup()
        has been run (i.e., the connection is closed if autoClose was
        requested), regardless of whether obtaining the plan worked.

        Raises a ReportableError if the table has already been exhausted.
        """
        if self.connection is None:
            raise base.ReportableError("QueryTable already exhausted.")

        try:
            cursor = self.connection.cursor()
            cursor.execute("EXPLAIN "+self.query)
            return "\n".join(s[0] for s in cursor)
        finally:
            # always let go of the connection, even if EXPLAIN failed
            self.cleanup()

    def __del__(self):
        # last-resort release of the connection for tables that were
        # never (completely) iterated over
        self.cleanup()