"""
Structure definition of resource descriptors.
The stuff they are describing is not a resource in the VO sense (whatever
that is) or in the Dublin Core sense, but simply stuff held together
by common metadata. If it's got the same creator, the same base title,
the same keywords, etc., it's described by one RD.
In DaCHS, a resource descriptor typically sets up a schema in
the database.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime
import os
import pkg_resources
import time
import threading
import weakref
from gavo import base
from gavo import registry
from gavo import rsc
from gavo import rscdef
from gavo import svcs
from gavo import utils
from gavo.rscdef import common
from gavo.rscdef import rdinj
from gavo.rscdef import regtest
from gavo.rscdef import scripting
from gavo.rscdef import executing
class ResdirAttribute(base.UnicodeAttribute):
    """An attribute representing an RD's resdir.

    This is for resource's resdir attribute, which will by default
    get the inputsDir prepended.  There is the special value "." that
    means "the directory containing the RD", which is what people should
    do now.

    This will only work for RDs.

    Note that without that, the resdir defaults to <inputsDir>/<schemaName>
    (see RD._inferResdir).
    """
    def feedObject(self, instance, parsedValue):
        # Remember the literal value as written in the RD.
        # NOTE(review): this is stored on the attribute definition (self),
        # which is shared by all RDs, rather than on instance -- confirm
        # whatever reads _original_resdir_* expects that.
        setattr(self, f"_original_resdir_{self.name_}", parsedValue)

        if parsedValue==".":
            # the RD's own directory, taken from its (inputs-relative)
            # source id
            relDir = instance.sourceId.rpartition("/")[0]
        else:
            relDir = parsedValue

        # os.path.join returns relDir unchanged if it is absolute.
        super().feedObject(instance,
            os.path.join(base.getConfig("inputsDir"), relDir))
class RD(base.Structure, base.ComputedMetaMixin, scripting.ScriptingMixin,
        base.StandardMacroMixin, common.PrivilegesMixin):
    """A resource descriptor.

    RDs collect all information about how to parse a particular source (like a
    collection of FITS images, a catalogue, or whatever), about the database
    tables the data ends up in, and the services used to access them.

    In DaCHS' RD XML serialisation, they correspond to the root element.
    """
    name_ = "resource"

    # this is set somewhere below once parsing has proceeded far enough
    # such that caching the RD make sense
    cacheable = False

    # we used to accept scripts in RDs and probably will do so again,
    # but for now the semantics are not well-defined and we reject all
    # scripts for a while
    acceptedScriptTypes = {}

    _resdir = ResdirAttribute("resdir",
        default=None,
        description="Base directory for source files and everything else"
        " belonging to the resource. Use a single dot (.) to say 'the"
        " directory the RD resides in', which is recommended in modern"
        " DaCHS.",
        copyable=True)

    _schema = base.UnicodeAttribute("schema",
        default=base.Undefined,
        description="Database schema for tables defined here. Follow the rule"
        " 'one schema, one RD' if at all possible. If two RDs share the same"
        " schema, the must generate exactly the same permissions for that"
        " schema; this means, in particular, that if one has an ADQL-published"
        " table, so must the other. In a nutshell: one schema, one RD.",
        copyable=True,
        callbacks=["_inferResdir"])

    _dds = base.StructListAttribute("dds",
        childFactory=rscdef.DataDescriptor,
        description="Descriptors for the data generated and/or published"
        " within this resource.",
        copyable=True,
        before="outputTables")

    _tables = base.StructListAttribute("tables",
        childFactory=rscdef.TableDef,
        description="A table used or created by this resource",
        copyable=True,
        before="dds")

    _outputTables = base.StructListAttribute("outputTables",
        childFactory=svcs.OutputTableDef,
        description="Canned output tables for later reference.",
        copyable=True)

    _rowmakers = base.StructListAttribute("rowmakers",
        childFactory=rscdef.RowmakerDef,
        description="Transformations for going from grammars to tables."
        " If specified in the RD, they must be referenced from make"
        " elements to become active.",
        copyable=True,
        before="dds")

    _procDefs = base.StructListAttribute("procDefs",
        childFactory=rscdef.ProcDef,
        description="Procedure definintions (rowgens, rowmaker apply-s)",
        copyable=True, before="rowmakers")

    _condDescs = base.StructListAttribute("condDescs",
        childFactory=svcs.CondDesc,
        description="Global condition descriptors for later reference",
        copyable=True,
        before="cores")

    _resRecs = base.StructListAttribute("resRecs",
        childFactory=registry.ResRec,
        description="Non-service resources for the IVOA registry. They will"
        " be published when gavo publish is run on the RD.")

    _services = base.StructListAttribute("services",
        childFactory=svcs.Service,
        description="Services exposing data from this resource.",
        copyable=True)

    _macDefs = base.MacDefAttribute(before="tables",
        description="User-defined macros available on this RD")

    _mixinDefs = base.StructListAttribute("mixdefs",
        childFactory=rscdef.MixinDef,
        description="Mixin definitions (usually not for users)")

    _require = base.ActionAttribute("require",
        methodName="importModule",
        description="Import the named gavo module (for when you need something"
        " registered)")

    _cores = base.MultiStructListAttribute("cores",
        childFactory=svcs.getCore,
        childNames=list(svcs.CORE_REGISTRY.keys()),
        description="Cores available in this resource.", copyable=True,
        before="services")

    _jobs = base.StructListAttribute("jobs",
        childFactory=executing.Execute,
        description="Jobs to be run while this RD is active.")

    _tests = base.StructListAttribute("tests",
        childFactory=regtest.RegTestSuite,
        description="Suites of regression tests connected to this RD.")

    _coverage = base.StructAttribute("coverage",
        childFactory=rscdef.Coverage,
        default=None,
        description="STC coverage of this resource.", copyable=True)

    _properties = base.PropertyAttribute()

    def __init__(self, srcId, **kwargs):
        # RDs never have parents, so contrary to all other structures they
        # are constructed with with a srcId instead of a parent.  You
        # *can* have that None, but such RDs cannot be used to create
        # non-temporary tables, services, etc, since the srcId is used
        # in the construction of identifiers and such.
        self.sourceId = srcId or "temporary"
        base.Structure.__init__(self, None, **kwargs)
        # The rd attribute is a weakref on self.  Always.  So, this is the class
        # that roots common.RDAttributes
        self.rd = weakref.proxy(self)
        # real dateUpdated is set by getRD, this is just for RDs created
        # on the fly.  This is a naive UTC datetime (equivalent to the
        # deprecated datetime.utcnow()).
        self.dateUpdated = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None)
        # if an RD is parsed from a disk file, this gets set to its path
        # by getRD below
        self.srcPath = None
        # this is for modified-since and friends.
        self.loadedAt = time.time()
        # keep track of RDs depending on us for the registry code
        # (only read this)
        self.rdDependencies = set()

    def __iter__(self):
        return iter(self.dds)

    def __repr__(self):
        return "<resource descriptor for %s>"%self.sourceId

    def validate(self):
        """raises a StructureError if the schema is not a python identifier.
        """
        if not utils.identifierPattern.match(self.schema):
            raise base.StructureError("DaCHS schema attributes must be valid"
                " python identifiers")

    def isDirty(self):
        """returns true if the RD on disk has a timestamp newer than
        loadedAt.

        RDs from the resource package and RDs without a srcPath are never
        dirty; an RD whose file has become unreadable (usually: deleted) is.
        """
        if isinstance(self.srcPath, PkgResourcePath):
            # stuff from the resource package should not change underneath us.
            return False

        try:
            if self.srcPath is not None:
                return os.path.getmtime(self.srcPath)>self.loadedAt
        except os.error:
            # this will usually mean the file went away
            return True
        return False

    def importModule(self, ctx):
        # this is a callback for the require attribute
        utils.loadInternalObject(self.require, "__doc__")

    def onElementComplete(self):
        # collect read profiles from the tables and make the RD the meta
        # parent of its tables, services, and data descriptors.
        for table in self.tables:
            self.readProfiles = self.readProfiles | table.readProfiles
            table.setMetaParent(self)

        self.serviceIndex = {}
        for svc in self.services:
            self.serviceIndex[svc.id] = svc
            svc.setMetaParent(self)

        for dd in self.dds:
            dd.setMetaParent(self)

        if self.resdir and not os.path.isdir(self.resdir):
            base.ui.notifyWarning("RD %s: resource directory '%s' does not exist"%(
                self.sourceId, self.resdir))

        super().onElementComplete()

    def _inferResdir(self, value):
        # a callback for the schema attribute: without an explicit resdir,
        # use the schema name (ResdirAttribute prepends inputsDir).
        if self.resdir is None:
            self._resdir.feedObject(self, value)

    def iterDDs(self):
        return iter(self.dds)

    def getService(self, id):
        """returns the service with id, or None if there is none.

        This uses the index built in onElementComplete.
        """
        return self.serviceIndex.get(id, None)

    def getTableDefById(self, id):
        return self.getById(id, rscdef.TableDef)

    def getDataDescById(self, id):
        return self.getById(id, rscdef.DataDescriptor)

    def getById(self, id, forceType=None):
        """returns the element with id from this RD's idmap.

        This raises a base.NotFoundError if id is unknown; if forceType is
        given, base.assertType checks the element against it.
        """
        try:
            res = self.idmap[id]
        except KeyError:
            raise base.NotFoundError(
                id, "Element with id", "RD %s"%(self.sourceId))
        if forceType:
            base.assertType(id, res, forceType)
        return res

    def getRelResdir(self):
        """returns the inputsDir-relative resource directory path.

        This never has either a leading or a trailing slash.
        """
        return utils.getRelativePath(self.resdir, base.getConfig("inputsDir"))

    def getAbsPath(self, relPath):
        """returns the absolute path for a resdir-relative relPath.
        """
        return os.path.join(self.resdir, relPath)

    def openRes(self, relPath, mode="rb"):
        """returns a file object for relPath within self's resdir.

        Deprecated.  This is going to go away, use getAbsPath and a context
        manager.
        """
        return open(self.getAbsPath(relPath), mode)

    def _computeIdmap(self):
        res = {}
        for child in self.iterChildren():
            if hasattr(child, "id"):
                res[child.id] = child
        return res

    def addDependency(self, rd, prereq):
        """declares that rd needs the RD prereq to properly work.

        This is used in the generation of resource records to ensure that, e.g.
        registered data have added their served-bys to the service resources.
        Self-dependencies are ignored.
        """
        if rd.sourceId!=prereq.sourceId:
            self.rdDependencies.add((rd.sourceId, prereq.sourceId))

    def copy(self, parent):
        base.ui.notifyWarning("Copying an RD -- this may not be a good idea")
        new = base.Structure.copy(self, parent)
        new.idmap = new._computeIdmap()
        new.sourceId = self.sourceId
        return new

    def invalidate(self):
        """make the RD fail on every attribute read.

        See rscdesc._loadRDIntoCache for why we want this.
        """
        errMsg = ("Loading of %s failed in another thread; this RD cannot"
            " be used here")%self.sourceId

        class BrokenClass(object):
            """A class that reacts to all attribute requests with an exception.
            """
            def __getattribute__(self, attributeName):
                # __class__ must still work, e.g., for isinstance
                if attributeName=="__class__":
                    return BrokenClass
                raise base.ReportableError(errMsg)

        self.__class__ = BrokenClass

    def _meta__metadataUpdated(self):
        """falls back to utcnow in case we don't know the metadata update
        time.

        Since _metadataUpdated is normally set from the RD file's time stamp
        (by setRDDateTimes), this is only reached for file-less RDs.  For
        these, just about now would be a good guess as to their creation time.

        In reality, this shouldn't be reached outside of test code; there,
        however, it's important because this is used in required attributes
        in OAI-PMH and VOResource.
        """
        # naive UTC, equivalent to the deprecated datetime.utcnow()
        return utils.formatISODT(
            datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None))

    def macro_RSTccbysa(self, stuffDesignation):
        """expands to a declaration that stuffDesignation is available under
        CC-BY-SA.

        This only works in reStructured text (though it's still almost
        readable as source).

        You'll probably want to use the `//procs#license-cc-by-sa`_ stream
        instead of this, as that also sets the rights URI.
        """
        return ("%s is licensed under the `Creative Commons Attribution"
            " Share-Alike 4.0"
            " License <http://creativecommons.org/licenses/by-sa/4.0/>`_\n\n"
            ".. image:: /static/img/ccbysa.png\n :alt: [CC-BY-SA]\n"
            )%stuffDesignation

    def macro_RSTccby(self, stuffDesignation):
        """expands to a declaration that stuffDesignation is available under
        CC-BY.

        This only works in reStructured text (though it's still almost
        readable as source).

        You'll probably want to use the `//procs#license-cc-by`_ stream
        instead of this, as that also sets the rights URI.
        """
        return ("%s is licensed under the `Creative Commons Attribution 4.0"
            " License <http://creativecommons.org/licenses/by/4.0/>`_\n\n"
            ".. image:: /static/img/ccby.png\n :alt: [CC-BY]\n\n"
            )%stuffDesignation

    def macro_RSTcc0(self, stuffDesignation):
        """expands to a declaration that stuffDesignation is available under
        CC-0.

        This only works in reStructured text (though it's still almost
        readable as source).

        You'll probably want to use the `//procs#license-cc0`_ stream
        instead of this, as that also sets the rights URI.
        """
        return ("To the extent possible under law, the publisher has"
            " waived all copyright and related or neighboring rights to %s."
            " For details, see the `Creative Commons CC0 1.0"
            " Public Domain dedication"
            " <http://creativecommons.org/publicdomain/zero/1.0/>`_. Of course,"
            " you should still give proper credit when using this data as"
            " required by good scientific practice.\n\n"
            ".. image:: /static/img/cc0.png\n :alt: [CC0]\n\n"
            )%stuffDesignation
class RDParseContext(base.ParseContext):
    """is a parse context for RDs.

    It defines a couple of attributes that structures can ask for (however,
    it's good practice not to rely on their presence in case someone wants
    to parse XML snippets with a standard parse context, so use
    getattr(ctx, "doQueries", True) or somesuch).
    """
    def __init__(self, doQueries=True, restricted=False, forRD=None):
        # doQueries is set before the base class initialiser runs.
        self.doQueries = doQueries
        base.ParseContext.__init__(self, restricted, forRD)

    @classmethod
    def fromContext(cls, ctx, forRD=None):
        """a constructor that makes a context with the parameters taken from
        the RDParseContext ctx.

        Only doQueries and restricted are inherited; forRD is taken from
        the argument.
        """
        inherited = {
            "doQueries": ctx.doQueries,
            "restricted": ctx.restricted,
        }
        return cls(forRD=forRD, **inherited)

    @property
    def failuresAreCacheable(self):
        """returns true if failures produced with this context should
        be cached.

        This is not the case with restricted parses.
        """
        return False if self.restricted else True
class PkgResourcePath(str):
    """A sentinel str subclass marking an RD path as coming from pkg_resources.

    RD.isDirty never reports RDs with such a srcPath as dirty, and getRD
    does not run os.path.abspath on them.
    """
    def __str__(self):
        # hand back this very object rather than a plain-str copy, so the
        # marker type is kept
        return self
def canonicalizeRDId(srcId):
    """returns a standard rd id for srcId.

    srcId may be a file system path, or it may be an "id".  The canonical
    form basically is "inputs-relative path without .rd extension".
    Paths that are not within inputs are handed through (though a trailing
    .rd is always removed).

    // is expanded to __system__/.  The path to built-in RDs,
    /resources/inputs, is treated analogous to inputsDir.

    "." path segments are dropped, and ".." removes the preceding segment;
    a ValueError is raised if there is no preceding segment to remove.
    """
    if srcId.startswith("//"):
        srcId = "__system__"+srcId[1:]

    # This may see un-normalised paths; let's try and follow unix path
    # semantics
    cleanedPath = []
    for segment in srcId.split("/"):
        if segment==".":
            continue
        elif segment=="..":
            if not cleanedPath:
                raise ValueError("Too many .. in relative rd id")
            cleanedPath.pop()
        else:
            cleanedPath.append(segment)
    srcId = "/".join(cleanedPath)

    for inputsDir in (base.getConfig("inputsDir"), "/resources/inputs"):
        # Only strip whole path components: with an inputsDir of /x/inputs,
        # /x/inputs2/foo must not turn into 2/foo.
        prefix = inputsDir.rstrip("/")+"/"
        if srcId.startswith(prefix):
            srcId = srcId[len(prefix):].lstrip("/")

    if srcId.endswith(".rd"):
        srcId = srcId[:-3]
    return srcId
def _getFilenamesForId(srcId):
    """helps getRDInputStream by iterating over possible files for srcId.

    Absolute ids are only tried as they are; relative ids are tried below
    inputsDir first, then below /resources/inputs.  For each location,
    the variant with an .rd extension comes first.
    """
    if srcId.startswith("/"):
        candidates = [srcId]
    else:
        candidates = [
            os.path.join(base.getConfig("inputsDir"), srcId),
            "/resources/inputs/%s"%srcId]

    for candidate in candidates:
        yield candidate+".rd"
        yield candidate
def setRDDateTimes(ctx, rd, inputFile):
    """sets the _dataUpdated and _metadataUpdated meta items on rd.

    _metadataUpdated is set from the time stamp utils.fgetmtime reports
    for inputFile, as a naive UTC datetime.  _dataUpdated is only set if
    ctx has a _dataUpdated value injected.
    """
    # datetime.utcfromtimestamp is deprecated since Python 3.12; this
    # yields the same naive UTC datetime.
    rd.setMeta("_metadataUpdated",
        datetime.datetime.fromtimestamp(
            utils.fgetmtime(inputFile), datetime.timezone.utc
        ).replace(tzinfo=None))

    try:
        dataUpdated = ctx.getInjected("_dataUpdated")
    except KeyError:
        # no data imported yet, probably
        return
    rd.setMeta("_dataUpdated", dataUpdated)
# Path of the operator's userconfig RD in the configuration directory
# (without the .rd extension).
USERCONFIG_RD_PATH = os.path.join(base.getConfig("configDir"), "userconfig")


class _UserConfigFakeRD(object):
    """A fake object that's in the RD cache as "%".

    This is used by the id resolvers in parsecontext; this certainly is
    of no use as an RD otherwise.
    """
    def getRealRD(self):
        """returns the userconfig RD from the configuration directory.
        """
        return base.caches.getRD(USERCONFIG_RD_PATH)

    def getMeta(self, *args, **kwargs):
        """forwards meta lookups to the userconfig RD.
        """
        return self.getRealRD().getMeta(*args, **kwargs)

    def getById(self, id, forceType=None):
        """returns an item from userconfig.

        This first tries to resolve id in gavo/etc/userconfig.rd, then in the
        fallback //userconfig.rd.  If the local userconfig cannot be used at
        all, that is reported via base.ui.notifyError and the fallback is
        consulted anyway.

        Raises base.NotFoundError (mentioning etc/userconfig.rd) if neither
        RD knows id.
        """
        try:
            return base.caches.getRD(
                os.path.join(base.getConfig("configDir"), "userconfig.rd")
            ).getById(id, forceType=forceType)
        except base.NotFoundError:
            # not defined locally; try the built-in defaults
            pass
        except Exception as msg:
            base.ui.notifyError("Bad userconfig: (%s), ignoring it. Run"
                " 'dachs val %%' to see actual errors."%repr(msg))

        try:
            return base.caches.getRD("//userconfig"
                ).getById(id, forceType=forceType)
        except base.NotFoundError:
            raise base.NotFoundError(id, "Element with id",
                "etc/userconfig.rd")
def refuseBlacklisted(srcId, rdInputPath):
    """raises an exception if rdInputPath ends with or srcId is equal to
    one of the strings in [general]rdblacklist.

    The exception raised is a utils.Error.
    """
    isBlacklisted = any(
        rdInputPath.endswith(entry) or entry==srcId
        for entry in base.getConfig("rdblacklist"))

    if isBlacklisted:
        raise utils.Error(
            f"RD {srcId} is blacklisted in gavo.rc. Not loading.")
def getRD(srcId, doQueries=True, restricted=False, useRD=None):
    """returns a ResourceDescriptor for srcId.

    srcId is something like an input-relative path; you'll generally
    omit the extension (unless it's not the standard .rd).

    getRD furnishes the resulting RD with an idmap attribute containing
    the mapping from id to object collected by the parse context.

    The useRD parameter is for _loadRDIntoCache exclusively and is
    used by it internally.  It is strictly an ugly implementation detail.

    The special srcId "%" returns a _UserConfigFakeRD without parsing
    anything.  Blacklisted RDs (see refuseBlacklisted) raise a utils.Error.
    Exceptions raised while parsing get inFile (the source path) and
    cacheable attributes attached before they are re-raised.
    """
    if srcId=='%':
        return _UserConfigFakeRD()

    if useRD is None:
        rd = RD(canonicalizeRDId(srcId))
    else:
        rd = useRD

    # getRDInputStream gives the path the RD was found at and an open
    # file for it; that file is closed on every path out of this function.
    srcPath, inputFile = getRDInputStream(rd.sourceId)
    try:
        refuseBlacklisted(srcId, srcPath)
    except utils.Error:
        inputFile.close()
        raise

    # look for a context upstack and get the default parameters from there,
    # overriding the parameters.  (I.e., if one is found, the doQueries and
    # restricted arguments passed here are ignored.)
    # NOTE: the local must keep the name getRD_context -- utils.stealVar
    # looks it up by that name, presumably in calling frames, which is how
    # nested getRD calls pick up this context.
    try:
        getRD_context = RDParseContext.fromContext(
            utils.stealVar("getRD_context"), forRD=rd.sourceId)
    except ValueError:
        # no getRD_context variable in the stack
        getRD_context = RDParseContext(doQueries=doQueries,
            restricted=restricted, forRD=rd.sourceId)

    rdinj.injectIntoContext(getRD_context, rd.sourceId)
    # package-resource paths are left alone; everything else is made
    # absolute so RD.isDirty and error messages see a stable path.
    if not isinstance(srcPath, PkgResourcePath):
        srcPath = os.path.abspath(srcPath)
    rd.srcPath = getRD_context.srcPath = srcPath
    # share the context's idmap so ids collected while parsing end up on rd
    rd.idmap = getRD_context.idmap

    try:
        rd = base.parseFromStream(rd, inputFile, context=getRD_context)
        setRDDateTimes(getRD_context, rd, inputFile)
    except Exception as ex:
        # annotate for error reporting and for _loadRDIntoCache, which
        # decides on caching the failure based on ex.cacheable
        ex.inFile = srcPath
        ex.cacheable = getRD_context.failuresAreCacheable
        raise
    finally:
        inputFile.close()
    return rd
# In _CURRENTLY_PARSING, _loadRDIntoCache keeps track of what RDs are
# currently being parsed.  The keys are the canonical sourceIds, the values
# are (RLock, unfinished RD) pairs; the RLock is held by the parsing thread
# until parsing has finished.  _CURRENTLY_PARSING_LOCK is taken by
# _loadRDIntoCache while it checks for and registers entries.
_CURRENTLY_PARSING_LOCK = threading.Lock()
_CURRENTLY_PARSING = {}
class CachedException(object):
    """An exception that occurred while parsing an RD.

    This will remain in the cache until the underlying RD is changed.
    """
    def __init__(self, exception, sourcePath):
        self.exception = exception.with_traceback(None)
        self.sourcePath = sourcePath
        # this can race a bit in that we won't catch saves done between
        # we started parsing and we came up with the exception, but
        # these are easy to fix by saving again, so we won't bother.
        try:
            self.loadedAt = os.path.getmtime(self.sourcePath)
        except (TypeError, OSError):
            # sourcePath is None or the file doesn't exist: that state
            # is "as of now"
            self.loadedAt = time.time()

    def isDirty(self):
        """returns true if the cached failure should be re-checked.

        That is the case when the source path is unknown, when the file
        is gone (or unreadable), or when it was modified after loadedAt.
        """
        if self.sourcePath is None:
            # this can have various reasons, but most likely it's because
            # the RD hasn't been there.  Since we can't tell if the
            # file has appeared in the mean time, we'll have to re-check
            # (the cache rate-limits re-checks, so this is cheap).
            return True

        try:
            return os.path.getmtime(self.sourcePath)>self.loadedAt
        except OSError:
            # someone has removed the file (possibly right now); kill cache
            return True

    def raiseAgain(self):
        """re-raises the cached exception (without a traceback).
        """
        # XXX TODO: do we want to fix the traceback here?
        raise self.exception.with_traceback(None)
def _loadRDIntoCache(canonicalRDId, cacheDict):
    """helps _makeRDCache.

    This function contains the locking logic that makes sure multiple
    threads can load RDs.

    The first thread to ask for canonicalRDId registers a locked RLock and a
    fresh RD in _CURRENTLY_PARSING and parses into that RD; other threads
    wait for the lock and then get that (possibly invalidated) RD.  On
    success the parsed RD is stored in cacheDict[canonicalRDId] and
    returned.  On failure the exception is re-raised; if it is marked
    cacheable, a CachedException is stored in cacheDict first.
    """
    with _CURRENTLY_PARSING_LOCK:
        if canonicalRDId in _CURRENTLY_PARSING:
            lock, rd = _CURRENTLY_PARSING[canonicalRDId]
            justWait = True
        else:
            lock, rd = threading.RLock(), RD(canonicalRDId)
            _CURRENTLY_PARSING[canonicalRDId] = lock, rd
            lock.acquire()
            justWait = False

    if justWait:
        # Someone else is already parsing.  If it's the current thread,
        # go on (lock is an RLock!) so we can resolve self-references
        # (as long as they are backward references).  All other threads
        # just wait for the parsing thread to finish
        lock.acquire()
        lock.release()
        return rd

    try:
        try:
            cacheDict[canonicalRDId] = getRD(canonicalRDId, useRD=rd)
        except Exception as ex:
            # Importing failed, invalidate the RD (in case other threads still
            # see it from _CURRENTLY_PARSING).  The CachedException must be
            # built before invalidating, as an invalidated rd fails on
            # every attribute access.
            if getattr(ex, "cacheable", False):
                cacheDict[canonicalRDId] = CachedException(ex,
                    getattr(rd, "srcPath", None))
            rd.invalidate()
            raise
    finally:
        # Remove the entry under the registry lock: otherwise another
        # thread could see the key in its membership test above and then
        # fail with a KeyError when looking it up.
        with _CURRENTLY_PARSING_LOCK:
            del _CURRENTLY_PARSING[canonicalRDId]
        lock.release()

    return cacheDict[canonicalRDId]
def _makeRDCache():
    """installs the cache for RDs.

    One trick here is to handle "aliasing", i.e. making sure that
    you get identical objects regardless of whether you request
    __system__/adql.rd, __system__/adql, or //adql.

    Then, we're checking for "dirty" RDs (i.e., those that should
    be reloaded).

    The messiest part is the support for getting RDs in the presence of
    threads while still supporting recursive references, though.
    """
    # TODO: Maybe unify this again with caches._makeCache?  That stuff could
    # do with a facility to invalidate cached entries, too.
    # But care is necessary to not cache any RD parsed in a nonstandard
    # fashion (e.g., in restricted mode).  CAREFUL: since getRD indulges
    # in variable stealing, explicit checks are necessary.
    rdCache = {}
    # srcIds for which clearRDIfDirty is currently trying a re-parse
    currentlyValidating = set()

    def clearRDIfDirty(srcId):
        """clears a cached RD for srcId if it's dirty and it can be
        loaded from disk.

        If it can't be loaded, the cache entry is marked as fresh (RDs get
        loadedAt set to now, cached exceptions are replaced by new ones),
        so it's counting as clean until it's being changed again.  This
        will avoid re-parsing RDs all the time when they're broken for
        a while.
        """
        if (srcId in rdCache
                and getattr(rdCache[srcId], "isDirty", lambda: False)()):
            try:
                # limit re-check rate to 10 seconds to avoid getRD storms
                # on often-used but currently broken RDs
                if time.time()-rdCache[srcId].loadedAt<10:
                    return
                if srcId in currentlyValidating:
                    return

                currentlyValidating.add(srcId)
                base.ui.notifyWarning("RD dirty, attempting reload of %s"%srcId)
                try:
                    tempCache = {}
                    _loadRDIntoCache(srcId, tempCache)
                finally:
                    # discard, not remove: a concurrent validator of the same
                    # srcId may already have taken it out, and a KeyError
                    # here would be misreported as a broken RD below.
                    currentlyValidating.discard(srcId)

            except Exception as ex:
                base.ui.notifyError("Tried to reload RD %s but found it broken."
                    " Will retry loading when it has been edited."%srcId)
                # the source path of the cached exception isn't necessarily
                # right (for system and userconfig RDs); but it ought
                # to be good enough for what users typcially are confronted with.
                if isinstance(rdCache[srcId], CachedException):
                    rdCache[srcId] = CachedException(ex,
                        os.path.join(base.getConfig("inputsDir"), srcId+".rd"))
                else:
                    rdCache[srcId].loadedAt = time.time()
                return

            base.caches.clearForName(srcId)

    def getRDCached(srcId, **kwargs):
        # non-default parses are never cached
        if kwargs:
            return getRD(srcId, **kwargs)

        srcId = canonicalizeRDId(srcId)
        # Since the "validate before purge last" change it's possible
        # that we're currently parsing and in the cache.  During the
        # validation run, we need to return from the the thing that
        # currently parses.  Hence, we need to re-do quite a bit of
        # the logic of _loadRDIntoCache here.  Gnwm.
        # (a single .get, because the entry may vanish at any moment)
        parsing = _CURRENTLY_PARSING.get(srcId)
        if parsing is not None:
            lock, rd = parsing
            lock.acquire()
            lock.release()
            return rd

        clearRDIfDirty(srcId)

        if srcId in rdCache:
            cachedOb = rdCache[srcId]
            if isinstance(cachedOb, CachedException):
                cachedOb.raiseAgain()
            else:
                return cachedOb
        else:
            return _loadRDIntoCache(srcId, rdCache)

    getRDCached.cacheCopy = rdCache
    base.caches.registerCache("getRD", rdCache, getRDCached)

_makeRDCache()