import numpy as np
import pandas as pd
import sqlite3
import warnings
import logging
from astropy.time import Time
import erfa
import astropy.units as u
import os
import pyvo
import requests
[docs]
logger = logging.getLogger(__name__)
[docs]
def get_data_table(sql_query, service=None, sql_filename=None):
"""Gets a table of data based on a SQL query. Table is pulled from either the RSP or a local SQL database:
this behaviour is controlled by the service and sql_filename parameters, one of which must be supplied.
Parameters
-----------
sql_query : str
The SQL query made to the RSP or SQL database.
service : pyvo.dal.tap.TAPService object or None
TAPService object linked to the RSP. Default=None.
sql_filename : str or None
Filepath to a SQL database. Default=None.
Returns
-----------
data_table : DALResultsTable or Pandas dataframe
Data table containing the results of the SQL query.
"""
if service: # pragma: no cover
data_table = service.search(sql_query)
elif sql_filename:
cnx = sqlite3.connect(sql_filename)
data_table = pd.read_sql_query(
sql_query, cnx
) # would really like to move away from Pandas for this...
# Pandas is triggering a useless FutureWarning here which I am choosing to suppress.
# The code is already written to account for the FutureWarning, but it triggers anyway. Thanks, Pandas.
# Note that pd.option_context("future.no_silent_downcasting", True) would also work, but only for Pandas >2.0.3.
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
data_table = data_table.fillna(value=np.nan).infer_objects(
copy=False
) # changes Nones to NaNs because None forces dtype=object: bad.
return data_table
[docs]
def get_from_table(data_table, column_name, data_type, table_name="default"):
"""Retrieves information from the data_table and forces it to be a specified type.
Parameters
-----------
data_table : DALResultsTable or Pandas dataframe
Data table containing columns of interest.
column_name : str
Column name under which the data of interest is stored.
data_type : type
Data type. Should be int, float, str or np.ndarray.
table_name : str
Name of the table. This is mostly for more informative error messages. Default="default".
Returns
-----------
data_val : str, float, int or nd.array
The data requested from the table cast to the type required.
"""
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", category=UserWarning
) # RSP tables mask unpopulated elements, which get converted to NaN here and trigger a warning we don't care about.
try:
if data_type == str:
data_val = str(data_table[column_name][0])
elif data_type == float:
data_val = float(data_table[column_name][0])
elif data_type == int:
data_val = int(data_table[column_name][0])
elif data_type == np.ndarray:
data_val = np.array(data_table[column_name], ndmin=1)
else:
logger.error(
"TypeError: Type for argument data_type not recognised for column {} in table {}: must be str, float, int or np.ndarray.".format(
column_name, table_name
)
)
raise TypeError(
"Type for argument data_type not recognised for column {} in table {}: must be str, float, int or np.ndarray.".format(
column_name, table_name
)
)
except ValueError:
logger.error("ValueError: Could not cast column name to type.")
raise ValueError("Could not cast column name to type.")
# here we alert the user if one of the values is unpopulated and change it to a NaN
data_val = check_value_populated(data_val, data_type, column_name, table_name)
return data_val
[docs]
def get_from_dictionary(data_dict, key_name, data_type, table_name="default"):
"""Retrieves information from a dictionary and forces it to be a specified type.
Parameters
-----------
data_dict : dict or dict-like object
Dictionary containing columns of interest.
key_name : str
Key name under which the data of interest is stored.
data_type : type
Data type. Should be int, float, str or np.ndarray.
table_name : str
Name of the table or dictionary. This is mostly for more informative error messages. Default="default".
Returns
-----------
data_val : str, float, int or nd.array
The data requested from the dictionary cast to the type required.
"""
try:
if data_type == str:
data_val = str(data_dict[key_name])
elif data_type == float:
data_val = float(data_dict[key_name])
elif data_type == int:
data_val = int(data_dict[key_name])
elif data_type == np.ndarray:
data_val = np.array(data_dict[key_name], ndmin=1)
else:
print("type not recognised")
except ValueError:
print("error message")
data_val = check_value_populated(data_val, data_type, key_name, "dictionary")
return data_val
[docs]
def check_value_populated(data_val, data_type, column_name, table_name):
"""Checks to see if data_val populated properly and prints a helpful warning if it didn't.
Usually this will trigger because the RSP or Cassandra database hasn't populated that
field for this particular object.
Parameters
-----------
data_val : str, float, int or nd.array
The value to check.
data_type: type
Data type. Should be int, float, str or np.ndarray.
column_name: str
Column name under which the data of interest is stored.
table_name : str
Name of the table. This is mostly for more informative error messages. Default="default".
Returns
-----------
data_val : str, float, int, nd.array or np.nan
Either returns the original data_val or an np.nan if it detected that the value was not populated.
"""
array_length_zero = data_type == np.ndarray and len(data_val) == 0
number_is_nan = data_type in [float, int] and np.isnan(data_val)
str_is_empty = data_type == str and len(data_val) == 0
if array_length_zero or number_is_nan or str_is_empty:
logger.warning(
"{} unpopulated in {} table for this object. Storing NaN instead.".format(column_name, table_name)
)
data_val = np.nan
return data_val
[docs]
def convertTime(timestamps, input_fmt="mjd", input_scale="utc", output_fmt="mjd", output_scale="tai"):
"""
Convenience function for converting timestamps between formats and scales.
Parameters
-----------
timestamps : float, int or nd.array
Timestamp(s) to be converted.
input_fmt: str
Input format of timestamps. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable formats. Default 'mjd'.
input_scale: str
Input scale of timestamps. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable scales. Default 'utc'.
output_fmt: str
Desired format of output values. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable formats. Default 'mjd'.
output_scale: str
Desired scale of output values. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable scales. Default 'tai'.
Returns
-----------
output_timestamps : astropy.time.Time object
The output timestamps in the desired format and scale as an astropy.time.Time object.
"""
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message=".*dubious year.*", category=erfa.ErfaWarning, module="erfa.core"
) # Converting dates really far in the future throws a warning here that we ignore
output_timestamps = getattr(
getattr(Time(timestamps, format=input_fmt, scale=input_scale), output_scale), output_fmt
)
return output_timestamps
[docs]
def sqlite_column_exists(conn, table, column):
"""
Function for checking if column exists in a given table in a given database connection.
Parameters
-----------
conn : sqlite3.Connection object
The connection to the database that we wish to check for a given column.
table : str
The name of the table we wish to check for the given column in.
column : str
The name of the column we wish to check for.
Returns
-----------
result : bool
Boolean flag for whether the collect exists or not.
"""
cur = conn.cursor()
cur.execute(f"PRAGMA table_info({table})")
result = any(row[1] == column for row in cur.fetchall())
return result
[docs]
def add_column_if_not_exists(conn, table, column, coltype):
"""
Function for adding a column to a given table in a given database if it does not exist.
Parameters
-----------
conn : sqlite3.Connection object
The connection to the database that we wish to add the column to in the given table.
table : str
The name of the table we wish add the column to.
column : str
The name of the column we wish to add.
coltype : str
SQlite datatype for the column we wish to add. See https://sqlite.org/datatype3.html
Returns
-----------
None
"""
if not sqlite_column_exists(conn, table, column):
cur = conn.cursor()
cur.execute(f"ALTER TABLE {table} ADD COLUMN {column} {coltype};")
logger.info(f"{column} added to {table}")
else:
logger.info(f"{column} already exists in {table}")
[docs]
def mpc_file_preprocessing(sql_filename): # pragma: no cover
"""
Function for performing pre-processing steps on the obs_sbn table in the MPC file format.
The function strips the leading 'L' from the band in the obs_sbn file;
adds a mjd_tai column with the observation time in MJD in the TAI scale;
and checks the topocentricDist, heliocentricDist and phaseAngle are present (renaming columns if need be).
Parameters
-----------
sql_filename : str
Filepath to the local SQL database.
Returns
-----------
None
"""
conn = sqlite3.connect(sql_filename)
cursor = conn.cursor()
# Strip the leading L that is used for some of the observations in the MPC file to ensure compatibility with adler
cursor.execute("CREATE INDEX IF NOT EXISTS idx_obs_sbn_band ON obs_sbn(band);")
cursor.execute("UPDATE obs_sbn SET band=substr(band, 2) WHERE band LIKE 'L%';")
conn.commit()
logger.warning(f"Leading 'L' stripped from band names")
# Add MJD TAI column
if sqlite_column_exists(conn, "obs_sbn", "mjd_tai"):
logger.info(f"mjd_tai column already in file")
else:
add_column_if_not_exists(conn, "obs_sbn", "mjd_tai", "REAL")
cursor.execute(f"SELECT rowid, mjd_utc FROM obs_sbn;")
rows = cursor.fetchall()
rowids = np.array([r[0] for r in rows])
mjd_utc_vals = np.array([r[1] for r in rows], dtype=float)
mjd_tai_vals = convertTime(
mjd_utc_vals, input_fmt="mjd", input_scale="utc", output_fmt="mjd", output_scale="tai"
)
data_to_update = list(zip(mjd_tai_vals.tolist(), rowids.tolist()))
cursor.executemany(f"UPDATE obs_sbn SET mjd_tai = ? WHERE rowid = ?;", data_to_update)
conn.commit()
logger.info(f"Added mjd_tai column to obs_sbn")
if (
sqlite_column_exists(conn, "obs_sbn", "heliocentricDist")
and sqlite_column_exists(conn, "obs_sbn", "topocentricDist")
and sqlite_column_exists(conn, "obs_sbn", "phaseAngle")
):
logger.info(f"heliocentricDist, topocentricDist and phaseAngle information exist in obs_sbn already.")
else:
# Check if columns with alternative names exist:
# Once we fix the light travel time considerations this won't be technically wrong
if sqlite_column_exists(
conn, "obs_sbn", "r"
): # heliocentricDist (without ltt corretion) is called r in some versions of this file
cursor.execute("ALTER TABLE obs_sbn RENAME COLUMN r TO heliocentricDist")
conn.commit()
logger.warning(
"Column r renamed to heliocentricDist in obs_sbn. Be wary of light travel time as this may not have been accounted for yet"
)
if sqlite_column_exists(
conn, "obs_sbn", "delta"
): # topocentricDist (without ltt corretion) is called delta in some versions of this file
cursor.execute("ALTER TABLE obs_sbn RENAME COLUMN delta TO topocentricDist")
conn.commit()
logger.warning(
"Column delta renamed to topocentricDist in obs_sbn. Be wary of light travel time as this may not have been accounted for yet"
)
if sqlite_column_exists(
conn, "obs_sbn", "alpha"
): # phaseAngle is called alpha in JPL Horizons and may not have been changed in the file
cursor.execute("ALTER TABLE obs_sbn RENAME COLUMN alpha TO phaseAngle")
conn.commit()
logger.warning(
"Column alpha renamed to phaseAngle in obs_sbn. Be wary of light travel time as this may not have been accounted for yet"
)
[docs]
def flux_to_magnitude(flux, flux_err=np.nan):
"""Converts a flux measurement (with units of nanoJanskys) and its associated error
into AB magnitudes. If no flux error is provided, the returned magnitude error
will be NaN.
Parameters
-----------
flux : float or astropy.units.Quantity
Flux value in nanoJanskys (can be specified with Astropy units of nanoJanskys (u.nJy).
flux_err : float or astropy.units.Quantity, optional
Flux error with units of nanoJanskys (u.nJy). Default is np.nan (dimensionless),
in which case the magnitude error will be returned as NaN.
Returns
-----------
magnitude : float
The flux converted into AB magnitude (unitless scalar).
magnitude_err : float
The propagated uncertainty in AB magnitude (unitless scalar).
Returns NaN if flux_err is not provided.
"""
# TODO Handle the masked arrays better here
# (ideally I think we want to keep magnitude as a masked array rather than making magErr non-masked)
magnitude = flux.to(u.ABmag).value
magnitude_err = ((2.5 / np.log(10)) * (flux_err / flux)).value
return magnitude, magnitude_err
[docs]
def generate_summary_csvs(
sql_filepath, output_file_root, output_cols=["ssObjectId"], filter_list=["u", "g", "r", "i", "z", "y"]
):
"""
Function for generating the CSV files that summarise the contents of the outlier detection SQLite database.
Parameters
-----------
sql_filepath : str
Filepath to the SQLite database
output_file_root : str
Desired path for the output file. Default in example notebook is f"{output_dir}/{planetoid.AdlerData.modelId}".
output_cols : list of str, optional
List of columns desired for output file. Default: ["ssObjectId"]. If default chosen, only DISTINCT ssObjectIds will be written out.
filter_list : list of str, optional
A comma-separated list of the filters of interest.
"""
logger.info("Generating lists of objects of interest")
con = sqlite3.connect(sql_filepath)
if output_cols == ["ssObjectId"]:
logger.info(
f"Only ssObjectId selected for summary output, Adler will use 'SELECT DISTINCT ssObjectId' to return the unique ssObjectIds in AdlerSourceFlags."
)
output_cols_sql = "DISTINCT ssObjectId"
else:
output_cols_sql = ", ".join(output_cols)
# Simple magnitude difference
mag_diff_sql_query = f"""
SELECT {output_cols_sql} FROM AdlerSourceFlags
WHERE mag_diff!=0
"""
mag_diff_output = f"{output_file_root}_outliers.csv"
mag_diff_df = pd.read_sql_query(mag_diff_sql_query, con)
if len(mag_diff_df) == 0:
logger.info(f"No outliers found, {mag_diff_output} not generated")
else:
mag_diff_df.to_csv(mag_diff_output, index=False)
logger.info(f"Output written to {mag_diff_output}")
# Outliers in sigma-space
std_diff_sql_query = f"""
SELECT {output_cols_sql} FROM AdlerSourceFlags
WHERE std_diff!=0
"""
std_diff_output = f"{output_file_root}_std_outliers.csv"
std_diff_df = pd.read_sql_query(std_diff_sql_query, con)
if len(std_diff_df) == 0:
logger.info(f"No outliers found, {std_diff_output} not generated")
else:
std_diff_df.to_csv(std_diff_output, index=False)
logger.info(f"Output written to {std_diff_output}")
# Sustained outliers
sus_outlier_sql_condition = " OR ".join(f"{filt}_sustained_outliers IS NOT NULL" for filt in filter_list)
sus_outlier_sql_query = f"SELECT ssObjectId FROM AdlerData WHERE {sus_outlier_sql_condition}"
sus_outlier_output = f"{output_file_root}_sustained_outliers.csv"
sus_outlier_df = pd.read_sql_query(sus_outlier_sql_query, con)
if len(sus_outlier_df) == 0:
logger.info(f"No outliers found, {sus_outlier_output} not generated")
else:
sus_outlier_df.to_csv(sus_outlier_output, index=False)
logger.info(f"Output written to {sus_outlier_output}")
con.close()
[docs]
def get_tap_service_api(rsp_tap_path, api_token_path): # pragma: no cover
"""Returns a pyvo.dal.TAPService object linked to the RSP if provided with the desired API path and valid API token. For use with querying the RSP remotely.
Parameters
-----------
rsp_tap_path : str
End of the API url, 'ssotap' for DP0.3, 'tap' for DP1. Set by schema choice in AdlerPlanetoid.
api_token_path : str
User-provided path to their RSP TAP API token. If specified as, e.g., '~/.rsp_tap.token' this will be expanded by the code.
Returns
-----------
rsp_tap_service : pyvo.dal.TAPService
TAPService object linked to the RSP.
"""
RSP_TAP_SERVICE = f"https://data.lsst.cloud/api/{rsp_tap_path}"
expanded_path = os.path.expanduser(
api_token_path
) # Expands '~' to full path if included in user-provided path
with open(expanded_path, "r") as f:
token_str = f.readline()
token_str = token_str.replace("\n", "")
# tap_session = requests.Session()
# tap_session.headers['Authorization'] = f"Bearer {token_str}"
# rsp_tap_service = pyvo.dal.TAPService(RSP_TAP_SERVICE, session=tap_session)
cred = pyvo.auth.CredentialStore()
cred.set_password("x-oauth-basic", token_str)
credential = cred.get("ivo://ivoa.net/sso#BasicAA")
rsp_tap_service = pyvo.dal.TAPService(RSP_TAP_SERVICE, session=credential)
return rsp_tap_service