"""
Creation of resource descriptors
The first part of this is an early experiment to automatically create
resource descriptors from structured data representations.
While parts of this may be recoverable for smarter gavo start functionality,
doing this so that the result is actually useful is hard.
Instead, the new gavo start functionality just fetches one of a few
commented RD templates, fills out a thing or two and leaves the rest to
the operator.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime
import os
import re
import sys
import pkg_resources
from gavo import base
from gavo import grammars
from gavo import rscdef
from gavo import votable
from gavo import utils
from gavo.base import macros
from gavo.grammars import fitsprodgrammar
from gavo.grammars import fitstablegrammar
from gavo.formats import votableread
from gavo.utils import ElementTree
from gavo.utils import fitstools
# convenience alias used all over this module
MS = base.makeStruct

# maps single-letter FITS binary table TFORM type codes to database
# type names; used when translating FITS table columns to Column defs
FT_TYPE_MAP = {
    "L": "boolean",
    "B": "bytea",
    "A": "char",
    "I": "smallint",
    "J": "integer",
    "K": "bigint",
    "E": "real",
    "D": "double precision",
}
# ======================= Begin deprecated ui and implementation ===========

# FITS header keywords that must never become table columns
ignoredFITSHeaders = set(["COMMENT", "SIMPLE", "BITPIX", "EXTEND",
    "NEXTEND", "SOFTNAME", "SOFTVERS", "SOFTDATE", "SOFTAUTH", "SOFTINST",
    "HISTORY", "BZERO", "BSCALE", "DATAMIN", "DATAMAX"])

# WCS-related keywords (matched against the full keyword name).
# Raw string literals here: the \d escapes previously sat in plain
# strings, which raises invalid-escape warnings on modern Python.
wcsKey = re.compile(r"CD.*|CRVAL.*|CDELT.*|NAXIS.*|CRPIX.*|CTYPE.*|CUNIT.*"
    r"|CROTA.*|RADECSYS|AP?_\d_\d|BP?_\d_\d|LATPOLE|LONPOLE")
def isIgnoredKeyword(kw):
    """returns true if kw should not be translated or put into the table.

    This is important for all WCS keywords when you want to compute
    SIAP bboxes; these keywords must not be translated.
    """
    if kw in ignoredFITSHeaders:
        return True
    return wcsKey.match(kw)
def structToETree(aStruct):
    """returns an ElementTree for the copyable content of aStruct.

    Note that due to manipulations at parse time and non-copyable content,
    this will, in general, not reproduce the original XML trees.
    """
    stack = [ElementTree.Element(aStruct.name_)]
    for evType, elName, value in aStruct.iterEvents():
        try:
            if evType=="start":
                stack.append(ElementTree.SubElement(stack[-1], elName))
            elif evType=="end":
                stack.pop()
            elif evType=="value":
                if value is None or value is base.NotGiven:
                    continue
                if elName=="content_":
                    stack[-1].text = value
                else:
                    # TODO: figure out if something is a reference by
                    # inspecting the attribute definition; meanwhile, just
                    # assume it is when it's not a plain string:
                    attrValue = value if isinstance(value, str) else value.id
                    stack[-1].set(elName, attrValue)
            else:
                raise base.Error("Invalid struct event: %s"%evType)
        except:
            # log where the serialization went wrong, then let the
            # exception propagate
            base.ui.notifyError("Badness occurred in element %s, event %s,"
                " value %s\n"%(elName, evType, value))
            raise
    return stack[-1]
def makeTableFromFT(rd, srcName, opts):
    """returns a TableDef generated from the first extension of the
    FITS binary table srcName.

    Column names that had to be changed to be usable are recorded in a
    nameMaps attribute on the table (picked up by makeDataForFT).
    """
    from gavo.utils import pyfits
    nameMaker = base.VOTNameMaker()
    columns, renamings = [], {}
    hdus = pyfits.open(srcName)
    for fitsCol in hdus[1].columns:
        colName = nameMaker.makeName(fitsCol)
        if colName!=fitsCol.name:
            renamings[colName] = fitsCol.name
        columns.append(MS(rscdef.Column,
            type=getTypeForFTFormat(fitsCol.format, fitsCol.name),
            name=colName,
            unit=fitsCol.unit,
            ucd="",
            description="FILL IN"))
    tableDef = rscdef.TableDef(rd, id=opts.tableName, onDisk=True,
        columns=columns)
    tableDef.nameMaps = renamings
    hdus.close()
    return tableDef
def makeDataForFT(rd, srcName, opts):
    """returns a DataDescriptor importing FITS binary tables into the
    table generated by makeTableFromFT.
    """
    destTable = rd.tables[0]
    # nameMaps was left on the table by makeTableFromFT
    mapRules = [MS(rscdef.MapRule, key=destName, content_="vars['%s']"%origName)
        for destName, origName in destTable.nameMaps.items()]
    rowmaker = MS(rscdef.RowmakerDef, id="gen_rmk", idmaps="*", maps=[mapRules])
    return MS(rscdef.DataDescriptor,
        id="import",
        sources=MS(rscdef.SourceSpec,
            pattern=["*.fits"], recurse=True),
        grammar=MS(fitstablegrammar.FITSTableGrammar),
        rowmakers=[rowmaker],
        make=MS(rscdef.Make,
            table=destTable,
            rowmaker=rowmaker))
def makeTableFromFITS(rd, srcName, opts):
    """returns a TableDef with one column per (non-ignored) header card
    of the FITS image srcName.

    The column-to-keyword mapping is stored in rd's mapKeys property,
    to be picked up by makeDataForFITS.
    """
    keyMappings = []
    tableDef = rscdef.TableDef(rd, id=opts.tableName, onDisk=True)
    hdus = fitstools.openFits(srcName)
    for card in hdus[0].header.cards:
        if isIgnoredKeyword(card.keyword):
            continue
        # column names: lowercased keyword, anything non-alpha mapped to _
        colName = re.sub("[^a-z]", "_", card.keyword.lower())
        if not colName:
            continue
        if isinstance(card.value, str):
            colType = "text"
        elif isinstance(card.value, int):
            colType = "integer"
        else:
            colType = "real"
        tableDef.feedObject("column", MS(rscdef.Column,
            name=colName, unit="FILLIN", ucd="FILLIN", type=colType,
            description=card.comment))
        keyMappings.append((colName, card.keyword))
    rd.setProperty("mapKeys",
        ", ".join("%s:%s"%(v,k) for k,v in keyMappings))
    hdus.close()
    return tableDef.finishElement()
def makeDataForFITS(rd, srcName, opts):
    """returns a DataDescriptor importing FITS headers into the table
    generated by makeTableFromFITS.

    This relies on the mapKeys property makeTableFromFITS left on rd.
    """
    destTable = rd.tables[0]
    dd = rscdef.DataDescriptor(rd, id="import_"+opts.tableName)
    grammar = fitsprodgrammar.FITSProdGrammar(dd)
    grammar.feedObject("qnd", True)
    # register each incoming file in the products table
    rowfilter = base.parseFromString(grammars.Rowfilter, """
        <rowfilter procDef="//products#define">
            <bind key="table">"%s"</bind>
            <bind key="owner">"FILLIN"</bind>
            <bind key="embargo">"FILLIN"</bind>
        </rowfilter>"""%(destTable.getQName()))
    grammar.feedObject("rowfilter", rowfilter)
    grammar.feedObject("mapKeys", MS(grammars.MapKeys,
        content_=rd.getProperty("mapKeys")))
    grammar.finishElement()
    dd.grammar = grammar
    dd.feedObject("sources", MS(rscdef.SourceSpec,
        pattern=["*.fits"], recurse=True))
    dd.feedObject("rowmaker", MS(rscdef.RowmakerDef, idmaps="*", id="gen_rmk"))
    dd.feedObject("make", MS(rscdef.Make, table=destTable, rowmaker="gen_rmk"))
    return dd
def makeTableFromVOTable(rd, srcName, opts):
    """returns a TableDef generated from the first table of the VOTable
    in srcName.
    """
    with open(srcName, "rb") as srcFile:
        firstTable = next(votable.parse(srcFile))
    return votableread.makeTableDefForVOTable(opts.tableName,
        firstTable.tableDefinition, onDisk=True)
def makeDataForVOTable(rd, srcName, opts):
    """returns a DataDescriptor importing srcName into the table
    generated by makeTableFromVOTable.
    """
    rowmakerDef = MS(rscdef.RowmakerDef, id="makerows_"+opts.tableName,
        idmaps="*")
    # The qualifiedId monkeying is necessary since otherwise
    # ReferenceAttribute.unparse thinks it's ok to return the objects raw;
    # serialization was never a design goal of these structures.
    rowmakerDef.qualifiedId = rowmakerDef.id
    rd.tables[0].qualifiedId = rd.tables[0].id
    return MS(rscdef.DataDescriptor,
        grammar=MS(rscdef.getGrammar("voTableGrammar")),
        sources=MS(rscdef.SourceSpec, pattern=srcName),
        rowmaker=rowmakerDef,
        makes=[MS(rscdef.Make, table=rd.tables[0], rowmaker=rowmakerDef)])
# dispatch tables from the source format code (the -f option / guessed
# from the extension) to the table- and data-generating functions
tableMakers = {
    "FITS": makeTableFromFITS,
    "VOT": makeTableFromVOTable,
    "FT": makeTableFromFT,
}
dataMakers = {
    "FITS": makeDataForFITS,
    "VOT": makeDataForVOTable,
    "FT": makeDataForFT,
}
def makeRD(args, opts):
    """returns a resource descriptor generated from the sample file
    args[0], with FILL-IN metadata for the operator to edit.
    """
    from gavo import rscdesc
    rd = rscdesc.RD(None, schema=os.path.basename(opts.resdir),
        resdir=opts.resdir)
    # filler metadata; order matters for repeated keys like creator
    fillerMeta = [
        ("title", "FILL-IN"),
        ("creationDate", utils.formatISODT(datetime.datetime.utcnow())),
        ("description", "FILL-IN a long text (and maybe do format='plain'"
            " or even format='rst'"),
        ("copyright", "Free to use."),
        ("creator.name", "Author, S."),
        ("creator", ""),
        ("creator.name", "Other, A."),
        ("subject", "One Keyword"),
        ("subject", "Two Words"),
        ("content.type", "Catalog"),
        ("coverage.waveband", "Optical"),
        ("coverage.profile", "AllSky ICRS")]
    for metaKey, metaValue in fillerMeta:
        rd.addMeta(metaKey, metaValue)
    rd.feedObject("table", tableMakers[opts.srcForm](rd, args[0], opts))
    rd.feedObject("data", dataMakers[opts.srcForm](rd, args[0], opts))
    return rd.finishElement()
def indent(elem, level=0):
    """pretty-prints the ElementTree element elem in place by adjusting
    the text and tail whitespace of the whole subtree.

    Indentation is one tab per nesting level; text or tails containing
    non-whitespace are left untouched.
    """
    indentStr = "\n" + "\t"*level
    if not len(elem):
        # leaf element: only its tail needs adjusting, and never for
        # the root element (level 0)
        if level and not (elem.tail and elem.tail.strip()):
            elem.tail = indentStr
        return
    if not (elem.text and elem.text.strip()):
        elem.text = indentStr + "\t"
    if not (elem.tail and elem.tail.strip()):
        elem.tail = indentStr
    for child in elem:
        indent(child, level+1)
    # dedent the last child's tail so our closing tag lines up with us
    if not (child.tail and child.tail.strip()):
        child.tail = indentStr
    if not (elem.tail and elem.tail.strip()):
        elem.tail = indentStr
def writePrettyPrintedXML(root):
    """writes the ElementTree root to stdout as indented UTF-8 XML."""
    indent(root)
    tree = ElementTree.ElementTree(root)
    tree.write(sys.stdout.buffer, encoding="utf-8")
def parseCommandLine():
    """parses the (deprecated) mkrd command line and returns
    (opts, args); exits with usage output on bad invocations.
    """
    from optparse import OptionParser
    optParser = OptionParser(usage="%prog [options] <sample>"
        " DEPRECATED, use dachs start instead")
    optParser.add_option("-f", "--format", help="Input file format: "
        " FITS, VOT or FT (FITS table)"
        " Default: Detect from file name", dest="srcForm", default=None,
        action="store", type="str")
    optParser.add_option("-t", "--table-name", help="Name of the generated table",
        dest="tableName", default="main", action="store", type="str")
    optParser.add_option("-r", "--resdir", help="Override resdir (and schema)",
        dest="resdir", default=os.getcwd(), action="store", type="str")
    opts, args = optParser.parse_args()
    if len(args)!=1:
        optParser.print_help(file=sys.stderr)
        sys.exit(1)
    if not opts.srcForm:
        # guess the format from the sample file's extension
        ext = os.path.splitext(args[0])[1].lower()
        if ext in {".xml", ".vot"}:
            opts.srcForm = "VOT"
        elif ext==".fits":
            opts.srcForm = "FITS"
        else:
            sys.stderr.write("Cannot guess format, use -f option: %s\n"%args[0])
            optParser.print_help(file=sys.stderr)
            sys.exit(1)
    return opts, args
def main():
    """entry point for the deprecated mkrd UI: writes a generated
    template RD to stdout.
    """
    # hack to make id and onDisk copyable so we see them on iterEvent
    rscdef.TableDef._id.copyable = rscdef.TableDef._onDisk.copyable = True
    rscdef.DataDescriptor._id.copyable = True
    opts, args = parseCommandLine()
    rd = makeRD(args, opts)
    # meta must be copyable, too, so structToETree serializes it
    rd._metaAttr.copyable = True
    eTree = structToETree(rd)
    writePrettyPrintedXML(eTree)
# ======================= End deprecated ui and implementation ===========
def iterColAttrsFITS(table):
    """yields dicts of column attributes from a FITS binary table HDU.
    """
    hdr = table.header
    for colInd in range(1, hdr["TFIELDS"]+1):
        ttype = hdr[f"TTYPE{colInd}"]
        colMeta = {
            "name": re.sub("[^0-9a-z_]", "_", ttype.lower()),
            "type": getTypeForFTFormat(hdr[f"TFORM{colInd}"], ttype),
            "ucd": hdr.get(f"TUCD{colInd}", None),
            "utype": hdr.get(f"TUTYP{colInd}", None),
            "unit": hdr.get(f"TUNIT{colInd}", None),
            # fall back to the TTYPE card's comment as a description
            "description": hdr.get(f"TCOMM{colInd}", None
                ) or hdr.cards[f"TTYPE{colInd}"].comment or "",
            "verbLevel": 1}
        yield {key: val for key, val in colMeta.items() if val is not None}
def iterColAttrsVOTable(inFile):
    """tries to parse (binary) inFile as a VOTable and yields colAttrs
    dictionaries for all FIELDs in it.
    """
    for node in votable.parse(inFile, watchset=[votable.V.FIELD]):
        if not isinstance(node, votable.V.FIELD):
            continue
        colMeta = {
            "name": node.name,
            "unit": node.unit,
            "ucd": node.ucd or "",
            "utype": node.utype,
            "type": base.voTableToSQLType(node.datatype, node.arraysize,
                node.xtype),
            "description": node.getDescription(),
            "verbLevel": 1}
        yield {key: val for key, val in colMeta.items() if val is not None}
class VizBBBParser(object):
    """A simple and naive parser for VizieR Byte-by-Byte descriptions.

    Feed the description line by line to the feed method; the results
    are then available in the colAttrs and bytesForCols attributes.

    This is a state machine: self.state names the current state, and
    each state has a _parse_<state>(line) handler.
    """
    # one column definition row of the BBB table
    bbbLinePat = re.compile(
        r"\s*(?P<from>\d+)\s*-\s*(?P<to>\d+)"
        r"\s+(?P<format>[A-Z0-9.]+)"
        r"\s+(?P<units>[^\s]*)"
        r"\s+(?P<label>[^\s]*)"
        r"\s+(?P<description>.*)")
    # horizontal rules delimiting the BBB table
    tableBorderPat = re.compile("--------+")

    def __init__(self):
        self.state = "scanning"
        self.infoSoFar = {}      # column info collected but not yet shipped
        self.colAttrs = []       # one dict of column attributes per column
        self.bytesForCols = []   # "label: from-to" byte ranges per column

    def _parse_finished(self, ln):
        """we've found and parsed the BBB table.
        """
        pass

    def _parse_scanning(self, ln):
        """we're waiting for the table header of the first BBB block.
        """
        if ln.startswith("Byte-by-byte Description"):
            self.state = "skip_header"

    def _parse_skip_header(self, ln):
        """we're skipping over the table header of the first BBB block.
        """
        mat = self.bbbLinePat.match(ln)
        if mat:
            self.state = "bbb_colhead"
            self._parse_bbb_colhead(ln, mat)
        elif self.tableBorderPat.match(ln) or re.match(r"\s*Bytes", ln):
            pass
        else:
            raise Exception(f"Expected BBB table, got '{ln}'")

    def _parse_bbb_colhead(self, ln, mat=None):
        """we're expecting either a BBB column or the end of the table.
        """
        if mat is None:
            mat = self.bbbLinePat.match(ln)
        if not mat:
            if self.tableBorderPat.match(ln):
                self.state = "finished"
                # Fixed: without this return we would fall through to
                # mat.groupdict() on a None mat and crash.
                return
            else:
                # Fixed: this message was missing its f prefix and
                # printed the literal '{ln}'.
                raise Exception(f"Expected BBB column def, got '{ln}'")
        self.infoSoFar = mat.groupdict()
        self.state = "bbb_moreexpl"

    def _parse_bbb_moreexpl(self, ln):
        """we're expecting either a BBB column, a continuation line for the
        explanation, or the end of the table.

        In this state, there's non-shipped column data in infoSoFar.
        """
        mat = self.bbbLinePat.match(ln)
        if mat:
            self._shipout()
            self.state = "bbb_colhead"
            self._parse_bbb_colhead(ln, mat)
        elif self.tableBorderPat.match(ln):
            self._shipout()
            self.state = "finished"
        else:
            self.infoSoFar["description"] += (" "+ln.strip())

    def _getTypeForTypeCode(self, typeCode):
        """returns one of our SQL types for a VizieR BBB type code.

        NOTE: returns None for type codes outside of A/I/F/E.
        """
        baseType = typeCode[0]
        if baseType=="A":
            return "text"
        elif baseType=="I":
            return {
                "I1": "smallint",
                "I2": "smallint",
                "I4": "integer"}.get(typeCode, "bigint")
        elif baseType in "FE":
            # more than seven significant digits won't fit into a real
            if int(typeCode[1:].split(".")[0])>7:
                return "double precision"
            else:
                return "real"

    def _shipout(self):
        """takes infoSoFar and makes bytesForCols and colAttrs entries from
        it.
        """
        unit = self.infoSoFar["units"]
        rec = [
            ("name", self.infoSoFar["label"]),
            # "---" is VizieR's "no unit"
            ("unit", None if unit=="---" else unit),
            ("ucd", ""),
            ("verbLevel", "1"),
            ("type", self._getTypeForTypeCode(self.infoSoFar["format"])),
            ("description", self.infoSoFar["description"].strip())]
        self.colAttrs.append(dict((k,v) for k,v in rec if v is not None))
        self.bytesForCols.append("{label}: {from}-{to}".format(**self.infoSoFar))

    def feed(self, line):
        """processes one line of a BBB description.
        """
        getattr(self, "_parse_"+self.state)(line)
def iterColAttrsViz(inFile, dumpRanges=False):
    """tries to parse (text) inFile as a VizieR-Style bytes-by-bytes
    description and returns colAttrs for the first described table.

    dumpRanges is for interactive use, where the function additionally
    dumps a colDefs content to stdout.
    """
    bbbParser = VizBBBParser()
    for line in inFile:
        bbbParser.feed(line)
    if bbbParser.state!="finished":
        base.ui.notifyWarning("Probably invalid VizieR byte-by-byte: parser"
            f" ended up in {bbbParser.state}")
    if dumpRanges:
        print("\n".join(bbbParser.bytesForCols)+"\n")
    return bbbParser.colAttrs
COLUMN_ELEMENT_TEMPLATES = ['<column',
' name="{name}"', ' type="{type}"',
'\n ', 'unit="{unit}" ', 'ucd="{ucd}"',
'\n utype="{utype}"',
'\n tablehead="{tablehead}"',
'\n description="{description}"',
'\n displayHint="{displayHint}"',
'\n verbLevel="{verbLevel}"',
"/>"]
def getColumnXML(colAttrs):
    """returns formatted XML for a sequence of column attributes.

    colAttrs is a sequence of dicts with fillers into
    COLUMN_ELEMENT_TEMPLATES; they must already be escaped for use in
    XML attributes.
    """
    return "\n".join(
        formatColumnElement(fillers) for fillers in colAttrs)
def parseGenColCommandLine():
    """parses the dachs gencol command line and returns the argparse
    namespace.
    """
    import argparse
    argParser = argparse.ArgumentParser(description="Write column definitions"
        " for representing a table")
    argParser.add_argument("table", metavar="FILE", action="store",
        type=str, help="Source file to generate the columns from."
        " This can be a FITS, a VOTable, or Vizier Column-by-column"
        " description, and DaCHS will guess based on the extension. Use"
        " the format option if it guesses wrong")
    argParser.add_argument("-f", "--format", dest="format", action="store",
        choices=["fits", "vot", "viz"],
        help="Format of the input table: FITS binary, VOTable, or Vizier"
        " byte-by-byte.")
    return argParser.parse_args()
def gencol():
    """a UI function generating columns elements from data tables.

    The source may be a FITS binary table, a VOTable, or a VizieR
    byte-by-byte description; the generated column XML goes to stdout.
    """
    from gavo.utils import pyfits
    args = parseGenColCommandLine()
    if args.table.endswith(".fits") or args.format=="fits":
        try:
            table = pyfits.open(args.table)[1]
            colIterator = iterColAttrsFITS(table)
        except Exception as exc:
            base.ui.notifyError(f"Cannot open {args.table} as a FITS binary table: "
                +str(exc))
            sys.exit(1)
    elif (args.table.endswith(".vot")
            or args.table.endswith(".xml")
            or args.format=="vot"):
        try:
            with open(args.table, "rb") as f:
                colIterator = list(iterColAttrsVOTable(f))
        except Exception as exc:
            base.ui.notifyError(f"Cannot open {args.table} as a VOTable: "
                +str(exc))
            # Fixed: without exiting here, colIterator is unbound below.
            sys.exit(1)
    elif (args.table.endswith(".txt")
            or args.table=="README"
            or args.format=="viz"):
        try:
            # Fixed: open in text mode; VizBBBParser matches str patterns
            # against the lines and would raise TypeError on bytes.
            with open(args.table, "r") as f:
                colIterator = iterColAttrsViz(f, True)
        except Exception as exc:
            base.ui.notifyError(f"Cannot open {args.table} as a VizieR"
                " Byte-by-Byte file: "+str(exc))
            # Fixed: without exiting here, colIterator is unbound below.
            sys.exit(1)
    else:
        base.ui.notifyError(f"Do not know how to read {args.table}. Please"
            " use --format")
        sys.exit(1)
    print(utils.fixIndentation(
        getColumnXML(
            colIterator), " "))
# ====================== Begin templating =========================
def _listProtocols():
    """writes to stdout a list of protocols we can generate RD templates
    for.

    This is read from the resources coming with DaCHS, where we evaluate
    a magic =tpldesc Desc= string that must be embedded in the template.
    """
    templateDir = "resources/src/"
    for fName in pkg_resources.resource_listdir("gavo", templateDir):
        nameMat = re.match(r"template-(.*)\.rd_$", fName)
        if nameMat is None:
            continue
        # the description macro must occur near the top of the template
        with pkg_resources.resource_stream('gavo', templateDir+fName) as f:
            tplHead = f.read(1000).decode("utf-8")
        descMat = re.search(r"\\tpldesc{(.*?)}", tplHead)
        if descMat is None:
            continue
        print("%s -- %s"%(nameMat.group(1), descMat.group(1)))
def parseStartCommandLine():
    """parses the dachs start command line and returns the argparse
    namespace.

    When the protocol given is "list", this prints the available
    templates and exits.
    """
    import argparse
    argParser = argparse.ArgumentParser(description="Write a template"
        " q.rd for a certain data type")
    argParser.add_argument("protocol", metavar="PROTO", action="store",
        type=str, help="Generate an RD template for PROTO; use list to"
        " see what is available.")
    parsed = argParser.parse_args()
    if parsed.protocol=="list":
        _listProtocols()
        sys.exit(0)
    return parsed
class TemplateMacroPackage(macros.MacroPackage):
    """Macros for RD templates.
    """
    def macro_tpldesc(self, description):
        """A silent macro used for self-documentation (its argument is
        picked up by _listProtocols).
        """
        return ""

    def macro_now(self):
        """returns an ISO representation of just about now UTC.
        """
        return utils.formatISODT(datetime.datetime.utcnow())

    def macro_resdir(self):
        """returns the last element of the current path.

        This is assumed to be the intended resource directory.
        """
        return os.path.basename(os.getcwd())
class MkrdMacroProcessor(macros.MacroExpander):
    """a macro expander for RD templates, using TemplateMacroPackage.
    """
    def __init__(self):
        super().__init__(TemplateMacroPackage())
def start():
    """the UI function for dachs start: expands an RD template for the
    protocol given on the command line into ./q.rd.
    """
    args = parseStartCommandLine()
    outputName = "q.rd"
    try:
        source = pkg_resources.resource_stream('gavo',
            "resources/src/template-%s.rd_"%args.protocol)
    except IOError:
        base.ui.notifyError("No template for %s."%args.protocol)
        sys.exit(1)
    # Fixed: close the template stream on all paths, not just on success
    # (it used to leak when outputName already existed).
    try:
        if os.path.exists(outputName):
            base.ui.notifyError(
                "Output %s already exists. Move it away and try again."%outputName)
            sys.exit(1)
        proc = MkrdMacroProcessor()
        rdSource = proc.expand(source.read().decode("utf-8"))
        with open(outputName, "wb") as dest:
            dest.write(rdSource.encode("utf-8"))
    finally:
        source.close()