"""
A framework for regression tests within RDs.
The basic idea is that there are small pieces of python almost-declaratively
defining tests for a given piece of data. These things can then be
run while (or rather, after) executing dachs val.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import functools
import argparse
import base64
import collections
import io
import pickle
import http.client
import os
import queue
import random
import re
import sys
import time
import threading
import traceback
import unittest
import urllib.parse
try:
from urllib3 import filepost as req_filepost
from urllib3 import fields as req_fields
except ImportError:
	# we need urllib3 (which requests also uses) to format multipart
	# uploads, since python's email package is broken for that purpose
	# (it formats everything as text).  I don't want to hard-depend on
	# requests, though.
# TODO: skip tests that want it rather than crashing them
pass
from lxml import etree as lxtree
from gavo import base
from gavo import votable
from gavo import utils
from gavo.utils import EqualingRE #noflake: published name
from gavo.rscdef import common
from gavo.rscdef import procdef
################## Utilities
@functools.lru_cache(1)
def _loadCreds():
"""returns a dictionary of auth keys to user/password pairs from
~/.gavo/test.creds
"""
res = {}
try:
with open(os.path.join(os.environ["HOME"], ".gavo", "test.creds"),
"rb") as f:
for ln in f:
authKey, user, pw = ln.strip().split()
res[authKey.decode("utf-8")] = (user, pw)
except IOError:
pass
return res
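# The expected format of ~/.gavo/test.creds follows from the parser above:
# one whitespace-separated record per line, e.g. (illustrative values only)
#
#   tapauth testuser notASecret
#
# where the first token is the authKey referenced from httpAuthKey
# attributes, followed by the user name and the password.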
def getAuthFor(authKey):
"""returns a header dictionary to authenticate for authKey.
authKey is a key into ~/.gavo/test.creds.
"""
try:
user, pw = _loadCreds()[authKey]
except KeyError:
raise base.NotFoundError(authKey, "Authorization info",
"~/.gavo/test.creds")
return {'Authorization': b"Basic "+(
base64.b64encode(b"%s:%s"%(user, pw))).strip()}
def doHTTPRequest(scheme, method, host, path, query,
payload, headers, timeout):
"""creates the HTTP request and retrieves the result.
"""
try:
connClass = {
"http": http.client.HTTPConnection,
"https": http.client.HTTPSConnection}[scheme]
except KeyError:
raise base.ReportableError(
f"Unsupported scheme for regTest URL: {scheme}")
conn = connClass(host, timeout=timeout)
conn.connect()
try:
if query:
path = path+"?"+query
conn.request(method, path, payload, headers)
resp = conn.getresponse()
respHeaders = resp.getheaders()
content = resp.read()
finally:
conn.close()
return resp.status, respHeaders, content
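# DataURL.retrieveResource below uses a helper getHeaderValue that is not
# defined in this excerpt.  The following is a minimal sketch only; the
# case-insensitive matching and the KeyError on a missing header are
# assumptions, not necessarily what the original helper does.
def getHeaderValue(headers, key):
	"""returns the value of the first header named key (compared
	case-insensitively) in the (name, value) sequence headers.

	A KeyError is raised if no such header is present.
	"""
	for hName, value in headers:
		if hName.lower()==key.lower():
			return value
	raise KeyError(key)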
class Keywords(argparse.Action):
"""A class encapsulating test selection keywords.
	There's a match method that takes a string and returns true if either
	no keywords are defined or all keywords are present in that string
	(after case folding).
This doubles as an argparse action and as such is "self-parsing" if you
will.
"""
def __init__(self, *args, **kwargs):
argparse.Action.__init__(self, *args, **kwargs)
self.keywords = set()
def __call__(self, parser, namespace, values, option_string=None):
self.keywords = self._normalise(values)
setattr(namespace, self.dest, self)
def _normalise(self, s):
		return set(re.sub(r"[^\w\s]+", "", s).lower().split())
	def match(self, other):
if not self.keywords:
return True
return not self.keywords-self._normalise(other)
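	# For illustration (made-up values): keywords passed as -k "tap upload"
	# normalise to {"tap", "upload"} and match a test titled
	# "TAP: inline UPLOAD", but not one titled "TAP sync smoke test".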
################## RD elements
class DynamicOpenVocAttribute(base.AttributeDef):
"""an attribute that collects arbitrary attributes in a sequence
of pairs.
The finished sequence is available as a freeAttrs attribute on the
embedding instance. No parsing is done, everything is handled as
a string.
"""
typeDesc_ = "any attribute not otherwise used"
def __init__(self, name, **kwargs):
base.AttributeDef.__init__(self, name, **kwargs)
	def feedObject(self, instance, value):
if not hasattr(instance, "freeAttrs"):
instance.freeAttrs = []
instance.freeAttrs.append((self.name_, value))
[docs] def feed(self, ctx, instance, value):
self.feedObject(instance, value)
	def getCopy(self, instance, newParent):
raise NotImplementedError("This needs some thought")
	def makeUserDoc(self):
return "(ignore)"
	def iterParentMethods(self):
def getAttribute(self, name):
# we need an instance-private attribute dict here:
if self.managedAttrs is self.__class__.managedAttrs:
self.managedAttrs = self.managedAttrs.copy()
try:
return base.Structure.getAttribute(self, name)
except base.StructureError: # no "real" attribute, it's a macro def
self.managedAttrs[name] = DynamicOpenVocAttribute(name)
# that's a decoy to make Struct.validate see a value for the attribute
setattr(self, name, None)
return self.managedAttrs[name]
yield "getAttribute", getAttribute
class _FormData(object):
"""a container for multipart/form-data encoded messages.
	This is used for file uploads and depends on urllib3 (as shipped with
	requests) for that.
"""
def __init__(self):
self.fields = []
def addFile(self, paramName, fileName, data):
"""attaches the contents of fileName under the http parameter name
paramName.
"""
field = req_fields.RequestField(paramName, data, fileName)
field.make_multipart(content_type="application/octet-stream")
self.fields.append(field)
def addParam(self, paramName, paramVal):
"""adds a form parameter paramName with the (string) value paramVal
"""
field = req_fields.RequestField(paramName, paramVal)
field.make_multipart(content_type=None)
self.fields.append(field)
def encode(self):
"""returns the formatted payload for the upload as bytes, and
the content-type to use (including the boundary).
"""
return req_filepost.encode_multipart_formdata(self.fields)
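	# Usage sketch (illustrative values; retrieveResource below does
	# essentially this for POST requests with uploads):
	#   form = _FormData()
	#   form.addParam("LANG", "ADQL")
	#   form.addFile("upl", "mine.vot", b"<VOTABLE>...</VOTABLE>")
	#   payload, contentType = form.encode()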
class Upload(base.Structure):
"""An upload going with a URL.
"""
name_ = "httpUpload"
_src = common.ResdirRelativeAttribute("source",
default=base.NotGiven,
description="Path to a file containing the data to be uploaded.",
copyable=True)
_name = base.UnicodeAttribute("name",
default=base.Undefined,
description="Name of the upload parameter",
copyable=True)
_filename = base.UnicodeAttribute("fileName",
default="upload.dat",
description="Remote file name for the uploaded file.",
copyable=True)
_content = base.DataContent(description="Inline data to be uploaded"
" (conflicts with source)")
@property
def rd(self):
return self.parent.rd
	def validate(self):
if (self.content_ and self.source
or not (self.content_ or self.source)):
raise base.StructureError("Exactly one of element content and source"
" attribute must be given for an upload.")
def _iterInChunks(stuff, chunkSize):
"""returns a function returning stuff in bits of chunkSize elements (of
stuff).
"""
def iterate():
offset = 0
while True:
chunk = stuff[offset:offset+chunkSize]
if not chunk:
return
else:
yield chunk
offset += chunkSize
return iterate
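# For illustration: _iterInChunks(b"abcdef", 4)() yields b"abcd" and then
# b"ef"; retrieveResource uses this to hand http.client an iterator when
# httpChunkSize is set on the URL.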
class DataURL(base.Structure):
"""A source document for a regression test.
	As string URLs, they specify where to get data from, but they additionally
let you specify uploads, authentication, headers and http methods,
while at the same time saving you manual escaping of parameters.
	The element body is the path to run the test against. This is
interpreted as relative to the RD if there's no leading slash,
relative to the server if there's a leading slash, and absolute
if there's a scheme.
The attributes are translated to parameters, except for a few
pre-defined names. If you actually need those as URL parameters,
	shout at us and we'll provide some way of escaping these.
We don't actually parse the URLs coming in here. GET parameters
are appended with a & if there's a ? in the existing URL, with a ?
if not. Again, shout if this is too dumb for you (but urlparse
really isn't all that robust either...)
"""
name_ = "url"
# httpURL will be set to the URL actually used in retrieveResource
# Only use this to report the source of the data for, e.g., failing
# tests.
httpURL = "(not retrieved)"
_base = base.DataContent(description="Base for URL generation; embedded"
" whitespace will be removed, so you're free to break those wherever"
" you like.",
copyable=True)
_httpMethod = base.UnicodeAttribute("httpMethod",
description="Request method; usually one of GET or POST",
default="GET")
_httpPost = common.ResdirRelativeAttribute("postPayload",
default=base.NotGiven,
description="Path to a file containing material that should go"
" with a POST request (conflicts with additional parameters).",
copyable=True)
_postMediaType = base.UnicodeAttribute("httpPostMediaType",
default="application/octet-stream",
description="The media type of postPayload",
copyable=True)
_parset = base.EnumeratedUnicodeAttribute("parSet",
description="Preselect a default parameter set; form gives what"
" our framework adds to form queries.", default=base.NotGiven,
validValues=["form", "TAP"],
copyable=True)
_httpHeaders = base.DictAttribute("httpHeader",
description="Additional HTTP headers to pass.",
copyable=True)
_httpAuthKey = base.UnicodeAttribute("httpAuthKey",
description="A key into ~/.gavo/test.creds to find a user/password"
" pair for this request.",
default=base.NotGiven,
copyable=True)
_httpUploads = base.StructListAttribute("uploads",
childFactory=Upload,
description='HTTP uploads to add to request (must have httpMethod="POST")',
copyable=True)
_httpHonorRedirects = base.BooleanAttribute("httpHonorRedirects",
default=False,
description="Follow 30x redirects instead of just using"
" status, headers, and payload of the initial request.",
copyable=True)
_httpChunkSize = base.IntAttribute("httpChunkSize",
default=None,
description="If there are uploads, upload them in chunks of this"
" many bytes using chunked encoding.",
copyable=True)
_rd = common.RDAttribute()
_open = DynamicOpenVocAttribute("open")
	def getValue(self, serverURL):
"""returns a pair of full request URL and postable payload for this
test.
"""
urlBase = re.sub(r"\s+", "", self.content_)
if "://" in urlBase:
# we believe there's a scheme in there
pass
elif urlBase.startswith("/"):
urlBase = serverURL+urlBase
else:
urlBase = serverURL+"/"+self.parent.rd.sourceId+"/"+urlBase
if self.httpMethod=="POST":
return urlBase
else:
return self._addParams(urlBase, urllib.parse.urlencode(self.getParams()))
	def getParams(self):
"""returns the URL parameters as a sequence of kw, value pairs.
"""
params = getattr(self, "freeAttrs", [])
if self.parSet=="form":
params.extend([("__nevow_form__", "genForm"), ("submit", "Go"),
("_charset_", "UTF-8")])
elif self.parSet=='TAP':
params.extend([("LANG", "ADQL"), ("REQUEST", "doQuery")])
return params
	def retrieveResource(self, serverURL, timeout):
"""returns a triple of status, headers, and content for retrieving
this URL.
"""
self.httpURL, payload = self.getValue(serverURL), None
headers = {
"user-agent": "DaCHS regression tester"}
headers.update(self.httpHeader)
if self.httpMethod=="POST":
if self.postPayload:
headers["content-type"] = self.httpPostMediaType
with open(self.postPayload, "rb") as f:
payload = f.read()
elif self.uploads:
form = _FormData()
for key, value in self.getParams():
form.addParam(key, value)
for upload in self.uploads:
upload.addToForm(form)
payload, ct = form.encode()
headers["Content-Type"] = ct
if self.httpChunkSize:
payload = _iterInChunks(payload, self.httpChunkSize)()
else:
payload = urllib.parse.urlencode(self.getParams())
headers["Content-Type"] = "application/x-www-form-urlencoded"
scheme, host, path, _, query, _ = urllib.parse.urlparse(str(self.httpURL))
if self.httpAuthKey is not base.NotGiven:
headers.update(getAuthFor(self.httpAuthKey))
status, respHeaders, content = doHTTPRequest(
scheme, str(self.httpMethod),
host, path, query, payload, headers, timeout)
while self.httpHonorRedirects and status in [301, 302, 303]:
scheme, host, path, _, query, _ = urllib.parse.urlparse(
getHeaderValue(respHeaders, "location"))
status, respHeaders, content = doHTTPRequest(scheme, "GET",
host, path, query, None, {}, timeout)
return status, respHeaders, content
def _addParams(self, urlBase, params):
"""a brief hack to add query parameters to GET-style URLs.
This is a workaround for not trusting urlparse and is fairly easy to
fool.
Params must already be fully encoded.
"""
if not params:
return urlBase
if "?" in urlBase:
return urlBase+"&"+params
else:
return urlBase+"?"+params
	def validate(self):
if self.postPayload is not base.NotGiven:
if self.getParams():
raise base.StructureError("No parameters (or parSets) are"
" possible with postPayload")
if self.httpMethod!="POST":
raise base.StructureError("Only POST is allowed as httpMethod"
" together with postPayload")
if self.uploads:
if self.httpMethod!="POST":
raise base.StructureError("Only POST is allowed as httpMethod"
" together with upload")
super().validate()
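# An illustrative sketch of how a regression test might be written in an RD
# (renderer name and parameters are made up; see the `Regression Testing`_
# document for the authoritative reference):
#
#   <regSuite title="cone search">
#     <regTest title="SCS returns a sensible VOTable">
#       <url RA="10" DEC="14" SR="0.5">scs.xml</url>
#       <code>
#         self.assertHTTPStatus(200)
#         self.assertHasStrings("VOTABLE")
#       </code>
#     </regTest>
#   </regSuite>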
class RegTest(procdef.ProcApp, unittest.TestCase):
"""A regression test.
Tests are defined through url and code elements. See `Regression Testing`_
for more information.
"""
name_ = "regTest"
requiredType = "regTest"
formalArgs = "self"
data = b"<No data retrieved yet>"
requestTime = None
runCount = 1
additionalNamesForProcs = {
"EqualingRE": EqualingRE}
_title = base.NWUnicodeAttribute("title",
default=base.Undefined,
description="A short, human-readable phrase describing what this"
" test is exercising.")
_url = base.StructAttribute("url",
childFactory=DataURL,
default=base.NotGiven,
description="The source from which to fetch the test data.")
_tags = base.StringSetAttribute("tags",
description="A list of (free-form) tags for this test. Tagged tests"
" are only run when the runner is constructed with at least one"
" of the tags given. This is mainly for restricting tags to production"
" or development servers.")
_rd = common.RDAttribute()
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, "fakeForPyUnit")
procdef.ProcApp.__init__(self, *args, **kwargs)
	def fakeForPyUnit(self):
raise AssertionError("This is not a pyunit test right now")
@property
def description(self):
source = ""
if self.rd:
id = self.rd.sourceId
source = " (%s)"%id
return self.title+source
	def retrieveData(self, serverURL, timeout):
"""returns headers and content when retrieving the resource at url.
Sets the headers and data attributes of the test instance.
"""
startTime = time.time()
if self.url is base.NotGiven:
self.status, self.headers, self.data = None, None, None
else:
self.status, self.headers, self.data = self.url.retrieveResource(
serverURL, timeout=timeout)
self.requestTime = time.time()-startTime
	def getDataSource(self):
"""returns a string pointing people to where data came from.
"""
if self.url is base.NotGiven:
return "(Unconditional)"
else:
return self.url.httpURL
	def pointNextToLocation(self, addToPath=""):
"""arranges for the value of the location header to become the
base URL of the next test.
addToPath, if given, is appended to the location header.
If no location header was provided, the test fails.
All this of course only works for tests in sequential regSuites.
"""
if not hasattr(self, "followUp"):
raise AssertionError("pointNextToLocation only allowed within"
" sequential regSuites")
for key, value in self.headers:
if key.lower()=='location':
self.followUp.url.content_ = value+addToPath
break
else:
raise AssertionError("No location header in redirect")
	@utils.document
def assertHasStrings(self, *strings):
"""checks that all its arguments are found within content.
If string arguments are passed, they are utf-8 encoded before
comparison. If that's not what you want, pass bytes yourself.
"""
for phrase in strings:
assert utils.bytify(phrase) in self.data, "%s missing"%repr(phrase)
	@utils.document
def assertLacksStrings(self, *strings):
"""checks that all its arguments are *not* found within content.
"""
for phrase in strings:
assert utils.bytify(phrase) not in self.data, \
"Unexpected: '%s'"%repr(phrase)
	@utils.document
def assertHTTPStatus(self, expectedStatus):
"""checks whether the request came back with expectedStatus.
"""
assert expectedStatus==self.status, ("Bad status received, %s instead"
" of %s"%(self.status, expectedStatus))
	@utils.document
def assertValidatesXSD(self):
"""checks whether the returned data are XSD valid.
		This uses DaCHS' built-in XSD validator with the built-in schema
files; it hence will in general not retrieve schema files from
external sources.
"""
from gavo.helpers import testtricks
msgs = testtricks.getXSDErrors(self.data)
if msgs:
raise AssertionError("Response not XSD valid. Validator output"
" starts with\n%s"%(msgs[:160]))
XPATH_NAMESPACE_MAP = {
"v": "http://www.ivoa.net/xml/VOTable/v1.3",
"v2": "http://www.ivoa.net/xml/VOTable/v1.2",
"v1": "http://www.ivoa.net/xml/VOTable/v1.1",
"o": "http://www.openarchives.org/OAI/2.0/",
"h": "http://www.w3.org/1999/xhtml",
"m": "http://www.ivoa.net/xml/mivot",
}
	@utils.document
def assertXpath(self, path, assertions):
"""checks an xpath assertion.
path is an xpath (as understood by lxml), with namespace
prefixes statically mapped; there's currently v2 (VOTable
1.2), v1 (VOTable 1.1), v (whatever VOTable version
is the current DaCHS default), h (the namespace of the
XHTML elements DaCHS generates), m (the provisional MIVOT namespace)
and o (OAI-PMH 2.0).
If you need more prefixes, hack the source and feed back
your changes (or just add to self.XPATH_NAMESPACE_MAP
locally).
path must match exactly one element.
assertions is a dictionary mapping attribute names to
their expected value. Use the key None to check the
element content, and match for None if you expect an
empty element. To match against a namespaced attribute, you
have to give the full URI; prefixes are not applied here.
This would look like::
"{http://www.w3.org/2001/XMLSchema-instance}type": "vg:OAIHTTP"
If you need an RE match rather than equality, there's
EqualingRE in your code's namespace.
"""
if not hasattr(self, "cached parsed tree"):
setattr(self, "cached parsed tree", lxtree.fromstring(self.data))
tree = getattr(self, "cached parsed tree")
res = tree.xpath(path, namespaces=self.XPATH_NAMESPACE_MAP)
if len(res)==0:
raise AssertionError("Element not found: %s"%path)
elif len(res)!=1:
raise AssertionError("More than one item matched for %s"%path)
el = res[0]
for key, val in assertions.items():
if key is None:
try:
foundVal = el.text
except AttributeError:
# assume the expression was for an attribute and just use the
# value
foundVal = el
else:
foundVal = el.attrib[key]
assert val==foundVal, "Trouble with %s: %s (%s, %s)"%(
key or "content", path, repr(val), repr(foundVal))
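	# Illustrative call (values made up): check that the query status INFO
	# of a VOTable response says OK:
	#   self.assertXpath("//v:INFO[@name='QUERY_STATUS']", {"value": "OK"})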
	@utils.document
def getXpath(self, path, element=None):
"""returns the equivalent of tree.xpath(path) for an lxml etree
of the current document or in element, if passed in.
This uses the same namespace conventions as assertXpath.
"""
if element is None:
if not hasattr(self, "_parsedTree"):
self._parsedTree = lxtree.fromstring(self.data)
element = self._parsedTree
return element.xpath(path, namespaces=self.XPATH_NAMESPACE_MAP)
	@utils.document
def getFirstVOTableRow(self, rejectExtras=True):
"""interprets data as a VOTable and returns the first row as a dictionary
It will normally ensure that only one row is returned. To make it
silently discard extra rows, make sure the result is sorted, or you will
get randomly failing tests. Database-querying cores (which is where order
is an issue) also honor _DBOPTIONS_ORDER).
"""
data, metadata = votable.loads(self.data)
rows = metadata.iterDicts(data)
result = next(rows)
if rejectExtras:
try:
secondRow = next(rows)
except StopIteration:
pass
else:
raise AssertionError(
f"getFirstVOTableRow swallows a row: {secondRow}")
return result
	@utils.document
def getVOTableRows(self):
"""parses the first table in a result VOTable and returns the contents
as a sequence of dictionaries.
"""
data, metadata = votable.loads(self.data)
return list(metadata.iterDicts(data))
	@utils.document
def getUnique(self, seq):
"""returns seq[0], asserting at the same time that len(seq) is 1.
The idea is that you can say row = self.getUnique(self.getVOTableRows())
and have a nice test on the side -- and no ugly IndexError on an
		empty response.
"""
self.assertEqual(len(seq), 1)
return seq[0]
class RegTestSuite(base.Structure):
"""A suite of regression tests.
"""
name_ = "regSuite"
_tests = base.StructListAttribute("tests",
childFactory=RegTest,
description="Tests making up this suite",
copyable=False)
_title = base.NWUnicodeAttribute("title",
description="A short, human-readable phrase describing what this"
" suite is about.")
_sequential = base.BooleanAttribute("sequential",
description="Set to true if the individual tests need to be run"
" in sequence.",
default=False)
	def itertests(self, tags, keywords):
for test in self.tests:
if test.tags and not test.tags&tags:
continue
if keywords and not keywords.match(test.title):
continue
yield test
	def completeElement(self, ctx):
if self.title is None:
self.title = "Test suite from %s"%self.parent.sourceId
super().completeElement(ctx)
	def expand(self, *args, **kwargs):
"""hand macro expansion to the RD.
"""
return self.parent.expand(*args, **kwargs)
#################### Running Tests
class TestStatistics(object):
"""A statistics gatherer/reporter for the regression tests.
"""
def __init__(self, verbose=True):
		self.verbose = verbose
self.runs = []
self.oks, self.fails, self.total = 0, 0, 0
self.globalStart = time.time()
self.lastTimestamp = time.time()+1
self.timeSum = 0
	def add(self, status, runTime, title, payload, srcRD):
"""adds a test result to the statistics.
status is either OK, FAIL, or ERROR, runTime is the time
spent in running the test, title is the test's title,
and payload is "something" associated with failures that
should help diagnosing them.
"""
if status=="OK":
self.oks += 1
else:
if self.verbose:
print(">>>>>>>>", status)
self.fails += 1
self.total += 1
self.timeSum += runTime
#XXX TODO: Payload can use a lot of memory -- I'm nuking it for now
# -- maybe use an on-disk database to store this and allow later debugging?
self.runs.append((runTime, status, title,
None, #str(payload),
srcRD))
self.lastTimestamp = time.time()
	def getReport(self):
"""returns a string representation of a short report on how the tests
fared.
"""
try:
return ("%d of %d bad. avg %.2f, min %.2f, max %.2f. %.1f/s, par %.1f"
)%(self.fails, self.fails+self.oks, self.timeSum/len(self.runs),
min(self.runs)[0], max(self.runs)[0], float(self.total)/(
self.lastTimestamp-self.globalStart),
self.timeSum/(self.lastTimestamp-self.globalStart))
except ZeroDivisionError:
return "No tests run (probably did not find any)."
	def getFailures(self):
"""returns a string containing some moderately verbose info on the
failures collected.
"""
failures = {}
for runTime, status, title, payload, srcRD in self.runs:
if status!="OK":
failures.setdefault(srcRD, []).append("%s %s"%(status, title))
return "\n".join("From %s:\n %s\n\n"%(srcRD,
"\n ".join(badTests))
for srcRD, badTests in failures.items())
	def save(self, target):
"""saves the entire test statistics to target.
This is a pickle of basically what's added with add. No tools
for doing something with this are provided so far.
"""
with open(target, "wb") as f:
pickle.dump(self.runs, f)
class TestRunner(object):
"""A runner for regression tests.
It is constructed with a sequence of suites (RegTestSuite instances)
and allows running these in parallel. It honors the suites' wishes
as to being executed sequentially.
"""
	# The real trick here is the test suites with state (sequential=True). For
# those, the individual tests must be serialized, which happens using the magic
# followUp attribute on the tests.
def __init__(self, suites, serverURL=None,
verbose=True, dumpNegative=False, tags=None,
timeout=45, failFile=None, nRepeat=1,
execDelay=0, nThreads=8, printTitles=False,
keywords=None):
self.verbose, self.dumpNegative = verbose, dumpNegative
self.failFile, self.nRepeat = failFile, nRepeat
self.printTitles = printTitles
if tags:
self.tags = tags
else:
self.tags = frozenset()
self.timeout = timeout
self.execDelay = execDelay
self.nThreads = nThreads
self.keywords = keywords
self.serverURL = serverURL or base.getConfig("web", "serverurl")
self.curRunning = {}
self.threadId = 0
self._makeTestList(suites)
self.stats = TestStatistics(verbose=self.verbose)
self.resultsQueue = queue.Queue()
	@classmethod
def fromRD(cls, rd, **kwargs):
"""constructs a TestRunner for a single ResourceDescriptor.
"""
return cls(rd.tests, **kwargs)
	@classmethod
def fromSuite(cls, suite, **kwargs):
"""constructs a TestRunner for a RegTestSuite suite
"""
return cls([suite], **kwargs)
	@classmethod
def fromTest(cls, test, **kwargs):
"""constructs a TestRunner for a single RegTest
"""
return cls([base.makeStruct(RegTestSuite, tests=[test],
parent_=test.parent.parent)],
**kwargs)
def _makeTestList(self, suites):
"""puts all individual tests from all test suites in a deque.
"""
self.testList = collections.deque()
for suite in suites:
if suite.sequential:
self._makeTestsWithState(suite)
else:
self.testList.extend(suite.itertests(self.tags, self.keywords))
def _makeTestsWithState(self, suite):
"""helps _makeTestList by putting suite's test in a way that they are
executed sequentially.
"""
# technically, this is done by just entering the suite's "head"
# and have that pull all the other tests in the suite behind it.
tests = list(suite.itertests(self.tags, self.keywords))
if tests:
firstTest = tests.pop(0)
self.testList.append(firstTest)
for test in tests:
firstTest.followUp = test
firstTest = test
def _spawnThread(self):
"""starts a new test in a thread of its own.
"""
test = self.testList.popleft()
if self.printTitles:
sys.stderr.write(" <%s> "%test.title)
sys.stderr.flush()
newThread = threading.Thread(target=self.runOneTest,
args=(test, self.threadId, self.execDelay))
newThread.description = test.description
		newThread.daemon = True
self.curRunning[self.threadId] = newThread
self.threadId += 1
newThread.start()
if test.runCount<self.nRepeat:
test.runCount += 1
self.testList.append(test)
	def runOneTest(self, test, threadId, execDelay):
"""runs test and puts the results in the result queue.
This is usually run in a thread. However, threadId is only
used for reporting, so you may run this without threads.
To support sequential execution, if test has a followUp attribute,
this followUp is queued after the test has run.
If the execDelay argument is non-zero, the thread delays its execution
by that many seconds.
"""
if execDelay:
time.sleep(execDelay)
startTime = time.time()
try:
try:
test.retrieveData(self.serverURL, timeout=self.timeout)
test.compile()(test)
self.resultsQueue.put(("OK", test, None, None, time.time()-startTime))
except KeyboardInterrupt:
raise
except AssertionError as ex:
self.resultsQueue.put(("FAIL", test, ex, None,
time.time()-startTime))
# races be damned
if self.dumpNegative:
print("Content of failing test:\n%s\n"%test.data)
if self.failFile:
with open(self.failFile, "wb") as f:
f.write(test.data)
except Exception as ex:
if self.failFile and getattr(test, "data", None) is not None:
with open(self.failFile, "wb") as f:
f.write(test.data)
f = io.StringIO()
traceback.print_exc(file=f)
self.resultsQueue.put(("ERROR", test, ex, f.getvalue(),
time.time()-startTime))
finally:
if hasattr(test, "followUp"):
self.resultsQueue.put(("addTest", test.followUp, None, None, 0))
if threadId is not None:
self.resultsQueue.put(("collectThread", threadId, None, None, 0))
def _printStat(self, state, test, payload, traceback):
"""gives feedback to the user about the result of a test.
"""
if not self.verbose:
return
if state=="FAIL":
print("**** Test failed: %s -- %s\n"%(
test.title, test.getDataSource()))
print(">>>>", payload)
elif state=="ERROR":
print("**** Internal Failure: %s -- %s\n"%(test.title,
test.url.httpURL))
print(traceback)
def _runTestsReal(self, showDots=False):
"""executes the tests, taking tests off the queue and spawning
threads until the queue is empty.
showDots, if True, instructs the runner to push one dot to stderr
per test spawned.
"""
while self.testList or self.curRunning:
while len(self.curRunning)<self.nThreads and self.testList:
self._spawnThread()
evType, test, payload, traceback, dt = self.resultsQueue.get(
timeout=self.timeout)
if evType=="addTest":
self.testList.appendleft(test)
elif evType=="collectThread":
deadThread = self.curRunning.pop(test)
deadThread.join()
else:
self.stats.add(evType, dt, test.title, "", test.rd.sourceId)
if showDots:
if evType=="OK":
sys.stderr.write(".")
else:
sys.stderr.write("E")
sys.stderr.flush()
self._printStat(evType, test, payload, traceback)
if showDots:
sys.stderr.write("\n")
	def runTests(self, showDots=False):
"""executes the tests in a random order and in parallel.
"""
random.shuffle(self.testList)
try:
self._runTestsReal(showDots=showDots)
except queue.Empty:
sys.stderr.write("******** Hung jobs\nCurrently executing:\n")
for thread in list(self.curRunning.values()):
sys.stderr.write("%s\n"%thread.description)
	def runTestsInOrder(self):
"""runs all tests sequentially and in the order they were added.
"""
for test in self.testList:
self.runOneTest(test, None, self.execDelay)
try:
while True:
evType, test, payload, traceback, dt = self.resultsQueue.get(False)
if evType=="addTest":
self.testList.appendleft(test)
else:
self.stats.add(evType, dt, test.title, "", test.rd.sourceId)
self._printStat(evType, test, payload, traceback)
except queue.Empty:
pass
################### command line interface
def urlToURL():
"""converts HTTP (GET) URLs to URL elements.
"""
# This is what's invoked by the makeTestURLs command.
while True:
parts = urllib.parse.urlparse(input())
print("<url %s>%s</url>"%(
" ".join('%s="%s"'%(k,v[0])
for k,v in urllib.parse.parse_qs(parts.query).items()),
parts.path))
def _getRunnerForAll(runnerArgs, showProgress):
from gavo.registry import publication
from gavo import api
suites = []
for rdId in publication.findAllRDs():
if showProgress:
sys.stdout.write(rdId+" ")
sys.stdout.flush()
try:
rd = api.getRD(rdId, doQueries=False)
		except Exception as msg:
			base.ui.notifyError("Error loading RD %s (%s). Ignoring."%(
				rdId, utils.safe_str(msg)))
			continue
		suites.extend(rd.tests)
return TestRunner(suites, **runnerArgs)
def _getRunnerForSingle(testId, runnerArgs):
from gavo import api
testElement = common.getReferencedElement(testId, doQueries=False)
if isinstance(testElement, api.RD):
runner = TestRunner.fromRD(testElement, **runnerArgs)
elif isinstance(testElement, RegTestSuite):
runner = TestRunner.fromSuite(testElement, **runnerArgs)
elif isinstance(testElement, RegTest):
runner = TestRunner.fromTest(testElement, **runnerArgs)
else:
raise base.ReportableError("%s is not a testable element."%testId,
hint="Only RDs, regSuites, or regTests are eligible for testing.")
return runner
def parseCommandLine(args=None):
"""parses the command line for main()
"""
parser = argparse.ArgumentParser(description="Run tests embedded in RDs")
parser.add_argument("id", type=str,
help="RD id or cross-RD identifier for a testable thing.")
parser.add_argument("-v", "--verbose", help="Dump info on failed test",
action="store_true", dest="verbose")
parser.add_argument("-V", "--titles", help="Write title when starting"
" a test.",
action="store_true", dest="printTitles")
parser.add_argument("-d", "--dump-negative", help="Dump the content of"
" failing tests to stdout",
action="store_true", dest="dumpNegative")
parser.add_argument("-t", "--tag", help="Also run tests tagged with TAG.",
action="store", dest="tag", default=None, metavar="TAG")
parser.add_argument("-R", "--n-repeat", help="Run each test N times",
action="store", dest="nRepeat", type=int, default=1, metavar="N")
parser.add_argument("-T", "--timeout", help="Abort and fail requests"
" after inactivity of SECONDS",
action="store", dest="timeout", type=int, default=15, metavar="SECONDS")
parser.add_argument("-D", "--dump-to", help="Dump the content of"
" last failing test to FILE", metavar="FILE",
action="store", type=str, dest="failFile",
default=None)
parser.add_argument("-w", "--wait", help="Wait SECONDS before executing"
" a request", metavar="SECONDS", action="store",
dest="execDelay", type=int, default=0)
parser.add_argument("-u", "--serverURL", help="URL of the DaCHS root"
" at the server to test",
action="store", type=str, dest="serverURL",
default=base.getConfig("web", "serverURL"))
parser.add_argument("-n", "--number-par", help="Number of requests"
" to be run in parallel",
action="store", type=int, dest="nThreads",
default=8)
parser.add_argument("--seed", help="Seed the RNG with this number."
" Note that this doesn't necessarily make the execution sequence"
" predictable, just the submission sequence.",
action="store", type=int, dest="randomSeed", default=None)
parser.add_argument("-k", "--keywords", help="Only run tests"
" with descriptions containing all (whitespace-separated) keywords."
" Sequential tests will be run in full, nevertheless, if their head test"
" matches.",
action=Keywords, type=str, dest="keywords")
parser.add_argument("-p", "--progress", help="Show progress when"
" parsing RDs.",
action="store_true", dest="showProgress")
return parser.parse_args(args)
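# Typical invocations (illustrative; the subcommand is wired up by the
# dachs/gavo command line machinery):
#
#   dachs test ALL                  # run the tests of all RDs found
#   dachs test -v myres/q           # run all suites of one RD, verbosely
#   dachs test -t production ALL    # additionally run tests tagged "production"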
def main(args=None):
"""user interaction for gavo test.
"""
tags = None
args = parseCommandLine(args)
if args.randomSeed:
random.seed(args.randomSeed)
if args.tag:
tags = set([args.tag])
if args.serverURL:
args.serverURL = args.serverURL.rstrip("/")
runnerArgs = {
"verbose": args.verbose,
"dumpNegative": args.dumpNegative,
"serverURL": args.serverURL,
"tags": tags,
"failFile": args.failFile,
"nRepeat": args.nRepeat,
"timeout": args.timeout,
"execDelay": args.execDelay,
"nThreads": args.nThreads,
"printTitles": args.printTitles,
"keywords": args.keywords,
}
if args.id=="ALL":
runner = _getRunnerForAll(runnerArgs, args.showProgress)
else:
runner = _getRunnerForSingle(args.id, runnerArgs)
runner.runTests(showDots=True)
print(runner.stats.getReport())
if runner.stats.fails:
print(runner.stats.getFailures())
sys.exit(1)