"""
Execution of UWS (right now, TAP only) requests.
This mainly intended to be exec'd (through some wrapper) by the queue
runner in the main server thread. The jobs executed have to be in
the database and have to have a job directory.
Primarily for testing an alternative interface rabRun exists that takes that
takes jobid, and parameters.
The tap runner takes the job to EXECUTING shortly before sending the
query to the DB server. When done, the job's state is one of COMPLETED,
ABORTED or ERROR.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime
import os
import sys
import time
from gavo import base
from gavo import formats
from gavo import rsc
from gavo import svcs
from gavo import utils
from gavo import votable
from gavo.base import valuemappers
from gavo.formats import texttable #noflake: format registration
from gavo.formats import csvtable #noflake: format registration
from gavo.formats import jsontable #noflake: format registration
from gavo.formats import geojson #noflake: format registration
from gavo.formats import votableread
from gavo.formats import votablewrite
from gavo.protocols import adqlglue
from gavo.protocols import simbadinterface #noflake: UDF registration
from gavo.protocols import tap
from gavo.protocols import uws
# Set to True by the SIGINT handler installed in setINTHandler; polled
# by joinInterruptibly, which then aborts the job and exits.
EXIT_PLEASE = False

# The pid of the worker db backend. This is used in the signal handler
# when it tries to kill the running query (see _killWorker); filled in
# by _noteWorkerPID once a database connection exists.
_WORKER_PID = None
# test instrumentation
class _MessilyCrash(Exception):
pass
def _assertSupportedLanguage(jobId, langSpec):
	"""raises a UWSError if langSpec ("ADQL-3.1") cannot be processed by
	this service.

	langSpec is either a bare language name or name-version; supported
	names and versions come from tap.SUPPORTED_LANGUAGES.
	"""
	name, dash, version = langSpec.partition("-")
	if not dash:
		version = None

	if name not in tap.SUPPORTED_LANGUAGES:
		raise uws.UWSError("This service does not support"
			" the query language %s"%name, jobId)

	if (version is not None
			and version not in tap.SUPPORTED_LANGUAGES[name]["versions"]):
		raise uws.UWSError("This service does not support"
			" version %s of the query language %s (but some other version;"
			" see capabilities)."%(version, name), jobId)
def _parseTAPParameters(jobId, parameters):
	"""gets and checks TAP parameters like version, request, and such.

	The function returns a tuple of query and maxrec.

	Since TAP is now a core, much of this is a no-op, as the parsing
	has been done by the context grammar. However, since the lang
	rules are a bit nightmarish, I still keep things in this extra
	function.

	Raises UWSError for unsupported REQUEST/LANG values and
	ValidationError for missing required parameters.
	"""
	try:
		if parameters["request"]!="doQuery":
			raise uws.UWSError("This service only supports REQUEST=doQuery", jobId)
		_assertSupportedLanguage(jobId, parameters["lang"])
		query = parameters["query"]
	except KeyError as key:
		raise base.ui.logOldExc(base.ValidationError(
			"Required parameter '%s' missing."%key, key))

	try:
		if parameters["maxrec"] is None:
			maxrec = base.getConfig("async", "defaultMAXREC")
		else:
			maxrec = parameters["maxrec"]
		# never let clients exceed the site-wide hard limit
		maxrec = min(base.getConfig("async", "hardMAXREC"), maxrec)
	except ValueError:
		# was base.ui.logOldError, which does not exist; that made this
		# error path die with an AttributeError instead of reporting the
		# bad MAXREC.  Also pass jobId as in the other UWSError raises.
		raise base.ui.logOldExc(
			uws.UWSError("Invalid MAXREC literal '%s'."%parameters["maxrec"],
				jobId))
	return query, maxrec
def _makeDataFor(resultTable):
	"""returns an rsc.Data instance containing resultTable and some
	additional metadata.
	"""
	data = rsc.wrapTable(resultTable)
	data.contributingMetaCarriers.append(base.resolveCrossId("//tap#run"))

	# secret handshake with adqlglue: Add all tables of which we know that
	# contributed to the query result to the meta sources for Data Origin
	if hasattr(resultTable.tableDef, "contributingTables"):
		data.contributingMetaCarriers.extend(
			resultTable.tableDef.contributingTables)

	data.addMeta("info", "Query successful",
		infoName="QUERY_STATUS", infoValue="OK")
	data.setMeta("_type", "results")
	data.overflowLimit = resultTable.tableDef.overflowLimit

	return data
def writeResultTo(format, res, outF):
	"""serialises the data res to the file outF using the format key
	format.

	VOTable flavours are special-cased so an OVERFLOW INFO element can
	be attached when the row limit was hit.
	"""
	if not format.startswith("votable"):
		formats.formatData(format, res, outF, acquireSamples=False)
		return

	# the following duplicates a mapping from votablewrite; that's
	# bad, and I should figure out a way to teach formats.format
	# whatever is needed to let it do what we're doing here. Meanwhile:
	codings = {
		"votable": "binary",
		"votableb2": "binary2",
		"votabletd": "td"}
	overflowInfo = votable.OverflowElement(
		res.getPrimaryTable().tableDef.overflowLimit,
		votable.V.INFO(name="QUERY_STATUS", value="OVERFLOW"))
	votablewrite.writeAsVOTable(res, outF,
		votablewrite.VOTableContext(
			tablecoding=codings.get(format, "td"),
			acquireSamples=False,
			overflowElement=overflowInfo))
def _ingestUploads(job, uploads, connection):
	"""ingests uploads (in the format left by procotols.tap) into
	temporary tables in connection.

	uploads is a sequence of (destName, src) pairs (or None/empty), where
	src is either a file:// URL pointing into the job directory or a
	remote URL.  Returns a list of the table definitions of the ingested
	tables.

	Raises ValidationError for unretrievable or badly named uploads and
	ReportableError when the actual ingestion fails.
	"""
	tds = []
	for destName, src in uploads or []:
		if src.startswith("file://"):
			# here, these are always direct children of the job directory;
			# as a bit of defense, we only use the last path segment.
			srcF = job.openFile(src.split("/")[-1])
		else:
			try:
				srcF = utils.urlopenRemote(src)
			except IOError as ex:
				raise base.ui.logOldExc(
					base.ValidationError("Upload '%s' cannot be retrieved"%(
						src), "UPLOAD", hint="The I/O operation failed with the message: "+
						str(ex)))
		# reject names that would need quoting in SQL; see the hint below
		if valuemappers.needsQuoting(destName):
			raise base.ValidationError("'%s' is not a valid table name on"
				" this site"%destName, "UPLOAD", hint="It either contains"
				" non-alphanumeric characters or conflicts with an ADQL"
				" reserved word. Quoted table names are not supported"
				" at this site.")
		try:
			uploadedTable = votableread.uploadVOTable(destName, srcF, connection,
				nameMaker=votableread.AutoQuotedNameMaker())
		except Exception as ex:
			raise base.ui.logOldExc(
				base.ReportableError(f"While ingesting upload {destName}: {ex}"))
		# uploadVOTable may return None (nothing ingested); only note
		# table definitions for tables that actually exist.
		if uploadedTable is not None:
			tds.append(uploadedTable.tableDef)
		srcF.close()
	return tds
def _noteWorkerPID(conn):
	"""stores the PID of the postgres backend serving conn in the
	module-global _WORKER_PID (so the signal handler can cancel it).
	"""
	global _WORKER_PID
	cursor = conn.cursor()
	cursor.execute("SELECT pg_backend_pid()")
	_WORKER_PID = cursor.fetchone()[0]
	cursor.close()
def _hangIfMagic(jobId, parameters, timeout):
	"""reacts to magic test queries (sleeping or raising) and is a
	no-op for everything else.

	Test instrumentation. There are more effective ways to DoS me.
	"""
	magicQuery = parameters.get("query")

	if magicQuery=="JUST HANG around":
		with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
			wjob.change(phase=uws.EXECUTING,
				executionDuration=timeout)
		time.sleep(timeout)
		with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
			wjob.change(phase=uws.COMPLETED,
				endTime=datetime.datetime.utcnow())
		sys.exit()

	elif magicQuery=="MESSILY CRASH":
		raise _MessilyCrash()
def getQTableFromJob(parameters, job, queryProfile, timeout):
	"""returns a QueryTable for a TAP job.

	This parses the TAP parameters, opens a database connection for
	queryProfile, ingests any uploads into it, and hands the query to
	adqlglue.
	"""
	query, maxrec = _parseTAPParameters(job.jobId, parameters)
	connectionForQuery = base.getDBConnection(queryProfile)

	try:
		_noteWorkerPID(connectionForQuery)
	# was a bare except:, which also swallowed SystemExit and
	# KeyboardInterrupt; Exception keeps the best-effort behaviour
	# (don't fail just because we can't kill workers) without that.
	except Exception:
		base.ui.notifyError(
			f"Could not obtain PID for the worker, job {job.jobId}")

	tdsForUploads = _ingestUploads(job, parameters["upload"],
		connectionForQuery)
	return adqlglue.runTAPQuery(query, timeout, connectionForQuery,
		tdsForUploads, maxrec)
def runTAPJobNoState(parameters, jobId, queryProfile, timeout):
	"""executes a TAP job defined by parameters and writes the
	result to the job's working directory.

	This does not do state management. Use runTAPJob if you need it.
	"""
	_hangIfMagic(jobId, parameters, timeout)

	# The following makes us bail out if a bad format was passed -- no
	# sense spending the CPU on executing the query then, so we get the
	# format here.
	defaultFormat = "votable"
	if base.getConfig("ivoa", "votDefaultEncoding")=="td":
		defaultFormat = "votable/td"
	rawFormat = (parameters.get("responseformat")
		or parameters.get("format")
		or defaultFormat)
	format = formats.getKeyFor(rawFormat)

	try:
		job = tap.WORKER_SYSTEM.getJob(jobId)
		res = _makeDataFor(getQTableFromJob(
			parameters, job, queryProfile, timeout))
		destF = job.openResult(
			formats.getMIMEFor(
				format,
				rawFormat),
			"result")
		try:
			writeResultTo(format, res, destF)
		finally:
			# close the result file even when the serialisation dies
			# halfway through; the previous code leaked destF on errors.
			destF.close()
	except Exception:
		# DB errors can occur here since we're streaming directly from
		# the database.
		svcs.mapDBErrors(*sys.exc_info())
	# connectionForQuery closed by QueryTable
def runTAPJob(jobId, queryProfile="untrustedquery"):
	"""executes a TAP job defined by parameters and job id.

	This assumes the job has already been put into executing, and the
	appropriate pid has been entered. To indicate that actual processing
	has started and the job is killable, the start time is recorded, though.
	"""
	with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
		wjob.change(startTime=datetime.datetime.utcnow())
		timeout = wjob.executionDuration
		parameters = wjob.parameters

	try:
		runTAPJobNoState(parameters, jobId, queryProfile, timeout)
	except Exception as ex:
		# base.Error-s have already been reported through the normal
		# channels; anything else is notified here before erroring out.
		if not isinstance(ex, base.Error):
			base.ui.notifyError("While executing TAP job %s: %s"%(jobId, ex))
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
	else:
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.COMPLETED, None)
def runSyncTAPJob(jobId, queryMeta=None):
	"""executes a TAP job synchronously.

	When this is done, the job will be in an end state, i.e., ERROR,
	COMPLETED or perhaps ABORTED.

	You must call tap.WORKER_SYSTEM.destroy(jobId) when done yourself.

	Essentially, this puts the job into EXECUTING and declares the
	pid as -1. The UWS machinery will happily kill a job with a pid
	when asked to abort such a job, and since sync jobs run with the
	server's pid, that's really not what we want (conversely: sync
	jobs can't really be aborted). Anyway: Do *not* put anything
	getpid returns into a sync job's pid field.
	"""
	syncTimeout = base.getConfig("async", "defaultExecTimeSync")
	with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
		wjob.change(
			phase=uws.EXECUTING,
			executionDuration=syncTimeout,
			pid=-1)
	runTAPJob(jobId)
############### CLI
def setINTHandler(jobId):
	"""installs a signal handler that pushes our job to aborted on SIGINT.

	The handler itself only raises the module-global EXIT_PLEASE flag;
	the actual abort is done by whoever polls that flag (see
	joinInterruptibly).
	"""
	import signal

	def _flagExit(signo, frame):
		global EXIT_PLEASE
		EXIT_PLEASE = True

	signal.signal(signal.SIGINT, _flagExit)
def _killWorker(jobId):
	"""tries to kill the postgres worker for this job.
	"""
	with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
		wjob.change(phase=uws.ABORTED)

	if not _WORKER_PID:
		# no backend pid was recorded; nothing we can cancel
		return

	base.ui.notifyInfo("Trying to abort %s, wpid %s"%(
		jobId, _WORKER_PID))
	with base.getUntrustedConn() as conn:
		cursor = conn.cursor()
		cursor.execute("SELECT pg_cancel_backend(%d)"%_WORKER_PID)
		cursor.close()
def joinInterruptibly(t, jobId):
	"""waits for thread t to finish, polling twice a second whether
	EXIT_PLEASE has been raised; if so, aborts jobId's worker and exits.
	"""
	t.join(timeout=0.5)
	while t.is_alive():
		if EXIT_PLEASE:
			_killWorker(jobId)
			sys.exit(2)
		t.join(timeout=0.5)
def _runInThread(target, jobId):
	"""runs target (a function of no arguments) in a daemon thread,
	joining it interruptibly so a SIGINT can abort jobId.

	The standalone tap runner must run the query in a thread since
	it must be able to react to a SIGINT.
	"""
	import threading
	t = threading.Thread(target=target)
	# Thread.setDaemon is deprecated since Python 3.10; assign the
	# daemon attribute directly instead.
	t.daemon = True
	t.start()
	try:
		joinInterruptibly(t, jobId)
	except (SystemExit, Exception):
		# give the thread a chance to quit cleanly
		t.join(1)
		raise
def parseCommandLine():
	"""evaluates sys.argv and returns a pair of (options, job id).

	Exits with usage information if there is not exactly one positional
	argument.
	"""
	from optparse import OptionParser
	optParser = OptionParser(usage="%prog <jobid>",
		description="runs the TAP job with <jobid> from the UWS table.")
	options, positional = optParser.parse_args()
	if len(positional)!=1:
		optParser.print_help(file=sys.stderr)
		sys.exit(1)
	return options, positional[0]
def main():
	"""causes the execution of the job with the id given on the
	command line (see parseCommandLine).

	This is the entry point of the standalone taprunner process.
	"""
	from gavo import rscdesc #noflake: cache registration
	# there's a problem in CLI behaviour in that if anything goes wrong in
	# main, a job that may have been created will remain QUEUED forever.
	# There's little we can do about that, though, since we cannot put
	# a job into ERROR when we don't know its id or cannot get it from the DB.
	try:
		base.DEBUG = False
		opts, jobId = parseCommandLine()
		setINTHandler(jobId)
		try:
			os.setsid() # we don't want to be killed if the server is restarted.
				# The new server will pick us up.
		except PermissionError:
			# we already are the process group leader.
			pass
		try:
			_runInThread(lambda: runTAPJob(jobId), jobId)
			base.ui.notifyInfo("taprunner for %s finished"%jobId)
		except SystemExit:
			# normal exit requested (e.g., through the SIGINT machinery)
			pass
		except uws.JobNotFound: # someone destroyed the job before I was done
			errmsg = "Giving up non-existing TAP job %s."%jobId
			base.ui.notifyInfo(errmsg)
		except _MessilyCrash:
			# test instrumentation: simulate a severe internal error
			base.ui.notifyInfo("taprunner was asked to crash messily")
			sys.exit(1)
		except Exception as ex:
			base.ui.notifyError("taprunner %s major failure"%jobId)
			# try to push job into the error state -- this may well fail given
			# that we're quite hosed, but it's worth the try
			tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
			raise
	finally:
		pass