"""
OS abstractions and related.
This module contains, in particular, the interface for having "easy subcommands"
using argparse. The idea is to use the exposedFunction decorator on functions
that should be callable from the command line as subcommands; the functions
must all have the same signature. For example, if they all took the stuff
returned by argparse, you could say in the module containing them::
args = makeCLIParser(globals()).parse_args()
args.subAction(args)
To specify the command line arguments to the function, pass Arg objects
to exposedFunction. See admin.py for an example.
"""
#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import argparse
import contextlib
import os
import sys
import tempfile
import urllib.request, urllib.parse
from gavo.utils import codetricks
from gavo.utils import excs
from gavo.utils import misctricks
from gavo.utils.dachstypes import (Any, BinaryIO, Callable, Dict, Filename,
Generator, IO, List, Optional, TextIO, Tuple, Union)
def safeclose(f: IO) -> None:
    """flushes the python file f, forces its data to disk, and closes it.

    Prefer this over a bare close() when the file written is about to
    replace an older version of itself.
    """
    # Python-level buffers first, then the operating system's.
    f.flush()
    descriptor = f.fileno()
    os.fsync(descriptor)
    f.close()
@contextlib.contextmanager
def safeReplaced(fName: Filename, *, binary: bool=True) -> Generator:
    """opens fName for "safe replacement".

    Safe replacement means that you can write to the object returned, and
    when everything works out all right, what you have written replaces
    the old content of fName, where the old mode is preserved if possible.
    When there are errors, however, the old content remains.

    The new content is collected in a temporary file created in the
    directory of fName; that file is moved over fName once the controlled
    block has finished.  It is removed again if the controlled block
    raises, or if closing or moving the temporary file fails.

    binary selects whether the file yielded is opened in binary ("wb")
    or text ("w") mode.
    """
    mode = "wb" if binary else "w"
    targetDir = os.path.abspath(os.path.dirname(fName))

    try:
        oldMode = os.stat(fName)[0]
    except os.error:
        # fName cannot be stat-ed (e.g., it does not exist yet):
        # there is no mode to preserve.
        oldMode = None

    handle, tempName = tempfile.mkstemp(".temp", "", dir=targetDir)
    targetFile = os.fdopen(handle, mode)

    try:
        try:
            yield targetFile
        finally:
            # sync and close whether or not the controlled block failed
            safeclose(targetFile)

        if oldMode is not None:
            # Restore the mode *before* moving the file into place so
            # fName never shows up with mkstemp's restrictive mode.
            try:
                os.chmod(tempName, oldMode)
            except os.error:
                # preserving the mode is best-effort only
                pass

        # os.replace overwrites an existing fName on all platforms.
        os.replace(tempName, fName)
    except BaseException:
        # Something went wrong (including KeyboardInterrupt and friends):
        # leave fName alone and do not leave the temporary file behind.
        if not targetFile.closed:
            try:
                targetFile.close()
            except os.error:
                pass
        try:
            os.unlink(tempName)
        except os.error:
            pass
        raise
class _UrlopenRemotePasswordMgr(urllib.request.HTTPPasswordMgr):
    """A password manager handing out credentials it picks up from
    further up its call stack.

    This is made to cooperate with urlopenRemote, which binds a name
    _temp_credentials.  When that is non-None, it is expected to be a
    pair of user and password, and that pair is returned regardless of
    the realm or URI asked for.  This means that, at least with http
    basic auth, password stealing is almost trivial.
    """
    def find_user_password(self, realm: str, authuri: str
            ) -> Tuple[Optional[str], Optional[str]]:
        # realm and authuri are deliberately ignored, see the class docstring
        stolen = codetricks.stealVar("_temp_credentials")
        if stolen is None:
            return (None, None)
        return stolen
try:
    import ssl

    class HTTPSHandler(urllib.request.HTTPSHandler):
        # This https handler is overridden so that certificates are not
        # actually checked.  Yes, that may seem somewhat daring, but
        # expired certs are normal in our business, whereas
        # man-in-the-middle attacks are not.
        def __init__(self,
                debuglevel: int=0,
                context: Optional[ssl.SSLContext]=None):
            sslContext = context
            if sslContext is None:
                sslContext = ssl.create_default_context(
                    purpose=ssl.Purpose.SERVER_AUTH,
                    cafile="/etc/ssl/certs/ca-certificates.crt")

            # Verification is switched off on caller-supplied contexts, too.
            # Host name checking has to go first; the ssl module refuses
            # CERT_NONE while check_hostname is still enabled.
            sslContext.check_hostname = False
            sslContext.verify_mode = ssl.CERT_NONE
            super().__init__(debuglevel, sslContext)

except (ImportError, IOError):
    # probably the ssl bundle isn't where I think it is; just use
    # the normal, certificate-checking https handler.
    from urllib.request import HTTPSHandler # type: ignore
# The opener used by urlopenRemote: it only gets handlers for http, https,
# and ftp (plus the housekeeping handlers these need).
try:
    _httpsHandler = HTTPSHandler()
except IOError:
    # some versions of ssl only check for the CA bundle at HTTPSHandler
    # construction time. If that happens, fall back to urllib default handler
    _httpsHandler = urllib.request.HTTPSHandler()

_restrictedURLOpener = urllib.request.OpenerDirector()
for _handler in (
        urllib.request.HTTPRedirectHandler(),
        urllib.request.HTTPHandler(),
        _httpsHandler,
        urllib.request.HTTPBasicAuthHandler(_UrlopenRemotePasswordMgr()),
        urllib.request.HTTPErrorProcessor(),
        urllib.request.FTPHandler(),
        urllib.request.UnknownHandler()):
    _restrictedURLOpener.add_handler(_handler)
# don't leave the construction helpers in the module namespace
del _handler, _httpsHandler

_restrictedURLOpener.addheaders = [
    ("user-agent", "GAVO DaCHS HTTP client")]
def setUserAgent(userAgent: str) -> None:
    """makes userAgent the user agent string sent with requests made
    through urlopenRemote.

    This changes module-global state and thus, in particular, is nowhere
    near thread-safe.
    """
    currentHeaders = _restrictedURLOpener.addheaders
    # sanity check: the user agent is expected to be the only header set
    assert len(currentHeaders)==1
    _restrictedURLOpener.addheaders = [("user-agent", userAgent)]
def urlopenRemote(url: str, *,
        data: Union[None,dict,str,bytes] = None,
        creds: Tuple[Optional[str], Optional[str]]=(None, None),
        timeout: int=100) -> BinaryIO:
    """works like urllib.request.urlopen, except only http, https, and
    ftp URLs are handled.

    data, if given, is sent as the request payload.  A dict is
    url-encoded (using utf-8) first; a str must be ASCII-encodable.

    creds may be a pair of username and password.  Those credentials
    will be presented in http basic authentication to any server
    that cares to ask.  Hence, don't use any valuable credentials
    here.

    timeout is handed through to the underlying opener.

    The function also massages the error messages of urllib a bit.  urllib
    errors (and ValueErrors, e.g., from malformed URLs) always become
    IOErrors, which is more convenient within DaCHS; the original
    exception is available as the IOError's __cause__.  An IOError is
    also raised when the opener does not return a response at all.
    """
    # The name in the next line is used in _UrlopenRemotePasswordMgr
    _temp_credentials = creds #noflake: Picked up from down the call chain

    if isinstance(data, dict):
        data = urllib.parse.urlencode(data, encoding="utf-8")
    if isinstance(data, str):
        data = data.encode("ascii")

    try:
        res = _restrictedURLOpener.open(url, data, timeout=timeout)
        if res is None:
            raise IOError("Could not open URL %s -- does the resource exist?"%
                url)
        return res

    except (urllib.error.URLError, ValueError) as msg:
        # Try to dig out something more readable than str(msg); whatever
        # happens here, fall back to str(msg).
        msgStr = str(msg)
        try:
            msgStr = msg.args[0]
            if isinstance(msgStr, Exception):
                try:  # maybe it's an os/socket type error
                    msgStr = msgStr.args[1]
                except IndexError:  # maybe not...
                    pass
            if not isinstance(msgStr, str):
                msgStr = str(msg)
        except Exception:
            # there's going to be an error message, albeit maybe a weird one.
            # (Not a bare except: KeyboardInterrupt and friends must not
            # be swallowed here.)
            pass
        raise IOError("Could not open URL %s: %s"%(url, msgStr)) from msg
def fgetmtime(fileobj: IO) -> float:
    """returns the modification time of the file underlying fileobj.

    This is like os.path.getmtime, except that no file name is needed.

    An os.error is raised when fileobj lacks what is needed to fstat it
    (that is, when an AttributeError occurs on the way).
    """
    try:
        descriptor = fileobj.fileno()
        return os.fstat(descriptor).st_mtime
    except AttributeError:
        raise misctricks.logOldExc(os.error("Not a file: %s"%repr(fileobj)))
def cat(srcF: IO, destF: IO, chunkSize: int=1<<20) -> None:
    """copies whatever can still be read from srcF to destF, reading at
    most chunkSize items at a time.

    Copying stops at the first read returning something empty.
    """
    chunk = srcF.read(chunkSize)
    while chunk:
        destF.write(chunk)
        chunk = srcF.read(chunkSize)
def ensureDir(dirPath: str, *,
        mode: Optional[int]=None, setGroupTo: Optional[int]=None) -> None:
    """makes sure that dirPath exists.

    If dirPath does not exist, it is created as a directory (parent
    directories are not created), and its permissions are set to mode
    with group ownership setGroupTo if those are given.  setGroupTo must
    be a numerical gid if given; 0 is a valid gid and is honoured.

    If something already exists at dirPath, this function does nothing;
    in particular, neither mode nor group of an existing directory are
    touched.

    This function may raise all kinds of os.errors if something goes
    wrong.  These probably should be handed through all the way to the
    user since when something fails here, there's usually little
    the program can safely do to recover.
    """
    if os.path.exists(dirPath):
        return

    try:
        os.mkdir(dirPath)
    except FileExistsError:
        # Somebody else created dirPath between our check and the mkdir.
        # If it is a directory, treat it like any pre-existing directory.
        if os.path.isdir(dirPath):
            return
        raise

    if mode is not None:
        os.chmod(dirPath, mode)
    # compare against None: a plain truth test would ignore gid 0
    if setGroupTo is not None:
        os.chown(dirPath, -1, setGroupTo)
class Arg:
    """an argument/option to a subcommand.

    Construct these with the positional and keyword parameters you would
    pass to argparse's add_argument; they are stored in the args and
    kwargs attributes.
    """
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def add(self, parser):
        """adds the argument described by self to parser using
        parser's add_argument method.
        """
        parser.add_argument(*self.args, **self.kwargs)
def exposedFunction(argSpecs: Optional[List["Arg"]]=None,
        help: Optional[str]=None) -> Callable:
    """a decorator exposing a function to makeCLIParser.

    argSpecs is a sequence of Arg objects.  This defines the command line
    interface to the function.  When it is left out, the function gets
    a fresh empty list of its own.

    help is stored as the subcommand's help text.

    The decorated function itself must accept a single argument,
    the args object returned by argparse's parse_args.  The decorator
    returns the function itself after adding the attributes subparseArgs
    and subparseHelp to it.
    """
    def deco(func):
        # Don't use a mutable default for argSpecs: one list shared by all
        # functions decorated without arguments would let a change to
        # one function's subparseArgs leak into all the others.
        func.subparseArgs = [] if argSpecs is None else argSpecs
        func.subparseHelp = help
        return func
    return deco
class _PrefixMatchDict(dict):
"""A dictionary matching on unique prefixes.
Is is just barely enough to enable longest-prefix matching for
argparse.
"""
def __getitem__(self, key: str) -> Any: # type: ignore[override]
matches = [s for s in list(self.keys()) if s.startswith(key)]
if len(matches)==0:
raise KeyError(key)
elif len(matches)==1:
return dict.__getitem__(self, matches[0])
else:
raise excs.ReportableError("Ambiguous subcommand specification;"
" choose between %s."%repr(matches))
def __contains__(self, key: str) -> bool: # type: ignore[override]
for s in list(self.keys()):
if s.startswith(key):
return True
return False
def makeCLIParser(functions: Dict[str, Any]) -> argparse.ArgumentParser:
    """returns a command line parser parsing subcommands from functions.

    functions is a dictionary mapping names to objects (as returned from
    globals()).  A subcommand is generated for each object that has a
    subparseArgs attribute that is not None; furnish functions with it
    using the exposedFunction decorator.  The subcommand is named like
    the dictionary key, and the object ends up in the subAction attribute
    of the parse result.

    subparseArgs must contain a sequence of Arg items.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    for name, val in functions.items():
        argSpecs = getattr(val, "subparseArgs", None)
        if argSpecs is None:
            # not exposed as a subcommand
            continue

        helpText = val.subparseHelp
        subparser = subparsers.add_parser(
            name, description=helpText, help=helpText)
        for spec in argSpecs:
            spec.add(subparser)
        subparser.set_defaults(subAction=val)

    # Now monkeypatch matching of unique prefixes into argparse guts.
    # If the guts change, don't fail hard, just turn off prefix matching
    try:
        for action in parser._actions:
            if isinstance(action, argparse._SubParsersAction):
                prefixMap = _PrefixMatchDict(action._name_parser_map)
                action.choices = prefixMap
                action._name_parser_map = prefixMap
                break
    except Exception as msg:
        # no prefix matching, then
        misctricks.sendUIEvent("Warning",
            "Couldn't teach prefix matching to argparse: %s"%repr(msg))

    return parser
class StatusDisplay:
    """A context manager for a display that keeps overwriting a single
    line.

    DaCHS proper should not use this (base.ui.notify* is the thing to use
    there), but it's sometimes handy in helper scripts.

    In short::

        with StatusDisplay() as d:
            for i in range(300):
                d.update(str(i))

    dest_f is the file the display is written to.
    """
    def __init__(self, dest_f: TextIO=sys.stdout):
        self.dest_f = dest_f
        # what to write in order to wipe what is currently displayed
        self.clearer = "\r\n"

    def _emit(self, text: str) -> None:
        # writes text to the destination and flushes right away
        self.dest_f.write(text)
        self.dest_f.flush()

    def update(self, new_content: str) -> None:
        """replaces what is currently displayed with new_content."""
        self._emit(self.clearer+new_content)
        # next time, overwrite new_content with blanks before writing
        blanks = " "*len(new_content)
        self.clearer = "\r"+blanks+"\r"

    def __enter__(self):
        self._emit(self.clearer)
        return self

    def __exit__(self, *args):
        self._emit("\r\n")