"""
Code to add some basic nevow functionality back to twisted.
This needs to be imported as early as possible after importing twisted.web
for the first time.
The features were adding include:
* have nevow tag[] syntax again.
* swallow attributes with None values in tags
* (Ab)use slotData to manage what was nevow:data, pass it into the
render method as tag.slotData
* functions are allowed as render values
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from twisted.web import _flatten
from twisted.web import _stan
from twisted.web import http
try:
# this is where we want to monkeypatch twisted.template
# in twisted 21+
from twisted.web import _template_util as _template_patchtarget
except ImportError:
# this is where to monkeypatch in twisted 20
from twisted.web import template as _template_patchtarget
_ToStan = _template_patchtarget._ToStan
_NSContext = _template_patchtarget._NSContext
[docs]def patchTwTemplate(name, obj):
"""monkeypatches whatever module holds the twisted.web.template
definitions.
"""
setattr(_template_patchtarget, name, obj)
class _NoData(object):
"""A sentinel to signifiy no data was passed to clone.
"""
def __bool__(self):
return False
_StanTag = _stan.Tag
[docs]class Tag(_StanTag):
def __init__(self, tagName, attributes=None, children=None, render=None,
filename=None, lineNumber=None, columnNumber=None, data=None):
children = children or []
attributes = attributes or {}
_StanTag.__init__(self,
tagName=tagName,
attributes=attributes,
children=children,
filename=filename or "unknown",
lineNumber=lineNumber or -1,
columnNumber=columnNumber or -1)
if render is not None:
self.render = render
self.slotData = data
def __getitem__(self, stuff):
if isinstance(stuff, (list, tuple)):
return self(*stuff)
else:
return self(stuff)
def __call__(self, *items, **attrs):
if "render" in attrs:
self.render = attrs.pop("render")
for key, value in list(attrs.items()):
if value is None:
attrs.pop(key)
return _StanTag.__call__(self, *items, **attrs)
def _cloneSlotData(self, deep, newData):
"""returns a clone of this tag's slot data.
This is always a copy if slotData is a dict; otherwise, for
non-deep copying we hope people don't change data and we
save copying. For deep copying, we'll have to copy
sequences, too, because the may contain tags like slot and
such.
"""
if not self.slotData:
return self.slotData
if isinstance(self.slotData, dict):
# t.w code
newslotdata = self.slotData.copy()
for key in newslotdata:
newslotdata[key] = self._clone(newslotdata[key], True, newData)
return newslotdata
if deep:
if isinstance(self.slotData, (list, tuple)):
return self._clone(self.slotData, True, newData)
else:
# we *probably* should still copy this, but that may
# really be expensive, and we should outlaw modifying
# data in render and data methods anyway.
return self.slotData
else:
return self.slotData
def _clone(self, obj, deep, newData):
"""
Clone an arbitrary object; used by L{Tag.clone}.
@param obj: an object with a clone method, a list or tuple, or something
which should be immutable.
@param deep: whether to continue cloning child objects; i.e. the
contents of lists, the sub-tags within a tag.
@return: a clone of C{obj}.
"""
# we override this to set the slotData on tags while we're cloning
if hasattr(obj, 'clone'):
return obj.clone(deep, newData)
elif isinstance(obj, (list, tuple)):
return [self._clone(x, deep, newData) for x in obj]
else:
return obj
[docs] def clone(self, deep=True, newData=_NoData):
"""
Return a clone of this tag. If deep is True, clone all of this tag's
children. Otherwise, just shallow copy the children list without copying
the children themselves.
"""
# We have to override clone because we need to manage slotData
# somewhat more carefully now. This is clone as of 18.9.0,
# with newslotdata code taken out and replaced with a call
# to _cloneSlotData; also, we're passing newData to _clone.
if newData is _NoData:
newData = self._cloneSlotData(deep, newData)
if deep:
newchildren = [self._clone(x, True, newData)
for x in self.children]
else:
newchildren = self.children[:]
newattrs = self.attributes.copy()
for key in list(newattrs.keys()):
newattrs[key] = self._clone(newattrs[key], True, newData)
newtag = Tag(
self.tagName,
attributes=newattrs,
children=newchildren,
render=self.render,
filename=self.filename,
lineNumber=self.lineNumber,
columnNumber=self.columnNumber)
newtag.slotData = newData
return newtag
# we have to monkeypatch a couple of places in twisted.web
_stan.Tag = Tag
patchTwTemplate("Tag", Tag)
_flatten.Tag = Tag
[docs]class Raw(object):
"""a thing to stick into stan trees that can write whatever
it wants into the serialised result.
Just override getContent and return a byte string from it.
"""
[docs] def getContent(self, destFile):
raise NotImplementedError("You have to override getContent.")
# I extremely regret I have to overwrite the complex and fairly
# crazy _flattenElement from t.w._flatten, but managing
# the data attribute in any other way is just mad.
#
# This is form twisted 18.9.0, and we'll have to update it now and then.
#
# Changes are:
# * we evaluate newov:data attributes on tags (exactly like this;
# namespace processing happens in nevow's _ToStan
# * we support Raw instances that can write whatever the
# heck they like.
# * we render ints in the tree
def _flattenElement(request, root, write, slotData, renderFactory,
dataEscaper):
"""
Make C{root} slightly more flat by yielding all its immediate contents as
strings, deferreds or generators that are recursive calls to itself.
@param request: A request object which will be passed to
L{IRenderable.render}.
@param root: An object to be made flatter. This may be of type C{unicode},
L{str}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple}, L{list},
L{types.GeneratorType}, L{Deferred}, or an object that implements
L{IRenderable}.
@param write: A callable which will be invoked with each L{bytes} produced
by flattening C{root}.
@param slotData: A L{list} of L{dict} mapping L{str} slot names to data
with which those slots will be replaced.
@param renderFactory: If not L{None}, an object that provides
L{IRenderable}.
@param dataEscaper: A 1-argument callable which takes L{bytes} or
L{unicode} and returns L{bytes}, quoted as appropriate for the
rendering context. This is really only one of two values:
L{attributeEscapingDoneOutside} or L{escapeForContent}, depending on
whether the rendering context is within an attribute or not. See the
explanation in L{writeWithAttributeEscaping}.
@return: An iterator that eventually yields L{bytes} that should be written
to the output. However it may also yield other iterators or
L{Deferred}s; if it yields another iterator, the caller will iterate
it; if it yields a L{Deferred}, the result of that L{Deferred} will
either be L{bytes}, in which case it's written, or another generator,
in which case it is iterated. See L{_flattenTree} for the trampoline
that consumes said values.
@rtype: An iterator which yields L{bytes}, L{Deferred}, and more iterators
of the same type.
"""
def keepGoing(newRoot, dataEscaper=dataEscaper,
renderFactory=renderFactory, write=write):
return _flattenElement(request, newRoot, write, slotData,
renderFactory, dataEscaper)
if isinstance(root, (bytes, str)):
write(dataEscaper(root))
elif isinstance(root, _flatten.slot):
slotValue = _flatten._getSlotValue(root.name, slotData, root.default)
yield keepGoing(slotValue)
elif isinstance(root, _flatten.CDATA):
write(b'<![CDATA[')
write(_flatten.escapedCDATA(root.data))
write(b']]>')
elif isinstance(root, _flatten.Comment):
write(b'<!--')
write(_flatten.escapedComment(root.data))
write(b'-->')
elif isinstance(root, Tag):
if "nevow:data" in root.attributes:
root = _loadNevowData(request, root, renderFactory)
slotData.append(root.slotData)
if root.render is not None:
rendererName = root.render
rootClone = root.clone(False)
rootClone.render = None
renderMethod = renderFactory.lookupRenderMethod(rendererName)
result = renderMethod(request, rootClone)
yield keepGoing(result)
slotData.pop()
return
if not root.tagName:
yield keepGoing(root.children)
return
write(b'<')
if isinstance(root.tagName, str):
tagName = root.tagName.encode('ascii')
else:
tagName = root.tagName
write(tagName)
for k, v in root.attributes.items():
if isinstance(k, str):
k = k.encode('ascii')
write(b' ' + k + b'="')
# Serialize the contents of the attribute, wrapping the results of
# that serialization so that _everything_ is quoted.
yield keepGoing(
v,
_flatten.attributeEscapingDoneOutside,
write=_flatten.writeWithAttributeEscaping(write))
write(b'"')
if root.children or _flatten.nativeString(tagName) not in _flatten.voidElements:
write(b'>')
# Regardless of whether we're in an attribute or not, switch back
# to the escapeForContent dataEscaper. The contents of a tag must
# be quoted no matter what; in the top-level document, just so
# they're valid, and if they're within an attribute, they have to
# be quoted so that after applying the *un*-quoting required to re-
# parse the tag within the attribute, all the quoting is still
# correct.
yield keepGoing(root.children, _flatten.escapeForContent)
write(b'</' + tagName + b'>')
else:
write(b' />')
elif isinstance(root, (tuple, list, _flatten.GeneratorType)):
for element in root:
yield keepGoing(element)
elif isinstance(root, _flatten.CharRef):
escaped = '&#%d;' % (root.ordinal,)
write(escaped.encode('ascii'))
elif isinstance(root, _flatten.Deferred):
yield root.addCallback(lambda result: (result, keepGoing(result)))
# For now, no coroutines in DaCHS, and stretch twisted doesn't have iscoroutine
# yet.
# elif _flatten.iscoroutine(root):
# d = _flatten.ensureDeferred(root)
# yield d.addCallback(lambda result: (result, keepGoing(result)))
elif isinstance(root, int):
write(str(root).encode("ascii"))
elif _flatten.IRenderable.providedBy(root):
result = root.render(request)
yield keepGoing(result, renderFactory=root)
elif isinstance(root, Raw):
stuff = root.getContent()
assert(isinstance(stuff, bytes))
write(stuff)
elif root is None:
write(b"<<NULL>>")
else:
raise _flatten.UnsupportedType(root)
_flatten._flattenElement = _flattenElement
def _loadNevowData(request, tag, renderFactory):
"""updates slotData on tag from its nevow:data attribute.
This will shallow-copy tag if it changes slotData.
"""
if "nevow:data" not in tag.attributes:
return tag
dataSpec = tag.attributes["nevow:data"]
if callable(dataSpec):
dataFct = dataSpec
else:
dataFct = renderFactory.lookupDataMethod(dataSpec)
newData = dataFct(request, tag)
res = tag.clone(True, newData)
del res.attributes["nevow:data"]
return res
# Until twisted.web has some ok way to deal with file uploads (i.e.
# multipart request bodies) again, fake it using python3-multipart.
# When constructing your site, you need to
# site.requestFactory = nevowc.twistedpatch.MPRequest
# the following request class if you want file uploads to work.
from urllib import parse as urlparse
from multipart import multipart
from twisted.web import server
from twisted.web.http import parse_qs
[docs]class MPRequest(server.Request):
"""A request class that can sensibly deal with multipart request
payloads.
Basically, file uploads turn up in the .files attribute.
This has a new class attribute, maxUploadSize, the number of
bytes before we return a 413, defaulting to 64 MB.
"""
maxUploadSize = 64*1024*1024
def __init__(self, *args, **kwargs):
self.files = {}
super().__init__(*args, **kwargs)
[docs] def requestReceived(self, command, path, version):
# overridden because I want to use multipart to parse
# POST payloads.
#
# This is essentially requestReceived as of twisted 22.1 with
# the entire POST processing replaced.
#
# TODO: parsing this is still sync, which for large request bodies
# can take quite a while, during which we are otherwise inaccessible.
# I'd have to research hard how I can yield to the reactor from
# in here...
self.content.seek(0, 0)
self.args = {}
self.method, self.uri = command, path
self.clientproto = version
x = self.uri.split(b"?", 1)
if len(x) == 1:
self.path = self.uri
else:
self.path, argstring = x
self.args = parse_qs(argstring, 1)
# Argument processing
ctype = self.requestHeaders.getRawHeaders(b"content-type")
if ctype is not None:
ctype = ctype[0]
if self.method == b"POST" and ctype:
chunk_size = 1048576
strippedCtype, ctypeParams = multipart.parse_options_header(ctype)
if strippedCtype.lower()==b'application/x-www-form-urlencoded':
unquote = urlparse.unquote_plus
else:
unquote = urlparse.unquote
def addArg(field):
# The encoding situation is ugly on the incoming side.
# It's likely utf-8, but I don't want to fail hard
# if it's not. And I re-encode for compatibility
# with twisted.web's args.
if field.value is None:
# ignore empty parameters coming in (it's what cgi did, too)
return
try:
value = field.value.decode("utf-8")
except UnicodeDecodeError:
value = field.value.decode("iso-8859-1")
self.args.setdefault(field.field_name, []).append(
unquote(value).encode("utf-8"))
def addFile(file_obj):
# some versions of multipart pass in the files already
# consumed. Until this is fixed, rewind them here.
file_obj.file_object.seek(0)
self.files.setdefault(file_obj.field_name.decode("latin-1"), []
).append(file_obj)
# we could enfoce maxUploadSize using MAX_BODY_SIZE in
# multipart; however, that just skips the extra bytes.
# I'd like to return a 413, and hence I count myself.
total_read = 0
try:
parser = multipart.FormParser(
strippedCtype.decode("latin-1"), addArg, addFile,
boundary=ctypeParams.get(b"boundary"))
while True:
stuff = self.content.read(chunk_size)
if stuff==b"":
break
total_read += len(stuff)
if total_read>self.maxUploadSize:
body = b"Your upload is too large"
self.setResponseCode(http.REQUEST_ENTITY_TOO_LARGE)
self.setHeader(b"content-type", b"text/plain")
self.setHeader(b"content-length", b"%d" % (len(body),))
self.write(body)
self.finish()
return
parser.write(stuff)
parser.finalize()
except Exception:
# It was a bad request, or we got a signal.
self.channel._respondToBadRequestAndDisconnect()
raise
self.content.seek(0, 0)
self.process()
# vim: sw=4:et:sta