Package gavo :: Package helpers :: Module trialhelpers
[frames] | no frames]

Source Code for Module gavo.helpers.trialhelpers

  1  """ 
  2  Helpers for trial-based tests, in particular retrieving pages. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import os 
 12  import urlparse 
 13  import warnings 
 14  import weakref 
 15  from cStringIO import StringIO 
 16   
 17  from nevow import context 
 18  from nevow import inevow 
 19  from nevow import util 
 20  from nevow import testutil 
 21  from nevow import url 
 22  from twisted.trial.unittest import TestCase as TrialTest 
 23  from twisted.python import failure  #noflake: exported name 
 24  from twisted.internet import defer 
 25  from twisted.web.http_headers import Headers 
 26   
 27  from gavo.helpers import testhelpers 
 28  from gavo.helpers import testtricks 
 29   
 30  from gavo import base 
 31  from gavo import rsc 
 32  from gavo import utils 
 33   
 34  base.setConfig("web", "enabletests", "True") 
 35  from gavo.web import weberrors 
 36  from gavo.web import root 
37 38 39 -def _requestDone(result, request, ctx):
40 if isinstance(result, basestring): 41 if result: 42 request.write(result) 43 elif isinstance(result, url.URL): 44 request.code = 303 45 request.headers["location"] = str(result) 46 elif hasattr(result, "renderHTTP"): 47 return _deferredRender((result, ()), ctx) 48 else: 49 warnings.warn("Unsupported render result: %s"%repr(result)) 50 request.d.callback(request.accumulator) 51 return request.accumulator, request
52
53 54 -def _renderCrashAndBurn(flr, ctx):
55 return _renderException(flr, ctx)
56
57 58 -def _renderException(flr, ctx):
59 return _doRender( 60 weberrors.getDCErrorPage(flr), ctx, formatExcs=False)
61
62 63 -def _doRender(page, ctx, formatExcs=True):
64 request = inevow.IRequest(ctx) 65 if not hasattr(page, "renderHTTP"): 66 return _requestDone(page, request, ctx) 67 68 d = util.maybeDeferred(page.renderHTTP, 69 context.PageContext( 70 tag=page, parent=context.RequestContext(tag=request))) 71 72 d.addCallback(_requestDone, request, ctx) 73 if formatExcs: 74 d.addErrback(_renderCrashAndBurn, ctx) 75 return d
76
77 78 -def _deferredRender(res, ctx):
79 page, segments = res 80 if segments: 81 return util.maybeDeferred(page.locateChild, 82 ctx, segments 83 ).addCallback(_deferredRender, ctx 84 ).addErrback(_renderException, ctx) 85 86 else: 87 try: 88 return _doRender(page, ctx) 89 except: 90 assert False, "Exceptions should not escape from _doRender"
91
92 93 -class FakeFieldStorage(object):
94 filename = None
95 - def __init__(self, args):
96 self.args = args
97
98 - def __iter__(self):
99 return iter(self.args)
100
101 - def getfirst(self, key):
102 return self.args[key][0]
103
104 - def __getitem__(self, key):
105 return self.args[key][0]
106
107 - def keys(self):
108 return self.args.keys()
109 110 @property
111 - def file(self):
112 return StringIO(self.args[0])
113
114 115 -class _HeaderFaker(object):
116 """A helper for simulating the old request.headers attribute in 117 twisted > jessie 118 """
119 - def __init__(self, request):
120 self.request = weakref.proxy(request)
121
122 - def __getitem__(self, name):
123 try: 124 return self.request.responseHeaders.getRawHeaders(name)[0] 125 except IndexError: 126 raise KeyError(name)
127
128 - def __contains__(self, name):
129 return self.request.responseHeaders.hasHeader(name)
130
131 132 -class FakeRequest(testutil.AccumulatingFakeRequest):
133 """A Request for testing purpuses. 134 135 We have a version of our own for this since nevow's has a 136 registerProducer that produces an endless loop with push 137 producers (which is what we have). 138 """
139 - def __init__(self, *args, **kwargs):
140 self.finishDeferred = defer.Deferred() 141 testutil.AccumulatingFakeRequest.__init__(self, *args, **kwargs) 142 self.headers_out = _HeaderFaker(self) 143 # compatibility code for trial < stretch (remove at some point) 144 if not hasattr(self, "responseHeaders"): 145 self.responseHeaders = Headers() 146 def _(key, val): 147 self.responseHeaders.setRawHeaders(key, [val]) 148 testutil.AccumulatingFakeRequest.setHeader(self, key, val)
149 self.setHeader = _
150
151 - def registerProducer(self, producer, isPush):
152 self.channel = 1 # must be non-null so we unregister 153 self.producer = producer 154 if not isPush: 155 testutil.AccumulatingFakeRequest.registerProducer( 156 self, producer, isPush)
157
158 - def unregisterProducer(self):
159 del self.channel 160 del self.producer
161
162 - def notifyFinish(self):
163 return self.finishDeferred
164
165 166 -def _buildRequest(method, path, rawArgs, requestClass=FakeRequest):
167 args = {} 168 for k, v in rawArgs.iteritems(): 169 if isinstance(v, list): 170 args[k] = v 171 else: 172 args[k] = [v] 173 if path.startswith("http://"): 174 path = urlparse.urlparse(path).path 175 req = requestClass(uri="/"+path, args=args) 176 # Service for my TAPRequest hack (see web.taprender). 177 req.fields = FakeFieldStorage(args) 178 req.headers = {} 179 req.method = method 180 return req
181
182 183 -def getRequestContext(path, method="GET", args=None, 184 requestMogrifier=None, requestClass=FakeRequest):
185 if args is None: 186 args = {} 187 req = _buildRequest(method, "http://localhost"+path, args, 188 requestClass=requestClass) 189 if requestMogrifier is not None: 190 requestMogrifier(req) 191 ctx = context.WovenContext() 192 ctx.remember(req) 193 return ctx
194
195 196 -def runQuery(page, method, path, args, requestMogrifier=None, 197 requestClass=FakeRequest):
198 """runs a query on a page. 199 200 The query should look like it's coming from localhost. 201 202 The thing returns a deferred firing a pair of the result (a string) 203 and the request (from which you can glean headers and such). 204 """ 205 ctx = getRequestContext(path, method, args, requestMogrifier, 206 requestClass=requestClass) 207 segments = tuple(path.split("/"))[1:] 208 return util.maybeDeferred( 209 page.locateChild, ctx, segments 210 ).addCallback(_deferredRender, ctx)
211
212 213 -class RenderTest(TrialTest):
214 """a base class for tests of twisted web resources. 215 """ 216 renderer = None # Override with the resource to be tested. 217
218 - def assertStringsIn(self, result, strings, inverse=False, 219 customTest=None):
220 content = result[0] 221 try: 222 for s in strings: 223 if inverse: 224 self.failIf(s in content, "'%s' in remote.data"%s) 225 else: 226 self.failIf(s not in content, "'%s' not in remote.data"%s) 227 228 if customTest is not None: 229 customTest(content) 230 except AssertionError: 231 with open("remote.data", "w") as f: 232 f.write(content) 233 raise 234 return result
235
236 - def assertResultHasStrings(self, method, path, args, strings, 237 rm=None, inverse=False, customTest=None):
238 return runQuery(self.renderer, method, path, args, rm, 239 ).addCallback(self.assertStringsIn, strings, inverse=inverse, 240 customTest=customTest)
241
242 - def assertGETHasStrings(self, path, args, strings, rm=None, 243 customTest=None):
244 return self.assertResultHasStrings("GET", 245 path, args, strings, rm, customTest=customTest)
246
247 - def assertGETLacksStrings(self, path, args, strings, rm=None):
248 return self.assertResultHasStrings("GET", 249 path, args, strings, rm, inverse=True)
250
251 - def assertPOSTHasStrings(self, path, args, strings, rm=None):
252 return self.assertResultHasStrings("POST", path, args, strings, 253 rm)
254
255 - def assertStatus(self, path, status, args={}, rm=None):
256 def check(res): 257 self.assertEqual(res[1].code, status) 258 return res
259 return runQuery(self.renderer, "GET", path, args).addCallback( 260 check)
261
262 - def assertGETRaises(self, path, args, exc, alsoCheck=None):
263 def cb(res): 264 raise AssertionError("%s not raised (returned %s instead)"%( 265 exc, res))
266 def eb(flr): 267 flr.trap(exc) 268 if alsoCheck is not None: 269 alsoCheck(flr) 270 271 return runQuery(self.renderer, "GET", path, args 272 ).addCallback(cb 273 ).addErrback(eb) 274
275 - def assertResponseIsValid(self, res):
276 errs = testtricks.getXSDErrors(res[0], True) 277 if errs: 278 with open("remote.data", "w") as f: 279 f.write(res[0]) 280 raise AssertionError(errs)
281
282 - def assertGETIsValid(self, path, args={}):
283 return runQuery(self.renderer, "GET", path, args 284 ).addCallback(self.assertResponseIsValid)
285
286 287 -class ArchiveTest(RenderTest):
288 renderer = root.ArchiveService()
289
290 291 @utils.memoized 292 -def getImportConnection():
293 # we cannot use the connection pools here since they may created threads. 294 return base.getDBConnection("admin")
295
296 297 -def provideRDData(rdName, ddId, _imported=set()):
298 """makes ddId from rdName and returns a cleanup function. 299 300 This is for creating temporary data for tests; it's supposed to be used 301 as in:: 302 303 atexit.register(provideRDData("test", "import_fitsprod")) 304 305 This keeps track of (rdName, ddId) pairs it's already loaded and 306 doesn't import them again. 307 """ 308 if (rdName, ddId) in _imported: 309 return lambda: None 310 311 dd = testhelpers.getTestRD(rdName).getById(ddId) 312 conn = getImportConnection() 313 dataCreated = rsc.makeData(dd, connection=conn) 314 conn.commit() 315 _imported.add((rdName, ddId)) 316 317 # may be gone in atexit 318 nvArg = rsc.parseNonValidating 319 320 def cleanup(): 321 dataCreated.dropTables(nvArg)
322 323 return cleanup 324 325 326 if os.environ.get("GAVO_LOG")!="no": 327 from gavo.user import logui 328 logui.LoggingUI(base.ui) 329