"""
A caching proxy for CDS' Simbad object resolver.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import json
import os
import socket
from urllib import request, parse
if __name__=="__main__":
    # see below on why this doesn't have normal unit tests.
    # The environment variable is set before testhelpers is imported;
    # presumably testhelpers looks at GAVO_OOTTEST at import time, and the
    # value itself does not matter -- TODO confirm against gavo.helpers.
    os.environ["GAVO_OOTTEST"] = "dontcare"
    from gavo.helpers import testhelpers
from gavo import base
from gavo.utils import ElementTree
class ObjectCache(object):
    """a cache for simbad queries, stored in dc.metastore.

    Earlier incarnations were file-based and used different keys for
    different purposes.  Those keys never proved useful, so they are
    ignored now.

    Only positive responses are supposed to end up here; too much can go
    wrong when caching negatives, and negatives are expected to vary so
    much that caching them would gain little anyway.

    Values are json-encoded on the way in (for simbad, they are
    dictionaries) and json-decoded on the way out.
    """

    def addItem(self, key, value):
        """stores value under key.

        value is serialised to JSON before it is written; the row key
        is key with a simbad: prefix.
        """
        with base.getWritableAdminConn() as conn:
            metaKey = 'simbad:'+key
            serialised = json.dumps(value)
            base.setDBMeta(conn, metaKey, serialised)

    def getItem(self, key):
        """returns the object previously stored under key.

        A KeyError is raised if nothing has been stored for key.
        """
        stored = base.getDBMeta('simbad:'+key)
        return json.loads(stored)
class Sesame(object):
    """is a simple interface to the simbad name resolver.

    Successful resolutions are stored in an ObjectCache; query consults
    that cache before going to the network.
    """
    # we're using several simbad mirrors if we have to, and only give
    # up if all of them fail.
    svc_urls = [
        "http://cdsweb.u-strasbg.fr/cgi-bin/nph-sesame/-ox/SN?",
        "http://vizier.cfa.harvard.edu/viz-bin/nph-sesame/-ox/SN?"]

    def __init__(self):
        self.cache = ObjectCache()

    def _parseXML(self, simbadXML):
        """returns a dictionary for a sesame XML response, or None.

        The dictionary has the keys oname, otype (which may be None),
        RA, and dec (both floats).  None is returned when the XML cannot
        be parsed, when no target name or resolver element is present,
        or when no usable position is given.
        """
        try:
            et = ElementTree.fromstring(simbadXML)
        except Exception as msg:  # simbad returned weird XML
            base.ui.notifyWarning("Bad XML from simbad (%s)"%str(msg))
            return None

        nameMatch = et.find("Target/name")
        if nameMatch is None:
            # no such object, return a negative
            return None

        firstResponse = et.find("Target/Resolver")
        # Compare against None explicitly: the truth value of an Element
        # reflects its number of children, not whether it was found.
        if firstResponse is None:
            return None

        res = {
            "oname": nameMatch.text,
            "otype": getattr(firstResponse.find("otype"), "text", None)}
        try:
            res["RA"] = float(firstResponse.find("jradeg").text)
            res["dec"] = float(firstResponse.find("jdedeg").text)
        except (ValueError, AttributeError, TypeError):
            # presumably null position: element missing (AttributeError),
            # present but empty (TypeError), or not a number (ValueError)
            return None
        return res

    def query(self, ident):
        """returns the sesame record for ident, or None if there is none.

        The cache is consulted first.  On a miss, the mirrors in svc_urls
        are tried in order; the first one that answers determines the
        result.  Only positive results are written to the cache, so that
        unknown objects and garbled responses are retried later.

        If no mirror can be reached, a ValidationError is raised.
        """
        try:
            return self.cache.getItem(ident)
        except KeyError:
            # cache miss, fall through to actually querying sesame
            pass

        for svc_url in self.svc_urls:
            # only the network access is guarded, so that trouble while
            # writing the cache is not mistaken for a mirror failure
            try:
                with request.urlopen(
                        svc_url+parse.quote(ident), timeout=2) as f:
                    newOb = self._parseXML(f.read())
            except socket.error:
                # Try next mirror
                continue

            if newOb is not None:
                self.cache.addItem(ident, newOb)
            return newOb

        # all mirrors fail
        raise base.ui.logOldExc(base.ValidationError(
            "Simbad is offline, cannot query.",
            "hscs_pos",  # really, this should be added by the widget
            hint="If this problem persists, complain to us rather than simbad."))

    def getPositionFor(self, identifier):
        """returns a pair of floats (RA, dec) for identifier.

        A KeyError is raised if query yields no record for identifier;
        exceptions raised by query itself are passed on.
        """
        rec = self.query(identifier)
        if not rec:
            raise KeyError(identifier)
        return float(rec["RA"]), float(rec["dec"])
def getSimbadPositions(identifier):
    """returns a pair (ra, dec) as resolved by Simbad for identifier.

    A KeyError is raised when Simbad does not know identifier.
    """
    # the argument to getSesame is a historical key that is ignored
    resolver = base.caches.getSesame("")
    return resolver.getPositionFor(identifier)
# Earlier versions accepted a "key" to keep different uses of Sesame
# apart.  That separation never proved useful, so the key is still
# accepted but ignored.
def _makeSesame(key="ignored"):
    """returns a new Sesame instance, whatever key is."""
    return Sesame()


base.caches.makeCache("getSesame", _makeSesame)
############## ADQL ufunc
from gavo import adql
@adql.userFunction(
    "ivo_simbadpoint",
    "(identifier TEXT) -> POINT",
    """
gavo_simbadpoint queries simbad for an identifier and returns the
corresponding point. Note that identifier can only be a literal,
i.e., as simple string rather than a column name. This is because
our database cannot query simbad, and we probably wouldn't want
to fire off millions of simbad queries anyway; use simbad's own
TAP service for this kind of application.
""",
    "point",
    ucd="pos.eq;src",
    additionalNames=["gavo_simbadpoint"])
def _simbadpoint(args):
    # Resolves the single string literal in args through the Sesame
    # resolver and hands the position back by raising ReplaceNode with
    # a Point node; a UfuncError is raised for bad arguments or when
    # no position can be found.
    from gavo.adql import nodes

    hasSingleLiteral = (
        len(args)==1 and args[0].type=="characterStringLiteral")
    if not hasSingleLiteral:
        raise adql.UfuncError(
            "gavo_simbadpoint takes exactly one string literal as argument")

    identifier = args[0].value
    resolver = base.caches.getSesame("")
    try:
        alpha, delta = resolver.getPositionFor(identifier)
    except KeyError:
        raise adql.UfuncError("No simbad position for '%s'"%identifier)

    # the coordinates go into the tree as their repr strings
    raNode = nodes.Factor([repr(alpha)])
    decNode = nodes.Factor([repr(delta)])
    raise nodes.ReplaceNode(nodes.Point(cooSys=None, x=raNode, y=decNode))
def _getTestSuite():
    """returns a unittest suite exercising the resolver.

    As a side effect, all rows with keys starting with simbad: are
    deleted from dc.metastore, so every test starts with an empty cache.
    The tests need the configured mirrors to be reachable.
    """
    import unittest

    with base.getWritableAdminConn() as conn:
        conn.execute("DELETE FROM dc.metastore WHERE key LIKE 'simbad:%%'")

    # the returned resolver is not used in this function
    base.caches.getSesame("anything")

    # NOTE: all these tests assume the cache has been cleared before
    # them, and that the configured mirrors are up.
    # Cache clearing happens a few lines up.
    class QueryTest(testhelpers.VerboseTest):
        def testBasic(self):
            res = getSimbadPositions("Antares")
            self.assertAlmostEqual(res[0], 247.351915, 5)
            self.assertAlmostEqual(res[1], -26.432002, 5)

        def testCaching(self):
            res = getSimbadPositions("M31")
            self.assertAlmostEqual(res[0], 10.684708, 5)
            self.assertAlmostEqual(res[1], 41.26875, 5)

            # With no mirrors left, the second lookup can only succeed
            # if it is answered from the cache.
            tmp = Sesame.svc_urls
            Sesame.svc_urls = []
            try:
                res = getSimbadPositions("M31")
                self.assertAlmostEqual(res[0], 10.684708, 5)
                self.assertAlmostEqual(res[1], 41.26875, 5)
            finally:
                Sesame.svc_urls = tmp

        def testMirrorFailover(self):
            # break the first mirror so the second one has to answer
            tmp = Sesame.svc_urls[0]
            Sesame.svc_urls[0] = "http://localhost:39293?"
            try:
                res = getSimbadPositions("epsilon Eri")
                self.assertAlmostEqual(res[0], 53.232687, 5)
                self.assertAlmostEqual(res[1], -9.458258, 5)
            finally:
                Sesame.svc_urls[0] = tmp

        def testCacheInstallation(self):
            res = base.caches.getSesame("anything").getPositionFor("ε Eri")
            self.assertAlmostEqual(res[0], 53.232687, 5)
            self.assertAlmostEqual(res[1], -9.458258, 5)

    # pick up every TestCase subclass defined in this function
    candidates = list(locals().values())
    tests = [ob for ob in candidates
        if isinstance(ob, type) and issubclass(ob, unittest.TestCase)]

    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(t)
        for t in tests])
    return suite
if __name__=="__main__":
    # These tests are kept out of the normal unit tests: without a
    # live network connection there is little that can sensibly be
    # tested, and the unit tests should not require one.
    import unittest

    liveSuite = _getTestSuite()
    runner = unittest.TextTestRunner()
    runner.run(liveSuite)