"""
Some utility functions to deal with FITS files.
It's unclear if pyfits coming from astropy is now threadsafe, but I'm hoping
it might be, so for now I'm making fitslock a no-op.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import builtins
import datetime
import itertools
import re
import struct
import threading
import warnings
from contextlib import contextmanager
import numpy
# temporary measure to shut up astropy's configuration parser.
builtins._ASTROPY_SETUP_ = True # type:ignore
# the following is the original import of pyfits, and only it should be used
from astropy.io import fits as pyfits # type:ignore
from astropy.utils.exceptions import AstropyDeprecationWarning # type:ignore
warnings.filterwarnings('ignore', category=AstropyDeprecationWarning)
from gavo.utils import codetricks
from gavo.utils import excs
from gavo.utils import ostricks
from gavo.utils.dachstypes import (Any, BinaryIO, Callable, Dict, Filename,
Generator, List, NDArray, Optional, StrOrBytes, StrToStrMap, Tuple)
_FITS_TABLE_LOCK = threading.RLock()
[docs]@contextmanager
def fitsLock():
_FITS_TABLE_LOCK.acquire()
try:
yield
finally:
_FITS_TABLE_LOCK.release()
CARD_SIZE = 80
END_CARD = b'END'+b' '*(CARD_SIZE-3)
FITS_BLOCK_SIZE = CARD_SIZE*36
[docs]class FITSError(Exception):
pass
# pyfits is a bit too liberal in throwing depreciation warnings. Filter them
# for now TODO: Figure out a system to check them now and then
warnings.filterwarnings('ignore', category=DeprecationWarning)
try:
warnings.filterwarnings('ignore', category=pyfits.PyfitsDeprecationWarning)
except AttributeError: # pyfits too old to produce these: Good.
pass
[docs]def padCard(input: StrOrBytes, length: int=CARD_SIZE) -> bytes:
"""pads input (a string or bytes) with blanks until len(result)%80=0
The length keyword argument lets you adjust the "card size". Use
this to pad headers with length=FITS_BLOCK_SIZE
This always returns bytes.
>>> padCard("")
b''
>>> len(padCard(b"end"))
80
>>> len(padCard(b"whacko"*20))
160
>>> len(padCard("junkodumnk"*17, 27))%27
0
"""
# This is like pyfits._pad, but I'd rather not depend on pyfits internals
# to much.
if isinstance(input, str):
input = input.encode("ascii")
l = len(input)
if not l%length:
return input
return input+b' '*(length-l%length)
[docs]def parseCards(aString: StrOrBytes) -> pyfits.Header:
"""returns a list of pyfits Cards parsed from aString.
This will raise a ValueError if aString's length is not divisible by 80.
It may also raise pyfits errors for malformed cards.
Empty (i.e., all-whitespace) cards are ignored. If an END card is
encountered processing is aborted.
"""
if isinstance(aString, str):
aString = aString.encode("ascii")
cards = []
if len(aString)%CARD_SIZE:
raise ValueError("parseCards argument has impossible length %s"%(
len(aString)))
for offset in range(0, len(aString), CARD_SIZE):
rawCard = aString[offset:offset+CARD_SIZE]
if rawCard==END_CARD:
break
if not rawCard.strip():
continue
cards.append(pyfits.Card().fromstring(rawCard.decode("ascii")))
return cards
# enforced sequence of well-known keywords, and whether they are mandatory
STANDARD_CARD_SEQUENCE = [
("SIMPLE", True),
("BITPIX", True),
("NAXIS", True),
("NAXIS1", False),
("NAXIS2", False),
("NAXIS3", False),
("NAXIS4", False),
("EXTEND", False),
("BZERO", False),
("BSCALE", False),
]
def _iterForcedPairs(seq: List) -> Generator:
"""helps _enforceHeaderConstraints.
"""
if seq is None:
return
for item in seq:
if isinstance(item, tuple):
yield item
else:
yield (item, False)
def _enforceHeaderConstraints(
cardList: List[pyfits.Card],
cardSequence: Optional[List[Tuple[str, bool]]]):
"""returns a pyfits header containing the cards in cardList with FITS
sequence constraints satisfied.
This may raise a FITSError if mandatory cards are missing.
This will only work correctly for image FITSes with less than five
dimensions.
On cardSequence, see sortHeaders (except that this function always
requires sequences of pairs).
"""
# I can't use pyfits.verify for this since cardList may not refer to
# a data set that's actually in memory
cardsAdded = set()
newCards: List[pyfits.Card] = []
cardDict = dict((card.keyword, card) for card in cardList)
for kw, mandatory in itertools.chain(
STANDARD_CARD_SEQUENCE, _iterForcedPairs(cardSequence or [])):
if isinstance(kw, pyfits.Card):
newCards.append(kw)
continue
if kw in cardsAdded:
continue
try:
newCards.append(cardDict[kw])
cardsAdded.add(kw)
except KeyError:
if mandatory:
raise FITSError("Mandatory card '%s' missing"%kw)
for card in cardList: # use cardList rather than cardDict to maintain
# cardList order
if card.keyword not in cardsAdded or card.keyword=='':
newCards.append(card)
return pyfits.Header(newCards)
[docs]def openFits(fitsName: Filename) -> pyfits.HDUList:
"""returns the hdus for fName.
This is simulating legacy pyfits behaviour in that it will use
memmap I/O if it thinks that works. You'll probably want to use
astropy directly these days if this gives you a headache.
"""
hdus = pyfits.open(fitsName, memmap=True)
# memmapping will bomb later if data needs to be scaled, so we
# guess now if that's going to happen
if (hdus[0].header.get("BZERO", 0)!=0
or hdus[0].header.get("BSCALE", 1)!=1
or "BLANK" in hdus[0].header):
hdus.close()
hdus = pyfits.open(fitsName, memmap=False)
return hdus
[docs]def getAxisLengths(hdr: pyfits.Header) -> List[int]:
"""returns a sequence of the lengths of the axes of a FITS image
described by hdr.
"""
return [hdr["NAXIS%d"%i] for i in range(1, hdr["NAXIS"]+1)]
_FitsCuts = Tuple[Optional[float], ...]
[docs]def cutoutFITS(hdu: pyfits.ImageHDU, *cuts: _FitsCuts) -> pyfits.ImageHDU:
"""returns a cutout of hdu restricted to cuts.
hdu is a primary FITS hdu. cuts is a list of cut specs, each of which is
a triple (axis, lower, upper). axis is between 1 and naxis, lower and
upper a 1-based pixel coordinates of the limits, and "border" pixels
are included. Specifications outside of the image are legal and will
be cropped back. Open limits are supported via a specification of
None.
If an axis would vanish (i.e. length 0 or less), the function fudges
things such that the axis gets a length of 1.
axis is counted here in the FORTRAN/FITS sense, *not* in the C sense,
i.e., axis=1 cuts along NAXIS1, which is the *last* index in a numpy
array.
WCS CRPIXes in hdu's header will be updated. Axes and specified will
not be touched. It is an error to specify cuts for an axis twice
(behaviour is undefined).
Note that this will lose all extensions the original FITS file might have
had.
"""
slices, newHeader = cutoutHeader(hdu.header, *cuts)
slices.reverse()
newHDU = hdu.__class__(data=hdu.data[tuple(slices)].copy(order='C'),
header=newHeader)
# Fix astropy 1.3 foulup
if "BZERO" in newHeader:
newHDU.header["BZERO"] = newHeader["BZERO"]
if "BSCALE" in newHeader:
newHDU.header["BSCALE"] = newHeader["BSCALE"]
return newHDU
NUM_CODE = {
8: 'uint8',
16: '>i2',
32: '>i4',
64: '>i8',
-32: '>f4',
-64: '>f8'}
def _makeDecoder(hdr: pyfits.Header) -> Callable[[BinaryIO], NDArray]:
"""returns a decoder for the rows of FITS primary image data.
The decoder is called with an open file and returns the next row.
You need to keep track of the total number of rows yourself.
"""
numType = NUM_CODE[hdr["BITPIX"]]
rowLength = hdr["NAXIS1"]
nBytes = abs(rowLength*hdr["BITPIX"]//8)
bzero, bscale = hdr.get("BZERO", 0), hdr.get("BSCALE", 1)
if bzero!=0 or bscale!=1:
def read(f: BinaryIO) -> NDArray:
return numpy.asarray(
numpy.frombuffer(f.read(nBytes), numType, rowLength),
'float32')*bscale+bzero
else:
def read(f: BinaryIO) -> NDArray: #noflake: conditional definition
return numpy.frombuffer(f.read(nBytes), numType, rowLength)
return read
[docs]def iterFITSRows(hdr: pyfits.Header, f: BinaryIO
) -> Generator[NDArray, None, None]:
"""iterates over the rows of a FITS (primary) image.
You pass in a FITS header and a file positioned to the start of
the image data.
What's returned are 1d numpy arrays of the datatype implied by bitpix. The
function understands only very basic FITSes (BSCALE and BZERO are known,
though, and lead to floats arrays).
We do this ourselves since pyfits may pull in the whole thing or at least
mmaps it; both are not attractive when I want to stream-process large
images.
"""
decoder = _makeDecoder(hdr)
for col in range(hdr["NAXIS2"]):
yield decoder(f)
def _iterSetupFast(inFile: BinaryIO, hdr: pyfits.Header
) -> Tuple[pyfits.Header, Generator[NDArray, None, None]]:
"""helps iterScaledRows for the case of a simple, real-file FITS.
"""
if hdr is None:
hdr = readPrimaryHeaderQuick(inFile)
if hdr["NAXIS"]==0:
# presumably a compressed FITS
return _iterSetupCompatible(inFile, hdr)
return hdr, iterFITSRows(hdr, inFile)
[docs]def fixImageExtind(hdus: List, extInd: int) -> int:
"""returns the index of a compressed image HDU if it immediately
follows extInd.
If hdus[extInd] is a CompImageHDU itself, this will return extInd
unchanged.
"""
if isinstance(hdus[extInd], pyfits.CompImageHDU):
return extInd
try:
if isinstance(hdus[extInd+1], pyfits.CompImageHDU):
return extInd+1
except IndexError:
# no extension following, least of all a CompImageHDU. Fall through
pass
return extInd
def _iterSetupCompatible(
inFile: BinaryIO, hdr: pyfits.Header, extInd: int=0
) -> Tuple[pyfits.Header, Generator[NDArray, None, None]]:
"""helps iterScaledRows for when _iterSetupFast will not work.
Using extInd, you can select a different extension. extInd=0
will automatically select extension 1 if that's a compressed image
HDU.
"""
hdus = pyfits.open(inFile)
extInd = fixImageExtind(hdus, extInd)
def iterRows():
for row in hdus[extInd].data[:,:]:
yield row
return hdus[extInd].header, iterRows()
[docs]def iterScaledRows(
inFile: BinaryIO,
factor: Optional[float]=None,
destSize: Optional[int]=None,
hdr: Optional[pyfits.Header]=None,
slow: bool=False,
extInd: int=0) -> Generator[NDArray, None, None]:
"""iterates over numpy arrays of pixel rows within the open FITS
stream inFile scaled by it integer in factor.
The arrays are always float32, regardless of the input. When the
image size is not a multiple of factor, border pixels are discarded.
A FITS header for this data can be obtained using shrinkWCSHeader.
You can pass in either a factor the file is to be scaled down, or
an approximate size desired. If both are given, factor takes precedence,
if none is given, it's an error.
If you pass in hdr, it will not be read from inFile; the function then
expects the file pointer to point to the start of the first data block.
Use this if you've already read the header of a non-seekable FITS.
extInd lets you select a different extension. extInd=0 means the first
image HDU, which may be in extension 1 for compressed images.
iterScaledRows will try to use a hand-hacked interface guaranteed to
stream. This only works for plain, 2D-FITSes from real files.
iterScaledRows normally notices when it should fall back to
pyfits code, but if it doesn't you can pass slow=True.
"""
if hasattr(inFile, "read") and not slow and extInd==0:
hdr, rows = _iterSetupFast(inFile, hdr)
else:
hdr, rows = _iterSetupCompatible(inFile, hdr, extInd)
if factor is None:
if destSize is None:
raise excs.DataError("iterScaledRows needs either factor or destSize.")
size = max(hdr["NAXIS1"], hdr["NAXIS2"])
factor = max(1, size//destSize+1)
factor = int(factor)
assert factor>0
rowLength = hdr["NAXIS1"]
destRowLength = rowLength//factor
summedInds = list(range(factor))
for index in range(hdr["NAXIS2"]//factor):
newRow = numpy.zeros((rowLength,), 'float32')
for i in summedInds:
try:
newRow += next(rows)
except StopIteration:
break
newRow /= factor
# horizontal scaling via reshaping to a matrix and then summing over
# its columns...
newRow = newRow[:destRowLength*factor]
# mypy 1.0.1 type inference bad
yield sum(numpy.transpose( # type: ignore
(newRow/factor).reshape((destRowLength, factor))))
[docs]def iterScaledBytes(
inFileName: Filename,
factor: float,
extraCards: StrToStrMap={}) -> Generator[bytes, None, None]:
"""iterates over the bytes for a simple FITS file generated by scaling
down the 2D image inFileName by factor.
factor must be an integer between 1 and something reasonable.
This function acquires the FITS lock itself.
"""
with fitsLock():
with open(inFileName, "rb") as f:
oldHdr = readPrimaryHeaderQuick(f)
newHdr = shrinkWCSHeader(oldHdr, factor)
newHdr.update(extraCards)
yield serializeHeader(newHdr)
for row in iterScaledRows(f, factor, hdr=oldHdr):
# Unfortunately, numpy's byte swapping for floats is broken in
# many wide-spread revisions. So, we cannot do the fast
# yield row.newbyteorder(">").tostring()
# but rather, for now, have to try the slow:
yield struct.pack("!%df"%len(row), *row)
[docs]class WCSAxis(object):
"""represents a single 1D WCS axis and allows easy metadata discovery.
You'll usually use the fromHeader constructor.
The idea of using this rather than astropy.wcs or similar is that this is
simple and robust. It doesn't know many of the finer points of WCS,
though, and in particular it's 1D only.
However, for the purposes of cutouts it probably should do for the
overwhelming majority of non-spatial FITS axes.
The default pixel coordinates are handled in the FITS sense here,
i.e., the first pixel has the index 1. Three are methods that have
pix0 in their names; these assume 0-based arrays. All the transforms
return Nones unchanged.
To retrieve the metadata shoved in, use the name, crval, crpix, cdelt,
ctype, cunit, and axisLength attributes.
"""
def __init__(self,
name: str,
crval: float,
crpix: float,
cdelt: float,
ctype: str="UNKNOWN",
cunit: Optional[str]="",
axisLength: int=1):
assert cdelt!=0
self.crval, self.crpix, self.cdelt = crval, crpix, cdelt
self.ctype, self.cunit, self.axisLength = ctype, cunit, axisLength
self.name = name
[docs] def pixToPhys(self, pixCoo: float) -> float:
"""returns the physical value for a 1-based pixel coordinate.
"""
if pixCoo is None:
return None
return self.crval+(pixCoo-self.crpix)*self.cdelt
[docs] def pix0ToPhys(self, pix0Coo: float) -> float:
"""returns the physical value for a 0-based pixel coordinate.
"""
if pix0Coo is None:
return None
return self.pixToPhys(pix0Coo+1)
[docs] def physToPix(self, physCoo: float) -> float:
"""returns a 1-based pixel coordinate for a physical value.
"""
if physCoo is None:
return None
return (physCoo-self.crval)/self.cdelt+self.crpix
[docs] def physToPix0(self, physCoo: float) -> float:
"""returns a 0-based pixel coordinate for a physical value.
"""
if physCoo is None:
return None
return self.physToPix(physCoo)-1
[docs] def getLimits(self) -> Tuple[float, float]:
"""returns the minimal and maximal physical values this axis
takes within the image.
"""
limits = self.pixToPhys(1), self.pixToPhys(self.axisLength)
return min(limits), max(limits)
# WCSAxis.fromHeader wrapper for rmkfuncs
[docs]@codetricks.document
def getWCSAxis(
header: pyfits.Header,
axisIndex: int,
forceSeparable: bool=False) -> WCSAxis:
"""returns a WCSAxis instance from an axis index and a FITS header.
If the axis is mentioned in a transformation matrix (CD or PC),
a ``ValueError`` is raised (use ``forceSeparable`` to override).
The ``axisIndex`` is 1-based; to get a transform for the axis described
by CTYPE1, pass 1 here.
The object returned has methods like ``pixToPhys``, ``physToPix`` (and their
``pix0`` brethren), and ``getLimits``.
Note that at this point WCSAxis only supports linear transforms (it's
a DaCHS-specific implementation). We'll extend it on request.
"""
return WCSAxis.fromHeader(header, axisIndex, forceSeparable)
[docs]class ESODescriptorsError(excs.SourceParseError):
"""is raised when something goes wrong while parsing ESO descriptors.
"""
def _makeNamed(name: str, re: str) -> str:
# A helper to construct REs for the ESO parser.
return "(?P<%s>%s)"%(name, re)
def _makeTypeScanner(
typeName: str,
literalRE: re.Pattern,
constructor: Callable[[str], Any]):
"""returns a scanner method for the _ESODescriptorsParser.
This is a helper method for building the class; typeName
must be the same as the name in _scan_name. constructor is
a function that turns string literals into objects of the desired
type.
"""
def scanner(self) -> str:
mat = literalRE.match(self.data, self.curPos)
if not mat:
raise ValueError("Expected a %s here"%typeName)
self.curPos = mat.end()
self.curCol.append(constructor(mat.group()))
self.yetToRead -= 1
if self.yetToRead==0:
return "harvest"
else:
return typeName
return scanner
class _ESODescriptorsParser:
"""an ad-hoc parser for ESO's descriptors.
These are sometimes in FITS files produced by MIDAS. What I'm pretending
to parse here is the contatenation of the cards' values without the
boundaries.
The parse is happening at construction time, after which you fetch the result
in the result attribute.
I'm adhoccing this. If someone digs up the docs on what the actual
grammar is, I'll do it properly.
"""
stringPat = "'[^']*'"
intPat = r"-?\d+"
floatPat = r"-?\d+\.\d+E[+-]\d+"
white = r"\s+"
nextToken = r"\s*"
headerSep = nextToken+','+nextToken
headerRE = re.compile(nextToken
+_makeNamed("colName", stringPat)+headerSep
+_makeNamed("typeCode", stringPat)+headerSep
+_makeNamed("startInd", intPat)+headerSep
+_makeNamed("endInd", intPat)+headerSep
+stringPat+headerSep
+stringPat+headerSep
+stringPat)
floatRE = re.compile(nextToken+floatPat)
integerRE = re.compile(nextToken+intPat)
def __init__(self, data: str):
self.data, state = data, "header"
self.result: Dict[str, List[float]] = {}
self.curPos:int = 0
while state!="end":
try:
state = getattr(self, "_scan_"+state)()
except Exception as msg:
raise ESODescriptorsError(str(msg),
location="character %s"%self.curPos,
offending=repr(self.data[self.curPos:self.curPos+20]))
@classmethod
def fromFITSHeader(cls, hdr: pyfits.Header) -> '_ESODescriptorsParser':
"""returns a parser from the data in the pyfits hdr.
"""
descLines, collecting = [], False
for card in hdr.cards:
if card.keyword=="HISTORY":
if " ESO-DESCRIPTORS END" in card.value:
collecting = False
if collecting:
descLines.append(card.value)
if " ESO-DESCRIPTORS START" in card.value:
collecting = True
return cls("\n".join(descLines))
def _scan_header(self) -> str:
"""reads the next descriptor header and returns a type name for it.
"""
mat = self.headerRE.match(self.data, self.curPos)
if not mat:
if not self.data[self.curPos:].strip():
return "end"
else:
raise ValueError("Could not find next header")
self.curPos = mat.end()
self.curCol: List[float] = []
self.curColName = mat.group("colName")[1:-1]
self.yetToRead = int(mat.group("endInd"))-int(mat.group("startInd"))+1
if mat.group("typeCode")[1:-1].startswith("R"):
return "float"
elif mat.group("typeCode")[1:-1].startswith("I"):
return "integer"
else:
raise ValueError("Unknown type code %s"%mat.group("typeCode"))
def _scan_harvest(self):
"""enter a parsed column into result and prepare for the next column.
"""
self.result[self.curColName] = self.curCol
del self.curCol
del self.curColName
del self.yetToRead
return "header"
_scan_float = _makeTypeScanner("float", floatRE, float)
_scan_integer = _makeTypeScanner("integer", integerRE, int)
del _makeTypeScanner
[docs]def parseESODescriptors(hdr: pyfits.Header) -> Dict[str, List[float]]:
"""returns parsed ESO descriptors from a pyfits header hdr.
ESO descriptors are data columns stuck into FITS history lines.
They were produced by MIDAS. This implementation was made
without actual documentation, is largely based on conjecture,
and is certainly incomplete.
What's returned is a dictionary mapping column keywords to lists of
column values.
"""
return _ESODescriptorsParser.fromFITSHeader(hdr).result
if __name__=="__main__": # pragma: no cover
import doctest
doctest.testmod()