"""
Support for UWSes defined in user RDs.
To understand this, start at makeUWSForService.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime
import weakref
from gavo import base
from gavo import formats
from gavo import rsc
from gavo import rscdesc #noflake: for registration
from gavo import svcs
from gavo.protocols import uws
from gavo.protocols import uwsactions
[docs]class UserUWSTransitions(uws.ProcessBasedUWSTransitions):
"""The transition function for user-defined UWSes.
"""
def __init__(self):
uws.ProcessBasedUWSTransitions.__init__(self, "User")
[docs] def queueJob(self, newState, wjob, ignored):
"""puts a job on the queue.
"""
uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
wjob.uws.scheduleProcessQueueCheck()
[docs] def getCommandLine(self, wjob):
args = ["dachs", "uwsrun", "--", str(wjob.jobId)]
if base.DEBUG:
args[1:1] = ["--debug", "--traceback"]
return "gavo", args
[docs]class UserUWSJobBase(uws.UWSJobWithWD):
"""The base class for the service-specific user UWS jobs.
(i.e., the things that the UserUWSJobFactory spits out)
"""
_transitions = UserUWSTransitions()
_jobsTDId = "//uws#userjobs"
[docs]def makeUserUWSJobClass(service):
"""returns a class object for representing UWS jobs processing requests
for service
"""
class UserUWSJob(UserUWSJobBase):
pass
UserUWSJob._default_jobClass = classmethod(
lambda _, v=service.getFullId(): v)
return UserUWSJob
[docs]class UserUWS(uws.UWSWithQueueing):
"""A UWS for "user jobs", i.e., generic things an a core.
These dynamically create job classes based on the processing core's
parameters. To make this happen, we'll need to override some of the
common UWS functions.
Note: For async operation (without a custom UWS), you can only have
uploads in the upload parameter.
"""
joblistPreamble = ("<?xml-stylesheet href='/static"
"/xsl/useruws-joblist-to-html.xsl' type='text/xsl'?>")
jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
"useruws-job-to-html.xsl' type='text/xsl'?>")
def __init__(self, service, jobActions, parameterGrammar):
self.runcountGoal = base.getConfig("async", "maxUserUWSRunningDefault")
self.service = weakref.proxy(service)
uws.UWSWithQueueing.__init__(self,
makeUserUWSJobClass(service), jobActions)
self.parameterGrammar = parameterGrammar
[docs] def getURLForId(self, jobId):
return self.service.getURL("uws.xml")+"/"+jobId
def _getJob(self, jobId, conn, writable=False):
"""returns the named job as uws.UWS._getJob.
However, in a user UWS, there can be jobs from multiple services.
It would be nonsense to load another UWS's job's parameters into our
job class. To prevent this, we redirect if we find the new job's
class isn't ours. On the web interface, that should do the trick.
Everywhere else, this may not be entirely clear but still prevent
major confusion.
This is repeating code from uws.UWS._getJob; some refactoring at
some point would be nice.
"""
statementId = 'getById'
if writable:
statementId = 'getByIdEx'
res = self.runCanned(statementId, {"jobId": jobId}, conn)
if len(res)!=1:
raise uws.JobNotFound(jobId)
if res[0]["jobClass"]!=self.service.getFullId():
raise svcs.WebRedirect(
base.resolveCrossId(res[0]["jobClass"]).getUWS().getURLForId(jobId))
return self.jobClass(res[0], self, writable)
[docs] def getIdsAndPhases(self, *args, **kwargs):
# for user UWSes, we only want jobs from our service in the job
# list resource. We insert extra conditions to the basic queries.
# getById and getAllIds don't change, though, as they're used internally
# and could influence, e.g., queueing and such.
return uws.UWSWithQueueing.getIdsAndPhases(self, *args,
initFragments=["jobClass=%(jobClass)s"],
initPars={"jobClass": self.service.getFullId()},
**kwargs)
[docs]def makeUWSForService(service):
"""returns a UserUWS instance tailored to service.
All these share a jobs table, but the all have different job
classes with the parameters custom-made for the service's core.
A drawback of this is that each UWS created in this way runs the
job table purger again. That shouldn't be a problem per se but
may become cumbersome at some point. We can always introduce a
class Attribute on UserUWS to keep additional UWSes from starting
cron jobs of their own.
"""
return UserUWS(
service,
uwsactions.JobActions(),
base.makeStruct(svcs.ContextGrammar, inputTD=service.core.inputTable))
####################### CLI
[docs]def parseCommandLine():
import argparse
parser = argparse.ArgumentParser(description="Run an asynchronous"
" generic job (used internally)")
parser.add_argument("jobId", type=str, help="UWS id of the job to run")
return parser.parse_args()
[docs]def main():
args = parseCommandLine()
jobId = args.jobId
with base.getTableConn() as conn:
svcId = list(
conn.query("SELECT jobclass FROM uws.userjobs WHERE jobId=%(jobId)s",
{"jobId": jobId}))[0][0]
service = base.resolveCrossId(svcId)
# we're the only ones running, so we're safe using this.
queryMeta = svcs.emptyQueryMeta
try:
job = service.getUWS().getJob(jobId)
with job.getWritable() as wjob:
wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())
service = base.resolveCrossId(job.jobClass)
inputTable = svcs.CoreArgs(service.core.inputTable,
job.parameters, job.parameters)
inputTable.job = job
data = service._runWithInputTable(
service.core, inputTable, queryMeta)
# Our cores either return a table, a pair of mime and data,
# or None (in which case they added the results themselves)
if isinstance(data, tuple):
mime, payload = data
with job.openResult(mime, "result") as destF:
destF.write(payload)
elif isinstance(data, rsc.Data):
destFmt = inputTable.getParam("responseformat"
) or "application/x-votable+xml"
with job.openResult(destFmt, "result") as destF:
formats.formatData(destFmt, data, destF, False)
elif data is None:
pass
else:
raise NotImplementedError("Cannot handle a service %s result yet."%
repr(data))
with job.getWritable() as wjob:
wjob.change(phase=uws.COMPLETED)
except SystemExit:
pass
except uws.JobNotFound:
base.ui.notifyInfo("Giving up non-existing UWS job %s."%jobId)
except Exception as ex:
base.ui.notifyError("UWS runner %s major failure"%jobId)
# try to push job into the error state -- this may well fail given
# that we're quite hosed, but it's worth the try
service.getUWS().changeToPhase(jobId, uws.ERROR, ex)
raise