"""
An interface to querying TAP servers (i.e., a TAP client).

This is deprecated.  Use pyvo.dal.tap instead.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import io
import http.client
import socket
import time
import traceback
import urllib.request, urllib.parse, urllib.error
import urllib.parse
from email.message import Message
from email.mime.multipart import MIMEMultipart
from xml import sax
from gavo import utils
from gavo.votable import votparse
from gavo.votable.model import VOTable as V
# Ward against typos: use these names rather than string literals when
# talking about UWS job phases.  (ERROR, ABORTED and COMPLETED are
# referenced by ADQLTAPJob; the remaining names are the other phases
# defined by the UWS standard.)
PENDING = "PENDING"
QUEUED = "QUEUED"
EXECUTING = "EXECUTING"
COMPLETED = "COMPLETED"
ERROR = "ERROR"
ABORTED = "ABORTED"
UNKNOWN = "UNKNOWN"

# override this as you see fit
USER_AGENT = "Python TAP library http://soft.g-vo.org/subpkgs"

# When true, _parseWith dumps server responses it cannot parse into the
# file server_response in the current directory.
debug = False
class Error(utils.Error):
    """The base class for all TAP-related exceptions.
    """
class ProtocolError(Error):
    """is raised when the remote server violated the local assumptions.
    """
class WrongStatus(ProtocolError):
    """is raised when request detects the server returned an unexpected
    HTTP status.

    These are constructed with the status returned (available as
    foundStatus) and the data payload of the response (available as
    payload).
    """
    def __init__(self, msg, foundStatus, payload, hint=None):
        ProtocolError.__init__(self, msg, hint)
        # args mirrors the constructor signature -- presumably so the
        # exception can be re-created from its args (e.g., when pickled).
        self.args = [msg, foundStatus, payload, hint]
        self.payload, self.foundStatus = payload, foundStatus
class RemoteError(Error):
    """is raised when the remote side signals an error.

    The content of the remote error document can be retrieved in the
    remoteMessage attribute.
    """
    def __init__(self, remoteMessage):
        self.remoteMessage = remoteMessage
        Error.__init__(self,
            "Remote: "+remoteMessage,
            hint="This means that"
            " something in your query was bad according to the server."
            "  Details may be available in the Exceptions' remoteMessage"
            " attribute")
        # args mirrors the constructor signature.
        self.args = [remoteMessage]

    def __str__(self):
        # Show the server's message verbatim, without the "Remote: " prefix.
        return self.remoteMessage
class RemoteAbort(Error):
    """is raised by certain check functions when the remote side has aborted
    the job.
    """
    def __init__(self):
        Error.__init__(self, "Aborted")
        # The constructor takes no arguments, so args is empty.
        self.args = []

    def __str__(self):
        return "The remote side has aborted the job"
class NetworkError(Error):
    """is raised when a generic network error happens (can't connect,...)
    """
class _FormData(MIMEMultipart):
"""is a container for multipart/form-data encoded messages.
This is usually used for file uploads.
def __init__(self):
MIMEMultipart.__init__(self, "form-data")
self.set_param("boundary", "========== bounda r y 930")
self.epilogue = ""
def addFile(self, paramName, fileName, data):
"""attaches the contents of fileName under the http parameter name
msg = Message()
msg["Content-Disposition"] = "form-data"
msg.set_param("name", paramName, "Content-Disposition")
msg.set_param("filename", fileName, "Content-Disposition")
def addParam(self, paramName, paramVal):
"""adds a form parameter paramName with the (string) value paramVal
msg = Message()
msg["Content-Disposition"] = "form-data"
msg.set_param("name", paramName, "Content-Disposition")
def forHTTPUpload(self):
"""returns a string serialisation of this message suitable for HTTP
This is as_string, except we're cutting off the header (which is
part of the HTTP header for us), and we are changing the line separator
to crlf.
data = self.as_string().split("\n\n", 1)[-1]
data = data.replace("\n", "\r\n")
return data
def fromDict(cls, dict):
self = cls()
for key, value in dict.items():
self.addParam(key, value)
return self
def _getErrorInfo(votString):
    """returns the message from a TAP error VOTable.

    if votString is not a TAP error VOTable, it is returned verbatim
    (decoded as UTF-8, with undecodable bytes dropped).

    votString must be a byte string.

    TODO: For large responses, this may take a while.  It's probably
    not worth it in such cases.  Or at all.  Maybe we should hunt
    for the INFO somewhere else?
    """
    try:
        for el in votparse.parseBytes(votString, watchset=[V.INFO]):
            if isinstance(el, V.INFO):
                if el.name=="QUERY_STATUS" and el.value=="ERROR":
                    return el.text_
            else:
                # it's data, which we want to skip quickly
                for _ in el:
                    pass
    except Exception:
        # Deliberately broad: votString's not a suitable VOTable, fall
        # through to return votString
        pass
    return votString.decode("utf-8", "ignore")
def _makeFlatParser(parseFunc):
"""returns a "parser" class for _parseWith just calling a function on a string.
_parseWith is designed for utils.StartEndParsers, but it's convenient
to use it when there's no XML in the responses as well.
So, this class wraps a simple function into a StartEndParser-compatible
class FlatParser(object):
def parseBytes(self, data):
self.result = parseFunc(data)
def getResult(self):
return self.result
return FlatParser
def _parseWith(parser, data):
"""uses the utils.StartEndParser-compatible parser to parse the string data.
return parser.getResult()
except (ValueError, IndexError, sax.SAXParseException):
if debug:
f = open("server_response", "wb")
raise ProtocolError("Malformed response document.", hint=
"If debug was enabled, you will find the server response in"
" the file server_response.")
class _PhaseParser(utils.StartEndHandler):
    """A parser accepting both plain text and XML replies.

    Of course, the XML replies are against the standard, but -- ah, well.
    """
    def _end_phase(self, name, attrs, content):
        self.result = content

    def parseBytes(self, data):
        """parses data (bytes), which is either an XML document with a
        phase element or the bare phase name.
        """
        if data.strip().startswith(b"<"): # XML :-)
            utils.StartEndHandler.parseBytes(self, data)
        else:
            # Plain text reply; without this else, the raw document would
            # overwrite what _end_phase found in an XML reply.
            self.result = data.strip().decode("ascii")

    def getResult(self):
        return self.result
class _QuoteParser(utils.StartEndHandler):
    """A parser for UWS quote replies, accepting both a bare timestamp
    and an XML document containing a quote element.

    The result is whatever utils.parseISODT makes of the timestamp, or None
    if the literal is empty or "NULL".
    """
    quote = None

    def parseDate(self, literal):
        """returns the parsed ISO timestamp literal, or None for empty
        or NULL literals.
        """
        literal = utils.debytify(literal)
        val = None
        if literal and literal!="NULL":
            val = utils.parseISODT(literal)
        return val

    def _end_quote(self, name, attrs, content):
        self.quote = self.parseDate(content.strip())

    def parseBytes(self, data):
        data = data.strip()
        if data.startswith(b"<"): # XML :-)
            # data is bytes here, so use parseBytes as _PhaseParser does.
            utils.StartEndHandler.parseBytes(self, data)
        else:
            self.quote = self.parseDate(data)

    def getResult(self):
        return self.quote
class _CaselessDictionary(dict):
"""A dictionary that only has lower-case keys but treats keys in any
capitalization as equivalent.
def __contains__(self, key):
dict.__contains__(self, key.lower())
def __getitem__(self, key):
return dict.__getitem__(self, key.lower())
def __setitem__(self, key, value):
dict.__setitem__(self, key.lower(), value)
def __delitem__(self, key):
dict.__delitem__(self, key.lower())
class _ParametersParser(utils.StartEndHandler):
    """A parser collecting the content of parameter elements into a
    case-insensitive dictionary keyed by the elements' id attribute.
    """
    def _initialize(self):
        self.parameters = _CaselessDictionary()

    def _end_parameter(self, name, attrs, content):
        paramId = attrs["id"]
        self.parameters[paramId] = content

    def getResult(self):
        return self.parameters
class _ResultsParser(utils.StartEndHandler):
    """A parser turning result elements into a list of UWSResult instances.
    """
    def _initialize(self):
        self.results = []

    def _end_result(self, name, attrs, content):
        attrs = self.getAttrsAsDict(attrs)
        # NOTE(review): the head of this call was lost in the source; the
        # href attribute name is inferred from UWSResult's signature.
        self.results.append(UWSResult(attrs["href"],
            attrs.get("id"), attrs.get("type", "simple")))

    def getResult(self):
        return self.results
class _InfoParser(_ParametersParser, _ResultsParser):
    """A parser for a UWS job document.

    The result is a dictionary with the keys jobId, phase, executionDuration
    (a float), destruction (parsed by utils.parseISODT), results and
    parameters, as far as the corresponding elements were present.
    """
    def _initialize(self):
        self.info = {}
        # The parents' _initialize methods do not chain, so call both
        # explicitly; _end_job needs self.parameters and self.results.
        _ParametersParser._initialize(self)
        _ResultsParser._initialize(self)

    def _end_jobId(self, name, attrs, content):
        self.info[name] = content

    _end_phase = _end_jobId

    def _end_executionDuration(self, name, attrs, content):
        self.info[name] = float(content)

    def _end_destruction(self, name, attrs, content):
        self.info[name] = utils.parseISODT(content)

    def _end_job(self, name, attrs, content):
        self.info["results"] = self.results
        self.info["parameters"] = self.parameters

    def getResult(self):
        return self.info
class _AvailabilityParser(utils.StartEndHandler):
    """A parser for availability documents.

    The result is True or False according to the available element, or
    None if no available element with content true or false was seen.
    """
    available = None

    def _end_available(self, name, attrs, content):
        literal = content.strip()
        # Anything but the two literals leaves the current value alone.
        self.available = {"true": True, "false": False}.get(
            literal, self.available)

    def getResult(self):
        return self.available
def _pruneAttrNS(attrs):
return dict((k.split(":")[-1], v) for k,v in list(attrs.items()))
class _CapabilitiesParser(utils.StartEndHandler):
    """A parser for VOSI capabilities documents.

    The result is a list of capabilities.  Each capability is a dict with
    the keys standardID and interfaces.  Each interface is a dict with the
    keys type (namespace prefix not expanded; change that?), role,
    accessURL, and use.
    """
    def __init__(self):
        utils.StartEndHandler.__init__(self)
        self.capabilities = []

    def _start_capability(self, name, attrs):
        self.curCap = {"interfaces": []}
        self.curCap["standardID"] = attrs.get("standardID")

    def _end_capability(self, name, attrs, content):
        # NOTE(review): the statement storing the finished capability was
        # lost in the source; restored so getResult returns something.
        self.capabilities.append(self.curCap)
        self.curCap = None

    def _start_interface(self, name, attrs):
        attrs = _pruneAttrNS(attrs)
        self.curInterface = {"type": attrs["type"], "role": attrs.get("role")}

    def _end_interface(self, name, attrs, content):
        # NOTE(review): restored like the append in _end_capability.
        self.curCap["interfaces"].append(self.curInterface)
        self.curInterface = None

    def _end_accessURL(self, name, attrs, content):
        self.curInterface["accessURL"] = content.strip()
        self.curInterface["use"] = attrs.get("use")

    def getResult(self):
        return self.capabilities
class _TablesParser(utils.StartEndHandler):
    """A parser for VOSI tables documents.

    The result is a list with one table object per table element; columns
    are represented as V.FIELD instances.

    NOTE(review): several statements of this class were lost in the source.
    The restored ones are marked below and follow the attribute-setting
    idiom visible in _endColOrTableAttr (calling the element with keyword
    arguments); confirm against the upstream module.
    """
    def __init__(self):
        utils.StartEndHandler.__init__(self)
        self.tables = []
        self.curCol = None

    def _start_table(self, name, attrs):
        # NOTE(review): restored; the other handlers address the current
        # table as self.tables[-1].
        self.tables.append(V.TABLE())

    def _start_column(self, name, attrs):
        self.curCol = V.FIELD()

    def _end_column(self, name, attrs, content):
        # NOTE(review): restored; adds the finished column to its table.
        self.tables[-1][self.curCol]
        self.curCol = None

    def _end_description(self, attName, attrs, content):
        if self.getParentTag()=="table":
            destObj = self.tables[-1]
        elif self.getParentTag()=="column":
            destObj = self.curCol
        else:
            # name/desc of something else -- ignore
            return
        # NOTE(review): restored; adds a DESCRIPTION child to destObj.
        destObj[V.DESCRIPTION[content]]

    def _endColOrTableAttr(self, attName, attrs, content):
        if self.getParentTag()=="table":
            destObj = self.tables[-1]
        elif self.getParentTag()=="column":
            destObj = self.curCol
        else:
            # name/desc of something else -- ignore
            return
        destObj(**{str(attName): content.strip()})

    _end_name = _endColOrTableAttr

    def _endColAttr(self, attName, attrs, content):
        self.curCol(**{str(attName): content.strip()})

    _end_unit = _end_ucd = _endColAttr

    def _end_dataType(self, attName, attrs, content):
        # NOTE(review): both assignments restored; only the arraysize test
        # survived in the source.
        self.curCol(datatype=content.strip())
        if "arraysize" in attrs:
            self.curCol(arraysize=attrs["arraysize"])

    def getResult(self):
        return self.tables
class UWSResult(object):
    """a container type for a result returned by an UWS service.

    It exposes id, href, and type attributes.
    """
    def __init__(self, href, id=None, type=None):
        self.href, self.id, self.type = href, id, type
class LocalResult(object):
    """a container for a result whose content is held locally.

    It exposes data, id, and type attributes.
    """
    def __init__(self, data, id, type):
        self.data = data
        self.id = id
        self.type = type
def _canUseFormEncoding(params):
"""returns true if userParams can be transmitted in a
x-www-form-urlencoded payload.
for val in list(params.values()):
if not isinstance(val, str):
return False
return True
ACCEPTED_REDIRECTS = {301, 302, 303, 307}

# The number of redirects followed when followRedirects is just True.
_DEFAULT_MAX_REDIRECTS = 10


def request(scheme, host, path, data="", customHeaders={}, method="GET",
        expectedStatus=None, followRedirects=False, setResponse=None,
        timeout=None):
    """returns a HTTPResponse object for an HTTP request to path on host.

    This function builds a new connection for every request.

    On the returned object, you cannot use the read() method.  Instead
    any data returned by the server is available in the data attribute.

    data usually is a string, but you can also pass a dictionary
    which then will be serialized, either as x-www-form-urlencoded (if all
    values are strings) or using _FormData above.

    customHeaders are added to (and override) the headers generated here.

    You can set followRedirects to True or to a maximum number of
    redirects to follow.  This means that the
    303 "See other" codes that many UWS action generate will be followed
    and the document at the other end will be obtained.  For many
    operations this will lead to an error; only do this for slightly
    broken services.  Redirect targets are retrieved using GET.

    In setResponse, you can pass in a callable that is called with the
    server response body as soon as it is in.  This is for when you want
    to store the response even if request raises an error later on
    (i.e., for sync querying).

    If expectedStatus is given and the (final) response has a different
    status, a WrongStatus is raised.  Problems while connecting or sending
    raise a NetworkError.
    """
    if scheme=="http":
        connClass = http.client.HTTPConnection
    elif scheme=="https":
        connClass = http.client.HTTPSConnection
    else:
        # Raised explicitly since an assert would vanish under python -O.
        raise AssertionError("Unsupported URL scheme: %r"%(scheme,))

    headers = {"connection": "close",
        "user-agent": USER_AGENT}

    # str and bytes go out as they are; anything else must be a dict.
    if not isinstance(data, (str, bytes)):
        if _canUseFormEncoding(data):
            data = urllib.parse.urlencode(data)
            headers["Content-Type"] = "application/x-www-form-urlencoded"
        else:
            form = _FormData.fromDict(data)
            data = form.forHTTPUpload()
            headers["Content-Type"] = form.get_content_type()+'; boundary="%s"'%(
                form.get_boundary())
    headers["Content-Length"] = len(data)
    headers.update(customHeaders)

    conn = None
    try:
        try:
            conn = connClass(host, timeout=timeout)
            conn.request(method, path, data, headers)
        except (socket.error, http.client.error) as ex:
            raise NetworkError("Problem connecting to %s (%s)"%
                (host, str(ex)))

        resp = conn.getresponse()
        resp.data = resp.read()
    finally:
        # The body has been read completely, so the connection can go.
        if conn is not None:
            conn.close()

    if setResponse is not None:
        setResponse(resp.data)

    location = resp.getheader("location")
    if followRedirects and location and resp.status in ACCEPTED_REDIRECTS:
        # Resolve relative redirects against the URL just requested.
        parts = urllib.parse.urlsplit(urllib.parse.urljoin(
            "%s://%s%s"%(scheme, host, path), location))
        target = parts.path
        if parts.query:
            # Keep the query string, or the redirect target changes.
            target = "%s?%s"%(target, parts.query)

        if followRedirects is True:
            remaining = _DEFAULT_MAX_REDIRECTS-1
        else:
            remaining = followRedirects-1

        return request(parts.scheme, parts.netloc, target,
            method="GET", expectedStatus=expectedStatus,
            followRedirects=remaining, setResponse=setResponse,
            timeout=timeout)

    if expectedStatus is not None:
        if resp.status!=expectedStatus:
            raise WrongStatus("Expected status %s, got status %s"%(
                expectedStatus, resp.status), resp.status, resp.data)
    return resp
def _makeAtomicValueGetter(methodPath, parser):
    """returns a property getter retrieving jobPath+methodPath and parsing
    the response with an instance of parser.

    This is for building ADQLTAPJob's properties (phase, etc.); the object
    the getter is used on needs the attributes jobPath, destScheme,
    destHost, and timeout.
    """
    def getter(self):
        destURL = self.jobPath+methodPath
        # NOTE(review): the tail of this call was lost in the source; the
        # arguments follow ADQLTAPJob._queryJobResource.
        response = request(self.destScheme, self.destHost, destURL,
            expectedStatus=200, timeout=self.timeout)
        return _parseWith(parser(), response.data)
    return getter
def _makeAtomicValueSetter(methodPath, serializer, parameterName):
    """returns a property setter POSTing serializer(value) as parameterName
    to jobPath+methodPath.

    This is for building ADQLTAPJob's properties (phase, etc.); the object
    the setter is used on needs the attributes jobPath, destScheme,
    destHost, and timeout.
    """
    def setter(self, value):
        destURL = self.jobPath+methodPath
        # NOTE(review): the tail of this call was lost in the source; 303 is
        # what the other POSTs to job resources in this module expect.
        request(self.destScheme, self.destHost, destURL,
            {parameterName: serializer(value)}, method="POST",
            expectedStatus=303, timeout=self.timeout)
    return setter
class _WithEndpoint(object):
"""A helper class for classes constructed with an ADQL endpoint.
def _defineEndpoint(self, endpointURL):
self.endpointURL = endpointURL.rstrip("/")
parts = urllib.parse.urlsplit(self.endpointURL)
self.destScheme = parts.scheme
self.destHost = parts.hostname
if parts.port:
self.destHost = "%s:%s"%(self.destHost, parts.port)
self.destPath = parts.path
if self.destPath.endswith("/"):
self.destPath = self.destPath[:-1]
class ADQLTAPJob(_WithEndpoint):
    """A facade for an ADQL-based async TAP job.

    Construct it with the URL of the TAP service (the path of the async
    resource is derived from it) and a query.

    Alternatively, you can give the endpoint URL and a jobId as a
    keyword parameter.  This only makes sense if the service has
    handed out the jobId before (e.g., when a different program takes
    up handling of a job started before).

    timeout, if given, is passed on to all requests this job makes.

    See :dachsdoc:`adql.html` for details.
    """
    def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",
            userParams={}, timeout=None):
        self._defineEndpoint(endpointURL)
        self.timeout = timeout
        self.destPath = utils.ensureOneSlash(self.destPath)+"async"
        if query is not None:
            self.jobId, self.jobPath = None, None
            self._createJob(query, lang, userParams)
        elif jobId is not None:
            self.jobId = jobId
        else:
            raise Error("Must construct ADQLTAPJob with at least query or jobId")
        self._computeJobPath()

    def _computeJobPath(self):
        self.jobPath = "%s/%s"%(self.destPath, self.jobId)

    def _createJob(self, query, lang, userParams):
        """creates a job on the remote side and sets self.jobId from the
        location header of the response.
        """
        params = {
            "REQUEST": "doQuery",
            "LANG": lang,
            "QUERY": query}
        # user parameters override the standard ones and are stringified.
        for k,v in userParams.items():
            params[k] = str(v)
        response = request(self.destScheme, self.destHost, self.destPath, params,
            method="POST", expectedStatus=303, timeout=self.timeout)
        # The last part of headers[location] now contains the job id
        try:
            self.jobId = urllib.parse.urlsplit(
                response.getheader("location", "")).path.split("/")[-1]
        except ValueError:
            raise utils.logOldExc(
                ProtocolError("Job creation returned invalid job id"))

    def delete(self, usePOST=False):
        """removes the job on the remote side.

        usePOST=True can be used for servers that do not support the DELETE
        HTTP method (a.k.a. "are broken").
        """
        if self.jobPath is not None:
            if usePOST:
                request(self.destScheme, self.destHost, self.jobPath, method="POST",
                    data={"ACTION": "DELETE"}, expectedStatus=303,
                    timeout=self.timeout)
            else:
                request(self.destScheme, self.destHost, self.jobPath, method="DELETE",
                    expectedStatus=303, timeout=self.timeout)

    def start(self):
        """asks the remote side to start the job.
        """
        request(self.destScheme, self.destHost, self.jobPath+"/phase",
            {"PHASE": "RUN"}, method="POST", expectedStatus=303,
            timeout=self.timeout)

    def abort(self):
        """asks the remote side to abort the job.
        """
        request(self.destScheme, self.destHost, self.jobPath+"/phase",
            {"PHASE": "ABORT"}, method="POST", expectedStatus=303,
            timeout=self.timeout)

    def raiseIfError(self):
        """raises an appropriate error message if job has thrown an error or
        has been aborted.
        """
        phase = self.phase
        if phase==ERROR:
            raise RemoteError(self.getErrorFromServer())
        elif phase==ABORTED:
            raise RemoteAbort()

    def waitForPhases(self, phases, pollInterval=1, increment=1.189207115002721,
            giveUpAfter=None):
        """waits for the job's phase to become one of the set phases.

        This method polls.  Initially, it increases poll times
        exponentially with increment until it queries every two minutes.

        The magic number in increment is 2**(1/4.).

        giveUpAfter, if given, is the number of iterations this method will
        do.  If none of the desired phases have been found until then,
        raise a ProtocolError.
        """
        attempts = 0
        while True:
            curPhase = self.phase
            if curPhase in phases:
                break
            time.sleep(pollInterval)
            # back off, but never wait longer than two minutes
            pollInterval = min(120, pollInterval*increment)
            attempts += 1
            if giveUpAfter:
                if attempts>giveUpAfter:
                    raise ProtocolError("None of the states in %s were reached"
                        " in time."%repr(phases),
                        hint="After %d attempts, phase was %s"%(attempts, curPhase))

    def run(self, pollInterval=1):
        """runs the job and waits until it has finished.

        pollInterval is the initial poll interval in seconds (see
        waitForPhases).

        The function raises an exception with an error message gleaned from the
        server if the job ends up in ERROR or ABORTED.
        """
        self.start()
        self.waitForPhases(set([COMPLETED, ABORTED, ERROR]),
            pollInterval=pollInterval)
        self.raiseIfError()

    executionDuration = property(
        _makeAtomicValueGetter("/executionduration", _makeFlatParser(float)),
        _makeAtomicValueSetter("/executionduration", str, "EXECUTIONDURATION"))

    destruction = property(
        _makeAtomicValueGetter("/destruction", _makeFlatParser(
            lambda data: utils.parseISODT(data.decode("ascii")))),
        _makeAtomicValueSetter("/destruction",
            lambda dt: dt.strftime("%Y-%m-%dT%H:%M:%S.000"), "DESTRUCTION"))

    def makeJobURL(self, jobPath):
        """returns the full URL of the job resource jobPath (which should
        start with a slash or be empty).
        """
        return self.endpointURL+"/async/%s%s"%(self.jobId, jobPath)

    def _queryJobResource(self, path, parser):
        # a helper for phase, quote, etc.
        response = request(self.destScheme, self.destHost, self.jobPath+path,
            expectedStatus=200, timeout=self.timeout)
        return _parseWith(parser, response.data)

    # The following are properties: other code in this class uses them
    # without calling them (self.phase, self.allResults[0]).

    @property
    def info(self):
        """returns a dictionary of much job-related information.
        """
        return self._queryJobResource("", _InfoParser())

    @property
    def phase(self):
        """returns the phase the job is in according to the server.
        """
        return self._queryJobResource("/phase", _PhaseParser())

    @property
    def quote(self):
        """returns the estimate the server gives for the run time of the job.
        """
        return self._queryJobResource("/quote", _QuoteParser())

    @property
    def owner(self):
        """returns the owner of the job.
        """
        return self._queryJobResource("/owner", _makeFlatParser(
            lambda data: data.decode("utf-8"))())

    @property
    def parameters(self):
        """returns a dictionary mapping passed parameters to server-provided
        string representations.

        To set a parameter, use the setParameter function.  Changing the
        dictionary returned here will have no effect.
        """
        return self._queryJobResource("/parameters", _ParametersParser())

    @property
    def allResults(self):
        """returns a list of UWSResult instances.
        """
        return self._queryJobResource("/results", _ResultsParser())

    def getResultURL(self, simple=True):
        """returns the URL of the ADQL result table.

        With simple=True (the default), this is results/result below the
        job URL; otherwise, it is the href of the first result the service
        lists.
        """
        if simple:
            return self.makeJobURL("/results/result")
        else:
            return self.allResults[0].href

    def openResult(self, simple=True):
        """returns a file-like object you can read the default TAP result off.

        If you pass simple=False, the URL will be taken from the
        service's result list (the first one given there).  Otherwise (the
        default), results/result is used.
        """
        # simple must be passed on, or simple=False has no effect.
        return urllib.request.urlopen(self.getResultURL(simple))

    def setParameter(self, key, value):
        """sets the job parameter key to value on the remote side.
        """
        request(self.destScheme, self.destHost, self.jobPath+"/parameters",
            data={key: value}, method="POST", expectedStatus=303,
            timeout=self.timeout)

    def getErrorFromServer(self):
        """returns the error message the server gives, verbatim.
        """
        # _getErrorInfo wants the response body, not the response object.
        data = request(self.destScheme, self.destHost, self.jobPath+"/error",
            expectedStatus=200, followRedirects=True,
            timeout=self.timeout).data
        return _getErrorInfo(data)

    def addUpload(self, name, data):
        """adds uploaded tables, either from a file or as a remote URL.

        You should not try to change UPLOAD yourself (e.g., using setParameter).

        Data is either a string (i.e. a URI) or a file-like object (an upload).
        """
        uploadFragments = []
        form = _FormData()
        if isinstance(data, str): # a URI
            # commas and semicolons are the separators of UPLOAD
            assert ',' not in data
            assert ';' not in data
            uploadFragments.append("%s,%s"%(name, data))

        else: # Inline upload, data is a file
            uploadKey = utils.intToFunnyWord(id(data))
            # NOTE(review): _FormData serialises through as_string, so
            # this presumably needs data.read() to return text -- confirm.
            form.addFile(uploadKey, uploadKey, data.read())
            uploadFragments.append("%s,param:%s"%(name, uploadKey))

        form.addParam("UPLOAD", ";".join(uploadFragments))
        request(self.destScheme, self.destHost, self.jobPath+"/parameters",
            method="POST",
            data=form.forHTTPUpload(), expectedStatus=303,
            customHeaders={"content-type":
                form.get_content_type()+'; boundary="%s"'%(form.get_boundary())},
            timeout=self.timeout)
class ADQLSyncJob(_WithEndpoint):
    """A facade for a synchronous TAP Job.

    This really is just a very glorified urllib.urlopen.  Maybe some
    superficial parallels to ADQLTAPJob are useful.

    You can construct it, add uploads, and then start or run the thing.

    Methods that make no sense at all for sync jobs ("phase") silently
    return some more or less sensible fakes.

    jobId is only accepted for signature compatibility with ADQLTAPJob
    and is ignored.
    """
    def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",
            userParams={}, timeout=None):
        self._defineEndpoint(endpointURL)
        self.query, self.lang = query, lang
        # copy so setParameter does not change the caller's dictionary
        self.userParams = userParams.copy()
        self.result = None
        self.uploads = []
        self._errorFromServer = None
        self.timeout = timeout

    def postToService(self, params):
        """POSTs params to the sync resource and returns the response.
        """
        return request(self.destScheme, self.destHost, self.destPath+"/sync",
            params,
            method="POST", followRedirects=5, expectedStatus=200,
            setResponse=self._setErrorFromServer, timeout=self.timeout)

    def delete(self, usePOST=None):
        # Nothing to delete
        pass

    def abort(self):
        """does nothing.

        You could argue that this could come from a different thread and we
        could try to interrupt the ongoing request.  Well, if you want it,
        try it yourself or ask the author.
        """

    def raiseIfError(self):
        """raises an Error with the stored error message if there is one.
        """
        if self._errorFromServer is not None:
            raise Error(self._errorFromServer)

    def waitForPhases(self, phases, pollInterval=None, increment=None,
            giveUpAfter=None):
        # you could argue that sync jobs are in no phase, but I'd say
        # they are in all of them at the same time:
        return

    def _setErrorFromServer(self, data):
        # this is a somewhat convolved way to get server error messages
        # out of request even when it later errors out.  See the
        # except construct around the postToService call in start()
        #
        # Also, try to interpret what's coming back as a VOTable with an
        # error message; _getErrorInfo is robust against other junk.
        self._errorFromServer = _getErrorInfo(data)

    def start(self):
        """sends the query to the service and stores the response in
        self.result.

        Failures are recorded for getErrorFromServer and raiseIfError.
        Returns self.
        """
        params = {
            "REQUEST": "doQuery",
            "LANG": self.lang,
            "QUERY": self.query}
        # NOTE(review): restored; without it neither the constructor's
        # userParams nor setParameter would have any effect.
        params.update(self.userParams)
        if self.uploads:
            upFrags = []
            for name, key, data in self.uploads:
                upFrags.append("%s,param:%s"%(name, key))
                params[key] = data
            params["UPLOAD"] = ";".join(upFrags)

        # NOTE(review): addUpload only stores bytes, and str() of bytes is
        # their repr ("b'...'"); uploads probably need different handling.
        params = dict((k, str(v)) for k,v in params.items())

        try:
            resp = self.postToService(params)
            # NOTE(review): the header name was lost in the source;
            # Content-Type is an assumption.
            self.result = LocalResult(resp.data, "TAPResult", resp.getheader(
                "Content-Type"))
        except Exception as msg:
            # do not clear _errorFromServer; but if it's empty, make up one
            # from our exception
            # NOTE(review): the source may have re-raised here; as
            # restored, errors only surface through raiseIfError.
            if not self._errorFromServer:
                self._errorFromServer = str(msg)
        else:
            # all went well, clear error indicator
            self._errorFromServer = None
        return self

    def run(self, pollInterval=None):
        """runs the query; for sync jobs, this is the same as start.
        """
        return self.start()

    # As in ADQLTAPJob, the following are properties.

    @property
    def info(self):
        return {}

    @property
    def phase(self):
        return None

    @property
    def quote(self):
        return None

    @property
    def owner(self):
        return None

    @property
    def parameters(self):
        # the attribute is called userParams (see __init__ and setParameter)
        return self.userParams

    @property
    def allResults(self):
        if self.result is None:
            return []
        else:
            return [self.result]

    def openResult(self, simple=True):
        """returns a file-like object the result of the query can be read
        from; raises an Error if there is no result (yet).
        """
        if self.result is None:
            raise Error("No result in so far")
        return io.BytesIO(self.result.data)

    def setParameter(self, key, value):
        self.userParams[key] = value

    def getErrorFromServer(self):
        return self._errorFromServer

    def addUpload(self, name, data):
        """schedules data for upload as the table name.

        data must be a byte string or an object with a read method
        returning one; anything else raises a NotImplementedError.
        """
        if hasattr(data, "read"):
            data = data.read()
        if not isinstance(data, bytes):
            raise NotImplementedError("Upload source must be file or bytes")

        key = utils.intToFunnyWord(id(data))
        self.uploads.append((name, key, data))
class ADQLEndpoint(_WithEndpoint):
    """A facade for an ADQL endpoint.

    This is only needed for inspecting server metadata (i.e., in general
    only for rather fancy applications).
    """
    def __init__(self, endpointURL):
        self._defineEndpoint(endpointURL)

    def createJob(self, query, lang="ADQL-2.0", userParams={}):
        """returns a new ADQLTAPJob for query on this endpoint.
        """
        # Keywords are required: ADQLTAPJob's third positional parameter is
        # jobId, not lang.
        return ADQLTAPJob(self.endpointURL, query,
            lang=lang, userParams=userParams)

    # NOTE(review): the accessors below are restored as properties, like
    # the corresponding ones on the job classes -- confirm.

    @property
    def available(self):
        """returns True, False, or None (undecidable).

        None is returned when /availability gives a 404 (which is legal)
        or the returned document doesn't parse.
        """
        try:
            response = request(self.destScheme, self.destHost,
                self.destPath+"/availability", expectedStatus=200)
            res = _parseWith(_AvailabilityParser(), response.data)
        except ProtocolError:
            # Covers both WrongStatus (a subclass) and the ProtocolError
            # _parseWith raises for unparseable documents.
            res = None
        return res

    @property
    def capabilities(self):
        """returns the capabilities of the remote service as parsed by
        _CapabilitiesParser.
        """
        # NOTE(review): the resource path was lost in the source;
        # /capabilities is assumed by analogy with /availability and /tables.
        return _parseWith(_CapabilitiesParser(),
            request(self.destScheme, self.destHost,
                self.destPath+"/capabilities").data)

    @property
    def tables(self):
        """returns a sequence of table definitions for the tables accessible
        through this service, as parsed by _TablesParser.
        """
        return _parseWith(_TablesParser(),
            request(self.destScheme, self.destHost, self.destPath+"/tables").data)