Source code for adler.objectdata.AdlerData

import os
import sqlite3
import logging
import re
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
from astropy.time import Time

[docs] FILTER_DEPENDENT_KEYS = [ "phaseAngle_min", "phaseAngle_range", "observationTime_max", "nobs", "arc", "n_outliers", "n_std_outliers", "sustained_outliers", ]
[docs] PHASE_MODEL_DEPENDENT_KEYS = [ "H", "H_err", "phase_parameter_1", "phase_parameter_1_err", "phase_parameter_2", "phase_parameter_2_err", ]
[docs] VALID_PHASE_MODELS = ["HG", "HG1G2", "HG12", "HG12_Pen16", "LinearPhaseFunc"]
[docs] AVG_MAG_MODEL_DEPENDENT_KEYS = [ "avg_mag", "std_mag", ]
[docs] VALID_AVG_MAG_MODELS = ["median", "mean"]
[docs] ALL_FILTER_LIST = ["u", "g", "r", "i", "z", "y"]
[docs] VALID_MODELS = sorted( VALID_PHASE_MODELS + VALID_AVG_MAG_MODELS, key=len, reverse=True ) # sorted to avoid partial matches when using _get_model_name
[docs] logger = logging.getLogger(__name__)
# Ensure that numpy dtypes correctly map to SQL types sqlite3.register_adapter(np.float64, float) sqlite3.register_adapter(np.float32, float) sqlite3.register_adapter(np.int64, int) sqlite3.register_adapter(np.int32, int) @dataclass
[docs] class AdlerData: """ Class for storing Adler-calculated values. Attributes: ----------- ssObjectId : str ssObjectId of the object of interest. filter_list : list of str List of filters under investigation. modelId : str, optional modelId for the model of interest that has been or is to be computed. Default: Empty str updatedMJD : float, optional Timestamp (in MJD) of the time that this AdlerData object was initialized. filter_dependent_values : list of FilterDependentAdler objects, optional List of FilterDependentAdler objects containing filter-dependent data in order of filter_list. Default empty list. """
[docs] ssObjectId: str
[docs] filter_list: list
[docs] modelId: str = ""
[docs] updatedMJD: float = np.nan
[docs] filter_dependent_values: list = field(default_factory=list)
[docs] def __post_init__(self): """This runs post-initialisation and creates the class attribute where one dimension is "filters" to ensure the array has the correct size. This makes population a little easier. We also generate the current MJD timestamp to record the time this AdlerData object was initialized. """ # note that we don't do the same for model-dependent values as we don't know a priori how many models the user wishes # to calculate, but we do know how many filters the AdlerPlanetoid object was generated with self.filter_dependent_values = [FilterDependentAdler(filter_name) for filter_name in self.filter_list] self.updatedMJD = Time.now().mjd
[docs] def _MJD_update(self): """ Function for updating the updatedMJD value stored in AdlerData. This should be called whenever an update is made to AdlerData so that it is clear what is the most up-to-date version of AdlerData. """ self.updatedMJD = Time.now().mjd
[docs] def set_modelId(self, model_name, end_mjd, data_timespan, n_new_nights): """ Function for setting the modelId parameter. Parameters ----------- model_name : str The model name for the given model calculated. One of "HG", "HG1G2", "HG12", "HG12_Pen16", "LinearPhaseFunc", "median", "mean". end_mjd : float The MJD set as the maximum MJD to consider in the model. data_timespan : float The number of nights of data that is considered for the given model. n_new_nights : float The number of nights of data that are considered as "new observations" in calculating outliers. """ # N.B. double underscore is intentional to provide a point to break this string if searching for the model_name after generation (as HG12_Pen16 is a valid model name with a single underscore) self.modelId = f"{model_name}_{end_mjd:.1f}_{data_timespan}n_{n_new_nights}n" self._MJD_update()
[docs] def populate_filter_dependent_parameters(self, filter_name, **kwargs): """Convenience method to correctly populate the filter-dependent parameters for a given filter. Only the supplied arguments to the method will be updated, allowing for only some values to be populated if desired. filter_name : str The one-letter name of the filter of interest. **kwargs : FilterDependentAdler attributes The attribute names of the parameters you wish to update. See docs for FilterDependentAdler class for definitions of each attribute. Valid keyword arguments are: model_name, phaseAngle_min, phaseAngle_range, observationTime_max, arc, nobs, n_outliers, n_std_outliers, sustained_outliers. """ # make sure the supplied filter is in the filter list try: filter_index = self.filter_list.index(filter_name) except ValueError: logger.error("ValueError: Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) raise ValueError("Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) # update the value if it's in **kwargs for filter_key in FILTER_DEPENDENT_KEYS: if kwargs.get(filter_key): setattr(self.filter_dependent_values[filter_index], filter_key, kwargs.get(filter_key)) self._MJD_update()
[docs] def populate_phase_parameters(self, filter_name, **kwargs): """Convenience method to correctly populate phase curve parameters for a given filter and model. Only the supplied arguments to the method will be updated, allowing for only some values to be populated if desired. This method will automatically populate filter-dependent parameters also. Parameters ----------- filter_name : str The one-letter name of the filter in which the phase curve was calculated. **kwargs : FilterDependentAdler and PhaseModelDependentAdler attributes The attribute names of the parameters you wish to update. See docs for FilterDependentAdler and PhaseModelDependentAdler classes for definitions of each attribute. Valid keyword arguments are: model_name, phaseAngle_min, phaseAngle_range, observationTime_max, arc, nobs, n_outliers, n_std_outliers, sustained_outliers, H, H_err, phase_parameter_1, phase_parameter_1_err, phase_parameter_2, phase_parameter_2_err. Note that to update any of the model-dependent parameters (H, H_err, etc.), you WILL need to supply a model_name. """ # make sure the supplied filter is in the filter list try: filter_index = self.filter_list.index(filter_name) except ValueError: logger.error("ValueError: Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) raise ValueError("Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) # populate the filter dependent parameters self.populate_filter_dependent_parameters(filter_name, **kwargs) # if model-dependent parameters exist without a model name, return an error if not kwargs.get("model_name") and any(name in kwargs for name in PHASE_MODEL_DEPENDENT_KEYS): logger.error("NameError: No model name given. Cannot update model-specific phase parameters.") raise NameError("No model name given. Cannot update model-specific phase parameters.") # if no model_name is supplied, just end here # else, if the model does not exist for this filter, create it if not kwargs.get("model_name"): return elif kwargs.get("model_name") != self.filter_dependent_values[filter_index].model_name: logger.warning( f"Input model name {kwargs.get('model_name')} does not match model name {self.filter_dependent_values[filter_index].model_name} in AdlerData. Parameters will be overwritten." ) self.filter_dependent_values[filter_index].model_name = kwargs.get("model_name") self.filter_dependent_values[filter_index].model_dependent_values = PhaseModelDependentAdler( filter_name, kwargs.get("model_name") ) # update the value if it's in **kwargs for model_key in PHASE_MODEL_DEPENDENT_KEYS: if model_key in kwargs: setattr( self.filter_dependent_values[filter_index].model_dependent_values, model_key, kwargs.get(model_key), ) self._MJD_update()
[docs] def populate_avg_mag_parameters(self, filter_name, **kwargs): """Convenience method to correctly populate average magnitude model parameters for a given filter and model. Only the supplied arguments to the method will be updated, allowing for only some values to be populated if desired. This method will automatically populate filter-dependent parameters also. Parameters ----------- filter_name : str The one-letter name of the filter in which the model was calculated. **kwargs : FilterDependentAdler and AvgMagModelDependentAdler attributes The attribute names of the parameters you wish to update. See docs for FilterDependentAdler and AvgMagModelDependentAdler classes for definitions of each attribute. Valid keyword arguments are: model_name, phaseAngle_min, phaseAngle_range, observationTime_max, arc, nobs, n_outliers, n_std_outliers, sustained_outliers, avg_mag, std_mag. Note that to update any of the model-dependent parameters (avg_mag, std_mag), you WILL need to supply a model_name. """ # make sure the supplied filter is in the filter list try: filter_index = self.filter_list.index(filter_name) except ValueError: logger.error("ValueError: Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) raise ValueError("Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) # populate the filter dependent parameters self.populate_filter_dependent_parameters(filter_name, **kwargs) # if model-dependent parameters exist without a model name, return an error if not kwargs.get("model_name") and any(name in kwargs for name in AVG_MAG_MODEL_DEPENDENT_KEYS): logger.error( "NameError: No model name given. Cannot update model-specific average magnitude parameters." ) raise NameError("No model name given. Cannot update model-specific average magnitude parameters.") # if no model_name is supplied, just end here # else, if the model does not exist for this filter, create it if not kwargs.get("model_name"): return elif kwargs.get("model_name") != self.filter_dependent_values[filter_index].model_name: logger.warning( f"Input model name {kwargs.get('model_name')} does not match model name {self.filter_dependent_values[filter_index].model_name} in AdlerData. Parameters will be overwritten." ) self.filter_dependent_values[filter_index].model_name = kwargs.get("model_name") self.filter_dependent_values[filter_index].model_dependent_values = AvgMagModelDependentAdler( filter_name, kwargs.get("model_name") ) # update the value if it's in **kwargs for model_key in AVG_MAG_MODEL_DEPENDENT_KEYS: if model_key in kwargs: setattr( self.filter_dependent_values[filter_index].model_dependent_values, model_key, kwargs.get(model_key), ) self._MJD_update()
[docs] def populate_source_flags(self, filter_name, modelId, df, **kwargs): """Convenience method to correctly populate the source outlier flags for a given filter and modelId. Only the supplied arguments to the method will be updated, allowing for only some values to be populated if desired. Observations detected as outliers (source flags) must be supplied in a pandas.DataFrame. This method will automatically populate filter-dependent parameters also. Parameters ----------- filter_name : str The one-letter name of the filter in which the model was calculated. modelId : str modelId that the given outliers correspond to. This is used to check the supplied modelId from the user matches that stored in AdlerData. df : pandas.DataFrame DataFrame of the observations that are identified as outliers. Must contain columns diaSourceId, midPointMjdTai, mag_diff, std_diff. **kwargs : FilterDependentAdler and AvgMagModelDependentAdler attributes The attribute names of the parameters you wish to update. See docs for FilterDependentAdler classes for definitions of each attribute. Valid keyword arguments are: model_name, phaseAngle_min, phaseAngle_range, observationTime_max, arc, nobs, n_outliers, n_std_outliers, sustained_outliers. """ if modelId != self.modelId: logger.error( f"ValueError: modelId {modelId} does not match the modelId in AdlerData.modelId: {self.modelId}" ) raise ValueError( f"modelId {modelId} does not match the modelId in AdlerData.modelId: {self.modelId}" ) # make sure the supplied filter is in the filter list try: filter_index = self.filter_list.index(filter_name) except ValueError: logger.error("ValueError: Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) raise ValueError("Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) # populate the filter dependent parameters # Add n_outliers, n_std_outliers to kwargs so it populates FilterDependentAdler kwargs.update({"n_outliers": len(df.loc[df.mag_diff != 0])}) kwargs.update({"n_std_outliers": len(df.loc[df.std_diff != 0])}) self.populate_filter_dependent_parameters(filter_name, **kwargs) self.filter_dependent_values[filter_index].source_flags = ( AdlerSourceFlags.construct_source_flags_from_data_table(self.ssObjectId, filter_name, modelId, df) ) self._MJD_update()
[docs] def populate_from_database(self, filepath, modelId=None): """Populates the AdlerData object with information from the most recent timestamped entry for the ssObjectId in a given database. Parameters ----------- filepath : path-like object Filepath with the location of the output SQL database. Note that for now, we assume only one table with all the data. modelId : str, optional modelId for the model of interest that should be recovered. Default: None. """ if modelId: self.modelId = modelId con = self._get_database_connection(filepath) cursor = con.cursor() tbl_list = self._get_tables(con) # Ensure AdlerData is always the first table queried (thus populating modelId if not already specified) tbl_list.insert(0, tbl_list.pop(tbl_list.index("AdlerData"))) for tbl_name in tbl_list: logger.info(f"Populating information from {tbl_name}.") # Specific query required for AdlerSourceFlags whereas the other tables can follow the same style (as they are unique on ssObjectId) if tbl_name == "AdlerSourceFlags": source_flags_query = f"SELECT * FROM AdlerSourceFlags WHERE CAST(ssObjectId AS TEXT)='{self.ssObjectId}' and modelId='{self.modelId}'" cursor.execute(source_flags_query) rows = cursor.fetchall() columns = [desc[0] for desc in cursor.description] df = pd.DataFrame(rows, columns=columns) for filter_name in self.filter_list: _df = df.loc[df.filter_name == filter_name] self.populate_source_flags(filter_name, self.modelId, _df) else: if not modelId and tbl_name == "AdlerData": # If modelId isn't specified and we're querying AdlerData (i.e. the default first table to be queried), we take the most recent entry sql_query = f"""SELECT * from {tbl_name} WHERE CAST(ssObjectId AS TEXT)='{self.ssObjectId}' ORDER BY updatedMJD DESC LIMIT 1""" else: sql_query = f"""SELECT * from {tbl_name} WHERE CAST(ssObjectId AS TEXT)='{self.ssObjectId}' AND modelId='{self.modelId}' ORDER BY updatedMJD DESC LIMIT 1""" query_result = cursor.execute(sql_query) try: fetched_data_raw = query_result.fetchall()[0] except IndexError: logger.error( f"ValueError: No data found in table {tbl_name} in this database for the supplied ssObjectId/modelId." ) raise ValueError( f"ValueError: No data found in table {tbl_name} in this database for the supplied ssObjectId/modelId." ) fetched_data = [ np.nan if v is None else v for v in fetched_data_raw ] # replaces Nones with nans column_list = self._get_database_columns(con, tbl_name) row_data = dict(zip(column_list, fetched_data)) # Set modelId if not modelId: self.modelId = row_data["modelId"] self.updatedMJD = row_data["updatedMJD"] filter_bools = [ any((column_heading.startswith(filter + "_") for column_heading in column_list)) for filter in ALL_FILTER_LIST ] database_filter_list = [b for a, b in zip(filter_bools, ALL_FILTER_LIST) if a] if not all( [requested_filter in database_filter_list for requested_filter in self.filter_list] ): logger.error( "ValueError: Data does not exist for some of the requested filters in this database. Filters in database for this object: {}".format( database_filter_list ) ) raise ValueError( "Data does not exist for some of the requested filters in this database. Filters in database for this object: {}".format( database_filter_list ) ) for filter_name in self.filter_list: expected_filter_columns = [ filter_name + "_" + filter_key for filter_key in FILTER_DEPENDENT_KEYS ] filter_columns = [col for col in column_list if col in expected_filter_columns] filter_values = [row_data[col] for col in filter_columns] present_filter_columns = [col.strip(filter_name + "_") for col in filter_columns] filter_dependent_info = dict(zip(present_filter_columns, filter_values)) self.populate_filter_dependent_parameters(filter_name, **filter_dependent_info) model_name = self._get_model_name() if model_name in VALID_PHASE_MODELS: expected_model_columns = [ filter_name + "_" + model_name + "_" + model_key for model_key in PHASE_MODEL_DEPENDENT_KEYS ] model_columns = [col for col in column_list if col in expected_model_columns] model_values = [row_data[col] for col in model_columns] model_dependent_info = dict(zip(PHASE_MODEL_DEPENDENT_KEYS, model_values)) model_dependent_info["model_name"] = model_name self.populate_phase_parameters(filter_name, **model_dependent_info) elif model_name in VALID_AVG_MAG_MODELS: expected_model_columns = [ filter_name + "_" + model_name + "_" + model_key for model_key in AVG_MAG_MODEL_DEPENDENT_KEYS ] model_columns = [col for col in column_list if col in expected_model_columns] model_values = [row_data[col] for col in model_columns] model_dependent_info = dict(zip(AVG_MAG_MODEL_DEPENDENT_KEYS, model_values)) model_dependent_info["model_name"] = model_name self.populate_avg_mag_parameters(filter_name, **model_dependent_info) con.close()
[docs] def print_data(self): """Convenience method to clearly print the stored values.""" for f, filter_name in enumerate(self.filter_list): print("Filter: {}".format(filter_name)) print("Phase angle minimum: {}".format(self.filter_dependent_values[f].phaseAngle_min)) print("Phase angle range: {}".format(self.filter_dependent_values[f].phaseAngle_range)) print("Maximum observation time: {}".format(self.filter_dependent_values[f].observationTime_max)) print("Number of observations: {}".format(self.filter_dependent_values[f].nobs)) print("Arc: {}".format(self.filter_dependent_values[f].arc)) print("Number of outliers detected: {}".format(self.filter_dependent_values[f].n_outliers)) print( "Number of outliers in sigma-space detected: {}".format( self.filter_dependent_values[f].n_std_outliers ) ) print( "Magnitude change of sustained outliers: {}".format( self.filter_dependent_values[f].sustained_outliers ) ) model_name = self.filter_dependent_values[f].model_name print("Model: {}.".format(model_name)) if model_name in VALID_PHASE_MODELS: print("H: {}".format(self.filter_dependent_values[f].model_dependent_values.H)) print("H error: {}".format(self.filter_dependent_values[f].model_dependent_values.H_err)) print( "Phase parameter 1: {}".format( self.filter_dependent_values[f].model_dependent_values.phase_parameter_1 ) ) print( "Phase parameter 1 error: {}".format( self.filter_dependent_values[f].model_dependent_values.phase_parameter_1_err ) ) print( "Phase parameter 2: {}".format( self.filter_dependent_values[f].model_dependent_values.phase_parameter_2 ) ) print( "Phase parameter 2 error: {}".format( self.filter_dependent_values[f].model_dependent_values.phase_parameter_2_err ) ) elif model_name in VALID_AVG_MAG_MODELS: print( "Average magnitude {}".format( self.filter_dependent_values[f].model_dependent_values.avg_mag ) ) print( "Standard deviation of magnitudes {}".format( self.filter_dependent_values[f].model_dependent_values.std_mag ) ) else: logger.error( f"Invalid model name '{model_name}' provided. Model must be one of {VALID_PHASE_MODELS} or {VALID_AVG_MAG_MODELS}" ) raise ValueError( f"Invalid model name '{model_name}' provided. Model must be one of {VALID_PHASE_MODELS} or {VALID_AVG_MAG_MODELS}" ) print("\n")
[docs] def get_phase_parameters_in_filter(self, filter_name, model_name=None): """Convenience method to return the phase parameters in a specific filter and model. Parameters ----------- filter_name : str The filter of interest. model_name : str, optional The model name of the model of interest. If this is not supplied, the code will not return any model-dependent parameters. Default None. Returns ----------- output_obj : PhaseParameterOutput object Object containing phase curve parameters for the specified filter and model. """ try: filter_index = self.filter_list.index(filter_name) except ValueError: logger.error("ValueError: Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) raise ValueError("Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) output_obj = PhaseParameterOutput() output_obj.filter_name = filter_name output_obj.phaseAngle_min = self.filter_dependent_values[filter_index].phaseAngle_min output_obj.phaseAngle_range = self.filter_dependent_values[filter_index].phaseAngle_range output_obj.observationTime_max = self.filter_dependent_values[filter_index].observationTime_max output_obj.nobs = self.filter_dependent_values[filter_index].nobs output_obj.arc = self.filter_dependent_values[filter_index].arc output_obj.n_outliers = self.filter_dependent_values[filter_index].n_outliers output_obj.n_std_outliers = self.filter_dependent_values[filter_index].n_std_outliers output_obj.sustained_outliers = self.filter_dependent_values[filter_index].sustained_outliers if not model_name: logger.warning("No model name was specified. Returning non-model-dependent phase parameters.") print("No model name specified. Returning non-model-dependent phase parameters.") elif model_name == self.filter_dependent_values[filter_index].model_name: output_obj.model_name = model_name output_obj.H = self.filter_dependent_values[filter_index].model_dependent_values.H output_obj.H_err = self.filter_dependent_values[filter_index].model_dependent_values.H_err output_obj.phase_parameter_1 = self.filter_dependent_values[ filter_index ].model_dependent_values.phase_parameter_1 output_obj.phase_parameter_1_err = self.filter_dependent_values[ filter_index ].model_dependent_values.phase_parameter_1_err output_obj.phase_parameter_2 = self.filter_dependent_values[ filter_index ].model_dependent_values.phase_parameter_2 output_obj.phase_parameter_2_err = self.filter_dependent_values[ filter_index ].model_dependent_values.phase_parameter_2_err else: logger.error( f"Input model name {model_name} does not match model name {self.filter_dependent_values[filter_index].model_name} in AdlerData. Cannot get phase parameters for {model_name}." ) raise ValueError( f"Input model name {model_name} does not match model name {self.filter_dependent_values[filter_index].model_name} in AdlerData. Cannot get phase parameters for {model_name}." ) return output_obj
[docs] def get_avg_mag_parameters_in_filter(self, filter_name, model_name=None): """Convenience method to return the average magnitude parameters in a specific filter and model. Parameters ----------- filter_name : str The filter of interest. model_name : str, optional The model name of the model of interest. If this is not supplied, the code will not return any model-dependent parameters. Default None. Returns ----------- output_obj : AvgMagParameterOutput object Object containing average magnitude model parameters for the specified filter and model. """ try: filter_index = self.filter_list.index(filter_name) except ValueError: logger.error("ValueError: Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) raise ValueError("Filter {} does not exist in AdlerData.filter_list.".format(filter_name)) output_obj = AvgMagParameterOutput() output_obj.filter_name = filter_name output_obj.phaseAngle_min = self.filter_dependent_values[filter_index].phaseAngle_min output_obj.phaseAngle_range = self.filter_dependent_values[filter_index].phaseAngle_range output_obj.observationTime_max = self.filter_dependent_values[filter_index].observationTime_max output_obj.nobs = self.filter_dependent_values[filter_index].nobs output_obj.arc = self.filter_dependent_values[filter_index].arc output_obj.n_outliers = self.filter_dependent_values[filter_index].n_outliers output_obj.n_std_outliers = self.filter_dependent_values[filter_index].n_std_outliers output_obj.sustained_outliers = self.filter_dependent_values[filter_index].sustained_outliers if not model_name: logger.warning("No model name was specified. Returning non-model-dependent phase parameters.") print("No model name specified. Returning non-model-dependent phase parameters.") elif model_name == self.filter_dependent_values[filter_index].model_name: output_obj.model_name = model_name output_obj.avg_mag = self.filter_dependent_values[filter_index].model_dependent_values.avg_mag output_obj.std_mag = self.filter_dependent_values[filter_index].model_dependent_values.std_mag else: logger.error( f"Input model name {model_name} does not match model name {self.filter_dependent_values[filter_index].model_name} in AdlerData. Cannot get average magnitude parameters for {model_name}." ) raise ValueError( f"Input model name {model_name} does not match model name {self.filter_dependent_values[filter_index].model_name} in AdlerData. Cannot get average magnitude parameters for {model_name}." ) return output_obj
[docs] def _get_database_connection(self, filepath, table_name=None): """Returns the connection to the output SQL database, creating it and the given table if it does not exist. Parameters ----------- filepath : path-like object Filepath with the location of the output SQL database. table_name : str, optional Name of the table to create if it doesn't exist. (This replaces the create_new argument) Returns ---------- con : sqlite3 Connection object The connection to the output database. """ database_exists = os.path.isfile( filepath ) # check this FIRST as the next statement creates the db if it doesn't exist if not database_exists and table_name: # we need to make the table and a couple of starter columns con = sqlite3.connect(filepath) cur = con.cursor() cur.execute(f"CREATE TABLE {table_name}(ssObjectId PRIMARY KEY, modelId, updatedMJD REAL)") elif not database_exists and not table_name: logger.error("ValueError: Database cannot be found at given filepath.") raise ValueError("Database cannot be found at given filepath.") elif database_exists and not table_name: # If no table_name specified connect and return the connection con = sqlite3.connect(filepath) else: # If the database exists, connect to it con = sqlite3.connect(filepath) cur = con.cursor() # If the table doesn't exist (i.e. we're creating the table for writing, this will create it) cur.execute( f"CREATE TABLE IF NOT EXISTS {table_name}(ssObjectId PRIMARY KEY, modelId, updatedMJD REAL)" ) return con
[docs] def _get_row_data_and_columns(self, table_name): """Collects all of the data present in the AdlerData object as a list with a corresponding list of column names, in preparation for a row to be written to a SQL database in the given table_name. Returns ----------- table_name : str Name of the table that we want to collect data for in preparation for writing to the database. """ required_columns = ["ssObjectId", "modelId", "updatedMJD"] row_data = [self.ssObjectId, self.modelId, self.updatedMJD] for f, filter_name in enumerate(self.filter_list): if table_name == "AdlerData": columns_by_filter = [ "_".join([filter_name, filter_key]) for filter_key in FILTER_DEPENDENT_KEYS if "outliers" in filter_key ] data_by_filter = [ getattr(self.filter_dependent_values[f], filter_key) for filter_key in FILTER_DEPENDENT_KEYS if "outliers" in filter_key ] required_columns.extend(columns_by_filter) row_data.extend(data_by_filter) elif table_name == "FilterDependentAdler": columns_by_filter = [ "_".join([filter_name, filter_key]) for filter_key in FILTER_DEPENDENT_KEYS ] data_by_filter = [ getattr(self.filter_dependent_values[f], filter_key) for filter_key in FILTER_DEPENDENT_KEYS ] required_columns.extend(columns_by_filter) row_data.extend(data_by_filter) elif table_name == "PhaseModelDependentAdler": model_name = self.filter_dependent_values[f].model_name if model_name == "": logger.warning( f"No models calculated for filter {filter_name}, continuing to next filter" ) continue else: columns_by_model = [ "_".join([filter_name, model_name, model_key]) for model_key in PHASE_MODEL_DEPENDENT_KEYS ] data_by_model = [ getattr(self.filter_dependent_values[f].model_dependent_values, model_key) for model_key in PHASE_MODEL_DEPENDENT_KEYS ] required_columns.extend(columns_by_model) row_data.extend(data_by_model) elif table_name == "AvgMagModelDependentAdler": model_name = self.filter_dependent_values[f].model_name if model_name == "": logger.warning( f"No models calculated for filter {filter_name}, continuing to next filter" ) continue else: columns_by_model = [ "_".join([filter_name, model_name, model_key]) for model_key in AVG_MAG_MODEL_DEPENDENT_KEYS ] data_by_model = [ getattr(self.filter_dependent_values[f].model_dependent_values, model_key) for model_key in AVG_MAG_MODEL_DEPENDENT_KEYS ] required_columns.extend(columns_by_model) row_data.extend(data_by_model) else: logger.error( f"{table_name} must be one of [AdlerData, FilterDependentAlder, PhaseModelDependentAdler, AvgMagModelDependentAdler]" ) raise ValueError( ( f"{table_name} must be one of [AdlerData, FilterDependentAlder, PhaseModelDependentAdler, AvgMagModelDependentAdler]" ) ) return row_data, required_columns
[docs] def _get_database_columns(self, con, table_name): """Gets a list of the current columns in a given table in a SQL database. Parameters ----------- con : sqlite3 Connection object The connection to the output SQL database. table_name : str The name of the relevant table in the database. Returns ---------- list of str List of current columns existing in the table. """ cur = con.cursor() cur.execute(f"""SELECT * from {table_name} where 1=0""") return [d[0] for d in cur.description]
[docs] def _get_tables(self, con): """Gets a list of the current tables in a SQL database. Parameters ----------- con : sqlite3 Connection object The connection to the output SQL database. Returns ---------- list of str List of current tables existing in the database. """ cur = con.cursor() cur.execute(f"SELECT tbl_name FROM sqlite_schema WHERE type='table'") res = cur.fetchall() return [r[0] for r in res]
[docs] def _ensure_columns(self, con, table_name, current_columns, required_columns): """Creates new columns in a given table of a SQL database as needed by checking the list of current columns against a list of required columns. Parameters ----------- con : sqlite3 Connection object The connection to the output SQL database. table_name : str The name of the relevant table in the database. current_columns : list of str A list of the columns already existing in the database table. required_columns : list of str A list of the columns needed in the database table. """ cur = con.cursor() for column_name in required_columns: if column_name not in current_columns: cur.execute(f"""ALTER TABLE {table_name} ADD COLUMN {column_name}""")
[docs] def _write_table(self, filepath, table_name): """ Function for writing information to a given table in the given database. Connects to the database and creates table if necessary, gathers data from AdlerData, ensures required columns are present and writes to the database. Parameters ----------- filepath : path-like object Filepath with the location of the output SQL database. table_name : str Name of table to write to. Must be one of AdlerData, FilterDependentAdler, PhaseModelDependentAdler, AvgMagModelDependentAdler """ con = self._get_database_connection(filepath, table_name=table_name) row_data, required_columns = self._get_row_data_and_columns(table_name=table_name) current_columns = self._get_database_columns(con, table_name) self._ensure_columns(con, table_name, current_columns, required_columns) column_names = ",".join(required_columns) column_spaces = ",".join(["?"] * len(required_columns)) update_clause = ", ".join([f"{col} = excluded.{col}" for col in required_columns[1:]]) sql_command = f""" INSERT INTO {table_name} ({column_names}) VALUES ({column_spaces}) ON CONFLICT(ssObjectId) DO UPDATE SET {update_clause}; """ cur = con.cursor() cur.execute(sql_command, row_data) con.commit() con.close()
[docs] def _get_model_name(self): """Returns the model_name by parsing the modelId Returns ----------- Name of the model specified in self.modelId """ for model in VALID_MODELS: if self.modelId.startswith(model + "_"): return model logger.error(f"Unknown model in string: {self.modelId}") raise ValueError(f"Unknown model in string: {self.modelId}")
[docs] def write_to_database(self, filepath, write_model_data=False): """Writes all of the relevant data contained within the AdlerData object to a SQLite database. Parameters ----------- filepath : path-like object Filepath with the location of the output SQL database. write_model_data : Boolean, optional A flag to set whether to write out specific model data to AdlerData. Default: False. """ # Write default AdlerData information self._write_table(filepath=filepath, table_name="AdlerData") logger.info(f"Top-level information written to AdlerData table") if write_model_data: # Write FilterDependentAdler data self._write_table(filepath=filepath, table_name="FilterDependentAdler") logger.info(f"Filter-specific information written to FilterDependentAdler table") model_name = self._get_model_name() if model_name in VALID_PHASE_MODELS: # Write PhaseModelDependentAdler data self._write_table(filepath=filepath, table_name="PhaseModelDependentAdler") logger.info( f"Phase Model-specific information for model {model_name} written to PhaseModelDependentAdler table" ) elif model_name in VALID_AVG_MAG_MODELS: # Write AvgMagModelDependentAdler data self._write_table(filepath=filepath, table_name="AvgMagModelDependentAdler") logger.info( f"Average Magnitude model-specific information for model {model_name} written to AvgMagModelDependentAdler table" ) # Write AdlerSourceFlags data for f, filter_name in enumerate(self.filter_list): filter_source_flags = self.filter_dependent_values[f].source_flags if filter_source_flags: filter_source_flags.write_flags_to_database(filepath=filepath) logger.info( f"Source flags information written to AdlerSourceFlags for filter '{filter_name}'" ) else: logger.info(f"No source flags for filter '{filter_name}', continuing to next filter")
@dataclass
[docs] class PhaseModelDependentAdler: """Dataclass containing phase-model-dependent values generated by Adler. Note that NaN indicates a value that has not yet been populated. Attributes: ----------- filter_name : str The filter for which these values are calculated. model_name : str The phase model for which these values were calculated. Example: "HG", "HG1G2", "linear". H : float, optional The absolute magnitude. Default NaN. H_err : float, optional Error in absolute magnitude. Default NaN. phase_parameter_1 : float, optional The first parameter of the phase model. May be the only parameter. For example, G in the HG model. Default NaN. phase_parameter_1_err : float, optional The error on the first parameter of the phase model. Default NaN. phase_parameter_2 : float, optional The second parameter of the phase model. May not exist for this model. Default NaN. phase_parameter_2_err : float, optional The error on the second parameter of the phase model. Default NaN. """
[docs] filter_name: str
[docs] model_name: str
[docs] H: float = np.nan
[docs] H_err: float = np.nan
[docs] phase_parameter_1: float = np.nan
[docs] phase_parameter_1_err: float = np.nan
[docs] phase_parameter_2: float = np.nan
[docs] phase_parameter_2_err: float = np.nan
@dataclass
[docs] class AvgMagModelDependentAdler: """Dataclass containing model-dependent values for the simple average magnitude model pgenerated by Adler. Note that NaN indicates a value that has not yet been populated. Attributes: ----------- filter_name : str The filter for which these values are calculated. model_name : str The model for which these values were calculated. Example: "median", "mean". avg_mag : float, optional Average magnitude of the measurements used to calculate the model. Default NaN. std_mag : float, optional Standard deviation of the measurements used to calculate the model. Default NaN. """
[docs] filter_name: str
[docs] model_name: str
[docs] avg_mag: float = np.nan
[docs] std_mag: float = np.nan
[docs] class PhaseParameterOutput: """Empty convenience class so that the output of AdlerData.get_phase_parameters_in_filter is an object.""" pass
[docs] class AvgMagParameterOutput: """Empty convenience class so that the output of AdlerData.get_avg_mag_parameters_in_filter is an object.""" pass
@dataclass
[docs] class AdlerSourceFlags: """ Class for storing Adler-determined outlier information. Attributes: ----------- ssObjectId : str ssObjectId of the object of interest. filter_name : str Filter the observation was taken in. modelId : str modelId for the model that the outliers are compared to. n_outliers : int Number of observations identified as outliers. n_std_outliers : int, optional Number of outliers detected for the given model in sigma space. diaSourceId : array_like of ints or strs Unique identifier of the observation. midPointMjdTai : array_like of floats Observation timestamps. mag_diff : array_like of floats Differences in (reduced) magnitude between the observations and the model. std_diff : array_like of floats Deviation (in terms of the observations uncertainties) between the observations and the model. """
[docs] ssObjectId: str
[docs] filter_name: str
[docs] modelId: str
[docs] n_outliers: int
[docs] n_std_outliers: int
[docs] diaSourceId: np.ndarray = field(default_factory=lambda: np.zeros(0))
[docs] midPointMjdTai: np.ndarray = field(default_factory=lambda: np.zeros(0))
[docs] mag_diff: np.ndarray = field(default_factory=lambda: np.zeros(0))
[docs] std_diff: np.ndarray = field(default_factory=lambda: np.zeros(0))
@classmethod
[docs] def construct_source_flags_from_data_table(cls, ssObjectId, filter_name, modelId, df): """Method for constructing the AdlerSourceFlags object from a dataframe. Parameters ----------- ssObjectId : str ssObjectId of the object of interest. filter_name : str Filter the observation was taken in. modelId : str modelId for the model that the outliers are compared to. df : pandas.DataFrame DataFrame of the observations that are identified as outliers. Must contain columns diaSourceId, midPointMjdTai, mag_diff, std_diff Returns ----------- AdlerSourceFlags object Object containing the source flags information on outliers identified. """ obs_dict = {"ssObjectId": ssObjectId, "filter_name": filter_name, "modelId": modelId} obs_dict.update( df.loc[:, ["diaSourceId", "midPointMjdTai", "mag_diff", "std_diff"]].to_dict(orient="list") ) obs_dict.update({"n_outliers": len(df.loc[df.mag_diff != 0])}) obs_dict.update({"n_std_outliers": len(df.loc[df.std_diff != 0])}) return cls(**obs_dict)
[docs] def _get_database_connection(self, filepath, create_new=False): """Returns the connection to the output SQL database, creating it and the AdlerSource Flags table if it does not exist. Parameters ----------- filepath : path-like object Filepath with the location of the output SQL database. create_new : Boolean Whether to create the database if it doesn't already exist. Default is False. Returns ---------- con : sqlite3 Connection object The connection to the output database. """ database_exists = os.path.isfile( filepath ) # check this FIRST as the next statement creates the db if it doesn't exist if not database_exists and create_new: # we need to make the table and a couple of starter columns con = sqlite3.connect(filepath) cur = con.cursor() cur.execute( "CREATE TABLE AdlerSourceFlags(ssObjectId, filter_name, modelId, diaSourceId, midPointMjdTai, mag_diff, std_diff)" ) elif not database_exists and not create_new: logger.error("ValueError: Database cannot be found at given filepath.") raise ValueError("Database cannot be found at given filepath.") else: con = sqlite3.connect(filepath) cur = con.cursor() # Create the table if it doesn't exist (in case database was created previously without this table) cur.execute( "CREATE TABLE IF NOT EXISTS AdlerSourceFlags(ssObjectId, filter_name, modelId, diaSourceId, midPointMjdTai, mag_diff, std_diff)" ) return con
[docs] def write_flags_to_database(self, filepath, table_name="AdlerSourceFlags"): """ Writes the information from AdlerSourceFlags to the given database. Parameters ----------- filepath : path-like object Path to the output database. table_name : str, optional Name of the table to write the flags to. Default: AdlerSourceFlags. """ con = self._get_database_connection(filepath, create_new=True) required_columns = [ "ssObjectId", "filter_name", "modelId", "diaSourceId", "midPointMjdTai", "mag_diff", "std_diff", ] column_names = ",".join(required_columns) column_spaces = ",".join(["?"] * len(required_columns)) sql_command = f""" INSERT INTO {table_name} ({column_names}) VALUES ({column_spaces}); """ row_data = list( zip( [self.ssObjectId] * len(self.diaSourceId), [self.filter_name] * len(self.diaSourceId), [self.modelId] * len(self.diaSourceId), self.diaSourceId, self.midPointMjdTai, self.mag_diff, self.std_diff, ) ) cur = con.cursor() cur.executemany(sql_command, row_data) con.commit() con.close()
@dataclass
[docs] class FilterDependentAdler: """Dataclass containing filter-dependent values generated by Adler. Note that NaN indicates a value that has not yet been populated. Attributes: ----------- filter_name : str The filter for which these values are calculated. phaseAngle_min : float, optional Minimum phase angle of observations used in fitting model (degrees). phaseAngle_range : float, optional Max minus min phase angle range of observations used in fitting model (degrees). observationTime_max : float, optional Maximum time of observation used in fitting the model (in modified Julian day). arc: float, optional Observational arc used to fit model (days). nobs : int, optional Number of observations used in fitting model. n_outliers : int, optional Number of outliers detected for the given model. n_std_outliers : int, optional Number of outliers detected for the given model in sigma space. sustained_outliers : float, optional Magnitude difference between old and new observations. model_name : str, optional Name of the model computed. Must be one of "HG", "HG1G2", "HG12", "HG12_Pen16", "LinearPhaseFunc", "median", "mean". model_dependent_values : PhaseModelDependentAdler or AvgMagModelDependentAdler object, optional PhaseModelDependentAdler or AvgMagModelDependentAdler object storing phase-model or average-magnitude-model parameters for the given model. Default: None. source_flags : AdlerSourceFlags, optional AdlerSourceFlags object storing the information on specific observations identified as outliers compared to the given model. """
[docs] filter_name: str
[docs] phaseAngle_min: float = np.nan
[docs] phaseAngle_range: float = np.nan
[docs] observationTime_max: float = np.nan
[docs] arc: float = np.nan
[docs] nobs: int = 0
[docs] n_outliers: int = 0
[docs] n_std_outliers: int = 0
[docs] sustained_outliers: float = np.nan
[docs] model_name: str = ""
[docs] model_dependent_values: Optional[PhaseModelDependentAdler | AvgMagModelDependentAdler] = None
[docs] source_flags: Optional[AdlerSourceFlags] = None