"""
Updating table and column metadata.
While column statistics can be explicitly defined in values elements
(and there may be cases when manually defining them makes sense), the
typical case is to gather statistics from the database and keep them in
a few tables in the dc schema.
Starting with DaCHS 2.3.1 (schema version 27), there's
dc.simple_col_stats for floats and "2 sigma" statistics.
Starting with DaCHS 2.5.2 (schema version 30), there's in addition
dc.string_col_dist for statistics of enumerated string columns.
The actual acquisition of the statistics is currently done in user.info
(and should probably move to rscdef).
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo import base
from gavo import rsc
from gavo import utils
from gavo.user import info
def iterCoverageItems(updater):
    """yields coverage items for inclusion in RDs.

    NOTE: so far, we can only have one coverage item.  So, it's enough
    to just say "fill this into axis x of coverage".  If and when we
    have more than one coverage items, we'll have to re-think that.

    That's why there's the "reserved" value in the tuples.  We'll have to
    put something in there (presumably the index of the coverage element,
    but perhaps we'll have a better identity at some point).
    """
    if updater is base.NotGiven:
        return

    coverage = updater.parent

    # spatial coverage is a MOC, serialized to its ASCII form
    if coverage.spatial is not None:
        statsTable = updater.spaceTable or updater.sourceTable
        if statsTable:
            moc = info.getMOCForStdTable(statsTable, updater.mocOrder)
            if moc:
                yield "spatial", moc.asASCII()

    # temporal and spectral coverage are handled identically: both are
    # flat lists of interval limits drawn from scalar columns.
    for axisName, axisTable, limitsExprs in [
            ("temporal", updater.timeTable, info.getTimeLimitsExprs),
            ("spectral", updater.spectralTable, info.getSpectralLimitsExprs)]:
        if getattr(coverage, axisName) is not None:
            statsTable = axisTable or updater.sourceTable
            if statsTable:
                limits = []
                for interval in info.iterScalarLimits(
                        statsTable, limitsExprs):
                    limits.extend(interval)
                yield axisName, limits
def updateTableLevelStats(
        td,
        conn,
        samplePercent=None,
        acquireColumnMeta=True):
    """determines column metadata for the table td and inserts it into
    dc.*stats.

    samplePercent, if given, says how much of the table to look at; giving
    this on views will fail.

    If acquireColumnMeta is False, only the size of the table is estimated.
    """
    qName = td.getQName()

    tableType = base.UnmanagedQuerier(conn).getTableType(qName)
    if tableType is None:
        # nothing in the database to gather statistics from
        base.ui.notifyWarning("Skipping non-existing table %s"%qName)
        return

    if tableType=="VIEW":
        # sampling fails on views (cf. docstring), so disable it here
        samplePercent = 0

    info.annotateDBTable(td, samplePercent, acquireColumnMeta)

    # persist the gathered column statistics in the dc schema
    for importId in [
            "//dc_tables#import_simple_col_stats",
            "//dc_tables#import_discrete_string_values"]:
        rsc.makeData(
            base.resolveCrossId(importId),
            forceSource=td,
            connection=conn)

    conn.execute("UPDATE dc.tablemeta SET nrows=%(nrows)s"
        " WHERE tableName=%(tableName)s",
        {"nrows": td.nrows, "tableName": qName})
def updateForRD(rd, conn, samplePercent=None, acquireColumnMeta=True):
    """obtains RD- and table-level metadata for rd and writes it to
    the meta data tables through conn.
    """
    base.ui.notifyInfo(f"Obtaining metadata for rd {rd.sourceId}...")
    updateRDLevelMetadata(rd, conn)

    for td in rd.tables:
        if not td.onDisk:
            continue
        # views are only analyzed when explicitly requested via the
        # forceStats property
        if td.viewStatement and not td.getProperty("forceStats", False):
            continue
        updateTableLevelStats(td, conn, samplePercent, acquireColumnMeta)
def _getUpdatableRdIds():
    """returns a list of RD ids that presumably had dachs limits
    run on them before (because they have coverage or table stats).
    """
    query = (
        "SELECT sourceRD FROM"
        " dc.rdmeta"
        " WHERE spatial IS NOT NULL"
        " OR temporal IS NOT NULL OR spectral IS NOT NULL"
        " UNION"
        " SELECT sourceRD FROM"
        " dc.tablemeta"
        " WHERE nrows IS NOT NULL")
    with base.getTableConn() as conn:
        return [row[0] for row in conn.query(query)]
def dumpTableLevelStats(td, conn):
    """writes limits metadata for the table td.
    """
    if not td.onDisk:
        return

    qName = td.getQName()
    heading = f"Statistics for {qName}"
    print("\n"+heading+"\n"+"-"*len(heading))

    try:
        nrows = next(conn.query(
            "SELECT nrows FROM dc.tablemeta WHERE tablename=%(qName)s",
            {"qName": qName}))[0]
    except StopIteration:
        # no dc.tablemeta row: dachs imp apparently never ran on this
        print("No metadata (table not imported)?")
        return
    print("|rows| = {}\n".format(nrows or "<Unknown>"))

    statsTD = base.resolveCrossId("//dc_tables#simple_col_stats")
    colNames = ("column_name min_value max_value percentile03"
        " median percentile97 fill_factor").split()

    def formatValue(value):
        # floats get a compact representation; everything else is
        # stringified and truncated to keep the table narrow
        if isinstance(value, float):
            return "{:.5g}".format(value)
        return utils.makeEllipsis(str(value), 12, "…")

    statRows = conn.query(
        statsTD.getSimpleQuery(colNames, "tablename=%(tablename)s"),
        {"tablename": qName})
    stats = sorted([formatValue(value) for value in row]
        for row in statRows)
    print(utils.formatSimpleTable(stats, False, colNames))
def dumpStatsForRD(rd, conn):
    """writes metadata for rd and its tables

    This prints the RD-level coverage stored in dc.rdmeta (if any) and
    then the per-table statistics for each of rd's tables.
    """
    print("="*72)
    rdId = rd.sourceId
    print(f"Statistics for RD {rdId}")

    try:
        # fixed typo in the user-visible messages: "converage" -> "coverage"
        print("spatial coverage {}".format(next(conn.query(
            "SELECT spatial FROM dc.rdmeta WHERE sourcerd=%(rdId)s",
            {"rdId": rdId}))[0] or "<Unknown>"))
    except StopIteration:
        # no dc.rdmeta row at all -- dachs limits has not run on this RD
        print("No RD stats.")
    else:
        # the spatial query returned a row, so these two cannot raise
        # StopIteration for the same rdId
        print("temporal coverage {}".format(next(conn.query(
            "SELECT temporal FROM dc.rdmeta WHERE sourcerd=%(rdId)s",
            {"rdId": rdId}))[0] or "<Unknown>"))
        print("spectral coverage {}".format(next(conn.query(
            "SELECT spectral FROM dc.rdmeta WHERE sourcerd=%(rdId)s",
            {"rdId": rdId}))[0] or "<Unknown>"))

    for td in rd.tables:
        dumpTableLevelStats(td, conn)
def parseCmdLine():
    """parses the dachs limits command line and returns the resulting
    argparse namespace.
    """
    from argparse import ArgumentParser

    argParser = ArgumentParser(
        description="Updates existing values min/max items in a referenced"
        " table or RD.")

    # declaration order matters for --help output, so keep it stable
    optionDefs = [
        (("-t", "--tables-only"), dict(
            dest="tablesOnly",
            action="store_true",
            help="Only acquire table/resource-level metadata (rather than"
            " column metadata, which usually takes a lot longer).")),
        (("-s", "--sample-percent"), dict(
            type=float,
            default=None,
            dest="samplePercent",
            metavar="P",
            help="Only look at P percent of the table to determine"
            " min/max/mean.")),
        (("-d", "--dump"), dict(
            action="store_true",
            default=False,
            dest="dumpOnly",
            help="Do not obtain statistics but rather dump the results of the"
            " last run")),
        (("itemId",), dict(
            nargs="+",
            help="Cross-RD reference of a table or"
            " RD to update, as in ds/q or ds/q#mytable; only RDs in inputsDir"
            " can be updated. A single ALL will expand to all RDs that already"
            " have limits-obtained metadata.")),
    ]
    for flags, kwargs in optionDefs:
        argParser.add_argument(*flags, **kwargs)

    return argParser.parse_args()
def main():
    """drives dachs limits: updates or dumps statistics for each item
    given on the command line.
    """
    from gavo import api

    args = parseCmdLine()
    if len(args.itemId)==1 and args.itemId[0]=="ALL":
        # expand the magic ALL argument to everything that already has
        # limits-derived metadata
        args.itemId = _getUpdatableRdIds()

    with api.getWritableAdminConn() as conn:
        for itemRef in args.itemId:
            item = api.getReferencedElement(itemRef)

            if isinstance(item, api.TableDef):
                if args.dumpOnly:
                    dumpTableLevelStats(item, conn)
                else:
                    updateTableLevelStats(item, conn,
                        args.samplePercent, not args.tablesOnly)

            elif isinstance(item, api.RD):
                if args.dumpOnly:
                    dumpStatsForRD(item, conn)
                else:
                    updateForRD(item, conn,
                        args.samplePercent, not args.tablesOnly)

            else:
                # bug fix: name the offending reference rather than
                # interpolating the whole argument list
                raise base.ReportableError(
                    "%s references neither an RD nor a table definition"
                    %itemRef)

        conn.commit()