"""
Base classes and common code for grammars.
NOTE: If you add grammars, you have to enter manually them in
rscdef.builtingrammars.GRAMMAR_REGISTRY (we don't want to import all
the mess in this package just to make that).
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import codecs
import io
import re
import os
import select
import subprocess
from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rscdef import procdef
from gavo.rscdef import rowtriggers
class REAttribute(base.UnicodeAttribute):
    """An attribute containing a (compiled) regular expression.

    The XML literal is compiled with re.compile while parsing; unparsing
    recovers the original pattern string.
    """
    def parse(self, value):
        """returns value compiled into a regular expression object.

        Empty (or missing) literals parse to None.  A literal re.compile
        rejects raises a LiteralParseError carrying re's complaint.
        """
        # "not value" already covers None; the original additionally
        # tested "value is None", which was redundant.
        if not value:
            return None
        try:
            return re.compile(value)
        except re.error as msg:
            raise base.ui.logOldExc(base.LiteralParseError(self.name_, value,
                hint="A python regular expression was expected here. Compile"
                " complained: %s"%str(msg)))

    def unparse(self, value):
        """returns the source pattern of value, or "" if value is None."""
        if value is None:
            return ""
        else:
            return value.pattern
class Rowfilter(procdef.ProcApp):
    """A generator for rows coming from a grammar.

    Rowfilters receive rows (i.e., dictionaries) as yielded by a grammar
    under the name row.  Additionally, the embedding row iterator is
    available under the name rowIter.  Macros are expanded within the
    embedding grammar.

    The procedure definition *must* result in a generator, i.e., there must
    be at least one yield; in general, this will typically be a ``yield row``,
    but a rowfilter may swallow or create as many rows as desired.

    If you forget to have a yield in the rowfilter source, you'll get a
    "NoneType is not iterable" error that's a bit hard to understand.

    Here, you can only access whatever comes from the grammar.  You can
    access grammar keys in late parameters as row[key] or, if key is
    like an identifier, as @key.
    """
    name_ = "rowfilter"
    requiredType = "rowfilter"
    formalArgs = "row, rowIter"

    def getFuncCode(self):
        # take ProcApp's generated source and post-process its
        # @-references (presumably rebinding them against the row
        # dict -- replaceProcDefAt's contract is not visible here)
        plainCode = procdef.ProcApp.getFuncCode(self)
        return rscdef.replaceProcDefAt(plainCode, "row")
def compileRowfilter(filters):
    """returns an iterator that "pipes" the rowfilters in filters.

    This means that the output of filters[0] is used as arguments to
    filters[1] and so on.

    If filters is empty, None is returned.
    """
    if not filters:
        return
    compiled = [f.compile() for f in filters]

    def feedThrough(stage, source, rowIter):
        # runs every item of source through one compiled rowfilter
        for item in source:
            yield from stage(item, rowIter)

    def iterPipe(row, rowIter):
        # chain the stages lazily; items flow depth-first through the
        # whole pipeline, just as the nested for-loops of a hand-written
        # pipe would make them
        stream = compiled[0](row, rowIter)
        for stage in compiled[1:]:
            stream = feedThrough(stage, stream, rowIter)
        yield from stream

    return iterPipe
class SourceFieldApp(rscdef.ProcApp):
    """A procedure application that returns a dictionary added to all
    incoming rows.

    Use this to programmatically provide information that can be computed
    once but that is then added to all rows coming from a single source, usually
    a file. This could be useful to add information on the source of a
    record or the like.

    The code must return a dictionary. The source that is about to be parsed is
    passed in as sourceToken. When parsing from files, this simply is the file
    name. The data the rows will be delivered to is available as "data", which
    is useful for adding or retrieving meta information.
    """
    name_ = "sourceFields"
    # fixed typo: this was "requriedType", which left ProcApp's actual
    # requiredType at its default and thus silently disabled type checking
    # of referenced procDefs (compare Rowfilter.requiredType above).
    requiredType = "sourceFields"
    formalArgs = "sourceToken, data"
class MapKeys(base.Structure):
    r"""Mapping of names, specified in long or short forms.

    mapKeys is necessary in grammars like keyValueGrammar or fitsProdGrammar.
    In these, the source files themselves give key names. Within the GAVO
    DC, keys are required to be valid python identifiers (i.e., match
    ``[A-Za-z\_][A-Za-z\_0-9]*``). If keys coming in do not have this form,
    mapping can force proper names.

    mapKeys could also be used to make incoming names more suitable for
    matching with shell patterns (like in rowmaker idmaps).
    """
    # The docstring is a raw string because of the \_ in the REs; in a
    # plain string those are invalid escape sequences (a SyntaxWarning
    # since Python 3.12).
    name_ = "mapKeys"

    _content = base.DataContent(description="Simple mappings in the form"
        "<dest>:<src>{,<dest>:<src>}")
    _mappings = base.DictAttribute("maps", keyName="dest", description=
        "Map source names given in content to the name given in dest.",
        itemAttD=base.UnicodeAttribute("map"), inverted=True,
        copyable=True)

    def _parseShortenedMap(self, literal):
        # fills self.maps from the element content ("dest:src{,dest:src}"),
        # refusing to map the same source name twice.
        try:
            for dest, src in (p.split(":") for p in literal.split(",")):
                src = src.strip()
                if src not in self.maps:
                    self.maps[src] = dest.strip()
                else:
                    raise base.StructureError(
                        "%s clobbers an existing source within the key map."%src)
        except ValueError:
            # p.split(":") yielded something other than exactly two parts
            raise base.ui.logOldExc(base.LiteralParseError(self.name_, literal,
                hint="A key-value enumeration of the format k:v {,k:v}"
                " is expected here"))

    def onElementComplete(self):
        if self.content_:
            self._parseShortenedMap(self.content_)
        super().onElementComplete()

    def doMap(self, aDict):
        """returns dict with the keys mapped according to the defined mappings.

        Keys without a mapping are passed through unchanged; with no
        mappings at all, aDict itself is returned (not a copy).
        """
        if self.maps:
            return {self.maps.get(key, key): value
                for key, value in aDict.items()}
        else:
            return aDict
class RowIterator(object):
    """An object that encapsulates a source being parsed by a
    grammar.

    RowIterators are returned by Grammars' parse methods. Iterate
    over them to retrieve the rows contained in the source.

    You can also call getParameters on them to retrieve document-global
    values (e.g., the parameters of a VOTable, a global header of
    a FITS table).

    The getLocator method should return some string that aids the user
    in finding out why something went wrong (file name, line number, etc.)

    This default implementation works for when source is a sequence
    of dictionaries. You will, in general, want to override
    _iterRows and getLocator, plus probably __init__ (to prepare external
    resources) and getParameters (if you have them; make sure to update
    any parameters you have with self.sourceRow as shown in the default
    getParameters implementation).

    RowIterators are supposed to be self-destructing, i.e., they should
    release any external resources they hold when _iterRows runs out of
    items.

    _iterRows should arrange for the instance variable recNo to be incremented
    by one for each item returned.
    """
    # when True, source start/end/error are announced through base.ui
    notify = True

    def __init__(self, grammar, sourceToken, sourceRow=None):
        # grammar: the embedding Grammar; sourceToken: whatever names the
        # source (often a file name); sourceRow: dict merged into every
        # row dict (from the grammar's sourceFields), or None
        self.grammar, self.sourceToken = grammar, sourceToken
        self.sourceRow = sourceRow
        # record counter, to be maintained by _iterRows implementations
        self.recNo = 0

    def __iter__(self):
        if self.notify:
            base.ui.notifyNewSource(self.sourceToken)
        # Grammar.parse only sets a rowfilter attribute when the grammar
        # actually has rowfilters
        if hasattr(self, "rowfilter"):
            baseIter = self._iterRowsProcessed()
        else:
            baseIter = self._iterRows()
        if self.grammar.ignoreOn:
            rowSource = self._filteredIter(baseIter)
        else:
            rowSource = baseIter
        try:
            try:
                for row in rowSource:
                    # handle dispatched grammars here, too
                    if isinstance(row, tuple):
                        # dispatched rows are (destination, rowdict) pairs
                        d = row[1]
                    else:
                        d = row
                    if isinstance(d, dict):
                        # else it could be a sentinel like FLUSH, which we leave alone
                        if self.sourceRow:
                            d.update(self.sourceRow)
                        d["parser_"] = self
                    yield row
            except Exception:
                base.ui.notifySourceError()
                raise
        finally:
            # runs on normal exhaustion, error, and early close alike
            self.finalize()

    def finalize(self):
        """releases resources and signals the UI that this source is done.

        Subclasses overriding this should call the base implementation.
        """
        if self.notify:
            base.ui.notifySourceFinished()

    def _filteredIter(self, baseIter):
        # drops rows for which the grammar's ignoreOn trigger fires
        for row in baseIter:
            if not self.grammar.ignoreOn(row):
                yield row

    def _iterRowsProcessed(self):
        # pipes raw rows through self.rowfilter; for dispatching grammars
        # rows are (dest, dict) pairs, and only the dict is filtered
        if self.grammar.isDispatching:
            for dest, row in self._iterRows():
                for procRow in self.rowfilter(row, self):
                    yield dest, procRow
        else:
            for row in self._iterRows():
                for procRow in self.rowfilter(row, self):
                    yield procRow

    def _iterRows(self):
        # default implementation: an empty generator (the dead "if False"
        # makes this function a generator at all); override in subclasses
        if False:
            yield None
        self.grammar = None # don't wait for garbage collection

    def getParameters(self):
        """returns document-global values for this source as a dict.

        The default has nothing of its own; it passes on sourceRow items
        plus a reference to this parser under parser_.
        """
        res = {"parser_": self}
        if self.sourceRow:
            res.update(self.sourceRow)
        return res

    def getLocator(self):
        """returns a string helping a user locate parse errors.

        Override this to return file names, line numbers, and the like.
        """
        return "(unknown position -- locator missing)"
def wrapFileFor(fileobj, desiredMode, enc):
    """wraps or unwraps fileobj so that it matches the open mode desiredMode.

    If there's a "b" in desiredMode, this will return fileobj.raw if it's
    there.  Otherwise, it'll wrap it into a codec.getreader for enc.
    """
    # There is no reliable way to ask a file object whether it yields
    # bytes or text, so guess a mode from what we can see.
    if hasattr(fileobj, "mode"):
        foundMode = fileobj.mode
    elif isinstance(fileobj, io.BytesIO):
        foundMode = "rb"
    elif isinstance(fileobj, io.StringIO):
        foundMode = "r"
    elif hasattr(fileobj, "raw"):
        # text wrappers over a binary buffer expose .raw
        foundMode = "r"
    else:
        foundMode = "rb"

    haveBinary = "b" in foundMode
    wantBinary = "b" in desiredMode
    if wantBinary == haveBinary:
        # already what the caller asked for
        return fileobj
    if wantBinary:
        # text object, binary wanted: hand out the underlying buffer
        return fileobj.raw
    # binary object, text wanted: decode through a codec stream reader
    return codecs.getreader(enc)(fileobj)
class FileRowIterator(RowIterator):
    """is a RowIterator base for RowIterators reading files.

    It analyzes the sourceToken to see if it's a string, in which case
    it opens it as a file name and leaves the file object in self.inputFile.

    Otherwise, it assumes sourceToken already is a file object and binds
    it to self.inputFile. It then tries to come up with a sensible designation
    for sourceToken.

    It also inspects the parent grammar for a gunzip attribute. If it is
    present and true, the input file will be unzipped transparently. Don't
    add more features like this; preFilter is a lot more flexible.

    Classes using this reading binary data will want to set fileMode
    to rb. If they don't, what's returned is strings.
    """
    # open mode for the source; subclasses reading binary data set "rb"
    fileMode = "r"

    def __init__(self, grammar, sourceToken, **kwargs):
        RowIterator.__init__(self, grammar, sourceToken, **kwargs)
        # line counter for locators; maintained by subclasses
        self.curLine = 1
        try:
            self._openFile()
        except IOError as ex:
            raise base.ui.logOldExc(
                base.SourceParseError("I/O operation failed (%s)"%str(ex),
                    source=str(sourceToken), location="start"))

    def _openFile(self):
        # binds an open (possibly filtered/decoded) file to self.inputFile
        # and replaces self.sourceToken with a printable designation.
        preFilter = None
        # we'll use at the end to generate a sourceToken for display purposes
        parsingFrom = self.sourceToken
        if hasattr(parsingFrom, "name"):
            # it's probably an open file
            parsingFrom = self.sourceToken.name
        if not isinstance(parsingFrom, str):
            parsingFrom = repr(parsingFrom)
        if hasattr(self.grammar, "gunzip") and self.grammar.gunzip:
            # the deprecated gunzip flag is implemented as a zcat preFilter
            preFilter = "zcat"
        preFilter = preFilter or (
            hasattr(self.grammar, "preFilter") and self.grammar.preFilter)
        # need to handle preFilter first, as that needs a binary file.
        curSrc = self.sourceToken
        if preFilter:
            if isinstance(curSrc, str):
                curSrc = open(curSrc, "rb")
            else:
                curSrc = curSrc
            # NOTE(review): FilteredInputFile is not imported in this
            # excerpt; presumably it comes from gavo.utils -- confirm.
            curSrc = FilteredInputFile(preFilter, curSrc)
        if isinstance(curSrc, str):
            curSrc = open(curSrc, "rb")
        # now curSrc is a binary file. If a normal file mode
        # is requested, wrap it into a codec.
        if "b" not in self.fileMode:
            curSrc = wrapFileFor(
                curSrc, self.fileMode, self.grammar.enc or "ascii")
        self.inputFile = curSrc
        self.sourceToken = parsingFrom

    def finalize(self):
        # close the input in addition to the base class's UI notification
        RowIterator.finalize(self)
        if hasattr(self.inputFile, "close"):
            self.inputFile.close()
class FileRowAttributes(base.StructCallbacks):
    """A mixin for grammars with FileRowIterators.

    This provides some attributes that FileRowIterators interpret, e.g.,
    preFilter.
    """
    _gunzip = base.BooleanAttribute("gunzip", description="Unzip sources"
        " while reading? (Deprecated, use preFilter='zcat')", default=False)
    _preFilter = base.UnicodeAttribute("preFilter", description="Shell"
        " command to pipe the input through before passing it on to the"
        " grammar. Classical examples include zcat or bzcat, but you"
        " can commit arbitrary shell atrocities here.",
        copyable=True)

    def completeElement(self, ctx):
        # preFilter executes shell code and hence must not be available
        # to untrusted (restricted) RDs
        if ctx.restricted and self.preFilter is not None:
            raise base.RestrictedElement("preFilter")
        super().completeElement(ctx)
class GrammarMacroMixin(base.StandardMacroMixin):
    """A collection of macros available to rowfilters.

    NOTE: All macros should return only one single physical python line,
    or they will mess up the calculation of what constructs caused errors.
    """
    def macro_fullDLURL(self, dlService):
        r"""returns a python expression giving a link to the full current data
        set retrieved through the datalink service.

        You would write \fullDLURL{dlsvc} here, and the macro will expand into
        something like http://yourserver/currd/dlsvc/dlget?ID=ivo://whatever.

        dlService is the id of the datalink service in the current RD.

        This is intended for "virtual" data where the dataset is generated
        on the fly through datalink.
        """
        baseURL = self.rd.getById(dlService).getURL("dlget")
        # the expression evaluates getStandardPubDID at row-processing time,
        # when rowIter.sourceToken is known
        return ("'%%s?ID=%%s'%%(%s,"
            " urllib.parse.quote_plus(getStandardPubDID(rowIter.sourceToken)))"%(
                repr(baseURL)))

    def macro_standardPreviewPath(self):
        """returns an expression for the standard path for a custom preview.

        This consists of resdir, the name of the previewDir property on the
        embedding DD, and the flat name of the accref (which this macro
        assumes to see in its namespace as accref; this is usually the
        case in //products#define, which is where this macro would typically be
        used).

        As an alternative, there is the splitPreviewPath macro, which does not
        mogrify the file name.  In particular, do not use standardPreviewPath
        when you have more than a few 1e4 files, as it will have all these
        files in a single, flat directory, and that can become a chore.

        See the introduction to custom previews for details.
        """
        # the directory part is fixed per-DD and hence computed at macro
        # expansion time; only getFlatName(accref) runs per row
        constantPrefix = os.path.join(
            rscdef.getInputsRelativePath(self.parent.rd.resdir),
            self.parent.getProperty("previewDir"))+"/"
        return (repr(constantPrefix)
            +"+getFlatName(accref)")

    def macro_splitPreviewPath(self, ext):
        """returns an expression for the split standard path for a custom
        preview.

        As standardPreviewPath, except that the directory hierarchy of the data
        files will be reproduced in previews. For ext, you should typically pass
        the extension appropriate for the preview (like {.png} or {.jpeg}).

        See the introduction to custom previews for details.
        """
        constantPrefix = os.path.join(
            rscdef.getInputsRelativePath(self.parent.rd.resdir),
            self.parent.getProperty("previewDir"))+"/"
        return (repr(constantPrefix)
            +"+accref+'%s'"%ext)

    def macro_sourceDate(self):
        """returns an expression giving the timestamp of the current source.
        """
        return ('datetime.datetime.utcfromtimestamp('
            'os.path.getmtime(rowIter.sourceToken))')

    def macro_srcstem(self):
        """returns python code for the stem of the source file currently parsed in a rowmaker.

        Example: if you're currently parsing /tmp/foo.bar, the stem is foo.
        """
        return 'getFileStem(rowIter.sourceToken)'

    def macro_rootlessPath(self):
        """returns an expression giving the current source's path with
        the resource descriptor's root removed.
        """
        return ('utils.getRelativePath(rowIter.sourceToken,'
            ' rowIter.grammar.rd.resdir)')

    def macro_colNames(self, tableRef):
        """returns a comma-separated list of column names for a table reference.

        This is convenient if an input file matches the table structure; you
        can then simply say things like <reGrammar names="\\\\colName{someTable}"/>.
        """
        return ",".join(c.name for c in self.rd.getById(tableRef))

    def macro_property(self, property):
        """returns the value of property on the parent DD.
        """
        return self.parent.getProperty(property)
class Grammar(base.Structure, GrammarMacroMixin):
    """An abstract grammar.

    Grammars are configured via their structure parameters. Their
    parse(sourceToken) method returns an object that iterates over rawdicts
    (dictionaries mapping keys to (typically) strings) that can then be fed
    through rowmakers; it also has a method getParameters that returns
    global properties of the whole document (like parameters in VOTables;
    this will be empty for many kinds of grammars).

    RowIterators will return a reference to themselves in the raw dicts in the
    parser_ key unless you override their _iterRowsProcessed method (which you
    shouldn't). This is used by rowmaker macros.

    What exactly sourceToken is is up to the concrete grammar. While
    typically it's a file name, it might be a sequence of dictionaries,
    a twisted web request, or whatever.

    To derive a concrete Grammar, define a RowIterator for your source
    and set the rowIterator class attribute to it.
    """
    name_ = "grammar"

    # structure attributes configurable from RD XML
    _encoding = base.UnicodeAttribute("enc", default=None, description=
        "Encoding of the source file(s).", copyable=True)
    _rowfilters = base.StructListAttribute("rowfilters",
        description="Row filters for this grammar.",
        childFactory=Rowfilter, copyable=True)
    _ignoreOn = base.StructAttribute("ignoreOn", default=None, copyable=True,
        description="Conditions for ignoring certain input records. These"
        " triggers drop an input record entirely. In modern RDs, prefer"
        " rowfilters raising SkipThis.",
        childFactory=rowtriggers.IgnoreOn)
    _sourceFields = base.StructAttribute("sourceFields", default=None,
        copyable=True, description="Code returning a dictionary of values"
        " added to all returned rows.", childFactory=SourceFieldApp)
    _properties = base.PropertyAttribute(copyable=True)
    _original = base.OriginalAttribute()
    _rd = rscdef.RDAttribute()

    # isDispatching is used by various special grammars to signify the
    # grammar returns rowdicts for multiple makers. See those.
    # Here, we just fix it to false so clients can rely on the attribute's
    # existence.
    isDispatching = False

    # concrete grammars point this at their RowIterator subclass
    rowIterator = RowIterator

    def getSourceFields(self, sourceToken, data):
        """returns a dict containing user-defined fields to be added to
        all results.

        Returns None when the grammar has no sourceFields element.
        """
        if self.sourceFields is None:
            return None
        # compile the sourceFields code lazily, and only once per grammar
        if not hasattr(self, "_compiledSourceFields"):
            self._compiledSourceFields = self.sourceFields.compile()
        return self._compiledSourceFields(sourceToken, data)

    def parse(self, sourceToken, targetData=None):
        """returns a row iterator over sourceToken.

        targetData is handed through to the sourceFields application,
        if one is defined.
        """
        ri = self.rowIterator(self, sourceToken,
            sourceRow=self.getSourceFields(sourceToken, targetData))
        if self.rowfilters:
            # RowIterator.__iter__ picks this attribute up if present
            ri.rowfilter = compileRowfilter(self.rowfilters)
        return ri
class NullGrammar(Grammar):
    """A grammar whose row iterators never yield any rows, whatever
    the source.
    """
    name_ = "nullGrammar"
class TransparentGrammar(Grammar):
    """A grammar that returns its sourceToken as the row iterator.

    This only makes sense in extreme situations and never without custom
    code.  If you're not sure you need this, you don't want to know about
    it.
    """
    name_ = "transparentGrammar"

    def parse(self, sourceToken, targetData=None):
        # no row iterator is constructed; the caller-supplied token must
        # already behave like one
        return sourceToken