"""
TAP: schema maintenance, job/parameter definition incl. upload and UWS actions.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime
import functools
import os
import signal
from twisted.internet import threads
from gavo import base
from gavo import formats
from gavo import rsc
from gavo import svcs
from gavo import utils
from gavo.protocols import taprunner
from gavo.protocols import uws
from gavo.protocols import uwsactions
from gavo.utils.parsetricks import ParseException
# The TAP version this service implements (also enforced on the VERSION
# parameter by TAPCore).
TAP_VERSION = "1.1"
# Id of the system RD defining the TAP service and TAP_SCHEMA machinery.
RD_ID = "__system__/tap"

# Assumed run time for each job queued ahead of a given one; TAPJob.quote
# multiplies it by the number of such jobs.
EST_TIME_PER_JOB = datetime.timedelta(seconds=600)
def _R(**kws): return kws
# this is used below in for registry purposes (values are pairs of
# IVOA id and a human-readable label).
SUPPORTED_LANGUAGES = {
"ADQL": _R(versions={
"2.0": "ivo://ivoa.net/std/ADQL#v2.0",
"2.1": "ivo://ivoa.net/std/ADQL#v2.1"},
description="The Astronomical Data Query Language is the standard"
" IVOA dialect of SQL; it contains a very general SELECT statement"
" as well as some extensions for spherical geometry and higher"
" mathematics.")}
# Upload methods this service supports, mapped to human-readable labels.
# Currently, only the registry interface uses this.
UPLOAD_METHODS = dict([
    ("upload-inline", "POST inline upload"),
    ("upload-http", "http URL"),
    ("upload-https", "https URL"),
    ("upload-ftp", "ftp URL"),
])
class TAPError(uws.UWSError):
    """Deprecated; only kept so that old code importing it keeps working.

    It adds nothing to uws.UWSError (presumably new code should raise that
    directly).
    """
######################## registry interface helpers
def getSupportedLanguages(languages=None):
    """returns a list of tuples for the supported languages.

    This is SUPPORTED_LANGUAGES in a format suitable for the TAP
    capabilities element.  Each tuple is made up of
    (name, description, [(version, ivo-id)...]).

    languages, if given, replaces SUPPORTED_LANGUAGES; it must have the
    same structure (name -> {"description": ..., "versions": {...}}).

    This returns a real list (as documented), so the result can be
    iterated more than once.
    """
    if languages is None:
        languages = SUPPORTED_LANGUAGES
    return [(name, desc["description"], list(desc["versions"].items()))
        for name, desc in languages.items()]
######################## maintaining TAP schema
def _insertRDIntoTAP_SCHEMA(rd, connection):
"""helps publishToTAP.
Actually, it does all its work, except not rejecting //tap itself.
This is an implementation detail of letting //tap#createSchema's
postCreation script do its work.
"""
# first check if we have any adql tables at all, and don't attempt
# anything if we don't (this is cheap optimizing and keeps TAP_SCHEMA
# from being created on systems that don't do ADQL).
for table in rd.tables:
# the readProfile condition in the next line is a proxy for adql=hidden.
if table.adql or "untrustedquery" in table.readProfiles:
break
else:
return
tapRD = base.caches.getRD(RD_ID)
for ddId in ["importTablesFromRD", "importDMsFromRD", "importColumnsFromRD",
"importFkeysFromRD", "importGroupsFromRD"]:
dd = tapRD.getById(ddId)
rsc.makeData(dd, forceSource=rd, parseOptions=rsc.parseValidating,
connection=connection, runCommit=False)
# finally, remove schemas that don't have any tables (which happens
# with adql-hidden tables, when moving RDs, etc. Such leftovers
# are trouble because they're in TAP_SCHEMA but not in /tables any more.
connection.execute("""
DELETE FROM tap_schema.schemas
WHERE
schema_name IN (
SELECT schema_name
FROM tap_schema.schemas
LEFT OUTER JOIN tap_schema.tables USING (schema_name)
WHERE table_name IS NULL)""")
def publishToTAP(rd, connection):
    """writes TAP_SCHEMA metadata for all ADQL-enabled tables of rd.

    The TAP RD itself (__system__/tap) is skipped; its TAP_SCHEMA entries
    are maintained manually.
    """
    buildingTAPItself = rd.sourceId=='__system__/tap'
    if not buildingTAPItself:
        _insertRDIntoTAP_SCHEMA(rd, connection)
def unpublishFromTAP(rd, connection):
    """removes all information originating from rd from TAP_SCHEMA.

    This re-runs publishToTAP with the RD property "moribund" set; the
    embedded grammars take this to mean "remove this RD's rows".  The TAP
    RD itself (__system__/tap) is skipped, as its TAP_SCHEMA maintenance
    is manual.

    The property is cleared even if publishing raises.  Otherwise rd would
    stay marked as moribund, and a later publishToTAP on the same RD
    object would remove its rows instead of inserting them.
    """
    if rd.sourceId=='__system__/tap':
        return

    rd.setProperty("moribund", "True")
    try:
        publishToTAP(rd, connection)
    finally:
        rd.clearProperty("moribund")
def getAccessibleTables():
    """returns the qualified names of all tables listed in
    tap_schema.tables, sorted by name.
    """
    with base.getTableConn() as conn:
        rows = conn.query(
            "select table_name from tap_schema.tables order by table_name")
        return [row[0] for row in rows]
########################## Maintaining TAP jobs
class TAPTransitions(uws.ProcessBasedUWSTransitions):
    """The phase transition function for TAP jobs.

    getCommandLine supplies the command the process-based base class runs
    for a job (``gavo --ui stingy taprun -- <jobId>``).

    After queueing, erroring out, completing, or killing a job, the job's
    UWS is asked to schedule a process queue check.  Callers that release
    the job lock after a transition (PhaseAction) may also check the queue
    themselves.
    """
    def __init__(self):
        # NOTE(review): this calls SimpleUWSTransitions.__init__ directly,
        # bypassing any ProcessBasedUWSTransitions.__init__ -- confirm this
        # is intended.
        uws.SimpleUWSTransitions.__init__(self, "TAP")

    def getCommandLine(self, wjob):
        """returns the program name and the full argv to run wjob with."""
        argv = ["gavo", "--ui", "stingy", "taprun", "--", str(wjob.jobId)]
        return argv[0], argv

    @staticmethod
    def _scheduleQueueCheck(wjob):
        # Let wjob's UWS look at its process queue again.
        wjob.uws.scheduleProcessQueueCheck()

    def queueJob(self, newState, wjob, ignored):
        """puts a job on the queue.
        """
        uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
        self._scheduleQueueCheck(wjob)

    def errorOutJob(self, newPhase, wjob, ignored):
        uws.SimpleUWSTransitions.errorOutJob(self, newPhase, wjob, ignored)
        self._scheduleQueueCheck(wjob)

    def completeJob(self, newPhase, wjob, ignored):
        uws.SimpleUWSTransitions.completeJob(self, newPhase, wjob, ignored)
        self._scheduleQueueCheck(wjob)

    def killJob(self, newPhase, wjob, ignored):
        # The queue check is scheduled even if killing fails.
        try:
            uws.ProcessBasedUWSTransitions.killJob(
                self, newPhase, wjob, ignored)
        finally:
            self._scheduleQueueCheck(wjob)
########################## The TAP UWS job
@functools.lru_cache(maxsize=1)
def getUploadGrammar():
    """returns a pyparsing grammar for TAP UPLOAD strings.

    The grammar accepts one or more ``tableName,uri`` specs separated by
    semicolons, and nothing after them.  A parse action turns each spec
    into a (name, uri) pair.  The grammar is built within
    pyparsingWhitechars(" \\t") and is cached (one instance).
    """
    from gavo.utils.parsetricks import (Word, ZeroOrMore, Suppress, StringEnd,
        alphas, alphanums, CharsNotIn, pyparsingWhitechars)

    def specToPair(s, loc, toks):
        return (toks["name"], toks["uri"])

    with pyparsingWhitechars(" \t"):
        # Only plain identifiers are table names for now (should we allow
        # more?).
        identifier = Word(alphas+"_", alphanums+"_")
        # URIs run up to the next blank, semicolon, or comma (what else
        # should we allow/forbid here?).
        sourceURI = CharsNotIn(" ;,")
        singleSpec = identifier("name") + "," + sourceURI("uri")
        grammar = (singleSpec
            + ZeroOrMore(Suppress(";") + singleSpec)
            + StringEnd())
        singleSpec.addParseAction(specToPair)
        return grammar
def parseUploadString(uploadString):
    """returns a list of (tableName, uploadSource) pairs parsed from a TAP
    UPLOAD string.

    Syntax errors are turned into a base.ValidationError on the UPLOAD
    parameter.
    """
    grammar = getUploadGrammar()
    try:
        return utils.pyparseString(grammar, uploadString).asList()
    except ParseException as ex:
        raise base.ValidationError(
            "Syntax error in UPLOAD parameter (near %s)"%(ex.loc),
            "UPLOAD",
            hint="Note that we only allow regular SQL identifiers as table names,"
                " i.e., basically only alphanumerics are allowed.")
class LocalFile(object):
    """A sentinel class standing for a file in a job's work directory
    (e.g., the result of an upload).
    """
    # TODO: unify with uws.LocalFile

    def __init__(self, jobId, wd, fileName):
        self.jobId = jobId
        self.fileName = fileName
        self.fullPath = os.path.join(wd, fileName)

    def __str__(self):
        # Mainly used when the upload parameter is serialised into the
        # UWS job.
        return "file://{}".format(self.fileName)

    def getURL(self):
        """returns the URL under which the file can be retrieved while
        the job exists.

        This is <async URL of //tap#run>/<jobId>/results/<fileName>.
        """
        asyncRoot = base.caches.getRD(RD_ID).getById("run").getURL(
            "async", absolute=True)
        return f"{asyncRoot}/{self.jobId}/results/{self.fileName}"
def mangleUploads(request):
    """parses TAP-compliant UPLOAD specifications in request and puts
    something that the core can handle back into it.

    This is being executed from the sync/async renderers and probably can't
    be used anywhere else.  The underlying trouble is that the core cannot
    see the request any more, and thus could not resolve param: uploads.
    Since this is different in TAP than in DALI, we have extra code here
    vs. dali.mangleUploads.

    All "upload" values are popped from request.strargs, joined with ";",
    and parsed.  If that yields anything, request.strargs["upload"] is set
    to a list of (tableName, source) pairs.  source is the URI string for
    ordinary URIs; for param: URIs, it is a (file_name, file_object) pair
    from request.files.  Nothing is returned.

    Raises base.ValidationError for syntax errors (from parseUploadString)
    and for param: references that do not resolve to an uploaded file.
    """
    uploadSpec = ";".join(request.strargs.pop("upload", []))
    if not uploadSpec:
        return

    paramScheme = "param:"
    parsed = []
    for tableName, upload in parseUploadString(uploadSpec):
        if not upload.startswith(paramScheme):
            parsed.append((tableName, upload))
            continue

        # Keep the parameter name separate from the looked-up file.  The
        # error messages below need the name even after the lookup has
        # succeeded; the old code rebound `upload` and then failed with a
        # TypeError while formatting the non-file message.
        paramName = upload[len(paramScheme):]
        try:
            fileArg = request.files[paramName][0]
            parsed.append(
                (tableName, (fileArg.file_name, fileArg.file_object)))
        except (KeyError, IndexError):
            raise base.ui.logOldExc(
                base.ValidationError(f"No inline upload '{paramName}' found",
                    "UPLOAD"))
        except AttributeError:
            # Whatever we found lacks file_name/file_object.
            raise base.ui.logOldExc(
                base.ValidationError(
                    f"Upload parameter references non-file '{paramName}'",
                    "UPLOAD"))

    request.strargs["upload"] = parsed
class TAPJob(uws.UWSJobWithWD):
    """A UWS job (with a working directory) executing a TAP query.

    Phase changes are handled by TAPTransitions.
    """
    _jobsTDId = "//tap#tapjobs"
    _transitions = TAPTransitions()

    @property
    def quote(self):
        """returns an estimation of the job completion.

        This currently is very naive.  Every job counted by the
        countQueuedBefore canned query (run with this job's
        destructionTime) is assumed to take EST_TIME_PER_JOB (10 minutes).
        The result is a naive datetime based on utcnow().

        This method needs to be changed when the dequeueing algorithm is
        changed.
        """
        with base.getTableConn() as conn:
            nBefore = self.uws.runCanned('countQueuedBefore',
                {'dt': self.destructionTime}, conn)[0]["count"]
        return datetime.datetime.utcnow()+nBefore*EST_TIME_PER_JOB

    def prepareForDestruction(self):
        """TERMs a still-executing child process, then lets the base class
        clean up.
        """
        # A missing pid must not reach os.kill: None raises TypeError, and
        # 0 would signal our own process group.
        if self.phase==uws.EXECUTING and self.pid:
            # Since we're about to kill all state anyway, we're just
            # TERM-ing the child rather than asking it politely.  Let's
            # hope self.phase isn't lying; if it is, we'll possibly kill
            # something else...
            try:
                os.kill(self.pid, signal.SIGTERM)
            except ProcessLookupError:
                # The child exited between the phase check and the kill.
                # That is fine, and it must not prevent destruction.
                pass
        super().prepareForDestruction()
#################### The TAP worker system
class PlanAction(uwsactions.JobAction):
    """The "plan" job action: retrieves a query plan for the job.

    Strictly, this is a TAP action.  As more UWSes are added, we will need
    a way to customise the available uwsactions by UWS type.
    """
    name = "plan"

    def getPlan(self, job, request):
        """returns a pair of the job's query table and its plan.

        The query table is built from the job parameters (profile
        "untrustedquery").  The response content type is set to UTF-8
        plain text.
        """
        queryTable = taprunner.getQTableFromJob(
            job.parameters, job, "untrustedquery", 1)
        # NOTE(review): doGET runs this in a worker thread, so setHeader is
        # called off the reactor thread; Twisted APIs are generally not
        # thread-safe -- consider setting the header in a callback instead.
        request.setHeader("content-type", "text/plain;charset=utf-8")
        return queryTable, queryTable.getPlan()

    def doGET(self, job, request):
        """computes the plan in a thread and hands it to formatPlan.

        NOTE(review): formatPlan is not defined in this class; presumably
        it is inherited from uwsactions.JobAction -- confirm.
        """
        planDeferred = threads.deferToThread(self.getPlan, job, request)
        planDeferred.addCallback(self.formatPlan)
        return planDeferred
class TAPUWS(uws.UWSWithQueueing):
    """The UWS that processes asynchronous TAP requests.

    Its jobs are TAPJobs, and its job actions are built with PlanAction
    (the TAP-specific "plan" action).
    """
    # Filled in lazily by the baseURL property.
    _baseURLCache = None

    # XSLT processing instructions put in front of job list and job
    # documents.
    joblistPreamble = (
        "<?xml-stylesheet href='/static/xsl/tap-joblist-to-html.xsl'"
        " type='text/xsl'?>")
    jobdocPreamble = (
        "<?xml-stylesheet href='/static/xsl/tap-job-to-html.xsl'"
        " type='text/xsl'?>")

    def __init__(self):
        # runcountGoal is set before the base class constructor runs;
        # keep this order.
        self.runcountGoal = base.getConfig("async", "maxTAPRunning")
        jobActions = uwsactions.JobActions(PlanAction)
        uws.UWSWithQueueing.__init__(self, TAPJob, jobActions)

    @functools.cached_property
    def parameterGrammar(self):
        """A grammar for processing parameters in UWS calls.

        It is a ContextGrammar over the input table of //tap#run's core.
        """
        runCore = base.resolveCrossId("//tap#run").core
        return base.makeStruct(svcs.ContextGrammar,
            inputTD=runCore.inputTable)

    @property
    def baseURL(self):
        """the sync URL of //tap#run with its last five characters (which
        are presumably "/sync") cut off; computed once and then cached.
        """
        if self._baseURLCache is None:
            syncURL = base.caches.getRD(RD_ID).getById("run").getURL("sync")
            self._baseURLCache = syncURL[:-len("/sync")]
        return self._baseURLCache

    def getURLForId(self, jobId):
        """returns a fully qualified URL for the job with jobId.
        """
        return f"{self.baseURL}/async/{jobId}"
# The module-level TAPUWS instance; TAPCore uses it to create, run, and
# destroy the jobs backing synchronous queries.
WORKER_SYSTEM = TAPUWS()
######################### The TAP core
class TAPCore(svcs.Core):
    """A core for the TAP renderer.

    run executes a query synchronously by way of a temporary job in
    WORKER_SYSTEM; the job is destroyed afterwards in any case.
    """
    name_ = "tapCore"
    workerSystem = WORKER_SYSTEM

    # The Python in <preparse> must keep consistent relative indentation
    # (the raise nested under the if) to be valid code.
    inputTableXML = f"""
        <inputTable>
            <inputKey name="request" type="text" required="True"
                std="True" multiplicity="force-single"
                description="Type of operation requested; this can be doQuery
                or getCapabilities. Preferably, don't pass it at all.">
                <values default="doQuery"/>
            </inputKey>
            <inputKey name="lang" type="text" required="True"
                std="True" multiplicity="force-single"
                description="A name of a language that QUERY should be parsed as.
                See capabilities for what you can pass in here.">
            </inputKey>
            <inputKey name="query" type="text" required="True"
                std="True" multiplicity="force-single"
                description="The query to be executed.">
            </inputKey>
            <inputKey name="version" type="text"
                std="True"
                description="Don't use this parameter. You'll only break stuff.">
                <values default="{TAP_VERSION}"/>
                <preparse>
                    if input!="{TAP_VERSION}":
                        raise ValueError(
                            "Version mismatch; this service only supports"
                            " TAP version {TAP_VERSION}.")
                </preparse>
            </inputKey>
            <inputKey name="format" type="text"
                description="Deprecated alias of RESPONSEFORMAT"/>
            <inputKey name="upload" type="raw"
                description="A TAP-compliant upload; roughly:
                (tablename,source-uri), where source-uri can also have
                a param: scheme.">
            </inputKey>
            <FEED source="//pql#DALIPars">
                <PRUNE name="VERB"/>
            </FEED>
        </inputTable>"""

    # The output table is ignored.
    def run(self, service, inputTable, queryMeta):
        """runs the TAP query in inputTable synchronously.

        Returns a pair of an open binary file with the job's (single)
        result and the result's type.  If the job ends in ERROR, this
        raises base.Error with the job's message and hint.  If it ends
        ABORTED or in any other unexpected phase, this raises
        uws.UWSError.  The temporary job is destroyed in all cases.
        """
        jobId = WORKER_SYSTEM.getNewIdFromArgs(
            {}, inputTable.getParamDict())
        try:
            taprunner.runSyncTAPJob(jobId, queryMeta)

            job = WORKER_SYSTEM.getJob(jobId)
            if job.phase==uws.COMPLETED:
                # This is TAP, so there's exactly one result
                res = job.getResults()[0]
                resultName, resultType = res["resultName"], res["resultType"]
                # Open the result now and hold on to the file object, so
                # its inode is not lost when the job is deleted in the
                # finally clause below.
                f = open(os.path.join(job.getWD(), resultName), "rb")
                return (f, resultType)

            elif job.phase==uws.ERROR:
                exc = job.error
                raise base.Error(exc["msg"], hint=exc["hint"])

            elif job.phase==uws.ABORTED:
                raise uws.UWSError("Job was manually aborted. For synchronous"
                    " jobs, this probably means the operators killed it.",
                    jobId)

            else:
                raise uws.UWSError("Internal error. Invalid UWS phase.", jobId)

        finally:
            WORKER_SYSTEM.destroy(jobId)

    def getRelevantTables(self):
        """returns table definitions for the tables in TAP_SCHEMA.tables.

        Tables whose definition cannot be retrieved are reported through
        base.ui.notifyError and skipped.  Definitions that are None or have
        no RD are dropped as well.
        """
        tables = []
        with base.getTableConn() as conn:
            for tableName, in conn.query("SELECT table_name"
                    " FROM TAP_SCHEMA.tables"):
                try:
                    tables.append(base.getTableDefForTable(conn, tableName))
                except Exception:
                    # One broken RD shouldn't spoil the whole table list.
                    # KeyboardInterrupt and SystemExit are no longer
                    # swallowed, as they were with the previous bare except.
                    base.ui.notifyError("Failure trying to retrieve table definition"
                        " for table %s. Please fix the corresponding RD."%tableName)
        return [t for t in tables if t is not None and t.rd is not None]