"""
Common items used by resource definition objects.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import os
import re
import urllib.parse
from gavo import base
from gavo import utils
# The following is a flag for initdachs (and initdachs exclusively)
# to prevent resource metadata reading from database tables during
# the dachs init.  It starts out False; nothing in the visible part of
# this module ever changes it.
_BOOTSTRAPPING = False
class RDAttribute(base.AttributeDef):
    """An attribute definition exposing the current RD as ``rd``.

    The attribute is always named ``rd`` and has no default.  When it is
    first read on an adopted element, the chain of parents is searched for
    an ancestor with a non-None ``rd``; that value is then remembered on
    the element.  If no such ancestor exists (yet), ``rd`` is None and
    the search is repeated on the next access.  There is no way to reset
    a remembered rd.

    These attributes cannot (yet) be fed, so rd="xxx" won't work.
    If we need this, the literal would probably be an id.
    """
    computed_ = True
    typeDesc_ = "reference to a resource descriptor"

    def __init__(self):
        base.AttributeDef.__init__(self, "rd", None, "The parent"
            " resource descriptor; never set this manually, the value will"
            " be filled in by the software.")

    def iterParentMethods(self):
        """Yield (name, object) pairs for the ``rd`` property and the
        ``getFullId`` method.

        Within the nested functions, ``self`` is the object the property or
        method is eventually accessed on, not this attribute definition.
        """
        def _getRD(self):
            if getattr(self, "parent", None) is None:
                # not yet adopted, we may want to try again later
                return None

            try:
                return self.__rd
            except AttributeError:
                # nothing remembered yet; walk up the parents below
                pass

            ancestor = self.parent
            while ancestor is not None:
                if getattr(ancestor, "rd", None) is not None:
                    self.__rd = ancestor.rd
                    return self.__rd
                ancestor = ancestor.parent

            # a parent hasn't been adopted yet, try again later.
            return None

        yield ("rd", property(_getRD))

        def getFullId(self):
            # without an RD, all we can offer is the bare id
            rd = self.rd
            if rd is None:
                return self.id
            return "%s#%s"%(rd.sourceId, self.id)

        yield ("getFullId", getFullId)

    def makeUserDoc(self):
        """Return None to keep this attribute out of the user docs --
        users can't and mustn't set it.
        """
        return None
class ResdirRelativeAttribute(base.FunctionRelativePathAttribute):
    """A path attribute interpreted relative to the current RD's resdir.

    The parent needs an RDAttribute, since the base directory is taken
    from the parent's ``rd``.
    """
    def __init__(self, name, default=None, description="Undocumented", **kwargs):
        base.FunctionRelativePathAttribute.__init__(
            self,
            name,
            baseFunction=self.getResdir,
            default=default,
            description=description,
            **kwargs)

    def getResdir(self, instance):
        """Return the resdir of ``instance``'s RD, or None if ``instance``
        has no RD.
        """
        rd = instance.rd
        if rd is not None:
            return rd.resdir

        # we don't have a parent yet, but someone wants to see our
        # value.  This can happen if an element is validated before
        # it is adopted (which we probably should forbid).  Here, we
        # hack around it and hope nobody trips over it
        return None
class ProfileListAttribute(base.AtomicAttribute):
    """An attribute holding a set of profile names, written as a comma
    separated list.

    The special name "defaults" stands for whatever default this profile
    list was constructed with.
    """
    typeDesc_ = "Comma separated list of profile names"

    def __init__(self, name, default, description, defaultSource):
        # The actual default is served by the default_ property below, so
        # the base class only gets told that the default is computed.
        base.AtomicAttribute.__init__(self, name, base.Computed, description)
        self.realDefault = default
        self.defaultSource_ = defaultSource

    @property
    def default_(self):
        # hand out a copy so nobody mutates the configured default
        return self.realDefault.copy()

    def parse(self, value):
        """Return the set of profile names given in the literal ``value``.

        Items are stripped of surrounding whitespace, empty items are
        skipped, and "defaults" is replaced by the members of the default.
        """
        profiles = set()
        for item in (raw.strip() for raw in value.split(",")):
            if not item:
                continue
            if item=="defaults":
                profiles = profiles|self.default_
            else:
                profiles.add(item)
        return profiles

    def unparse(self, value):
        """Return the members of ``value`` joined by ", ".
        """
        # It would be nice to reconstruct "defaults" here, but right now it's
        # certainly not worth the effort.
        separator = ", "
        return separator.join(value)

    def makeUserDoc(self):
        """Return the documentation line for this attribute, naming the
        gavorc item the default is taken from.
        """
        head = (f"**{self.name_}** ({self.typeDesc_}, defaults to"
            f" {self.defaultSource_} from gavorc)")
        return f"{head} -- {self.description_}"
class PrivilegesMixin(base.StructCallbacks):
    """A mixin for structures that declare who may access database
    objects (tables, schemas).

    Access is managed on the level of database profiles.  Thus, the names
    here are not directly role names in the database.

    There are two kinds of privileges: "All" means at least read and write,
    and "Read" means at least read and lookup.
    """
    # The defaults are read from the configuration when this class body is
    # executed, i.e., when the module is imported.
    _readProfiles = ProfileListAttribute(
        "readProfiles",
        default=base.getConfig("db", "queryProfiles"),
        description="A (comma separated) list of profile names"
            " through which the object can be read.",
        defaultSource="[db]queryProfiles")

    _allProfiles = ProfileListAttribute(
        "allProfiles",
        default=base.getConfig("db", "maintainers"),
        description="A (comma separated) list of profile names"
            " through which the object can be written or administred.",
        defaultSource="[db]maintainers")
class Registration(base.Structure, base.MetaMixin):
    """A request for registration of a data or table item.

    This is much like publish for services, just for data and tables;
    since they have no renderers, you can only have one register element
    per such element.

    Data registrations may refer to published services that make their
    data available.
    """
    name_ = "publish"
    docName_ = "publish (data)"
    aliases = ["register"]

    _sets = base.StringSetAttribute("sets", default=frozenset(["ivo_managed"]),
        description="A comma-separated list of sets this data will be"
            " published in.  To publish data to the VO registry, just"
            " say ivo_managed here.  Other sets probably don't make much"
            " sense right now.  ivo_managed also is the default.")

    _servedThrough = base.ReferenceListAttribute("services",
        description="A DC-internal reference to a service that lets users"
            " query that within the data collection; tables with adql=True"
            " are automatically declared as isServiceFor the TAP service.")

    # the following attribute is for compatibility with service.Publication
    # in case someone manages to pass such a publication to the capability
    # builder.
    auxiliary = True

    def _completeMetadataFromResRow(self, resRow):
        """Set the parent's _metadataUpdated meta from ``resRow``'s
        rectimestamp, if that is present and not empty.

        ``resRow`` is a dc.resources row for the parent table or data item
        (see rscdef.rdjinj for where this comes from).
        """
        recTimestamp = resRow.get("rectimestamp")
        if recTimestamp:
            self.parent.setMeta("_metadataUpdated", recTimestamp)
        # we ignore dateupdated here, assuming that the info coming from
        # the RD is more current.

    def completeElement(self, ctx):
        """Pull in injected resource properties for the parent, then
        continue with the inherited completion.
        """
        # NOTE(review): the guard tests self.id while the key is built from
        # self.parent.id -- confirm that this is intended.
        canLookUp = (ctx is not None
            and self.id
            and self.parent.rd is not None)
        if canLookUp:
            injectionKey = "resprop:%s#%s"%(
                self.parent.rd.sourceId, self.parent.id)
            self._completeMetadataFromResRow(
                ctx.getInjected(injectionKey, {}))
        super().completeElement(ctx)

    def publishedForADQL(self):
        """Return True if at least one table published is available for
        TAP/ADQL, False otherwise.
        """
        # single table
        if getattr(self.parent, "adql", False):
            return True

        # data item with multiple tables
        iterTableDefs = getattr(self.parent, "iterTableDefs", lambda: [])
        return any(tableDef.adql for tableDef in iterTableDefs())

    def register(self):
        """Add isServiceFor and isServedBy metadata to data, service pairs
        in this registration.

        If the parent is published for ADQL, the service "run" from the
        RD //tap is added to the services if it is not there yet.
        """
        if self.publishedForADQL():
            tapSvc = base.caches.getRD("//tap").getById("run")
            if tapSvc not in self.services:
                self.services.append(tapSvc)

        for service in self.services:
            service.declareServes(self.parent)
class ColumnList(list):
    """A list of column.Columns (or derived classes) that takes
    care that no duplicates (in name) occur.

    If you append a column with a name already present in a ColumnList,
    the previous instance will be overwritten.  The idea is that you can
    override columns in, e.g., interfaces later on.

    Also, two ColumnLists are considered equal if they contain the
    same names (ignoring the names in internallyUsedFields).

    Note that ``in`` tests for column *names*, not for column objects.

    After construction, you should set the withinId attribute to
    something that will help make sense of error messages.
    """
    # Column names left out when comparing two ColumnLists.  __eq__ has
    # always consulted this attribute; giving it an (empty) default here
    # keeps comparisons from failing with an AttributeError.  Subclasses
    # or instances can override it.
    internallyUsedFields = frozenset()

    def __init__(self, *args):
        list.__init__(self, *args)
        self.redoIndex()
        self.withinId = "unnamed table"

    def __contains__(self, fieldName):
        return fieldName in self.nameIndex

    def __eq__(self, other):
        if isinstance(other, ColumnList):
            ignored = self.internallyUsedFields
            myFields = {f.name for f in self if f.name not in ignored}
            otherFields = {f.name for f in other if f.name not in ignored}
            return myFields==otherFields
        elif other==[] and len(self)==0:
            return True
        return False

    def redoIndex(self):
        """creates a mapping of names to list indexes.

        You must call this when you dare to munge this manually (which you
        shouldn't).  This also discards the cached id index.
        """
        self.nameIndex = {c.name: ct for ct, c in enumerate(self)}
        self.__idIndex = None

    def deepcopy(self, newParent):
        """returns a deep copy of self.

        This means that all child structures are being copied.  In that
        process, they receive a new parent, which is why you need to
        pass one in.
        """
        return self.__class__([c.copy(newParent) for c in self])

    def getIdIndex(self):
        """returns a mapping from column ids to columns, for all columns
        having a non-None id.

        The mapping is cached; the cache is dropped whenever the list is
        changed through append, extend, replace, remove, or redoIndex.
        """
        try:
            idIndex = self.__idIndex
        except AttributeError:
            # instance made without going through __init__/redoIndex
            idIndex = None
        if idIndex is None:
            idIndex = self.__idIndex = {
                c.id: c for c in self if c.id is not None}
        return idIndex

    def append(self, item):
        """adds the Column item to the data field list.

        It will overwrite a Column of the same name if such a thing is already
        in the list.  Indices are updated.
        """
        key = item.name
        if key in self.nameIndex:
            nameInd = self.nameIndex[key]
            assert self[nameInd].name==key, \
                "Someone tampered with ColumnList"
            self[nameInd] = item
        else:
            self.nameIndex[key] = len(self)
            list.append(self, item)
        # the id index may now lack item or still contain an overwritten column
        self.__idIndex = None

    def replace(self, oldCol, newCol):
        """puts newCol into the place of the first column equal to oldCol.

        This raises an IndexError if no column equal to oldCol is in the
        list.
        """
        for ind, col in enumerate(self):
            if col==oldCol:
                break
        else:
            raise IndexError("No column %r to replace in %s"%(
                getattr(oldCol, "name", oldCol), self.withinId))

        self[ind] = newCol
        del self.nameIndex[oldCol.name]
        self.nameIndex[newCol.name] = ind
        self.__idIndex = None

    def remove(self, col):
        """removes col from the list and re-creates the indices.

        This raises a KeyError if col's name is not indexed, and a
        ValueError if no column equal to col is in the list.
        """
        del self.nameIndex[col.name]
        try:
            list.remove(self, col)
        finally:
            # all columns behind the removed one have moved up by one, so
            # the name index needs to be rebuilt (this also brings back a
            # consistent index if list.remove failed).
            self.redoIndex()

    def extend(self, seq):
        """appends all items of seq, overwriting same-named columns as
        append does.
        """
        for item in seq:
            self.append(item)

    def getColumnByName(self, name):
        """returns the column with name.

        If name is not found as given, it is tried again as a
        utils.QuotedName.  It will raise a NotFoundError if no such column
        exists.
        """
        try:
            return self[self.nameIndex[name]]
        except KeyError:
            try:
                return self[self.nameIndex[utils.QuotedName(name)]]
            except KeyError:
                raise base.NotFoundError(name, what="column", within=self.withinId)

    def getColumnById(self, id):
        """returns the column with id.

        It will raise a NotFoundError if no such column exists.
        """
        try:
            return self.getIdIndex()[id]
        except KeyError:
            raise base.NotFoundError(id, what="column", within=self.withinId)

    def getColumnByUtype(self, utype):
        """returns the column having utype (compared case-insensitively).

        This should be unique, but this method does not check for uniqueness;
        the first match is returned.  A NotFoundError is raised if there is
        no match.
        """
        utype = utype.lower()
        for item in self:
            if item.utype and item.utype.lower()==utype:
                return item
        raise base.NotFoundError(utype, what="column with utype",
            within=self.withinId)

    def getColumnsByUCD(self, ucd):
        """returns all columns having ucd.
        """
        return [item for item in self if item.ucd==ucd]

    def getColumnByUCD(self, ucd):
        """returns the single, unique column having ucd.

        It raises a StructureError if there is no such column or more than one.
        """
        cols = self.getColumnsByUCD(ucd)
        if len(cols)==1:
            return cols[0]
        elif cols:
            raise base.StructureError("More than one column for %s"%ucd)
        else:
            raise base.StructureError("No column for %s"%ucd)

    def getColumnByUCDs(self, *ucds):
        """returns the single, unique column having one of ucds.

        This method has a confusing interface.  Its sole function is to
        help when there are multiple possible UCDs that may be interesting
        (like pos.eq.ra;meta.main and POS_EQ_RA_MAIN).  It should only be
        used for such cases.

        A StructureError is raised if none of ucds yields a unique column.
        """
        for ucd in ucds:
            try:
                return self.getColumnByUCD(ucd)
            except base.StructureError:
                # probably just no column with this UCD, try next
                pass
        raise base.StructureError("No unique column for any of %s"%", ".join(ucds))
class ColumnListAttribute(base.StructListAttribute):
    """An adapter from a ColumnList to a structure attribute.
    """
    @property
    def default_(self):
        # a fresh, empty list for each request
        return ColumnList()

    def getCopy(self, instance, newParent, ctx):
        """Return the inherited copy wrapped into a ColumnList.
        """
        copiedItems = base.StructListAttribute.getCopy(self,
            instance, newParent, ctx)
        return ColumnList(copiedItems)

    def replace(self, instance, oldStruct, newStruct):
        """Put ``newStruct`` in the place of the same-named ``oldStruct``.

        A StructureError is raised if the two names differ.  The actual
        replacement is left to ColumnList.append, which overwrites by name.
        """
        if oldStruct.name!=newStruct.name:
            raise base.StructureError("Can only replace fields of the same"
                " name in a ColumnList")
        columns = getattr(instance, self.name_)
        columns.append(newStruct)

    def feedObject(self, instance, obj):
        """Feed ``obj`` (or, if it is a list, each of its items) to
        ``instance``, copying objects that have some other parent.
        """
        if isinstance(obj, list):
            for child in obj:
                self.feedObject(instance, child)
            return

        # we really want columns and params to have the proper parents
        if obj.parent and obj.parent is not instance:
            obj = obj.copy(instance)
        super().feedObject(instance, obj)
class NamePathAttribute(base.AtomicAttribute):
    """defines an attribute NamePath used for resolution of "original"
    attributes.

    The NamePathAttribute provides a resolveName method as expected
    by base.OriginalAttribute.
    """
    typeDesc_ = "id reference"

    def __init__(self, **kwargs):
        if "description" not in kwargs:
            kwargs["description"] = ("Reference to an element tried to"
                " satisfy requests for names in id references of this"
                " element's children.")
        base.AtomicAttribute.__init__(self, name="namePath", **kwargs)

    def iterParentMethods(self):
        """yields the resolveName method for the structures carrying this
        attribute.
        """
        def resolveName(instance, context, id):
            """returns the element called id as seen from instance.

            This tries, in turn, instance.parentTable (if present),
            instance.getByName (if present), and finally the namePath of
            instance or, failing that, of its parent.  It raises a
            NotFoundError if there is no namePath to try.
            """
            if hasattr(instance, "parentTable"):
                try:
                    return base.resolveNameBased(instance.parentTable, id)
                except base.NotFoundError:
                    # try on real name path
                    pass

            if hasattr(instance, "getByName"):
                try:
                    return instance.getByName(id)
                except base.NotFoundError:
                    pass

            np = instance.namePath
            if np is None and instance.parent:
                np = getattr(instance.parent, "namePath", None)
            if np is None:
                # Report the element we were resolving for; self here would
                # be the attribute definition, which tells the user nothing
                # about where the lookup failed.
                raise base.NotFoundError(id, "Element with name", repr(instance),
                    hint="No namePath here")
            return context.resolveId(np+"."+id)

        yield "resolveName", resolveName

    def parse(self, value):
        """returns value unchanged.
        """
        return value

    def unparse(self, value):
        """returns value unchanged.
        """
        return value
# Matches "@" followed by an identifier, with the identifier as group 1.
# The identifier expression is utils.identifierPattern without its last
# character (presumably a trailing "$" anchor -- TODO confirm).
_atPattern = re.compile("@(%s)"%utils.identifierPattern.pattern[:-1])
def replaceProcDefAt(src, dictName="vars"):
    """Return src with each @<identifier> turned into
    <dictName>["<identifier>"].

    This shortcut is supported in the vicinity of rowmakers (i.e.,
    there and in procApps).
    """
    # \1 is the identifier captured by _atPattern
    replacement = r'%s["\1"]'%dictName
    return _atPattern.sub(replacement, src)
# this is mainly here for lack of a better place. I don't want it in
# base.parsecontext as it needs config, and I don't want it
# in user.common as it might be useful for non-UI stuff.
def getReferencedElement(refString, forceType=None, **kwargs):
    """returns the element for the DaCHS reference ``refString``.

    ``refString`` has the form ``rdId[#subRef]``; ``rdId`` can be
    filesystem-relative, but the RD referenced must be below ``inputsDir``
    anyway.

    You can pass a structure class into ``forceType``, and a ``StructureError``
    will be raised if what's pointed to by the id isn't of that type.

    You should usually use ``base.resolveCrossId`` instead of this from *within*
    DaCHS.  This is intended for code handling RD ids from users.

    This supports further keyword arguments to getRD; they are passed on
    both when resolving ``refString`` as given and when retrying it
    relative to the current directory.

    If the RD cannot be found either way, the ``base.RDNotFound`` from the
    first attempt propagates.
    """
    # get the inputs postfix now so we don't pollute the current exception later
    try:
        cwdInInputs = utils.getRelativePath(os.getcwd(),
            base.getConfig("inputsDir"), liberalChars=True)
    except ValueError:
        # not in inputs
        cwdInInputs = None

    try:
        return base.resolveCrossId(refString, forceType=forceType, **kwargs)
    except base.RDNotFound:
        if cwdInInputs:
            # retry with the id interpreted relative to the current
            # directory; the caller's extra options must apply here, too.
            return base.resolveCrossId("%s/%s"%(cwdInInputs, refString),
                forceType=forceType, **kwargs)
        raise
@utils.document
def getStandardPubDID(path):
    """returns the standard DaCHS PubDID for ``path``.

    The publisher dataset identifier (PubDID) is important in protocols like
    SSAP and obscore.  The PubDID built here consists of your authority,
    the path component ~, and, as the parameter, the inputs-relative path
    of the input file.

    ``path`` can be relative, in which case it is interpreted relative to
    the DaCHS ``inputsDir``.

    You *can* define your PubDIDs in a different way, but you'd then need
    to provide a custom descriptorGenerator to datalink services (and
    might need other tricks).  If your data comes from plain files, use
    this function.

    In a rowmaker, you'll usually use the \\standardPubDID macro.

    A ReportableError is raised while the authority is still configured
    as x-unregistred.
    """
    # Why add inputsDir first and remove it again?  Well, I want to keep
    # getInputsRelativePath in the loop since it does some validation
    # and may, at some point, do more.
    isAbsolute = path[0]=="/"
    if not isAbsolute:
        path = os.path.join(base.getConfig("inputsDir"), path)

    authority = base.getConfig("ivoa", "authority")
    if authority=="x-unregistred":
        raise base.ReportableError("You must configure your IVOA authority"
            " before creating standard PubDIDs.",
            hint="Read up on 'Choosing your authority' in the tutorial to see how"
            " to fix this.")

    # NOTE(review): getInputsRelativePath is neither defined nor imported
    # in the visible part of this module -- confirm it is in scope.
    inputsRelative = getInputsRelativePath(path, liberalChars=True)
    return "ivo://%s/~?%s"%(authority, inputsRelative)
@utils.document
def getAccrefFromStandardPubDID(pubdid,
        authBase="ivo://%s/~?"%base.getConfig("ivoa", "authority")):
    """returns an accref from a standard DaCHS PubDID.

    This is basically the inverse of getStandardPubDID.  It will raise
    a NotFoundError if pubdid "looks like a URI" (implementation detail:
    has a colon in the first 10 characters) and does not start with
    ivo://<authority>/~?.  If it's not a URI, we assume it's a local
    accref and just return it.

    The function does not check if the remaining characters are a valid
    accref, much less whether it can be resolved.

    authBase's default will reflect your system's settings on your
    installation, which probably is not what's given in this documentation.
    """
    looksLikeURI = ":" in pubdid[:10]
    if not looksLikeURI:
        return pubdid

    if pubdid.startswith(authBase):
        # everything behind the authority prefix is the accref
        return pubdid[len(authBase):]

    raise base.NotFoundError(pubdid,
        "The authority in the dataset identifier",
        "the authorities managed here")