"""
Bindings for the pgsphere library and psycopg2.
Basically, once per program run, you need to call preparePgSphere(connection),
and you're done.
All native representation is in rad.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from __future__ import annotations
import io
import math
import re
import tempfile
from astropy import units as u # type: ignore
import numpy
from gavo.utils import codetricks
from gavo.utils import excs
from gavo.utils import mathtricks
from gavo.utils import misctricks
from gavo.utils import texttricks
from gavo.utils.fitstools import pyfits
from gavo.utils.mathtricks import DEG
from gavo.utils.dachstypes import (Any, Dict, List, Optional, Sequence,
Union, Tuple, TYPE_CHECKING, cast)
if TYPE_CHECKING:
Component = Union[float, u.Quantity]
from astropy.coordinates import ( # type: ignore
UnitSphericalRepresentation)
_TRAILING_ZEROES = re.compile("0+(\s|$)")
[docs]def removeTrailingZeroes(s: str) -> str:
"""remove zeroes in front of whitespace or the string end.
This is used for cosmetics in STC-S strings.
>>> removeTrailingZeroes("23.3420 21.2 12.00000")
'23.342 21.2 12.'
"""
return _TRAILING_ZEROES.sub(r"\1", s)
[docs]class PgSAdapter(object):
"""A base class for objects adapting pgSphere objects.
The all need a pgType attribute and two static methods
``_adaptToPgSphere(obj)`` and ``_castFromPgSphere(value, cursor)``.
You must also define a sequence checkedAttributes; all attributes
mentioned there must be equal for two adapted values to be equal (equality
testing here really is mainly for unit tests with hand-crafted values).
Also, all subclasses you should provide an asPoly returning a spherical
polygon. This is used when uploading VOTables with REGION columns.
In practice, you also want:
:fromDALI(cls, daliSeq):
a classmethod turning the DALI votable representation (typically,
float arrays in degrees) into a corresponding object.
:asDALI(self):
the inverse of fromDALI
:asSODA(self):
returns a representation of the geometry as used in SODA parameters.
"""
pgType: Optional[str] = None
checkedAttributes: List[str] = []
def __eq__(self, other: Any):
if self.__class__!=other.__class__:
return False
for attName in self.checkedAttributes:
if getattr(self, attName)!=getattr(other, attName):
return False
return True
def __ne__(self, other: Any):
return not self==other
def __repr__(self):
return "<pgsphere %s>"%self.asSTCS("Unknown")
[docs] def asPoly(self):
raise ValueError("%s objects cannot be turned into polygons."%
self.__class__)
[docs] def asSODA(self):
"""returns the "SODA-form" for this circle.
This is a string containing blank-separated float literals; the
floats are what asDALI returns, and hence this is built on top of
asDALI. We don't worry about the fact that the coordinates just
*might* be non-ICRS.
"""
return removeTrailingZeroes(
" ".join("%.10f"%v for v in self.asDALI()))
[docs]class SPoint(PgSAdapter):
"""A point on a sphere from pgSphere.
>>> SPoint(1, 1).asSTCS("ICRS")
'Position ICRS 57.2957795131 57.2957795131'
>>> SPoint.fromDegrees(1, -1).asSTCS("ICRS")
'Position ICRS 1. -1.'
"""
pgType = "spoint"
checkedAttributes = ["x", "y"]
pattern = re.compile(r"\s*\(\s*([0-9.eNa-]+)\s*,\s*([0-9.eNa-]+)\s*\)")
def __init__(self, x: Component, y: Component):
if isinstance(x, u.Quantity) and isinstance(y, u.Quantity):
self.x, self.y = x.to(u.rad).value, y.to(u.rad).value
else:
self.x, self.y = float(x), float(y)
def __repr__(self) -> str:
return "SPoint(%r, %r)"%(self.x, self.y)
@staticmethod
def _adaptToPgSphere(spoint: SPoint) -> AsIs:
return AsIs("spoint '(%.10f,%.10f)'"%(spoint.x, spoint.y))
@classmethod
def _castFromPgSphere(cls, value: Optional[str], cursor: Any
) -> 'Optional[SPoint]':
if value is not None:
return cls(*[float(v)
for v in cls.pattern.match(value).groups()]) # type: ignore
return None
[docs] @classmethod
def fromDegreesP(cls, x: Component, y: Component) -> SPoint:
"""fromDegrees when it's certain that there's no None x and y.
"""
return cls(x*DEG, y*DEG)
[docs] @classmethod
def fromDegrees(cls, x: Optional[Component], y: Optional[Component]
) -> Optional[SPoint]:
if x is None or y is None:
return None
return cls.fromDegreesP(x, y)
[docs] @classmethod
def fromUnitVector(cls, uvec: UnitSphericalRepresentation) -> SPoint:
"""returns an SPoint for a 3-unit sphere vector.
"""
return cls(*mathtricks.cartToSpher(uvec))
[docs] def asCooPair(self) -> Tuple[float, float]:
"""returns this point as (long, lat) in degrees.
"""
return (self.x/DEG, self.y/DEG)
[docs] def asSTCS(self, systemString: str) -> str:
return removeTrailingZeroes(
"Position %s %.10f %.10f"%(systemString, self.x/DEG, self.y/DEG))
[docs] def asPgSphere(self) -> str:
return "spoint '(%.10f,%.10f)'"%(self.x, self.y)
[docs] def p(self) -> str: # helps below
return "(%r, %r)"%(self.x, self.y)
[docs] def asUnitSphereVec(self) -> UnitSphericalRepresentation:
"""returns self as a triple of cx, cy, cz on the unit sphere.
"""
return mathtricks.spherToCart(self.x, self.y)
[docs] def asDALI(self) -> Tuple[float, float]:
return self.asCooPair()
[docs] @classmethod
def fromDALI(cls, coos: Sequence[Component]):
return cls.fromDegrees(*coos)
[docs]class SCircle(PgSAdapter):
"""A spherical circle from pgSphere.
The constructor accepts an SPoint center and a radius in rad.
"""
pgType = "scircle"
checkedAttributes = ["center", "radius"]
pattern = re.compile("<(\([^)]*\))\s*,\s*([0-9.e-]+)>")
def __init__(self, center: SPoint, radius: Component):
self.center, self.radius = center, float(radius)
@staticmethod
def _adaptToPgSphere(sc: SCircle):
return AsIs("scircle '< %s, %r >'"%(sc.center.p(), sc.radius))
@classmethod
def _castFromPgSphere(cls, value: Optional[str], cursor: Any
) -> Optional[SCircle]:
if value is not None:
pt, radius = cls.pattern.match(value).groups() # type: ignore
if pt is not None:
return cls(SPoint._castFromPgSphere(pt, cursor), radius) # type: ignore
return None
[docs] def asDALI(self) -> Tuple[float, float, float]:
"""returns the "DALI-form" for this circle.
This is an array containing the center coordinates and the radius
in degrees.
"""
return (self.center.x/DEG, self.center.y/DEG, self.radius/DEG)
[docs] @classmethod
def fromDALI(cls, daliSeq: Sequence[Optional[Component]]
) -> Optional[SCircle]:
"""returns a circle from its DALI float sequence.
Any None in daliSeq makes this a None to help with VOTable null value
handling.
"""
if None in daliSeq:
return None
ra, dec, radius = [float(s) for s in daliSeq] # type: ignore
center = SPoint.fromDegreesP(ra, dec)
return cls(center, radius*DEG)
[docs] @classmethod
def fromPointSet(cls, points: Sequence[SPoint]) -> SCircle:
"""returns an SCircle covering all the SPoints in points.
(max radius: pi/2).
"""
uvecs = [numpy.array(p.asUnitSphereVec()) for p in points]
center = sum(uvecs)/len(uvecs)
radius = max(center.dot(v) for v in uvecs) # type: ignore
return cls(
SPoint.fromUnitVector(center),
math.acos(radius))
[docs] def asSTCS(self, systemString: str) -> str:
return removeTrailingZeroes("Circle %s %s"%(
systemString, self.asSODA()))
[docs] def asPgSphere(self) -> str:
return "scircle '< (%.10f, %.10f), %.10f >'"%(
self.center.x, self.center.y, self.radius)
[docs] def asPoly(self, nSegments: int=32) -> SPoly:
# approximate the circle with 32 line segments and don't worry about
# circles with radii larger than 90 degrees.
# We compute the circle around the north pole and then rotate
# the resulting points such that the center ends up at the
# circle's center.
r = math.sin(self.radius)
innerOffset = math.cos(self.radius)
rotationMatrix = mathtricks.getRotZ(math.pi/2-self.center.x
)@mathtricks.getRotX(math.pi/2-self.center.y)
points = []
for i in range(nSegments):
angle = 2.*i*math.pi/nSegments
dx, dy = r*math.sin(angle), r*math.cos(angle)
points.append(SPoint(
*mathtricks.cartToSpher(
rotationMatrix@numpy.array([dx, dy, innerOffset]))))
return SPoly(points)
[docs] def asSMoc(self, order: int=6, inclusive: bool=False):
"""returns an SMoc instance for this circle.
order is the maximum order of the moc returned.
"""
import healpy # type: ignore
pixels = healpy.query_disc(vec=self.center.asUnitSphereVec(),
radius=self.radius,
nside=2**order, nest=True, inclusive=inclusive)
return SMoc.fromCells(order, pixels, maxOrder=order)
[docs]class SPoly(PgSAdapter):
"""A spherical polygon from pgSphere.
The constructor accepts a list points of SPoints.
"""
pgType = "spoly"
checkedAttributes = ["points"]
pattern = re.compile("\([^)]+\)")
def __init__(self, points: Sequence[SPoint]):
self.points = tuple(points)
@staticmethod
def _adaptToPgSphere(spoly: SPoly) -> AsIs:
return AsIs("spoly '{%s}'"%(", ".join(p.p() for p in spoly.points)))
@classmethod
def _castFromPgSphere(cls, value: Optional[str], cursor: Any
) -> Optional[SPoly]:
if value is not None:
return cls([SPoint._castFromPgSphere(ptLit, cursor) # type: ignore
for ptLit in cls.pattern.findall(value)])
return None
[docs] def asDALI(self) -> List[float]:
"""returns the DALI form of this polygon.
This is a sequence of floats of the vertex coordinates in degrees.
"""
res = []
for p in self.points:
res.extend([p.x/DEG, p.y/DEG])
return res
[docs] @classmethod
def fromDALI(cls, daliSeq: List[Component]) -> Optional[SPoly]:
"""returns a polygon from a DALI float-sequence
This returns None for empty daliSeqs to help with VOTable NULL value
handling.
"""
if not daliSeq:
return None
if len(daliSeq)<6 or len(daliSeq)%2:
raise ValueError("Need an even-numbered number (>=6) of floats"
" in a DALI polygon representation, got %s floats."%len(daliSeq))
return cls([SPoint.fromDegreesP(*tuple(p))
for p in misctricks.grouped(2, daliSeq)])
[docs] def asCooPairs(self) -> List[Tuple[float, float]]:
"""returns the vertices as a sequence of (long, lat) pairs in
degrees.
This form is required by some functions from base.coords.
"""
return [p.asCooPair() for p in self.points]
[docs] def asSTCS(self, systemString: str) -> str:
return removeTrailingZeroes("Polygon %s %s"%(systemString,
self.asSODA()))
[docs] def asPgSphere(self) -> str:
return "spoly '{%s}'"%(",".join("(%.10f,%.10f)"%(p.x, p.y)
for p in self.points))
[docs] def asPoly(self) -> SPoly:
return self
[docs] def asSMoc(self, order: int=6, inclusive: bool=False) -> SMoc:
"""returns an SMoc instance for this polygon.
order is the maximum order of the moc returned.
"""
import healpy
vertices = [p.asUnitSphereVec() for p in self.points]
pixels = healpy.query_polygon(vertices=vertices,
nside=2**order, nest=True, inclusive=inclusive)
return SMoc.fromCells(order, pixels, maxOrder=order)
[docs] def getCenter(self) -> List[float]:
# This probably should return an SPoint, but let's first see where
# this is used.
"""returns an estimate for some sort of center of this polygon.
This is computed as the mean of the vertices (in unit sphere
representation), which, depending on the polygon, may be very far from the
polygon's center of mass. Also, you can kill this by having all points
sit on a great circle, etc.
So: handle with care. Doing this properly is hard.
"""
vertices = [numpy.array(mathtricks.spherToCart(p.x, p.y))
for p in self.points]
center = numpy.average(vertices, axis=0)
center = center/(center@center)
return mathtricks.cartToSpher(center)
[docs]class SBox(PgSAdapter):
"""A spherical box from pgSphere.
The constructor accepts the two corner points.
"""
pgType = "sbox"
checkedAttributes = ["corner1", "corner2"]
pattern = re.compile("\([^()]+\)")
def __init__(self, corner1: SPoint, corner2: SPoint):
self.corner1, self.corner2 = corner1, corner2
@staticmethod
def _adaptToPgSphere(sbox: SBox) -> AsIs:
return AsIs("sbox '(%s, %s)'"%(sbox.corner1.p(), sbox.corner2.p()))
@classmethod
def _castFromPgSphere(cls, value: Optional[str], cursor: Any
) -> Optional[SBox]:
if value is not None:
return cls(*[SPoint._castFromPgSphere(ptLit, cursor) # type: ignore
for ptLit in cls.pattern.findall(value)])
return None
[docs] @classmethod
def fromSIAPPars(cls, ra: float, dec: float, raSize: float, decSize: float
) -> SBox:
"""returns an SBox corresponding to what SIAP passes in.
In particular, all values are in degrees, and a cartesian projection
is assumed.
This is for use with SIAP and tries to partially implement that silly
prescription of "folding" over at the poles. If that happens,
a TwoSBoxes exception is raised. It contains two SBoxes that
should be ORed. I agree that sucks. Let's fix SIAP.
"""
if 90-abs(dec)<0.1: # Special handling at the pole
raSize = 360
else:
raSize = raSize/math.cos(dec*DEG)
decSize = abs(decSize) # inhibit auto swapping of points
minRA, maxRA = ra-raSize/2., ra+raSize/2.
bottom, top = dec-decSize/2., dec+decSize/2.
# folding over at the poles: raise an exception with two boxes,
# and let upstream handle it. Foldover on both poles is not supported.
# All this isn't really thought out and probably doesn't work in
# many interesting cases.
# I hate that folding over.
if bottom<-90 and top>90:
raise ValueError("Cannot fold over at both poles")
elif bottom<-90:
raise TwoSBoxes(
cls(
SPoint.fromDegreesP(minRA, -90),
SPoint.fromDegreesP(maxRA, top)),
cls(
SPoint.fromDegreesP(180+minRA, -90),
SPoint.fromDegreesP(180+maxRA, top)))
elif top>90:
raise TwoSBoxes(
cls(
SPoint.fromDegreesP(minRA, bottom),
SPoint.fromDegreesP(maxRA, 90)),
cls(
SPoint.fromDegreesP(180+minRA, bottom),
SPoint.fromDegreesP(180+maxRA, 90)))
return cls(SPoint.fromDegreesP(minRA, bottom),
SPoint.fromDegreesP(maxRA, top))
[docs] def asSTCS(self, systemString: str) -> str:
return removeTrailingZeroes("PositionInterval %s %s %s"%(systemString,
"%.10f %.10f"%(self.corner1.x/DEG, self.corner1.y/DEG),
"%.10f %.10f"%(self.corner2.x/DEG, self.corner2.y/DEG)))
[docs] def asPoly(self) -> SPoly:
x1, y1 = self.corner1.x, self.corner1.y
x2, y2 = self.corner2.x, self.corner2.y
minX, maxX = min(x1, x2), max(x1, x2)
minY, maxY = min(y1, y2), max(y1, y2)
return SPoly((
SPoint(minX, minY),
SPoint(minX, maxY),
SPoint(maxX, maxY),
SPoint(maxX, minY)))
[docs] @classmethod
def fromDALI(cls, daliSeq: Sequence[Component]) -> SBox:
# see asDALI()
return cls(
SPoint.fromDALI(daliSeq[:2]),
SPoint.fromDALI(daliSeq[2:]))
[docs] def asDALI(self) -> Tuple[float, float, float, float]:
# we're cheating a bit here; there's no official DALI representation
# yet. This is for our private xtype x-box
return self.corner1.asDALI()+self.corner2.asDALI()
[docs]class TwoSBoxes(excs.ExecutiveAction):
"""is raised when an SBox is constructed from center and size such that
it overlaps the pole.
"""
def __init__(self, box1: SBox, box2: SBox):
self.box1, self.box2 = box1, box2
try:
import pymoc # type: ignore
except ImportError:
# we're trying to work without pymoc, too
pass
[docs]class SMoc(PgSAdapter):
"""a MOC with a pgsphere interface.
The default constructor accepts a pymoc.MOC instance, which is
accessible as the moc attribute. The database interface uses
the ASCII serialisation, for which there's the fromASCII constructor.
"""
pgType = "smoc"
checkedAttributes = ["moc"]
def __init__(self, moc: pymoc.MOC):
self.moc = moc
if not hasattr(self.moc, "explicitMaxOrder"):
self.moc.explicitMaxOrder = None
_orderHeadRE = re.compile(r"(\d+)/")
_rangeRE = re.compile(r"\s*(\d+)(?:-(\d+))?\s*$")
@property
def maxOrder(self) -> int:
if self.moc.explicitMaxOrder is None:
return self.moc.order
else:
return self.moc.explicitMaxOrder
@classmethod
def _parseCells(cls, cellLiteral: str, firstPos: int
) -> List[int]:
"""returns a sequence of cells from a MOC cell literal.
firstPos is the string position of the beginning of cellLiteral
for error messages.
"""
curPos = 0
cells = []
for item in re.split("[, \n\t]", cellLiteral):
if not item.strip():
continue
mat = cls._rangeRE.match(item)
if not mat:
raise ValueError("MOC literal syntax error at char %s"%
(firstPos+curPos))
if mat.group(2) is None:
cells.append(int(mat.group(1)))
else:
cells.extend(list(range(int(mat.group(1)), int(mat.group(2))+1)))
curPos += len(item)+1
return cells
[docs] @classmethod
def fromASCII(cls, literal: str) -> SMoc:
"""returns an SMoc from a quasi-standard ASCII serialisation.
"""
# we do the parsing ourselves -- the pymoc interface is too clumsy,
# and the rigidity of the parser doesn't look good.
seps = cast(List[re.Match],
list(cls._orderHeadRE.finditer(literal))+[re.search("$", literal)])
if len(seps)==1:
raise ValueError("No order separator visible in MOC literal '%s'"%
texttricks.makeEllipsis(literal, 40))
if not re.match(r"\s*$", literal[:seps[0].start()]):
raise ValueError("MOC literal '%s' does not start with order spec"%
texttricks.makeEllipsis(literal, 40))
moc = pymoc.MOC()
moc.explicitMaxOrder = None
for openMat, closeMat in cast( # TODO: why do I need this cast?
Sequence[Tuple[re.Match, re.Match]], codetricks.iterRanges(seps)):
order = int(openMat.group(1))
cells = cls._parseCells(
literal[openMat.end():closeMat.start()],
openMat.end())
if cells:
moc.add(order, cells)
else:
moc.explicitMaxOrder = order
return cls(moc)
[docs] @classmethod
def fromCells(cls,
order: int,
pixels: List[int],
maxOrder: Optional[int]=None) -> SMoc:
"""returns a SMoc instance from a collection of pixels at order.
Pass maxOrder to set an explicit max order for the resulting MOC.
"""
moc = pymoc.MOC(order=order, cells=pixels)
moc.explicitMaxOrder = maxOrder
moc.normalize()
return cls(moc)
@staticmethod
def _formatASCIIRange(minCell: int, maxCell: int) -> str:
"""returns a cell literal for a MOC.
"""
if minCell==maxCell:
return str(minCell)
else:
return "%d-%d"%(minCell, maxCell)
[docs] def asASCII(self) -> str:
"""returns an ascii serialisation of this MOC.
"""
# this is essentially the pymoc code, but again saving the file
# interface that we don't want here.
parts = []
for order, cells in self.moc:
ranges = []
rmin, rmax = -1, -1
for cell in sorted(cells):
if rmin==-1:
rmin = rmax = cell
elif rmax==cell-1:
rmax = cell
else:
ranges.append(self._formatASCIIRange(rmin, rmax))
rmin = rmax = cell
ranges.append(self._formatASCIIRange(rmin, rmax))
parts.append("%d/%s"%(order, " ".join(ranges)))
if self.moc.explicitMaxOrder is not None:
parts.append(f"{self.moc.explicitMaxOrder}/")
return " ".join(parts)
[docs] @classmethod
def fromFITS(cls, literal: bytes) -> SMoc:
"""returns an SMoc from a string containing a FITS-serialised MOC.
"""
from pymoc.io.fits import read_moc_fits_hdu # type: ignore
moc = pymoc.MOC()
read_moc_fits_hdu(moc,
pyfits.open(io.BytesIO(literal))[1])
return cls(moc)
@staticmethod
def _adaptToPgSphere(smoc: SMoc) -> AsIs:
return AsIs("smoc '%s'"%(smoc.asASCII()))
@classmethod
def _castFromPgSphere(cls, value: Optional[str], cursor: Any
) -> Optional[SMoc]:
if value is not None:
return cls.fromASCII(value)
return None
[docs] def asPoly(self) -> SPoly:
raise TypeError("MOCs cannot be represented as polygons")
[docs] def asSTCS(self, frame: str) -> str:
# no STCS for MOCs, but this is really just for old-style VOTable
# serialisation, so let's cheat
return "MOC "+self.asASCII()
[docs] def asSMoc(self, order: int=6) -> SMoc:
"""returns a copy of self, normalised for order.
"""
moc = self.moc.copy()
moc.normalize(order)
if self.moc.explicitMaxOrder is not None:
moc.explicitMaxOrder = min(self.moc.explicitMaxOrder, order)
return self.__class__(moc)
[docs] def getPlot(self, **kwargs) -> bytes:
"""returns a png string with a plot visualising this moc.
"""
from pymoc.util.plot import plot_moc # type: ignore
with tempfile.NamedTemporaryFile(suffix=".png") as f:
plot_moc(self.moc, filename=f.name, projection="moll", **kwargs)
return f.read()
[docs] def asFITS(self) -> bytes:
"""returns a standard FITS (table) representation of this MOC.
"""
from pymoc.io.fits import write_moc_fits
with tempfile.NamedTemporaryFile(suffix=".fits") as f:
write_moc_fits(self.moc, f)
f.seek(0)
return f.read()
[docs] def asDALI(self) -> str:
"""returns the string representation of this MOC.
This isn't what DALI actually says at this point, but we suspect
it will say that at some point.
"""
return self.asASCII()
[docs] @classmethod
def fromDALI(cls, literal: str) -> SMoc:
"""returns an SMoc from a MOC string.
(see asDALI)
"""
return cls.fromASCII(literal)
[docs] def asNUNIQs(self) -> List[int]:
"""returns a list of integers usable as nuniqs.
"""
res: List[int] = []
for order, cells in self.moc:
shiftedOrder = 4 * (4 ** order)
res.extend(pix+shiftedOrder for pix in cells)
return res
try:
import psycopg2
from psycopg2.extensions import (register_adapter, AsIs, register_type,
new_type)
if TYPE_CHECKING:
from psycopg2.extensions import connection as Connection
def _query(conn: Connection,
query: str,
pars: Optional[Dict[str, Any]]=None) -> List[Tuple]:
c = conn.cursor()
c.execute(query, pars)
return list(c)
_getPgSClass = codetricks.buildClassResolver(
PgSAdapter,
list(globals().values()),
key=lambda obj: obj.pgType, default=PgSAdapter) # type: ignore
[docs] def preparePgSphere(conn: Connection) -> None:
if hasattr(psycopg2, "_pgsphereLoaded"): # type: ignore
return
try:
oidmap = _query(conn,
"SELECT typname, oid"
" FROM pg_type"
" WHERE typname ~ "
" '^s(point|trans|circle|line|ellipse|poly|path|box|moc)'")
for typeName, oid in oidmap:
cls = _getPgSClass(typeName)
if cls is not PgSAdapter: # base class is null value
register_adapter(cls, cls._adaptToPgSphere)
register_type(
new_type((oid,), "spoint", cls._castFromPgSphere))
psycopg2._pgsphereLoaded = True # type: ignore
conn.commit()
except:
psycopg2._pgsphereLoaded = False # type: ignore
except ImportError: # pragma: no cover
# psycopg2 not installed. Since preparsePgSphere can only be
# called from code depending on psycopg2, there's no harm if
# we don't define it.
pass
if __name__=="__main__": # pragma: no cover
import doctest
doctest.testmod()