# -*- coding: utf-8 -*-
"""
Helper classes for the DaCHS unit tests.
WARNING: This messes up some global state. DO NOT import into modules
doing regular work. testtricks is the module for that kind of stuff.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import http.server
import contextlib
import gc
import io
import importlib
import os
import pickle
import re
import subprocess
import sys
import threading
import time
import traceback
import unittest
import warnings
try:
import coverage
except ImportError:
# we only need this if someone set COVERAGE_FILE; don't do that
# if you don't have coverage.py installed.
pass
# This sets up a test environment of the DaCHS software.
#
# To make this work, the current user must be allowed to run
# createdb (in practice, you should have done something like
#
# sudo -u postgres -s `id -nu`
#
# ). You should be able to tear both ~/_gavo_test and the database
# down, and this should automatically recreate everything. That's
# an integration test for DaCHS, too.
#
# This must be run before anything else from gavo is imported because
# it manipulates the config stuff; this, in turn, runs as soon as
# base is imported.
# The following forces tests to be run from the tests directory.
# Reasonable, I'd say.
#
# All the custom setup can be suppressed by setting a GAVO_OOTTEST
# env var before importing this. That's for "out of tree test"
# and is used by the relational registry "unit" tests (and possibly
# others later).
def ensureResources():
# overridden below if necessary
pass
if "GAVO_OOTTEST" in os.environ:
from gavo import base
else:
TEST_BASE = os.getcwd()
originalEnvironment = os.environ.copy()
os.environ["GAVOCUSTOM"] = "/invalid"
os.environ["GAVOSETTINGS"] = os.path.join(TEST_BASE,
"test_data", "test-gavorc")
if not os.path.exists(os.environ["GAVOSETTINGS"]):
warnings.warn("testhelpers imported from non-test directory. This"
" is almost certainly not what you want (or set GAVO_OOTTEST).")
os.environ["GAVO_INIT_RUNNING"] = "1"
from gavo import base #noflake: import above is conditional
if os.path.exists(base.getConfig("rootDir")):
# we're assuming the test environment is complete then
# (i.e., including the test database)
from gavo.base import sqlsupport
sqlsupport.initPsycopg()
else:
from gavo.user import initdachs
dbname = "dachstest"
dsn = initdachs.DSN(dbname)
initdachs.createFSHierarchy(dsn, "test")
with open(os.path.join(
base.getConfig("configDir"), "defaultmeta.txt"), "a",
encoding="utf-8") as f:
f.write("!organization.description: Mein wüster Club\n")
f.write("!contact.email: invalid@wherever.else\n")
with open(os.path.join(
base.getConfig("configDir"), "vanitynames.txt"), "w") as f:
f.write('http://sofo-hd.de sofo !redirect\n'
'data/cores/scs/scs.xml fromvanitynames\n')
from gavo.base import config, meta
meta.makeFallbackMeta(reload=True)
os.symlink(os.path.join(TEST_BASE, "test_data"),
os.path.join(base.getConfig("inputsDir"), "data"))
os.rmdir(os.path.join(base.getConfig("inputsDir"), "__system"))
os.symlink(os.path.join(TEST_BASE, "test_data", "__system"),
os.path.join(base.getConfig("inputsDir"), "__system"))
os.mkdir(os.path.join(base.getConfig("inputsDir"), "test"))
def ensureResources(): #noflake: deliberate override
# delay everything that needs DB connectivity until
# after the import; otherwise, we may create a conn pool
# and that, as used in DaCHS, may create a thread. That means
# risking a deadlock while python is importing.
try:
subprocess.check_call(["createdb", "--template=template0",
"--encoding=UTF-8", "--locale=C", dbname])
initdachs.initDB(dsn)
from gavo.base import sqlsupport
# update the type bindings
importlib.reload(sqlsupport)
from gavo.base import sqlsupport
sqlsupport.initPsycopg()
# update the UDFs
importlib.reload(ufunctions)
from gavo.registry import publication
from gavo import rsc
from gavo import rscdesc #noflake: caches registration
from gavo.protocols import creds
publication.updateServiceList([base.caches.getRD("//rds")])
publication.updateServiceList([base.caches.getRD("//services")])
publication.updateServiceList([base.caches.getRD("//tap")])
# Import some resources necessary in trial tests
from gavo.user import importing
importing.process(rsc.getParseOptions(), ["//obscore"])
with base.getWritableAdminConn() as conn:
rsc.makeData(base.resolveCrossId("//uws#enable_useruws"),
connection=conn)
creds.addUser(conn, "testing", "testing", "powerless test user")
creds.addUser(conn, "other", "other", "another test user")
except:
traceback.print_exc()
sys.stderr.write("Creation of test environment failed. Remove %s\n"
" before trying again.\n"%(base.getConfig("rootDir")))
sys.exit(1)
# we really don't want to send mail from the test suite, so override
# osinter.sendMail with a no-op
from gavo.base import osinter
osinter.sendMail = lambda *args, **kwargs: None
from gavo import utils
from gavo.base import config #noflake: previous import conditional
from gavo.base import sqlsupport
from gavo.adql import ufunctions
class FakeSimbad(object):
"""we monkeypatch simbadinterface such that we don't query simbad during
tests.
Also, we don't persist cached Simbad responses. It's a bit sad that
that functionality therefore doesn't get exercised.
"""
simbadData = {'Aldebaran': {'RA': 68.98016279,
'dec': 16.50930235,
'oname': 'Aldebaran',
'otype': 'LP?'},
'M1': {'RA': 83.63308333, 'dec': 22.0145, 'oname': 'M1', 'otype': 'SNR'},
'Wozzlfoo7xx': None}
def __init__(self, *args, **kwargs):
pass
def query(self, ident):
return self.simbadData.get(ident)
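# A quick illustration of the fake's behaviour (the values come straight
# from simbadData above); this is only a sketch of how tests poke it:
#
#	FakeSimbad().query("Aldebaran")["RA"]    # -> 68.98016279
#	FakeSimbad().query("Wozzlfoo7xx")        # -> None (simulated miss)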
# Here's the deal on TestResource: When setting up complicated stuff for
# tests (like, a DB table), define a TestResource for it. Override
# the make(dependents) method returning something and the clean(res)
# method to destroy whatever you created in make().
#
# Then, in your VerboseTest, have a class attribute
# resources = [(name1, res1), (name2, res2)]
# giving attribute names and resource *instances*.
# There's an example in adqltest.py
#
# If you use this and you have a setUp of your own, you *must* call
# the superclass's setUp method.
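# As an illustration of that pattern (FooResource and FooTest are made-up
# names; real examples live in adqltest.py and friends):
#
#	class FooResource(TestResource):
#		def make(self, dependents):
#			return {"key": "value"}
#
#		def clean(self, res):
#			pass  # drop tables, remove files, ... whatever make() created
#
#	class FooTest(VerboseTest):
#		resources = [("foo", FooResource())]
#
#		def testFoo(self):
#			self.assertEqual(self.foo["key"], "value")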
import testresources
class ResourceInstance(object):
"""A helper class for TestResource.
See that docstring for info on what this is about; in case you
encounter one of these but need the real thing, just pull
.original.
"""
def __init__(self, original):
self.original = original
def __getattr__(self, name):
return getattr(self.original, name)
def __getitem__(self, index):
return self.original[index]
def __iter__(self):
return iter(self.original)
def __str__(self):
return str(self.original)
def __len__(self):
return len(self.original)
def __contains__(self, other):
return other in self.original
class TestResource(testresources.TestResource):
"""A wrapper for testresources maintaining some backward compatibility.
testresources 2.0.1 pukes into the namespaces of what's
returned from make. I've not really researched what they
intend people to return from make these days, but in order
to avoid major surgery on the code, this class simply wraps
objects that don't allow arbitrary attributes with ResourceInstance
when returned from make.
To make that happen, I need to override code from testresources itself,
which is a pity. In case this breaks: Take all methods that call .make
and replace make with _make_and_wrap.
Caution: when you implement reset(), you'll have to wrap the
result with testhelpers.ResourceInstance manually; but then you'd
have to copy dependencies manually, which is crazy, and so I think
manual reset currently really is broken.
"""
def _make_and_wrap(self, deps):
res = self.make(deps)
try:
# see if we can set attributes and return if so...
res.improbably_named_attribute = None
del res.improbably_named_attribute
return res
except AttributeError:
# ...else wrap the result
return ResourceInstance(res)
##### Methods adapted from testresources
def _make_all(self, result):
"""Make the dependencies of this resource and this resource."""
self._call_result_method_if_exists(result, "startMakeResource", self)
dependency_resources = {}
for name, resource in self.resources:
dependency_resources[name] = resource.getResource()
resource = self._make_and_wrap(dependency_resources)
for name, value in list(dependency_resources.items()):
setattr(resource, name, value)
self._call_result_method_if_exists(result, "stopMakeResource", self)
return resource
def _reset(self, resource, dependency_resources):
"""Override this to reset resources other than via clean+make.
This method should reset the self._dirty flag (assuming the manager can
ever be clean) and return either the old resource cleaned or a fresh
one.
:param resource: The resource to reset.
:param dependency_resources: A dict mapping name -> resource instance
for the resources specified as dependencies.
"""
self.clean(resource)
return self._make_and_wrap(dependency_resources)
from gavo.helpers.testtricks import ( #noflake: exported names
XSDTestMixin, testFile, getMemDiffer, getXMLTree, collectedEvents)
class ForkingSubprocess(subprocess.Popen):
"""A subprocess that doesn't exec but fork.
"""
def _execute_child(self, args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals,
# post-3.7, some additional parameters were added, which
# fortunately we don't need.
*ignored_args):
# stolen largely from 2.7 subprocess. Unfortunately, I can't just override
# the exec action; so, I'm replacing the whole exec shebang with a simple
# call to executable().
#
# Also, I'm doing some extra hack to collect coverage info if it looks
# like we're collecting coverage.
#
# The signature (and a few inside parts) is from 3.7 subprocess. We're
# ignoring everything that 2.7 didn't know about, on the bold assumption
# that it doesn't matter for our simple testing forks (which ought to be
# true for start_new_session at least:-).
sys.argv = args
if executable is None:
executable = args[0]
def _close_in_parent(fd):
os.close(fd)
# For transferring possible exec failure from child to parent.
# Unlike stock subprocess, the child pickles the exception and
# writes it to this pipe (see below).
errpipe_read, errpipe_write = os.pipe()
try:
gc_was_enabled = gc.isenabled()
# Disable gc to avoid bug where gc -> file_dealloc ->
# write to stderr -> hang. http://bugs.python.org/issue1336
gc.disable()
try:
self.pid = os.fork()
except:
if gc_was_enabled:
gc.enable()
raise
self._child_created = True
if self.pid == 0:
# Child
try:
# Close parent's pipe ends
if p2cwrite!=-1:
os.close(p2cwrite)
if c2pread!=-1:
os.close(c2pread)
if errread!=-1:
os.close(errread)
os.close(errpipe_read)
# When duping fds, if there arises a situation
# where one of the fds is either 0, 1 or 2, it
# is possible that it is overwritten (#12607).
if c2pwrite == 0:
c2pwrite = os.dup(c2pwrite)
if errwrite == 0 or errwrite == 1:
errwrite = os.dup(errwrite)
# Dup fds for child
def _dup2(a, b):
# dup2() removes the CLOEXEC flag but
# we must do it ourselves if dup2()
# would be a no-op (issue #10806).
if a == b:
self._set_cloexec_flag(a, False)
elif a!=-1:
os.dup2(a, b)
_dup2(p2cread, 0)
_dup2(c2pwrite, 1)
_dup2(errwrite, 2)
# Close pipe fds. Make sure we don't close the
# same fd more than once, or standard fds.
closed = set([-1])
for fd in [p2cread, c2pwrite, errwrite]:
if fd not in closed and fd > 2:
os.close(fd)
closed.add(fd)
if cwd is not None:
os.chdir(cwd)
if "COVERAGE_FILE" in os.environ:
cov = coverage.Coverage(data_file="forked.cov",
source=["gavo"],
auto_data=True)
cov.config.disable_warnings = ["module-not-measured"]
cov.start()
if preexec_fn:
preexec_fn()
exitcode = 0
except:
exc_type, exc_value, tb = sys.exc_info()
# Save the traceback and attach it to the exception object
exc_lines = traceback.format_exception(exc_type,
exc_value,
tb)
exc_value.child_traceback = ''.join(exc_lines)
os.write(errpipe_write, pickle.dumps(exc_value))
os.close(errpipe_write)
os._exit(255)
os.close(errpipe_write)
try:
executable()
except SystemExit as ex:
exitcode = ex.code
if "COVERAGE_FILE" in os.environ:
cov.stop()
cov.save()
sys.stderr.close()
sys.stdout.close()
os._exit(exitcode)
# Parent
os.close(errpipe_write)
if gc_was_enabled:
gc.enable()
# Now wait for the child to come up (at which point it will
# close its error pipe)
errpipe_data = bytearray()
while True:
part = os.read(errpipe_read, 50000)
errpipe_data += part
if not part or len(errpipe_data)>50000:
break
if errpipe_data:
child_exception = pickle.loads(errpipe_data)
raise child_exception
finally:
# close the FDs used by the child if they were opened
if p2cread!=-1 and p2cwrite!=-1:
_close_in_parent(p2cread)
if c2pwrite!=-1 and c2pread!=-1:
_close_in_parent(c2pwrite)
if errwrite!=-1 and errread!=-1:
_close_in_parent(errwrite)
# be sure the error pipe FD is closed eventually no matter what
os.close(errpipe_read)
class VerboseTest(testresources.ResourcedTestCase):
"""A TestCase with a couple of convenient assert methods.
"""
def assertEqualForArgs(self, callable, result, *args):
self.assertEqual(callable(*args), result,
"Failed for arguments %s. Expected result is: %s, result found"
" was: %s"%(str(args), repr(result), repr(callable(*args))))
def assertRaisesVerbose(self, exception, callable, args, msg):
try:
callable(*args)
except exception:
return
except:
raise
else:
raise self.failureException(msg)
def assertRaisesWithMsg(self, exception, errMsg, callable, args, msg=None,
**kwargs):
try:
value = callable(*args, **kwargs)
except exception as ex:
if errMsg==str(ex):
pass # make sure we decide on __eq__ for EqualingRE's sake
else:
raise self.failureException(
"Expected %r, got %r as exception message"%(errMsg, str(ex)))
return ex
except:
raise
else:
raise self.failureException(msg or
"%s not raised (function returned %s)"%(
str(exception), repr(value)))
def assertRuns(self, callable, args, msg=None):
try:
callable(*args)
except Exception as ex:
raise self.failureException("Should run, but raises %s (%s) exception"%(
ex.__class__.__name__, str(ex)))
def assertAlmostEqualVector(self, first, second, places=7, msg=None):
try:
for f, s in zip(first, second):
self.assertAlmostEqual(f, s, places)
except AssertionError:
if msg:
raise AssertionError(msg)
else:
raise AssertionError("%s != %s within %d places"%(
first, second, places))
def assertEqualToWithin(self, a, b, ratio=1e-7, msg=None):
"""asserts that abs((a-b)/(a+b))<ratio.
If a+b is zero or underflows, we error out right away with a
ZeroDivisionError.
"""
if msg is None:
msg = "%s != %s to within %s of the sum"%(a, b, ratio)
denom = abs(a+b)
self.assertTrue(abs(a-b)/denom<ratio, msg)
def assertOutput(self, toExec, argList, expectedStdout=None,
expectedStderr="", expectedRetcode=0, input=None,
stdoutStrings=None):
"""checks that toExec called with argList has the given output and return
value.
expectedStdout and expectedStderr can be functions. In that case,
the output is passed to the function, and an AssertionError is raised
if the function does not return true.
The 0th argument in argList is automatically added; only pass "real"
command line arguments.
toExec may also be a zero-argument python function. In that case, the
process is forked and the function is called, with sys.argv according to
argList. This helps to save startup time for python main functions.
"""
for name in ["output.stderr", "output.stdout"]:
try:
os.unlink(name)
except os.error:
pass
if isinstance(toExec, str):
p = subprocess.Popen([toExec]+argList, executable=toExec,
stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
else:
p = ForkingSubprocess(["test harness"]+argList, executable=toExec,
stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate(input=utils.bytify(input))
retcode = p.wait()
try:
if isinstance(expectedStderr, (bytes, str)):
self.assertEqual(err, utils.bytify(expectedStderr))
elif isinstance(expectedStderr, list):
for sample in expectedStderr:
self.assertTrue(utils.bytify(sample) in err,
"%s missing in stderr"%repr(sample))
else:
self.assertTrue(expectedStderr(err), "Stderr didn't match functional"
" expectation: %s"%err.decode("ascii", "?"))
self.assertEqual(expectedRetcode, retcode)
except AssertionError:
with open("output.stdout", "wb") as f:
f.write(out)
with open("output.stderr", "wb") as f:
f.write(err)
raise
try:
if isinstance(expectedStdout, (bytes, str)):
self.assertEqual(out, utils.bytify(expectedStdout))
elif isinstance(expectedStdout, list):
for sample in expectedStdout:
self.assertTrue(utils.bytify(sample) in out,
"%s missing in stdout"%repr(sample))
elif expectedStdout is not None:
self.assertTrue(expectedStdout(out))
except AssertionError:
with open("output.stdout", "wb") as f:
f.write(out)
raise
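# A sketch of a typical assertOutput call (cliFunc and the expected
# strings are invented for this example; note that stderr must match
# expectedStderr, which defaults to the empty string):
#
#	self.assertOutput(cliFunc, ["--help"],
#		expectedRetcode=0,
#		expectedStdout=lambda out: b"usage" in out)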
def assertEqualIgnoringAliases(self, result, expectation):
pat = re.escape(re.sub(r"\s+", " ", expectation)
).replace("ASWHATEVER", "AS [a-z]+")+"$"
if not re.match(pat, re.sub(r"\s+", " ", result)):
raise AssertionError("%r != %r"%(result, expectation))
_xmlJunkPat = re.compile("|".join([
r'(xmlns(:[a-z0-9]+)?="[^"]*"\s*)',
'((frame_|coord_system_)?id="[^"]*")',
r'(xsi:schemaLocation="[^"]*"\s*)']))
def cleanXML(aString):
"""removes IDs and some other detritus from XML literals.
The result will be invalid XML, and all this assumes the fixed-prefix
logic of the DC software.
For new tests, you should just use getXMLTree and XPath tests.
aString can be bytes, in which case it will be interpreted as ASCII.
cleanXML returns a string in any case.
"""
return re.sub(r"\s+", " ",
_xmlJunkPat.sub('',
utils.debytify(aString))).strip(
).replace(" />", "/>").replace(" >", ">")
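# A small, hand-checked example of what cleanXML does:
#
#	cleanXML('<r xmlns="urn:x" id="x12"><a  /></r>')  ->  '<r><a/></r>'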
class SamplesBasedAutoTest(type):
"""A metaclass that builds tests out of a samples attribute of a class.
To use this, give the class a samples attribute containing a sequence
of anything, and a _runTest(sample) method receiving one item of
that sequence.
The metaclass will create one test<n> method for each sample.
"""
def __new__(cls, name, bases, dict):
for sampInd, sample in enumerate(dict.get("samples", ())):
def testFun(self, sample=sample):
self._runTest(sample)
dict["test%02d"%sampInd] = testFun
return type.__new__(cls, name, bases, dict)
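# A minimal illustration of the metaclass (UpperTest is a made-up name);
# this produces test00 and test01:
#
#	class UpperTest(VerboseTest, metaclass=SamplesBasedAutoTest):
#		samples = [("a", "A"), ("chi", "CHI")]
#
#		def _runTest(self, sample):
#			inp, expected = sample
#			self.assertEqual(inp.upper(), expected)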
class SimpleSampleComparisonTest(VerboseTest, metaclass=SamplesBasedAutoTest):
"""A base class for tests that simply run a function and compare
for equality.
The function to be called is in the functionToRun attribute (wrap
it in a staticmethod).
The samples are pairs of (input, output). Output may be an
exception (or just the serialised form of the exception).
"""
def _runTest(self, sample):
val, expected = sample
try:
self.assertEqual(self.functionToRun(val),
expected)
except AssertionError:
raise
except Exception as ex:
if str(ex)!=str(expected):
raise
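# For example (IntParseTest is a made-up name; int serves as the function
# under test, and the second sample exercises the exception branch):
#
#	class IntParseTest(SimpleSampleComparisonTest):
#		functionToRun = staticmethod(int)
#		samples = [
#			("42", 42),
#			("0x2a", "invalid literal for int() with base 10: '0x2a'")]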
def computeWCSKeys(pos, size, cutCrap=False):
"""returns a dictionary containing a 2D WCS structure for an image
centered at pos with angular extent size. Both are 2-tuples in degrees.
"""
imgPix = (1000., 1000.)
res = {
"CRVAL1": pos[0],
"CRVAL2": pos[1],
"CRPIX1": imgPix[0]/2.,
"CRPIX2": imgPix[1]/2.,
"CUNIT1": "deg",
"CUNIT2": "deg",
"CD1_1": size[0]/imgPix[0],
"CD1_2": 0,
"CD2_2": size[1]/imgPix[1],
"CD2_1": 0,
"NAXIS1": imgPix[0],
"NAXIS2": imgPix[1],
"NAXIS": 2,
"CTYPE1": 'RA---TAN-SIP',
"CTYPE2": 'DEC--TAN-SIP',
"LONPOLE": 180.}
if not cutCrap:
res.update({"imageTitle": "test image at %s"%repr(pos),
"instId": None,
"dateObs":55300+pos[0],
"refFrame": None,
"wcs_equinox": None,
"bandpassId": None,
"bandpassUnit": None,
"bandpassRefval": None,
"bandpassLo": pos[0]+0.0001,
"bandpassHi": pos[0]+size[0],
"pixflags": None,
"accref": "image/%s/%s"%(pos, size),
"accsize": (30+int(pos[0]+pos[1]+size[0]+size[1]))*1024,
"embargo": None,
"owner": None,
})
return res
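# For instance, a 2x1 degree image around RA=30, Dec=45 (numbers are just
# for illustration):
#
#	hdr = computeWCSKeys((30, 45), (2, 1))
#	hdr["CRVAL1"], hdr["CD1_1"]    # -> (30, 0.002)
#
# With cutCrap=True you only get the plain WCS cards, without the mock
# SIAP-style metadata.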
class StandIn(object):
"""A class having the attributes passed as kwargs to the constructor.
"""
def __init__(self, **kwargs):
for key, value in list(kwargs.items()):
setattr(self, key, value)
def getTestRD(id="test.rd"):
from gavo import rscdesc #noflake: import above is conditional
from gavo import base
return base.caches.getRD("data/%s"%id)
def getTestTable(tableName, id="test.rd"):
return getTestRD(id).getTableDefById(tableName)
def getTestData(dataId):
return getTestRD().getDataById(dataId)
def hasUDF(udfName):
"""returns true if this system has an ADQL user defined function named
udfName.
This is intended to be used as in::
@unittest.skipUnless(testhelpers.hasUDF("GAVO_FOO"), "...")
Caution: UDF names are upper case here.
"""
return udfName in ufunctions.UFUNC_REGISTRY
def captureOutput(callable, args=(), kwargs={}):
"""runs ``callable(*args, **kwargs)`` and captures the output.
The function returns a tuple of return value, stdout output, stderr output.
"""
realOut, realErr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = io.StringIO(), io.StringIO()
try:
retVal = 2 # in case the callable sys.exits
try:
retVal = callable(*args, **kwargs)
except SystemExit:
# don't terminate just because someone thinks it's a good idea
pass
finally:
outCont, errCont = sys.stdout.getvalue(), sys.stderr.getvalue()
sys.stdout, sys.stderr = realOut, realErr
return retVal, outCont, errCont
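# For instance:
#
#	ret, out, err = captureOutput(print, ("hello",))
#	# ret is None, out == "hello\n", err == ""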
class CatchallUI(object):
"""A replacement for base.ui, collecting the messages being sent.
This is to write tests against producing UI events. Use it with
the messageCollector context manager below.
"""
def __init__(self):
self.events = []
def record(self, evType, args, kwargs):
self.events.append((evType, args, kwargs))
def __getattr__(self, attName):
if attName.startswith("notify"):
return lambda *args, **kwargs: self.record(attName[6:], args, kwargs)
@contextlib.contextmanager
def messageCollector():
"""A context manager recording UI events.
The object returned by the context manager is a CatchallUI; get the
events accumulated during the run time in its events attribute.
"""
tempui = CatchallUI()
realui = base.ui
try:
base.ui = tempui
yield tempui
finally:
base.ui = realui
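# Typical use, assuming the usual notify* event names on base.ui (the
# event payload here is made up):
#
#	with messageCollector() as collected:
#		base.ui.notifyWarning("blabla")
#	# collected.events now contains ("Warning", ("blabla",), {})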
@contextlib.contextmanager
def fakeTime(t):
"""makes time.time() return t within the controlled block.
"""
time_time = time.time
try:
time.time = lambda *args: t
yield
finally:
time.time = time_time
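# For instance:
#
#	with fakeTime(1e9):
#		time.time()    # -> 1000000000.0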
def getServerInThread(data, onlyOnce=False, contentType="text/plain"):
"""runs a server in a thread and returns the server, the thread, and
the base URL.
onlyOnce will configure the server such that it destroys itself
after having handled one request. The thread would still need
to be joined.
So, better use the DataServer context manager.
"""
if isinstance(data, str):
data = data.encode("utf-8")
class Handler(http.server.BaseHTTPRequestHandler):
def log_request(self, *args):
pass
def do_404(self):
self.send_response(404, "Not Found")
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"You know this wouldn't be here")
def do_GET(self):
if self.path.endswith("does-not-exist"):
return self.do_404()
self.send_response(200, "Ok")
self.send_header("Content-Type", contentType)
self.end_headers()
self.wfile.write(data)
do_POST = do_GET
port = 34000
httpd = http.server.HTTPServer(('', port), Handler)
if onlyOnce:
serve = httpd.handle_request
else:
serve = httpd.serve_forever
t = threading.Thread(target=serve)
t.daemon = True
t.start()
return httpd, t, "http://localhost:%s"%port
@contextlib.contextmanager
def DataServer(data):
"""a context manager for briefly running a web server returning data.
This yields the base URL the server is listening on.
"""
httpd, t, baseURL = getServerInThread(data)
try:
yield baseURL
finally:
httpd.shutdown()
t.join(10)
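# A sketch of typical use (fetching with urllib.request is just one way
# to talk to the little server):
#
#	with DataServer(b"hello") as baseURL:
#		urllib.request.urlopen(baseURL).read()    # -> b"hello"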
@contextlib.contextmanager
def userconfigContent(content):
"""a context manager to temporarily set some content to userconfig.
This cleans up after itself and clears any userconfig cache before
it sets to work.
content are RD elements without the root (resource) tag.
"""
userConfigPath = os.path.join(
base.getConfig("configDir"), "userconfig.rd")
base.caches.clearForName(userConfigPath[:-3])
with open(userConfigPath, "w") as f:
f.write('<resource schema="__system">\n'
+content
+'\n</resource>\n')
try:
yield
finally:
os.unlink(userConfigPath)
base.caches.clearForName(userConfigPath[:-3])
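# For example (the STREAM id and the idea that RDs pull it in via the
# usual %#<id> userconfig references are illustrative only):
#
#	with userconfigContent('<STREAM id="extrameta">'
#			'<meta name="note">from the test suite</meta></STREAM>'):
#		...  # RDs FEEDing %#extrameta now see that meta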
@contextlib.contextmanager
def tempConfig(*items):
"""sets (sect, key, value) pairs in the config temporarily and re-sets
the original values after the controlled block.
"""
origValues = []
for sect, key, val in items:
origValues.append((sect, key, config.get(sect, key)))
config.set(sect, key, val)
try:
yield
finally:
for sect, key, val in origValues:
config.getitem(sect, key).value = val
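# For instance (the section/key pair is just an illustration; use whatever
# config item your test needs):
#
#	with tempConfig(("web", "sitename", "Unittests")):
#		...  # base.getConfig("web", "sitename") now yields "Unittests"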
@contextlib.contextmanager
def forceConnection(conn):
"""makes the sqlsupport connection context managers return conn
in the controlled block.
It is up to the programmer to make sure that actually makes sense
in a given test. As opposed to the originals, there will be no
automatic commits here. Caveat emptor.
"""
symbols = ["getUntrustedConn", "getTableConn", "getAdminConn",
"getWritableUntrustedConn", "getWritableTableConn", "getWritableAdminConn"]
def _():
yield conn
cm = contextlib.contextmanager(_)
originals = []
for sym in symbols:
originals.append(getattr(sqlsupport, sym))
setattr(sqlsupport, sym, cm)
try:
yield
finally:
originals.reverse()
for sym in symbols:
setattr(sqlsupport, sym, originals.pop())
def main(testClass, methodPrefix=None):
ensureResources()
if os.environ.get("GAVO_LOG")!="no":
base.DEBUG = True
from gavo.user import logui
logui.LoggingUI(base.ui)
if "GAVO_OOTTEST" not in os.environ:
# run any pending upgrades (that's a test for them, too... of sorts)
from gavo.user import upgrade
upgrade.upgrade()
className, methodPrefix = None, None
try:
# two args: first one is class name, locate it in caller's globals
# and ignore anything before any dot for cut'n'paste convenience
if len(sys.argv)==3:
className = sys.argv[1].split(".")[-1]
methodPrefix = sys.argv[2]
# one arg: dissect dots and decide what to do then
elif len(sys.argv)==2:
parts = sys.argv[1].split(".")
if len(parts)==3:
className, methodPrefix = parts[1:]
elif len(parts)==2:
className, methodPrefix = parts
elif len(parts)==1:
className = parts[0]
else:
sys.exit(f"{className}?")
elif len(sys.argv)==1:
pass
else:
sys.exit("usage: test-id or class-name test-prefix")
if className is None:
suite = testresources.TestLoader().loadTestsFromModule(
sys.modules["__main__"])
else:
testClass = getattr(sys.modules["__main__"], className)
suite = unittest.makeSuite(testClass, methodPrefix or "test",
suiteClass=testresources.OptimisingTestSuite)
runner = unittest.TextTestRunner(
verbosity=int(os.environ.get("TEST_VERBOSITY", 1)))
result = runner.run(suite)
if result.errors or result.failures:
sys.exit(1)
else:
sys.exit(0)
except (SystemExit, KeyboardInterrupt):
raise
except:
base.showHints = True
from gavo.user import errhandle
traceback.print_exc()
errhandle.raiseAndCatch(base)
sys.exit(1)
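# In a test module that imports this file as testhelpers, main is typically
# hooked up like this (SomeTest being whatever test case the module defines):
#
#	if __name__=="__main__":
#		testhelpers.main(SomeTest)
#
# which then lets you run single tests from the command line, e.g.
#
#	python3 sometest.py SomeTest.testSomething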