Source code for adler.objectdata.objectdata_utilities

import numpy as np
import pandas as pd
import sqlite3
import warnings
import logging
from astropy.time import Time
import erfa
import astropy.units as u
import os
import pyvo
import requests

[docs] logger = logging.getLogger(__name__)
[docs] def get_data_table(sql_query, service=None, sql_filename=None): """Gets a table of data based on a SQL query. Table is pulled from either the RSP or a local SQL database: this behaviour is controlled by the service and sql_filename parameters, one of which must be supplied. Parameters ----------- sql_query : str The SQL query made to the RSP or SQL database. service : pyvo.dal.tap.TAPService object or None TAPService object linked to the RSP. Default=None. sql_filename : str or None Filepath to a SQL database. Default=None. Returns ----------- data_table : DALResultsTable or Pandas dataframe Data table containing the results of the SQL query. """ if service: # pragma: no cover data_table = service.search(sql_query) elif sql_filename: cnx = sqlite3.connect(sql_filename) data_table = pd.read_sql_query( sql_query, cnx ) # would really like to move away from Pandas for this... # Pandas is triggering a useless FutureWarning here which I am choosing to suppress. # The code is already written to account for the FutureWarning, but it triggers anyway. Thanks, Pandas. # Note that pd.option_context("future.no_silent_downcasting", True) would also work, but only for Pandas >2.0.3. with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning) data_table = data_table.fillna(value=np.nan).infer_objects( copy=False ) # changes Nones to NaNs because None forces dtype=object: bad. return data_table
[docs] def get_from_table(data_table, column_name, data_type, table_name="default"): """Retrieves information from the data_table and forces it to be a specified type. Parameters ----------- data_table : DALResultsTable or Pandas dataframe Data table containing columns of interest. column_name : str Column name under which the data of interest is stored. data_type : type Data type. Should be int, float, str or np.ndarray. table_name : str Name of the table. This is mostly for more informative error messages. Default="default". Returns ----------- data_val : str, float, int or nd.array The data requested from the table cast to the type required. """ with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=UserWarning ) # RSP tables mask unpopulated elements, which get converted to NaN here and trigger a warning we don't care about. try: if data_type == str: data_val = str(data_table[column_name][0]) elif data_type == float: data_val = float(data_table[column_name][0]) elif data_type == int: data_val = int(data_table[column_name][0]) elif data_type == np.ndarray: data_val = np.array(data_table[column_name], ndmin=1) else: logger.error( "TypeError: Type for argument data_type not recognised for column {} in table {}: must be str, float, int or np.ndarray.".format( column_name, table_name ) ) raise TypeError( "Type for argument data_type not recognised for column {} in table {}: must be str, float, int or np.ndarray.".format( column_name, table_name ) ) except ValueError: logger.error("ValueError: Could not cast column name to type.") raise ValueError("Could not cast column name to type.") # here we alert the user if one of the values is unpopulated and change it to a NaN data_val = check_value_populated(data_val, data_type, column_name, table_name) return data_val
[docs] def get_from_dictionary(data_dict, key_name, data_type, table_name="default"): """Retrieves information from a dictionary and forces it to be a specified type. Parameters ----------- data_dict : dict or dict-like object Dictionary containing columns of interest. key_name : str Key name under which the data of interest is stored. data_type : type Data type. Should be int, float, str or np.ndarray. table_name : str Name of the table or dictionary. This is mostly for more informative error messages. Default="default". Returns ----------- data_val : str, float, int or nd.array The data requested from the dictionary cast to the type required. """ try: if data_type == str: data_val = str(data_dict[key_name]) elif data_type == float: data_val = float(data_dict[key_name]) elif data_type == int: data_val = int(data_dict[key_name]) elif data_type == np.ndarray: data_val = np.array(data_dict[key_name], ndmin=1) else: print("type not recognised") except ValueError: print("error message") data_val = check_value_populated(data_val, data_type, key_name, "dictionary") return data_val
[docs] def check_value_populated(data_val, data_type, column_name, table_name): """Checks to see if data_val populated properly and prints a helpful warning if it didn't. Usually this will trigger because the RSP or Cassandra database hasn't populated that field for this particular object. Parameters ----------- data_val : str, float, int or nd.array The value to check. data_type: type Data type. Should be int, float, str or np.ndarray. column_name: str Column name under which the data of interest is stored. table_name : str Name of the table. This is mostly for more informative error messages. Default="default". Returns ----------- data_val : str, float, int, nd.array or np.nan Either returns the original data_val or an np.nan if it detected that the value was not populated. """ array_length_zero = data_type == np.ndarray and len(data_val) == 0 number_is_nan = data_type in [float, int] and np.isnan(data_val) str_is_empty = data_type == str and len(data_val) == 0 if array_length_zero or number_is_nan or str_is_empty: logger.warning( "{} unpopulated in {} table for this object. Storing NaN instead.".format(column_name, table_name) ) data_val = np.nan return data_val
[docs] def convertTime(timestamps, input_fmt="mjd", input_scale="utc", output_fmt="mjd", output_scale="tai"): """ Convenience function for converting timestamps between formats and scales. Parameters ----------- timestamps : float, int or nd.array Timestamp(s) to be converted. input_fmt: str Input format of timestamps. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable formats. Default 'mjd'. input_scale: str Input scale of timestamps. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable scales. Default 'utc'. output_fmt: str Desired format of output values. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable formats. Default 'mjd'. output_scale: str Desired scale of output values. See https://docs.astropy.org/en/latest/api/astropy.time.Time.html for allowable scales. Default 'tai'. Returns ----------- output_timestamps : astropy.time.Time object The output timestamps in the desired format and scale as an astropy.time.Time object. """ with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message=".*dubious year.*", category=erfa.ErfaWarning, module="erfa.core" ) # Converting dates really far in the future throws a warning here that we ignore output_timestamps = getattr( getattr(Time(timestamps, format=input_fmt, scale=input_scale), output_scale), output_fmt ) return output_timestamps
[docs] def sqlite_column_exists(conn, table, column): """ Function for checking if column exists in a given table in a given database connection. Parameters ----------- conn : sqlite3.Connection object The connection to the database that we wish to check for a given column. table : str The name of the table we wish to check for the given column in. column : str The name of the column we wish to check for. Returns ----------- result : bool Boolean flag for whether the collect exists or not. """ cur = conn.cursor() cur.execute(f"PRAGMA table_info({table})") result = any(row[1] == column for row in cur.fetchall()) return result
[docs] def add_column_if_not_exists(conn, table, column, coltype): """ Function for adding a column to a given table in a given database if it does not exist. Parameters ----------- conn : sqlite3.Connection object The connection to the database that we wish to add the column to in the given table. table : str The name of the table we wish add the column to. column : str The name of the column we wish to add. coltype : str SQlite datatype for the column we wish to add. See https://sqlite.org/datatype3.html Returns ----------- None """ if not sqlite_column_exists(conn, table, column): cur = conn.cursor() cur.execute(f"ALTER TABLE {table} ADD COLUMN {column} {coltype};") logger.info(f"{column} added to {table}") else: logger.info(f"{column} already exists in {table}")
[docs] def mpc_file_preprocessing(sql_filename): # pragma: no cover """ Function for performing pre-processing steps on the obs_sbn table in the MPC file format. The function strips the leading 'L' from the band in the obs_sbn file; adds a mjd_tai column with the observation time in MJD in the TAI scale; and checks the topocentricDist, heliocentricDist and phaseAngle are present (renaming columns if need be). Parameters ----------- sql_filename : str Filepath to the local SQL database. Returns ----------- None """ conn = sqlite3.connect(sql_filename) cursor = conn.cursor() # Strip the leading L that is used for some of the observations in the MPC file to ensure compatibility with adler cursor.execute("CREATE INDEX IF NOT EXISTS idx_obs_sbn_band ON obs_sbn(band);") cursor.execute("UPDATE obs_sbn SET band=substr(band, 2) WHERE band LIKE 'L%';") conn.commit() logger.warning(f"Leading 'L' stripped from band names") # Add MJD TAI column if sqlite_column_exists(conn, "obs_sbn", "mjd_tai"): logger.info(f"mjd_tai column already in file") else: add_column_if_not_exists(conn, "obs_sbn", "mjd_tai", "REAL") cursor.execute(f"SELECT rowid, mjd_utc FROM obs_sbn;") rows = cursor.fetchall() rowids = np.array([r[0] for r in rows]) mjd_utc_vals = np.array([r[1] for r in rows], dtype=float) mjd_tai_vals = convertTime( mjd_utc_vals, input_fmt="mjd", input_scale="utc", output_fmt="mjd", output_scale="tai" ) data_to_update = list(zip(mjd_tai_vals.tolist(), rowids.tolist())) cursor.executemany(f"UPDATE obs_sbn SET mjd_tai = ? WHERE rowid = ?;", data_to_update) conn.commit() logger.info(f"Added mjd_tai column to obs_sbn") if ( sqlite_column_exists(conn, "obs_sbn", "heliocentricDist") and sqlite_column_exists(conn, "obs_sbn", "topocentricDist") and sqlite_column_exists(conn, "obs_sbn", "phaseAngle") ): logger.info(f"heliocentricDist, topocentricDist and phaseAngle information exist in obs_sbn already.") else: # Check if columns with alternative names exist: # Once we fix the light travel time considerations this won't be technically wrong if sqlite_column_exists( conn, "obs_sbn", "r" ): # heliocentricDist (without ltt corretion) is called r in some versions of this file cursor.execute("ALTER TABLE obs_sbn RENAME COLUMN r TO heliocentricDist") conn.commit() logger.warning( "Column r renamed to heliocentricDist in obs_sbn. Be wary of light travel time as this may not have been accounted for yet" ) if sqlite_column_exists( conn, "obs_sbn", "delta" ): # topocentricDist (without ltt corretion) is called delta in some versions of this file cursor.execute("ALTER TABLE obs_sbn RENAME COLUMN delta TO topocentricDist") conn.commit() logger.warning( "Column delta renamed to topocentricDist in obs_sbn. Be wary of light travel time as this may not have been accounted for yet" ) if sqlite_column_exists( conn, "obs_sbn", "alpha" ): # phaseAngle is called alpha in JPL Horizons and may not have been changed in the file cursor.execute("ALTER TABLE obs_sbn RENAME COLUMN alpha TO phaseAngle") conn.commit() logger.warning( "Column alpha renamed to phaseAngle in obs_sbn. Be wary of light travel time as this may not have been accounted for yet" )
[docs] def flux_to_magnitude(flux, flux_err=np.nan): """Converts a flux measurement (with units of nanoJanskys) and its associated error into AB magnitudes. If no flux error is provided, the returned magnitude error will be NaN. Parameters ----------- flux : float or astropy.units.Quantity Flux value in nanoJanskys (can be specified with Astropy units of nanoJanskys (u.nJy). flux_err : float or astropy.units.Quantity, optional Flux error with units of nanoJanskys (u.nJy). Default is np.nan (dimensionless), in which case the magnitude error will be returned as NaN. Returns ----------- magnitude : float The flux converted into AB magnitude (unitless scalar). magnitude_err : float The propagated uncertainty in AB magnitude (unitless scalar). Returns NaN if flux_err is not provided. """ # TODO Handle the masked arrays better here # (ideally I think we want to keep magnitude as a masked array rather than making magErr non-masked) magnitude = flux.to(u.ABmag).value magnitude_err = ((2.5 / np.log(10)) * (flux_err / flux)).value return magnitude, magnitude_err
[docs] def generate_summary_csvs( sql_filepath, output_file_root, output_cols=["ssObjectId"], filter_list=["u", "g", "r", "i", "z", "y"] ): """ Function for generating the CSV files that summarise the contents of the outlier detection SQLite database. Parameters ----------- sql_filepath : str Filepath to the SQLite database output_file_root : str Desired path for the output file. Default in example notebook is f"{output_dir}/{planetoid.AdlerData.modelId}". output_cols : list of str, optional List of columns desired for output file. Default: ["ssObjectId"]. If default chosen, only DISTINCT ssObjectIds will be written out. filter_list : list of str, optional A comma-separated list of the filters of interest. """ logger.info("Generating lists of objects of interest") con = sqlite3.connect(sql_filepath) if output_cols == ["ssObjectId"]: logger.info( f"Only ssObjectId selected for summary output, Adler will use 'SELECT DISTINCT ssObjectId' to return the unique ssObjectIds in AdlerSourceFlags." ) output_cols_sql = "DISTINCT ssObjectId" else: output_cols_sql = ", ".join(output_cols) # Simple magnitude difference mag_diff_sql_query = f""" SELECT {output_cols_sql} FROM AdlerSourceFlags WHERE mag_diff!=0 """ mag_diff_output = f"{output_file_root}_outliers.csv" mag_diff_df = pd.read_sql_query(mag_diff_sql_query, con) if len(mag_diff_df) == 0: logger.info(f"No outliers found, {mag_diff_output} not generated") else: mag_diff_df.to_csv(mag_diff_output, index=False) logger.info(f"Output written to {mag_diff_output}") # Outliers in sigma-space std_diff_sql_query = f""" SELECT {output_cols_sql} FROM AdlerSourceFlags WHERE std_diff!=0 """ std_diff_output = f"{output_file_root}_std_outliers.csv" std_diff_df = pd.read_sql_query(std_diff_sql_query, con) if len(std_diff_df) == 0: logger.info(f"No outliers found, {std_diff_output} not generated") else: std_diff_df.to_csv(std_diff_output, index=False) logger.info(f"Output written to {std_diff_output}") # Sustained outliers sus_outlier_sql_condition = " OR ".join(f"{filt}_sustained_outliers IS NOT NULL" for filt in filter_list) sus_outlier_sql_query = f"SELECT ssObjectId FROM AdlerData WHERE {sus_outlier_sql_condition}" sus_outlier_output = f"{output_file_root}_sustained_outliers.csv" sus_outlier_df = pd.read_sql_query(sus_outlier_sql_query, con) if len(sus_outlier_df) == 0: logger.info(f"No outliers found, {sus_outlier_output} not generated") else: sus_outlier_df.to_csv(sus_outlier_output, index=False) logger.info(f"Output written to {sus_outlier_output}") con.close()
[docs] def get_tap_service_api(rsp_tap_path, api_token_path): # pragma: no cover """Returns a pyvo.dal.TAPService object linked to the RSP if provided with the desired API path and valid API token. For use with querying the RSP remotely. Parameters ----------- rsp_tap_path : str End of the API url, 'ssotap' for DP0.3, 'tap' for DP1. Set by schema choice in AdlerPlanetoid. api_token_path : str User-provided path to their RSP TAP API token. If specified as, e.g., '~/.rsp_tap.token' this will be expanded by the code. Returns ----------- rsp_tap_service : pyvo.dal.TAPService TAPService object linked to the RSP. """ RSP_TAP_SERVICE = f"https://data.lsst.cloud/api/{rsp_tap_path}" expanded_path = os.path.expanduser( api_token_path ) # Expands '~' to full path if included in user-provided path with open(expanded_path, "r") as f: token_str = f.readline() token_str = token_str.replace("\n", "") # tap_session = requests.Session() # tap_session.headers['Authorization'] = f"Bearer {token_str}" # rsp_tap_service = pyvo.dal.TAPService(RSP_TAP_SERVICE, session=tap_session) cred = pyvo.auth.CredentialStore() cred.set_password("x-oauth-basic", token_str) credential = cred.get("ivo://ivoa.net/sso#BasicAA") rsp_tap_service = pyvo.dal.TAPService(RSP_TAP_SERVICE, session=credential) return rsp_tap_service