Package gavo :: Package votable :: Module tapquery
[frames] | no frames]

Source Code for Module gavo.votable.tapquery

  1  """ 
  2  An interface to querying TAP servers (i.e., a TAP client). 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import httplib 
 12  import socket 
 13  import time 
 14  import traceback 
 15  import urllib 
 16  import urlparse 
 17  from cStringIO import StringIO 
 18  from email.Message import Message 
 19  from email.MIMEMultipart import MIMEMultipart 
 20  from xml import sax 
 21   
 22  from gavo import utils 
 23  from gavo.votable import votparse 
 24  from gavo.votable.model import VOTable as V 
 25   
 26   
 27  # Ward against typos 
 28  PENDING = "PENDING" 
 29  QUEUED = "QUEUED" 
 30  EXECUTING = "EXECUTING" 
 31  COMPLETED = "COMPLETED" 
 32  ERROR = "ERROR" 
 33  ABORTED = "ABORTED" 
 34  UNKNOWN = "UNKNOWN" 
 35   
 36   
 37  debug = False 
class Error(utils.Error):
    """Base class for the TAP-related exceptions defined in this module.
    """
class ProtocolError(Error):
    """Raised when the remote server violates the assumptions this client
    makes about it (e.g., malformed response documents).
    """
class WrongStatus(ProtocolError):
    """Raised by request when the server answered with an HTTP status
    other than the expected one.

    The status actually received is available as foundStatus, the
    body of the response as payload.
    """
    def __init__(self, msg, foundStatus, payload, hint=None):
        ProtocolError.__init__(self, msg, hint)
        self.foundStatus = foundStatus
        self.payload = payload
        self.args = [msg, foundStatus, payload, hint]
class RemoteError(Error):
    """Raised when the remote side signals an error.

    The error message sent by the server is available in the
    remoteMessage attribute; it is also what str() returns.
    """
    def __init__(self, remoteMessage):
        self.remoteMessage = remoteMessage
        explanation = ("This means that"
            " something in your query was bad according to the server."
            " Details may be available in the Exceptions' remoteMessage"
            " attribute")
        Error.__init__(self, "Remote: "+remoteMessage, hint=explanation)
        self.args = [remoteMessage]

    def __str__(self):
        return self.remoteMessage
class RemoteAbort(Error):
    """Raised by check functions such as raiseIfError when the remote
    side has aborted the job.
    """
    def __init__(self):
        Error.__init__(self, "Aborted")
        self.args = []

    def __str__(self):
        return "The remote side has aborted the job"
class NetworkError(Error):
    """Raised when a generic network problem occurs (cannot connect, ...).
    """
99 100 -class _FormData(MIMEMultipart):
101 """is a container for multipart/form-data encoded messages. 102 103 This is usually used for file uploads. 104 """
105 - def __init__(self):
106 MIMEMultipart.__init__(self, "form-data") 107 self.set_param("boundary", "========== bounda r y 930 ") 108 self.epilogue = ""
109
110 - def addFile(self, paramName, fileName, data):
111 """attaches the contents of fileName under the http parameter name 112 paramName. 113 """ 114 msg = Message() 115 msg.set_type("application/octet-stream") 116 msg["Content-Disposition"] = "form-data" 117 msg.set_param("name", paramName, "Content-Disposition") 118 msg.set_param("filename", fileName, "Content-Disposition") 119 msg.set_payload(data) 120 self.attach(msg)
121
122 - def addParam(self, paramName, paramVal):
123 """adds a form parameter paramName with the (string) value paramVal 124 """ 125 msg = Message() 126 msg["Content-Disposition"] = "form-data" 127 msg.set_param("name", paramName, "Content-Disposition") 128 msg.set_payload(paramVal) 129 self.attach(msg)
130
131 - def forHTTPUpload(self):
132 """returns a string serialisation of this message suitable for HTTP 133 upload. 134 135 This is as_string, except we're introducing crlfs when it seems 136 the line separator is just an lf. 137 """ 138 data = self.as_string() 139 if not "\r\n" in data: 140 data = data.replace("\n", "\r\n") 141 return data
142 143 @classmethod
144 - def fromDict(cls, dict):
145 self = cls() 146 for key, value in dict.iteritems(): 147 self.addParam(key, value) 148 return self
149
def _getErrorInfo(votString):
    """returns the message from a TAP error VOTable.

    The message is the text of the first INFO element named QUERY_STATUS
    with the value ERROR.  If votString contains no such INFO or cannot
    be parsed as a VOTable, it is returned verbatim.

    TODO: For large responses, this may take a while.  It's probably
    not worth it in such cases.  Or at all.  Maybe we should hunt
    for the INFO somewhere else?
    """
    try:
        for element in votparse.parseString(votString, watchset=[V.INFO]):
            if not isinstance(element, V.INFO):
                # table data: drain it quickly to get past it
                for _ in element:
                    pass
                continue
            if element.name=="QUERY_STATUS" and element.value=="ERROR":
                return element.text_
    except Exception:
        # not a suitable VOTable; deliberately fall through to the
        # verbatim return below
        pass
    return votString
173 174 -def _makeFlatParser(parseFunc):
175 """returns a "parser" class for _parseWith just calling a function on a string. 176 177 _parseWith is designed for utils.StartEndParsers, but it's convenient 178 to use it when there's no XML in the responses as well. 179 180 So, this class wraps a simple function into a StartEndParser-compatible 181 form. 182 """ 183 class FlatParser(object): 184 def parseString(self, data): 185 self.result = parseFunc(data)
186 def getResult(self): 187 return self.result 188 return FlatParser 189
def _parseWith(parser, data):
    """parses the string data with a utils.StartEndParser-compatible parser.

    Returns whatever parser.getResult() returns after parsing.

    If parsing fails with a ValueError, IndexError, or SAX parse error,
    a ProtocolError is raised instead.  When the module-level debug flag
    is set, the traceback is printed and the raw data is written to the
    file server_response before that.
    """
    try:
        parser.parseString(data)
        return parser.getResult()
    except (ValueError, IndexError, sax.SAXParseException):
        if debug:
            traceback.print_exc()
            # with closes the dump file even if writing it fails
            with open("server_response", "w") as f:
                f.write(data)
        raise ProtocolError("Malformed response document.", hint=
            "If debug was enabled, you will find the server response in"
            " the file server_response.")
class _PhaseParser(utils.StartEndHandler):
    """A parser for UWS phase responses accepting both plain text and XML.

    Of course, the XML replies are against the standard, but -- ah, well.

    The result is the phase string with surrounding whitespace removed,
    or None if an XML reply contained no phase element.
    """
    # Default, so getResult does not raise an AttributeError when an XML
    # reply lacks a phase element (same pattern as _QuoteParser.quote).
    result = None

    def _end_phase(self, name, attrs, content):
        # strip like the plain-text branch does, so comparisons against
        # the phase constants are not spoiled by whitespace in the XML
        self.result = content.strip()

    def parseString(self, data):
        if data.strip().startswith("<"):  # XML :-)
            utils.StartEndHandler.parseString(self, data)
        else:
            self.result = str(data).strip()

    def getResult(self):
        return self.result
class _QuoteParser(utils.StartEndHandler):
    """A parser for UWS quote responses, plain text or XML.

    The result is what utils.parseISODT returns for the quote, or None
    for an empty quote or the literal NULL.
    """
    quote = None

    def parseDate(self, literal):
        if not literal or literal=="NULL":
            return None
        return utils.parseISODT(literal)

    def _end_quote(self, name, attrs, content):
        self.quote = self.parseDate(content.strip())

    def parseString(self, data):
        stripped = data.strip()
        if not stripped.startswith("<"):
            self.quote = self.parseDate(stripped)
            return
        # XML :-)
        utils.StartEndHandler.parseString(self, stripped)

    def getResult(self):
        return self.quote
247 248 -class _CaselessDictionary(dict):
249 """A dictionary that only has lower-case keys but treats keys in any 250 capitalization as equivalent. 251 """
252 - def __contains__(self, key):
253 dict.__contains__(self, key.lower())
254
255 - def __getitem__(self, key):
256 return dict.__getitem__(self, key.lower())
257
258 - def __setitem__(self, key, value):
259 dict.__setitem__(self, key.lower(), value)
260
261 - def __delitem__(self, key):
262 dict.__delitem__(self, key.lower())
263
class _ParametersParser(utils.StartEndHandler):
    """A parser for UWS parameters documents.

    The result is a _CaselessDictionary mapping each parameter element's
    id attribute to its content.
    """
    def _initialize(self):
        self.parameters = _CaselessDictionary()

    def _end_parameter(self, name, attrs, content):
        paramId = attrs["id"]
        self.parameters[paramId] = content

    def getResult(self):
        return self.parameters
class _ResultsParser(utils.StartEndHandler):
    """A parser for UWS results lists.

    The result is a list of UWSResult instances, one per result element;
    the type defaults to "simple" when the element does not give one.
    """
    def _initialize(self):
        self.results = []

    def _end_result(self, name, attrs, content):
        attrDict = self.getAttrsAsDict(attrs)
        newResult = UWSResult(
            attrDict["href"],
            attrDict.get("id"),
            attrDict.get("type", "simple"))
        self.results.append(newResult)

    def getResult(self):
        return self.results
class _InfoParser(_ParametersParser, _ResultsParser):
    """A parser for complete UWS job documents.

    The result is a dict keyed by the element names the _end_* handlers
    receive: jobId and phase (content as given), executionDuration (a
    float), and destruction (parsed with utils.parseISODT).  Once the job
    element has ended, it also has the keys results (a list of UWSResult)
    and parameters (a _CaselessDictionary).
    """
    def _initialize(self):
        self.info = {}
        _ParametersParser._initialize(self)
        _ResultsParser._initialize(self)

    def _end_jobId(self, name, attrs, content):
        self.info[name] = content

    _end_phase = _end_jobId

    def _end_executionDuration(self, name, attrs, content):
        self.info[name] = float(content)

    def _end_destruction(self, name, attrs, content):
        # NOTE(review): the original body of this method is missing from
        # the listing this code was recovered from.  Parsing with
        # utils.parseISODT mirrors ADQLTAPJob.destruction; empty content
        # (e.g., a nil destruction) yields None instead of a parse error.
        literal = content.strip()
        self.info[name] = utils.parseISODT(literal) if literal else None

    def _end_job(self, name, attrs, content):
        self.info["results"] = self.results
        self.info["parameters"] = self.parameters

    def getResult(self):
        return self.info
class _AvailabilityParser(utils.StartEndHandler):
    """A parser for VOSI availability documents.

    The result is True or False as given in the available element, or
    None if no available element with the content true or false was seen.
    """
    available = None

    def _end_available(self, name, attrs, content):
        literal = content.strip()
        if literal in ("true", "false"):
            self.available = (literal=="true")

    def getResult(self):
        return self.available
328 329 -def _pruneAttrNS(attrs):
330 return dict((k.split(":")[-1], v) for k,v in attrs.items())
331
class _CapabilitiesParser(utils.StartEndHandler):
    """A parser for VOSI capabilities documents.

    The result is a list of capabilities.  Each capability is a dict with
    the keys standardID and interfaces; each interface is a dict with the
    keys type (namespace prefix not expanded; change that?) and role and,
    once its accessURL element has been seen, accessURL and use.
    """
    def __init__(self):
        utils.StartEndHandler.__init__(self)
        self.capabilities = []

    def _start_capability(self, name, attrs):
        self.curCap = {
            "interfaces": [],
            "standardID": attrs.get("standardID"),
        }

    def _end_capability(self, name, attrs, content):
        finished, self.curCap = self.curCap, None
        self.capabilities.append(finished)

    def _start_interface(self, name, attrs):
        plainAttrs = _pruneAttrNS(attrs)
        self.curInterface = {
            "type": plainAttrs["type"],
            "role": plainAttrs.get("role"),
        }

    def _end_interface(self, name, attrs, content):
        self.curCap["interfaces"].append(self.curInterface)
        self.curInterface = None

    def _end_accessURL(self, name, attrs, content):
        interface = self.curInterface
        interface["accessURL"] = content.strip()
        interface["use"] = attrs.get("use")

    def getResult(self):
        return self.capabilities
class _TablesParser(utils.StartEndHandler):
    """A parser for VOSI tables documents.

    The result is a list of V.TABLE elements, one per table element, each
    receiving a V.FIELD per column.  Names and descriptions of tables and
    columns, plus units, UCDs, datatypes and arraysizes of columns, are
    transferred.
    """
    def __init__(self):
        utils.StartEndHandler.__init__(self)
        self.tables = []
        self.curCol = None

    def _start_table(self, name, attrs):
        self.tables.append(V.TABLE())

    def _start_column(self, name, attrs):
        self.curCol = V.FIELD()

    def _end_column(self, name, attrs, content):
        # NOTE(review): presumably stanxml indexing adds the finished FIELD
        # as a child of the current TABLE -- confirm against gavo.utils.stanxml
        self.tables[-1][self.curCol]
        self.curCol = None

    def _findNameTarget(self):
        # returns (wanted, destObj); names and descriptions of anything but
        # tables and columns are not wanted and are ignored by the callers
        parentTag = self.getParentTag()
        if parentTag=="table":
            return True, self.tables[-1]
        if parentTag=="column":
            return True, self.curCol
        return False, None

    def _end_description(self, attName, attrs, content):
        wanted, destObj = self._findNameTarget()
        if wanted:
            destObj[V.DESCRIPTION[content]]

    def _endColOrTableAttr(self, attName, attrs, content):
        wanted, destObj = self._findNameTarget()
        if wanted:
            destObj(**{str(attName): content.strip()})

    _end_name = _endColOrTableAttr

    def _endColAttr(self, attName, attrs, content):
        self.curCol(**{str(attName): content.strip()})

    _end_unit = _end_ucd = _endColAttr

    def _end_dataType(self, attName, attrs, content):
        self.curCol(datatype=content.strip())
        if "arraysize" in attrs:
            self.curCol(arraysize=attrs["arraysize"])

    def getResult(self):
        return self.tables
class UWSResult(object):
    """A container for a result returned by a UWS service.

    It exposes the attributes href, id, and type.
    """
    def __init__(self, href, id=None, type=None):
        self.href = href
        self.id = id
        self.type = type
class LocalResult(object):
    """A result held in memory (ADQLSyncJob keeps its response body in one).

    It exposes the attributes data, id, and type.
    """
    def __init__(self, data, id, type):
        self.data = data
        self.id = id
        self.type = type
431 432 -def _canUseFormEncoding(params):
433 """returns true if userParams can be transmitted in a 434 x-www-form-urlencoded payload. 435 """ 436 for val in params.values(): 437 if not isinstance(val, basestring): 438 return False 439 return True
440
def request(scheme, host, path, data="", customHeaders=None, method="GET",
        expectedStatus=None, followRedirects=False, setResponse=None,
        timeout=None):
    """returns a HTTPResponse object for an HTTP request to path on host.

    This function builds a new connection for every request.

    On the returned object, you cannot use the read() method.  Instead,
    any data returned by the server is available in the data attribute.

    data usually is a byte string, but you can also pass a dictionary.
    If all its values are strings, it is sent x-www-form-urlencoded,
    otherwise it is serialized using _FormData above.

    customHeaders is an optional dictionary of extra headers; these
    override the default headers.

    301 and 302 redirects are always followed.  You can set
    followRedirects to True (or a number of hops).  This means that the
    303 "See other" codes that many UWS actions generate will be followed
    and the document at the other end will be obtained.  Each redirect
    followed uses up one hop.  For many operations this will lead to an
    error; only do this for slightly broken services.

    In setResponse, you can pass in a callable that is called with the
    server response body as soon as it is in (when redirects are followed,
    with the body of each response in the chain).  This is for when you
    want to store the response even if request raises an error later on
    (i.e., for sync querying).

    Raises NetworkError when connecting or the HTTP exchange fails,
    ProtocolError for unsupported schemes and for redirects without a
    Location header, and WrongStatus if expectedStatus is given and the
    final response has a different status.
    """
    if scheme=="http":
        connClass = httplib.HTTPConnection
    elif scheme=="https":
        connClass = httplib.HTTPSConnection
    else:
        raise ProtocolError("Unsupported URL scheme: %r"%(scheme,))

    headers = {"connection": "close",
        "user-agent": "Python TAP library http://soft.g-vo.org/subpkgs"}

    if not isinstance(data, basestring):
        if _canUseFormEncoding(data):
            data = urllib.urlencode(data)
            headers["Content-Type"] = "application/x-www-form-urlencoded"
        else:
            form = _FormData.fromDict(data)
            data = form.forHTTPUpload()
            headers["Content-Type"] = form.get_content_type()+'; boundary="%s"'%(
                form.get_boundary())
    headers["Content-Length"] = len(data)
    if customHeaders:
        headers.update(customHeaders)

    conn = None
    try:
        try:
            conn = connClass(host, timeout=timeout)
        except TypeError:  # probably python<2.6, no timeout support
            # use connClass rather than HTTPConnection so https stays https
            conn = connClass(host)
        conn.request(method, path, data, headers)
        # reading the response is part of the network exchange, too
        resp = conn.getresponse()
        resp.data = resp.read()
    except (socket.error, httplib.error) as ex:
        raise NetworkError("Problem connecting to %s (%s)"%
            (host, str(ex)))
    finally:
        if conn is not None:
            conn.close()

    if setResponse is not None:
        setResponse(resp.data)

    if ((followRedirects and resp.status==303)
            or resp.status==301
            or resp.status==302):
        location = resp.getheader("location")
        if not location:
            raise ProtocolError("Redirect (status %s) without a Location"
                " header"%resp.status)
        # Location may be relative; resolve it against the URL just requested
        target = urlparse.urlsplit(urlparse.urljoin(
            "%s://%s%s"%(scheme, host, path), location))
        targetPath = target.path or "/"
        if target.query:
            targetPath = "%s?%s"%(targetPath, target.query)
        return request(target.scheme, target.netloc, targetPath,
            method="GET", expectedStatus=expectedStatus,
            # clamp at zero: a negative count would be truthy and make later
            # 303s get followed although the caller did not ask for that
            followRedirects=max(0, followRedirects-1),
            setResponse=setResponse, timeout=timeout)

    if expectedStatus is not None and resp.status!=expectedStatus:
        raise WrongStatus("Expected status %s, got status %s"%(
            expectedStatus, resp.status), resp.status, resp.data)
    return resp
def _makeAtomicValueGetter(methodPath, parser):
    """returns a property getter fetching the job resource at methodPath.

    This is for building ADQLTAPJob's properties (executionDuration, etc.).
    parser is a parser class (not an instance); a fresh instance parses
    the response body on every call.
    """
    def getter(self):
        destURL = self.jobPath+methodPath
        # honour the job's timeout like all other ADQLTAPJob requests do
        response = request(self.destScheme, self.destHost, destURL,
            expectedStatus=200, timeout=getattr(self, "timeout", None))
        return _parseWith(parser(), response.data)
    return getter
def _makeAtomicValueSetter(methodPath, serializer, parameterName):
    """returns a property setter POSTing a job resource at methodPath.

    This is for building ADQLTAPJob's properties (executionDuration, etc.).
    The value is sent as parameterName=serializer(value); the server is
    expected to answer with a 303.
    """
    def setter(self, value):
        destURL = self.jobPath+methodPath
        # honour the job's timeout like all other ADQLTAPJob requests do
        request(self.destScheme, self.destHost, destURL,
            {parameterName: serializer(value)}, method="POST",
            expectedStatus=303, timeout=getattr(self, "timeout", None))
    return setter
539 540 -class _WithEndpoint(object):
541 """A helper class for classes constructed with an ADQL endpoint. 542 """
543 - def _defineEndpoint(self, endpointURL):
544 self.endpointURL = endpointURL.rstrip("/") 545 parts = urlparse.urlsplit(self.endpointURL) 546 self.destScheme = parts.scheme 547 self.destHost = parts.hostname 548 if parts.port: 549 self.destHost = "%s:%s"%(self.destHost, parts.port) 550 self.destPath = parts.path 551 if self.destPath.endswith("/"): 552 self.destPath = self.destPath[:-1]
553
class ADQLTAPJob(_WithEndpoint):
    """A facade for an ADQL-based async TAP job.

    Construct it with the URL of the TAP service (the async endpoint is
    located by appending async to it) and a query; this creates the job
    on the remote side.

    Alternatively, you can give the endpoint URL and a jobId as a
    keyword parameter.  This only makes sense if the service has
    handed out the jobId before (e.g., when a different program takes
    up handling of a job started before).

    userParams are additional job parameters, sent as str(value).
    timeout is passed to the HTTP requests this object makes.

    See :dachsdoc:`adql.html` for details.
    """
    def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",
            userParams=None, timeout=None):
        self._defineEndpoint(endpointURL)
        self.timeout = timeout
        self.destPath = utils.ensureOneSlash(self.destPath)+"async"
        if query is not None:
            self.jobId, self.jobPath = None, None
            self._createJob(query, lang, userParams or {})
        elif jobId is not None:
            self.jobId = jobId
        else:
            raise Error("Must construct ADQLTAPJob with at least query or jobId")
        self._computeJobPath()

    def _computeJobPath(self):
        self.jobPath = "%s/%s"%(self.destPath, self.jobId)

    def _createJob(self, query, lang, userParams):
        params = {
            "REQUEST": "doQuery",
            "LANG": lang,
            "QUERY": query}
        for k, v in userParams.iteritems():
            params[k] = str(v)
        response = request(self.destScheme, self.destHost, self.destPath, params,
            method="POST", expectedStatus=303, timeout=self.timeout)
        # The last segment of the Location header's path is the job id;
        # a trailing slash some servers add is ignored.
        try:
            self.jobId = urlparse.urlsplit(
                response.getheader("location", "")
                ).path.rstrip("/").split("/")[-1]
        except ValueError:
            raise utils.logOldExc(
                ProtocolError("Job creation returned invalid job id"))
        if not self.jobId:
            # otherwise jobPath would silently point to the job list itself
            raise ProtocolError("Job creation returned invalid job id")

    def delete(self, usePOST=False):
        """removes the job on the remote side.

        usePOST=True can be used for servers that do not support the DELETE
        HTTP method (a.k.a. "are broken").
        """
        if self.jobPath is not None:
            if usePOST:
                request(self.destScheme, self.destHost, self.jobPath, method="POST",
                    data={"ACTION": "DELETE"}, expectedStatus=303,
                    timeout=self.timeout)
            else:
                request(self.destScheme, self.destHost, self.jobPath, method="DELETE",
                    expectedStatus=303, timeout=self.timeout)

    def _postPhase(self, phase):
        # POSTs a phase change request (RUN, ABORT) to the job's phase resource
        request(self.destScheme, self.destHost, self.jobPath+"/phase",
            {"PHASE": phase}, method="POST", expectedStatus=303,
            timeout=self.timeout)

    def start(self):
        """asks the remote side to start the job.
        """
        self._postPhase("RUN")

    def abort(self):
        """asks the remote side to abort the job.
        """
        self._postPhase("ABORT")

    def raiseIfError(self):
        """raises RemoteError (with the server's error message) if the job
        is in the ERROR phase, RemoteAbort if it is in the ABORTED phase.
        """
        phase = self.phase
        if phase==ERROR:
            raise RemoteError(self.getErrorFromServer())
        elif phase==ABORTED:
            raise RemoteAbort()

    def waitForPhases(self, phases, pollInterval=1, increment=1.189207115002721,
            giveUpAfter=None):
        """waits for the job's phase to become one of the set phases.

        This method polls.  The poll interval grows exponentially by the
        factor increment after each poll, up to a maximum of two minutes.

        The magic number in increment is 2**(1/4.).

        giveUpAfter, if given, is the number of iterations this method will
        do.  If none of the desired phases have been found until then,
        raise a ProtocolError.
        """
        attempts = 0
        while True:
            curPhase = self.phase
            if curPhase in phases:
                break
            time.sleep(pollInterval)
            pollInterval = min(120, pollInterval*increment)
            attempts += 1
            if giveUpAfter:
                if attempts>giveUpAfter:
                    raise ProtocolError("None of the states in %s were reached"
                        " in time."%repr(phases),
                        hint="After %d attempts, phase was %s"%(attempts, curPhase))

    def run(self, pollInterval=1):
        """runs the job and waits until it has finished, polling the phase
        starting with pollInterval seconds.

        If the job ends in ERROR or ABORTED, RemoteError (with an error
        message gleaned from the server) or RemoteAbort is raised.
        """
        self.start()
        self.waitForPhases(set([COMPLETED, ABORTED, ERROR]),
            pollInterval=pollInterval)
        self.raiseIfError()

    executionDuration = property(
        _makeAtomicValueGetter("/executionduration", _makeFlatParser(float)),
        _makeAtomicValueSetter("/executionduration", str, "EXECUTIONDURATION"))

    destruction = property(
        _makeAtomicValueGetter("/destruction", _makeFlatParser(utils.parseISODT)),
        _makeAtomicValueSetter("/destruction",
            lambda dt: dt.strftime("%Y-%m-%dT%H:%M:%S.000"), "DESTRUCTION"))

    def makeJobURL(self, jobPath):
        return self.endpointURL+"/async/%s%s"%(self.jobId, jobPath)

    def _queryJobResource(self, path, parser):
        # a helper for phase, quote, etc.
        response = request(self.destScheme, self.destHost, self.jobPath+path,
            expectedStatus=200, timeout=self.timeout)
        return _parseWith(parser, response.data)

    @property
    def info(self):
        """returns a dictionary of much job-related information.
        """
        return self._queryJobResource("", _InfoParser())

    @property
    def phase(self):
        """returns the phase the job is in according to the server.
        """
        return self._queryJobResource("/phase", _PhaseParser())

    @property
    def quote(self):
        """returns the estimate the server gives for the run time of the job.
        """
        return self._queryJobResource("/quote", _QuoteParser())

    @property
    def owner(self):
        """returns the owner of the job.
        """
        return self._queryJobResource("/owner", _makeFlatParser(str)())

    @property
    def parameters(self):
        """returns a dictionary mapping passed parameters to server-provided
        string representations.

        To set a parameter, use the setParameter function.  Changing the
        dictionary returned here will have no effect.
        """
        return self._queryJobResource("/parameters", _ParametersParser())

    @property
    def allResults(self):
        """returns a list of UWSResult instances.
        """
        return self._queryJobResource("/results", _ResultsParser())

    def getResultURL(self, simple=True):
        """returns the URL of the ADQL result table.

        With simple=True, this is results/result below the job; otherwise,
        it is the href of the first entry of the job's results list.
        """
        if simple:
            return self.makeJobURL("/results/result")
        else:
            return self.allResults[0].href

    def openResult(self, simple=True):
        """returns a file-like object you can read the default TAP result off.

        To have the embedded VOTable returned, say
        votable.load(job.openResult()).

        If you pass simple=False, the URL will be taken from the
        service's result list (the first one given there).  Otherwise (the
        default), results/result is used.
        """
        # pass simple on; it used to be ignored
        return urllib.urlopen(self.getResultURL(simple))

    def setParameter(self, key, value):
        request(self.destScheme, self.destHost, self.jobPath+"/parameters",
            data={key: value}, method="POST", expectedStatus=303,
            timeout=self.timeout)

    def getErrorFromServer(self):
        """returns the error message the server gives, verbatim.
        """
        data = request(self.destScheme, self.destHost, self.jobPath+"/error",
            expectedStatus=200, followRedirects=True,
            timeout=self.timeout).data
        return _getErrorInfo(data)

    def addUpload(self, name, data):
        """adds an uploaded table, either from a file or as a remote URL.

        You should not try to change UPLOAD yourself (e.g., using setParameter).

        data is either a string (i.e., a URI; it must not contain commas
        or semicolons, which separate UPLOAD items) or a file-like object
        whose contents are uploaded inline.
        """
        uploadFragments = []
        form = _FormData()
        if isinstance(data, basestring):  # a URI
            # explicit check rather than assert, which -O would strip
            if ',' in data or ';' in data:
                raise Error("Upload URIs must not contain commas or semicolons")
            uploadFragments.append("%s,%s"%(name, data))

        else:  # Inline upload, data is a file
            uploadKey = utils.intToFunnyWord(id(data))
            form.addFile(uploadKey, uploadKey, data.read())
            uploadFragments.append("%s,param:%s"%(name, uploadKey))

        form.addParam("UPLOAD", ";".join(uploadFragments))
        request(self.destScheme, self.destHost, self.jobPath+"/parameters",
            method="POST",
            data=form.forHTTPUpload(), expectedStatus=303,
            customHeaders={"content-type":
                form.get_content_type()+'; boundary="%s"'%(form.get_boundary())},
            timeout=self.timeout)
class ADQLSyncJob(_WithEndpoint):
    """A facade for a synchronous TAP Job.

    This really is just a very glorified urllib.urlopen.  Maybe some
    superficial parallels to ADQLTAPJob are useful.

    You can construct it, add uploads, and then start or run the thing.
    Methods that make no sense at all for sync jobs ("phase") silently
    return some more or less sensible fakes.
    """
    def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",
            userParams={}, timeout=None):
        # jobId is accepted for signature compatibility with ADQLTAPJob
        # but not used.  userParams is copied, so the caller's dict is
        # never modified.
        self._defineEndpoint(endpointURL)
        self.query, self.lang = query, lang
        self.userParams = userParams.copy()
        self.result = None
        self.uploads = []
        self._errorFromServer = None
        self.timeout = timeout

    def postToService(self, params):
        return request(self.destScheme, self.destHost, self.destPath+"/sync",
            params,
            method="POST", followRedirects=3, expectedStatus=200,
            setResponse=self._setErrorFromServer, timeout=self.timeout)

    def delete(self, usePOST=None):
        # Nothing to delete
        pass

    def abort(self):
        """does nothing.

        You could argue that this could come from a different thread and we
        could try to interrupt the ongoing request.  Well, if you want it,
        try it yourself or ask the author.
        """

    def raiseIfError(self):
        if self._errorFromServer is not None:
            raise Error(self._errorFromServer)

    def waitForPhases(self, phases, pollInterval=None, increment=None,
            giveUpAfter=None):
        # you could argue that sync jobs are in no phase, but I'd say
        # they are in all of them at the same time:
        return

    def _setErrorFromServer(self, data):
        # this is a somewhat convolved way to get server error messages
        # out of request even when it later errors out.  See the
        # except construct around the postToService call in start()
        #
        # Also, try to interpret what's coming back as a VOTable with an
        # error message; _getErrorInfo is robust against other junk.
        self._errorFromServer = _getErrorInfo(data)

    def start(self):
        """runs the query on the sync endpoint and returns self.

        On success, the response is available through openResult and
        allResults.  On failure, the exception is re-raised after the error
        message has been recorded for getErrorFromServer/raiseIfError.
        """
        params={
            "REQUEST": "doQuery",
            "LANG": self.lang,
            "QUERY": self.query}
        params.update(self.userParams)
        if self.uploads:
            upFrags = []
            for name, key, data in self.uploads:
                upFrags.append("%s,param:%s"%(name, key))
                params[key] = data
            params["UPLOAD"] = ";".join(upFrags)

        params = dict((k, str(v)) for k,v in params.iteritems())

        try:
            resp = self.postToService(params)
            self.result = LocalResult(resp.data, "TAPResult", resp.getheader(
                "Content-Type"))
        except Exception as msg:
            # do not clear _errorFromServer; but if it's empty, make up one
            # from our exception
            if not self._errorFromServer:
                self._errorFromServer = str(msg)
            raise
        else:
            # all went well, clear error indicator
            self._errorFromServer = None
        return self

    def run(self, pollInterval=None):
        return self.start()

    @property
    def info(self):
        return {}

    @property
    def phase(self):
        return None

    @property
    def quote(self):
        return None

    @property
    def owner(self):
        return None

    @property
    def parameters(self):
        """returns a copy of the parameters set on this job (userParams and
        anything given through setParameter).

        To set a parameter, use setParameter; changing the dictionary
        returned here will have no effect (as with ADQLTAPJob).
        """
        # this used to reference the nonexistent self.userParameters
        return dict(self.userParams)

    @property
    def allResults(self):
        if self.result is None:
            return []
        else:
            return [self.result]

    def openResult(self, simple=True):
        if self.result is None:
            raise Error("No result in so far")
        return StringIO(self.result.data)

    def setParameter(self, key, value):
        self.userParams[key] = value

    def getErrorFromServer(self):
        return self._errorFromServer

    def addUpload(self, name, data):
        if hasattr(data, "read"):
            data = data.read()
        if not isinstance(data, basestring):
            raise NotImplementedError("Upload source must be file or string")
        key = utils.intToFunnyWord(id(data))
        self.uploads.append((name, key, data))
class ADQLEndpoint(_WithEndpoint):
    """A facade for an ADQL endpoint.

    This is only needed for inspecting server metadata (i.e., in general
    only for rather fancy applications).
    """
    def __init__(self, endpointURL):
        self._defineEndpoint(endpointURL)

    def createJob(self, query, lang="ADQL-2.0", userParams={}):
        """returns an ADQLTAPJob for query on this endpoint.
        """
        # pass by keyword: ADQLTAPJob's third positional parameter is jobId,
        # so passing these positionally put lang into jobId.
        return ADQLTAPJob(self.endpointURL, query, lang=lang,
            userParams=userParams)

    @property
    def available(self):
        """returns True, False, or None (undecidable).

        None is returned when /availability gives an unexpected status
        (e.g., a 404, which is legal) or the returned document doesn't parse.
        """
        try:
            response = request(self.destScheme, self.destHost,
                self.destPath+"/availability", expectedStatus=200)
            return _parseWith(_AvailabilityParser(), response.data)
        except ProtocolError:
            # covers both WrongStatus (a ProtocolError subclass) and the
            # ProtocolError _parseWith raises for malformed documents
            return None

    @property
    def capabilities(self):
        """returns a list of the capabilities the remote service declares.

        Each capability is a dict with the keys standardID and interfaces;
        each interface is a dict with the keys type and role and, if the
        document gives one, accessURL and use.
        """
        return _parseWith(_CapabilitiesParser(),
            request(self.destScheme, self.destHost,
                self.destPath+"/capabilities").data)

    @property
    def tables(self):
        """returns a list of table definitions for the tables accessible
        through this service.

        The table definitions come as V.TABLE (gavo.votable.model) elements
        with one FIELD per column.
        """
        return _parseWith(_TablesParser(),
            request(self.destScheme, self.destHost, self.destPath+"/tables").data)