"""
Writing data as plain text.
Currently, we only do TSV. It would probably be nice to support "formatted
ASCII as well, though that may be a bit tricky given that we do not
really store sane formatting hints for most columns.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import io
import datetime
from gavo import base
from gavo import rsc
from gavo import stc
from gavo import utils
from gavo.formats import common
from gavo.utils import serializers
# A mapper function registry for formats directed at humans
displayMFRegistry = serializers.ValueMapperFactoryRegistry()
registerDisplayMF = displayMFRegistry.registerFactory
def _defaultMapperFactory(colDesc):
def coder(val):
if val is None:
return "N/A"
return str(val)
return coder
registerDisplayMF(_defaultMapperFactory)
floatTypes = set(["real", "float", "double", "double precision"])
def _sfMapperFactory(colDesc):
if colDesc["dbtype"] not in floatTypes:
return
if colDesc["displayHint"].get("sf"):
fmtStr = "%%.%df"%int(colDesc["displayHint"].get("sf"))
def coder(val):
if val is None:
return "N/A"
else:
return fmtStr%val
return coder
registerDisplayMF(_sfMapperFactory)
def _hmsMapperFactory(colDesc):
if colDesc["displayHint"].get("type")!="hms":
return
assert colDesc["unit"]=="deg", "hms display hint only works with deg so far"
colDesc["unit"] = "h:m:s"
sepChar = colDesc["displayHint"].get("sepChar", " ")
sf = int(colDesc["displayHint"].get("sf", 2))
def coder(val):
if val is None:
return "N/A"
else:
return utils.degToHms(val, sepChar, sf)
return coder
registerDisplayMF(_hmsMapperFactory)
def _dmsMapperFactory(colDesc):
if colDesc["displayHint"].get("type")!="dms":
return
colDesc["unit"] = "d:m:s"
sepChar = colDesc["displayHint"].get("sepChar", " ")
sf = int(colDesc["displayHint"].get("sf", 2))
def coder(val):
if val is None:
return "N/A"
return utils.degToDms(val, sepChar, sf)
return coder
registerDisplayMF(_dmsMapperFactory)
def _specUnitMapperFactory(colDesc):
"""returns a factory that converts between spectral units based
on a spectralUnit displayHint.
In contract to unitMapperFactory, this supports non-linear conversions.
"""
spUnit = colDesc["displayHint"].get("spectralUnit")
if spUnit and spUnit!=colDesc["unit"]:
fmtStr = "'%%.%df'"%int(colDesc["displayHint"].get("sf", 2))
expr = base.getSpecExpr(colDesc["unit"], spUnit).format("val")
code = ("def coder(val):\n if val is None:\n return 'N/A'\n"
f" else:\n return {fmtStr}%({expr})")
colDesc["unit"] = spUnit
return utils.compileFunction(code, "coder")
registerDisplayMF(_specUnitMapperFactory)
def _unitMapperFactory(colDesc):
"""returns a factory that converts between units for fields that have
a displayUnit displayHint.
The stuff done here has to be done for all factories handling unit-based
floating point values. Maybe we want to do "decorating" meta-factories?
"""
if colDesc["displayHint"].get("displayUnit") and \
colDesc["displayHint"]["displayUnit"]!=colDesc["unit"]:
try:
factor = base.computeConversionFactor(colDesc["unit"],
colDesc["displayHint"]["displayUnit"])
except base.BadUnit:
# bad unit somewhere; ignore display hint
base.ui.notifyError("Bad unit while computing conversion factor.")
return None
colDesc["unit"] = colDesc["displayHint"]["displayUnit"]
fmtStr = "%%.%df"%int(colDesc["displayHint"].get("sf", 2))
if "[" in colDesc["dbtype"]:
def coder(val):
if val is None:
return "N/A"
return "[%s]"%", ".join("N/A" if item is None else fmtStr%(item*factor)
for item in val)
else:
def coder(val):
return "N/A" if val is None else fmtStr%(val*factor)
return coder
registerDisplayMF(_unitMapperFactory)
def _stringWrapMF(baseMF):
"""returns a factory that that stringifies floats and makes N/A from
Nones coming out of baseMF and passes everything else through.
"""
def factory(colDesc):
handler = baseMF(colDesc)
if colDesc["displayHint"].get("sf", None):
fmtstr = "%%.%df"%int(colDesc["displayHint"]["sf"])
fmtstr = "%s"
if handler:
def realHandler(val):
res = handler(val)
if isinstance(res, float):
return fmtstr%res
else:
if res is None:
return "N/A"
else:
return res
return realHandler
return factory
registerDisplayMF(_stringWrapMF(stc.datetimeMapperFactory))
[docs]def humanDatesFactory(colDesc):
format, unit = {"humanDate": ("%Y-%m-%d %H:%M:%S", ""),
"humanDay": ("%Y-%m-%d", "") }.get(
colDesc["displayHint"].get("type"), (None, None))
if format and colDesc["dbtype"] in ("date", "timestamp"):
colDesc["unit"] = unit
def coder(val):
if val is None:
return "N/A"
else:
colDesc["datatype"], colDesc["arraysize"] = "char", "*"
colDesc["xtype"] = "timestamp"
colDesc["unit"] = ""
try:
return val.strftime(format)
except ValueError: # probably too old a date, fall back to a hack
return val.isoformat()
return coder
registerDisplayMF(humanDatesFactory)
[docs]def humanTimesFactory(colDesc):
if colDesc["displayHint"].get("type")=="humanTime":
sf = int(colDesc["displayHint"].get("sf", 0))
fmtStr = "%%02d:%%02d:%%0%d.%df"%(sf+3, sf)
def coder(val):
if val is None:
return "N/A"
else:
if isinstance(val, (datetime.time, datetime.datetime)):
return fmtStr%(val.hour, val.minute, val.second)
elif isinstance(val, datetime.timedelta):
hours = val.seconds//3600
minutes = (val.seconds-hours*3600)//60
seconds = (val.seconds-hours*3600-minutes*60)+val.microseconds/1e6
return fmtStr%(hours, minutes, seconds)
return coder
registerDisplayMF(humanTimesFactory)
[docs]def jdMapperFactory(colDesc):
"""maps JD, MJD, unix timestamp, and julian year columns to
human-readable datetimes.
"""
if (colDesc["displayHint"].get("type")=="humanDate"
and colDesc["dbtype"] in ("double precision", "real")):
if colDesc["unit"]=="d":
if stc.isMJD(colDesc.original):
converter = stc.mjdToDateTime
else:
converter = stc.jdnToDateTime
elif colDesc["unit"]=="s":
converter = datetime.datetime.utcfromtimestamp
elif colDesc["unit"]=="yr":
converter = stc.jYearToDateTime
else:
return None
def fun(val):
if val is None:
return "N/A"
return utils.formatISODT(converter(val))
colDesc["datatype"], colDesc["arraysize"] = "char", "*"
colDesc["xtype"] = "timestamp"
colDesc["unit"] = ""
return fun
registerDisplayMF(jdMapperFactory)
def _sizeMapperFactory(colDesc):
"""is a factory for formatters for file sizes and similar.
"""
if colDesc["unit"]!="byte":
return
sf = int(colDesc["displayHint"].get("sf", 1))
def coder(val):
if val is None:
return "N/A"
else:
return utils.formatSize(val, sf)
return coder
registerDisplayMF(_sizeMapperFactory)
registerDisplayMF(serializers._pgSphereMapperFactory)
def _makeString(val):
if val is None:
return "N/A"
elif isinstance(val, str):
# a cheaty way of making sure we get hex escapes for non-printable stuff
return repr(val)[1:-1]
elif isinstance(val, bytes):
return repr(val)[2:-1]
elif isinstance(val, (list, tuple)):
return "[%s]"%" ".join("%s"%v for v in val)
return str(val)
[docs]def renderAsColumns(table, target, acquireSamples=False):
"""writes a fixed-column representation of table to target.
"""
if isinstance(table, rsc.Data):
table = table.getPrimaryTable()
sm = base.SerManager(table, acquireSamples=acquireSamples,
mfRegistry=displayMFRegistry)
target.write(utils.bytify(
utils.formatSimpleTable(
(_makeString(s) for s in row) for row in sm.getMappedTuples())))
[docs]def renderAsText(table, target, acquireSamples=True):
"""writes a text (TSV) rendering of table to the file target.
"""
if isinstance(table, rsc.Data):
table = table.getPrimaryTable()
sm = base.SerManager(table, acquireSamples=acquireSamples)
for row in sm.getMappedTuples():
target.write(
utils.bytify("\t".join([_makeString(s) for s in row])+"\n"))
[docs]def getAsText(data):
target = io.BytesIO()
renderAsText(data, target)
return target.getvalue().decode("utf-8")
[docs]def readTSV(inFile):
"""returns a list of tuples for a tab-separated-values file.
Lines starting with # and lines containing only whitespace are ignored.
Whitespace at front and back is stripped.
No checks are done at this point, i.e., the tuples could be of varying
lengths.
"""
data = []
for ln in inFile:
ln = ln.strip()
if not ln or ln.startswith("#"):
continue
data.append(tuple(ln.split("\t")))
return data
# NOTE: This will only serialize the primary table.
common.registerDataWriter("tsv", renderAsText, "text/tab-separated-values",
"Tab separated values", ".tsv")
common.registerDataWriter("txt", renderAsColumns, "text/plain",
"Fixed-column plain text", ".txt")