"""
Renderers and helpers for asynchronous services.
For TAP (which was the first prototype of these), there's a separate
module using some of this; in the long run, it should probably be
integrated here.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import functools
from twisted.internet import defer
from twisted.python import failure
from twisted.web import resource
from twisted.web import server
from gavo import base
from gavo import svcs
from gavo import utils
from gavo.protocols import dali
from gavo.protocols import dlasync
from gavo.protocols import uws
from gavo.protocols import uwsactions
from gavo.web import grend
from gavo.web import weberrors
def redirectUWS(baseURL, location):
    """raises an UWS-compatible (303) redirection.

    This computes the redirect target from baseURL and location and then
    raises svcs.SeeOther for it; it never returns normally.

    location is used as-is when it starts with http:// or https://.
    Otherwise, it is taken relative to baseURL, which essentially has to be
    the full absolute URL of the endpoint (i.e., service/renderer).  An
    empty location redirects to baseURL itself.

    As a special service, for TAP "/async" is appended to baseURL as long
    as the TAP renderer isn't fixed to not do dispatching.

    This essentially just mogrifies SeeOther exceptions rendered elsewhere.
    It should therefore go at some point (probably when TAP uses the async
    renderer).
    """
    # TODO: Temporary hack as long as TAP isn't modernized to use
    # an async renderer: fix the redirect to TAP's async endpoint if
    # baseURL is the TAP renderer:
    if baseURL.endswith("tap"):
        baseURL += "/async"

    if not location:
        target = str(baseURL)
    elif location.startswith(("http://", "https://")):
        # already absolute; do not touch it
        target = str(location)
    else:
        target = "%s/%s"%(baseURL, location)

    raise svcs.SeeOther(target)
class UWSResource(resource.Resource):
    """a resource dealing with a UWS.

    It is constructed with a worker system, a concrete renderer, and
    the service that executes requests.

    It also delivers errors in UWS (well, TAP, actually) style.
    """
    def __init__(self, workerSystem, renderer, service):
        self.workerSystem, self.service = workerSystem, service
        self.renderer = renderer
        resource.Resource.__init__(self)

    def _deliverError(self, flr, request, httpCode=400):
        """renders the failure flr into request.

        svcs.Authenticate and svcs.SeeOther are passed on to
        weberrors.renderDCErrorPage.  Everything else is delivered through
        dali.serveDALIError with httpCode, except that uws.JobNotFound
        always yields a 404.  Failures other than JobNotFound are also
        reported through base.ui.notifyFailure.

        httpCode defaults to 400 so that callers not passing a code (e.g.,
        errbacks only adding the request) do not deliver error documents
        with a success status.
        """
        # auth requests and redirects handled by normal dc methods.
        if isinstance(flr.value, (svcs.Authenticate, svcs.SeeOther)):
            return weberrors.renderDCErrorPage(flr, request)

        if isinstance(flr.value, uws.JobNotFound):
            httpCode = 404
        else:
            base.ui.notifyFailure(flr)
        return dali.serveDALIError(request, flr.value, httpCode)

    def render(self, request):
        """dispatches through twisted's Resource.render, turning any
        exception raised on the way into an error response via
        _deliverError.
        """
        try:
            return resource.Resource.render(self, request)
        except Exception as ex:
            return self._deliverError(failure.Failure(ex), request, httpCode=400)
class JoblistResource(UWSResource):
    """The web resource corresponding to async root.

    A GET on this produces the job list, a POST creates a new job and
    redirects to it.

    There's an extra hack not in UWS: a GET carrying something like
    dachs_authenticate=anything from a client that has not passed a user
    will be asked for credentials.
    """
    def render_GET(self, request):
        """returns the job list, filtered by PHASE, LAST, and AFTER.
        """
        authRequested = "dachs_authenticate" in request.strargs
        if authRequested and not request.getUser():
            raise svcs.Authenticate()

        request.setHeader("content-type", "text/xml")
        inputTD = self.getJoblistInputTD()
        args = svcs.CoreArgs.fromRawArgs(inputTD, request.strargs).args
        return uwsactions.getJobList(
            self.workerSystem,
            request.getAuthUser() or None,
            phase=args["PHASE"],
            last=args["LAST"],
            after=args["AFTER"])

    def render_POST(self, request):
        """creates a job from request and redirects to the new job's URL.

        This does not return normally, as redirectUWS always raises.
        """
        newId = self.workerSystem.getNewIdFromRequest(request)
        redirectUWS(self.service.getURL(self.renderer), str(newId))

    def _deliverResult(self, res, request):
        # declare res as XML and hand it through unchanged
        request.setHeader("content-type", "text/xml")
        return res
class JobResource(UWSResource):
    """The web resource corresponding to async requests for jobs.

    This currently uses a custom hack for resource resolution and method
    dispatch.  Let's move it to using twisted resources one day.

    In addition to what UWSResource needs, this is constructed with
    segments, which is passed on to uwsactions.doJobAction on rendering.
    """
    def __init__(self, workerSystem, renderer, service, segments):
        # go through the parent constructor so twisted's Resource.__init__
        # runs, too; it was skipped when the attributes were set by hand.
        UWSResource.__init__(self, workerSystem, renderer, service)
        self.segments = segments

    def render(self, request):
        """runs the job action for our segments and arranges for the
        outcome to be written to request.

        Results go through _deliverResult, svcs.SeeOther failures are
        rewritten by _redirectAsNecessary, and all failures end up in
        _deliverError.  This always returns server.NOT_DONE_YET.
        """
        defer.maybeDeferred(
            uwsactions.doJobAction,
            self.workerSystem, request, self.segments
        ).addCallback(self._deliverResult, request
        ).addErrback(self._redirectAsNecessary, request
        ).addErrback(self._deliverError, request)
        return server.NOT_DONE_YET

    def _redirectAsNecessary(self, flr, request):
        """re-raises a svcs.SeeOther failure as a redirect relative to
        our endpoint; any other failure is passed on unchanged by trap.
        """
        flr.trap(svcs.SeeOther)
        redirectUWS(self.service.getURL(self.renderer),
            flr.value.rawDest)

    def _deliverResult(self, result, request):
        """writes the result of a job action to request, unless the
        action has taken care of that itself.
        """
        if result is server.NOT_DONE_YET:
            # the job action is rendering itself -- this is where we'd like
            # to go for non-XML replies.
            return result

        elif isinstance(result, resource.Resource):
            # nevow-style returned resource -- we'd like to get rid of that
            base.ui.notifyWarning("UWS job resource returned a resource rather than"
                " rendering itself. We'd like to stop this.")
            return result.render(request)

        else:
            # convenience function: result must be a stan tree we can just
            # render. the content-type is set by uwsaction._JobActions.dispatch
            request.write(utils.xmlrender(result))
            request.finish()
def getAsyncResource(
        request, workerSystem, renderer, service, firstSegment):
    """returns a UWS-compliant resource for request.

    When no path segments are left on request, that is a JoblistResource,
    otherwise a JobResource for the remaining segments.  A single empty
    segment (i.e., a trailing slash) raises a svcs.Found redirect to the
    URI without that slash.

    Note: This expects that the renderer has already called
    uws.prepareRequest.
    """
    remaining = request.popSegments(firstSegment)

    if not remaining:
        return JoblistResource(workerSystem, renderer, service)

    if remaining==[""]:
        # redirect async/ to async so our style sheets work
        raise svcs.Found(request.uri[:-1])

    return JobResource(workerSystem, renderer, service, remaining)
class AsyncRendererBase(grend.ServiceBasedPage):
    """An abstract renderer for things running in a UWS.

    To make these concrete, they need a name and a workerSystem attribute.
    """
    parameterStyle = "pql"

    def render(self, request):
        """renders request through the resource getChild produces.

        Exceptions are deliberately not caught here.  getChild raises
        svcs.Found for the trailing-slash redirect, and catching it here
        (as a blanket handler did before) both lost that redirect and made
        this method return None instead of a render result.
        NOTE(review): this assumes the calling framework handles svcs.Found
        and other exceptions escaping render -- confirm.
        """
        # We don't do anything ourselves -- everything has to go through
        # getAsyncResource and hence getChild
        return self.getChild(None, request).render(request)

    def getChild(self, name, request):
        """returns the UWS resource responsible for request.

        A trailing slash on the request path raises a svcs.Found redirect
        to the path without it.  Any exception while preparing the request
        or constructing the resource is logged and results in a
        dali.DALIErrorResource being returned instead.
        """
        from gavo.web import asyncrender

        if request.prepath[-1]==b"":
            # trailing slash: redirect away so our XSLT works properly
            raise svcs.Found(b"/".join(request.prepath[:-1]))

        try:
            uws.prepareRequest(request, self.service)
            return asyncrender.getAsyncResource(request,
                self.workerSystem,
                self.name,
                self.service,
                name)
        except Exception as ex:
            base.ui.notifyError(f"UWS child construction failed: {ex}")
            return dali.DALIErrorResource(ex)
class DatalinkAsyncRenderer(AsyncRendererBase):
    """A renderer for asynchronous datalink.
    """
    # This is a special case because the id of the calling service has
    # to end up in strargs.  Uploads are impossible in this form, since
    # all inspection of the arguments is deferred to when the job
    # actually runs.
    name = "dlasync"
    workerSystem = dlasync.DL_WORKER

    def render(self, request):
        """wraps the incoming arguments into dlargs, adds the id of our
        service as serviceid, and then renders as any async renderer.
        """
        # This sort of parameter re-packing quite certainly isn't what
        # we should be doing in the long run.  Let's see what we come
        # up with the next time we revisit this.
        repacked = {
            "dlargs": request.strargs,
            "serviceid": [self.service.getFullId()]}
        request._strargs = repacked
        return AsyncRendererBase.render(self, request)
class DALIAsyncRenderer(AsyncRendererBase):
    """A renderer speaking UWS.

    This is for asynchronous execution of larger jobs, and it is what the
    async renderer executes.  The worker system required is requested from
    the service, which in turn obtains it from the core; these must hence
    cooperate with this to allow async operation.

    See `Custom UWSes`_ for how to use this with your own cores.
    """
    name = "async"
    aliases = frozenset({"uws.xml"})

    @property
    def workerSystem(self):
        # the worker system is whatever the service's getUWS hands out
        serviceUWS = self.service.getUWS()
        return serviceUWS