Source code for gavo.base.sqlmunge

"""
Helpers for building SQL expressions.

Some of this code is concerned with SQL factories.  These are functions
with the signature::

	func(field, val, outPars) -> fragment

outPars is a dictionary that is used to transmit literal values into SQL.
The result must be an SQL boolean expression for embedding into a WHERE clause
(use None to signal no constraint).  Field is the field for which the
expression is being generated.

The factories currently are never called when val is a sequence; there's
special hard-coded behaviour for that in _getSQLFactory.

To enter values in outPars, use getSQLKey.  Its docstring contains
an example that shows what that looks like.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import utils
from gavo.stc import mjdToDateTime

# Floating-point infinities; the interval code in this module compares
# interval bounds against these to spot half-open intervals.
plusInfinity = float("inf")
minusInfinity = -plusInfinity


def joinOperatorExpr(operator, operands):
    """Combine the non-empty operands into one expression using operator.

    Falsy operands (None, empty strings) are discarded first.  If nothing
    is left, None is returned to denote the empty expression.  A single
    remaining operand is returned unchanged.  Otherwise, each remaining
    operand is wrapped in parentheses and the results are joined with
    operator.
    """
    kept = list(filter(None, operands))
    if len(kept) > 1:
        return operator.join(" (%s) " % item for item in kept).strip()
    if kept:
        return kept[0]
    return None
def getSQLKey(key, value, sqlPars):
    """Store value in sqlPars and return the name it was stored under.

    Use this to fill parameter dictionaries for SQL queries without
    accidentally overwriting a parameter that happens to share a name.
    The returned name is key followed by a serial number.  The first
    serial number is chosen that is either still unused in sqlPars or
    already holds a value comparing equal to value.

    key is usually a string matching the identifier pattern.  It may also
    be a QuotedName, in which case its makeIdentifier() result is used
    (which mangles the name quite a bit).

    As an extra service, a value that is a list is stored as a frozenset
    rather than as the list itself (which would end up as an array).  We
    don't believe there's a great need to match against arrays.  If you
    must match against arrays, use numpy arrays.

    >>> sqlPars = {}
    >>> getSQLKey("foo", 13, sqlPars)
    'foo0'
    >>> getSQLKey("foo", 14, sqlPars)
    'foo1'
    >>> getSQLKey("foo", 13, sqlPars)
    'foo0'
    >>> sqlPars["foo0"], sqlPars["foo1"]; sqlPars = {}
    (13, 14)
    >>> "WHERE foo<%%(%s)s OR foo>%%(%s)s"%(getSQLKey("foo", 1, sqlPars),
    ...   getSQLKey("foo", 15, sqlPars))
    'WHERE foo<%(foo0)s OR foo>%(foo1)s'
    >>> getSQLKey(utils.QuotedName("-x-"), "x", sqlPars)
    'id2dx2d0'
    """
    if isinstance(key, utils.QuotedName):
        key = key.makeIdentifier()
    if isinstance(value, list):
        value = frozenset(value)

    serial = 0
    dataKey = "%s%d" % (key, serial)
    # Skip over names that are taken by a different value.
    while dataKey in sqlPars and not (sqlPars[dataKey] == value):
        serial += 1
        dataKey = "%s%d" % (key, serial)

    sqlPars[dataKey] = value
    return dataKey
# Registry of SQL factories, keyed by type name (a string).
_REGISTRED_SQL_FACTORIES = {}


def registerSQLFactory(type, factory):
    """Make factory the SQL factory registered for type.

    type is the type name as a string.  A factory registered earlier
    under the same type is replaced.
    """
    _REGISTRED_SQL_FACTORIES.update({type: factory})
def _getSQLForSequence(field, val, sqlPars):
    """Return an IN condition matching field against the members of val.

    An empty val, or a val consisting of a single None, yields an empty
    condition.  Otherwise the members are entered into sqlPars as a set.
    """
    if not len(val) or (len(val) == 1 and val[0] is None):
        return ""
    parName = getSQLKey(field.name, set(val), sqlPars)
    return "%s IN %%(%s)s" % (field.name, parName)


def _convertIfFinite(val, converter):
    """Return converter(val) if val is strictly between the infinities.

    Any other val is returned unchanged.
    """
    isFinite = minusInfinity < val < plusInfinity
    return converter(val) if isFinite else val


def _getSQLForInterval(field, val, sqlPars):
    """Return SQL for DALI intervals.

    This presumes that val is a 2-array of numbers; anything that does
    not have exactly two elements yields an empty condition.

    If field has the property database-column-is-date, finite bounds are
    passed through mjdToDateTime before they are entered into sqlPars.

    An upper bound of plus infinity produces a strict ``>`` comparison
    against the lower bound.  A lower bound of minus infinity produces a
    strict ``<`` comparison against the upper bound.  Anything else
    becomes a BETWEEN condition.
    """
    if len(val) != 2:
        return ""
    if field.hasProperty("database-column-is-date"):
        val = [_convertIfFinite(bound, mjdToDateTime) for bound in val]
    lower, upper = val

    def enter(bound):
        # Put bound into sqlPars and hand back its parameter name.
        return getSQLKey(field.name, bound, sqlPars)

    # The upper bound is inspected first, as the lower bound only matters
    # when the interval is closed at the top.
    if upper == plusInfinity:
        return "%s > %%(%s)s" % (field.name, enter(lower))
    if lower == minusInfinity:
        return "%s < %%(%s)s" % (field.name, enter(upper))
    return "%s BETWEEN %%(%s)s AND %%(%s)s" % (
        field.name, enter(lower), enter(upper))


def _getSQLForSimple(field, val, sqlPars):
    """Return an equality condition comparing field with val."""
    parName = getSQLKey(field.name, val, sqlPars)
    return "%s=%%(%s)s" % (field.name, parName)


def _getSQLFactory(field, value):
    """Return an SQL factory for matching field's values against value.

    Fields with xtype interval always get the interval factory.  Failing
    that, list and tuple values get the sequence factory.  Otherwise a
    factory registered for field.type is used, with plain equality as the
    fallback.
    """
    if field.xtype == "interval":
        return _getSQLForInterval
    if isinstance(value, (list, tuple)):
        return _getSQLForSequence
    return _REGISTRED_SQL_FACTORIES.get(field.type, _getSQLForSimple)
def getSQLForField(field, inPars, sqlPars):
    """Return an SQL fragment for a column-like thing.

    The value to match against is looked up in inPars under field.name.
    If it is missing or None, None is returned and sqlPars is left
    untouched.  Otherwise, the factory selected for field and the value
    builds the fragment, leaving (a) new key(s) in sqlPars.

    A list, set, or tuple containing exactly one element is replaced by
    that element before a factory is chosen.

    getSQLForField defines the default behaviour; in DBCore condDescs, it
    can be overridden using phrase makers.

    inPars is supposed to be "typed"; we do not catch general parse errors
    here.
    """
    val = inPars.get(field.name)
    if val is None:
        return None

    if isinstance(val, (list, set, tuple)) and len(val) == 1:
        # Sets cannot be indexed, so val[0] would raise a TypeError for
        # them; pulling the sole member through an iterator works for
        # all three container types alike.
        val = next(iter(val))

    factory = _getSQLFactory(field, val)
    return factory(field, val, sqlPars)
def _test():
    """Run the doctests found in this module's docstrings."""
    from doctest import testmod
    testmod()


if __name__ == "__main__":
    _test()