"""
Various helpers that didn't fit into any other xTricks.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import base64
import collections
import functools
import io
import os
import re
import struct
import time
import threading
import urllib.request as urlrequest
import zlib
from docutils import core as rstcore
from docutils import nodes
from docutils import utils as rstutils
from docutils.parsers.rst import roles
from docutils.parsers.rst import directives
from docutils.parsers.rst.states import Inliner
from gavo.utils import excs
from gavo.utils.dachstypes import (Any, BinaryIO, Callable, Dict,
Filename, Generator, Generic, List, Optional, Sequence, StrOrBytes,
StrToStrMap, Tuple, Type, TypeVar, Union, cast)
BIBCODE_PATTERN = re.compile(r"[012]\d\d\d\w[^ ]{14}$")
def couldBeABibcode(s: str) -> bool:
"""returns true if we think that the string s is a bibcode.
This is based on matching against BIBCODE_PATTERN.
"""
return bool(BIBCODE_PATTERN.match(s))
_RSTRoleReturnType = Tuple[List[nodes.Node], List[str]]
_RSTRoleFunction = Callable[[str, str, str, int, Inliner,
Optional[dict], Optional[List[str]]], _RSTRoleReturnType]
class RSTExtensions:
"""a register for local RST extensions.
This is for both directives and interpreted text roles.
We need these as additional markup in examples; these always
introduce local rst interpreted text roles, which always
add some class to the node in question (modifications are possible).
These classes are then changed to properties as the HTML fragments
from RST translation are processed by the _Example nevow data factory.
To add a new text role, say::
RSTExtensions.makeTextRole(roleName, roleFunc=None)
You can pass in a full role function as discussed in
...docs/howto/rst-roles.html#define-the-role-function
It must, however, add a dachs-ex-<roleName> class to the node. The
default function produces a nodes.emphasis item with the proper class.
In a pinch, you can pass a propertyName argument to makeTextRole if the
desired property name is distinct from the role name in the RST.
This is used by tapquery and taprole since we didn't want to change
our examples when the standard changed.
To add a directive, say::
RSTExtensions.addDirective(dirName, dirClass)
In HTML, these classes become properties named like the role name
(except you can again use propertyName in a pinch).
"""
classToProperty: Dict[str, str] = {}
@classmethod
def addDirective(cls,
name: str,
implementingClass: type,
propertyName: Optional[str]=None) -> None:
directives.register_directive(name, implementingClass)
cls.classToProperty["dachs-ex-"+name] = propertyName or name
@classmethod
def makeTextRole(cls,
roleName: str,
roleFunc: Optional[_RSTRoleFunction]=None,
propertyName: Optional[str]=None
) -> None:
"""creates a new text role for roleName.
See class docstring.
"""
if roleFunc is None:
roleFunc = cls._makeDefaultRoleFunc(roleName)
roles.register_local_role(roleName, roleFunc)
cls.classToProperty["dachs-ex-"+roleName] = propertyName or roleName
@classmethod
def _makeDefaultRoleFunc(cls, roleName: str) -> _RSTRoleFunction:
"""returns an RST interpreted text role parser function returning
an emphasis node with a dachs-ex-roleName class.
"""
def roleFunc(name, rawText, text, lineno, inliner,
options={}, content=[]):
node = nodes.emphasis(rawText, text)
node["classes"] = ["dachs-ex-"+roleName]
return [node], []
return roleFunc
# Generally useful RST extensions (for roles useful in examples,
# see examplesrender)
def _bibcodeRoleFunc(name: str, rawText: str, text: str, lineno: int,
inliner: Inliner,
options: Optional[dict]={}, content: Optional[List[str]]=[]
) -> _RSTRoleReturnType:
if not couldBeABibcode(text):
raise ValueError("Probably not a bibcode: '%s'"%text)
node = nodes.reference(rawText, text,
refuri="http://adsabs.harvard.edu/abs/%s"%text)
node["classes"] = ["bibcode-link"]
return [node], []
RSTExtensions.makeTextRole("bibcode", _bibcodeRoleFunc)
del _bibcodeRoleFunc
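# A minimal sketch of how client code would register its own text role
# (illustrative only, never called; the role name "demo" and the function
# names are hypothetical):
def _exampleMakeTextRole() -> None:
    # a custom role function must add a dachs-ex-<roleName> class to its node
    def demoRoleFunc(name, rawText, text, lineno, inliner,
            options={}, content=[]):
        node = nodes.emphasis(rawText, text)
        node["classes"] = ["dachs-ex-demo"]
        return [node], []
    RSTExtensions.makeTextRole("demo", demoRoleFunc)
    # without a role function, the default emphasis-with-class node is used:
    RSTExtensions.makeTextRole("demo2")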
# RST extensions for documentation writing
_explicitTitleRE = re.compile(r'^(.+?)\s*(?<!\x00)<(.*?)>$', re.DOTALL)
def _dachsdocRoleFunc(name: str, rawText: str, text: str, lineno: int,
inliner: Inliner,
options: Optional[dict]={}, content: Optional[List[str]]=[]
) -> _RSTRoleReturnType:
# inspired by sphinx extlinks
text = rstutils.unescape(text)
mat = _explicitTitleRE.match(text)
if mat:
title, url = mat.groups()
else:
title, url = text.split("/")[-1], text
url = "http://docs.g-vo.org/DaCHS/"+url
return [nodes.reference(title, title, internal=False, refuri=url)
], []
RSTExtensions.makeTextRole("dachsdoc", _dachsdocRoleFunc)
del _dachsdocRoleFunc
def _dachsrefRoleFunc(name: str, rawText: str, text: str, lineno: int,
inliner: Inliner,
options: Optional[dict]={}, content: Optional[List[str]]=[]
) -> _RSTRoleReturnType:
# this will guess a link into the ref documentation
text = rstutils.unescape(text)
fragId = re.sub("[^a-z0-9]+", "-", text.lower())
url = "http://docs.g-vo.org/DaCHS/ref.html#"+fragId
return [nodes.reference(text, text, internal=False, refuri=url)
], []
RSTExtensions.makeTextRole("dachsref", _dachsrefRoleFunc)
del _dachsrefRoleFunc
def _samplerdRoleFunc(name: str, rawText: str, text: str, lineno: int,
inliner: Inliner,
options: Optional[dict]={}, content: Optional[List[str]]=[]
) -> _RSTRoleReturnType:
# this will turn into a link to a file in the GAVO svn
# (usually for RDs)
text = rstutils.unescape(text)
url = "http://svn.ari.uni-heidelberg.de/svn/gavo/hdinputs/"+text+".rd"
return [nodes.reference(text, text, internal=False, refuri=url)
], []
RSTExtensions.makeTextRole("samplerd", _samplerdRoleFunc)
del _samplerdRoleFunc
class _UndefinedType(type):
"""the metaclass for Undefined.
Used internally.
"""
def __str__(cls):
raise excs.StructureError("%s cannot be stringified."%cls.__name__)
def __repr__(cls):
return "<Undefined>"
def __bool__(cls):
return False
class Undefined(metaclass=_UndefinedType):
"""a sentinel for all kinds of undefined values.
Do not instantiate.
>>> Undefined()
Traceback (most recent call last):
TypeError: Undefined cannot be instantiated.
>>> bool(Undefined)
False
>>> repr(Undefined)
'<Undefined>'
>>> str(Undefined)
Traceback (most recent call last):
gavo.utils.excs.StructureError: Undefined cannot be stringified.
"""
def __init__(self):
raise TypeError("Undefined cannot be instantiated.")
class RateLimiter:
"""A class that helps limit rates of events.
You construct it with a timeout (in seconds) and then protect things you
want to rate-limit with "if rl.inDeadtime(key): skip". The key identifies
what it is that you want to limit (e.g., the kind of event, so that
different events can share a rate limiter).
If you have many events that usually need rate limiting, you'd have
to revisit this implementation -- this is really for when rate limiting
is the exception.
"""
def __init__(self, timeout: float):
self.timeout = timeout
self.lastEvents: Dict[str, float] = {}
def inDeadtime(self, key: str) -> bool:
now = time.time()
# no reason to have this work in 1970
if self.lastEvents.get(key, 0)+self.timeout>now:
return True
self.lastEvents[key] = now
return False
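# A minimal usage sketch for RateLimiter (illustrative only, never called;
# the event key is made up):
def _exampleRateLimiter() -> None:
    rl = RateLimiter(60)
    for _ in range(3):
        if rl.inDeadtime("slow-query-warning"):
            continue  # within 60 s of the last event: skip the action
        print("warned")  # only reached on the first iteration here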
@functools.total_ordering
class QuotedName:
"""A string-like thing basically representing SQL delimited identifiers.
This has some features that make handling these relatively painless
in ADQL code.
The most horrible feature is that these hash and compare as their embedded
names, except against other QuotedNames.
SQL-92, in section 5.2, roughly says: delimited identifiers compare
literally with each other; delimited identifiers compare with regular
identifiers after the latter are turned to upper case. But since postgres
turns everything to lower case, we do so here, too.
>>> n1, n2, n3 = QuotedName("foo"), QuotedName('foo"l'), QuotedName("foo")
>>> n1==n2,n1==n3,hash(n1)==hash("foo")
(False, True, True)
>>> print(n1, n2)
"foo" "foo""l"
>>> "Foo"<n1, n1>"bar"
(False, True)
>>> QuotedName('7oh-no"+rob').makeIdentifier()
'id7oh2dno222brob'
"""
def __init__(self, name: str):
self.name = name
def __hash__(self) -> int:
return hash(self.name)
def __eq__(self, other: Any) -> bool:
if isinstance(other, QuotedName):
return self.name==other.name
elif isinstance(other, str):
return self.name==other.lower()
else:
return False
def __lt__(self, other: Any) -> bool:
if isinstance(other, QuotedName):
return self.name<other.name
elif isinstance(other, str):
return self.name<other.lower()
else:
return False
def __str__(self) -> str:
return '"%s"'%(self.name.replace('"', '""'))
def __repr__(self) -> str:
return 'QuotedName(%s)'%repr(self.name)
def isRegularLower(self) -> bool:
return not not re.match("[a-z][a-z0-9_]*$", self.name)
def lower(self): # service to ADQL name resolution
return self
def flatten(self) -> str: # ADQL query serialization
return str(self)
def capitalize(self) -> str: # service for table head and such
return self.name.capitalize()
def makeIdentifier(self) -> str:
"""returns self as something usable as a SQL regular identifier.
This will be rather unreadable if there's a substantial number
of non-letters in there, and of course there's no absolute
guarantee that doesn't clash with actual identifiers.
This is *not* for SQL serialisation but mainly for generating sqlKey,
where this kind of thing ends up in %(name)s patterns.
"""
id = re.sub("[^a-zA-Z0-9]", lambda mat: "%x"%ord(mat.group(0)), self.name)
if not re.match("[a-zA-Z]", id):
id = "id"+id
return id
def __add__(self, other: str): # for disambiguateColumns
return QuotedName(self.name+other)
_StreamData = TypeVar('_StreamData', bytes, str)
class StreamBuffer(Generic[_StreamData]):
"""a buffer that takes data in arbitrary chunks and returns
them in chops of chunkSize bytes.
There's a lock in place so you can access add and get from
different threads.
When everything is written, you must call doneWriting.
"""
chunkSize = 50000
def __init__(self, chunkSize: Optional[int]=None, binary: bool=True):
self.buffer: collections.deque = collections.deque()
if chunkSize is not None:
self.chunkSize = chunkSize
self.curSize = 0
self.lock = threading.Lock()
self.finished = False
# annotation problem: understand how to make mypy grok this
self.joiner: _StreamData = b"" if binary else "" # type: ignore
def add(self, data: _StreamData) -> None:
with self.lock:
self.buffer.append(data)
self.curSize += len(data)
def get(self, numBytes: Optional[int]=None) -> Optional[_StreamData]:
if numBytes is None:
numBytes = self.chunkSize
if self.curSize<numBytes and not self.finished:
return None
if not self.buffer:
return None
with self.lock:
items, sz = [], 0
# collect items till we've got a chunk
while self.buffer:
item = self.buffer.popleft()
sz += len(item)
self.curSize -= len(item)
items.append(item)
if sz>=numBytes:
break
# make a chunk and push back what we didn't need
chunk = self.joiner.join(items)
leftOver = chunk[numBytes:]
if leftOver:
self.buffer.appendleft(leftOver)
self.curSize += len(leftOver)
chunk = chunk[:numBytes]
return chunk
# XXX TODO: refactor get and getToChar to use as much common code
# as sensible
def getToChar(self, char: _StreamData) -> Optional[_StreamData]:
"""returns the buffer up to and including the first occurrence of char.
If char is not present in the buffer, the function returns None.
"""
with self.lock:
items, sz = [], 0
# collect items till we've got our character
while self.buffer:
item = self.buffer.popleft()
sz += len(item)
self.curSize -= len(item)
items.append(item)
if char in item:
break
else:
# didn't break out of the loop, i.e., no char found.
# items now contains the entire buffer.
self.buffer.clear()
self.buffer.append(self.joiner.join(items))
self.curSize = sz
return None
# char is in the last element of items
items[-1], leftOver = items[-1].split(char, 1)
chunk = self.joiner.join(items)
if leftOver:
self.buffer.appendleft(leftOver)
self.curSize += len(leftOver)
return chunk+char
raise AssertionError("This cannot happen") # pragma: no cover
def getRest(self) -> _StreamData:
"""returns the entire buffer as far as it is left over.
"""
result = self.joiner.join(self.buffer)
self.buffer = collections.deque()
return result
def doneWriting(self) -> None:
self.finished = True
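# A minimal usage sketch for StreamBuffer (illustrative only, never
# called): add() arbitrary chunks, get() fixed-size chunks back.
def _exampleStreamBuffer() -> None:
    buf: StreamBuffer[bytes] = StreamBuffer(chunkSize=4)
    buf.add(b"abcdef")
    assert buf.get() == b"abcd"
    assert buf.get() is None  # 2 bytes left and doneWriting not called yet
    buf.doneWriting()
    assert buf.getRest() == b"ef"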
_T = TypeVar("_T")
def grouped(n: int, seq: Sequence[_T]) -> List[Tuple[_T, ...]]:
"""returns items of seq in groups of n elements.
If len(seq)%n!=0, the last elements are discarded.
>>> list(grouped(2, range(5)))
[(0, 1), (2, 3)]
>>> list(grouped(3, range(9)))
[(0, 1, 2), (3, 4, 5), (6, 7, 8)]
"""
# annotation problem: understand why mypy doesn't understand this
return list(zip(*([iter(seq)]*n))) # type: ignore
# annotation problem: https://github.com/python/mypy/issues/3737
def getfirst(args: Dict[str, _T], key: str, default: _T=Undefined) -> _T: # type: ignore
"""returns the first value of key in the web argument-like object args.
args is a dictionary mapping keys to lists of values. If key is present,
the first element of the list is returned; else, or if the list is
empty, default if given. If not, a Validation error for the requested
column is raised.
Finally, if args[key] is neither list nor tuple (in an isinstance
sense), it is returned unchanged.
>>> getfirst({'x': [1,2,3]}, 'x')
1
>>> getfirst({'x': []}, 'x')
Traceback (most recent call last):
gavo.utils.excs.ValidationError: Field x: Missing mandatory parameter x
>>> getfirst({'x': []}, 'y')
Traceback (most recent call last):
gavo.utils.excs.ValidationError: Field y: Missing mandatory parameter y
>>> print(getfirst({'x': []}, 'y', None))
None
>>> getfirst({'x': 'abc'}, 'x')
'abc'
"""
try:
val = args[key]
if isinstance(val, (list, tuple)):
return val[0]
else:
return val
except (KeyError, IndexError):
if default is Undefined:
raise excs.ValidationError("Missing mandatory parameter %s"%key,
colName=key)
return default
def sendUIEvent(eventName: str, *args) -> None:
"""sends an eventName to the DC event dispatcher.
If no event dispatcher is available, do nothing.
The base.ui object that DaCHS uses for event dispatching
is only available to sub-packages above base. Code at or below base
cannot import it and should send events through this function instead.
All other code should use ``base.ui.notify<eventName>(*args)`` directly.
"""
try:
from gavo.base import ui
getattr(ui, "notify"+eventName)(*args)
except ImportError:
pass
def logOldExc(exc: Exception) -> Exception:
"""logs the mutation of the currently handled exception to exc.
This just does a notifyExceptionMutation using sendUIEvent; it should
only be used by code at or below base.
"""
sendUIEvent("ExceptionMutation", exc)
return exc
def getFortranRec(f: BinaryIO) -> Optional[bytes]:
"""reads a "fortran record" from f and returns the payload.
A "fortran record" comes from an unformatted file and has a
4-byte payload length before and after the payload. Native endianness
is assumed here.
If the two length specs do not match, a ValueError is raised.
Of course, f must be open in binary mode.
"""
try:
startPos: Union[int, str] = f.tell()
except IOError:
startPos = "(stdin)"
rawLength = f.read(4)
if rawLength==b'': # EOF
return None
recLen = struct.unpack("i", rawLength)[0]
data = f.read(recLen)
rawPost = f.read(4)
if not rawPost:
raise ValueError("Record starting at %s has no postamble"%startPos)
postambleLen = struct.unpack("i", rawPost)[0]
if recLen!=postambleLen:
raise ValueError("Record length at record (%d) and did not match"
" postamble declared length (%d) at %s"%(
recLen, postambleLen, startPos))
return data
def iterFortranRecs(f: BinaryIO, skip: int=0) -> Generator[bytes, None, None]:
"""iterates over the fortran records in f.
For details, see getFortranRec.
"""
while True:
rec = getFortranRec(f)
if rec is None:
break
if skip>0:
skip -= 1
continue
yield rec
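# A minimal round-trip sketch for getFortranRec/iterFortranRecs
# (illustrative only, never called): a record is the payload wrapped
# in two native-endianness 4-byte length fields.
def _exampleFortranRecs() -> None:
    payload = b"some binary payload"
    rec = (struct.pack("i", len(payload))
        + payload
        + struct.pack("i", len(payload)))
    assert list(iterFortranRecs(io.BytesIO(rec*2))) == [payload, payload]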
def getWithCache(url: str, cacheDir: Filename, extraHeaders: dict={}) -> bytes:
"""returns the content of url, from a cache if possible.
Of course, you only want to use this if there's some external guarantee
that the resource behind url doesn't change. No expiry mechanism is
present here.
"""
if not os.path.isdir(cacheDir):
os.makedirs(cacheDir)
cacheName = os.path.join(cacheDir, re.sub(r"[^\w]+", "", url)+".cache")
if os.path.exists(cacheName):
with open(cacheName, "rb") as f:
return f.read()
else:
with urlrequest.urlopen(url) as f:
doc = f.read()
with open(cacheName, "wb") as f:
f.write(doc)
urlrequest.urlcleanup()
return doc
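# A minimal usage sketch for getWithCache (illustrative only, never
# called; URL and cache directory are made up). The second call is
# served from the on-disk cache rather than the network.
def _exampleGetWithCache() -> bytes:
    url = "http://example.org/some-static-resource"
    getWithCache(url, "/tmp/demo-cache")
    return getWithCache(url, "/tmp/demo-cache")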
def rstxToHTMLWithWarning(source: str, **userOverrides) -> Tuple[str, str]:
"""returns HTML and a string with warnings for a piece of ReStructured
text.
source can be a unicode string or a byte string in utf-8.
userOverrides will be added to the overrides argument of docutils'
core.publish_parts.
"""
sourcePath, destinationPath = None, None
if not isinstance(source, str):
source = source.decode("utf-8")
warnAccum = io.StringIO()
overrides = {'input_encoding': 'unicode',
'raw_enabled': True,
'doctitle_xform': None,
'warning_stream': warnAccum,
'initial_header_level': 4}
overrides.update(userOverrides)
parts = rstcore.publish_parts(
source=source+"\n", source_path=sourcePath,
destination_path=destinationPath,
writer_name='html', settings_overrides=overrides)
return parts["fragment"], warnAccum.getvalue()
def rstxToHTML(source: str, **userOverrides) -> str:
"""returns HTML for a piece of ReStructured text.
source can be a unicode string or a byte string in utf-8.
userOverrides will be added to the overrides argument of docutils'
core.publish_parts.
"""
return rstxToHTMLWithWarning(source, **userOverrides)[0]
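# A minimal usage sketch for rstxToHTML (illustrative only, never
# called), exercising the bibcode role registered above; the assertion
# reflects the class set by _bibcodeRoleFunc.
def _exampleRstxToHTML() -> None:
    html = rstxToHTML("See :bibcode:`2018A&A...616A...1G` for details.")
    assert "bibcode-link" in html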
class CaseSemisensitiveDict(dict):
"""A dictionary allowing case-insensitive access to its content.
This is used for DAL renderers which, unfortunately, are supposed
to be case insensitive. Since case insensitivity is at least undesirable
for service-specific keys, we take a semi-insensitive approach here:
first, we try literal matches; if that does not work, we try matching
against an all-uppercase version.
Name clashes resulting from different names being mapped to the
same normalized version are handled in some random way. Don't do this.
And don't rely on case normalization if at all possible.
Only strings are allowed as keys here. This class is not concerned
with the values.
>>> d = CaseSemisensitiveDict({"a": 1, "A": 2, "b": 3})
>>> d["a"], d["A"], d["b"], d["B"]
(1, 2, 3, 3)
>>> d["B"] = 9; d["b"], d["B"]
(3, 9)
>>> del d["b"]; d["b"], d["B"]
(9, 9)
>>> "B" in d, "b" in d, "u" in d
(True, True, False)
>>> d.pop("a"), list(d.keys())
(1, ['A', 'B'])
"""
def __init__(self, *args, **kwargs):
dict.__init__(self, *args, **kwargs)
self._normCasedCache = None
def __getitem__(self, key: str) -> Any:
try:
return dict.__getitem__(self, key)
except KeyError:
pass # try again with normalized case.
return self._normCased[key.upper()]
def __setitem__(self, key: str, value: Any) -> None:
self._normCasedCache = None
dict.__setitem__(self, key, value)
def __contains__(self, key: object) -> bool:
key = cast(str, key)
return dict.__contains__(self, key) or key.upper() in self._normCased
def __delitem__(self, key: str) -> None:
self.pop(key, None)
def get(self, key: str, default: Any=None) -> Any:
try:
return self[key]
except KeyError:
return default
def pop(self, key: str, default: Any=KeyError) -> Any:
try:
return dict.pop(self, key)
except KeyError:
pass # try again with normalized case.
try:
return self._normCased.pop(key.upper())
except KeyError:
if default is not KeyError:
return default
raise
def copy(self):
return CaseSemisensitiveDict(dict.copy(self))
@property
def _normCased(self) -> Dict[str, Any]:
if self._normCasedCache is None:
self._normCasedCache = dict((k.upper(), v)
for k, v in self.items())
return self._normCasedCache
@classmethod
def fromDict(cls: Type['CaseSemisensitiveDict'],
aDict: Union[dict, 'CaseSemisensitiveDict']) -> 'CaseSemisensitiveDict':
if isinstance(aDict, CaseSemisensitiveDict):
return aDict
else:
return cls(aDict)
def getCleanBytes(b: StrOrBytes) -> bytes:
"""returns the bytes b in an ASCII representation.
This is zlib-compressed base64 stuff. b can be a string, too, in which
case it's utf-8 encoded before marshalling.
"""
if isinstance(b, str):
b = b.encode("utf-8")
return base64.b64encode(
zlib.compress(b)).replace(b"\n", b"")
def getDirtyBytes(b: bytes) -> bytes:
"""returns b decoded and uncompressed.
This is the inverse operation of getCleanBytes. b must be bytes, and
bytes is what you get back.
"""
return zlib.decompress(
base64.b64decode(b))
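# A minimal round-trip sketch for getCleanBytes/getDirtyBytes
# (illustrative only, never called); strings are utf-8 encoded before
# compression, so the round trip yields bytes.
def _exampleCleanDirtyBytes() -> None:
    packed = getCleanBytes("Grüße")
    assert getDirtyBytes(packed) == "Grüße".encode("utf-8")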
######################### pyparsing-based key-value lines.
from gavo.utils.parsetricks import (
Word, alphas, QuotedString, Regex, OneOrMore,
pyparsingWhitechars, pyparseString)
def _makeKVLGrammar():
with pyparsingWhitechars(" \t"):
keyword = Word(alphas+"_")("key")
keyword.setName("Keyword")
value = (QuotedString(quoteChar="'", escChar='\\')
| Regex("[^'= \t]*"))("value")
value.setName("Simple value or quoted string")
pair = keyword - "=" - value
pair.setParseAction(lambda s,p,t: (t["key"], t["value"]))
line = OneOrMore(pair)
line.setParseAction(lambda s,p,t: dict(list(t)))
return line
_KVL_GRAMMAR = _makeKVLGrammar()
def parseKVLine(aString: str):
"""returns a dictionary for a "key-value line".
Key-value lines represent string-valued dictionaries
following postgres libpq/DSN conventions (see the PQconnectdb docs):
it's keyword=value, whitespace-separated, with whitespace allowed
in values through single quoting, and with backslash-escaping
inside quoted values.
"""
return pyparseString(_KVL_GRAMMAR, aString, parseAll=True)[0]
_IDENTIFIER_PATTERN = re.compile("[A-Za-z_]+$")
def makeKVLine(aDict: StrToStrMap) -> str:
"""serializes a dictionary to a key-value line.
See parseKVLine for details.
"""
parts = []
for key, value in aDict.items():
if not _IDENTIFIER_PATTERN.match(key):
raise ValueError("'%s' not allowed as a key in key-value lines"%key)
value = str(value)
if not _IDENTIFIER_PATTERN.match(value):
value = "'%s'"%value.replace("\\", "\\\\"
).replace("'", "\\'")
parts.append("%s=%s"%(key, value))
return " ".join(sorted(parts))
if __name__=="__main__": # pragma: no cover
import doctest
doctest.testmod()