"""
A special renderer for testish things requiring the full server to be up
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from twisted.internet import reactor
from twisted.web import resource
from twisted.web import template
from twisted.web.template import tags as T
from gavo import base
from gavo.formal import nevowc
from gavo.svcs import streaming
from gavo.web import common
class FooPage(resource.Resource):
    """The simplest page of the test renderer: a constant HTML document.

    The document consists of a title and a single paragraph; nothing in it
    depends on the request.

    NOTE(review): this class derives from resource.Resource but only defines
    a ``loader``; the Tests page in this module uses nevowc.TemplatedPage for
    its loader-based template.  Confirm that this base class actually renders
    the template.
    """

    # T.head receives a one-element tuple, exactly as the trailing comma in
    # the subscription would produce.
    loader = common.doctypedStan(
        T.html[
            T.head[(T.title["A page"],)],
            T.body[T.p["If you see this, you had better know why."]]
        ]
    )
class StreamerPage(resource.Resource):
    """A page delivering a senseless but possibly huge stream of data
    through streaming.py.

    Request parameters (the first value found in ``request.strargs`` is used):

    * ``size`` -- total number of bytes to produce (default 300).
    * ``chunksize`` -- number of bytes per write of the bulk data
      (default 1000).  Values below 1 are treated as 1.
    """

    def render(self, request):
        """Hand a producer of ``size`` filler bytes to streaming.streamOut.

        Returns whatever ``streaming.streamOut`` returns.

        Raises ValueError if ``size`` or ``chunksize`` cannot be parsed
        as integers.
        """
        dataSize = int(request.strargs.get("size", [300])[0])
        # A chunk size of 0 would make the // and % below raise
        # ZeroDivisionError in the middle of the producer, so clamp it.
        chunkSize = max(1, int(request.strargs.get("chunksize", [1000])[0]))

        def writeNoise(f):
            # Bulk part: as many full chunks of b"x" as fit into dataSize.
            fullChunk = b"x" * chunkSize
            for _ in range(dataSize // chunkSize):
                f.write(fullChunk)

            # Remainder: fill up with a repeating digit pattern so the tail
            # is distinguishable from the bulk data.
            lastPayload = b"1234567890"
            toSend = dataSize % chunkSize
            f.write(
                lastPayload * (toSend // 10) + lastPayload[:toSend % 10])

        return streaming.streamOut(writeNoise, request)
class Delay(resource.Resource):
    """A page that answers with a short plain-text body after a pause.

    The length of the pause in seconds is taken from the first value of the
    ``seconds`` request parameter (default 10).
    """

    def render(self, request):
        """Schedule the response body with the reactor.

        Returns the result of ``task.deferLater``, which will produce
        ``b"Hello world\\n"`` once the delay has elapsed.
        """
        from twisted.internet import task

        def produceBody():
            return b"Hello world\n"

        waitSeconds = int(request.strargs.get("seconds", [10])[0])
        request.setHeader("content-type", "text/plain")
        return task.deferLater(reactor, waitSeconds, produceBody)
class Block(resource.Resource):
    """A page that blocks the entire server, including signals, for a while.
    """

    def render(self, request):
        """Run an external ``sleep 20`` synchronously, then answer.

        The call does not return before the external command has finished;
        that is the purpose of this page.
        """
        import os

        blockingCommand = "sleep 20"

        request.setHeader("content-type", "text/plain")
        # Deliberately synchronous: nothing else in this process runs
        # until the command is done.
        os.system(blockingCommand)
        return b"Living again\n"
class StreamerCrashPage(resource.Resource):
    """A page that starts streaming out data and then fails on purpose.
    """

    def render(self, request):
        """Hand a producer that writes one line and then raises to
        streaming.streamOut.

        Returns whatever ``streaming.streamOut`` returns.
        """
        request.setHeader("content-type", "text/plain")

        def writeThenCrash(destF):
            # Shrink the buffer's chunk size below the length of the line
            # written next; the line's own text says this is meant to
            # cause a flush.
            destF.buffer.chunkSize = 30
            # NOTE(review): StreamerPage in this module writes bytes to the
            # stream file, whereas this writes str -- confirm which one
            # streaming.streamOut expects.
            destF.write(
                "Here is some data. (and some more, just to cause a flush)\n")
            # The intended failure after some output has been produced.
            raise Exception()

        return streaming.streamOut(writeThenCrash, request)
class ExitPage(resource.Resource):
    """A page that makes the server exit (useful for profiling).
    """

    def render(self, request):
        """Schedule a reactor stop one second from now and say so.

        The stop is delayed so the response body returned here is handed
        back before the reactor is stopped.
        """
        def stopReactor():
            reactor.stop()

        request.setHeader("content-type", "text/plain")
        reactor.callLater(1, stopReactor)
        return b"The server is now exiting in 1 second."
class RenderCrashPage(resource.Resource):
    """A page that crashes during render.

    The template's only paragraph is bound to the ``crash`` renderer, which
    always raises.

    NOTE(review): this class derives from resource.Resource but relies on a
    ``loader`` and a template renderer; the Tests page in this module uses
    nevowc.TemplatedPage for its loader-based template.  Confirm that this
    base class actually runs the renderer.
    """

    @template.renderer
    def crash(self, ctx, data):
        """Raise ``Exception("Wanton crash")``.

        The traceback is printed through ``traceback.print_exc`` before the
        exception is re-raised, so the crash is visible on the server side
        regardless of what the caller does with the exception.
        """
        import traceback

        try:
            raise Exception("Wanton crash")
        except Exception:
            # Only the exception raised above is expected here; a bare
            # except would additionally intercept KeyboardInterrupt and
            # SystemExit.
            traceback.print_exc()
            raise

    # T.head receives a one-element tuple, exactly as the trailing comma in
    # the subscription would produce.
    loader = common.doctypedStan(
        T.html[
            T.head[(T.title["A page"],)],
            T.body[T.p(render="crash")["You should not see this"]]
        ]
    )
class BadGatewayPage(resource.Resource):
    """A page that returns a 502 error.
    """

    def render(self, request):
        """Set the response status to 502 and return a plain-text body.

        Returns ``b"Bad Gateway"``.
        """
        # Twisted's Request.setResponseCode requires the status message to
        # be bytes and raises a TypeError for str (this assumes request is
        # a Twisted web request or a subclass of it).
        request.setResponseCode(502, message=b"Bad Gateway")
        request.setHeader("content-type", "text/plain")
        return b"Bad Gateway"
class ServiceUnloadPage(resource.Resource):
    """A page that clears the services RD from the caches.
    """

    def render(self, request):
        """Call ``base.caches.clearForName`` for ``__system__/services``
        and confirm in plain text.
        """
        servicesRDId = "__system__/services"

        request.setHeader("content-type", "text/plain")
        base.caches.clearForName(servicesRDId)
        return b"Cleared the services RD"
class _UnrenderableException(Exception):
    """An exception that cannot be turned into a string.

    Calling ``str()`` on an instance raises a NameError; this is what
    makes the exception "unrenderable".
    """

    def __str__(self):
        # Looking up this undefined name is the whole point: it makes
        # string conversion fail.
        ddt #noflake: intended f'up.
class BlowUpErrorPage(resource.Resource):
    """A page that produces an unrenderable error when rendering.
    """

    def render_GET(self, request):
        """Always raise an _UnrenderableException; nothing is returned."""
        unrenderable = _UnrenderableException()
        raise unrenderable
class Tests(nevowc.TemplatedPage):
    """The parent page of all test pages defined in this module.

    On construction, one instance of each test page is registered as a
    child under a fixed path segment.  The page itself only renders a
    constant "nothing here" document.
    """

    def __init__(self):
        resource.Resource.__init__(self)

        # (path segment, page class) in registration order; each page is
        # instantiated right before it is registered.
        childPages = (
            (b"foo", FooPage),
            (b"stream", StreamerPage),
            (b"streamcrash", StreamerCrashPage),
            (b"rendercrash", RenderCrashPage),
            (b"badgateway", BadGatewayPage),
            (b"block", Block),
            (b"exit", ExitPage),
            (b"clearservice", ServiceUnloadPage),
            (b"delay", Delay),
            (b"blowuperror", BlowUpErrorPage),
        )
        for segment, pageClass in childPages:
            self.putChild(segment, pageClass())

    # T.head receives a one-element tuple, exactly as the trailing comma in
    # the subscription would produce.
    loader = common.doctypedStan(
        T.html[
            T.head[(T.title["Wrong way"],)],
            T.body[T.p["There is nothing here. Trust me."]]
        ]
    )