"""
Dumping our resources to frictionless data packages (henceforth: datapack)
and loading from them again.

Specifications:

* https://specs.frictionlessdata.io/data-package/
* https://specs.frictionlessdata.io/data-resource/

DaCHS-generated datapacks can be recognised by the presence of
a dachs-rd-id key in the global metadata. Also, we will always
write the RD as the first resource; for good measure, we also mark
it by giving it the name dachs-resource-descriptor.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.

import glob
import json
import os
import shutil
import subprocess
import zipfile
from typing import Callable, Generator

from gavo import base
from gavo import rscdef
from gavo import rscdesc
from gavo import utils


def _perhaps(d:dict, key:str, val:str) -> None:
	# set d[key] to val, but only if val is non-empty
if val:
d[key] = val


def namer(template:str) -> Generator[str, None, None]:
	"""yields an infinite sequence of names, made by filling template
	with a running integer (0, 1, 2, ...).
	"""
	i = 0
	while True:
		yield template%i
		i += 1
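
# A quick illustration (not executed here) of how namer is used further
# down; the template is one actually used in iterRDResources:
#
#   makeName = namer("rdaux-%02d")
#   next(makeName)   # -> 'rdaux-00'
#   next(makeName)   # -> 'rdaux-01'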


def iterRDResources(rd:rscdesc.RD) -> Generator[dict, None, None]:
	"""yields datapack resource descriptions for the RD and all
	ancillary files we can discover.

	All path names here are relative to the RD's resource directory.
	Anything that does not live below that directory will not be
	exported (without serious trickery, that is).
	"""
resdir = os.path.normpath(rd.resdir)
def cleanPath(path):
return utils.getRelativePath(path, resdir, liberalChars=True)
# the RD itself
yield {
"name": "dachs-resource-descriptor",
"path": cleanPath(rd.srcPath),
"profile": "data-resource",
"title": "The DaCHS resource descriptor",
"mediatype": "text/xml",
}
# a README, if it's there
if os.path.exists(rd.getAbsPath("README")):
yield {
"name": "readme",
"path": "README",
"profile": "data-resource",
"mediatype": "text/plain",
}
# Manually added files
for res in iterExtraResources(rd, cleanPath):
yield res
	# extra instrumentation in getRDForDump creates the filesLoaded
	# attribute, which we use to add ancillary files used by the RD.
	# TODO: catch a few more and add them in getRDForDump
makeName = namer("rdaux-%02d")
	# filesLoaded is deduplicated here because, e.g., multiple data
	# elements might use the same custom grammar.
for extra in sorted(set(rd.filesLoaded)):
fullPath = os.path.join(rd.resdir, extra)
# hack: external python modules drop the .py in the attribute value
if not os.path.exists(fullPath) and os.path.exists(fullPath+".py"):
extra = extra+".py"
yield {
"name": next(makeName),
"profile": "data-resource",
"path": extra,
}
# files imported
makeName = namer("data-%05d")
for dd in rd.dds:
if dd.sources and dd.sources.ignoredSources.ignoresVaryingStuff():
base.ui.notifyWarning(
"data %s#%s ignored because of dynamic ignoreSources"%(
rd.sourceId, dd.id))
continue
for source in dd.iterSources():
if isinstance(source, str) and os.path.exists(source):
yield {
"name": next(makeName),
"profile": "data-resource",
"path": cleanPath(source)}
else:
				# it's probably some artificial source token; don't even
				# bother to report that something looks odd here.
pass


def makeDescriptor(rd:rscdesc.RD) -> dict:
"""returns a datapack descriptor in a python dictionary.
"""
desc = makeBasicMeta(rd)
desc["resources"] = list(iterRDResources(rd))
return desc
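
# For orientation, a sketch of the descriptor this produces; keys other
# than "resources" come from makeBasicMeta and the RD's metadata, and all
# concrete values shown here are made up:
#
#   {
#     "dachs-rd-id": "myres/q",
#     "resources": [
#       {"name": "dachs-resource-descriptor", "path": "q.rd",
#        "profile": "data-resource", "mediatype": "text/xml",
#        "title": "The DaCHS resource descriptor"},
#       {"name": "readme", "path": "README", ...},
#       {"name": "data-00000", "path": ...},
#     ]
#   }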


def getRDForDump(rdId:str) -> rscdesc.RD:
"""loads an RD for later dumping.
The main thing this does is instrument ResdirRelativeAttribute
(and possibly later other things) to record what ancillary
data the RD has loaded.
This is, of course, not thread-safe or anything, and it could collect
false positives when RDs reference or include other RDs.
Only use it while making datapacks.
"""
origParse = rscdef.ResdirRelativeAttribute.parse
filesLoaded = []
def record(self, val):
filesLoaded.append(val)
return origParse(self, val)
try:
rscdef.ResdirRelativeAttribute.parse = record
rd = rscdesc.getRD(rdId)
rd.filesLoaded = filesLoaded
finally:
rscdef.ResdirRelativeAttribute.parse = origParse
return rd
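
# Minimal usage sketch (the RD id is made up, and resolving it needs a
# configured DaCHS installation):
#
#   rd = getRDForDump("myres/q")
#   sorted(set(rd.filesLoaded))   # resdir-relative paths of ancillary files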


def dumpPackage(rdId:str, destFile) -> None:
	"""writes a zip of the complete data package for a resource descriptor
	to destFile.

	destFile can be anything that zipfile.ZipFile accepts in w mode
	(in particular, a file name or a file object opened for writing).
	"""
with zipfile.ZipFile(destFile, "w") as dest:
rd = getRDForDump(rdId)
descriptor = makeDescriptor(rd)
dest.writestr("datapackage.json", json.dumps(descriptor))
for rsc in descriptor["resources"]:
dest.write(
os.path.join(rd.resdir, rsc["path"]),
rsc["path"])


@utils.exposedFunction([
utils.Arg("id", help="Absolute RD id to produce a datapack for"),
utils.Arg("dest", help="Name of a zip file to dump to")],
help="Produce a frictionless data package for the RD and the data"
" it imports.")
def create(args:list):
with open(args.dest, "wb") as dest:
dumpPackage(args.id, dest)


@utils.exposedFunction([
utils.Arg("source", help="Name of a DaCHS-produced data package zip file."),
utils.Arg("-t", "--no-test", help="Do not run tests after importing.",
dest="suppressTests", action="store_true"),
utils.Arg("--force", help="If the resdir the package declares already"
" exists, remove it before unpacking (rather than bailing out).",
dest="removeResdir", action="store_true"),],
help="Load and import a data package. This only works for data packages"
" actually produced by DaCHS.")
def load(args:list):
packageMeta = getPackageMeta(args.source)
absSource = os.path.abspath(args.source)
os.chdir(base.getConfig("inputsDir"))
if os.path.exists(packageMeta["resdir"]):
if args.removeResdir:
shutil.rmtree(packageMeta["resdir"])
else:
raise base.ReportableError(
"Refusing to overwrite directory '%s'"%packageMeta["resdir"],
hint="The data package is for a resource living in this resource"
" directory, and it can only run there. However, that directory"
" already exists. Move it away and try again.")
base.makeSharedDir(packageMeta["resdir"])
os.chdir(packageMeta["resdir"])
subprocess.run(["unzip", absSource], check=True)
subprocess.run(["dachs", "imp", packageMeta["rdpath"]], check=True)
if not args.suppressTests:
subprocess.run(["dachs", "test", "-v", packageMeta["rdpath"]])


def main():
"""does the cli interaction.
"""
args = utils.makeCLIParser(globals()).parse_args()
args.subAction(args)
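
# From the shell, these actions are reached through the dachs CLI; assuming
# this module is wired up as a "datapack" subcommand (check "dachs --help"
# for the actual name), the invocations would look roughly like this:
#
#   dachs datapack create myres/q myres-datapack.zip
#   dachs datapack load myres-datapack.zip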