Source code for gavo.grammars.odbcgrammar
"""
A grammar feeding from an ODBC connection.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from psycopg2.extensions import adapt
from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.grammars.common import Grammar, RowIterator
pyodbc = utils.DeferredImport("pyodbc", "import pyodbc")


class QueryGenerator(rscdef.ProcApp):
"""A generator of ODBC queries.
This is is mainly useful when doing ODBC queries to incrementally
havest some external resource.
The current ODBC iterator will be available as ``self``.
The procedures also see a function escapeSQL(val) that returns val
as a SQL literal (actually, it's psycopg2's adapt at the moment).
This is intended to be used somewhat like this with a monotonously
increasing column insertion_time::
<makeQuery>
<code>
# find to until when we have data locally
try:
with base.getTableConn() as conn:
localMax = next(conn.query(
"SELECT MAX(insertion_time) FROM \schema.main"))[0]
fragment = " WHERE insertion_time>{}".format(
sqlEscape(localMax))
except base.DBError as msg:
base.ui.notifyWarning(f"{msg} while harvesting: full re-harvest")
fragment = ""
return f"SELECT * FROM remote_table{fragment}"
</code>
</makeQuery>
"""
    name_ = "makeQuery"
    requiredType = "makeQuery"
    formalArgs = "self"

    additionalNamesForProcs = {
        "escapeSQL": adapt,
    }


class ODBCIterator(RowIterator):
    def _iterRows(self):
        # each source is a file containing an ODBC connection string
        with open(self.sourceToken) as f:
            accessToken = f.read().strip()
        conn = pyodbc.connect(accessToken)

        # run either the literal query or the one computed by makeQuery
        if self.grammar.query:
            remoteQuery = self.grammar.query
        else:
            remoteQuery = self.grammar.makeQuery.compile()(self)

        cursor = conn.cursor()
        cursor.execute(remoteQuery)

        # the result column names become the keys of the yielded rawdicts
        keys = [d[0] for d in cursor.description]
        for row in cursor:
            yield dict(zip(keys, row))


class ODBCGrammar(Grammar):
"""A grammar that feeds from a remote database.
This works as a sort of poor man's foreign data wrapper: you pull
data from a remote database now and then, mogrifying it into whatever
format you want locally.
This expects files containing pyodbc connection strings as sources,
so you'll normally just have one source. Having the credentials
externally helps keeping RDs using this safe for public version control.
An example for an ODBC connection string::
DRIVER={SQL Server};SERVER=localhost;DATABASE=testdb;UID=me;PWD=pass
See also http://www.connectionstrings.com/
This will only work if pyodbc (debian: python3-pyodbc) is installed.
Additionally, you will have to install the odbc driver corresponding
to your source database (e.g., odbc-postgresql).
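
    A minimal sketch of how such a grammar might sit in an RD's data
    element (the credentials file, the remote table, and its columns are
    made-up placeholders)::

        <data id="import">
            <sources>creds/remote.odbc</sources>
            <odbcGrammar>
                <query>
                    SELECT obj_id, ra, dec FROM remote_catalogue
                </query>
            </odbcGrammar>
            <make table="main"/>
        </data>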
"""
name_ = "odbcGrammar"
_query = base.UnicodeAttribute("query",
description="The query to run on the remote server. The keys of"
" the grammar will be the names of the result columns.",
default=base.NotGiven,
copyable=True)
_makeQuery = base.StructAttribute("makeQuery",
childFactory=QueryGenerator,
default=base.NotGiven,
description="Code returning the query to execute on the remote"
" system.")
rowIterator = ODBCIterator

    def validate(self):
        super().validate()
        if self.query and self.makeQuery:
            raise base.StructureError("Cannot give both query and makeQuery"
                " in an odbcGrammar.")
        if not (self.query or self.makeQuery):
            raise base.StructureError("Need to give at least one of query"
                " and makeQuery in an odbcGrammar.")

    def completeElement(self, ctx):
        if ctx.restricted and self.makeQuery:
            raise base.RestrictedElement("makeQuery")
        super().completeElement(ctx)