"""
A UWS-based interface to datalink.
TODO: There's quite a bit of parallel between this and useruws. This
should probably be reformulated along the lines of useruws.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import functools
import datetime
from twisted.web import server
from gavo import base
from gavo import svcs
from gavo import rscdesc #noflake: cache registration
from gavo.protocols import products
from gavo.protocols import uws
from gavo.protocols import uwsactions
# TODO: We're not supposed to import from helpers in main code. Fix
# this.
from gavo.helpers import testtricks
class DLTransitions(uws.ProcessBasedUWSTransitions):
	"""UWS phase transitions for asynchronous datalink jobs.

	Jobs are executed by running the command returned by getCommandLine
	(``gavo dlrun -- <jobId>``); queueing a job immediately starts it.
	"""
	def __init__(self):
		parentClass = uws.ProcessBasedUWSTransitions
		parentClass.__init__(self, "DL")

	def queueJob(self, newState, wjob, ignored):
		"""queues wjob as the base class does and then moves it straight
		on to EXECUTING, returning whatever startJob returns.
		"""
		uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
		return self.startJob(uws.EXECUTING, wjob, ignored)

	def getCommandLine(self, wjob):
		"""returns the executable name and the full argv for running wjob.
		"""
		argv = ["gavo", "dlrun", "--", str(wjob.jobId)]
		return argv[0], argv
class DLJob(uws.UWSJobWithWD):
	"""a UWS job performing some datalink data preparation.

	In addition to the standard UWS parameters, it has

	* serviceid -- the fully qualified id of the service that will process
	  the request
	* dlargs -- the parameters (in request.args form) of the
	  datalink request.
	"""
	# id of the table definition holding the datalink jobs
	_jobsTDId = "//datalink#datalinkjobs"
	# shared transition object for all datalink jobs
	_transitions = DLTransitions()
class DLUWS(uws.UWS):
	"""the UWS in charge of asynchronous datalink jobs (DLJob instances).

	The *Preamble attributes are XML stylesheet processing instructions
	pointing to the dlasync XSL files; presumably uws.UWS prepends them to
	the job list and job documents -- confirm in uws.
	"""
	_baseURLCache = None

	jobdocPreamble = (
		"<?xml-stylesheet href='/static/xsl/dlasync-job-to-html.xsl'"
		" type='text/xsl'?>")
	joblistPreamble = (
		"<?xml-stylesheet href='/static/xsl/dlasync-joblist-to-html.xsl'"
		" type='text/xsl'?>")

	def __init__(self):
		jobActions = uwsactions.JobActions()
		uws.UWS.__init__(self, DLJob, jobActions)

	@property
	def baseURL(self):
		"""the absolute URL of the datalinkuws endpoint.
		"""
		return base.makeAbsoluteURL("datalinkuws")

	@functools.cached_property
	def parameterGrammar(self):
		"""the context grammar for the job parameters dlargs and serviceid.

		It is parsed on first access and cached on the instance.
		"""
		grammarLiteral = """<contextGrammar>
<inputKey name="dlargs" type="raw"
description="All datalink parameters. These will only
be validated on execution. See the datalink descriptor
for the actual parameters usable here. You cannot pass
this in."
multiplicity="forced-single"/>
<inputKey name="serviceid" type="text"
description="The id of the service processing this. You
cannot pass this in; it is overridden by the renderer."
multiplicity="forced-single"/>
</contextGrammar>"""
		return base.parseFromString(svcs.ContextGrammar, grammarLiteral)

	def getURLForId(self, jobId):
		"""returns the fully qualified URL of the job with id jobId.
		"""
		serviceRoot = self.baseURL
		return "%s/%s"%(serviceRoot, jobId)


DL_WORKER = DLUWS()
####################### twisted.web simulation
# This is so we can handle t.w resources coming back from datalink.
# Factor this out? This is essentially stolen from trialhelpers,
# and we might just put that somewhere where it's useful.
import warnings
from twisted.internet import defer
from twisted.internet import reactor
def _requestDone(result, request):
	"""writes a render result to request, fires request's deferred, and
	stops the reactor.

	This is a helper for our t.w simulation.  result is what a resource's
	render method returned.  Non-empty bytes (what twisted.web render
	methods return) or str results are written to request.  Anything else
	only produces a warning.

	This returns the pair (request.accumulator, request).
	"""
	# render methods in twisted.web return bytes; str is still accepted
	# as before for backwards compatibility.
	if isinstance(result, (bytes, str)):
		if result:
			request.write(result)
	else:
		warnings.warn("Unsupported async datalink render result: %s"%repr(result))
	request.deferred.callback(request.accumulator)
	reactor.stop()
	return request.accumulator, request
class WritingFakeRequest(testtricks.FakeRequest):
	"""a simulated t.w request whose output goes to a file.

	UWS results are rendered with the same code that renders to web
	requests, so we need something request-like here.  This reuses
	testtricks.FakeRequest (which is more than strictly needed but may
	come in handy later) and only changes two things: write sends data
	to destFile, and finish closes destFile and stops the reactor.
	"""
	def __init__(self, destFile):
		# destFile is set before the base constructor runs, as in the
		# original ordering.
		self.destFile = destFile
		testtricks.FakeRequest.__init__(self, "")

	def write(self, stuff):
		"""writes stuff to the destination file.
		"""
		sink = self.destFile
		sink.write(stuff)

	def finish(self):
		"""fires the request's deferred with None, closes the destination
		file, and stops the reactor.
		"""
		pending = self.deferred
		pending.callback(None)
		self.destFile.close()
		reactor.stop()
def writeResultTo(page, destFile):
	"""arranges for the result of rendering the twisted.web resource
	page to be written to destFile, and returns the (fake) request used.

	This uses a very simple simulation of t.w rendering, so a few
	tricks are possible.  It actually runs a reactor to do its magic.
	The reactor is stopped when the request is finished (WritingFakeRequest
	stops it in finish).

	Do not run this in a running DaCHS server; it has its own reactor
	running.  This is only for dachs dlrun code.

	Exceptions raised by page.render, or while writing its immediate
	result, are re-raised here once the reactor has stopped.  Raising
	them from within the reactor callback would only get them logged
	by twisted, and the caller would never notice the failure.

	TODO: There's probably code for this in t.w.
	"""
	renderErrors = []

	def _(func, req):
		try:
			res = func(req)
		except Exception as ex:
			renderErrors.append(ex)
			# finishing the request stops the reactor
			req.finish()
			return

		if res==server.NOT_DONE_YET:
			# resource will finish the request itself later
			return

		try:
			if res:
				req.write(res)
		except Exception as ex:
			renderErrors.append(ex)
		finally:
			req.finish()

	request = WritingFakeRequest(destFile)
	reactor.callWhenRunning(_, page.render, request)
	reactor.run()

	if renderErrors:
		raise renderErrors[0]
	return request
####################### CLI
def parseCommandLine():
	"""parses sys.argv for the datalink runner and returns the namespace.

	The single positional argument, jobId, is the UWS id of the job to run.
	"""
	from argparse import ArgumentParser
	cliParser = ArgumentParser(
		description="Run an asynchronous datalink job (used internally)")
	cliParser.add_argument("jobId",
		help="UWS id of the job to run",
		type=str)
	return cliParser.parse_args()
def _writeDatalinkResult(job, data):
	"""writes data, the result of a datalink service run, as the "result"
	result of job.

	Supported are (mime, payload) tuples, products.ProductBase instances,
	and objects with a render method (treated as twisted.web resources).
	Anything else raises a NotImplementedError.
	"""
	# Unfortunately, datalink cores can in principle return all kinds
	# of messy things that may not even be representable in plain files
	# (e.g., t.w resources returning redirects).  We hence only
	# handle (mime, payload), (certain) Product instances, and renderable
	# resources here and error out otherwise.
	if isinstance(data, tuple):
		mime, payload = data
		with job.openResult(mime, "result") as destF:
			destF.write(payload)

	elif isinstance(data, products.ProductBase):
		# We could run render and grab the content-type from there
		# (which probably would be better all around).  For now, don't
		# care:
		with job.openResult("application/octet-stream", "result") as destF:
			for chunk in data.iterData():
				destF.write(chunk)

	elif hasattr(data, "render"):
		# these are t.w. resources.  Let's run a reactor so these properly
		# work.  The media type is only known after rendering, so open the
		# result with a generic placeholder type and fix it afterwards.
		with job.openResult("application/octet-stream", "result") as destF:
			req = writeResultTo(data, destF)
		contentTypes = req.responseHeaders.getRawHeaders("content-type")
		# getRawHeaders returns None when the resource set no content type;
		# keep the placeholder in that case.
		if contentTypes:
			job.fixTypeForResultName("result", contentTypes[0])

	else:
		raise NotImplementedError("Cannot handle a service %s result yet."%
			repr(data))


def main():
	"""runs the datalink job whose UWS id is given on the command line.

	The job is put into EXECUTING, its service is run with the stored
	dlargs through the dlget renderer, the result is stored as the job's
	"result", and the job is put into COMPLETED.  On unexpected errors,
	the job is (if possible) put into ERROR and the exception is re-raised.
	"""
	jobId = parseCommandLine().jobId
	try:
		job = DL_WORKER.getJob(jobId)
		with job.getWritable() as wjob:
			wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())

		dlArgs = job.parameters["dlargs"]
		service = base.resolveCrossId(job.parameters["serviceid"])
		data = service.run("dlget", dlArgs, svcs.emptyQueryMeta)
		_writeDatalinkResult(job, data)

		with job.getWritable() as wjob:
			wjob.change(phase=uws.COMPLETED)

	except SystemExit:
		pass
	except uws.JobNotFound:
		base.ui.notifyInfo("Giving up non-existing datalink job %s."%jobId)
	except Exception as ex:
		base.ui.notifyError("Datalink runner %s major failure"%jobId)
		# try to push job into the error state -- this may well fail given
		# that we're quite hosed, but it's worth the try.  Don't let such a
		# secondary failure mask the original exception, though.
		try:
			DL_WORKER.changeToPhase(jobId, uws.ERROR, ex)
		except Exception:
			base.ui.notifyError(
				"Could not put datalink job %s into the error phase"%jobId)
		# re-raises ex (the exception handled by this except clause)
		raise
if __name__=="__main__": # pragma: no cover
	# silly test code, not normally reached: renders a resource that
	# completes through a Deferred via writeResultTo and prints what
	# ended up in the output file.
	from twisted.web import resource
	import os

	class _Foo(resource.Resource):
		def __init__(self, stuff):
			resource.Resource.__init__(self)
			self.stuff = stuff

		def render(self, request):
			if self.stuff=="booga":
				return b"abc"
			# twisted.web render methods must return bytes or NOT_DONE_YET
			# (not a Deferred), so render the inner resource through a
			# Deferred and write/finish the request from its callbacks.
			d = defer.maybeDeferred(_Foo("booga").render, request)
			d.addBoth(self.cleanup)
			d.addCallback(request.write)
			d.addErrback(lambda failure: failure.printTraceback())
			d.addBoth(lambda ignored: request.finish())
			return server.NOT_DONE_YET

		def cleanup(self, res):
			print("cleaning up")
			return res

	# render results are bytes, hence binary files
	with open("bla", "wb") as f:
		writeResultTo(_Foo("ork"), f)
	with open("bla", "rb") as f:
		print(f.read().decode("utf-8"))
	os.unlink("bla")