"""
A grammar splitting the input file into lines and lines into records
using REs.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import re
from gavo import base
from gavo.grammars.common import (
Grammar, FileRowIterator, FileRowAttributes, REAttribute)
class REIterator(FileRowIterator):
    """An iterator based on regular expressions.

    The input file is read chunk-wise and cut into records at matches of
    the grammar's recordSep.  Each record is then split into fields at
    matches of the grammar's fieldSep, and the fields are paired with the
    grammar's names to form a rowdict.
    """
    # size argument passed to each inputFile.read() call
    chunkSize = 8192

    def _iterInRecords(self):
        """Yield the raw records found in the input file.

        The first grammar.topIgnoredLines lines are discarded.  After that,
        the file is pulled in in pieces of chunkSize; material matching
        grammar.commentPat is removed from the buffer each time new
        data has been appended.  Everything between two matches of
        grammar.recordSep is yielded with surrounding whitespace removed.

        Iteration ends at the end of the file or as soon as a record matches
        grammar.stopPat; the matching record itself is not yielded.

        self.curLine is advanced by the newlines in a separator before the
        record in front of it is yielded, and by the newlines within that
        record once the consumer asks for the next one.
        """
        for _ in range(self.grammar.topIgnoredLines):
            self.inputFile.readline()
            self.curLine += 1

        curPos = 0
        splitPat = self.grammar.recordSep
        buffer = ""
        while True:
            mat = splitPat.search(buffer, curPos)
            if not mat:  # no match, fetch new stuff.
                newStuff = self.inputFile.read(self.chunkSize)
                if not newStuff:  # file exhausted
                    break
                # keep the unconsumed tail; it may be the beginning of a
                # record completed by the new chunk
                buffer = buffer[curPos:]+newStuff
                curPos = 0
                if self.grammar.commentPat:
                    buffer = self.grammar.commentPat.sub("", buffer)
                continue

            self.curLine += mat.group().count("\n")
            res = buffer[curPos:mat.start()]
            if self.grammar.stopPat and self.grammar.stopPat.match(res):
                return
            yield res.strip()
            curPos = mat.end()
            self.curLine += res.count("\n")

        # yield stuff left if there's something left
        res = buffer[curPos:].strip()
        if res and not (
                self.grammar.stopPat and self.grammar.stopPat.match(res)):
            yield res

    def _iterRows(self):
        """Yield one rowdict per usable record in the input file.

        Records for which _makeRec raises base.SkipThis are silently dropped.
        The input file is closed when iteration ends, whether that is
        because the input is exhausted, because an error propagates out, or
        because the consumer discards the generator early.  After a regular
        end of input, the reference to the grammar is dropped as well.
        """
        try:
            for rawRec in self._iterInRecords():
                try:
                    res = self._makeRec(rawRec)
                except base.SkipThis:
                    continue
                yield res
        finally:
            # don't leak the file handle on parse errors or early exit
            self.inputFile.close()
        self.grammar = None

    def _makeRec(self, inputLine):
        """Return a rowdict for the raw record inputLine.

        If the grammar defines a recordCleaner, inputLine must match it, and
        the record is replaced by the blank-joined groups of that match
        (groups that did not take part in the match are left out).  The
        record is then split at grammar.fieldSep and the resulting fields
        are zipped with grammar.names.

        Raises base.SourceParseError if the record does not match the
        recordCleaner, or if the number of fields differs from the number
        of names and the grammar is not lax.  Raises base.SkipThis for
        records that are empty or contain only whitespace.
        """
        if self.grammar.recordCleaner:
            cleanMat = self.grammar.recordCleaner.match(inputLine)
            if not cleanMat:
                raise base.SourceParseError(
                    "'%s' does not match cleaner"%inputLine,
                    source=str(self.sourceToken),
                    location=self.getLocator())
            # optional groups that did not participate are None; joining
            # them would raise a TypeError unrelated to the actual input
            # problem
            inputLine = " ".join(
                group for group in cleanMat.groups() if group is not None)

        if not inputLine.strip():
            raise base.SkipThis("Empty line")

        fields = self.grammar.fieldSep.split(inputLine)
        if not self.grammar.lax and len(fields)!=len(self.grammar.names):
            raise base.SourceParseError("%d fields found, expected %d"%(
                len(fields), len(self.grammar.names)),
                source=self.sourceToken,
                location=self.getLocator(),
                hint="reGrammars need the same number of input fields in each line,"
                " and that number has to match the number of tokens in the names"
                " attribute.  If that's not true for your input but it still"
                " makes sense, add lax='True' to your grammar.")
        # zip stops at the shorter sequence, so in lax mode surplus names
        # or surplus fields are dropped
        return dict(zip(self.grammar.names, fields))

    def getLocator(self):
        """Return a human-readable position ("line <n>") built from
        self.curLine.
        """
        return "line %d"%self.curLine
class REGrammar(Grammar, FileRowAttributes):
    """A grammar that builds rowdicts from records and fields specified
    via REs separating them.

    There is also a simple facility for "cleaning up" records.  This can be
    used to remove standard shell-like comments; use
    ``recordCleaner="(?:#.*)?(.*)"``.
    """
    name_ = "reGrammar"
    # the row iterator class doing the actual parsing for this grammar
    rowIterator = REIterator

    _til = base.IntAttribute("topIgnoredLines", default=0, description=
        "Skip this many lines at the top of each source file.",
        copyable=True)
    _stopPat = REAttribute("stopPat", default=None,
        description="Stop parsing when a record *matches* this RE (this"
        " is for skipping non-data footers)",
        copyable=True)
    # by default, every line is a record...
    _recordSep = REAttribute("recordSep", default=re.compile("\n"),
        description="RE for separating two records in the source.",
        copyable=True)
    # ...and fields are separated by runs of whitespace
    _fieldSep = REAttribute("fieldSep", default=re.compile(r"\s+"),
        description="RE for separating two fields in a record.",
        copyable=True)
    _commentPat = REAttribute("commentPat", default=None,
        description="RE inter-record material to be ignored (note: make this"
        " match the entire comment, or you'll get random mess from partly-matched"
        " comments.  Use '(?m)^#.*$' for beginning-of-line hash-comments).",
        copyable=True)
    _recordCleaner = REAttribute("recordCleaner", default=None,
        description="A regular expression matched against each record."
        " The matched groups in this RE are joined by blanks and used"
        " as the new record.  This can be used for simple cleaning jobs;"
        " However, records not matching recordCleaner are rejected.",
        copyable=True)
    _names = base.StringListAttribute("names", description=
        "Names for the parsed fields, in matching sequence.  You can"
        r" use macros here, e.g., \\colNames{someTable}.", expand=True,
        copyable=True)
    _lax = base.BooleanAttribute("lax", description="allow more or less"
        " fields in source records than there are names", default=False,
        copyable=True)