Source code for gavo.rscdef.rmkfuncs

"""
Functions available to rowmaker procs.

Rowmaker procs are compiled in the namespace defined by this module.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import base64
import datetime
import linecache
import math
import os
import pprint #noflake: exported name
import re #noflake: exported name
import sys #noflake: exported name
import time
import traceback #noflake: exported name
import urllib.request, urllib.parse, urllib.error #noflake: exported name

import numpy #noflake: exported name

from gavo import base
from gavo import stc
from gavo import utils
from gavo.base import ( #noflake: exported names
	coords, parseBooleanLiteral, parseInt, sqlmunge,
	makeSitePath, makeAbsoluteURL, SkipThis)
from gavo.base.literals import (identity, #noflake:exported names
	parseInt, parseFloat, parseBooleanLiteral, parseUnicode,
	parseDefaultDate, parseDefaultDatetime, parseDefaultTime, parseCooPair,
	parseSPoint, getDefaultValueParsers, originalOrIdentity, parseBytes)

from gavo.base.unitconv import ( #noflake: exported names
	PLANCK_H, LIGHT_C, BOLTZMANN_K)
from gavo.rscdef.common import ( #noflake: exported names
	getStandardPubDID, getAccrefFromStandardPubDID,
	getDatalinkMetaLink,
	getInputsRelativePath)
from gavo.stc import parseSimpleSTCS #noflake: exported name
from gavo.stc.times import ( #noflake: exported names
	dateTimeToJdn, dateTimeToMJD, dateTimeToJYear,
	bYearToDateTime, jdnToDateTime, mjdToDateTime, TTtoTAI, TAItoTT)
from gavo.utils import codetricks
from gavo.utils import ( #noflake: exported names
	dmsToDeg, hmsToDeg, degToDms, degToHms, hoursToHms, makeIAUId,
	DEG, parseISODT, iterSimpleText, getFileStem,
	getWCSAxis, getRelativePath, loadPythonModule)
from gavo.utils import pgsphere #noflake: exported names


# degrees per mas
DEG_MAS = 1/3600000.
# degrees per arcsec
DEG_ARCSEC = 1/3600.

NaN = float('nan')

MS = base.makeStruct


[docs]class IgnoreThisRow(base.ExecutiveAction): """can be raised by user code to indicate that a row should be skipped when building a table. Note: To skip an entire source, raise SkipThis (usually in a rowfilter or so). Also note that the non-code way to skip things, `Triggers`_, is preferred when you don't already use code. """
[docs]def addCartesian(result, alpha, delta): """inserts c_x, c_y, and c_z for the equatorial position alpha, delta into result. c_x, c_y, and c_z are the cartesian coordinates of the intersection point of the radius vector of alpha, delta with the unit sphere. alpha and delta already have to be floats, so you probably want to use variables here. >>> r = {}; addCartesian(r, 25, 30); "%.7f %.7f"%(r["c_x"], r["c_y"]) '0.7848856 0.3659982' """ result["c_x"], result["c_y"], result["c_z"] = coords.computeUnitSphereCoords( alpha, delta)
[docs]def combinePMs(result, pma, pmd): """inserts pm_total (in degs/yr) and pm_posang (in degs, east over north) into result. pma and pmd have to be in degs/yr, with cos(delta) applied to pma. """ if pma is None or pmd is None: tpm = pmpa = None else: tpm = math.sqrt(pma**2+pmd**2) pmpa = math.atan2(pma, pmd)*360/2/math.pi result["pm_total"] = tpm result["pm_posang"] = pmpa
[docs]@utils.document def getQueryMeta(): """returns a query meta object from somewhere up the stack. This is for row makers running within a service. This can be used to, e.g., enforce match limits by writing getQueryMeta()["dbLimit"]. """ return codetricks.stealVar("queryMeta")
[docs]@utils.document def parseTime(literal, format="%H:%M:%S"): """returns a ``datetime.timedelta`` object for literal parsed according to format. For format, you can the magic values ``!!secondsSinceMidnight``, ``!!decimalHours`` or a strptime-like spec using the H, M, and S codes. >>> parseTime("89930", "!!secondsSinceMidnight") datetime.timedelta(days=1, seconds=3530) >>> parseTime("23.4", "!!decimalHours") datetime.timedelta(seconds=84240) >>> parseTime("3.4:5", "%H.%M:%S") datetime.timedelta(seconds=11045) >>> parseTime("20:04", "%H:%M") datetime.timedelta(seconds=72240) """ if format=="!!secondsSinceMidnight": return datetime.timedelta(seconds=float(literal)) elif format=="!!decimalHours": return datetime.timedelta(hours=float(literal)) else: # We can't really use prebuilt strptimes since times like 25:21:22.445 # are perfectly ok in astronomy. partDict = utils.parsePercentExpression(literal, format) return datetime.timedelta(0, hours=float(partDict.get("H", 0)), minutes=float(partDict.get("M", 0)), seconds=float(partDict.get("S", 0)))
[docs]@utils.document def parseDate(literal, format="%Y-%m-%d"): """returns a ``datetime.date`` object of literal parsed according to the strptime-similar format. The function understands the special ``dateFormat`` ``!!jYear`` (stuff like 1980.89). """ if format in ("!!julianEp", "!!jYear"): return stc.jYearToDateTime(float(literal)) return datetime.datetime(*time.strptime(literal, format)[:3])
[docs]@utils.document def parseTimestamp(literal, format="%Y-%m-%dT%H:%M:%S"): """returns a ``datetime.datetime`` object from a literal parsed according to the strptime-similar format. A ``ValueError`` is raised if literal doesn't match format (actually, a parse with essentially DALI-standard ISO representation is always tried) """ try: return datetime.datetime(*time.strptime(literal, format)[:6]) except ValueError: # always try ISO format return parseISODT(literal)
[docs]@utils.document def toMJD(literal): """returns a modified julian date made from some datetime representation. Valid representations include: * MJD (a float smaller than 1e6) * JD (a float larger than 1e6) * datetime.datetime instances * ISO time strings. """ if literal is None: return None elif isinstance(literal, str): return stc.dateTimeToMJD(parseTimestamp(literal)) elif isinstance(literal, datetime.datetime): return stc.dateTimeToMJD(literal) elif literal>1e6: return literal-stc.JD_MJD else: return literal
[docs]@utils.document def makeTimestamp(date, time): """makes a datetime instance from a date and a time. """ return datetime.datetime(date.year, date.month, date.day)+time
[docs]@utils.document def parseAngle(literal, format, sepChar=None): """converts the various forms angles might be encountered to degrees. format is one of hms, dms, fracHour. For sexagesimal/time angles, you can pass a sepChar (default: split at blanks) that lets you specify what separates hours/degrees, minutes, and seconds. >>> "%.8f"%(parseAngle("23 59 59.95", "hms")) '359.99979167' >>> "%10.5f"%parseAngle("-20:31:05.12", "dms", sepChar=":") ' -20.51809' >>> "%010.6f"%parseAngle("21.0209556", "fracHour") '315.314334' """ if format=="dms": return utils.dmsToDeg(literal, sepChar=sepChar) elif format=="hms": return utils.hmsToDeg(literal, sepChar=sepChar) elif format=="fracHour": return utils.fracHoursToDeg(literal) else: raise base.Error("Invalid format: %s"%format)
[docs]@utils.document def computeMean(val1, val2): """returns the mean value between two values. Beware: Integer division done here for the benefit of datetime calculations. >>> computeMean(1.,3) 2.0 >>> computeMean(datetime.datetime(2000, 10, 13), ... datetime.datetime(2000, 10, 12)) datetime.datetime(2000, 10, 12, 12, 0) """ return val1+(val2-val1)/2
[docs]@utils.document def killBlanks(literal): """returns the string literal with all blanks removed. This is useful when numbers are formatted with blanks thrown in. Nones are passed through. """ if literal is None: return None else: return literal.replace(" ", "")
[docs]@utils.document def lastSourceElements(path, numElements): """returns a path made up from the last ``numElements`` items in ``path``. """ newPath = [] for i in range(int(numElements)): path, part = os.path.split(path) newPath.append(part) newPath.reverse() return os.path.join(*newPath)
[docs]@utils.document def scale(val, factor, offset=0): """returns val*factor+offset if val is not None, None otherwise. This is when you want to manipulate a numeric value that may be NULL. It is a somewhat safer alternative to using nullExcs with scaled values. """ if val is None: return None return factor*val+offset
[docs]@utils.document def parseWithNull(literal, baseParser, nullLiteral=base.Undefined, default=None, checker=None): """returns default if literal is ``nullLiteral``, else ``baseParser(literal)``. If ``checker`` is non-None, it must be a callable returning ``True`` if its argument is a null value. ``nullLiteral`` is compared against the unprocessed literal (usually, a string). The intended use is like this (but note that often, a ``nullExcs`` attribute on a rowmaker ``map`` element is the more elegant way: >>> parseWithNull("8888.0", float, "8888") 8888.0 >>> print(parseWithNull("8888", float, "8888")) None >>> print(parseWithNull("N/A", int, "N/A")) None """ if (nullLiteral is not base.Undefined and literal==nullLiteral ) or literal is None: return default if checker is not None: if checker(literal): return default res = baseParser(literal) if res is None: return default return res
[docs]@utils.document def requireValue(val, fieldName): """returns ``val`` unless it is ``None``, in which case a ``ValidationError`` for ``fieldName`` will be raised. """ if val is None: raise base.ValidationError("Value is required but was not provided", fieldName) return val
[docs]def genLimitKeys(inputKey): """yields _MAX and _MIN inputKeys from a single input key. This also tries to sensibly fix descriptions and ucds. This is mainly for datalink metaMakers; condDescs may use a similar thing, but that's not exposed to RDs. Don't use this function any more. It will go away soon. """ name = inputKey.name ucd = inputKey.ucd or None description = inputKey.description base.ui.notifyWarning("Deprecated genLimitKeys function used." " Fix your RD to use proper DALI-intervals (ask dachs-support" " if you don't know what to do.)") yield inputKey.change(name=name+"_MIN", ucd=ucd and "par.min;"+ucd, description=description.rstrip(".")+", lower limit") yield inputKey.change(name=name+"_MAX", ucd=ucd and "par.max;"+ucd, description=description.rstrip(".")+", upper limit")
[docs]@utils.document def getFlatName(accref): """returns a unix-compatible file name for an access reference. The file name will not contain terrible characters, let alone slashes. This is used to, e.g., keep all previews in one directory. """ # just in case someone passes in a full file path, strip it if accref.startswith("/"): accref = utils.getRelativePath(accref, base.getConfig("inputsDir"), liberalChars=True) return base64.b64encode(accref.encode("utf-8"), b"$!").decode("ascii")
[docs]def addProcDefObject(name, func): globals()[name] = func
[docs]def makeProc(funcName, code, setupCode, parent, **moreNames): """compiles a function in the rmkfunc's namespace. code is a complete function source. setupCode is executed right away in this namespace to add globals. """ funcNs = globals().copy() funcNs["parent"] = parent funcNs.update(moreNames) if setupCode.strip(): src = setupCode.rstrip() try: uniqueName = "<setup code in {}>".format(parent.getSourcePosition()) compiled = compile(src, uniqueName, 'exec') linecache.cache[uniqueName] = ( len(src), None, src.split("\n"), uniqueName) exec(compiled, funcNs) except (SyntaxError, TypeError) as ex: raise base.ui.logOldExc( base.BadCode(setupCode, "setup code", ex, pos=parent.getSourcePosition())) except NameError as ex: raise base.ui.logOldExc( base.BadCode(setupCode, "setup code", ex, hint="This typically happens when you forget to put" " quotes around string values.", pos=parent.getSourcePosition())) uniqueName = None if parent and getattr(parent, "name_", None): uniqueName = "<{} at {}>".format(parent.name_, parent.getSourcePosition()) return utils.compileFunction( code.rstrip(), funcName, funcNs, debug=base.DEBUG, uniqueName=uniqueName)
def _test(): # pragma: no cover import doctest doctest.testmod()