# Source code for gavo.formats.common
"""
Common code for generation of various data formats.
The main function here is formatData. It receives a string format id, a data
instance and a destination file (binary mode). It dispatches this to
formatters previously registered using registerDataWriter.
The data writers must take a data instance and a file instance; their
effect must be that a serialized representation of data, or, if the format
does not support this, the data's primary table is written to the file
instance.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import cgi
import io
import os
import mimetypes
from gavo import base
from gavo import utils
# Extension -> media type mappings used in guessMediaType.  They are only
# consulted when the formats registry has no entry for an extension (e.g.,
# because the module registering the format has not been imported);
# they are tried before python's mimetypes module is asked.
# Keys are lower-case extensions including the leading dot.
EXTENSION_FALLBACKS = {
".vot": base.votableType,
".fits": "application/fits",
".fz": "image/fits",
}
# image/fits is probably not quite legal (it's not in Debian's
# /etc/mime.types), but it's too handy to pass up.
# Note that this changes the process-wide mimetypes tables at import time.
mimetypes.add_type("image/fits", ".fits")
class CannotSerializeIn(base.Error):
    """is raised when a format identifier cannot be resolved to a
    registered format.

    The offending identifier is available in the format attribute (and
    as the single element of args).  The hint lists the format keys
    having a registered writer when the exception was constructed.
    """
    def __init__(self, format):
        self.format = format
        # Computed at construction time so the hint reflects what
        # has actually been registered so far.
        knownKeys = ", ".join(FORMATS_REGISTRY.writerRegistry)
        hintText = (
            "Either you gave an invalid format id or a known format"
            " did not get registered for some reason.  Format codes"
            " known at this point: %s.  You can also try common MIME types"
            % knownKeys)
        base.Error.__init__(self, format, hint=hintText)
        self.args = [format]

    def __str__(self):
        return "Cannot serialize in '%s'."%self.format
def _parseMediaType(contentType):
    """returns a pair of bare media type and parameter dictionary for
    a content-type string.

    This follows the rules of the former cgi.parse_header (the cgi module
    was removed from the standard library in python 3.13): parameters are
    separated by semicolons, semicolons within double-quoted values do not
    separate, parameter names are lower-cased, surrounding double quotes
    are removed from values (undoing backslash escapes), parameters
    without an equals sign are ignored, and for repeated parameters the
    last one wins.
    """
    def iterParts(rest):
        while rest[:1]==";":
            rest = rest[1:]
            end = rest.find(";")
            # an odd number of unescaped quotes before the semicolon means
            # we are within a quoted string: look for the next semicolon.
            while (end>0
                    and (rest.count('"', 0, end)
                        -rest.count('\\"', 0, end))%2):
                end = rest.find(";", end+1)
            if end<0:
                end = len(rest)
            yield rest[:end].strip()
            rest = rest[end:]

    parts = iterParts(";"+contentType)
    # there always is a first part, even for an empty contentType
    mediaType = next(parts)

    params = {}
    for part in parts:
        name, separator, value = part.partition("=")
        if not separator:
            continue
        value = value.strip()
        if len(value)>=2 and value[0]==value[-1]=='"':
            value = value[1:-1].replace("\\\\", "\\").replace('\\"', '"')
        params[name.strip().lower()] = value

    return mediaType, params


def getMIMEKey(contentType):
    """makes a DaCHS mime key from a content-type string.

    This is used for retrieving matching mime types and is a triple
    of major and minor mime type and a (frozen) set of parameter pairs.

    contentType is a string-serialized mime type.

    We also normalise everything to lower case.  I don't think that's
    quite standards-compliant, but with all the other case-insensitivity
    nonsense, anything else will get really ugly.

    A CannotSerializeIn is raised (for the lower-cased contentType) if
    the media type does not consist of exactly two slash-separated parts.

    The parsing is done locally rather than through cgi.parse_header so
    this function does not depend on the cgi module, which python 3.13
    no longer ships.
    """
    contentType = contentType.lower()
    media_type, paramdict = _parseMediaType(contentType)

    try:
        major, minor = media_type.split("/")
    except (ValueError, TypeError):
        raise CannotSerializeIn(contentType)

    return (major, minor, frozenset(paramdict.items()))
class FORMATS_REGISTRY(object):
    """a registry for data formats that can be produced by DaCHS.

    Formats get in here because the modules implementing them call
    registerDataWriter when they are imported; hence, if you want to rely
    on some entry here, be sure there's an import somewhere.

    All state is kept in class attributes, and all methods are class
    methods; this class is not supposed to be instantiated.
    """
    # format key -> writer function
    writerRegistry = {}
    # format key -> mime type
    formatToMIME = {}
    # format key -> human-readable label
    formatToLabel = {}
    # (major, minor, param pair set) -> format key
    mimeToKey = {}
    # extension -> key of the first format registered for it
    extensionToKey = utils.CaseSemisensitiveDict()
    # format key -> suggested extension
    keyToExtension = {}
    # Formats TAPRegExt standard ids have an entry here
    keyToTAPId = {}
    # main format key to aliases also accepted
    keyToAliases = {}

    @classmethod
    def registerDataWriter(cls,
            key, writer, mainMime, label, extension, *aliases, tapId=None):
        """adds a writer to the formats registry.

        Arguments:

        * key -- a short, unique handle for the format
        * writer -- a function(data, outputFile) -> None (where data can be
          an rsc.Data or an rsc.Table instance)
        * mainMime -- the preferred media type
        * label -- a human-readable designation for the format (shown in
          selection widgets and the like)
        * extension -- a suggested extension for the format (lower-case
          only)
        * aliases -- other strings that can be used to select the format in
          DALI FORMAT or similar; they are parsed as media types
        * tapId -- if given, the TAPRegExt identifier of the format

        Where keys, mainMime, and aliases clash, previous entries are
        silently overwritten.  For extensions, the first registered format
        wins.
        """
        cls.writerRegistry[key] = writer
        cls.formatToMIME[key] = mainMime
        cls.formatToLabel[key] = label

        # the main media type and all aliases resolve to our key
        for mediaType in (mainMime,)+aliases:
            cls.mimeToKey[getMIMEKey(mediaType)] = key

        if extension not in cls.extensionToKey:
            cls.extensionToKey[extension] = key
        cls.keyToExtension[key] = extension

        if tapId is not None:
            cls.keyToTAPId[key] = tapId

        # the key itself counts as an alias unless it already is the
        # main media type
        alternateNames = list(aliases)
        if mainMime!=key:
            alternateNames.append(key)
        cls.keyToAliases[key] = alternateNames

    @classmethod
    def getMIMEFor(cls, formatName, orderedFormat=None):
        """returns a simple MIME type for our formatName (some incoming MIME
        or an alias).

        Some magic, reserved mimes that need to be preserved from
        the input are recognised and returned in orderedFormat.  This
        is for TAP and related DALI hacks.

        For formatNames that are not registered, formatName itself is
        returned if it contains a slash, application/octet-stream otherwise.
        """
        # TAP Spec, 2.7.1, similar in DALI, wants us to keep some
        # media types.  It's not quite clear which these actually are,
        # but I'd guess something like:
        preservedPrefixes = (
            "text/xml",
            "application/x-votable+xml",
            "text/plain")
        if orderedFormat and orderedFormat.startswith(preservedPrefixes):
            return orderedFormat

        if formatName in cls.formatToMIME:
            return cls.formatToMIME[formatName]

        # if it looks like a mime type, return it, otherwise assume it's
        # an unimported format and return a generic mime
        return formatName if "/" in formatName else "application/octet-stream"

    @classmethod
    def getWriterFor(cls, formatName):
        """returns a writer for formatName.

        Writers are what's registered via registerDataWriter; formatName is
        a MIME type or a format alias.  This raises CannotSerializeIn
        if no writer is available.
        """
        formatKey = cls.getKeyFor(formatName)
        return cls.writerRegistry[formatKey]

    @classmethod
    def getLabelFor(cls, formatName):
        """returns the human-readable label for formatName (DaCHS key
        or MIME type).
        """
        formatKey = cls.getKeyFor(formatName)
        return cls.formatToLabel[formatKey]

    @classmethod
    def getKeyFor(cls, formatName):
        """returns a DaCHS format key for formatName (DaCHS key or MIME).

        formatName is lower-cased before anything else happens.

        If formatName is a mime type with parameters, we'll also try
        to get a format with the parameters stripped and silently succeed
        if that works.

        A CannotSerializeIn is raised when nothing matches.
        """
        normalised = formatName.lower()
        if normalised in cls.writerRegistry:
            return normalised

        major, minor, params = getMIMEKey(normalised)
        # first try the full media type, then the one without parameters
        for candidate in [
                (major, minor, params),
                (major, minor, frozenset())]:
            if candidate in cls.mimeToKey:
                return cls.mimeToKey[candidate]

        raise CannotSerializeIn(normalised)

    @classmethod
    def getAliasesFor(cls, formatName):
        """returns alternate names for a DaCHS format key.

        Don't modify what you get back; it is the list kept in the
        registry.  The list contains the DaCHS format key unless that
        is the main mime itself.

        A KeyError is raised for keys that have not been registered.
        """
        return cls.keyToAliases[formatName]

    @classmethod
    def getTAPIdFor(cls, formatName):
        """returns the TAPRegExt identifier registered for a DaCHS format
        key.

        This will return None if no such identifier has been registered
        for the key.
        """
        return cls.keyToTAPId.get(formatName)

    @classmethod
    def getTypeForExtension(cls, extension):
        """returns the media type first registered for extension.

        extension must begin with a dot; it is lower-cased before the
        lookup.  None is returned for extensions no format has (yet)
        claimed.
        """
        formatKey = cls.extensionToKey.get(extension.lower())
        return None if formatKey is None else cls.formatToMIME[formatKey]

    @classmethod
    def iterFormats(cls):
        """returns an iterator over the short names (keys) of the
        formats having a registered writer.
        """
        return iter(cls.writerRegistry)
# Module-level shortcuts to the class methods of FORMATS_REGISTRY, so
# client code can write, e.g., getWriterFor(...) without spelling out
# the registry class.
registerDataWriter = FORMATS_REGISTRY.registerDataWriter
getMIMEFor = FORMATS_REGISTRY.getMIMEFor
getKeyFor = FORMATS_REGISTRY.getKeyFor
getWriterFor = FORMATS_REGISTRY.getWriterFor
getLabelFor = FORMATS_REGISTRY.getLabelFor
getAliasesFor = FORMATS_REGISTRY.getAliasesFor
getTAPIdFor = FORMATS_REGISTRY.getTAPIdFor
iterFormats = FORMATS_REGISTRY.iterFormats
def formatData(
        formatName,
        table,
        outputFile,
        acquireSamples=True,
        **moreFormatterArgs):
    """writes a table to outputFile in the format given by formatName.

    ``table`` may be a table or a ``Data`` instance.  ``formatName`` is a
    format shortcut (``formats.iterFormats()`` gives keys available) or a
    media type.  If you pass None, the default VOTable format will be
    selected.  ``outputFile`` is handed on to the writer as-is.

    This raises a ``CannotSerializeIn`` exception if ``formatName`` is
    not recognized.  Note that you have to import the serialising modules
    from the format package to make the formats available (fitstable,
    csvtable, geojson, jsontable, texttable, votable; api itself already
    imports the more popular of these).

    ``acquireSamples`` and any additional keyword arguments are passed
    through to the writer.  If a client knows a certain formatter
    understands additional arguments, it can hand them in as keyword
    arguments.  This will raise an error if another formatter that
    doesn't understand the argument is being used.
    """
    if formatName is None:
        formatName = base.votableType

    writer = getWriterFor(formatName)
    writer(
        table,
        outputFile,
        acquireSamples=acquireSamples,
        **moreFormatterArgs)
def getFormatted(formatName, table, acquireSamples=False):
    """returns a byte string containing a representation of table in the
    format given by formatName.

    This is just wrapping the `function formatData`_; see there for
    formatName.  Note, however, that acquireSamples defaults to False
    here.

    The serialisation is built up in memory, so this function will use
    large amounts of memory for large data.
    """
    destination = io.BytesIO()
    formatData(formatName, table, destination, acquireSamples)
    return destination.getvalue()
def guessMediaType(fName):
    """returns a media type plausible for a file named fName.

    The sources asked are, in this sequence:

    1. the extension map inferred by our formats registry,
    2. EXTENSION_FALLBACKS, some built-in safety catches in case the
       formatters haven't been imported,
    3. python's built-in mimetypes.guess_type.

    If nothing matches, application/octet-stream is returned.

    Extensions are used case-insensitively.  We don't do any encoding
    inference (yet).  We may, though, so by all means shout if you're using
    this in DaCHS-external code.
    """
    extension = os.path.splitext(fName)[-1].lower()

    fromRegistry = FORMATS_REGISTRY.getTypeForExtension(extension)
    if fromRegistry is not None:
        return fromRegistry

    fromFallbacks = EXTENSION_FALLBACKS.get(extension)
    if fromFallbacks is not None:
        return fromFallbacks

    # guess_type also returns an encoding, which we ignore for now
    fromMimetypes = mimetypes.guess_type(fName)[0]
    if fromMimetypes is not None:
        return fromMimetypes

    return "application/octet-stream"
def getExtensionFor(mediaType):
    """returns a suggested extension for files of mediaType.

    mediaType can be an RFC 2045 media type, or one of DaCHS' internal format
    codes.

    If the formats registry cannot help, python's mimetypes module
    is asked.  As a last fallback, .dat will be returned.
    """
    try:
        formatKey = FORMATS_REGISTRY.getKeyFor(mediaType)
        return FORMATS_REGISTRY.keyToExtension[formatKey]
    except (CannotSerializeIn, KeyError):
        guessed = mimetypes.guess_extension(mediaType)
        return guessed or ".dat"