"""
Functions dealing with compilation and introspection of python and
external code.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from __future__ import annotations
import ast
import atexit
import contextlib
import importlib.util
import io
import itertools
import inspect
import functools
import linecache
import os
import re
import shutil
import sys
import tempfile
import threading
import weakref
from gavo.utils import algotricks
from gavo.utils import misctricks
from gavo.utils import excs
from gavo.utils.dachstypes import (Any, Callable, Dict, Filename,
Generator, Hashable, List, ModuleType, Optional,
Sequence, Set, StrToStrMap, TextIO, Tuple, TypeVar, TYPE_CHECKING)
if TYPE_CHECKING:
T = TypeVar("T")
[docs]def document(origFun: Any):
"""is a decorator that adds a "buildDocsForThis" attribute to its argument.
This attribute is evaluated by documentation generators.
"""
origFun.buildDocsForThis = True
return origFun
[docs]class CachedGetter:
"""A cache for a callable.
This is basically memoization, except that these are supposed
to be singletons; CachedGetters should be used where the
construction of a resource (e.g., a grammar) should be deferred
until it is actually needed to save on startup times.
The resource is created on the first call, all further calls
just return references to the original object.
You can also leave out the getter argument and add an argumentless
method impl computing the value to cache.
Using a CachedGetter also serializes generation, so you can also
use it when getter isn't thread-safe.
At construction, you can pass a f(thing) -> bool in an isAlive
keyword argument. If you do, the function will be called with the
cache before the cache is being returned. If it returns false,
the resource is re-made (no concurrency control is enforced here).
"""
def __init__(self, getter: Callable, *args, **kwargs):
if getter is None:
getter = self.impl
self.cache: Any = None
self.getter = getter
self.isAlive: Optional[Callable[[Any], bool]
] = kwargs.pop("isAlive", None)
self.args, self.kwargs = args, kwargs
self.lock = threading.Lock()
def __call__(self):
if (self.isAlive is not None
and self.cache is not None
and not self.isAlive(self.cache)):
self.cache = None
if self.cache is None:
with self.lock:
# Second and following in already have the cache set and return here
if self.cache is not None:
return self.cache
self.cache = self.getter(*self.args, **self.kwargs)
# If the cache is immortal, do away with the stuff needed
# for its creation
if self.isAlive is None:
del self.args
del self.kwargs
del self.lock
return self.cache
[docs]class DeferredImport:
"""A trivial deferred module loader.
Use this to delay the actual import of a module until it's actually
needed.
It is constructed with a module name (that will be inserted into the
calling module's globals() as a side effect) and some literal code
that, when executed in the caller's global namespace, actually
imports the module, for instance::
utils.DeferredImport("wcs", "from astropy import wcs")
As a service for static code checking, you'll usually want to repeat
the module name, though:
wcs = utils.DeferredImport("wcs", "from astropy import wcs")
"""
loadedModule = None
def __init__(self, moduleName: str, loadingCode:str):
# I now f_back is non-None here, so silence mypy
self.parentGlobals = inspect.currentframe().f_back.f_globals # type: ignore
self.moduleName = moduleName
self.loader = compile(loadingCode, "<modloader-%s>"%id(self), 'exec')
self.parentGlobals[moduleName] = self
def __getattr__(self, name: str):
exec(self.loader, self.parentGlobals)
return getattr(self.parentGlobals[self.moduleName], name)
[docs]class IdManagerMixin:
"""
A mixin for objects requiring unique IDs.
The primary use case is XML generation, where you want stable IDs
for objects, but IDs must be unique over an entire XML file.
The IdManagerMixin provides some methods for doing that:
- makeIdFor(object) -- returns an id for object, or None if makeIdFor has
already been called for that object (i.e., it presumably already is
in the document).
- getIdFor(object) -- returns an id for object if makeIdFor has already
been called before. Otherwise, a NotFoundError is raised
- getOrMakeIdFor(object) -- returns an id for object; if object has
been seen before, it's the same id as before. Identity is by equality
for purposes of dictionaries.
- getForId(id) -- returns the object belonging to an id that has
been handed out before. Raises a NotFoundError for unknown ids.
- cloneFrom(other) -- overwrites the self's id management dictionaries
with those from other. You want this if two id managers must work
on the same document.
"""
__cleanupPat = re.compile("[^A-Za-z0-9_]+")
# Return a proxy instead of raising a KeyError here? We probably do not
# really want to generate xml with forward references, but who knows?
def __getIdMaps(self) -> Tuple[Dict, Dict]:
try:
return self.__objectToId, self.__idsToObject
except AttributeError:
self.__objectToId: Dict[Hashable, str] = {}
self.__idsToObject: Dict[str, Hashable] = {}
return self.__objectToId, self.__idsToObject
def _fixSuggestion(self, suggestion: str, invMap: StrToStrMap
) -> str:
"""returns suggestion with some suffix to make in unique if it not
already is.
"""
for i in itertools.count(start=2):
newId = f"{suggestion}-{i:02d}"
if newId not in invMap:
return newId
return suggestion # notreached
[docs] def cloneFrom(self, other):
"""takes the id management dictionaries from other.
"""
self.__objectToId, self.__idsToObject = other.__getIdMaps()
[docs] def makeIdFor(self, ob: Hashable, suggestion: Optional[str]=None
) -> str:
"""returns a new id for ob (which must be Hashable).
suggestion can be a string giving what id ought to be; the function
will change it as necessary to make it a usable and unique id.
If the object already has an id, the method will raise a ValueError
(since DaCHS 2.8.1; returned None before).
"""
map, invMap = self.__getIdMaps()
if suggestion:
suggestion = self.__cleanupPat.sub("", suggestion)
if id(ob) in map:
raise ValueError(f"{ob} already has the id {id(ob)}")
if suggestion is not None:
if suggestion in invMap:
newId = self._fixSuggestion(suggestion, invMap)
else:
newId = suggestion
else:
newId = intToFunnyWord(id(ob))
# register id(ob) <-> newId map, avoiding refs to ob
map[id(ob)] = newId
try:
invMap[newId] = weakref.proxy(ob)
except TypeError: # something we can't weakref to
invMap[newId] = ob
return newId
[docs] def getIdFor(self, ob: Hashable) -> str:
"""returns the id for ob if it is known.
If ob does not yet have an id, the method raises a NotFoundError.
"""
try:
return self.__getIdMaps()[0][id(ob)]
except KeyError:
raise excs.NotFoundError(repr(ob), what="object",
within="id manager %r"%(self,), hint="Someone asked for the"
" id of an object not managed by the id manager. This usually"
" is a software bug.")
[docs] def getOrMakeIdFor(self,
ob: Hashable, suggestion: Optional[str]=None) -> str:
"""returns the id of the (hashable) ob or creates one if it
does not already have one.
This is the function you would normally use with the id manager.
"""
try:
return self.getIdFor(ob)
except excs.NotFoundError:
return self.makeIdFor(ob, suggestion)
[docs] def getForId(self, id: str) -> Hashable:
"""returns the object associated with id.
This will raise a KeyError for unknown ids.
"""
try:
return self.__getIdMaps()[1][id]
except KeyError:
raise excs.NotFoundError(id, what="id", within="id manager %r"%(self,),
hint="Someone asked for the object belonging to an id that has"
" been generated externally (i.e., not by this id manager). This"
" usually is an internal error of the software.")
[docs]class NullObject(object):
"""A Null object, i.e. one that accepts any method call whatsoever.
This mainly here for use in scaffolding.
"""
def __getattr__(self, name: str):
return self
def __call__(self, *args, **kwargs):
pass
class _CmpType(type):
"""is a metaclass for *classes* that always compare in one way.
"""
# Ok, the class thing is just posing. It's fun anyway.
def __lt__(cls, other):
return cls.cmpRes
def __le__(cls, other):
return cls.cmpRes
def __gt__(cls, other):
return not cls.cmpRes
def __ge__(cls, other):
return not cls.cmpRes
def __eq__(cls, other):
return False
class _Comparer(object, metaclass=_CmpType):
def __init__(self, *args, **kwargs):
raise excs.Error(
"%s classes can't be instantiated."%self.__class__.__name__)
[docs]class Infimum(_Comparer):
"""is a *class* smaller than anything.
This will only work as the first operand.
>>> Infimum<-2333
True
>>> Infimum<""
True
>>> -2333<Infimum
False
"""
cmpRes = True
[docs]class Supremum(_Comparer):
"""is a *class* larger than anything.
This will only work as the first operand.
>>> Supremum>1e300
True
>>> Supremum>""
True
>>> Supremum>None
True
>>> Supremum>Supremum
True
"""
cmpRes = False
[docs]class AllEncompassingSet(set):
"""a set that contains everything.
Ok, so this doesn't exist. Yes, I've read my Russell. You see, this
is a restricted hack for a reason. And even the docstring is
contradictory.
Sort-of. This now works for intersection and containing.
Should this reject union? Also, unfortunately this only works as a
left operand; I don't see how to override whatever set does with
this as a right operand.
>>> s = AllEncompassingSet()
>>> s & set([1,2])
{1, 2}
>>> "gooble" in s
True
>>> s in s
True
>>> s not in s
False
"""
def __init__(self):
set.__init__(self, [])
def __bool__(self):
return True
# I don't want to mimic the complex signature of the superclass
# here, so let's shut mypy up.
def __and__(self, other: Set) -> Set: # type: ignore
return other
[docs] def intersection(self, other: Set) -> Set: # type: ignore
return other
def __contains__(self, el: Any):
return True
[docs]def iterDerivedClasses(baseClass: type, objects: List[Any]):
"""iterates over all subclasses of baseClass in the sequence objects.
"""
for cand in objects:
try:
if issubclass(cand, baseClass) and cand is not baseClass:
yield cand
except TypeError: # issubclass wants a class
pass
[docs]def iterDerivedObjects(baseClass: type, objects: List[Any]):
"""iterates over all instances of baseClass in the sequence objects.
"""
for cand in objects:
if isinstance(cand, baseClass):
yield cand
# The API of buildClassResolver is a bit painful because it
# can actually instantiate the classes. That's why there's an
# Any in the signature; this would be a good place for overload
# (or to separate the two functions).
[docs]def buildClassResolver(
baseClass: type,
objects: List[Any],
instances: bool=False,
key: Callable[[Hashable], Optional[str]]
=lambda obj: getattr(obj, "name", None), default=None
) -> Callable[[str], Any]:
"""returns a function resolving classes deriving from baseClass
in the sequence objects by their names.
This is used to build registries of, for instance, Macros and RowProcessors.
The classes in question have to have a name attribute.
objects would usually be something like globals().values()
If instances is True the function will return instances instead
of classes.
key is a function taking an object and returning the key under which
you will later access it. If this function returns None, the object
will not be entered into the registry.
"""
if instances:
registry: Dict[str, Any] = algotricks.DeferringDict()
else:
registry = {}
for cls in iterDerivedClasses(baseClass, objects):
clsKey = key(cls)
if clsKey is not None:
registry[clsKey] = cls
def resolve(name:str, registry: Dict[str, Any]=registry):
try:
return registry[name]
except KeyError:
if default is not None:
return default
raise
# see mypy bug #2087
resolve.registry = registry # type: ignore
return resolve
_SILENCE_LOCK = threading.RLock()
[docs]@contextlib.contextmanager
def silence(errToo: bool=False) -> Generator:
"""a context manager to temporarily redirect stdout to /dev/null.
This is used to shut up some versions of pyparsing and pyfits that
insist on spewing stuff to stdout from deep within in relatively
normal situations.
Note that this will acquire a lock while things are silenced; this
means that silenced things cannot run concurrently.
"""
with _SILENCE_LOCK:
realstdout = sys.stdout
sys.stdout = devnull()
if errToo:
realstderr = sys.stderr
sys.stderr = sys.stdout
try:
yield
finally:
sys.stdout.close()
sys.stdout = realstdout
if errToo:
sys.stderr = realstderr
[docs]@contextlib.contextmanager
def in_dir(destDir: Filename) -> Generator:
"""executes the controlled block within destDir and then returns
to the previous directory.
Think "within dir". Haha.
"""
owd = os.getcwd()
os.chdir(destDir)
try:
yield owd
finally:
os.chdir(owd)
[docs]@contextlib.contextmanager
def sandbox(
tmpdir: Optional[Filename]=None,
debug: Optional[bool]=False,
extractfunc: Optional[Callable[[Filename], None]]=None):
"""sets up and tears down a sandbox directory within tmpdir.
This is is a context manager. The object returned is the original
path (which allows you to copy stuff from there). The working
directory is the sandbox created while in the controlled block.
If tmpdir is None, the *system* default is used (usually /tmp),
rather than dachs' tmpdir. So, you will usually want to call
this as sandbox(base.getConfig("tempDir"))
This is obviously not thread-safe -- you'll not usually want
to run this in the main server process. Better fork before
running this.
You can pass in a function extractfunc(owd) that is executed in
the sandbox just before teardown. It receives the original working
directory and can, e.g., move files there from the sandbox.
"""
owd = os.getcwd()
wd = tempfile.mkdtemp("sandbox", dir=tmpdir)
os.chdir(wd)
try:
yield owd
finally:
try:
if extractfunc:
extractfunc(owd)
except Exception as ex:
sys.stderr.write("Extraction function failed: %s\n"%ex)
os.chdir(owd)
if not debug:
shutil.rmtree(wd)
def _instrumentForDebugging(func: Callable, funcName: str, src: str
) -> Callable:
"""returns a callable for a callable func with instrumentation for
debugging.
src must be the source of func.
Specifically, when there's an exception when calling func, this will
be logged as a warning.
"""
debugLocals: Dict[str, Any] = {}
embSrc = "\n".join([
"from gavo.utils import excs",
"def compileFunctionDebugWrapper(*args, **kwargs):",
" try:",
" return %s(*args, **kwargs)"%funcName,
" except (excs.ExecutiveAction, AssertionError):",
" raise",
" except:",
' notify("Failing source:\\n%s"%src)',
" raise"])
debugLocals["src"] = src
debugLocals["notify"] = lambda msg: misctricks.sendUIEvent("Warning", msg)
debugLocals[funcName] = func
exec(embSrc+"\n", debugLocals)
return debugLocals["compileFunctionDebugWrapper"]
# What with function attributes, wild lambdas and the linecache,
# it's nigh impossible to satisfy mypy on this:
[docs]def compileFunction(src: str,
funcName: str,
useGlobals: Optional[Dict[str, Any]]=None,
debug: bool=False,
uniqueName: Optional[str]=None):
"""runs src through exec and returns the funcName from the resulting
namespace.
This takes care to preserve src in the line cache so it is available
in tracebacks or in the debugger.
useGlobals can be a namespace; if not passed, the globals of the
utils.codestricks is used.
If debug=True is passed in, additional code is produced to give halfway
useful tracebacks.
uniqueName, if given, is the identifier for the code. If passed in,
no automatic cleanup for the linecache is done under the assumption
that reloads (or whatever) will overwrite the linecache. Otherwise
code is autonumbered (which is not really desirable for user-provided
code, as they won't know where their failing code comes from).
This is typically used to define functions, like this:
>>> resFunc = compileFunction("def f(x): print(x)", "f")
>>> resFunc(1); resFunc("abc")
1
abc
"""
if not hasattr(compileFunction, "autoNumber"):
compileFunction.autoNumber = 0 # type: ignore
doCleanup = False
if uniqueName is None:
uniqueName = "<generated code %s>"%compileFunction.autoNumber # type: ignore
compileFunction.autoNumber += 1 # type: ignore
doCleanup = True
src = src+"\n"
localVars: Dict[str, Any] = {}
if useGlobals is None:
useGlobals = globals()
try:
code = compile(src, uniqueName, 'exec')
exec(code, useGlobals, localVars)
except Exception as ex:
misctricks.sendUIEvent("Warning", "The code that failed to compile was:"
"\n%s"%src)
raise misctricks.logOldExc(excs.BadCode(src, "function", ex))
func = localVars[funcName]
# this makes our compiled lines available to the traceback writer.
# we might want to do sys.excepthook = traceback.print_exception
# somewhere so the post mortem dumper uses this, too. Let's see
# if it's worth the added risk of breaking things.
linecache.cache[uniqueName] = ( # type: ignore
len(src), None, src.split("\n"), uniqueName)
if doCleanup:
func._cleanup = weakref.ref(func,
lambda _, key=uniqueName: linecache and linecache.cache.pop(key, None)) #type: ignore
if debug:
return _instrumentForDebugging(func, funcName, src)
else:
return func
[docs]def ensureExpression(expr: str, errName: str="unknown") -> None:
"""raises a LiteralParserError if expr is not a parseable python expression.
>>> ensureExpression("4+4")
>>> ensureExpression("'/'.join([str(x) for x in range(10)])")
>>> ensureExpression("junk")
"""
try:
tree = ast.parse(expr)
except SyntaxError as msg:
raise misctricks.logOldExc(excs.BadCode(expr, "expression", msg))
# An ast for an expression is a Discard inside at Stmt inside the
# top-level Module
try:
exprNodes = tree.body
if len(exprNodes)!=1:
raise ValueError("Not a single statement")
if not isinstance(exprNodes[0], ast.Expr):
raise ValueError("Not an expression")
except (ValueError, AttributeError) as ex:
raise misctricks.logOldExc(excs.BadCode(expr, "expression", ex))
[docs]def importModule(modName: str) -> ModuleType:
"""imports a module from the module path.
Use this to programmatically import "normal" modules, e.g., dc-internal
ones. It uses python's standard import mechanism and returns the
module object.
We're using exec and python's normal import, so the semantics
should be identical to saying import modName except that the
caller's namespace is not changed.
The function returns the imported module.
"""
# ward against exploits (we're about to use exec): check syntax
if not re.match(r"([A-Za-z_]+)(\.[A-Za-z_]+)*", modName):
raise excs.Error("Invalid name in internal import: %s"%modName)
parts = modName.split(".")
vars: Dict[str, Any] = {}
if len(parts)==1:
exec("import %s"%modName, vars)
else:
exec("from %s import %s"%(".".join(parts[:-1]), parts[-1]), vars)
return vars[parts[-1]]
[docs]@document
def loadPythonModule(fqName: Filename, relativeTo: Optional[Filename]=None
) -> Tuple[ModuleType, Any]:
"""imports fqName and returns the (module, spec).
Do not use this function to import DC-internal modules; this may
mess up singletons since you could bypass python's mechanisms
to prevent multiple imports of the same module.
fqName is a fully qualified path to the module without the .py,
unless relativeTo is given, in which case it is interpreted as a
relative path. This for letting modules in resdir/res import each
other by saying::
mod, _ = api.loadPythonModule("foo", relativeTo=__file__)
The python path is temporarily amended with the path part of the
source module.
If the module is in /var/gavo/inputs/foo/bar/mod.py, Python will know
the module as foo_bar_mod (the last two path components are always added).
This is to keep Python from using the module when someone writes
import mod.
"""
if relativeTo is not None:
fqName = os.path.join(os.path.dirname(relativeTo), fqName)
try:
spec = importlib.util.spec_from_file_location(
"_".join(fqName.split("/")[-3:]),
fqName+".py")
if spec is None:
raise ImportError("Could not locate or open %s"%fqName)
module = importlib.util.module_from_spec(spec)
except ImportError:
# cloak the actual import error; since this probably comes from user
# code, chances are they want to see something else. Let's guess a
# structure error
raise excs.StructureError("Requested module %s not importable."%fqName,
hint="If it exists at all, the import might also fail because"
' of syntax errors or similar. Try python -c "import mod" to get'
' a clue in that case.')
spec.loader.exec_module(module) # type: ignore
return module, spec
[docs]def loadInternalObject(relativeName: str, objectName: str) -> Any:
"""gets a name from an internal module.
relativeName is the python module path (not including "gavo."),
objectName the name of something within the module.
This is used for "manual" registries (grammars, cores,...).
"""
modName = "gavo."+relativeName
module = importModule(modName)
return getattr(module, objectName)
[docs]def memoized(origFun: Callable):
"""a trivial memoizing decorator.
This is a legacy wrapper for functools.lru_cache. Don't use in new
code
"""
return functools.lru_cache()(origFun)
[docs]def iterConsecutivePairs(sequence: Sequence):
"""yields pairs of consecutive items from sequence.
If the last item cannot be paired, it is dropped.
>>> list(iterConsecutivePairs(range(6)))
[(0, 1), (2, 3), (4, 5)]
>>> list(iterConsecutivePairs(range(5)))
[(0, 1), (2, 3)]
"""
iter1, iter2 = iter(sequence), iter(sequence)
next(iter2)
return list(zip(
itertools.islice(iter1, None, None, 2),
itertools.islice(iter2, None, None, 2)))
[docs]def iterRanges(separators: Sequence[T]) -> Generator[Tuple[T, T], None, None]:
"""yields (left, right) pairs for a sequence of separating indexes.
This is when you want to partition a sequence based on cut points.
>>> list(iterRanges(range(6)))
[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]
"""
i = iter(separators)
left = next(i)
for right in i:
yield left, right
left = right
[docs]def identity(x: Any) -> Any:
return x
[docs]def intToFunnyWord(anInt: int, translation=bytes.maketrans(
b"-0123456789abcdef",
b"zaeiousmnthwblpgd")) -> str:
"""returns a sometimes funny (but unique) byte string from an arbitrary
integer.
>>> intToFunnyWord(3829901938)
'guunwwni'
"""
return (b"%x"%anInt).translate(translation).decode("ascii")
[docs]def addDefaults(dataDict: Dict, defaultDict: Dict) -> None:
"""adds key-value pairs from defaultDict to dataDict if the key is missing
in dataDict.
"""
for key, value in defaultDict.items():
if key not in dataDict:
dataDict[key] = value
[docs]@functools.lru_cache(None)
def devnull() -> TextIO:
"""returns a (string-) writable /dev/null.
This always returns the same object, and to placate resource warnings,
the file will be closed before exiting the program; the close method
of the returned thing is a no-op.
"""
devnull = open("/dev/null", "w")
closeIt = devnull.close
devnull.close = lambda: None # type: ignore
atexit.register(closeIt)
return devnull
[docs]def memoizeOn(
onObject: object,
generatingObject: object,
generatingFunction: Callable,
*args) -> Any:
"""memoizes the result of generatingFunction on onObject and returns it.
This is for caching things that adapt to onObjects; see procdefs
and rowmakers for examples why this is useful.
args is passed to generatingFunction; generatingObject is only
needed for identification.
"""
cacheName = "_cache%s%s"%(generatingObject.__class__.__name__,
str(id(generatingObject)))
if getattr(onObject, cacheName, None) is None:
setattr(onObject, cacheName, generatingFunction(*args))
return getattr(onObject, cacheName)
[docs]def forgetMemoized(ob: object) -> None:
"""clears things memoizeOn-ed on ob or @utils.memoize-ed.
This is sometimes necessary to let the garbage collector free
ob, e.g., when closures have been memoized.
"""
for n in dir(ob):
child = getattr(ob, n)
# this is for @memoized things
if hasattr(child, "_cache"):
child._cache.clear()
# this is for lru_cache-decorated functions
if hasattr(child, "cache_clear"):
child.cache_clear()
# this is for memoizedOn-ed things
if n.startswith("_cache"):
delattr(ob, n)
[docs]def stealVar(varName: str) -> Any:
"""returns the first local variable called varName in the frame stack
above my caller.
This is obviously abominable. This is only used within the DC code where
the author deemed the specification ugly. Ah. Almost.
If no variable with varName is found anywhere in the stack, this
raises a ValueError.
"""
# frame operations are essentially impossible to get through mypy right now.
frame = inspect.currentframe().f_back.f_back # type: ignore
while frame:
if varName in frame.f_locals: # type: ignore
return frame.f_locals[varName] # type: ignore
frame = frame.f_back # type: ignore
raise ValueError("No local %s in the stack"%varName)
[docs]def printFrames() -> None: # types: ignore
"""prints a compact list of frames.
This is an aid for printf debugging.
"""
frame = inspect.currentframe().f_back.f_back #type: ignore
if inspect.getframeinfo(frame)[2]=="getJobsTable": # type: ignore
return
while frame:
print("[%s,%s], [%s]"%inspect.getframeinfo(frame)[:3])
frame = frame.f_back
[docs]def getTracebackAsString() -> str:
import traceback
f = io.StringIO()
traceback.print_exc(file=f)
return f.getvalue()
[docs]class EqualingRE(object):
"""A value that compares equal based on RE matches.
This is a helper mainly for GetHasXPathsTests. Use an instance of
this class to check against an RE rather than a plain string.
>>> EqualingRE("(ab)+") == "ababab"
True
>>> EqualingRE("(ab)+$") == "ababa"
False
>>> EqualingRE("(ab)+$") != "ababa$"
True
>>> "ababa" == EqualingRE("(ab)+$")
False
"""
def __init__(self, pattern: str):
self.pat = re.compile(pattern)
def __eq__(self, other: Any) -> bool:
if isinstance(other, str):
return bool(self.pat.match(other))
elif isinstance(other, EqualingRE):
return id(self)==id(other)
return False
def __hash__(self):
return hash(self.pat)
def __ne__(self, other: Any) -> bool:
return not self.__eq__(other)
def __str__(self):
return "<Pattern %s>"%self.pat.pattern
__repr__ = __str__
[docs]class NocaseString(str):
"""A string that compares case-insensitively.
This is my way to work around the crazy requirement that all kinds of
VO protocol parameters need to be case-insensitive. This won't work
with dictionaries. It will work with cgi.FieldStorage, though, because
it does a linear search.
Normal DaCHS code doesn't need this because of various hacks in
contextgrammar and elsewhere. If you're touching request.args manually,
you'll have to look at this, though.
Case insensitivity is evil. Let's get rid of it and then get rid of
this nasty mess.
>>> NocaseString("aBc")=="abc"
True
>>> "aBc"==NocaseString("abc")
True
>>> NocaseString("axc")=="abc"
False
>>> NocaseString("axc")!="abc"
True
>>> NocaseString("axc")!=NocaseString("abc")
True
"""
def __eq__(self, other: object):
if not hasattr(other, "lower"):
return False
return self.lower()==other.lower() # type: ignore
# we define different list classes depending to help the sqlsupport
# add appropriate casts to NULL (SqlArrayAdapter)
[docs]class bytelist(list): pass
[docs]class intlist(list): pass
[docs]class floatlist(list): pass
[docs]class complexlist(list): pass
[docs]class NS(object):
"""An object that has its kwargs as attributes.
"""
def __init__(self, **kwargs):
for k,v in kwargs.items():
setattr(self, k, v)
if __name__=="__main__": # pragma: no cover
import doctest
doctest.testmod()