# Source code for gavo.svcs.common
"""
Common functions and classes for services and cores.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import re
import os
import pkg_resources
from gavo import base
from gavo import utils
from gavo.formal import nevowc
class Error(base.ExecutiveAction):
    """the common base class of the exceptions defined in this module.

    Instances are constructed with a message and, optionally, a resource
    descriptor (rd), a hint, and an htmlMessage.  msg, rd, and hint are
    always available as attributes; htmlMessage only becomes an instance
    attribute if it is actually passed in.
    """
    def __init__(self, msg, rd=None, hint=None, htmlMessage=None):
        self.msg, self.rd, self.hint = msg, rd, hint
        # do not create the attribute at all if no HTML message was given
        if htmlMessage is not None:
            self.htmlMessage = htmlMessage
        # the parent constructor deliberately runs after the attributes
        # have been set
        base.ExecutiveAction.__init__(self, msg)
class BadMethod(Error):
    """raised to generate an HTTP 405 response.

    The string form interpolates the exception's msg attribute as the
    name of the HTTP method that was refused.
    """
    def __str__(self):
        template = "This resource cannot respond to the HTTP '%s' method"
        return template%self.msg
class UnknownURI(Error, base.NotFoundError):
    """raised to generate an HTTP 404 response.

    This is both a service Error and a base.NotFoundError.
    """
    def __str__(self):
        # With two exception base classes, explicitly use the string form
        # reachable through Error (presumably so NotFoundError's own
        # formatting does not take over -- confirm in base).
        return Error.__str__(self)
class RequestEntityTooLarge(Error):
    """raised to generate an HTTP 413 response.

    All arguments are passed through to Error's constructor
    (msg, rd, hint, htmlMessage).  Unless the caller provides a hint --
    by keyword or as the third positional argument --, a generic hint on
    how to work around upload limits is filled in.
    """
    _defaultHint = (
        "If you receive this error using sync, try again"
        " with async (the limits are generally more generous there).  If you"
        " receive this message with async, see if the upload can be"
        " reduced by transmitting only columns absolutely necessary.")

    def __init__(self, *args, **kwargs):
        # hint is the third positional parameter of Error.__init__.  Only
        # inject the default if it was not passed positionally; adding the
        # keyword next to a positional hint would make Error.__init__ fail
        # with a "got multiple values for argument 'hint'" TypeError.
        if len(args) < 3:
            kwargs.setdefault("hint", self._defaultHint)
        Error.__init__(self, *args, **kwargs)
class Authenticate(Error):
    """raised to initiate an authentication request.

    Authenticates are optionally constructed with the realm the user
    shall authenticate in.  If you leave the realm out (or pass None),
    the DC-wide default from the [web]realm config item will be used.

    The realm used ends up in the realm attribute.
    """
    def __init__(self, realm=None, hint=None):
        # Look up the default when the exception is built rather than in
        # the signature: a default in the signature is evaluated only once,
        # when the class body is executed, and would hence ignore any later
        # change to the configuration.
        if realm is None:
            realm = base.getConfig("web", "realm")
        self.realm = realm
        Error.__init__(self,
            "This is a request to authenticate against %s"%realm,
            hint=hint)
class RedirectBase(Error):
    """the common base class of the redirecting exceptions.

    These are constructed with a destination, which may be bytes
    (decoded as utf-8) or anything str() can work on.

    The destination as passed in (after decoding) is kept in rawDest.
    The dest attribute holds the URL to redirect to: destinations not
    starting with "http" are considered relative and get the configured
    [web]serverURL and the site path prepended.
    """
    def __init__(self, dest, hint=None):
        if isinstance(dest, bytes):
            dest = dest.decode("utf-8")
        self.rawDest = dest

        target = str(dest)
        isAbsolute = target.startswith("http")
        if not isAbsolute:
            target = base.getConfig("web", "serverURL")+base.makeSitePath(target)
        self.dest = target

        Error.__init__(self,
            "This is supposed to redirect to %s"%target,
            hint=hint)
class WebRedirect(RedirectBase):
    """raised to redirect a user agent to a different resource (HTTP 301).

    WebRedirects are constructed with the destination URL, which can be
    relative (to webRoot) or absolute (starting with http).
    """
class SeeOther(RedirectBase):
    """raised to redirect a user agent to a different resource (HTTP 303).

    SeeOthers are constructed with the destination URL, which can be
    relative (to webRoot) or absolute (starting with http).

    They work like WebRedirect, except that they put out a 303
    instead of a 301.
    """
class Found(RedirectBase):
    """raised to redirect a user agent to a different resource (HTTP 302).

    Found instances are constructed with the destination URL, which can be
    relative (to webRoot) or absolute (starting with http).

    They work like WebRedirect, except that they put out a 302
    instead of a 301.
    """
def parseServicePath(serviceParts):
    """returns a tuple of resource descriptor path and service name.

    serviceParts is a sequence of path segments.  The last segment is
    taken to be the name of a service, everything in front of it is
    joined with slashes into the inputsDir-relative path of the resource
    descriptor.

    No validation takes place: neither the existence of the resource nor
    of the service is checked.  An empty serviceParts results in an
    IndexError.
    """
    serviceName = serviceParts[-1]
    rdPath = "/".join(serviceParts[:-1])
    return rdPath, serviceName
class QueryMeta(dict):
    """A class keeping information on the query environment.

    It is constructed with a plain dictionary (alternative constructors
    for t.w requests are below) mapping certain keys (you'll currently
    have to figure out which from the source) to values, mostly strings,
    except for the keys listed in listKeys, which should be sequences
    of strings.

    If you pass an empty dict, some sane defaults will be used.  You
    can get that "empty" query meta as common.emptyQueryMeta, but make
    sure you don't mutate it.

    QueryMetas constructed from request will have the user and password
    items filled out.

    If you're using formal, you should set the formal_data item
    to the dictionary created by formal.  This will let people use
    the parsed parameters in templates.

    Note: You cannot trust qm["user"] -- it is not validated against
    any credentials.
    """

    # a set of keys that have sequences as values (needed for construction
    # from t.w request.strargs)
    listKeys = {"_ADDITEM", "_DBOPTIONS_ORDER", "_SET"}

    def __init__(self, initArgs=None, defaultLimit=None):
        """sets up the defaults and interprets initArgs.

        initArgs is a dictionary of (request) parameters; defaultLimit,
        if given, overrides the [db]defaultLimit config item as the limit
        used when there is no valid MAXREC.
        """
        if initArgs is None:
            initArgs = {}
        self.ctxArgs = utils.CaseSemisensitiveDict(initArgs)

        if defaultLimit is None:
            self.defaultLimit = base.getConfig("db", "defaultLimit")
        else:
            self.defaultLimit = defaultLimit

        self["formal_data"] = {}
        self["user"] = self["password"] = None
        self["accept"] = {}
        self["inputTable"] = None
        self._fillOutput(self.ctxArgs)
        self._fillDbOptions(self.ctxArgs)
        self._fillSet(self.ctxArgs)

    @classmethod
    def fromRequestArgs(cls, inArgs, **kwargs):
        """constructs a QueryMeta from a gavo.web.common.Request.strargs

        Values for keys in listKeys are kept as lists; of all other
        values, only the first item is used, and keys with empty value
        lists are dropped.  kwargs are passed on to the constructor.
        """
        args = {}
        for key, value in inArgs.items():
            # defense against broken legacy code: listify if necessary
            if not isinstance(value, list):
                value = [value]

            if key in cls.listKeys:
                args[key] = value
            elif value:
                args[key] = value[0]
        return cls(args, **kwargs)

    @classmethod
    def fromRequest(cls, request, **kwargs):
        """constructs a QueryMeta from a gavo.web.common.Request.

        In addition to getting information from the arguments, this
        also sets user and password, and it parses the accept header into
        the accept item.
        """
        res = cls.fromRequestArgs(request.strargs, **kwargs)
        res["accept"] = utils.parseAccept(request.getHeader("accept"))
        res["user"] = request.getUser() or None
        res["password"] = utils.debytify(request.getPassword(), "utf-8") or None
        return res

    def _fillOutput(self, args):
        """interprets RESPONSEFORMAT and _FORMAT, as well as _VERB/VERB

        We will have no format key if neither is given, with RESPONSEFORMAT
        taking precedence.  Services must then inspect the current renderer's
        defaultOutputFormat attribute.

        verbosity will default to 20.  _VERB gives verlevels directly,
        VERB is SCS-compliant.

        This also reads the legacy _TDENC and _VOTABLE_VERSION keys
        that the ancient HTML format selector still produces.  Let's move that
        to using RESPONSEFORMAT one of these days.
        """
        if "RESPONSEFORMAT" in args:
            self["format"] = args["RESPONSEFORMAT"]
        elif "_FORMAT" in args:
            self["format"] = args["_FORMAT"]

        try:
            # prefer fine-grained "verbosity" over _VERB or VERB
            # Hack: malformed values result in the special verbosity "HTML"
            # (presumably meaning "use the fields of the HTML output" --
            # confirm with the consumers).  Absent _VERB or VERB, on the
            # other hand, means VERB=2, i.e., a sane default
            if "verbosity" in args:
                self["verbosity"] = int(args["verbosity"])
            elif "_VERB" in args:  # internal verb parameter
                self["verbosity"] = int(args["_VERB"])*10
            elif "VERB" in args:  # verb parameter for SCS and such
                self["verbosity"] = int(args["VERB"])*10
            else:
                self["verbosity"] = 20
        except ValueError:
            self["verbosity"] = "HTML"  # VERB given, but not an int.

        self["tdEnc"] = base.getConfig("ivoa", "votDefaultEncoding")=="td"
        if "_TDENC" in args:
            try:
                self["tdEnc"] = base.parseBooleanLiteral(args["_TDENC"])
            except ValueError:
                pass

        try:
            self["VOTableVersion"] = tuple(int(v) for v in
                args["_VOTABLE_VERSION"].split("."))
        except (KeyError, AttributeError, TypeError, ValueError):
            # Missing or malformed version specs are simply ignored; the
            # VOTableVersion key then remains unset.  Only the failures
            # parsing can produce are caught, so things like
            # KeyboardInterrupt are not swallowed.
            pass

        self["additionalFields"] = args.get("_ADDITEM", [])

    def _fillSet(self, args):
        """interprets the output of a ColumnSet widget.

        columnSet is None unless _SET is given, in which case it is the
        set of the values passed.
        """
        self["columnSet"] = None
        if "_SET" in args:
            self["columnSet"] = set(args["_SET"])

    def _fillDbOptions(self, args):
        """interprets the sorting, limit, and timeout parameters.

        This fills dbSortKeys (from _DBOPTIONS_ORDER), direction (from
        _DBOPTIONS_DIR), dbLimit (from MAXREC), and timeout (from _TIMEOUT).
        Missing or malformed limits and timeouts are replaced by the
        defaults; timeouts are never smaller than 0.001.
        """
        self["dbSortKeys"] = [s.strip()
            for s in args.get("_DBOPTIONS_ORDER", []) if s.strip()]

        # The dictionary serves as a whitelist, as the direction ends up
        # verbatim in SQL; anything but ASC and DESC raises a KeyError.
        self["direction"] = {"ASC": "ASC", "DESC": "DESC"}[
            args.get("_DBOPTIONS_DIR", "ASC")]

        try:
            self["dbLimit"] = int(args["MAXREC"])
        except (ValueError, KeyError):
            self["dbLimit"] = self.defaultLimit

        try:
            timeout = float(args["_TIMEOUT"])
            if timeout != timeout:
                # NaN is the only float unequal to itself.  max() would
                # hand it through unchanged, so treat it like any other
                # malformed timeout.
                raise ValueError("NaN is not a valid timeout")
            self["timeout"] = max(timeout, 0.001)
        except (ValueError, KeyError):
            self["timeout"] = base.getConfig("web", "sqlTimeout")

    def overrideDbOptions(self, sortKeys=None, limit=None, sortFallback=None,
            direction=None):
        """changes the sorting and limit options after construction.

        sortKeys, if given, replaces the current sort keys.  sortFallback
        is a comma-separated string of sort keys only used if there are no
        sort keys otherwise.  limit replaces dbLimit (it is coerced to int),
        direction the sort direction.

        NOTE(review): direction is not validated here but ends up verbatim
        in the SQL generated by asSQL; only pass in ASC or DESC.
        """
        if sortKeys is not None:
            self["dbSortKeys"] = sortKeys
        if not self["dbSortKeys"] and sortFallback is not None:
            self["dbSortKeys"] = sortFallback.split(",")
        if limit is not None:
            self["dbLimit"] = int(limit)
        if direction is not None:
            self["direction"] = direction

    def asSQL(self):
        """returns the dbLimit and dbSortKey values as an SQL fragment.

        What is returned is a pair of the fragment (possibly empty) and
        a dictionary of the parameters to fill it with.  The limit
        passed to the database is one larger than dbLimit.
        """
        frag, pars = [], {}
        sortKeys = self["dbSortKeys"]
        dbLimit = self["dbLimit"]
        # TODO: Sorting needs a thorough redesign, presumably giving column instance
        # as column keys.  These could carry "default up" or "default down" in
        # properties.  Meanwhile, there should be a UI to let users decide on
        # sort direction.
        # Meanwhile, let's do an emergency hack.
        if sortKeys:
            # Ok, we need to do some emergency securing here.  There should be
            # pre-validation that we're actually seeing a column key, but
            # just in case let's make sure we're seeing an SQL identifier.
            # (We can't rely on dbapi's escaping since we're not talking values here)
            cleanedKeys = [cleaned for cleaned in
                (re.sub(r'[^A-Za-z0-9"_]+', "", key) for key in sortKeys)
                if cleaned]
            # Keys consisting of nothing but forbidden characters clean to
            # empty strings; they are dropped since they would result in
            # malformed SQL ("ORDER BY  ASC", "ORDER BY a,,b").
            if cleanedKeys:
                frag.append("ORDER BY %s %s"%(
                    ",".join(cleanedKeys), self["direction"]))

        if dbLimit:
            frag.append("LIMIT %(_matchLimit)s")
            pars["_matchLimit"] = int(dbLimit)+1
        return " ".join(frag), pars
# A shared QueryMeta constructed without arguments, i.e., holding defaults
# only.  This is one module-level instance used by everyone: do not mutate it.
emptyQueryMeta = QueryMeta()
def getTemplatePath(key):
    """returns the file system path of the system template key.

    A file key below web/templates in the configured rootDir wins.
    Failing that, the package resource resources/templates/key of the
    gavo package is used.  If that does not exist either, a
    base.NotFoundError is raised.

    Also see loadSystemTemplate.
    """
    localOverride = os.path.join(
        base.getConfig("rootDir"), "web/templates", key)
    if os.path.exists(localOverride):
        return localOverride

    resPath = "resources/templates/"+key
    if not pkg_resources.resource_exists('gavo', resPath):
        raise base.NotFoundError(key, "template", "system templates")
    return pkg_resources.resource_filename('gavo', resPath)
def loadSystemTemplate(key):
    """returns a nevow template for system pages from key.

    key is resolved through getTemplatePath, i.e., it is interpreted as
    relative to gavo_root/web/templates (first) and package internal
    (last).

    If an IOError occurs while locating or loading the template, None
    is returned (this harmonizes with the fallback in CustomTemplateMixin).
    Note that for templates that do not exist at all, getTemplatePath
    raises a base.NotFoundError; that is not caught here unless it is
    an IOError (check base).

    Note that nevowc.XMLFile at this point ignores the doctype of the
    template.  For non-XHTML doctypes, set a gavo_useDoctype attribute
    on the renderer.
    """
    try:
        templatePath = getTemplatePath(key)
        if templatePath is None:
            return None
        return nevowc.XMLFile(templatePath)
    except IOError:
        return None