"""
A cli-facing module providing functionality to "validate" one or more
resource descriptors.
Validation means giving some prognosis as to whether RD will properly work
within both the DC and the VO.
While validation is active there's base.VALIDATING=True. If RDs
do anything expensive, they're advised to have something like::

  if getattr(base, "VALIDATING", False):
    (don't do the expensive thing)
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import argparse
import itertools
import re
import sys
import traceback
from gavo import api
from gavo import adql
from gavo import base
from gavo import svcs
from gavo import stc
from gavo import registry
from gavo import utils
from gavo.helpers import testtricks
from gavo.imp import astropyucd
from gavo.registry import builders
from gavo.protocols import datalink
from gavo.protocols import vocabularies
from gavo.user import errhandle
from gavo.web import htmltable
from gavo.web import examplesrender #noflake: for RST registration
# Flag the registry builders as running under validation.  This happens
# as a side effect of importing this module, not only when main() runs.
# NOTE(review): what builders does with this flag is not visible here.
builders.VALIDATING = True
# Subject keywords that are not UAT terms but for which validateServices
# does not emit a "not from UAT" warning anyway.
NON_UAT_SUBJECTS = {"stars", "DOI"}
# UCDs mentioned in standards that validateTables accepts without asking
# astropy; some of these have actually been made legal later, but we
# don't want to report them even with old astropys.
UCD_WHITELIST = {'instr.fov', "em.line"}
class TestsCollector(object):
    """a process-wide registry of regression test suites to execute.

    This class is never instantiated; all state lives on the class
    itself, so everybody sees the same list.

    testsToRun is the list of test suites queued so far, in the order
    they were added.
    """
    # deliberately shared, class-level state
    testsToRun = []

    @classmethod
    def addRD(cls, rd):
        """queues every test suite found in rd.tests.
        """
        cls.testsToRun.extend(rd.tests)
def outputDependentMessage(aString):
    """prints aString with each of its lines prefixed by a blank.

    This is passed as the output function to errhandle.raiseAndCatch so
    that dependent error messages appear indented below their headline.
    """
    lineStart = re.compile("(?m)^")
    indented = lineStart.sub(" ", aString)
    print(indented)
def outputError(rdId, message, verbose=False):
    """prints an [ERROR] line for rdId to stdout.

    With verbose set, errhandle.raiseAndCatch is called afterwards, with
    outputDependentMessage as its output function.
    """
    headline = "[ERROR] %s: %s"%(rdId, message)
    print(headline)
    if not verbose:
        return
    errhandle.raiseAndCatch(output=outputDependentMessage)
def outputWarning(rdId, message, verbose=False):
    """prints a [WARNING] line for rdId to stdout.

    With verbose set, errhandle.raiseAndCatch is called afterwards, with
    outputDependentMessage as its output function.
    """
    headline = "[WARNING] %s: %s"%(rdId, message)
    print(headline)
    if not verbose:
        return
    errhandle.raiseAndCatch(output=outputDependentMessage)
def loadRD(rdId):
    """returns the RD identified by rdId, or None if it cannot be loaded.

    When loading fails with one of the api exceptions handled here, a
    diagnostic is printed through outputError (in verbose mode) before
    None is returned.
    """
    # Order matters: the first entry the exception is an instance of
    # decides which message is printed.
    diagnostics = (
        (api.RDNotFound, "RD or dependency not found, message follows"),
        (api.LiteralParseError, "Bad literal in RD, message follows"),
        (api.StructureError, "Malformed RD input, message follows"),
        (api.Error, "Syntax or internal error, message follows"),
    )

    try:
        loaded = api.getReferencedElement(rdId, doQueries=False)
        # This is so we can validate userconfig.rd
        if hasattr(loaded, "getRealRD"):
            loaded = loaded.getRealRD()
        return loaded
    except tuple(excType for excType, _ in diagnostics) as failure:
        for excType, headline in diagnostics:
            if isinstance(failure, excType):
                outputError(rdId, headline, True)
                break

    # Fallthrough: RD could not be loaded
    return None
# One shared instance of the test mixin; this module only uses its
# assertValidates method, on rendered registry records.
# NOTE(review): presumably this validates against the XSD schemas --
# confirm in testtricks.
_XSD_VALIDATOR = testtricks.XSDTestMixin()
def isIVOPublished(res):
    """returns true if res has a publication facing the VO.

    res is either something with a registration attribute (tables,
    data) or something with publications (services).  "Facing the VO"
    means that the ivo_managed set is among the sets of the registration
    or of at least one publication.

    A res whose registration is None counts as not published; anything
    without publications yields False as well.
    """
    if hasattr(res, "registration"):
        # it's a data item
        registration = res.registration
        if registration is None:
            # unregistered tables and data have nothing facing the VO;
            # do not fail on the missing sets attribute.
            return False
        return "ivo_managed" in registration.sets

    # it's a service: one VO-facing publication is enough
    return any("ivo_managed" in pub.sets for pub in res.publications)
def iterPublishedResources(rd, args):
    """iterates over the resources on rd that validateServices should
    inspect.

    These are, in this order: services having publications (or all
    services if args.prePublication is set), all resRecs, and then the
    tables and dds that have a registration.
    """
    yield from (service for service in rd.services
        if args.prePublication or service.publications)

    yield from rd.resRecs

    yield from (candidate
        for candidate in itertools.chain(rd.tables, rd.dds)
        if candidate.registration)
def validateServices(rd, args):
    """outputs to stdout various diagnostics about the published
    resources on rd.

    Despite the name, everything iterPublishedResources yields for rd is
    looked at, not just services.  For each resource, this

    * validates the structure (missing metadata is an error and ends
      the checks for that resource),
    * warns about subject keywords that are neither UAT terms nor in
      NON_UAT_SUBJECTS if the resource is published to the VO,
    * builds the registry record and makes sure it validates.

    Services that are not published to the VO are only structurally
    validated unless args.prePublication is set.

    Returns True if no error was reported, False otherwise.  Warnings
    do not influence the result.
    """
    validSoFar = True

    for res in iterPublishedResources(rd, args):
        try:
            base.validateStructure(res)
        except api.MetaValidationError as ex:
            validSoFar = False
            outputError(rd.sourceId, "Missing metadata for publication of"
                " service %s:\n%s"%(res.id, str(ex)))
            continue # further checks will just add verbosity

        # see if subject keywords are from UAT
        if isIVOPublished(res):
            uat = vocabularies.get_vocabulary("uat")
            for subject in res.iterMeta("subject", propagate=True):
                if (str(subject) not in uat["terms"]
                        and str(subject) not in NON_UAT_SUBJECTS):
                    outputWarning(rd.sourceId, "Service %s has subject %s"
                        ", which is not from http://ivoa.net/rdf/uat."%(
                            res.id, subject))

        # NOTE(review): the nesting below was reconstructed from a
        # listing without indentation; the registry record checks are
        # assumed to apply to all resources, not only services -- confirm.
        if isinstance(res, svcs.Service):
            if not (args.prePublication or isIVOPublished(res)):
                # require sane metadata on services only if the VO will
                # see the service
                continue

        # error out if the identifier cannot be generated
        api.getMetaText(res, "identifier")

        registryRecord = None
        try:
            registryRecord = builders.getVORMetadataElement(res)
        except stc.STCSParseError as msg:
            validSoFar = False
            outputError(rd.sourceId, "Invalid STC-S (probably in coverage meta)"
                ": %s"%str(msg))
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt and
            # SystemExit must still be able to stop a validation run.
            validSoFar = False
            outputError(rd.sourceId, "Error when producing registry record"
                " of service %s:"%res.id, True)

        # make sure the registry record is XSD-valid
        if registryRecord is not None:
            try:
                _XSD_VALIDATOR.assertValidates(
                    registryRecord.render(), leaveOffending=True)
            except AssertionError as msg:
                validSoFar = False
                outputError(rd.sourceId, "Invalid registry record for service"
                    " %s:\n%s"%(res.id, str(msg)))

    return validSoFar
def validateRST(rd, args):
    """outputs diagnostics on RST formatting problems.

    This walks rd and, recursively, everything its iterChildren yields.
    On each element having getAllMetaPairs, all meta values with format
    rst are expanded and rendered; whatever message the renderer returns
    is output as a warning.

    RST problems only produce warnings; the value returned is the
    and-combination of the children's results.
    """
    def checkElement(node):
        allFine = True

        if hasattr(node, "getAllMetaPairs"):
            metaPairs = node.getAllMetaPairs()
        else:
            metaPairs = []

        for metaKey, metaValue in metaPairs:
            if metaValue.format!='rst':
                continue
            expanded = metaValue.getExpandedContent(macroPackage=node)
            _, complaint = utils.rstxToHTMLWithWarning(expanded)
            if complaint:
                outputWarning(rd.sourceId,
                    "%s metadata on %s (%s) has an RST problem: %s"%(
                        metaKey, node, utils.makeEllipsis(expanded, 80),
                        complaint))

        for sub in node.iterChildren():
            if sub:
                allFine = allFine and checkElement(sub)

        return allFine

    return checkElement(rd)
def validateRowmakers(rd, args):
    """tries to build all rowmakers mentioned in the RD and bails out
    if one is bad.

    For every make of every dd in rd, a table is built for the make's
    table definition and the rowmaker is compiled for it.  While that
    happens, the table definition's onDisk is set to False; afterwards,
    the previous value of onDisk is restored, whether or not building
    worked.

    Returns True if all rowmakers could be built; exceptions raised
    while building are not caught here.
    """
    for dd in rd:
        for make in dd.makes:
            tableDef = make.table
            # remember the actual setting so in-memory tables are not
            # turned into on-disk ones by merely being validated
            wasOnDisk = tableDef.onDisk
            tableDef.onDisk = False
            try:
                api.TableForDef(tableDef)
                make.rowmaker.compileForTableDef(tableDef)
            finally:
                tableDef.onDisk = wasOnDisk
    return True
def validateOtherCode(rd, args):
    """tries to compile further pieces of code embedded in an RD.

    Covered are the regression tests, the formatters of the services'
    current output fields, the functions of datalink cores, and the
    jobs.  Each piece failing to compile is reported through outputError.

    Returns True if everything compiled, False otherwise.
    """
    allGood = True

    # regression tests
    for suite in rd.tests:
        for regTest in suite.tests:
            try:
                regTest.compile()
            except Exception as msg:
                outputError(rd.sourceId, "Bad test '%s': %s"%(regTest.title,
                    msg))
                allGood = False

    for service in rd.services:
        # custom formatters on output fields
        for field in service.getCurOutputFields():
            if not field.formatter:
                continue
            try:
                htmltable._compileRenderer(field.formatter, None, rd)
            except Exception as msg:
                outputError(rd.sourceId,
                    "Bad formatter on output field '%s': %s"%(
                        field.name, msg))
                allGood = False

        # datalink cores: only compile what the allowed renderers need
        core = service.core
        if isinstance(core, datalink.DatalinkCore):
            try:
                if "dlmeta" in service.allowed:
                    core.descriptorGenerator.compile(core)

                # NOTE(review): nesting reconstructed from a listing
                # without indentation; the data formatter is assumed to
                # be compiled for dlget only -- confirm.
                if "dlget" in service.allowed:
                    for dataFunction in core.dataFunctions:
                        dataFunction.compile(core)
                    core.dataFormatter.compile(core)

            except Exception as msg:
                outputError(rd.sourceId,
                    "Bad datalink function in service '%s': %s"%(
                        service.id, msg))
                if isinstance(msg, base.BadCode):
                    outputError(rd.sourceId, "Bad code:\n%s"%msg.code)
                allGood = False

    # jobs
    for jobDef in rd.jobs:
        try:
            jobDef.job.compile(parent=rd)
        except Exception as msg:
            outputError(rd.sourceId, "Bad code in job '%s': %s"%(
                jobDef.title, msg))
            allGood = False

    # TODO: iterate over service/cores and standalone cores and
    # fiddle out condDescs
    # TODO: Iterate over scripts and data/make/scripts, see which
    # are python and try to compile them
    # TODO: Iterate over grammars and validate rowfilters

    return allGood
def validateTables(rd, args):
    """does some sanity checks on the (top-level) tables within rd.

    For each table in rd.tables, this checks

    * the structure (missing metadata is an error and ends the checks
      for that table),
    * per column: the unit (unparseable is an error, not valid in the
      sense of the parsed unit's isValid a warning), the UCD unless
      whitelisted (warning), and whether the name parses as an ADQL
      identifier (warning),
    * if args.compareDB is set and the table is on disk and present in
      the database: whether the database table matches the definition,
    * that services and columns named in _associatedDatalinkService
      metadata can be resolved,
    * if the table has a registration: that a registry record can be
      built and validates.

    Returns True if no error was reported, False otherwise.
    """
    valid = True
    identifierSymbol = adql.getSymbols()["identifier"]

    for td in rd.tables:
        curTableName = td.getQName()

        try:
            base.validateStructure(td)
        except api.MetaValidationError as metaProblem:
            valid = False
            outputError(rd.sourceId, "Missing metadata in"
                " table %s:\n%s"%(td.id, str(metaProblem)))
            continue # further checks will just add verbosity

        for col in td:
            # unit
            try:
                if col.unit and not api.parseUnit(col.unit).isValid:
                    outputWarning(rd.sourceId,
                        f"Column {curTableName}.{col.name}: Unit {col.unit} is"
                        " not interoperable (in VOUnit's list of recognised units)")
            except api.BadUnit:
                valid = False
                outputError(rd.sourceId, "Bad unit in table %s, column %s: %s"%(
                    curTableName, col.name, repr(col.unit)))

            # UCD
            if col.ucd and col.ucd not in UCD_WHITELIST:
                try:
                    astropyucd.parse_ucd(col.ucd,
                        check_controlled_vocabulary=True,
                        has_colon=True)
                except ValueError as msg:
                    outputWarning(rd.sourceId,
                        f"Column {curTableName}.{col.name}: UCD {col.ucd}"
                        f" not accepted by astropy ({msg}).")

            # column name
            try:
                identifierSymbol.parseString(str(col.name), parseAll=True)
            except base.ParseException:
                outputWarning(rd.sourceId, "Column %s.%s: Name is not a regular"
                    " ADQL identifier."%(td.id, col.name))

        # definition vs. what is in the database
        if td.onDisk and args.compareDB:
            with base.getTableConn() as conn:
                querier = base.UnmanagedQuerier(conn)
                if querier.getTableType(curTableName) is not None:
                    dbTable = api.TableForDef(td, connection=conn)
                    try:
                        dbTable.ensureOnDiskMatches()
                    except api.DataError as mismatch:
                        valid = False
                        outputError(rd.sourceId,
                            utils.makeEllipsis(utils.safe_str(mismatch), 160))

        # associated datalink services and the columns must exist.
        for dldef in td.iterMeta("_associatedDatalinkService"):
            try:
                # only called to see whether they raise
                base.resolveId(td.rd,
                    base.getMetaText(dldef, "serviceId"))
                td.getColumnByName(base.getMetaText(dldef, "idColumn"))
            except (base.NotFoundError, base.MetaError) as missing:
                valid = False
                outputError(rd.sourceId,
                    utils.makeEllipsis(utils.safe_str(missing), 160))

        # registry record of published tables
        if td.registration:
            registryRecord = None
            try:
                registryRecord = builders.getVORMetadataElement(td)
            except Exception as buildProblem:
                valid = False
                outputError(rd.sourceId,
                    "Table publication of %s could not be built: %s"%(
                        td.id, str(buildProblem)))

            if registryRecord is not None:
                try:
                    _XSD_VALIDATOR.assertValidates(
                        registryRecord.render(), leaveOffending=True)
                except AssertionError as xsdProblem:
                    valid = False
                    outputError(rd.sourceId, "Invalid registry record for table"
                        " %s:\n%s"%(td.id, str(xsdProblem)))

    return valid
def validateOne(rdId, args):
    """outputs to stdout various information on the RD identified by rdId.

    Info and Warning events collected while the RD is loaded are output
    as warnings.  If args.runTests is set, the RD's test suites are
    queued on TestsCollector.  The validators are then run in sequence,
    stopping at the first one that reports a failure.

    Returns None if the RD could not be loaded, else the result of the
    last validator run.
    """
    with testtricks.collectedEvents("Info", "Warning") as warnings:
        rd = loadRD(rdId)
        if rd is None:
            return None

    for collected in warnings:
        outputWarning(rdId, collected[1])

    if args.runTests:
        TestsCollector.addRD(rd)

    # and short-circuits, so later validators are skipped after a failure
    return (validateServices(rd, args)
        and validateRowmakers(rd, args)
        and validateTables(rd, args)
        and validateOtherCode(rd, args)
        and validateRST(rd, args))
def validateAll(args):
    """validates all accessible RDs.

    If the first element of args.rd is ALL, the RDs returned by
    registry.findPublishedRDs are validated, otherwise those returned
    by registry.findAllRDs.

    An exception escaping the validation of one RD is reported on
    stderr with a traceback and does not stop the run.  With
    args.verbose, the id of each RD is written to stdout before it is
    validated.
    """
    if args.rd[0]=="ALL":
        findRDs = registry.findPublishedRDs
    else:
        findRDs = registry.findAllRDs

    chatty = args.verbose
    for rdId in findRDs():
        if chatty:
            sys.stdout.write(rdId+" ")
            sys.stdout.flush()

        try:
            validateOne(rdId, args)
        except Exception:
            sys.stderr.write("Severe error while validating %s:\n"%rdId)
            traceback.print_exc()

    if chatty:
        sys.stdout.write("\n")
def parseCommandLine(argv=None):
    """returns the parsed command line arguments of the validator.

    argv is the list of argument strings to parse; with the default of
    None, argparse takes them from sys.argv.

    The namespace returned has the attributes rd (a non-empty list of
    strings), prePublication, verbose, runTests, compareDB (booleans
    defaulting to False), and timeout (an integer defaulting to 15).
    """
    parser = argparse.ArgumentParser(description="Check RDs for well-formedness"
        " and some aspects of VO-friendlyness")
    parser.add_argument("rd", nargs="+", type=str,
        help="RD identifier or file system path. Use magic value ALL to"
        " check all published RDs, ALL_RECURSE to look for RDs in the file"
        " system.")
    parser.add_argument("-p", "--pre-publication", help="Validate"
        " as if all services were IVOA published even if they are not"
        " (this may produce spurious errors if unpublished services are in"
        " the RD).",
        action="store_true", dest="prePublication")
    parser.add_argument("-v", "--verbose", help="Talk while working",
        action="store_true", dest="verbose")
    parser.add_argument("-t", "--run-tests", help="Run regression tests"
        " embedded in the checked RDs", action="store_true", dest="runTests")
    parser.add_argument("-T", "--timeout", help="When running tests, abort"
        " and fail requests after inactivity of SECONDS",
        action="store", dest="timeout", type=int, default=15, metavar="SECONDS")
    parser.add_argument("-c", "--compare-db", help="Also make sure that"
        " tables that are on disk (somewhat) match the definition in the RD.",
        action="store_true", dest="compareDB")

    return parser.parse_args(argv)
def main():
    """runs the validator from the command line.

    This sets base.VALIDATING, validates what the command line asks for
    (printing OK or Fail per RD unless one of the magic values ALL or
    ALL_RECURSE is the only argument) and, if requested, runs the
    regression tests collected on the way and prints their report.
    """
    base.VALIDATING = True
    args = parseCommandLine()
    targets = args.rd

    if len(targets)==1 and targets[0] in ("ALL", "ALL_RECURSE"):
        validateAll(args)
    else:
        for rdId in targets:
            print(rdId, "--", end=' ')
            sys.stdout.flush()
            verdict = "OK" if validateOne(rdId, args) else "Fail"
            print(verdict)

    if not args.runTests:
        return

    print("\nRunning regression tests\n")
    # imported only when tests are actually requested
    from gavo.rscdef import regtest
    runner = regtest.TestRunner(TestsCollector.testsToRun,
        verbose=False, timeout=args.timeout)
    runner.runTests(showDots=True)
    print(runner.stats.getReport())
    if runner.stats.fails:
        print("\nThe following tests failed:\n")
        print(runner.stats.getFailures())