from lsst.rsp import get_tap_service
import pandas as pd
import numpy as np
import logging
import json
import astropy.units as u
from astropy.table import Table
from adler.objectdata.Observations import Observations
from adler.objectdata.MPCORB import MPCORB
from adler.objectdata.SSObject import SSObject
from adler.objectdata.AdlerData import AdlerData
from adler.objectdata.objectdata_utilities import get_data_table, flux_to_magnitude, get_tap_service_api
[docs]
logger = logging.getLogger(__name__)
# Convenient dict for setting which columns to include in SQL query given schema and desired flux flag
# TODO better handling of None case (this is probably bad Python)
[docs]
SCHEMA_CONFIG_DICT = {
None: {None: dict(fluxmag_column="mag", fluxmag_err_column="magErr", ra_column="ra", dec_column="dec")},
"dp03_catalogs_10yr": {
None: dict(fluxmag_column="mag", fluxmag_err_column="magErr", ra_column="ra", dec_column="dec")
},
"dp1": {
"apFlux": dict(
fluxmag_column="apFlux", fluxmag_err_column="apFluxErr", ra_column="ra", dec_column="dec"
),
"trailFlux": dict(
fluxmag_column="trailFlux",
fluxmag_err_column="psfFluxErr",
ra_column="trailRa",
dec_column="trailDec",
),
"psfFlux": dict(
fluxmag_column="psfFlux",
fluxmag_err_column="psfFluxErr",
ra_column="ra",
dec_column="dec",
),
},
}
[docs]
RSP_TAP_CONFIG_DICT = {"dp03_catalogs_10yr": "ssotap", "dp1": "tap"}
[docs]
class AdlerPlanetoid:
"""AdlerPlanetoid class. Contains the Observations, MPCORB and SSObject dataclass objects."""
def __init__(
self,
ssObjectId,
filter_list,
date_range,
observations_by_filter,
mpcorb,
ssobject,
adler_data,
):
"""Initialises the AdlerPlanetoid object.
Attributes
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations.
observations_by_filter : list of Observations objects
A list of Observations objects holding joined DIASource/SSSource observations of the planetoid specified by ssObjectId. Each item in the list holds observations of a different filter, in the order specified by filter_list.
mpcorb : MPCORB object
An MPCORB object, holding the MPCORB database information of the planetoid specified by ssObjectId.
ssobject : SSObject object
An SSObject object, holding the SSObject database information of the planetoid specified by ssObjectId.
adler_data : AdlerData object
An empty AdlerData object ready to store Adler-calculated values.
"""
[docs]
self.ssObjectId = ssObjectId
[docs]
self.filter_list = filter_list
[docs]
self.date_range = date_range
[docs]
self.observations_by_filter = observations_by_filter
[docs]
self.SSObject = ssobject
[docs]
self.AdlerData = adler_data
@classmethod
[docs]
def construct_from_SQL(
cls,
ssObjectId,
sql_filename,
filter_list=["u", "g", "r", "i", "z", "y"],
date_range=[60000.0, 67300.0],
schema=None,
flux_flag=None,
):
"""Custom constructor which builds the AdlerPlanetoid object and the associated Observations, MPCORB and SSObject objects from
a local SQL database. Mostly used for testing.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
sql_filename : str
Filepath to the local SQL database.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations.
schema : str or None
Schema/database from which to select the data tables. Can be None. Default is currently "dp03_catalogs_10yr" for testing using DP0.3.
flux_flag : str or None
Name of the flux column to select from DP1 DiaSource table. Determines FluxErr and ra/dec columns to select also. Default is None (selects mag/magErr/ra/dec for DP0.3)
"""
if len(date_range) != 2:
logger.error("ValueError: date_range attribute must be of length 2.")
raise ValueError("date_range attribute must be of length 2.")
observations_by_filter = cls.populate_observations(
cls,
ssObjectId,
filter_list,
date_range,
sql_filename=sql_filename,
schema=schema,
flux_flag=flux_flag,
)
if len(observations_by_filter) == 0:
logger.error(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
raise Exception(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
if len(filter_list) > len(observations_by_filter):
logger.info(
"Not all specified filters have observations. Recalculating filter list based on past observations."
)
filter_list = [obs_object.filter_name for obs_object in observations_by_filter]
logger.info("New filter list is: {}".format(filter_list))
mpcorb = cls.populate_MPCORB(cls, ssObjectId, sql_filename=sql_filename, schema=schema)
ssobject = cls.populate_SSObject(
cls, ssObjectId, filter_list, sql_filename=sql_filename, schema=schema
)
adler_data = AdlerData(ssObjectId, filter_list)
return cls(
ssObjectId,
filter_list,
date_range,
observations_by_filter,
mpcorb,
ssobject,
adler_data,
)
@classmethod
[docs]
def construct_from_cassandra(
cls,
ssObjectId,
filter_list=["u", "g", "r", "i", "z", "y"],
date_range=[60000.0, 67300.0],
cassandra_hosts=["10.21.3.123"],
): # pragma: no cover
"""Custom constructor which builds the AdlerPlanetoid object and the associated Observations, MPCORB and SSObject objects from
a Cassandra database. Used only for Lasair integration.
TODO: move method to its own class which inherits from AdlerPlanetoid and move to adler-lasair repo?
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations.
cassandra_hosts : list of str
Location of the Cassandra database - usually an IP address. Default is ["10.21.3.123"].
"""
# do not move this import! CassandraFetcher requires the non-mandatory
# cassandra-driver library - if not installed, and this import is at the top,
# test collection will break.
from adler.lasair.cassandra_fetcher import CassandraFetcher
fetcher = CassandraFetcher(cassandra_hosts=cassandra_hosts)
MPCORB_dict = fetcher.fetch_MPCORB(ssObjectId)
SSObject_dict = fetcher.fetch_SSObject(ssObjectId, filter_list)
observations_dict = fetcher.fetch_observations(ssObjectId)
# note that Cassandra doesn't allow filters/joins
# instead we pull all observations for this ID, then filter with Pandas later
observations_table = pd.DataFrame(observations_dict)
observations_table.rename(columns={"decl": "dec"}, inplace=True)
observations_by_filter = []
for filter_name in filter_list:
obs_slice = observations_table[
(observations_table["band"] == filter_name)
& (observations_table["midpointmjdtai"].between(date_range[0], date_range[1]))
]
if len(obs_slice) == 0:
logger.warning(
"No observations found in {} filter for this object. Skipping this filter.".format(
filter_name
)
)
else:
observations = Observations.construct_from_data_table(ssObjectId, filter_name, obs_slice)
observations_by_filter.append(observations)
if len(observations_by_filter) == 0:
logger.error(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
raise Exception(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
if len(filter_list) > len(observations_by_filter):
logger.info(
"Not all specified filters have observations. Recalculating filter list based on past observations."
)
filter_list = [obs_object.filter_name for obs_object in observations_by_filter]
logger.info("New filter list is: {}".format(filter_list))
mpcorb = MPCORB.construct_from_dictionary(ssObjectId, MPCORB_dict)
ssobject = SSObject.construct_from_dictionary(ssObjectId, filter_list, SSObject_dict)
adler_data = AdlerData(ssObjectId, filter_list)
return cls(
ssObjectId,
filter_list,
date_range,
observations_by_filter,
mpcorb,
ssobject,
adler_data,
)
@classmethod
[docs]
def construct_from_RSP(
cls,
ssObjectId,
filter_list=["u", "g", "r", "i", "z", "y"],
date_range=[60000.0, 67300.0],
schema="dp03_catalogs_10yr",
api_token_path=None,
flux_flag=None,
): # pragma: no cover
"""Custom constructor which builds the AdlerPlanetoid object and the associated Observations, MPCORB and SSObject objects
from the RSP.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations.
schema : str or None
Schema/database from which to select the data tables. Can be None. Default is currently "dp03_catalogs_10yr" for testing using DP0.3.
api_token_path : str or None
Path to user RSP API token if running not on RSP. See https://rsp.lsst.io/guides/auth/creating-user-tokens.html and lsst-adler/notebooks/adler_demo/adler_demo_rsp_api.ipynb for guide on setting this up.
flux_flag : str or None
Name of the flux column to select from DP1 DiaSource table. Determines FluxErr and ra/dec columns to select also. Default is None (selects mag/magErr/ra/dec for DP0.3)
"""
if len(date_range) != 2:
raise ValueError("date_range argument must be of length 2.")
rsp_tap_path = RSP_TAP_CONFIG_DICT[schema] # TODO give better name
# Select correct TAP service depending on schema chosen
if api_token_path:
service = get_tap_service_api(rsp_tap_path, api_token_path=api_token_path)
else:
service = get_tap_service(rsp_tap_path)
logger.info("Getting past observations from DIASource/SSSource...")
observations_by_filter = cls.populate_observations(
cls, ssObjectId, filter_list, date_range, service=service, schema=schema, flux_flag=flux_flag
)
if len(observations_by_filter) == 0:
logger.error(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
raise Exception(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
if len(filter_list) > len(observations_by_filter):
logger.info(
"Not all specified filters have observations. Recalculating filter list based on past observations."
)
filter_list = [obs_object.filter_name for obs_object in observations_by_filter]
logger.info("New filter list is: {}".format(filter_list))
logger.info("Populating MPCORB metadata...")
mpcorb = cls.populate_MPCORB(cls, ssObjectId, service=service, schema=schema)
logger.info("Populating SSObject metadata...")
ssobject = cls.populate_SSObject(cls, ssObjectId, filter_list, service=service, schema=schema)
adler_data = AdlerData(ssObjectId, filter_list)
return cls(
ssObjectId,
filter_list,
date_range,
observations_by_filter,
mpcorb,
ssobject,
adler_data,
)
[docs]
def populate_observations(
self,
ssObjectId,
filter_list,
date_range,
service=None,
sql_filename=None,
schema="dp03_catalogs_10yr",
flux_flag=None,
):
"""Populates the observations_by_filter class attribute. Can populate from either the RSP for a SQL database:
this behaviour is controlled by the service and sql_filename parameters, one of which must be supplied.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations.
service : pyvo.dal.tap.TAPService object or None
TAPService object linked to the RSP. Default=None.
sql_filename : str or None
Filepath to a SQL database. Default=None.
schema : str or None
Schema/database from which to select the data tables. Can be None. Default is currently "dp03_catalogs_10yr" for testing using DP0.3.
flux_flag : str or None
Name of the flux column to select from DP1 DiaSource table. Determines FluxErr and ra/dec columns to select also. Default is None (selects mag/magErr/ra/dec for DP0.3)
"""
if sql_filename:
sql_schema = ""
else: # pragma: no cover
sql_schema = schema + "."
# if schema: # pragma: no cover
# sql_schema = schema + "."
# else:
# sql_schema = ""
try:
selected_config = SCHEMA_CONFIG_DICT[schema][flux_flag]
except KeyError:
if schema not in SCHEMA_CONFIG_DICT:
logger.error(f"Schema {schema} not recognised.")
raise Exception(f"Schema {schema} not recognised.")
else:
logger.error(f"Flux column {flux_flag} not recognised for schema {schema}.")
raise Exception(f"Flux column {flux_flag} not recognised for schema {schema}.")
fluxmag_column = selected_config["fluxmag_column"]
fluxmag_err_column = selected_config["fluxmag_err_column"]
ra_column = selected_config["ra_column"]
dec_column = selected_config["dec_column"]
observations_by_filter = []
for filter_name in filter_list:
observations_sql_query = f"""
SELECT
SSObject.ssObjectId, SSSource.diaSourceId, {fluxmag_column}, {fluxmag_err_column}, band, midPointMjdTai, {ra_column} AS ra, {dec_column} AS dec, phaseAngle,
topocentricDist, heliocentricDist, heliocentricX, heliocentricY, heliocentricZ,
topocentricX, topocentricY, topocentricZ,
eclipticLambda, eclipticBeta
FROM
{sql_schema}SSObject
JOIN {sql_schema}DiaSource ON {sql_schema}SSObject.ssObjectId = {sql_schema}DiaSource.ssObjectId
JOIN {sql_schema}SSSource ON {sql_schema}DiaSource.diaSourceId = {sql_schema}SSSource.diaSourceId
WHERE
SSObject.ssObjectId = {ssObjectId} AND band = '{filter_name}' AND midPointMjdTai BETWEEN {date_range[0]} AND {date_range[1]}
"""
# This function submits the query and gets the results (or pulls from the SQL database)
data_table = get_data_table(observations_sql_query, service=service, sql_filename=sql_filename)
if len(data_table) == 0:
logger.warning(
"No observations found in {} filter for this object. Skipping this filter.".format(
filter_name
)
)
else:
if schema in [None, "dp03_catalogs_10yr"]: # TODO probably better way to do this
observations_by_filter.append(
Observations.construct_from_data_table(ssObjectId, filter_name, data_table)
)
elif schema == "dp1":
# Convert to astropy table so we can operate on it and add mag,magErr columns
# TODO temporary fix, get_data_table returns two possible objects (DALResultsTable or Pandas dataframe) that need different handling to convert to astropy tables
if isinstance(data_table, pd.DataFrame):
data_table_astropy = Table.from_pandas(data_table)
data_table_astropy[fluxmag_column] = data_table_astropy[fluxmag_column] * u.nJy
data_table_astropy[fluxmag_err_column] = (
data_table_astropy[fluxmag_err_column] * u.nJy
)
else:
data_table_astropy = data_table.to_table()
# Compute magnitudes
mag, mag_err = flux_to_magnitude(
data_table_astropy[fluxmag_column], data_table_astropy[fluxmag_err_column]
)
# Insert the new columns at the same positions
data_table_astropy.add_column(
mag, name="mag", index=data_table_astropy.colnames.index(fluxmag_column)
)
data_table_astropy.add_column(
mag_err, name="magErr", index=data_table_astropy.colnames.index(fluxmag_err_column)
)
# Remove the old flux columns
data_table_astropy.remove_columns([fluxmag_column, fluxmag_err_column])
observations_by_filter.append(
Observations.construct_from_data_table(ssObjectId, filter_name, data_table_astropy)
)
else:
logger.error(f"Schema {schema} not recognised.")
raise Exception(f"Schema {schema} not recognised.")
return observations_by_filter
[docs]
def populate_MPCORB(self, ssObjectId, service=None, sql_filename=None, schema="dp03_catalogs_10yr"):
"""Populates the MPCORB object class attribute. Can populate from either the RSP for a SQL database:
this behaviour is controlled by the service and sql_filename parameters, one of which must be supplied.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
service : pyvo.dal.tap.TAPService object or None
TAPService object linked to the RSP. Default=None.
sql_filename : str or None
Filepath to a SQL database. Default=None.
schema : str or None
Schema/database from which to select the data tables. Can be None. Default is currently "dp03_catalogs_10yr" for testing using DP0.3.
"""
if sql_filename:
sql_schema = ""
else: # pragma: no cover
sql_schema = schema + "."
# if schema: # pragma: no cover
# sql_schema = schema + "."
# else:
# sql_schema = ""
if schema in [None, "dp03_catalogs_10yr"]:
# Query for DP0.3. Compatible with subsequent adler code
MPCORB_sql_query = f"""
SELECT
ssObjectId, mpcDesignation, fullDesignation, mpcNumber, mpcH, mpcG, epoch, tperi, peri, node, incl, e, n, q, uncertaintyParameter, flags
FROM
{sql_schema}MPCORB
WHERE
ssObjectId = {ssObjectId}
"""
elif schema == "dp1":
# Query for DP1. Selecting the columns that still exist in the DP1 table
# We select t_p (MJD of pericentric passage) as tperi for consistency with DP0.3
MPCORB_sql_query = f"""
SELECT
ssObjectId, mpcDesignation, mpcH, epoch, t_p AS tperi, peri, node, incl, e, q
FROM
{sql_schema}MPCORB
WHERE
ssObjectId = {ssObjectId}
"""
else:
logger.error(f"Schema {schema} not recognised.")
raise Exception(f"Schema {schema} not recognised.")
data_table = get_data_table(MPCORB_sql_query, service=service, sql_filename=sql_filename)
if len(data_table) == 0:
logger.error("No MPCORB data for this object could be found for this SSObjectId.")
raise Exception("No MPCORB data for this object could be found for this SSObjectId.")
if schema in [None, "dp03_catalogs_10yr"]:
return MPCORB.construct_from_data_table(ssObjectId, data_table)
elif schema == "dp1":
# TODO get_data_table (above) NaN fills if we, e.g., SELECT NULL AS mpcNumber, which may be fine and remove the need for this
# Convert to astropy Table and add in NaNs/0/empty strings for the columns that do not appear in DP1
if isinstance(data_table, pd.DataFrame):
data_table_astropy = Table.from_pandas(data_table)
else:
data_table_astropy = data_table.to_table()
data_table_astropy.add_columns(
cols=[
np.full(len(data_table_astropy), ""), # fullDesignation (str)
np.full(len(data_table_astropy), 0), # mpcNumber (int)
np.full(len(data_table_astropy), np.nan), # mpcG (float)
np.full(len(data_table_astropy), np.nan), # n (float)
np.full(len(data_table_astropy), ""), # uncertaintyParameter (str)
np.full(len(data_table_astropy), ""), # flags (str)
],
names=["fullDesignation", "mpcNumber", "mpcG", "n", "uncertaintyParameter", "flags"],
)
# Reorder columns to match DP0.3 expected order
data_table_astropy = data_table_astropy[
[
"ssObjectId",
"mpcDesignation",
"fullDesignation",
"mpcNumber",
"mpcH",
"mpcG",
"epoch",
"tperi",
"peri",
"node",
"incl",
"e",
"n",
"q",
"uncertaintyParameter",
"flags",
]
]
return MPCORB.construct_from_data_table(ssObjectId, data_table_astropy)
else:
logger.error(f"Schema {schema} not recognised.")
raise Exception(f"Schema {schema} not recognised.")
[docs]
def populate_SSObject(
self, ssObjectId, filter_list, service=None, sql_filename=None, schema="dp03_catalogs_10yr"
):
"""Populates the SSObject class attribute. Can populate from either the RSP for a SQL database:
this behaviour is controlled by the service and sql_filename parameters, one of which must be supplied.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
service : pyvo.dal.tap.TAPService object or None
TAPService object linked to the RSP. Default=None.
sql_filename : str or None
Filepath to a SQL database. Default=None.
schema : str or None
Schema/database from which to select the data tables. Can be None. Default is currently "dp03_catalogs_10yr" for testing using DP0.3.
"""
if sql_filename:
sql_schema = ""
else: # pragma: no cover
sql_schema = schema + "."
# if schema: # pragma: no cover
# sql_schema = schema + "."
# else:
# sql_schema = ""
filter_dependent_columns = ""
for filter_name in filter_list:
filter_string = "{}_H, {}_G12, {}_HErr, {}_G12Err, {}_Ndata, ".format(
filter_name, filter_name, filter_name, filter_name, filter_name
)
filter_dependent_columns += filter_string
if schema in [None, "dp03_catalogs_10yr"]:
# Query for DP0.3. Compatible with subsequent adler code
SSObject_sql_query = f"""
SELECT
discoverySubmissionDate, firstObservationDate, arc, numObs,
{filter_dependent_columns}
maxExtendedness, minExtendedness, medianExtendedness
FROM
{sql_schema}SSObject
WHERE
ssObjectId = {ssObjectId}
"""
elif schema == "dp1":
# Query for DP1. Selecting the columns that still exist in the DP1 table
SSObject_sql_query = f"""
SELECT
discoverySubmissionDate, numObs
FROM
{sql_schema}SSObject
WHERE
ssObjectId = {ssObjectId}
"""
else:
logger.error(f"Schema {schema} not recognised.")
raise Exception(f"Schema {schema} not recognised.")
data_table = get_data_table(SSObject_sql_query, service=service, sql_filename=sql_filename)
if len(data_table) == 0:
logger.error("No SSObject data for this object could be found for this SSObjectId.")
raise Exception("No SSObject data for this object could be found for this SSObjectId.")
if schema in [None, "dp03_catalogs_10yr"]:
return SSObject.construct_from_data_table(ssObjectId, filter_list, data_table)
elif schema == "dp1":
# Convert to Table
if isinstance(data_table, pd.DataFrame):
data_table_astropy = Table.from_pandas(data_table)
else:
data_table_astropy = data_table.to_table()
# Add non-filter-dependent columns and populate with NaNs
data_table_astropy.add_columns(
cols=[
np.full(len(data_table_astropy), np.nan), # firstObservationDate
np.full(len(data_table_astropy), np.nan), # arc
np.full(len(data_table_astropy), np.nan), # maxExtendedness
np.full(len(data_table_astropy), np.nan), # minExtendedness
np.full(len(data_table_astropy), np.nan), # medianExtendedness
],
names=[
"firstObservationDate",
"arc",
"maxExtendedness",
"minExtendedness",
"medianExtendedness",
],
)
# Add all filter-dependent columns and populate with NaNs
for filter_name in filter_list:
data_table_astropy.add_columns(
cols=[
np.full(len(data_table_astropy), np.nan), # f"{filter_name}_H"
np.full(len(data_table_astropy), np.nan), # f"{filter_name}_G12"
np.full(len(data_table_astropy), np.nan), # f"{filter_name}_HErr"
np.full(len(data_table_astropy), np.nan), # f"{filter_name}_G12Err"
np.full(len(data_table_astropy), 0), # Ndata
],
names=[
f"{filter_name}_H",
f"{filter_name}_G12",
f"{filter_name}_HErr",
f"{filter_name}_G12Err",
f"{filter_name}_Ndata",
],
)
# Reorder columns to match DP0.3 expected order
dp03_cols_order = ["discoverySubmissionDate", "firstObservationDate", "arc", "numObs"]
for filter_name in filter_list:
dp03_cols_order += [
f"{filter_name}_H",
f"{filter_name}_G12",
f"{filter_name}_HErr",
f"{filter_name}_G12Err",
f"{filter_name}_Ndata",
]
dp03_cols_order += ["maxExtendedness", "minExtendedness", "medianExtendedness"]
data_table_astropy = data_table_astropy[dp03_cols_order]
return SSObject.construct_from_data_table(ssObjectId, filter_list, data_table_astropy)
else:
logger.error(f"Schema {schema} not recognised.")
raise Exception(f"Schema {schema} not recognised.")
@classmethod
[docs]
def construct_from_mpc_obs_sbn(
cls,
ssObjectId,
sql_filename,
filter_list=["u", "g", "r", "i", "z", "y"],
date_range=[60000.0, 67300.0],
):
"""Custom constructor which builds the AdlerPlanetoid object and the associated Observations, MPCORB and SSObject objects
from the MPC obs_sbn database. This is designed specifically for the SSSC Prompt Products Database Bandaid.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
sql_filename : str
Filepath to the local SQL database.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations (in MJD).
"""
if len(date_range) != 2:
raise ValueError("date_range argument must be of length 2.")
observations_by_filter = cls.populate_observations_from_mpc_obs_sbn(
cls, ssObjectId, filter_list, date_range, sql_filename=sql_filename
)
if len(observations_by_filter) == 0:
logger.error(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
raise Exception(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
if len(filter_list) > len(observations_by_filter):
logger.info(
"Not all specified filters have observations. Recalculating filter list based on past observations."
)
filter_list = [obs_object.filter_name for obs_object in observations_by_filter]
logger.info("New filter list is: {}".format(filter_list))
mpcorb = cls.populate_MPCORB_from_mpc_obs_sbn(cls, ssObjectId, sql_filename=sql_filename)
ssobject = cls.populate_SSObject_from_mpc_obs_sbn(
cls, ssObjectId, filter_list, sql_filename=sql_filename
)
adler_data = AdlerData(ssObjectId, filter_list)
return cls(
ssObjectId,
filter_list,
date_range,
observations_by_filter,
mpcorb,
ssobject,
adler_data,
)
[docs]
def populate_observations_from_mpc_obs_sbn(self, ssObjectId, filter_list, date_range, sql_filename):
"""Populates the observations_by_filter class attribute. This version is specific to the construct_from_mpc_obs_sbn function.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
date_range : list of float
The minimum and maximum dates of the desired observations.
sql_filename : str
Filepath to an SQL database.
"""
logger.warning(
f"Constructing from the MPC obs_sbn table populates the following LSST schema columns as their best case obs_sbn analogs (LSST column name = obs_sbn column name):"
)
logger.warning(f"SSObjectId = provid; diaSourceId = obsid; magErr = rmsmag")
logger.warning(f"mjd_utc is converted to mjd_tai and presented as midPointMjdTai")
logger.warning(
f"phaseAngle, topocentricDist and heliocentricDist are not currently corrected for light travel time effects"
)
logger.warning(
f"heliocentricX, heliocentricY, heliocentricZ, topocentricX, topocentricY, topocentricZ, eclipticLambda, eclipticBeta are unpopulated and selected as NULLs."
)
observations_by_filter = []
for filter_name in filter_list:
observations_sql_query = f"""
SELECT
provid AS SSObjectId, obsid as diaSourceId, mag, rmsmag AS magErr, band, mjd_tai AS midPointMjdTai, ra, dec,
phaseAngle, topocentricDist, heliocentricDist,
NULL AS heliocentricX, NULL AS heliocentricY, NULL AS heliocentricZ,
NULL AS topocentricX, NULL AS topocentricY, NULL AS topocentricZ,
NULL AS eclipticLambda, NULL AS eclipticBeta
FROM
obs_sbn
WHERE
provid='{ssObjectId}' AND band = '{filter_name}' AND mjd_tai BETWEEN '{date_range[0]}' AND '{date_range[1]}'
"""
# This function submits the query and gets the results from the SQL database supplied
# Explicitly setting service=None here for clarity as this version does not query from non-local databases
data_table = get_data_table(observations_sql_query, service=None, sql_filename=sql_filename)
if len(data_table) == 0:
logger.warning(
"No observations found in {} filter for this object. Skipping this filter.".format(
filter_name
)
)
else:
# DP1 discoveries have no magErr values so fill with NaNs
# for some reason (TODO: check why) this means that this column has dtype object so we force it to be float64 here
data_table["magErr"] = data_table.magErr.astype(np.float64)
observations_by_filter.append(
Observations.construct_from_data_table(ssObjectId, filter_name, data_table)
)
return observations_by_filter
[docs]
def populate_MPCORB_from_mpc_obs_sbn(self, ssObjectId, sql_filename):
"""Populates the MPCORB object class attribute. This version is specific to the construct_from_mpc_obs_sbn function.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
sql_filename : str or None
Filepath to an SQL database.
"""
logger.warning(
f"Constructing from the MPC obs_sbn table populates the following LSST schema columns as their best case obs_sbn analogs (LSST column name = obs_sbn column name):"
)
logger.warning(f"ssObjectId = fullDesignation; fullDesignation = fullDesignation; tperi = t_p")
logger.warning(
f"mpcDesignation, mpcNumber, mpcG, n, uncertaintyParameter, flags are unpopulated and selected as NULL/0."
)
mpc_orbits_sql_query = f"""
SELECT
fullDesignation AS ssObjectId, NULL AS mpcDesignation, fullDesignation AS fullDesignation, 0 AS mpcNumber,
mpcH, NULL AS mpcG, epoch, t_p AS tperi, peri, node, incl, e, NULL AS n, q, NULL AS uncertaintyParameter, NULL AS flags
FROM
mpc_orbits
WHERE
fullDesignation = '{ssObjectId}'
"""
# Explicitly setting service=None here for clarity as this version does not query from non-local databases
data_table = get_data_table(mpc_orbits_sql_query, service=None, sql_filename=sql_filename)
if len(data_table) == 0:
logger.error("No mpc_orbits data for this object could be found for this SSObjectId.")
raise Exception("No mpc_orbits data for this object could be found for this SSObjectId.")
return MPCORB.construct_from_data_table(ssObjectId, data_table)
[docs]
def populate_SSObject_from_mpc_obs_sbn(self, ssObjectId, filter_list, sql_filename):
"""Populates the SSObject class attribute. This version is specific to the construct_from_mpc_obs_sbn function.
Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.
filter_list : list of str
A comma-separated list of the filters of interest.
sql_filename : str or None
Filepath to an SQL database.
"""
filter_dependent_columns = ""
for filter_name in filter_list:
# Counting number of observations in given filter in the query here
filter_string = "NULL AS {}_H, NULL AS {}_G12, NULL AS {}_HErr, NULL AS {}_G12Err, (SELECT COUNT(*) FROM obs_sbn WHERE band='{}' and provid='{}') AS {}_Ndata, ".format(
filter_name, filter_name, filter_name, filter_name, filter_name, ssObjectId, filter_name
)
filter_dependent_columns += filter_string
logger.warning(
f"Constructing from the MPC obs_sbn table populates the following LSST schema columns as their best case obs_sbn analogs (LSST column name = obs_sbn column name):"
)
logger.warning(f"All columns other than numObs/'band'_Ndata are selected as NULL/0.")
SSObject_sql_query = f"""
SELECT
NULL AS discoverySubmissionDate, NULL AS firstObservationDate, NULL AS arc, count(*) AS numObs,
{filter_dependent_columns}
NULL AS maxExtendedness, NULL AS minExtendedness, NULL AS medianExtendedness
FROM
obs_sbn
WHERE
provid = '{ssObjectId}'
"""
# Explicitly setting service=None here for clarity as this version does not query from non-local databases
data_table = get_data_table(SSObject_sql_query, service=None, sql_filename=sql_filename)
# TODO probably add some warnings for these as there isn't actually any SSObject data for these things in MPC file
if len(data_table) == 0 or data_table["numObs"].values == 0:
logger.error("No SSObject data for this object could be found for this SSObjectId.")
raise Exception("No SSObject data for this object could be found for this SSObjectId.")
return SSObject.construct_from_data_table(ssObjectId, filter_list, data_table)
[docs]
def observations_in_filter(self, filter_name):
"""User-friendly helper function. Returns the Observations object for a given filter.
Parameters
-----------
filter_name : str
The desired filter.
Returns
-----------
Observations object
The Observations object in self.observations_by_filter corresponding to the desired filter.
"""
try:
filter_index = self.filter_list.index(filter_name)
except ValueError:
logger.error("ValueError: Filter {} is not in AdlerPlanetoid.filter_list.".format(filter_name))
raise ValueError("Filter {} is not in AdlerPlanetoid.filter_list.".format(filter_name))
return self.observations_by_filter[filter_index]
[docs]
def observations_within_time(self, start=None, stop=None):
"""Get a dataframe of all observations (across all filters) taken within a given time interval.
Parameters
----------
start, stop : float
The time limits as modified Julian dates. Optional, if not declared then get all data
Returns
-------
observations : pandas.DataFrame
"""
result = pd.DataFrame()
for obs in self.observations_by_filter:
df = pd.DataFrame(obs.__dict__ | {"filter_name": [obs.filter_name] * obs.num_obs})
result = pd.concat([result, df]).reset_index(drop=True)
if start is None:
start = np.amin(result["midPointMjdTai"])
if stop is None:
stop = np.amax(result["midPointMjdTai"])
i = (result.midPointMjdTai >= start) * (result.midPointMjdTai <= stop)
result = result[i]
result = result.sort_values("midPointMjdTai")
return result
[docs]
def SSObject_in_filter(self, filter_name):
"""User-friendly helper function. Returns the filter-dependent values from SSObject for a given filter.
Parameters
-----------
filter_name : str
The desired filter.
Returns
-----------
ssobject_in_filter : SSObject
"""
try:
filter_index = self.filter_list.index(filter_name)
except ValueError:
logger.error("ValueError: Filter {} is not in AdlerPlanetoid.filter_list.".format(filter_name))
raise ValueError("Filter {} is not in AdlerPlanetoid.filter_list.".format(filter_name))
return self.SSObject.filter_dependent_values[filter_index]
[docs]
def attach_previous_adler_data(self, filepath, modelId=None):
"""Attaches and returns an AdlerData object containing the most recent AdlerData
for this ssObjectId.
Parameters
-----------
filepath : path-like object
Filepath with the location of the output SQL database.
modelId : str, optional
modelId for the model of interest that should be recovered. Default: None.
"""
self.PreviousAdlerData = AdlerData(self.ssObjectId, self.filter_list)
self.PreviousAdlerData.populate_from_database(filepath, modelId=modelId)
return self.PreviousAdlerData