"""
DaCHS supports dump/restore operations on tables or set of tables.
This module implements the underlying file format and some utilities.
The file format itself is a tar.gz file with an index.txt consisting of lines
file_name table-id
followed by the files. Table id is DaCHS's usual rd-id#table-id.
The files contain binary dump material.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import os
import sys
import time
import tarfile
from io import BytesIO
from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rsc import tables
def parseIndexFile(srcFile):
    """parses our index.txt file format and returns (member-name, table-id)
    tuples.

    srcFile is an open binary file, probably from TarFile.extractfile.

    Blank lines are ignored; any other line that does not consist of
    exactly two whitespace-separated fields raises a ValueError (the
    previous implementation used assert, which is silently stripped
    when python runs with -O).

    If you change the index file format, you'll have to change this
    and the corresponding code in createDump.
    """
    for lineNumber, line in enumerate(srcFile, 1):
        parts = line.decode("utf-8").strip().split()
        if not parts:
            # tolerate empty lines (e.g., a trailing newline in the index)
            continue
        if len(parts)!=2:
            raise ValueError("Malformed dump index, line %d: %r"%(
                lineNumber, line))
        yield parts
def iterDbTables(objectId, connection):
    """iterates over dbtable objects referenced by objectId.

    objectId can reference a table def or an RD (in which case all
    onDisk tables from it are returned).  Or it can be a table already.
    """
    if not isinstance(objectId, str):
        # non-string: take this for a database table object already
        # (perhaps check that tableId.tableDef is good?)
        yield objectId
        return

    referenced = base.resolveCrossId(objectId)

    if isinstance(referenced, rscdef.TableDef):
        yield tables.TableForDef(referenced, connection=connection)

    elif hasattr(referenced, "tables"):
        # anything with a tables attribute we take for an RD; skip
        # views and in-memory tables, as they cannot be dumped
        for tableDef in referenced.tables:
            if tableDef.onDisk and not tableDef.viewStatement:
                yield tables.TableForDef(tableDef, connection=connection)

    else:
        raise base.ReportableError("Can only dump by table or RD,"
            " but %s is neither"%objectId)
def getTablesForIds(tableIds, connection):
    """returns a list of validated dbtables of all tableIds.

    This will raise an exception if any table id or database table doesn't
    exist, or if the on-disk schema doesn't match the definition.

    For convenience in internal use, tableIds that already are table
    instances will not be touched.  That's a bit tricky, though, because
    you can have data from different transactions when you do that.
    """
    result = []
    for tableId in tableIds:
        for dbtable in iterDbTables(tableId, connection):
            # fail early if the on-disk schema has drifted from the RD
            dbtable.ensureOnDiskMatches()
            result.append(dbtable)
    return result
def createDump(tableIds, destFile, binary=True):
    """writes a DaCHS dump of tableIds to destFile.

    tableIds is a list of rd-id#table-id identifiers (all must resolve),
    destFile is a file object opened for writing.

    Tables that fail to dump are reported through the UI and skipped;
    they simply do not turn up in the dump's index.txt.
    """
    if binary:
        namePattern = "table_%03d.dump"
    else:
        namePattern = "table_%03d.textdump"

    with base.getTableConn() as connection:
        toDump = getTablesForIds(tableIds, connection)

        # use a with statement so the tar stream is finalised even when
        # something below raises (the previous version leaked the open
        # tarfile on exceptions, leaving a truncated archive).
        with tarfile.open(fileobj=destFile, mode="w:gz") as destTar:
            dumped = []

            for index, curTable in enumerate(toDump):
                try:
                    dumpedBytes = BytesIO()
                    curTable.copyOut(dumpedBytes, binary=binary)
                    curInfo = tarfile.TarInfo(namePattern%index)
                    curInfo.mtime = time.time()
                    # after copyOut, the write position is the total size
                    curInfo.size = dumpedBytes.tell()
                    dumpedBytes.seek(0)
                    destTar.addfile(curInfo, dumpedBytes)
                    dumped.append((curInfo.name, curTable.tableDef.getFullId()))
                except Exception as msg:
                    # best effort: report and go on with the next table
                    base.ui.notifyError("Dumping %s failed: %s"%(
                        curTable.tableDef.getFullId(),
                        utils.safe_str(msg)))

            # the index format here must match parseIndexFile
            indexText = ("\n".join(
                "%s %s"%d for d in dumped)).encode("utf-8")
            curInfo = tarfile.TarInfo("index.txt")
            curInfo.mtime = time.time()
            curInfo.size = len(indexText)
            destTar.addfile(curInfo, BytesIO(indexText))
def iterTableInfos(dumpFile):
    """iterates over table info tuples from an open dump file.

    Each tuple has the member name, the table id, a boolean whether
    the table definition is accessible, the UTC unix time the dump
    was made, and the size of the dump.
    """
    dumpTar = tarfile.open(fileobj=dumpFile, mode="r:gz")

    for memberName, tableId in parseIndexFile(
            dumpTar.extractfile("index.txt")):
        member = dumpTar.getmember(memberName)

        # can the dumped table still be resolved against the local RDs?
        try:
            base.resolveCrossId(tableId, forceType=rscdef.TableDef)
            haveDefinition = True
        except base.NotFoundError:
            haveDefinition = False

        yield (memberName, tableId, haveDefinition,
            member.mtime, member.size)
def restoreDump(dumpFile):
    """restores a dump.

    dumpFile is an open, seekable file object containing a file created
    by createDump.

    This comprises recreating all mentioned tables, copying in the
    associated data, and re-creating all indices.

    Each table is handled in a separate transaction; we do not stop if a
    single restore has failed.
    """
    toDo = list(iterTableInfos(dumpFile))
    # iterTableInfos has consumed the stream; rewind for member extraction
    dumpFile.seek(0)
    tf = tarfile.open(fileobj=dumpFile, mode="r:gz")

    with base.getWritableAdminConn() as connection:
        for memberName, tdId, tdExists, _, _ in toDo:
            if not tdExists:
                base.ui.notifyWarning("Skipping restore of undefined table %s"%tdId)
                continue

            try:
                table = tables.TableForDef(
                    base.resolveCrossId(tdId, forceType=rscdef.TableDef),
                    connection=connection)
                table.recreate()
                # member suffix encodes the dump format (see createDump)
                table.copyIn(tf.extractfile(memberName),
                    binary=memberName.endswith(".dump"))
                table.makeIndices()
            except Exception as msg:
                # roll back on the connection itself: if TableForDef or
                # resolveCrossId raised, the name table is unbound here,
                # and table.connection.rollback() would be a NameError
                connection.rollback()
                base.ui.notifyError("Restore of %s failed: %s"%(
                    tdId, utils.safe_str(msg)))
            else:
                connection.commit()
################# CLI functions
@utils.exposedFunction([
    utils.Arg("-t", "--text",
        help="Dump text rather than binary",
        dest="textFormat",
        action="store_true"),
    utils.Arg("dumpFile", help="Name of a file to write the dump to; use - to"
        " dump to stdout."),
    utils.Arg("ids", help="ids of table definitions (as in myres/q#main)"
        " or RDs to dump.", nargs="+")],
    help="Dump one or more tables to DaCHS' dump format.")
def create(args):
    """CLI: dumps the tables given in args.ids to args.dumpFile."""
    if args.dumpFile=="-":
        # dumps are gzipped binary, so we must write to stdout's byte
        # buffer; text-mode sys.stdout would raise on bytes
        dumpTo = sys.stdout.buffer
    else:
        dn = os.path.dirname(args.dumpFile)
        if dn:
            utils.ensureDir(dn)
        dumpTo = open(args.dumpFile, "wb")

    try:
        createDump(args.ids, dumpTo, not args.textFormat)
    finally:
        dumpTo.flush()
        if dumpTo is not sys.stdout.buffer:
            dumpTo.close()
@utils.exposedFunction([
    utils.Arg("source",
        help="File to restore from. Use - to restore from stdin.")],
    help="Restore one or more table(s) from a file created by the create"
    " subcommand before")
def load(args):
    """CLI: restores the dump in args.source."""
    if args.source=="-":
        # restoreDump needs bytes and a seekable stream; stdin is
        # text-mode and non-seekable, so slurp its byte buffer into memory
        loadFrom = BytesIO(sys.stdin.buffer.read())
    else:
        loadFrom = open(args.source, "rb")

    try:
        restoreDump(loadFrom)
    finally:
        loadFrom.close()
@utils.exposedFunction([
    utils.Arg("source", help="File to list")],
    help="List tables and dump metadata from a DaCHS dump.")
def ls(args):
    """CLI: lists the tables within the dump args.source with metadata."""
    import datetime

    rows = []
    with open(args.source, "rb") as srcFile:
        for _, tdId, tdExists, mtime, size in iterTableInfos(srcFile):
            rows.append((
                tdId,
                "probably" if tdExists else "no",
                datetime.datetime.utcfromtimestamp(mtime).isoformat(),
                size))

    sys.stdout.write(utils.formatSimpleTable(
        rows,
        titles="table restorable? dumped size".split())+"\n")
def main():
    """does the user interaction.
    """
    from gavo import rscdesc #noflake: for registration
    parser = utils.makeCLIParser(globals())
    args = parser.parse_args()
    args.subAction(args)