"""
Common error handling facilities for user interface components.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import re
import sys
import textwrap
import traceback
from gavo import base
from gavo import rsc
from gavo import utils
from gavo.base import sqlsupport
class _Reformatter(object):
    """A helper class for reformatMessage.

    Lines are fed in one at a time.  Lines starting with whitespace are
    passed through verbatim, empty lines end the current paragraph (and
    yield an empty output line), and all other lines are collected and
    re-filled with textwrap once the paragraph ends.
    """
    # A line matching this at its start (i.e., a line beginning with
    # whitespace) is never re-filled.  This is a raw string because "\s"
    # is an invalid escape sequence in a plain string literal.
    verbatimRE = re.compile(r"\s")

    def __init__(self):
        # inBuffer holds the lines of the paragraph being collected,
        # outBuffer the finished output lines and paragraphs.
        self.inBuffer, self.outBuffer = [], []

    def flush(self):
        """Fills the paragraph collected so far and moves it to the output.

        This is a no-op when no lines are pending.  Long words are never
        broken, so lines may exceed textwrap's width.
        """
        if self.inBuffer:
            self.outBuffer.append(textwrap.fill("\n".join(self.inBuffer),
                break_long_words=False))
        self.inBuffer = []

    def feed(self, line):
        """Processes a single input line.

        The verbatim check comes first, which means that a non-empty line
        consisting of whitespace only is passed through unchanged rather
        than being turned into a paragraph break.
        """
        if self.verbatimRE.match(line):
            self.flush()
            self.outBuffer.append(line)
        elif not line.strip():
            self.flush()
            self.outBuffer.append("")
        else:
            self.inBuffer.append(line)

    def get(self):
        """Returns everything fed so far as a single reformatted string."""
        self.flush()
        return "\n".join(self.outBuffer)

    def feedLines(self, lines):
        """Feeds each item of the iterable lines to feed."""
        for l in lines:
            self.feed(l)
def outputError(message):
    """Writes message to standard error.

    message may be a string or a byte string.  Byte strings are decoded
    as ASCII, with undecodable bytes replaced rather than raising an
    error.
    """
    # make sure we don't fail on non-ASCII in byte strings
    text = (message.decode("ascii", "replace")
        if isinstance(message, bytes)
        else message)
    sys.stderr.write(text)
def raiseAndCatch(opts=None, output=outputError):
    """Tries to come up with a good error message for the current exception.

    This has to be called while an exception is being handled; the
    exception is taken from sys.exc_info().

    opts can be an object with some of the following attributes (it
    usually comes from user.cli's main); missing attributes are fine:

    * enablePDB -- if true, the exception is re-raised instead of
      being reported.
    * alwaysTracebacks -- if true, a traceback is printed in addition to
      the message.
    * showHints -- if false, the exception's hint attribute (if any) is
      not shown; hints are shown by default.

    output must be a function accepting a single string, defaulting to
    outputError.

    The function returns a suggested return value for the whole program:
    the code of a SystemExit, 2 for a KeyboardInterrupt, and 1 for
    everything else.
    """
    # Messages are reformatted by reformatMessage (though it's probably ok
    # to just call output(someString) to write to the user directly).
    #
    # To write messages, append strings to the messages list.  An empty
    # string would produce a paragraph.  Append unicode or ASCII.
    retval = 1
    messages = []
    _, msg, _ = sys.exc_info()

    if isinstance(msg, SystemExit):
        return msg.code

    elif isinstance(msg, KeyboardInterrupt):
        return 2

    elif isinstance(msg, base.SourceParseError):
        messages.append("While parsing source %s, near %s:\n"%(
            msg.source, msg.location))
        messages.append(msg.msg+"\n")
        if msg.offending:
            messages.append("Offending literal: %s\n"%repr(msg.offending))

    elif isinstance(msg, base.BadCode):
        messages.append("Bad user %s: %s\n"%(msg.codeType, str(msg.origExc)))
        if msg.pos:
            messages.append("(At %s)"%msg.pos)
        if base.DEBUG:
            # the offending code is dumped as-is, bypassing reformatting
            sys.stderr.write("The code that produced the error is:\n%s\n\n"%
                msg.code)

    elif isinstance(msg, rsc.DBTableError):
        messages.append("While building table %s: %s"%(msg.qName,
            msg))

    elif isinstance(msg, base.MetaError):
        messages.append("While working on metadata of '%s': %s"%(
            msg.carrier,
            msg.__class__.__name__))
        if getattr(msg, "pos", None):
            messages.append("in element starting at %s:"%msg.pos)
        if msg.key is not None:
            messages.append("-- key %s --"%msg.key)
        messages.append(utils.safe_str(msg))

    elif isinstance(msg, (base.ValidationError, base.ReportableError,
            base.LiteralParseError, base.StructureError, base.NotFoundError,
            base.MetaValidationError)):
        # only add position information if the exception does not say
        # it already has that in its message
        if not getattr(msg, "posInMsg", False):
            if getattr(msg, "pos", None):
                messages.append("At or near %s:"%msg.pos)
            elif getattr(msg, "inFile", None):
                messages.append("In %s:"%msg.inFile)
        if getattr(msg, "row", None):
            # keys starting or ending with an underscore are left out of
            # the row display
            row = {key: value for key, value in msg.row.items()
                if key and not key.startswith("_") and not key.endswith("_")}
            messages.append(" Row\n %s"%formatRow(row))
            messages.append(" ")
        messages.append(str(msg))

    elif isinstance(msg, base.DataError):
        messages.append("Error in input data:")
        messages.append(utils.safe_str(msg))

    elif isinstance(msg, sqlsupport.ProgrammingError):
        # The query can only be shown when there is a cursor; the engine's
        # message is reported in any case, since otherwise an error without
        # a cursor would make the program fail without any diagnostics.
        if msg.cursor:
            messages.append("While executing DB query:\n")
            messages.append(" "+utils.debytify(msg.cursor.query))
        messages.append("\nThe database engine reported:\n")
        messages.append(" "+utils.debytify(msg.pgerror or msg.args[0]))

    else:
        if hasattr(msg, "excRow"):
            messages.append("Snafu in %s, %s\n"%(msg.excRow, msg.excCol))
            messages.append("")
        messages.append("Oops. Unhandled exception %s.\n"%msg.__class__.__name__)
        messages.append("Exception payload: %s"%utils.safe_str(msg))
        base.ui.notifyError("Uncaught exception at toplevel")

    if getattr(opts, "enablePDB", False):
        # re-raise the exception being handled so a debugger can pick it up
        raise
    elif getattr(opts, "alwaysTracebacks", False):
        traceback.print_exc()

    if messages:
        errTx = utils.safe_str("*** Error: "+"\n".join(messages))
        output(reformatMessage(errTx)+"\n")
    if getattr(opts, "showHints", True) and getattr(msg, "hint", None):
        output(reformatMessage("Hint: "+msg.hint)+"\n")

    return retval
def bailOut():
    """A fake cli operation that does nothing but raise exceptions.

    This is mainly for testing and development.  It raises a ValueError
    if there is no command line argument, and a base.Error if the first
    command line argument is --help.
    """
    cliArgs = sys.argv[1:]
    if not cliArgs:
        raise ValueError("Too short")
    if cliArgs[0]=="--help":
        raise base.Error("Hands off this. For Developers only")