"""
Description and definition of tables.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import functools
import re
from gavo import adql
from gavo import base
from gavo import dm
from gavo import stc
from gavo import utils
from gavo.rscdef import column
from gavo.rscdef import common
from gavo.rscdef import group
from gavo.rscdef import mixins
from gavo.rscdef import rmkfuncs
from gavo.rscdef import scripting
MS = base.makeStruct
@functools.total_ordering
class DBIndex(base.Structure):
"""A description of an index in the database.
In real databases, indices may be fairly complex things; still, the
most common usage here will be to just index a single column::
<index columns="my_col"/>
To index over functions, use the character content; you will have
to put parentheses when using expressions. An explicit specification
of the index expression is also necessary to allow RE pattern matches using
indices in character columns (outside of the C locale). That would be::
<index columns="uri">uri text_pattern_ops</index>
(you still want to give columns so the metadata engine is aware of the
index). See section "Operator Classes and Operator Families" in
the Postgres documentation for details.
For pgsphere-valued columns, at the time of writing you need to specify
the method::
<index columns="coverage" method="GIST"/>
To define q3c indices, use the ``//scs#q3cindex`` mixin; if you're
devious enough to require something more flexible, have a look at
that mixin's definition.
If indexed columns take part in a DaCHS-defined view, DaCHS will not
notice. You should still declare the indices so users will see them
in the metadata; writing::
<index columns="col1, col2, col3"/>
is sufficient for that.
"""
name_ = "index"
_name = base.UnicodeAttribute("name", default=base.Undefined,
description="Name of the index. Defaults to something computed from"
" columns; the name of the parent table will be prepended in the DB."
" The default will *not* work if you have multiple indices on one"
" set of columns.",
copyable=True)
_columns = base.StringListAttribute("columns", description=
"Table columns taking part in the index (this must be given even if"
" there is an expression building the index, and it must mention all"
" columns taking part in the index generated by it).", copyable=True)
_cluster = base.BooleanAttribute("cluster", default=False,
description="Cluster the table according to this index?",
copyable=True)
_code = base.DataContent(copyable=True, description=
"Raw SQL specifying an expression the table should be"
" indexed for. If not given, the expression will be generated from"
" columns (which is what you usually want).")
_method = base.UnicodeAttribute("method", default=None,
description="The indexing method, like an index type. In the 8.x"
" series of postgres, you need to set method=GIST for indices"
" over pgsphere columns; otherwise, you should not need to"
" worry about this.", copyable=True)
_kind = base.UnicodeAttribute("kind", default="straight",
description="A tag on the index; this is used by the ADQL translation"
" engine in some situations. Consider it a DaCHS implementation"
" detail and ignore it for now.",
copyable=True)
_metaOnly = base.BooleanAttribute("metaOnly", default=False,
description="Do not tell the database to actually create the index,"
" just declare it in the metadata. This is for when you want"
" to tell users of the ADQL engine that columns in, say, a view"
" are indexed when the database cannot actually create the"
" index but the underlying tables provide one.",
copyable=True)
_options = base.ListOfAtomsAttribute("options",
itemAttD=base.UnicodeAttribute("option"),
default=[],
description="Index modifiers. For DaCHS, this is free text,"
" except that DaCHS will order INCLUDE, WITH, TABLESPACE, and"
" WHERE clauses it recognises to yield a syntactically correct"
" postgres statement.")
def completeElement(self, ctx):
if getattr(ctx, "restricted", False) and (
self.content_ or self.options):
raise base.RestrictedElement("index", hint="Free-form SQL on indices"
" is not allowed in restricted mode")
super().completeElement(ctx)
if not self.columns and not self.content_:
raise base.StructureError("Index without columns is verboten.")
if self.name is base.Undefined:
self.name = "%s"%(re.sub(r"[^\w]+", "_", "_".join(self.columns)))
if not self.content_:
self.content_ = "%s"%",".join(self.columns)
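# Illustrative sketch (not part of the module logic): an RD element like
#   <index columns="ra, dec"/>
# comes out of completeElement with name "ra_dec" and content_ "ra,dec";
# the name actually used in the database is then "<table id>_ra_dec"
# (see the dbname property below).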
# indices are sorted by table; this should put them in an order
# optimal for importing. Right now, we only make sure indices
# with CLUSTER go as early as possible.
def __eq__(self, other):
return id(self)==id(other)
def __lt__(self, other):
othercluster = getattr(other, "cluster", None)
if ((self.cluster and othercluster)
or (not self.cluster and not othercluster)):
return id(self)<id(other)
else:
if self.cluster:
return True
else:
return False
# a mapping of index option prefixes to relative positions in the CREATE
# INDEX statement.
_optionPrefixMap = {
"include": 1,
"with": 2,
"tablespace": 3,
"where": 4}
def _collateOptions(self):
"""returns values of options sorted such that they'll hopefully
work in a CREATE INDEX statement.
This does not perform deeper inspection and just looks at the start of
the option fragments.
Anything we don't recognise is sorted to the front.
"""
parts = []
for phrase in self.options:
relPos = self._optionPrefixMap.get(
re.search(r"\w+", phrase).group(0).lower(), 0)
parts.append((relPos, phrase))
parts.sort()
return [p[1] for p in parts]
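# Illustration of the ordering (the option texts are made up): with
#   options = ["WHERE flux IS NOT NULL", "INCLUDE (flux)"]
# _collateOptions returns
#   ["INCLUDE (flux)", "WHERE flux IS NOT NULL"]
# since "include" sorts before "where" in _optionPrefixMap, which is the
# order postgres expects syntactically.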
def iterCode(self):
destTableName = self.parent.getQName()
usingClause = ""
if self.method is not None:
usingClause = " USING %s"%self.method
options = ""
if self.options:
options = " "+" ".join(self._collateOptions())
yield self.parent.expand(
f"CREATE INDEX {self.dbname} ON {destTableName}{usingClause}"
f" ({self.content_}){options}")
if self.cluster:
yield self.parent.expand(
"CLUSTER %s ON %s"%(self.dbname, destTableName))
def create(self, querier):
"""creates the index on the parent table if necessary.
querier is an object mixing in the DBMethodsMixin, usually the
DBTable object the index should be created on.
"""
if not querier.hasIndex(self.parent.getQName(), self.dbname):
if not self.parent.system:
base.ui.notifyIndexCreation(
self.parent.expand(self.dbname))
for statement in self.iterCode():
querier.connection.execute(statement)
def drop(self, querier):
"""drops the index if it exists.
querier is an object mixing in the DBMethodsMixin, usually the
DBTable object the index possibly exists on.
"""
iName = self.parent.expand(self.dbname)
if querier.hasIndex(self.parent.getQName(), iName):
querier.connection.execute(
"DROP INDEX %s.%s"%(self.parent.rd.schema, iName))
@property
def dbname(self):
return "%s_%s"%(self.parent.id, self.name)
class ColumnTupleAttribute(base.StringListAttribute):
"""is a tuple of column names.
In a validate method, it checks that the names actually are in parent's
fields.
"""
def iterParentMethods(self):
"""adds a getPrimaryIn method to the parent class.
This function will return the value of the primary key in a row
passed. The whole thing is a bit dense in that I want to compile
that method to avoid having to loop every time it is called. This
compilation is done in a descriptor -- ah well, probably it's a waste
of time anyway.
"""
def makeGetPrimaryFunction(instance):
funcSrc = ('def getPrimaryIn(row):\n'
' return (%s)')%(" ".join(['row["%s"],'%name
for name in getattr(instance, self.name_)]))
return utils.compileFunction(funcSrc, "getPrimaryIn")
def getPrimaryIn(self, row):
try:
return self.__getPrimaryIn(row)
except AttributeError:
self.__getPrimaryIn = makeGetPrimaryFunction(self)
return self.__getPrimaryIn(row)
yield "getPrimaryIn", getPrimaryIn
def validate(self, parent):
for colName in getattr(parent, self.name_):
try:
parent.getColumnByName(colName)
except base.NotFoundError:
raise base.ui.logOldExc(base.LiteralParseError(self.name_, colName,
hint="Column tuple component %s is not in parent table"%colName))
class ForeignKey(base.Structure):
"""A description of a foreign key relation between this table and another
one.
"""
name_ = "foreignKey"
_inTable = base.ReferenceAttribute("inTable", default=base.Undefined,
description="Reference to the table the foreign key points to.",
copyable=True)
_source = base.UnicodeAttribute("source", default=base.Undefined,
description="Comma-separated list of local columns corresponding"
" to the foreign key. No sanity checks are performed here.",
copyable=True)
_dest = base.UnicodeAttribute("dest", default=base.NotGiven,
description="Comma-separated list of columns in the target table"
" belonging to its key. No checks for their existence, uniqueness,"
" etc. are done here. If not given, defaults to source.")
_metaOnly = base.BooleanAttribute("metaOnly", default=False,
description="Do not tell the database to actually create the foreign"
" key, just declare it in the metadata. This is for when you want"
" to document a relationship but don't want the DB to actually"
" enforce this. This is typically a wise thing to do when you have, say"
" a gigarecord of flux/density pairs and only several thousand metadata"
" records -- you may want to update the latter without having"
" to tear down the former.")
def getDescription(self):
return "%s:%s -> %s:%s"%(self.parent.getQName(), ",".join(self.source),
self.destTableName, ".".join(self.dest))
def _parseList(self, raw):
if isinstance(raw, list):
# we're being copied
return raw
return [s.strip() for s in raw.split(",") if s.strip()]
def onElementComplete(self):
self.destTableName = self.inTable.getQName()
self.isADQLKey = self.inTable.adql and self.inTable.adql!='hidden'
self.source = self._parseList(self.source)
if self.dest is base.NotGiven:
self.dest = self.source
else:
self.dest = self._parseList(self.dest)
super().onElementComplete()
def create(self, querier):
if self.metaOnly:
return
if not querier.foreignKeyExists(self.parent.getQName(),
self.destTableName,
self.source,
self.dest):
return querier.connection.execute("ALTER TABLE %s ADD FOREIGN KEY (%s)"
" REFERENCES %s (%s)"
" ON DELETE CASCADE"
" DEFERRABLE INITIALLY DEFERRED"%(
self.parent.getQName(),
",".join(self.source),
self.destTableName,
",".join(self.dest)))
def delete(self, querier):
if self.metaOnly:
return
try:
constraintName = querier.getForeignKeyName(self.parent.getQName(),
self.destTableName, self.source, self.dest)
except (ValueError, base.DBError): # key does not exist.
return
querier.connection.execute(
"ALTER TABLE %s DROP CONSTRAINT %s"%(self.parent.getQName(),
constraintName))
def getAnnotation(self, roleName, container, instance):
"""returns a dm annotation for this foreign key.
"""
return dm.ForeignKeyAnnotation(roleName, self, instance)
class STCDef(base.Structure):
"""A definition of a space-time coordinate system using STC-S.
"""
# Programmatically, you have
# * compiled -- an AST of the entire specification
# * iterColTypes -- iterates over column name/utype pairs, for the
# embedding table only; all others must not touch it
name_ = "stc"
_source = base.DataContent(copyable=True, description="An STC-S string"
" with column references (using quote syntax) instead of values")
# the following mapping translates legacy STC utypes to paths suitable
# for modern STC annotation in onElementComplete
# These are a bit sanitised in that the stupid indications of dimensionality
# are gone. To compare these, use the sanitizeUtype class method
# below. Aw, what madness!
_utypeToModern = {
"stc:AstroCoordSystem.SpaceFrame.CoordRefFrame":
"/space/frame/orientation",
"stc:AstroCoordSystem.SpaceFrame.ReferencePosition":
"/space/frame/refPosition",
"stc:AstroCoordSystem.SpaceFrame.CoordRefFrame.Equinox":
"/space/frame/equinox",
"stc:AstroCoordSystem.SpaceFrame.CoordFlavor":
"!!!flavor!!!",
"stc:AstroCoords.Position.Value.C1":
"/space/longitude",
"stc:AstroCoords.Position.Value.C2":
"/space/latitude",
"stc:AstroCoords.Position.Value.C3":
"/space/distance",
"stc:AstroCoords.Position.Value":
"/space/location",
"stc:AstroCoordArea.Polygon":
"/space/location",
"stc:AstroCoords.Velocity.Value.C1":
"/space/pm_longitude",
"stc:AstroCoords.Velocity.Value.C2":
"/space/pm_latitude",
"stc:AstroCoords.Velocity.Value.C3":
"/space/rv",
"stc:AstroCoords.Position.Epoch":
"/space/frame/epoch",
"stc:AstroCoords.Position.Epoch.yearDef":
"!!!yearDef!!!",
"stc:AstroCoords.Time.TimeInstant":
"/time/location",
"stc:AstroCoordSystem.TimeFrame.ReferencePosition":
"/time/frame/refPosition",
"stc:AstroCoordSystem.TimeFrame.TimeScale":
"/time/frame/timescale",
}
_sanitizer = re.compile(r"(Position|Value|Velocity)\dD?")
@classmethod
def sanitizeUtype(cls, utype):
"""returns utypes without the stupid 2D/3D qualifications.
"""
return cls._sanitizer.sub(r"\1", utype)
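# Example (a legacy utype as found in old annotations):
#   sanitizeUtype("stc:AstroCoords.Position2D.Value2.C1")
# yields "stc:AstroCoords.Position.Value.C1", which then matches the
# _utypeToModern key mapping to /space/longitude.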
def completeElement(self, ctx):
super().completeElement(ctx)
try:
self.compiled = stc.parseQSTCS(self.content_)
except stc.STCSParseError as msg:
raise base.ui.logOldExc(base.StructureError(
"Bad stc definition: %s"%str(msg)))
self.compiled.stripUnits()
self._origFields = dict((value.dest, utype)
for utype, value in stc.getUtypes(self.compiled)
if isinstance(value, stc.ColRef))
def iterColTypes(self):
return iter(self._origFields.items())
def onParentComplete(self):
"""produces new-style STC annotation from the utypes.
This is being called as an onParentComplete callback from the table's
stc attribute. It makes <stc> declarations work with new-style
COOSYS and perhaps one day with mivot-based annotation.
"""
translated = {}
for utype, value in stc.getUtypes(self.compiled):
utype = self.sanitizeUtype(utype)
if utype in self._utypeToModern:
translated[self._utypeToModern[utype]] = value
else:
pass # no match in the new data model -- silently ignore it
if not translated:
return
# Ok, we can build a votable:Coords annotation
dm.buildAdhocSTC(self.parent, translated)
class ADQLVisibilityAttribute(base.BooleanAttribute):
"""An attribute that has values True/False and hidden.
"""
typeDesc_ = "boolean or 'hidden'"
# This can be hacked to hidden by dachs imp's --hide-adql option.
trueValue = True
def feedObject(self, instance, value):
if value=='hidden':
instance._readProfiles.feed(None, instance, "defaults,untrustedquery")
value = False
base.BooleanAttribute.feedObject(self, instance, value)
def parse(self, value):
if value.lower()=="hidden":
return "hidden"
return base.BooleanAttribute.parse(self, value) and self.trueValue
def unparse(self, value):
if value=="hidden":
return value
return base.BooleanAttribute.unparse(self, value)
class PublishableDataMixin(base.StructCallbacks):
"""A mixin with a few classes and attributes for data that can be
published to the VO registry.
In particular, this contains the publish element (registration attribute).
"""
_registration = base.StructAttribute("registration",
default=None,
childFactory=common.Registration,
copyable=False,
description="A registration (to the VO registry) of this table"
" or data collection.")
def getPublicationsForSet(self, setNames, includeDatalink=True):
"""returns a sequence of publication elements for the data, suitable
for OAI responses for the sets setNames.
Essentially: if registration is None, or its sets don't match
setNames, return an empty sequence.
If the registration mentions services, we turn their publications
into auxiliary publications and yield them.
Otherwise, if we're published for ADQL, return the TAP service
as an auxiliary publication.
"""
if (self.registration is None
or not self.registration.sets & setNames):
return
services = self.registration.services
if not services:
services = [base.resolveCrossId("//tap#run")]
for service in services:
# we only want the "primary" publication here, which means: no
# VOSI or other synthesized crap. The logic for now is:
# Discard all publications with a renderer not named in allowed.
for pub in service.getPublicationsForSet(setNames):
if pub.render in service.allowed:
copy = pub.change(parent_=self, auxiliary=True)
copy.meta_ = self.registration.meta_
yield copy
class TableDef(base.Structure, base.ComputedMetaMixin, common.PrivilegesMixin,
common.IVOMetaMixin, base.StandardMacroMixin, PublishableDataMixin,
scripting.ScriptingMixin):
"""A definition of a table, both on-disk and internal.
Some attributes are ignored for in-memory tables, e.g., roles or adql.
Properties for tables:
* supportsModel -- a short name of a data model supported through this
table (for TAPRegExt dataModel); you can give multiple names separated
by commas.
* supportsModelURI -- a URI of a data model supported through this table.
You can give multiple URIs separated by blanks.
* forceStats -- if present (with any value), dachs limits on the
embedding RD will obtain statistics of this table even if it is a view.
Somewhat inconsistently, if you have to set a table's utype, set
its utype meta.
If you give multiple data model names or URIs, the sequences of names and
URIs must be identical (in particular, each name needs a URI).
"""
name_ = "table"
resType = "table"
acceptedScriptTypes = {"preIndex", "preCreation",
"postCreation", "beforeDrop", "afterMeta"}
# We don't want to force people to come up with an id for all their
# internal tables but want to avoid writing default-named tables to
# the db. Thus, the default is not a valid sql identifier.
_id = base.IdAttribute("id",
default=base.NotGiven,
description="Name of the table (must be SQL-legal for onDisk tables)")
_cols = common.ColumnListAttribute("columns",
childFactory=column.Column,
description="Columns making up this table.",
copyable=True)
_params = common.ColumnListAttribute("params",
childFactory=column.Param,
description='Param ("global columns") for this table.',
copyable=True)
_viewStatement = base.UnicodeAttribute("viewStatement",
default=None,
description="A single SQL statement to create a view. Setting this"
" makes this table a view. The statement will typically be something"
" like CREATE VIEW \\\\qName AS (SELECT \\\\colNames FROM...).",
copyable=True)
# onDisk must not be copyable since queries might copy the tds and havoc
# would result if the queries were to end up on disk.
_onDisk = base.BooleanAttribute("onDisk",
default=False,
description="Table in the database rather than in memory?")
_temporary = base.BooleanAttribute("temporary",
default=False,
description="If this is an onDisk table, make it temporary?"
" This is mostly useful for custom cores and such.",
copyable=True)
_adql = ADQLVisibilityAttribute("adql",
default=False,
description="Should this table be available for ADQL queries? In"
" addition to True/False, this can also be 'hidden' for tables"
" readable from the TAP machinery but not published in the"
" metadata; this is useful for, e.g., tables contributing to a"
" published view. Warning: adql=hidden is incompatible with setting"
" readProfiles manually.")
_system = base.BooleanAttribute("system",
default=False,
description="Is this a system table? If it is, it will not be"
" dropped on normal imports, and accesses to it will not be logged.")
_forceUnique = base.BooleanAttribute("forceUnique",
default=False,
description="Ignored legacy attribute.")
_dupePolicy = base.EnumeratedUnicodeAttribute("dupePolicy",
default=None,
validValues=["check", "drop", "overwrite", "dropOld"],
description= "Handle duplicate rows with identical primary keys manually"
" by raising an error if existing and new rows are not identical (check),"
" dropping the new one (drop), updating the old one (overwrite), or"
" dropping the old one and inserting the new one (dropOld)? Note that"
" if you change this, you will have to re-create the table to make"
" it take effect. The default is to have no special handling (which,"
" if a primary key is there at all, is like check, except an error"
" will be raised even if new and old row are identical).")
_primary = ColumnTupleAttribute("primary",
default=(),
description="Comma separated names of columns making up the primary key.",
copyable=True)
_indices = base.StructListAttribute("indices",
childFactory=DBIndex,
description="Indices defined on this table",
copyable=True)
_foreignKeys = base.StructListAttribute("foreignKeys",
childFactory=ForeignKey,
description="Foreign keys used in this table",
copyable=False)
_groups = base.StructListAttribute("groups",
childFactory=group.Group,
description="Groups for columns and params of this table",
copyable=True)
_nrows = base.IntAttribute("nrows",
description="Approximate number of rows in this table. While you"
" can hard-code this here, running dachs limits will put"
" an estimate into the database.")
# this actually induces an attribute annotations with the DM
# annotation instances
_annotations = dm.DataModelRolesAttribute()
_properties = base.PropertyAttribute()
# don't copy stc -- columns just keep the reference to the original
# stc on copy, and nothing should rely on column stc actually being
# defined in the parent tableDefs.
_stcs = base.StructListAttribute("stc", description="STC-S definitions"
" of coordinate systems.", childFactory=STCDef)
_rd = common.RDAttribute()
_mixins = mixins.MixinAttribute()
_original = base.OriginalAttribute()
_namePath = common.NamePathAttribute()
fixupFunction = None
metaModel = ("creationDate(1), description(1),"
"subject, referenceURL(1)")
@classmethod
def fromColumns(cls, columns, **kwargs):
"""returns a TableDef from a sequence of columns.
You can give additional constructor arguments. makeStruct is used
to build the instance, the mixin hack is applied.
Columns with identical names will be disambiguated.
"""
res = MS(cls,
columns=common.ColumnList(cls.disambiguateColumns(columns)),
**kwargs)
return res
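# Minimal usage sketch (the column construction details here are
# assumptions and depend on column.Column's attributes):
#   td = TableDef.fromColumns([
#       MS(column.Column, name="ra", type="double precision"),
#       MS(column.Column, name="dec", type="double precision")],
#     id="positions")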
def __iter__(self):
return iter(self.columns)
def __contains__(self, name):
try:
self.columns.getColumnByName(name)
except base.NotFoundError:
return False
return True
def __repr__(self):
try:
return "<Table definition of %s>"%self.getQName()
except base.Error:
return "<Non-RD table %s>"%self.id
def completeElement(self, ctx):
# Make room for DM annotations (these are currently filled by
# gavo.dm.dmrd.DataModelRoles, but we might reconsider this)
self.annotations = []
if self.viewStatement and getattr(ctx, "restricted", False):
raise base.RestrictedElement("table", hint="tables with"
" view creation statements are not allowed in restricted mode")
if self.registration:
if self.id is base.NotGiven:
raise base.StructureError("Published tables need an assigned id.")
else:
try:
self.setMeta("_metadataUpdated",
ctx.getInjected("resprop:%s#%s"%(self.rd.sourceId, self.id
))["rectimestamp"])
except (KeyError, AttributeError):
# fallback to RD meta isn't a disaster here
pass
if not self.id:
self._id.feed(ctx, self, utils.intToFunnyWord(id(self)))
# allow iterables to be passed in for columns and convert them
# to a ColumnList here
if not isinstance(self.columns, common.ColumnList):
self.columns = common.ColumnList(self.columns)
self._resolveSTC()
if ctx and not self.nrows:
self.nrows = ctx.getInjected("table:%s"%self.id, {"nrows": None}
).get("nrows")
super().completeElement(ctx)
self.columns.withinId = self.params.tableName = "table "+self.id
if ctx:
# this pushes min/max/median etc from the context to column/values
for col in self:
col.updateFromContext(self.id, col.name, ctx)
def validate(self):
if self.id.upper() in adql.ALL_RESERVED_WORDS:
raise base.StructureError("Reserved word %s is not allowed as a table"
" name"%self.id)
if self.scripts and not self.onDisk:
if self.rd and self.rd.schema=="test":
# for testing, we want to feed, say, obscore tables into memory
pass
else:
raise base.StructureError("Scripts are only supported for onDisk"
" tables")
super().validate()
def onElementComplete(self):
if self.adql:
self.readProfiles = (self.readProfiles |
base.getConfig("db", "adqlProfiles"))
self.dictKeys = [c.key for c in self]
self.indexedColumns = {}
for index in self.indices:
for col in index.columns:
if "\\" in col:
try:
col = self.expand(col)
except (base.Error, ValueError):
# cannot expand yet, ignore rather than pollute indexedColumns
continue
self.indexedColumns.setdefault(col, set()).add(index.kind)
if self.primary:
for col in self.primary:
self.indexedColumns.setdefault(col, set()).add("primary")
self._defineFixupFunction()
super().onElementComplete()
if self.registration:
self.registration.register()
# if there's no DM annotation yet, there's still a chance that our
# columns and params brought some with them. Try that.
if not self.annotations:
self.updateAnnotationFromChildren()
# we sort the indices for better performance (and possibly,
# one day, so things work at all, if there's dependencies).
# Right now, this only sorts indices to be clustered to the front.
self.indices.sort()
def getElementForName(self, name):
"""returns the first of column and param having name name.
The function raises a NotFoundError if neither column nor param with
name exists.
"""
try:
try:
return self.columns.getColumnByName(name)
except base.NotFoundError:
return self.params.getColumnByName(name)
except base.NotFoundError as ex:
ex.within = "table %s"%self.id
raise
def _resolveSTC(self):
"""adds STC related attributes to this tables' columns.
"""
for stcDef in self.stc:
for name, type in stcDef.iterColTypes():
destCol = self.getColumnByName(name)
if destCol.stc is not None:
# don't warn -- this kind of annotation is done for the future,
# when we can handle it properly.
continue
# base.ui.notifyWarning("Column %s is referenced twice from STC"
# " in table %s is referenced twice in STC groups. This"
# " is currently not supported, the second reference is"
# " ignored."%(name, self.getQName()))
destCol.stc = stcDef.compiled
destCol.stcUtype = type
def _defineFixupFunction(self):
"""defines a function to fix up records from column's fixup attributes.
This will leave a fixupFunction attribute which will be None if
no fixups are defined.
"""
fixups = []
for col in self:
if col.fixup is not None:
fixups.append((col.name, col.fixup))
if fixups:
assignments = []
for key, expr in fixups:
expr = expr.replace("___", "row['%s']"%key)
assignments.append(" row['%s'] = %s"%(key, expr))
source = self.expand(
"def fixup(row):\n%s\n return row"%("\n".join(assignments)))
self.fixupFunction = rmkfuncs.makeProc("fixup", source,
"", None)
def getQName(self):
if self.temporary or not hasattr(self, "parent"):
return self.id
else:
if self.rd is None:
raise base.Error("TableDefs without resource descriptor"
" have no qualified names")
return "%s.%s"%(self.rd.schema, self.id)
def validateRow(self, row):
"""checks that row is complete and complies with all known constraints on
the columns
The function raises a ValidationError with an appropriate message
and the relevant field if not.
"""
for col in self:
if col.key not in row:
raise base.ValidationError("%s not bound in row"%col.name,
col.name, row, hint="The table %s has a column named '%s',"
" but the input row %s does not give it. This typically means"
" bad input or a rowmaker failing on some corner case."%(
self.id, col.name, row))
try:
col.validateValue(row[col.key])
except base.ValidationError as ex:
ex.row = row
raise
def getFieldIndex(self, fieldName):
"""returns the index of the field named fieldName.
"""
return self.columns.getFieldIndex(fieldName)
def getParamByName(self, name):
return self.params.getColumnByName(name)
def getColumnByName(self, name):
return self.columns.getColumnByName(name)
def getColumnById(self, id):
return self.columns.getColumnById(id)
def getColumnsByUCD(self, ucd):
return self.columns.getColumnsByUCD(ucd)
def getColumnByUCD(self, ucd):
return self.columns.getColumnByUCD(ucd)
def getColumnByUCDs(self, *ucds):
return self.columns.getColumnByUCDs(*ucds)
def getColumnsByUCDs(self, *ucds):
res = []
for ucd in ucds:
res.extend(self.columns.getColumnsByUCD(ucd))
return res
def getByUtype(self, utype):
"""returns the column or param with utype.
This is supposed to be unique, but the function will just return
the first matching item it finds.
"""
try:
return self.params.getColumnByUtype(utype)
except base.NotFoundError:
return self.columns.getColumnByUtype(utype)
def getByUtypes(self, *utypes):
"""returns the first param or column matching the first utype
matching anything.
"""
for utype in utypes:
try:
return self.getByUtype(utype)
except base.NotFoundError:
pass
raise base.NotFoundError(", ".join(utypes),
what="param or column with utype in",
within="table %s"%self.id)
def getByName(self, name):
"""returns the column or param with name.
There is nothing keeping you from having both a column and a param with
the same name. If that happens, you will only see the column. But
don't do it.
"""
try:
return self.columns.getColumnByName(name)
except base.NotFoundError:
return self.params.getColumnByName(name)
def makeRowFromTuple(self, dbTuple):
"""returns a row (dict) from a row as returned from the database.
"""
preRes = dict(list(zip(self.dictKeys, dbTuple)))
if self.fixupFunction:
return self.fixupFunction(preRes)
return preRes
def getDefaults(self):
"""returns a mapping from column names to defaults to be used when
making a row for this table.
"""
defaults = {}
for col in self:
if col.values:
defaults[col.name] = col.values.default
elif not col.required:
defaults[col.name] = None
return defaults
def getSTCDefs(self):
"""returns a set of all STC specs referenced in this table as ASTs.
"""
# Do not use our stc attribute -- the columns may come from different
# tables and carry stc from there.
stcObjects = utils.uniqueItems(col.stc for col in self)
if None in stcObjects:
stcObjects.remove(None)
return stcObjects
def getNote(self, noteTag):
"""returns the table note meta value for noteTag.
This will raise a NotFoundError if we don't have such a note.
You will not usually use this to retrieve meta items since columns
have the meta values in their note attributes. Columns, of course,
use this to get their note attribute value.
"""
mi = self.getMeta("note") or []
for mv in mi:
if mv.tag==noteTag:
return mv
else:
raise base.NotFoundError(noteTag, what="note tag",
within="table %s"%self.id)
def getURL(self, rendName, absolute=True):
"""returns the URL DaCHS will show the table info page for this table
under.
Of course the URL is only valid for imported tables.
"""
basePath = "%stableinfo/%s"%(
base.getConfig("web", "nevowRoot"),
self.getQName())
if absolute:
basePath = base.makeAbsoluteURL(basePath)
return basePath
def getDDL(self):
"""returns an SQL statement that creates the table.
"""
preTable = ""
if self.temporary:
preTable = "TEMP "
statement = "CREATE %sTABLE %s (%s)"%(
preTable,
self.getQName(),
", ".join(column.getDDL() for column in self))
return statement
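# Sketch of a result (hypothetical two-column table sch.data; the exact
# column DDL comes from column.getDDL and may include constraints):
#   CREATE TABLE sch.data (ra DOUBLE PRECISION, dec DOUBLE PRECISION)
# With temporary="True", this becomes CREATE TEMP TABLE ...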
def getSimpleQuery(self,
selectClause=None,
fragments="",
postfix=""):
"""returns a query against this table.
selectClause is a list of column names (in which case the names
are validated against the real column names and you can use
user input) or a literal string (in which case you must not provide
user input or have a SQL injection hole).
fragments (the WHERE clause) and postfix are taken as literal strings (so
they must not contain user input).
This is purely a string operation, so you'll have your normal
value references in fragments and postfix, and should maintain
the parameter dictionaries as usual.
All parts are optional, defaulting to pulling the entire table.
"""
parts = ["SELECT"]
if selectClause is None:
parts.append("*")
elif isinstance(selectClause, list):
parts.append(", ".join(
self.getColumnByName(colName).name for colName in selectClause))
else:
parts.append(selectClause)
parts.append("FROM %s"%self.getQName())
if fragments:
parts.append("WHERE %s"%fragments)
if postfix:
parts.append(postfix)
return " ".join(parts)
@property
def caseFixer(self):
return dict((col.name.lower(), col.name) for col in self)
def doSimpleQuery(self,
selectClause=None,
fragments="",
params=None,
postfix=""):
"""runs a query generated via getSimpleQuery and returns a list
of rowdicts.
This uses a table connection and queryToDicts; the keys in the
dictionaries will have the right case for this table's columns, though.
params is a dictionary of fillers for fragments and postfix.
"""
with base.getTableConn() as conn:
return list(
conn.queryToDicts(
self.getSimpleQuery(
selectClause,
fragments,
postfix),
params,
caseFixer=self.caseFixer))
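# Usage sketch (names are made up):
#   rows = td.doSimpleQuery(["ra", "dec"],
#     "mag<%(maglim)s", {"maglim": 18}, "LIMIT 10")
# rows then is a list of dicts keyed ra and dec in this table's case.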
def macro_colNames(self):
"""returns an SQL-ready list of column names of this table.
"""
return ", ".join(c.name for c in self.columns)
def macro_curtable(self):
"""returns the qualified name of the current table.
(this is identical to the `macro qName`_, which you should prefer
in new RDs.)
"""
return self.getQName()
def macro_qName(self):
"""returns the qualified name of the current table.
"""
return self.getQName()
def macro_tablename(self):
"""returns the unqualified name of the current table.
In most contexts, you will probably need to use the `macro qName`_
instead of this.
"""
return self.id
def macro_nameForUCD(self, ucd):
"""returns the (unique!) name of the field having ucd in this table.
If there is no or more than one field with the ucd in this table,
we raise a ValueError.
"""
return self.getColumnByUCD(ucd).name
def macro_nameForUCDs(self, ucds):
"""returns the (unique!) name of the field having one
of ucds in this table.
Ucds is a selection of ucds separated by vertical bars
(|). The rules for when this raises errors are so crazy
you don't want to think about them. This really is
only intended for cases where "old" and "new" standards
are to be supported, like with pos.eq.*;meta.main and
POS_EQ_*_MAIN.
If there is no or more than one field with the ucd in
this table, we raise an exception.
"""
return self.getColumnByUCDs(*(s.strip() for s in ucds.split("|"))).name
def macro_getParam(self, parName, default=""):
"""returns the string representation of the parameter parName.
This is the parameter as given in the table definition. Any changes
to an instance are not reflected here.
If the parameter named does not exist, an empty string is returned.
NULLs/Nones are rendered as NULL; this is mainly a convenience
for obscore-like applications and should not be exploited otherwise,
since it's ugly and might change at some point.
If a default is given, it will be returned for both NULL and non-existing
params.
"""
try:
param = self.params.getColumnByName(parName)
except base.NotFoundError:
return default
if param.content_ is base.NotGiven or param.value is None:
return default or "NULL"
else:
return param.content_
@staticmethod
def disambiguateColumns(columns):
"""returns a sequence of columns without duplicate names.
"""
newColumns, seenNames = [], set()
for c in columns:
while c.name in seenNames:
c.name = c.name+"_"
newColumns.append(c)
seenNames.add(c.name)
return newColumns
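# Example: three input columns all named "flux" come out named
# "flux", "flux_", and "flux__".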
def _meta_howtociteLink(self):
"""returns a link to a how-to-cite page for this service as an URL
meta.
"""
if self.onDisk:
# we assume we're sufficiently long-lived to merit a tableinfo if
# we're on disk
return base.META_CLASSES_FOR_KEYS["_related"](
self.getURL(None, True)+"#ti-citing",
title="Advice on citing this resource")
def _meta_referenceURL(self):
"""returns a link to the table-info page.
"""
return base.META_CLASSES_FOR_KEYS["_related"](
self.getURL(None, True),
title="Table information")
def _meta_shortName(self):
"""makes the shortName default to the (possibly shortened) table name.
This is so simply published tables have a proper short name without
any further action.
"""
return self.getQName()[:16]
def makeTDForColumns(name, cols, **moreKWs):
"""returns a TableDef object named names and having the columns cols.
cols is some sequence of Column objects. You can give arbitrary
table attributes in keyword arguments.
"""
kws = {"id": name, "columns": common.ColumnList(cols)}
kws.update(moreKWs)
return base.makeStruct(TableDef, **kws)
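# Usage sketch (hypothetical column; see also TableDef.fromColumns):
#   td = makeTDForColumns("scratch",
#     [MS(column.Column, name="id", type="integer")],
#     onDisk=False)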