From 89657a8a5d9e61408631cae692e4dfac57297693 Mon Sep 17 00:00:00 2001 From: ntellis Date: Thu, 15 Dec 2022 02:08:52 -0800 Subject: [PATCH 01/12] rough running ver of brute force using precovery DB - seems to be giving garbage results though. needs cleanup --- .gitignore | 2 + precovery/brute_force.py | 347 ++++++++++++++++++++++++++++++++++++++ precovery/frame_db.py | 84 ++++++--- precovery/healpix_geom.py | 7 +- precovery/precovery_db.py | 227 +++++++++++++++++++------ precovery/residuals.py | 191 +++++++++++++++++++++ precovery/utils.py | 136 +++++++++++++++ 7 files changed, 923 insertions(+), 71 deletions(-) create mode 100644 precovery/brute_force.py create mode 100644 precovery/residuals.py create mode 100644 precovery/utils.py diff --git a/.gitignore b/.gitignore index 0d1b939..3ab222c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ .eggs thorcontrol/version.py virtualenv3.10 +python3.9 data/ +precovery/__pycache__/* diff --git a/precovery/brute_force.py b/precovery/brute_force.py new file mode 100644 index 0000000..79fcca4 --- /dev/null +++ b/precovery/brute_force.py @@ -0,0 +1,347 @@ +import os + +from sqlalchemy import all_ + +from precovery.precovery_db import PrecoveryDatabase + +os.environ["OMP_NUM_THREADS"] = "1" +os.environ["OPENBLAS_NUM_THREADS"] = "1" +os.environ["MKL_NUM_THREADS"] = "1" +os.environ["VECLIB_MAXIMUM_THREADS"] = "1" +os.environ["NUMEXPR_NUM_THREADS"] = "1" + +import logging +import multiprocessing as mp +import time +from functools import partial + +import numpy as np +import pandas as pd +from astropy.time import Time +from sklearn.neighbors import BallTree + +# replace this usage with Orbit.compute_ephemeris +from .orbit import Orbit +from .residuals import calcResiduals +from .healpix_geom import radec_to_healpixel, radec_to_healpixel_with_neighbors +from .utils import _checkParallel, _initWorker, calcChunkSize, yieldChunks + +from typing import ( + Iterable, + Optional, + Tuple, + List +) +logger = logging.getLogger(__name__) + +__all__ = ["get_observations_and_ephemerides_for_orbits", "attribution_worker", "attributeObservations"] + + +def get_observations_and_ephemerides_for_orbits( + orbits: Iterable[Orbit], + mjd_start: float, + mjd_end: float, + precovery_db: PrecoveryDatabase, + obscode: str = "W84" +): + """ + Compute ephemeris for the orbit, propagated to an epoch, and observed from + a location represented by obscode. + + obscode should be a Minor Planet Center observatory code. 
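+
+    A sketch of the intended call, assuming an already-opened
+    PrecoveryDatabase (the epoch values here are illustrative):
+
+        ephemeris, observations = get_observations_and_ephemerides_for_orbits(
+            orbits, mjd_start=57000.0, mjd_end=57030.0, precovery_db=db
+        )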
+ """ + + # Find all the mjd we need to propagate to + all_frame_mjd = precovery_db.frames.idx.unique_frame_times() + frame_mjd_within_range = [x for x in all_frame_mjd if (x > mjd_start and x < mjd_end)] + + ephemeris_dfs = [] + frame_dfs = [] + # this should pass through obscode...rather should use the relevant frames' obs codes + for orbit in orbits: + eph = orbit.compute_ephemeris(obscode=obscode, epochs=frame_mjd_within_range) + mjd = [w.mjd for w in eph] + ra = [w.ra for w in eph] + dec = [w.dec for w in eph] + ephemeris_df = pd.DataFrame.from_dict({ + 'mjd_utc': mjd, + 'RA_deg': ra, + 'Dec_deg': dec, + }) + ephemeris_df["orbit_id"] = orbit.orbit_id + ephemeris_df["observatory_code"] = obscode + ephemeris_dfs.append(ephemeris_df) + + # Now we gathetr the healpixels and neighboring pixels for each propagated position + healpix = radec_to_healpixel_with_neighbors(ra, dec, precovery_db.frames.healpix_nside) + + frame_df = pd.concat([pd.DataFrame.from_dict({ + "mjd_utc": [x[0] for y in range(9)], + "obscode": [x[1] for y in range(9)], + "healpix": list(x[2]) + }) for x in zip(mjd, ephemeris_df["observatory_code"], list(healpix.transpose()))], + ignore_index=True) + frame_dfs.append(frame_df) + + # This will be passed back to be used as the ephemeris dataframe later + ephemeris = pd.concat(ephemeris_dfs, ignore_index=True) + unique_frames = pd.concat(frame_dfs, ignore_index=True).drop_duplicates() + + # This filter is very inefficient - there is surely a better way to do this + filtered_frames = [] + all_frames = precovery_db.frames.idx.frames_by_date(mjd_start, mjd_end) + for frame in all_frames: + if ((unique_frames['mjd_utc'] == frame.mjd) & + (unique_frames['obscode'] == frame.obscode) & + (unique_frames['healpix'] == frame.healpixel)).any(): + filtered_frames.append(frame) + + + observations = precovery_db.extract_observations_by_frames(filtered_frames) + + return ephemeris, observations + + + + +def attribution_worker( + ephemeris, + observations, + eps=1 / 3600, + include_probabilistic=True, +): + + # Create observer's dictionary from observations + observers = {} + for observatory_code in observations["observatory_code"].unique(): + observers[observatory_code] = Time( + observations[observations["observatory_code"].isin([observatory_code])][ + "mjd_utc" + ].unique(), + scale="utc", + format="mjd", + ) + + + # Group the predicted ephemerides and observations by visit / exposure + observations_grouped = observations.groupby(by=["observatory_code", "mjd_utc"]) + observations_visits = [ + observations_grouped.get_group(g) for g in observations_grouped.groups + ] + + # We pre-computed the ephemerides. Now we filter the ephemeris for only visits + # that have observations in the obs group passed to the worker + + ephemeris_pre_grouped = ephemeris.groupby(by=["observatory_code", "mjd_utc"]) + obs_group_keys = list(observations_grouped.groups.keys()) + indices_to_drop = pd.Int64Index([]) + for g_key in list(ephemeris_pre_grouped.groups.keys()): + if g_key not in obs_group_keys: + indices_to_drop = indices_to_drop.union(ephemeris_pre_grouped.get_group(g_key).index) + + ephemeris_filtered = ephemeris.drop(indices_to_drop) + + # Group the now-filtered ephemerides. 
There should only be visits for the observation set + ephemeris_grouped = ephemeris_filtered.groupby(by=["observatory_code", "mjd_utc"]) + ephemeris_visits = [ + ephemeris_grouped.get_group(g) for g in ephemeris_grouped.groups + ] + + # Loop through each unique exposure and visit, find the nearest observations within + # eps (haversine metric) + distances = [] + orbit_ids_associated = [] + obs_ids_associated = [] + obs_times_associated = [] + eps_rad = np.radians(eps) + residuals = [] + stats = [] + for ephemeris_visit, observations_visit in zip( + ephemeris_visits, observations_visits + ): + + assert len(ephemeris_visit["mjd_utc"].unique()) == 1 + assert len(observations_visit["mjd_utc"].unique()) == 1 + assert ( + observations_visit["mjd_utc"].unique()[0] + == ephemeris_visit["mjd_utc"].unique()[0] + ) + + obs_ids = observations_visit[["obs_id"]].values + obs_times = observations_visit[["mjd_utc"]].values + orbit_ids = ephemeris_visit[["orbit_id"]].values + coords = observations_visit[["RA_deg", "Dec_deg"]].values + coords_predicted = ephemeris_visit[["RA_deg", "Dec_deg"]].values + coords_sigma = observations_visit[["RA_sigma_deg", "Dec_sigma_deg"]].values + + # Haversine metric requires latitude first then longitude... + coords_latlon = np.radians(observations_visit[["Dec_deg", "RA_deg"]].values) + coords_predicted_latlon = np.radians( + ephemeris_visit[["Dec_deg", "RA_deg"]].values + ) + + num_obs = len(coords_predicted) + k = np.minimum(3, num_obs) + + # Build BallTree with a haversine metric on predicted ephemeris + tree = BallTree(coords_predicted_latlon, metric="haversine") + # Query tree using observed RA, Dec + d, i = tree.query( + coords_latlon, + k=k, + return_distance=True, + dualtree=True, + breadth_first=False, + sort_results=False, + ) + + # Select all observations with distance smaller or equal + # to the maximum given distance + mask = np.where(d <= eps_rad) + + if len(d[mask]) > 0: + orbit_ids_associated.append(orbit_ids[i[mask]]) + obs_ids_associated.append(obs_ids[mask[0]]) + obs_times_associated.append(obs_times[mask[0]]) + distances.append(d[mask].reshape(-1, 1)) + + residuals_visit, stats_visit = calcResiduals( + coords[mask[0]], + coords_predicted[i[mask]], + sigmas_actual=coords_sigma[mask[0]], + include_probabilistic=True, + ) + residuals.append(residuals_visit) + stats.append(np.vstack(stats_visit).T) + + if len(distances) > 0: + distances = np.degrees(np.vstack(distances)) + orbit_ids_associated = np.vstack(orbit_ids_associated) + obs_ids_associated = np.vstack(obs_ids_associated) + obs_times_associated = np.vstack(obs_times_associated) + residuals = np.vstack(residuals) + stats = np.vstack(stats) + + attributions = { + "orbit_id": orbit_ids_associated[:, 0], + "obs_id": obs_ids_associated[:, 0], + "mjd_utc": obs_times_associated[:, 0], + "distance": distances[:, 0], + "residual_ra_arcsec": residuals[:, 0] * 3600, + "residual_dec_arcsec": residuals[:, 1] * 3600, + "chi2": stats[:, 0], + } + if include_probabilistic: + attributions["probability"] = stats[:, 1] + attributions["mahalanobis_distance"] = stats[:, 2] + + attributions = pd.DataFrame(attributions) + + else: + columns = [ + "orbit_id", + "obs_id", + "mjd_utc", + "distance", + "residual_ra_arcsec", + "residual_dec_arcsec", + "chi2", + ] + if include_probabilistic: + columns += ["probability", "mahalanobis_distance"] + + attributions = pd.DataFrame(columns=columns) + + return attributions + + +def attributeObservations( + orbits, + mjd_start: float, + mjd_end: float, + precovery_db: PrecoveryDatabase, + 
eps=5 / 3600, + include_probabilistic=True, + backend="PYOORB", + backend_kwargs={}, + orbits_chunk_size=10, + observations_chunk_size=100000, + num_jobs=1, + parallel_backend="mp", +): + logger.info("Running observation attribution...") + time_start = time.time() + + num_orbits = len(orbits) + + attribution_dfs = [] + + # prepare ephemeris and observation dictionaries + ephemeris, observations = get_observations_and_ephemerides_for_orbits(orbits, mjd_start, mjd_end, precovery_db) + + parallel, num_workers = _checkParallel(num_jobs, parallel_backend) + if num_workers > 1: + + p = mp.Pool( + processes=num_workers, + initializer=_initWorker, + ) + + # Send up to orbits_chunk_size orbits to each OD worker for processing + chunk_size_ = calcChunkSize( + num_orbits, num_workers, orbits_chunk_size, min_chunk_size=1 + ) + + orbits_split = [orbits[i:i + chunk_size_] for i in range(0, len(orbits), chunk_size_)] + + eph_split = [] + for orbit_c in orbits.split(orbits_chunk_size): + eph_split.append(ephemeris[ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c])]) + for observations_c in yieldChunks(observations, observations_chunk_size): + + obs = [observations_c for i in range(len(orbits_split))] + attribution_dfs_i = p.starmap( + partial( + attribution_worker, + eps=eps, + include_probabilistic=include_probabilistic, + backend=backend, + backend_kwargs=backend_kwargs, + ), + zip( + eph_split, + obs, + ), + ) + attribution_dfs += attribution_dfs_i + + p.close() + + else: + for observations_c in yieldChunks(observations, observations_chunk_size): + for orbit_c in [orbits[i:i + orbits_chunk_size] for i in range(0, len(orbits), orbits_chunk_size)]: + + eph_c = ephemeris[ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c])] + attribution_df_i = attribution_worker( + eph_c, + observations_c, + eps=eps, + include_probabilistic=include_probabilistic, + ) + attribution_dfs.append(attribution_df_i) + + attributions = pd.concat(attribution_dfs) + attributions.sort_values( + by=["orbit_id", "mjd_utc", "distance"], inplace=True, ignore_index=True + ) + + time_end = time.time() + logger.info( + "Attributed {} observations to {} orbits.".format( + attributions["obs_id"].nunique(), attributions["orbit_id"].nunique() + ) + ) + logger.info( + "Attribution completed in {:.3f} seconds.".format(time_end - time_start) + ) + return attributions diff --git a/precovery/frame_db.py b/precovery/frame_db.py index 4fbb21a..b8ace00 100644 --- a/precovery/frame_db.py +++ b/precovery/frame_db.py @@ -4,22 +4,11 @@ import logging import os import struct -from typing import ( - Iterable, - Iterator, - Optional, - Set, - Tuple -) +from typing import Iterable, Iterator, Optional, Set, Tuple import numpy as np import sqlalchemy as sq -from rich.progress import ( - BarColumn, - Progress, - TimeElapsedColumn, - TimeRemainingColumn -) +from rich.progress import BarColumn, Progress, TimeElapsedColumn, TimeRemainingColumn from sqlalchemy.sql import func as sqlfunc from . 
import sourcecatalog @@ -43,6 +32,7 @@ class HealpixFrame: data_offset: int data_length: int + @dataclasses.dataclass class Dataset: id: str @@ -51,6 +41,7 @@ class Dataset: documentation_url: str sia_url: str + @dataclasses.dataclass class FrameBundleDescription: obscode: str @@ -113,6 +104,7 @@ def from_srcobs(cls, so: sourcecatalog.SourceObservation): id=so.id, ) + class FrameIndex: def __init__(self, db_engine): self.db = db_engine @@ -123,12 +115,10 @@ def __init__(self, db_engine): def open(cls, db_uri, mode: str = "r"): if (mode != "r") and (mode != "w"): - err = ( - "mode should be one of {'r', 'w'}" - ) + err = "mode should be one of {'r', 'w'}" raise ValueError(err) - if db_uri.startswith('sqlite:///') and (mode == "r"): + if db_uri.startswith("sqlite:///") and (mode == "r"): db_uri += "?mode=ro" engine = sq.create_engine(db_uri) @@ -281,6 +271,20 @@ def n_unique_frames(self) -> int: row = self.dbconn.execute(stmt).fetchone() return row[0] + def unique_frame_times(self) -> Iterable[float]: + """ + return unique values of mjd among all frames. + + This is needed for brute force propagation to each time, + prior to extraction of observations + """ + select_stmt = ( + sq.select(self.frames.c.mjd) + .distinct() + ) + mjds = self.dbconn.execute(select_stmt).fetchall() + return [x[0] for x in mjds] + def frames_for_bundle( self, bundle: FrameBundleDescription ) -> Iterator[HealpixFrame]: @@ -393,6 +397,35 @@ def all_frames(self) -> Iterator[HealpixFrame]: for row in result: yield HealpixFrame(*row) + def frames_by_date( + self, mjd_start: float, mjd_end: float + ) -> Iterator[HealpixFrame]: + """ + Returns all frames in the index within a date range, sorted by obscode, mjd, and healpixel. + """ + select_stmt = ( + sq.select( + self.frames.c.id, + self.frames.c.dataset_id, + self.frames.c.obscode, + self.frames.c.exposure_id, + self.frames.c.filter, + self.frames.c.mjd, + self.frames.c.healpixel, + self.frames.c.data_uri, + self.frames.c.data_offset, + self.frames.c.data_length, + ) + .where( + self.frames.c.mjd >= mjd_start, + self.frames.c.mjd <= mjd_end, + ) + .order_by(self.frames.c.obscode, self.frames.c.mjd, self.frames.c.healpixel) + ) + result = self.dbconn.execute(select_stmt) + for row in result: + yield HealpixFrame(*row) + def add_frame(self, frame: HealpixFrame): insert = self.frames.insert().values( dataset_id=frame.dataset_id, @@ -413,7 +446,7 @@ def add_dataset(self, dataset: Dataset): name=dataset.name, reference_doi=dataset.reference_doi, documentation_url=dataset.documentation_url, - sia_url=dataset.sia_url + sia_url=dataset.sia_url, ) self.dbconn.execute(insert) @@ -462,6 +495,7 @@ def initialize_tables(self): ) self._metadata.create_all(self.db) + class FrameDB: def __init__( self, @@ -494,7 +528,7 @@ def load_hdf5( name: Optional[str] = None, reference_doi: Optional[str] = None, documentation_url: Optional[str] = None, - sia_url: Optional[str] = None + sia_url: Optional[str] = None, ): """ Load data from a HDF5 catalog file. @@ -512,7 +546,7 @@ def load_hdf5( Maximum number of frames to load from the file. None means no limit. key : str Name of key where the observations table is located in the hdf5 file. - chunksize: + chunksize: Load observations in chunks of this size and then iterate over the chunks to load observations. name : str, optional @@ -525,18 +559,22 @@ def load_hdf5( Simple Image Access URL for accessing images for this particular dataset. 
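
        Examples
        --------
        A minimal sketch; the file name and dataset metadata here are
        illustrative, not an existing dataset:

        >>> frame_db.load_hdf5(
        ...     "nsc_dr2_observations.h5",
        ...     dataset_id="nsc_dr2",
        ...     name="NOIRLab Source Catalog DR2",
        ... )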
""" if dataset_id not in self.idx.get_dataset_ids(): - logger.info(f"Adding new entry into datasets table for dataset {dataset_id}.") + logger.info( + f"Adding new entry into datasets table for dataset {dataset_id}." + ) dataset = Dataset( id=dataset_id, name=name, reference_doi=reference_doi, documentation_url=documentation_url, - sia_url=sia_url + sia_url=sia_url, ) self.idx.add_dataset(dataset) else: - logger.info(f"{dataset_id} dataset already has an entry in the datasets table.") + logger.info( + f"{dataset_id} dataset already has an entry in the datasets table." + ) for src_frame in sourcecatalog.iterate_frames( hdf5_file, diff --git a/precovery/healpix_geom.py b/precovery/healpix_geom.py index 31649f0..08de892 100644 --- a/precovery/healpix_geom.py +++ b/precovery/healpix_geom.py @@ -3,4 +3,9 @@ import numpy.typing as npt def radec_to_healpixel(ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int) -> npt.NDArray[np.int64]: - return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) \ No newline at end of file + return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) + +def radec_to_healpixel_with_neighbors(ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int) -> npt.NDArray[np.int64]: + healpix = hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) + neighbors = hp.pixelfunc.get_all_neighbours(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) + return np.append(neighbors, [healpix], axis=0) \ No newline at end of file diff --git a/precovery/precovery_db.py b/precovery/precovery_db.py index 01faeae..84957b5 100644 --- a/precovery/precovery_db.py +++ b/precovery/precovery_db.py @@ -1,28 +1,16 @@ -import os import dataclasses import itertools import logging +import os +from typing import Iterable, Iterator, Optional, Union + import numpy as np -from typing import ( - Iterable, - Iterator, - Optional, - Union -) - -from .config import ( - Config, - DefaultConfig -) -from .frame_db import ( - FrameDB, - FrameIndex -) +import pandas as pd + +from .config import Config, DefaultConfig +from .frame_db import FrameDB, FrameIndex, HealpixFrame from .healpix_geom import radec_to_healpixel -from .orbit import ( - Orbit, - PropagationIntegrator -) +from .orbit import Orbit, PropagationIntegrator from .spherical_geom import haversine_distance_deg DEGREE = 1.0 @@ -30,11 +18,12 @@ ARCSEC = ARCMIN / 60 CANDIDATE_K = 15 -CANDIDATE_NSIDE = 2**CANDIDATE_K +CANDIDATE_NSIDE = 2 ** CANDIDATE_K logging.basicConfig() logger = logging.getLogger("precovery") + @dataclasses.dataclass class PrecoveryCandidate: mjd_utc: float @@ -58,6 +47,7 @@ class PrecoveryCandidate: distance_arcsec: float dataset_id: str + @dataclasses.dataclass class FrameCandidate: mjd_utc: float @@ -71,6 +61,7 @@ class FrameCandidate: pred_vdec_degpday: float dataset_id: str + class PrecoveryDatabase: def __init__(self, frames: FrameDB): self.frames = frames @@ -83,13 +74,13 @@ def from_dir(cls, directory: str, create: bool = False, mode: str = "r"): return cls.create(directory) try: - config = Config.from_json( - os.path.join(directory, "config.json") - ) + config = Config.from_json(os.path.join(directory, "config.json")) except FileNotFoundError: config = DefaultConfig if not create: - logger.warning("No configuration file found. Adopting configuration defaults.") + logger.warning( + "No configuration file found. Adopting configuration defaults." 
+ ) frame_idx_db = "sqlite:///" + os.path.join(directory, "index.db") frame_idx = FrameIndex.open(frame_idx_db, mode=mode) @@ -115,10 +106,7 @@ def create( frame_idx_db = "sqlite:///" + os.path.join(directory, "index.db") frame_idx = FrameIndex.open(frame_idx_db) - config = Config( - nside=nside, - data_file_max_size=data_file_max_size - ) + config = Config(nside=nside, data_file_max_size=data_file_max_size) config.to_json(os.path.join(directory, "config.json")) data_path = os.path.join(directory, "data") @@ -135,7 +123,7 @@ def precover( start_mjd: Optional[float] = None, end_mjd: Optional[float] = None, window_size: int = 7, - include_frame_candidates: bool = False + include_frame_candidates: bool = False, ): """ Find observations which match orbit in the database. Observations are @@ -196,7 +184,7 @@ def precover( start_mjd=start_mjd, end_mjd=end_mjd, window_size=window_size, - include_frame_candidates=include_frame_candidates + include_frame_candidates=include_frame_candidates, ) for result in matches: yield result @@ -219,10 +207,14 @@ def _check_windows( Find all observations that match orbit within a list of windows """ # Propagate the orbit with n-body to every window center - orbit_propagated = orbit.propagate(window_midpoints, PropagationIntegrator.N_BODY) + orbit_propagated = orbit.propagate( + window_midpoints, PropagationIntegrator.N_BODY + ) # Calculate the location of the orbit on the sky with n-body propagation - window_ephems = orbit.compute_ephemeris(obscode, window_midpoints, PropagationIntegrator.N_BODY) + window_ephems = orbit.compute_ephemeris( + obscode, window_midpoints, PropagationIntegrator.N_BODY + ) window_healpixels = radec_to_healpixel( np.array([w.ra for w in window_ephems]), np.array([w.dec for w in window_ephems]), @@ -240,13 +232,17 @@ def _check_windows( # Check if start_mjd_window is not earlier than start_mjd (if defined) # If start_mjd_window is earlier, then set start_mjd_window to start_mjd if (start_mjd is not None) and (start_mjd_window < start_mjd): - logger.info(f"Window start MJD [UTC] ({start_mjd_window}) is earlier than desired start MJD [UTC] ({start_mjd}).") + logger.info( + f"Window start MJD [UTC] ({start_mjd_window}) is earlier than desired start MJD [UTC] ({start_mjd})." + ) start_mjd_window = start_mjd # Check if end_mjd_window is not later than end_mjd (if defined) # If end_mjd_window is later, then set end_mjd_window to end_mjd if (end_mjd is not None) and (end_mjd_window > end_mjd): - logger.info(f"Window end MJD [UTC] ({end_mjd_window}) is later than desired end MJD [UTC] ({end_mjd}).") + logger.info( + f"Window end MJD [UTC] ({end_mjd_window}) is later than desired end MJD [UTC] ({end_mjd})." + ) end_mjd_window = end_mjd timedeltas = [] @@ -298,7 +294,7 @@ def _check_windows( obscode, keep_mjds, tolerance, - include_frame_candidates + include_frame_candidates, ) for m in matches: yield m @@ -310,7 +306,7 @@ def _check_frames( obscode: str, mjds: Iterable[float], tolerance: float, - include_frame_candidates: bool + include_frame_candidates: bool, ) -> Iterator[Union[PrecoveryCandidate, FrameCandidate]]: """ Deeply inspect all frames that match the given obscode, mjd, and healpix to @@ -335,11 +331,11 @@ def _check_frames( # values for that parameter. 
As long as we return a Healpix ID generated with # nside greater than the indexed database then we can always down-sample the # ID to a lower nside value - healpix_id = int(radec_to_healpixel( - exact_ephem.ra, - exact_ephem.dec, - nside=CANDIDATE_NSIDE - )) + healpix_id = int( + radec_to_healpixel( + exact_ephem.ra, exact_ephem.dec, nside=CANDIDATE_NSIDE + ) + ) for f in frames: logger.info("checking frame: %s", f) @@ -366,8 +362,8 @@ def _check_frames( mjd_utc=f.mjd, ra_deg=o.ra, dec_deg=o.dec, - ra_sigma_arcsec=o.ra_sigma/ARCSEC, - dec_sigma_arcsec=o.dec_sigma/ARCSEC, + ra_sigma_arcsec=o.ra_sigma / ARCSEC, + dec_sigma_arcsec=o.dec_sigma / ARCSEC, mag=o.mag, mag_sigma=o.mag_sigma, filter=f.filter, @@ -379,9 +375,9 @@ def _check_frames( pred_dec_deg=exact_ephem.dec, pred_vra_degpday=exact_ephem.ra_velocity, pred_vdec_degpday=exact_ephem.dec_velocity, - delta_ra_arcsec=dra/ARCSEC, - delta_dec_arcsec=ddec/ARCSEC, - distance_arcsec=distance/ARCSEC, + delta_ra_arcsec=dra / ARCSEC, + delta_dec_arcsec=ddec / ARCSEC, + distance_arcsec=distance / ARCSEC, dataset_id=f.dataset_id, ) yield candidate @@ -406,3 +402,140 @@ def _check_frames( n_frame += 1 logger.info("checked %d frames", n_frame) + + + + def extract_observations_by_frames( + self, + frames: Iterable[HealpixFrame] + ): + # consider warnings for available memory + obs_out = pd.DataFrame( + columns=[ + "observatory_code", + "healpixel", + "obs_id", + "RA_deg", + "Dec_deg", + "RA_sigma_deg", + "Dec_sigma_deg", + "mjd_utc", + ] + ) + frame_dfs = [] + for frame in frames: + inc_arr = np.empty((0, 5), float) + obs_ids = np.empty((0, 1), object) + for obs in self.frames.iterate_observations(frame): + inc_arr = np.append( + inc_arr, + np.array( + [[obs.ra, obs.dec, obs.ra_sigma, obs.dec_sigma, frame.mjd]] + ), + axis=0, + ) + obs_ids = np.append( + obs_ids, + np.array( + [[obs.id.decode()]] + ), + axis=0, + ) + if np.any(inc_arr): + frame_obs = pd.DataFrame( + inc_arr, + columns=[ + "RA_deg", + "Dec_deg", + "RA_sigma_deg", + "Dec_sigma_deg", + "mjd_utc", + ], + ) + frame_obs.insert(0, "observatory_code", frame.obscode) + frame_obs.insert(1, "healpixel", frame.healpixel) + frame_obs.insert(2, "obs_id", obs_ids) + frame_dfs.append(frame_obs) + obs_out = pd.concat(frame_dfs) + return obs_out + + def extract_observations_by_date( + self, + mjd_start: float, + mjd_end: float, + ): + # consider warnings for available memory + + frames = self.frames.idx.frames_by_date(mjd_start, mjd_end) + + return self.extract_observations_by_frames(frames) + + + +def precover_brute_force( + self, + orbit: Orbit, + tolerance: float = 30 * ARCSEC, + start_mjd: Optional[float] = None, + end_mjd: Optional[float] = None, +): + """ + Find observations which match orbit in the database. Observations are + searched in descending order by mjd. + + orbit: The orbit to match. + + max_matches: End once this many matches have been found. If None, find + all matches. + + start_mjd: Only consider observations from after this epoch + (inclusive). If None, find all. + + end_mjd: Only consider observations from before this epoch (inclusive). + If None, find all. + """ + # basically: + """ + find all dates we need to propagate to + propagate orbit to each of these dates + get all intersecting healpix frames at these dates + grab neighboring healpixels + extract all observations from these healpixels + do the kd-tree stuff from original frame_db work + ??? 
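+
+    A sketch of how these steps map onto pieces introduced in this patch
+    (the glue code is not written yet; the flow is only suggestive):
+
+        mjds = self.frames.idx.unique_frame_times()
+        ephemerides = orbit.compute_ephemeris(obscode, mjds)
+        # intersect healpixels (plus neighbors) -> frames -> observations,
+        # then match observed vs. predicted positions with a BallTree as in
+        # brute_force.attribution_worker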
+ """ + if start_mjd is None or end_mjd is None: + first, last = self.frames.idx.mjd_bounds() + if start_mjd is None: + start_mjd = first + if end_mjd is None: + end_mjd = last + + n = 0 + logger.info( + "precovering orbit %s from %f.6f to %f.5f, window=%d", + orbit.orbit_id, + start_mjd, + end_mjd, + ) + + windows = self.frames.idx.window_centers(start_mjd, end_mjd) + + # group windows by obscodes so that many windows can be searched at once + for obscode, obs_windows in itertools.groupby(windows, key=lambda pair: pair[1]): + mjds = [window[0] for window in obs_windows] + matches = self._check_windows( + mjds, + obscode, + orbit, + tolerance, + start_mjd=start_mjd, + end_mjd=end_mjd, + window_size=window_size, + include_frame_candidates=include_frame_candidates, + ) + for result in matches: + yield result + n += 1 + if max_matches is not None and n >= max_matches: + return diff --git a/precovery/residuals.py b/precovery/residuals.py new file mode 100644 index 0000000..49ac997 --- /dev/null +++ b/precovery/residuals.py @@ -0,0 +1,191 @@ +import numpy as np +from scipy.spatial.distance import mahalanobis +from scipy.stats import chi2 + +__all__ = ["calcResiduals", "calcSimpleResiduals", "calcProbabilisticResiduals"] + + +def calcResiduals( + coords_actual, + coords_desired, + sigmas_actual=None, + covariances_actual=None, + include_probabilistic=True, +): + """ + Calculate residuals (actual - desired) and the associated chi2 if the + 1-sigma uncertainties on the actual quantities are provided. If desired, + a probabilistic measure of residual can also be calculated using the 1-sigma + uncertainties (which are converted into a covariance matrix) or by providing the + coveriance matrices for each coordinate. The probabilistic statistics include + the chi2 for each measurement, the mahalanobis distance and the associated + probability (calculated as 1 - chi2.cdf(mahalanobis_distance, degrees of freedom)). + + Parameters + ---------- + coords_actual : `~numpy.ndarray` (N, M) + Actual N coordinates in M dimensions. + coords_desired : `~numpy.ndarray` (N, M) + The desired N coordinates in M dimensions. + sigmas_actual : `~numpy.ndarray` (N, M), optional + The 1-sigma uncertainties of the actual coordinates. Can be + None, in which case chi2 will be return as a NaN. + covariances_actual : list of N `~numpy.ndarray`s (M, M) + The covariance matrix in M dimensions for each + actual observation if available. + include_probabilistic : bool, optional + Include the calculation of mahalanobis distance and the associated + probability. + + Returns + ------- + residuals : `~numpy.ndarray` (N, 2) + Residuals for each coordinate (actual - desired) + stats : tuple (1 or 3) + chi2 : `~numpy.ndarray` (N) + Chi-squared for each observation given the 1-sigma + uncertainties. NaN if no sigmas_actual is None. + p : `~numpy.ndarray` (N) + The probability that the actual coordinates given their uncertainty + belong to the same multivariate normal distribution as the desired + coordinates. + d : `~numpy.ndarray` (N) + The Mahalanobis distance of each coordinate compared to the desired + coordinates. 
+ """ + if ( + covariances_actual is None + and sigmas_actual is not None + and include_probabilistic + ): + covariances_actual_ = [np.diag(i ** 2) for i in sigmas_actual] + sigmas_actual_ = sigmas_actual + elif covariances_actual is not None and sigmas_actual is None: + sigmas_actual_ = np.zeros_like(coords_actual) + for i in range(len(coords_actual)): + sigmas_actual_[i] = np.diagonal(covariances_actual[i]) + sigmas_actual_ = np.sqrt(sigmas_actual_) + else: + covariances_actual_ = covariances_actual + sigmas_actual_ = sigmas_actual + + residuals, chi2 = calcSimpleResiduals( + coords_actual, coords_desired, sigmas_actual=sigmas_actual_ + ) + + if include_probabilistic: + p, d = calcProbabilisticResiduals( + coords_actual, coords_desired, covariances_actual_ + ) + stats = (chi2, p, d) + else: + + stats = (chi2,) + + return residuals, stats + + +def calcSimpleResiduals( + coords_actual, + coords_desired, + sigmas_actual=None, +): + """ + Calculate residuals and the associated chi2 if the + 1-sigma uncertainties on the actual quantities are provided. + + Parameters + ---------- + coords_actual : `~numpy.ndarray` (N, M) + Actual N coordinates in M dimensions. + coords_desired : `~numpy.ndarray` (N, M) + The desired N coordinates in M dimensions. + sigmas_actual : `~numpy.ndarray` (N, M), optional + The 1-sigma uncertainties of the actual coordinates. Can be + None, in which case chi2 will be return as a NaN. + + Returns + ------- + residuals : `~numpy.ndarray` (N, 2) + Residuals for each coordinate (actual - desired) + chi2 : `~numpy.ndarray` (N) + Chi-squared for each observation given the 1-sigma + uncertainties. NaN if no sigmas_actual is None. + """ + residuals = np.zeros_like(coords_actual) + + ra = coords_actual[:, 0] + dec = coords_actual[:, 1] + ra_pred = coords_desired[:, 0] + dec_pred = coords_desired[:, 1] + + # Calculate residuals in RA, make sure to fix any potential wrap around errors + residual_ra = ra - ra_pred + residual_ra = np.where(residual_ra > 180.0, 360.0 - residual_ra, residual_ra) + residual_ra *= np.cos(np.radians(dec_pred)) + + # Calculate residuals in Dec + residual_dec = dec - dec_pred + + if isinstance(sigmas_actual, np.ndarray): + sigma_ra = sigmas_actual[:, 0] + sigma_dec = sigmas_actual[:, 1] + + # Calculate chi2 + chi2 = (residual_ra ** 2 / sigma_ra ** 2) + (residual_dec ** 2 / sigma_dec ** 2) + else: + chi2 = np.NaN + + residuals[:, 0] = residual_ra + residuals[:, 1] = residual_dec + + return residuals, chi2 + + +def calcProbabilisticResiduals(coords_actual, coords_desired, covariances_actual): + """ + Calculate the probabilistic residual. + + Parameters + ---------- + coords_actual : `~numpy.ndarray` (N, M) + Actual N coordinates in M dimensions. + coords_desired : `~numpy.ndarray` (N, M) + The desired N coordinates in M dimensions. + sigmas_actual : `~numpy.ndarray` (N, M) + The 1-sigma uncertainties of the actual coordinates. + covariances_actual : list of N `~numpy.ndarray`s (M, M) + The covariance matrix in M dimensions for each + actual observation if available. + + Returns + ------- + p : `~numpy.ndarray` (N) + The probability that the actual coordinates given their uncertainty + belong to the same multivariate normal distribution as the desired + coordinates. + d : `~numpy.ndarray` (N) + The Mahalanobis distance of each coordinate compared to the desired + coordinates. 
+ """ + d = np.zeros(len(coords_actual)) + p = np.zeros(len(coords_actual)) + + for i, (actual, desired, covar) in enumerate( + zip(coords_actual, coords_desired, covariances_actual) + ): + # Calculate the degrees of freedom + k = len(actual) + + # Calculate the mahalanobis distance between the two coordinates + d_i = mahalanobis(actual, desired, np.linalg.inv(covar)) + + # Calculate the probability that both sets of coordinates are drawn from + # the same multivariate normal + p_i = 1 - chi2.cdf(d_i, k) + + # Append results + d[i] = d_i + p[i] = p_i + + return p, d diff --git a/precovery/utils.py b/precovery/utils.py new file mode 100644 index 0000000..9dd9b0c --- /dev/null +++ b/precovery/utils.py @@ -0,0 +1,136 @@ +import logging +import multiprocessing as mp +import signal + +import numpy as np +import pandas as pd + +__all__ = ["yieldChunks", "calcChunkSize", "_initWorker", "_checkParallel"] + +logger = logging.getLogger(__name__) + + +def yieldChunks(indexable, chunk_size): + """ + Generator that yields chunks of size chunk_size. + + Parameters + ---------- + indexable : list, `~numpy.ndarray`, `~pandas.DataFrame`, `~pandas.Series` (N) + Indexable object that needs to be divided into + chunks. + chunk_size : int + Size of each chunk. + + Yields + ------ + chunk : indexable (<=chunk_size) + Chunks of indexable + """ + if isinstance(indexable, list) or isinstance(indexable, np.ndarray): + for c in range(0, len(indexable), chunk_size): + yield indexable[c : c + chunk_size] + elif isinstance(indexable, pd.DataFrame) or isinstance(indexable, pd.Series): + for c in range(0, len(indexable), chunk_size): + yield indexable.iloc[c : c + chunk_size] + else: + err = "Indexable should be one of {list, `~numpy.ndarray`, `~pandas.DataFrame`, `~pandas.Series`}" + raise ValueError(err) + + +def calcChunkSize(n, num_workers, max_chunk_size, min_chunk_size=1): + """ + Calculate the optimal chunk size such that each worker gets at + least min_chunk_size chunks but does not get more than max_chunk_size + chunks. The goal is for no worker to be idle in terms of the number of items + it recieves + + Parameters + ---------- + n : int + Number of items. + num_workers : int + Number of workers to which items will be distributed. + max_chunk_size : int + Maximum chunk size to be given to each worker. + min_chunk_size : int, optional + Minimum chunk size to be given to each worker. + + Yields + ------ + chunk_size : int + Chunk_size between min_chunk_size and max_chunk_size. + """ + # Calculate the number of n that should be sent to each worker + c = np.maximum(np.floor(n / num_workers), min_chunk_size).astype(int) + + # Make sure this number does not exceed the maximum chunk size + chunk_size = np.minimum(c, max_chunk_size) + return chunk_size + + +def _initWorker(): + """ + Tell multiprocessing worker to ignore signals, will only + listen to parent process. + """ + signal.signal(signal.SIGINT, signal.SIG_IGN) + return + + +def _checkParallel(num_jobs, parallel_backend): + """ + Helper function to determine how many workers (jobs) should be submitted. + If the parallelization backend is Python's multiprocessing ('mp') and num_jobs + is either "auto" or None, then mp.cpu_count() will be used to determine the number + of jobs. If num_jobs is not "auto" or None, then that number will be used instead. + If the parallelization backend is ray, then the number of resources on the cluster will + determine the number of workers, and num_jobs is ignored. 
+ + Parameters + ---------- + num_jobs : {None, "auto", int} + Number of jobs to launch. + parallel_backend : str + Name of backend. Should be one of {'ray', 'mp'}. + + Returns + ------- + enable_parallel : bool + True if parallelization should be used (true for cases where num_jobs > 1). + num_workers : int + The number of workers to use. + + Raises + ------ + ValueError : If parallel_backend is not one of {'ray', 'mp'}. + """ + if isinstance(num_jobs, str) or (num_jobs is None) or (num_jobs > 1): + + # Check that pareallel_backend is one of the support types + backends = ["ray", "mp"] + if parallel_backend not in backends: + err = "parallel_backend should be one of {'ray', 'mp'}" + raise ValueError(err) + + enable_parallel = True + if parallel_backend == "ray": + import ray + + if not num_jobs == "auto" or num_jobs is not None: + logger.warning( + "This process is running with the ray parallelization backend: num_jobs parameter will be ignored." + ) + + num_workers = int(ray.cluster_resources()["CPU"]) + + else: + if num_jobs == "auto" or num_jobs is None: + num_workers = mp.cpu_count() + else: + num_workers = num_jobs + else: + num_workers = 1 + enable_parallel = False + + return enable_parallel, num_workers From cf584f5479c667725bdffe67b3fb0209ead56562 Mon Sep 17 00:00:00 2001 From: ntellis Date: Thu, 15 Dec 2022 02:11:05 -0800 Subject: [PATCH 02/12] are these all reformat changes? --- precovery/brute_force.py | 106 ++++++++++++++++++++++++-------------- precovery/frame_db.py | 5 +- precovery/healpix_geom.py | 18 +++++-- precovery/precovery_db.py | 82 ++--------------------------- 4 files changed, 84 insertions(+), 127 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 79fcca4..920a6be 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -14,27 +14,27 @@ import multiprocessing as mp import time from functools import partial +from typing import Iterable, List, Optional, Tuple import numpy as np import pandas as pd from astropy.time import Time from sklearn.neighbors import BallTree +from .healpix_geom import radec_to_healpixel, radec_to_healpixel_with_neighbors + # replace this usage with Orbit.compute_ephemeris from .orbit import Orbit from .residuals import calcResiduals -from .healpix_geom import radec_to_healpixel, radec_to_healpixel_with_neighbors from .utils import _checkParallel, _initWorker, calcChunkSize, yieldChunks -from typing import ( - Iterable, - Optional, - Tuple, - List -) logger = logging.getLogger(__name__) -__all__ = ["get_observations_and_ephemerides_for_orbits", "attribution_worker", "attributeObservations"] +__all__ = [ + "get_observations_and_ephemerides_for_orbits", + "attribution_worker", + "attributeObservations", +] def get_observations_and_ephemerides_for_orbits( @@ -42,7 +42,7 @@ def get_observations_and_ephemerides_for_orbits( mjd_start: float, mjd_end: float, precovery_db: PrecoveryDatabase, - obscode: str = "W84" + obscode: str = "W84", ): """ Compute ephemeris for the orbit, propagated to an epoch, and observed from @@ -50,10 +50,12 @@ def get_observations_and_ephemerides_for_orbits( obscode should be a Minor Planet Center observatory code. 
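    (the default, "W84", is the MPC code for Cerro Tololo / DECam)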
""" - + # Find all the mjd we need to propagate to all_frame_mjd = precovery_db.frames.idx.unique_frame_times() - frame_mjd_within_range = [x for x in all_frame_mjd if (x > mjd_start and x < mjd_end)] + frame_mjd_within_range = [ + x for x in all_frame_mjd if (x > mjd_start and x < mjd_end) + ] ephemeris_dfs = [] frame_dfs = [] @@ -63,27 +65,40 @@ def get_observations_and_ephemerides_for_orbits( mjd = [w.mjd for w in eph] ra = [w.ra for w in eph] dec = [w.dec for w in eph] - ephemeris_df = pd.DataFrame.from_dict({ - 'mjd_utc': mjd, - 'RA_deg': ra, - 'Dec_deg': dec, - }) + ephemeris_df = pd.DataFrame.from_dict( + { + "mjd_utc": mjd, + "RA_deg": ra, + "Dec_deg": dec, + } + ) ephemeris_df["orbit_id"] = orbit.orbit_id ephemeris_df["observatory_code"] = obscode ephemeris_dfs.append(ephemeris_df) # Now we gathetr the healpixels and neighboring pixels for each propagated position - healpix = radec_to_healpixel_with_neighbors(ra, dec, precovery_db.frames.healpix_nside) - - frame_df = pd.concat([pd.DataFrame.from_dict({ - "mjd_utc": [x[0] for y in range(9)], - "obscode": [x[1] for y in range(9)], - "healpix": list(x[2]) - }) for x in zip(mjd, ephemeris_df["observatory_code"], list(healpix.transpose()))], - ignore_index=True) + healpix = radec_to_healpixel_with_neighbors( + ra, dec, precovery_db.frames.healpix_nside + ) + + frame_df = pd.concat( + [ + pd.DataFrame.from_dict( + { + "mjd_utc": [x[0] for y in range(9)], + "obscode": [x[1] for y in range(9)], + "healpix": list(x[2]), + } + ) + for x in zip( + mjd, ephemeris_df["observatory_code"], list(healpix.transpose()) + ) + ], + ignore_index=True, + ) frame_dfs.append(frame_df) - # This will be passed back to be used as the ephemeris dataframe later + # This will be passed back to be used as the ephemeris dataframe later ephemeris = pd.concat(ephemeris_dfs, ignore_index=True) unique_frames = pd.concat(frame_dfs, ignore_index=True).drop_duplicates() @@ -91,19 +106,18 @@ def get_observations_and_ephemerides_for_orbits( filtered_frames = [] all_frames = precovery_db.frames.idx.frames_by_date(mjd_start, mjd_end) for frame in all_frames: - if ((unique_frames['mjd_utc'] == frame.mjd) & - (unique_frames['obscode'] == frame.obscode) & - (unique_frames['healpix'] == frame.healpixel)).any(): + if ( + (unique_frames["mjd_utc"] == frame.mjd) + & (unique_frames["obscode"] == frame.obscode) + & (unique_frames["healpix"] == frame.healpixel) + ).any(): filtered_frames.append(frame) - observations = precovery_db.extract_observations_by_frames(filtered_frames) return ephemeris, observations - - def attribution_worker( ephemeris, observations, @@ -122,7 +136,6 @@ def attribution_worker( format="mjd", ) - # Group the predicted ephemerides and observations by visit / exposure observations_grouped = observations.groupby(by=["observatory_code", "mjd_utc"]) observations_visits = [ @@ -137,7 +150,9 @@ def attribution_worker( indices_to_drop = pd.Int64Index([]) for g_key in list(ephemeris_pre_grouped.groups.keys()): if g_key not in obs_group_keys: - indices_to_drop = indices_to_drop.union(ephemeris_pre_grouped.get_group(g_key).index) + indices_to_drop = indices_to_drop.union( + ephemeris_pre_grouped.get_group(g_key).index + ) ephemeris_filtered = ephemeris.drop(indices_to_drop) @@ -277,7 +292,9 @@ def attributeObservations( attribution_dfs = [] # prepare ephemeris and observation dictionaries - ephemeris, observations = get_observations_and_ephemerides_for_orbits(orbits, mjd_start, mjd_end, precovery_db) + ephemeris, observations = 
get_observations_and_ephemerides_for_orbits( + orbits, mjd_start, mjd_end, precovery_db + ) parallel, num_workers = _checkParallel(num_jobs, parallel_backend) if num_workers > 1: @@ -292,11 +309,17 @@ def attributeObservations( num_orbits, num_workers, orbits_chunk_size, min_chunk_size=1 ) - orbits_split = [orbits[i:i + chunk_size_] for i in range(0, len(orbits), chunk_size_)] + orbits_split = [ + orbits[i : i + chunk_size_] for i in range(0, len(orbits), chunk_size_) + ] eph_split = [] for orbit_c in orbits.split(orbits_chunk_size): - eph_split.append(ephemeris[ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c])]) + eph_split.append( + ephemeris[ + ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) + ] + ) for observations_c in yieldChunks(observations, observations_chunk_size): obs = [observations_c for i in range(len(orbits_split))] @@ -319,9 +342,14 @@ def attributeObservations( else: for observations_c in yieldChunks(observations, observations_chunk_size): - for orbit_c in [orbits[i:i + orbits_chunk_size] for i in range(0, len(orbits), orbits_chunk_size)]: - - eph_c = ephemeris[ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c])] + for orbit_c in [ + orbits[i : i + orbits_chunk_size] + for i in range(0, len(orbits), orbits_chunk_size) + ]: + + eph_c = ephemeris[ + ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) + ] attribution_df_i = attribution_worker( eph_c, observations_c, diff --git a/precovery/frame_db.py b/precovery/frame_db.py index b8ace00..b5dc7a1 100644 --- a/precovery/frame_db.py +++ b/precovery/frame_db.py @@ -278,10 +278,7 @@ def unique_frame_times(self) -> Iterable[float]: This is needed for brute force propagation to each time, prior to extraction of observations """ - select_stmt = ( - sq.select(self.frames.c.mjd) - .distinct() - ) + select_stmt = sq.select(self.frames.c.mjd).distinct() mjds = self.dbconn.execute(select_stmt).fetchall() return [x[0] for x in mjds] diff --git a/precovery/healpix_geom.py b/precovery/healpix_geom.py index 08de892..6f5de75 100644 --- a/precovery/healpix_geom.py +++ b/precovery/healpix_geom.py @@ -2,10 +2,18 @@ import numpy as np import numpy.typing as npt -def radec_to_healpixel(ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int) -> npt.NDArray[np.int64]: + +def radec_to_healpixel( + ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int +) -> npt.NDArray[np.int64]: return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) -def radec_to_healpixel_with_neighbors(ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int) -> npt.NDArray[np.int64]: - healpix = hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) - neighbors = hp.pixelfunc.get_all_neighbours(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) - return np.append(neighbors, [healpix], axis=0) \ No newline at end of file + +def radec_to_healpixel_with_neighbors( + ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int +) -> npt.NDArray[np.int64]: + healpix = hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) + neighbors = hp.pixelfunc.get_all_neighbours( + nside, ra, dec, nest=True, lonlat=True + ).astype(np.int64) + return np.append(neighbors, [healpix], axis=0) diff --git a/precovery/precovery_db.py b/precovery/precovery_db.py index 84957b5..42249b3 100644 --- a/precovery/precovery_db.py +++ b/precovery/precovery_db.py @@ -403,12 +403,7 @@ def _check_frames( n_frame += 1 logger.info("checked %d frames", 
n_frame) - - - def extract_observations_by_frames( - self, - frames: Iterable[HealpixFrame] - ): + def extract_observations_by_frames(self, frames: Iterable[HealpixFrame]): # consider warnings for available memory obs_out = pd.DataFrame( columns=[ @@ -436,9 +431,7 @@ def extract_observations_by_frames( ) obs_ids = np.append( obs_ids, - np.array( - [[obs.id.decode()]] - ), + np.array([[obs.id.decode()]]), axis=0, ) if np.any(inc_arr): @@ -467,75 +460,6 @@ def extract_observations_by_date( # consider warnings for available memory frames = self.frames.idx.frames_by_date(mjd_start, mjd_end) - - return self.extract_observations_by_frames(frames) - + return self.extract_observations_by_frames(frames) -def precover_brute_force( - self, - orbit: Orbit, - tolerance: float = 30 * ARCSEC, - start_mjd: Optional[float] = None, - end_mjd: Optional[float] = None, -): - """ - Find observations which match orbit in the database. Observations are - searched in descending order by mjd. - - orbit: The orbit to match. - - max_matches: End once this many matches have been found. If None, find - all matches. - - start_mjd: Only consider observations from after this epoch - (inclusive). If None, find all. - - end_mjd: Only consider observations from before this epoch (inclusive). - If None, find all. - """ - # basically: - """ - find all dates we need to propagate to - propagate orbit to each of these dates - get all intersecting healpix frames at these dates - grab neighboring healpixels - extract all observations from these healpixels - do the kd-tree stuff from original frame_db work - ??? - """ - if start_mjd is None or end_mjd is None: - first, last = self.frames.idx.mjd_bounds() - if start_mjd is None: - start_mjd = first - if end_mjd is None: - end_mjd = last - - n = 0 - logger.info( - "precovering orbit %s from %f.6f to %f.5f, window=%d", - orbit.orbit_id, - start_mjd, - end_mjd, - ) - - windows = self.frames.idx.window_centers(start_mjd, end_mjd) - - # group windows by obscodes so that many windows can be searched at once - for obscode, obs_windows in itertools.groupby(windows, key=lambda pair: pair[1]): - mjds = [window[0] for window in obs_windows] - matches = self._check_windows( - mjds, - obscode, - orbit, - tolerance, - start_mjd=start_mjd, - end_mjd=end_mjd, - window_size=window_size, - include_frame_candidates=include_frame_candidates, - ) - for result in matches: - yield result - n += 1 - if max_matches is not None and n >= max_matches: - return From 57ade92aa632431f76b2017980d9143749a45b99 Mon Sep 17 00:00:00 2001 From: ntellis Date: Thu, 15 Dec 2022 11:24:39 -0800 Subject: [PATCH 03/12] docstrings --- precovery/brute_force.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 920a6be..0c7cdbe 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -45,10 +45,14 @@ def get_observations_and_ephemerides_for_orbits( obscode: str = "W84", ): """ - Compute ephemeris for the orbit, propagated to an epoch, and observed from - a location represented by obscode. - - obscode should be a Minor Planet Center observatory code. 
+ Returns: + - ephemerides for the orbit list, propagated to all epochs for frames in + the range (mjd_start, mjd_end) + - observations in the indexed PrecoveryDatabase consistent with intersecting + and neighboring frames for orbits propagated to each of the represented epochs + + ***Currently breaks on non-NSC datasets + TODO propagation targets should be unique frames wrt obscode, mjd """ # Find all the mjd we need to propagate to @@ -123,7 +127,15 @@ def attribution_worker( observations, eps=1 / 3600, include_probabilistic=True, -): +): + + """ + gather attributions for a df of ephemerides, observations + + First filters ephemerides to match the chunked observations + + """ + # Create observer's dictionary from observations observers = {} From 2ecc526ef3cfa8d2af6f9c44943afe24d174d554 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Tue, 17 Jan 2023 08:29:34 -0600 Subject: [PATCH 04/12] interim commit - incorporated ak, jm suggestions - refactored brute force to use smaller, testable functions. still todo: add obscode filter to get_frame_times --- precovery/brute_force.py | 203 +++++++++++++++++++------------------- precovery/frame_db.py | 45 ++++++++- precovery/healpix_geom.py | 21 ++-- precovery/precovery_db.py | 3 + precovery/residuals.py | 4 +- precovery/utils.py | 68 +------------ 6 files changed, 160 insertions(+), 184 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 0c7cdbe..faf1ece 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -1,125 +1,127 @@ import os -from sqlalchemy import all_ - from precovery.precovery_db import PrecoveryDatabase -os.environ["OMP_NUM_THREADS"] = "1" -os.environ["OPENBLAS_NUM_THREADS"] = "1" -os.environ["MKL_NUM_THREADS"] = "1" -os.environ["VECLIB_MAXIMUM_THREADS"] = "1" -os.environ["NUMEXPR_NUM_THREADS"] = "1" - import logging import multiprocessing as mp import time from functools import partial -from typing import Iterable, List, Optional, Tuple +from typing import Iterable import numpy as np import pandas as pd from astropy.time import Time from sklearn.neighbors import BallTree -from .healpix_geom import radec_to_healpixel, radec_to_healpixel_with_neighbors - +from .healpix_geom import radec_to_healpixel # replace this usage with Orbit.compute_ephemeris from .orbit import Orbit -from .residuals import calcResiduals -from .utils import _checkParallel, _initWorker, calcChunkSize, yieldChunks +from .residuals import calc_residuals +from .utils import calcChunkSize, yieldChunks + +os.environ["OMP_NUM_THREADS"] = "1" +os.environ["OPENBLAS_NUM_THREADS"] = "1" +os.environ["MKL_NUM_THREADS"] = "1" +os.environ["VECLIB_MAXIMUM_THREADS"] = "1" +os.environ["NUMEXPR_NUM_THREADS"] = "1" + logger = logging.getLogger(__name__) __all__ = [ - "get_observations_and_ephemerides_for_orbits", + "get_frame_times_by_obscode", + "ephemerides_from_orbits", + "intersecting_frames", "attribution_worker", - "attributeObservations", + "attribute_observations", ] -def get_observations_and_ephemerides_for_orbits( - orbits: Iterable[Orbit], +def get_frame_times_by_obscode( mjd_start: float, mjd_end: float, precovery_db: PrecoveryDatabase, - obscode: str = "W84", ): - """ - Returns: - - ephemerides for the orbit list, propagated to all epochs for frames in - the range (mjd_start, mjd_end) - - observations in the indexed PrecoveryDatabase consistent with intersecting - and neighboring frames for orbits propagated to each of the represented epochs - - ***Currently breaks on non-NSC datasets - TODO propagation targets should be 
unique frames wrt obscode, mjd - """ - - # Find all the mjd we need to propagate to all_frame_mjd = precovery_db.frames.idx.unique_frame_times() frame_mjd_within_range = [ - x for x in all_frame_mjd if (x > mjd_start and x < mjd_end) + x for x in all_frame_mjd if (x[0] > mjd_start and x[0] < mjd_end) ] + frame_mjds_by_obscode = dict() + for mjd, obscode in frame_mjd_within_range: + frame_mjds_by_obscode.setdefault(obscode, []).append(mjd) + + return frame_mjds_by_obscode + + +def ephemerides_from_orbits( + orbits: Iterable[Orbit], + epochs: Iterable[tuple], +): + ephemeris_dfs = [] - frame_dfs = [] - # this should pass through obscode...rather should use the relevant frames' obs codes for orbit in orbits: - eph = orbit.compute_ephemeris(obscode=obscode, epochs=frame_mjd_within_range) - mjd = [w.mjd for w in eph] - ra = [w.ra for w in eph] - dec = [w.dec for w in eph] - ephemeris_df = pd.DataFrame.from_dict( - { - "mjd_utc": mjd, - "RA_deg": ra, - "Dec_deg": dec, - } - ) - ephemeris_df["orbit_id"] = orbit.orbit_id - ephemeris_df["observatory_code"] = obscode - ephemeris_dfs.append(ephemeris_df) + for obscode in epochs.keys(): + eph = orbit.compute_ephemeris(obscode=obscode, epochs=epochs[obscode]) + mjd = [w.mjd for w in eph] + ra = [w.ra for w in eph] + dec = [w.dec for w in eph] + ephemeris_df = pd.DataFrame.from_dict( + { + "mjd_utc": mjd, + "RA_deg": ra, + "Dec_deg": dec, + } + ) + ephemeris_df["orbit_id"] = orbit.orbit_id + ephemeris_df["observatory_code"] = obscode + ephemeris_dfs.append(ephemeris_df) - # Now we gathetr the healpixels and neighboring pixels for each propagated position - healpix = radec_to_healpixel_with_neighbors( - ra, dec, precovery_db.frames.healpix_nside - ) + ephemerides = pd.concat(ephemeris_dfs, ignore_index=True) + return ephemerides - frame_df = pd.concat( - [ - pd.DataFrame.from_dict( - { - "mjd_utc": [x[0] for y in range(9)], - "obscode": [x[1] for y in range(9)], - "healpix": list(x[2]), - } - ) - for x in zip( - mjd, ephemeris_df["observatory_code"], list(healpix.transpose()) - ) - ], - ignore_index=True, - ) - frame_dfs.append(frame_df) - # This will be passed back to be used as the ephemeris dataframe later - ephemeris = pd.concat(ephemeris_dfs, ignore_index=True) - unique_frames = pd.concat(frame_dfs, ignore_index=True).drop_duplicates() +def intersecting_frames( + ephemerides, + precovery_db: PrecoveryDatabase, + neighbors=False, +): - # This filter is very inefficient - there is surely a better way to do this - filtered_frames = [] - all_frames = precovery_db.frames.idx.frames_by_date(mjd_start, mjd_end) - for frame in all_frames: - if ( - (unique_frames["mjd_utc"] == frame.mjd) - & (unique_frames["obscode"] == frame.obscode) - & (unique_frames["healpix"] == frame.healpixel) - ).any(): - filtered_frames.append(frame) + frame_identifier_dfs = [] + healpixel = radec_to_healpixel( + list(ephemerides["RA_deg"]), + list(ephemerides["Dec_deg"]), + precovery_db.frames.healpix_nside, + include_neighbors=neighbors + ) + + # we are reformatting this dataframe to account for the 9 healpixels returned from + # radec_to_healpixel (central plus eight neighbors) + frame_identifier_df = pd.concat( + [ + pd.DataFrame.from_dict( + { + "mjd_utc": [x[0] for y in range(9)], + "obscode": [x[1] for y in range(9)], + "healpixel": list(x[2]), + } + ) + for x in zip( + ephemerides["mjd_utc"], ephemerides["observatory_code"], list(healpixel.transpose()) + ) + ], + ignore_index=True, + ) + frame_identifier_dfs.append(frame_identifier_df) - observations = 
precovery_db.extract_observations_by_frames(filtered_frames) + unique_frame_identifier_df = pd.concat(frame_identifier_dfs, ignore_index=True).drop_duplicates() - return ephemeris, observations + filtered_frames = [] + for fi in unique_frame_identifier_df.itertuples(): + for f in precovery_db.frames.idx.frames_by_obscode_mjd_hp(fi.obscode, fi.mjd_utc, fi.healpixel): + filtered_frames.append(f) + + return filtered_frames def attribution_worker( @@ -127,7 +129,7 @@ def attribution_worker( observations, eps=1 / 3600, include_probabilistic=True, -): +): """ gather attributions for a df of ephemerides, observations @@ -136,7 +138,6 @@ def attribution_worker( """ - # Create observer's dictionary from observations observers = {} for observatory_code in observations["observatory_code"].unique(): @@ -232,7 +233,7 @@ def attribution_worker( obs_times_associated.append(obs_times[mask[0]]) distances.append(d[mask].reshape(-1, 1)) - residuals_visit, stats_visit = calcResiduals( + residuals_visit, stats_visit = calc_residuals( coords[mask[0]], coords_predicted[i[mask]], sigmas_actual=coords_sigma[mask[0]], @@ -282,7 +283,7 @@ def attribution_worker( return attributions -def attributeObservations( +def attribute_observations( orbits, mjd_start: float, mjd_end: float, @@ -293,8 +294,7 @@ def attributeObservations( backend_kwargs={}, orbits_chunk_size=10, observations_chunk_size=100000, - num_jobs=1, - parallel_backend="mp", + num_jobs: int = 1, ): logger.info("Running observation attribution...") time_start = time.time() @@ -304,17 +304,18 @@ def attributeObservations( attribution_dfs = [] # prepare ephemeris and observation dictionaries - ephemeris, observations = get_observations_and_ephemerides_for_orbits( - orbits, mjd_start, mjd_end, precovery_db - ) - - parallel, num_workers = _checkParallel(num_jobs, parallel_backend) + # ephemeris, observations = get_observations_and_ephemerides_for_orbits( + # orbits, mjd_start, mjd_end, precovery_db + # ) + + epochs = get_frame_times_by_obscode(mjd_start, mjd_end, precovery_db=precovery_db) + ephemerides = ephemerides_from_orbits(orbits, epochs) + frames_to_search = intersecting_frames(ephemerides, precovery_db=precovery_db, neighbors=True) + observations = precovery_db.extract_observations_by_frames(frames_to_search) + num_workers = min(num_jobs, mp.cpu_count() + 1) if num_workers > 1: - p = mp.Pool( - processes=num_workers, - initializer=_initWorker, - ) + p = mp.Pool(processes=num_workers) # Send up to orbits_chunk_size orbits to each OD worker for processing chunk_size_ = calcChunkSize( @@ -328,8 +329,8 @@ def attributeObservations( eph_split = [] for orbit_c in orbits.split(orbits_chunk_size): eph_split.append( - ephemeris[ - ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) + ephemerides[ + ephemerides["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) ] ) for observations_c in yieldChunks(observations, observations_chunk_size): @@ -359,8 +360,8 @@ def attributeObservations( for i in range(0, len(orbits), orbits_chunk_size) ]: - eph_c = ephemeris[ - ephemeris["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) + eph_c = ephemerides[ + ephemerides["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) ] attribution_df_i = attribution_worker( eph_c, diff --git a/precovery/frame_db.py b/precovery/frame_db.py index db869f7..f802f20 100644 --- a/precovery/frame_db.py +++ b/precovery/frame_db.py @@ -300,9 +300,9 @@ def unique_frame_times(self) -> Iterable[float]: This is needed for brute force propagation to each time, prior to 
extraction of observations """ - select_stmt = sq.select(self.frames.c.mjd).distinct() - mjds = self.dbconn.execute(select_stmt).fetchall() - return [x[0] for x in mjds] + select_stmt = sq.select(self.frames.c.mjd, self.frames.c.obscode).distinct() + mjd_obscode_pairs = self.dbconn.execute(select_stmt).fetchall() + return [(x[0], x[1]) for x in mjd_obscode_pairs] def frames_for_bundle( self, bundle: FrameBundleDescription @@ -445,6 +445,45 @@ def frames_by_date( for row in result: yield HealpixFrame(*row) + def frames_by_obscode_mjd_hp( + self, obscode: str, mjd: float, healpixel: int, nside_search: int = None + ) -> Iterator[HealpixFrame]: + """ + Returns all frames in the index for a particular obscode, mjd, healpixel. + + Optionally, this can be performed at an nside value lower than the nside value + that the database is indexed to, in which case it will include intersecting frames + at that less-granular healpixel grid. + """ + + # healpixel resample here + # then we use self.frames.c.healpixel.in_(list) in the where condition to select + # those that are within the requested healpixel range + + select_stmt = ( + sq.select( + self.frames.c.id, + self.frames.c.dataset_id, + self.frames.c.obscode, + self.frames.c.exposure_id, + self.frames.c.filter, + self.frames.c.mjd, + self.frames.c.healpixel, + self.frames.c.data_uri, + self.frames.c.data_offset, + self.frames.c.data_length, + ) + .where( + self.frames.c.mjd == mjd, + self.frames.c.obscode == obscode, + self.frames.c.healpixel == healpixel, + ) + .order_by(self.frames.c.obscode, self.frames.c.mjd, self.frames.c.healpixel) + ) + result = self.dbconn.execute(select_stmt) + for row in result: + yield HealpixFrame(*row) + def add_frames(self, frames: list[HealpixFrame]): """ Add one or more frames to the index. 
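The two index methods above are the primitives that the brute-force search builds on. A minimal sketch of how they compose, assuming a database has already been indexed on disk (the "data/db" path and the healpixel value are illustrative assumptions, not part of this patch):

    from precovery.precovery_db import PrecoveryDatabase

    db = PrecoveryDatabase.from_dir("data/db", create=False)

    # unique_frame_times() now yields distinct (mjd, obscode) pairs rather than
    # bare mjds, so ephemerides can be computed per observatory.
    for mjd, obscode in db.frames.idx.unique_frame_times():
        # In the real flow the healpixel argument comes from radec_to_healpixel
        # applied to the propagated position; 42 here is a placeholder.
        for frame in db.frames.idx.frames_by_obscode_mjd_hp(obscode, mjd, healpixel=42):
            print(frame.id, frame.exposure_id, frame.healpixel)
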
diff --git a/precovery/healpix_geom.py b/precovery/healpix_geom.py index 6f5de75..8d0c262 100644 --- a/precovery/healpix_geom.py +++ b/precovery/healpix_geom.py @@ -4,16 +4,15 @@ def radec_to_healpixel( - ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int + ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int, + include_neighbors: bool = False ) -> npt.NDArray[np.int64]: - return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) - -def radec_to_healpixel_with_neighbors( - ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int -) -> npt.NDArray[np.int64]: - healpix = hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) - neighbors = hp.pixelfunc.get_all_neighbours( - nside, ra, dec, nest=True, lonlat=True - ).astype(np.int64) - return np.append(neighbors, [healpix], axis=0) + if (not include_neighbors): + return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) + else: + healpix = hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) + neighbors = hp.pixelfunc.get_all_neighbours( + nside, ra, dec, nest=True, lonlat=True + ).astype(np.int64) + return np.append(neighbors, [healpix], axis=0) \ No newline at end of file diff --git a/precovery/precovery_db.py b/precovery/precovery_db.py index 05043eb..d924757 100644 --- a/precovery/precovery_db.py +++ b/precovery/precovery_db.py @@ -442,7 +442,10 @@ def extract_observations_by_frames(self, frames: Iterable[HealpixFrame]): ] ) frame_dfs = [] + # iterate over frames, initially accumulating observations in numpy arrays for speed + # over loop, build dataframe of observations within each frame with shared frame scalars for frame in frames: + # can't mix numpy array types, so two accumulators: one for floats, one for obs_id strings inc_arr = np.empty((0, 5), float) obs_ids = np.empty((0, 1), object) for obs in self.frames.iterate_observations(frame): diff --git a/precovery/residuals.py b/precovery/residuals.py index 49ac997..927c2aa 100644 --- a/precovery/residuals.py +++ b/precovery/residuals.py @@ -2,10 +2,10 @@ from scipy.spatial.distance import mahalanobis from scipy.stats import chi2 -__all__ = ["calcResiduals", "calcSimpleResiduals", "calcProbabilisticResiduals"] +__all__ = ["calc_residuals", "calcSimpleResiduals", "calcProbabilisticResiduals"] -def calcResiduals( +def calc_residuals( coords_actual, coords_desired, sigmas_actual=None, diff --git a/precovery/utils.py b/precovery/utils.py index 9dd9b0c..10d8ab9 100644 --- a/precovery/utils.py +++ b/precovery/utils.py @@ -5,7 +5,7 @@ import numpy as np import pandas as pd -__all__ = ["yieldChunks", "calcChunkSize", "_initWorker", "_checkParallel"] +__all__ = ["yieldChunks", "calcChunkSize" ] logger = logging.getLogger(__name__) @@ -68,69 +68,3 @@ def calcChunkSize(n, num_workers, max_chunk_size, min_chunk_size=1): chunk_size = np.minimum(c, max_chunk_size) return chunk_size - -def _initWorker(): - """ - Tell multiprocessing worker to ignore signals, will only - listen to parent process. - """ - signal.signal(signal.SIGINT, signal.SIG_IGN) - return - - -def _checkParallel(num_jobs, parallel_backend): - """ - Helper function to determine how many workers (jobs) should be submitted. - If the parallelization backend is Python's multiprocessing ('mp') and num_jobs - is either "auto" or None, then mp.cpu_count() will be used to determine the number - of jobs. If num_jobs is not "auto" or None, then that number will be used instead. 
- If the parallelization backend is ray, then the number of resources on the cluster will - determine the number of workers, and num_jobs is ignored. - - Parameters - ---------- - num_jobs : {None, "auto", int} - Number of jobs to launch. - parallel_backend : str - Name of backend. Should be one of {'ray', 'mp'}. - - Returns - ------- - enable_parallel : bool - True if parallelization should be used (true for cases where num_jobs > 1). - num_workers : int - The number of workers to use. - - Raises - ------ - ValueError : If parallel_backend is not one of {'ray', 'mp'}. - """ - if isinstance(num_jobs, str) or (num_jobs is None) or (num_jobs > 1): - - # Check that pareallel_backend is one of the support types - backends = ["ray", "mp"] - if parallel_backend not in backends: - err = "parallel_backend should be one of {'ray', 'mp'}" - raise ValueError(err) - - enable_parallel = True - if parallel_backend == "ray": - import ray - - if not num_jobs == "auto" or num_jobs is not None: - logger.warning( - "This process is running with the ray parallelization backend: num_jobs parameter will be ignored." - ) - - num_workers = int(ray.cluster_resources()["CPU"]) - - else: - if num_jobs == "auto" or num_jobs is None: - num_workers = mp.cpu_count() - else: - num_workers = num_jobs - else: - num_workers = 1 - enable_parallel = False - - return enable_parallel, num_workers From 38cddcf7f2345939670b23b7f093db9478d728b1 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Tue, 17 Jan 2023 08:47:26 -0600 Subject: [PATCH 05/12] testing gh auth, isort --- precovery/brute_force.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index faf1ece..2c02021 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -1,18 +1,17 @@ -import os - -from precovery.precovery_db import PrecoveryDatabase - import logging import multiprocessing as mp +import os import time from functools import partial -from typing import Iterable +from typing import Iterable import numpy as np import pandas as pd from astropy.time import Time from sklearn.neighbors import BallTree +from precovery.precovery_db import PrecoveryDatabase + from .healpix_geom import radec_to_healpixel # replace this usage with Orbit.compute_ephemeris from .orbit import Orbit From b28fe59f15f0f5e7d330ef648f95def6fbf924ec Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Tue, 17 Jan 2023 12:20:56 -0600 Subject: [PATCH 06/12] added filtering on obscode in get_frame_times_by_obscode --- precovery/brute_force.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 2c02021..24fb01d 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -13,6 +13,7 @@ from precovery.precovery_db import PrecoveryDatabase from .healpix_geom import radec_to_healpixel + # replace this usage with Orbit.compute_ephemeris from .orbit import Orbit from .residuals import calc_residuals @@ -40,6 +41,7 @@ def get_frame_times_by_obscode( mjd_start: float, mjd_end: float, precovery_db: PrecoveryDatabase, + obscodes_specified: Iterable[str] = [], ): all_frame_mjd = precovery_db.frames.idx.unique_frame_times() frame_mjd_within_range = [ @@ -48,7 +50,8 @@ def get_frame_times_by_obscode( frame_mjds_by_obscode = dict() for mjd, obscode in frame_mjd_within_range: - frame_mjds_by_obscode.setdefault(obscode, []).append(mjd) + if len(obscode) == 0 or obscode in obscodes_specified: + 
frame_mjds_by_obscode.setdefault(obscode, []).append(mjd) return frame_mjds_by_obscode @@ -91,7 +94,7 @@ def intersecting_frames( list(ephemerides["RA_deg"]), list(ephemerides["Dec_deg"]), precovery_db.frames.healpix_nside, - include_neighbors=neighbors + include_neighbors=neighbors, ) # we are reformatting this dataframe to account for the 9 healpixels returned from @@ -106,20 +109,26 @@ def intersecting_frames( } ) for x in zip( - ephemerides["mjd_utc"], ephemerides["observatory_code"], list(healpixel.transpose()) + ephemerides["mjd_utc"], + ephemerides["observatory_code"], + list(healpixel.transpose()), ) ], ignore_index=True, ) frame_identifier_dfs.append(frame_identifier_df) - unique_frame_identifier_df = pd.concat(frame_identifier_dfs, ignore_index=True).drop_duplicates() + unique_frame_identifier_df = pd.concat( + frame_identifier_dfs, ignore_index=True + ).drop_duplicates() filtered_frames = [] for fi in unique_frame_identifier_df.itertuples(): - for f in precovery_db.frames.idx.frames_by_obscode_mjd_hp(fi.obscode, fi.mjd_utc, fi.healpixel): + for f in precovery_db.frames.idx.frames_by_obscode_mjd_hp( + fi.obscode, fi.mjd_utc, fi.healpixel + ): filtered_frames.append(f) - + return filtered_frames @@ -294,6 +303,7 @@ def attribute_observations( orbits_chunk_size=10, observations_chunk_size=100000, num_jobs: int = 1, + obscodes_specified: Iterable[str] = [], ): logger.info("Running observation attribution...") time_start = time.time() @@ -307,9 +317,16 @@ def attribute_observations( # orbits, mjd_start, mjd_end, precovery_db # ) - epochs = get_frame_times_by_obscode(mjd_start, mjd_end, precovery_db=precovery_db) + epochs = get_frame_times_by_obscode( + mjd_start, + mjd_end, + precovery_db=precovery_db, + obscodes_specified=obscodes_specified, + ) ephemerides = ephemerides_from_orbits(orbits, epochs) - frames_to_search = intersecting_frames(ephemerides, precovery_db=precovery_db, neighbors=True) + frames_to_search = intersecting_frames( + ephemerides, precovery_db=precovery_db, neighbors=True + ) observations = precovery_db.extract_observations_by_frames(frames_to_search) num_workers = min(num_jobs, mp.cpu_count() + 1) if num_workers > 1: From 76cddda3459c803e3a78287e0a9ef64dbf9baa7f Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Thu, 19 Jan 2023 23:40:20 -0800 Subject: [PATCH 07/12] updates to support per-obs mjd --- precovery/brute_force.py | 20 +++++++++++++------- precovery/frame_db.py | 35 ++++++++++++++++++++++------------- precovery/precovery_db.py | 7 ++----- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 24fb01d..7cc304e 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -2,6 +2,7 @@ import multiprocessing as mp import os import time +import warnings from functools import partial from typing import Iterable @@ -13,8 +14,6 @@ from precovery.precovery_db import PrecoveryDatabase from .healpix_geom import radec_to_healpixel - -# replace this usage with Orbit.compute_ephemeris from .orbit import Orbit from .residuals import calc_residuals from .utils import calcChunkSize, yieldChunks @@ -47,11 +46,10 @@ def get_frame_times_by_obscode( frame_mjd_within_range = [ x for x in all_frame_mjd if (x[0] > mjd_start and x[0] < mjd_end) ] - frame_mjds_by_obscode = dict() - for mjd, obscode in frame_mjd_within_range: - if len(obscode) == 0 or obscode in obscodes_specified: - frame_mjds_by_obscode.setdefault(obscode, []).append(mjd) + for exposure_mjd_mid, obscode in frame_mjd_within_range: + if 
len(obscodes_specified) == 0 or obscode in obscodes_specified: + frame_mjds_by_obscode.setdefault(obscode, []).append(exposure_mjd_mid) return frame_mjds_by_obscode @@ -327,7 +325,15 @@ def attribute_observations( frames_to_search = intersecting_frames( ephemerides, precovery_db=precovery_db, neighbors=True ) - observations = precovery_db.extract_observations_by_frames(frames_to_search) + if len(frames_to_search) != 0: + observations = precovery_db.extract_observations_by_frames(frames_to_search) + else: + warning = ( + "No intersecting frames were found. This is unlikely unless" + " a sparsely populated database or small O(1) orbit set is used." + ) + warnings.warn(warning, UserWarning) + return [] num_workers = min(num_jobs, mp.cpu_count() + 1) if num_workers > 1: diff --git a/precovery/frame_db.py b/precovery/frame_db.py index d7c71c7..9861d52 100644 --- a/precovery/frame_db.py +++ b/precovery/frame_db.py @@ -312,7 +312,9 @@ def unique_frame_times(self) -> Iterable[float]: This is needed for brute force propagation to each time, prior to extraction of observations """ - select_stmt = sq.select(self.frames.c.mjd, self.frames.c.obscode).distinct() + select_stmt = sq.select( + self.frames.c.exposure_mjd_mid, self.frames.c.obscode + ).distinct() mjd_obscode_pairs = self.dbconn.execute(select_stmt).fetchall() return [(x[0], x[1]) for x in mjd_obscode_pairs] @@ -438,7 +440,7 @@ def frames_by_date( self, mjd_start: float, mjd_end: float ) -> Iterator[HealpixFrame]: """ - Returns all frames in the index within a date range, sorted by obscode, mjd, and healpixel. + Returns all frames in the index within a date range, sorted by obscode, exposure_mjd_mid, and healpixel. """ select_stmt = ( sq.select( @@ -447,7 +449,7 @@ def frames_by_date( self.frames.c.obscode, self.frames.c.exposure_id, self.frames.c.filter, - self.frames.c.mjd, + self.frames.c.exposure_mjd_mid, self.frames.c.healpixel, self.frames.c.data_uri, self.frames.c.data_offset, @@ -457,27 +459,27 @@ def frames_by_date( self.frames.c.mjd >= mjd_start, self.frames.c.mjd <= mjd_end, ) - .order_by(self.frames.c.obscode, self.frames.c.mjd, self.frames.c.healpixel) + .order_by( + self.frames.c.obscode, + self.frames.c.exposure_mjd_mid, + self.frames.c.healpixel, + ) ) result = self.dbconn.execute(select_stmt) for row in result: yield HealpixFrame(*row) def frames_by_obscode_mjd_hp( - self, obscode: str, mjd: float, healpixel: int, nside_search: int = None + self, obscode: str, mjd: float, healpixel: int ) -> Iterator[HealpixFrame]: """ Returns all frames in the index for a particular obscode, mjd, healpixel. - Optionally, this can be performed at an nside value lower than the nside value + TODO, this can be performed at an nside value lower than the nside value that the database is indexed to, in which case it will include intersecting frames at that less-granular healpixel grid. 
""" - # healpixel resample here - # then we use self.frames.c.healpixel.in_(list) in the where condition to select - # those that are within the requested healpixel range - select_stmt = ( sq.select( self.frames.c.id, @@ -485,18 +487,25 @@ def frames_by_obscode_mjd_hp( self.frames.c.obscode, self.frames.c.exposure_id, self.frames.c.filter, - self.frames.c.mjd, + self.frames.c.exposure_mjd_start, + self.frames.c.exposure_mjd_mid, + self.frames.c.exposure_duration, self.frames.c.healpixel, self.frames.c.data_uri, self.frames.c.data_offset, self.frames.c.data_length, ) .where( - self.frames.c.mjd == mjd, + self.frames.c.exposure_mjd_mid >= mjd - 1e-7, + self.frames.c.exposure_mjd_mid <= mjd + 1e-7, self.frames.c.obscode == obscode, self.frames.c.healpixel == healpixel, ) - .order_by(self.frames.c.obscode, self.frames.c.mjd, self.frames.c.healpixel) + .order_by( + self.frames.c.obscode, + self.frames.c.exposure_mjd_mid, + self.frames.c.healpixel, + ) ) result = self.dbconn.execute(select_stmt) for row in result: diff --git a/precovery/precovery_db.py b/precovery/precovery_db.py index 206b3a2..1eec6a2 100644 --- a/precovery/precovery_db.py +++ b/precovery/precovery_db.py @@ -19,7 +19,7 @@ ARCSEC = ARCMIN / 60 CANDIDATE_K = 15 -CANDIDATE_NSIDE = 2 ** CANDIDATE_K +CANDIDATE_NSIDE = 2**CANDIDATE_K logging.basicConfig() logger = logging.getLogger("precovery") @@ -524,9 +524,7 @@ def extract_observations_by_frames(self, frames: Iterable[HealpixFrame]): for obs in self.frames.iterate_observations(frame): inc_arr = np.append( inc_arr, - np.array( - [[obs.ra, obs.dec, obs.ra_sigma, obs.dec_sigma, frame.mjd]] - ), + np.array([[obs.ra, obs.dec, obs.ra_sigma, obs.dec_sigma, obs.mjd]]), axis=0, ) obs_ids = np.append( @@ -562,4 +560,3 @@ def extract_observations_by_date( frames = self.frames.idx.frames_by_date(mjd_start, mjd_end) return self.extract_observations_by_frames(frames) - From b788dbbcf9c74f72fa443dc257d28910e9ddfb68 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Thu, 19 Jan 2023 23:52:26 -0800 Subject: [PATCH 08/12] reformatting... 
--- .gitignore | 1 + precovery/brute_force.py | 5 ----- precovery/residuals.py | 4 ++-- precovery/utils.py | 3 +-- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index cd59382..fbd0e2f 100644 --- a/.gitignore +++ b/.gitignore @@ -117,3 +117,4 @@ volume/ out/ database/ precovery/version.py +.DS_Store diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 7cc304e..3e88fd9 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -310,11 +310,6 @@ def attribute_observations( attribution_dfs = [] - # prepare ephemeris and observation dictionaries - # ephemeris, observations = get_observations_and_ephemerides_for_orbits( - # orbits, mjd_start, mjd_end, precovery_db - # ) - epochs = get_frame_times_by_obscode( mjd_start, mjd_end, diff --git a/precovery/residuals.py b/precovery/residuals.py index 927c2aa..01f72a8 100644 --- a/precovery/residuals.py +++ b/precovery/residuals.py @@ -58,7 +58,7 @@ def calc_residuals( and sigmas_actual is not None and include_probabilistic ): - covariances_actual_ = [np.diag(i ** 2) for i in sigmas_actual] + covariances_actual_ = [np.diag(i**2) for i in sigmas_actual] sigmas_actual_ = sigmas_actual elif covariances_actual is not None and sigmas_actual is None: sigmas_actual_ = np.zeros_like(coords_actual) @@ -132,7 +132,7 @@ def calcSimpleResiduals( sigma_dec = sigmas_actual[:, 1] # Calculate chi2 - chi2 = (residual_ra ** 2 / sigma_ra ** 2) + (residual_dec ** 2 / sigma_dec ** 2) + chi2 = (residual_ra**2 / sigma_ra**2) + (residual_dec**2 / sigma_dec**2) else: chi2 = np.NaN diff --git a/precovery/utils.py b/precovery/utils.py index 10d8ab9..0bd459a 100644 --- a/precovery/utils.py +++ b/precovery/utils.py @@ -5,7 +5,7 @@ import numpy as np import pandas as pd -__all__ = ["yieldChunks", "calcChunkSize" ] +__all__ = ["yieldChunks", "calcChunkSize"] logger = logging.getLogger(__name__) @@ -67,4 +67,3 @@ def calcChunkSize(n, num_workers, max_chunk_size, min_chunk_size=1): # Make sure this number does not exceed the maximum chunk size chunk_size = np.minimum(c, max_chunk_size) return chunk_size - From 2366061f2a96721c340d926496d9595fc7007086 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Fri, 20 Jan 2023 00:15:36 -0800 Subject: [PATCH 09/12] formatting --- precovery/healpix_geom.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/precovery/healpix_geom.py b/precovery/healpix_geom.py index 8d0c262..e1849e4 100644 --- a/precovery/healpix_geom.py +++ b/precovery/healpix_geom.py @@ -4,7 +4,7 @@ def radec_to_healpixel( - ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int, + ra: npt.NDArray[np.float64], dec: npt.NDArray[np.float64], nside: int, include_neighbors: bool = False ) -> npt.NDArray[np.int64]: @@ -15,4 +15,4 @@ def radec_to_healpixel( neighbors = hp.pixelfunc.get_all_neighbours( nside, ra, dec, nest=True, lonlat=True ).astype(np.int64) - return np.append(neighbors, [healpix], axis=0) \ No newline at end of file + return np.append(neighbors, [healpix], axis=0) From cd0f243e6a87a91e14e1009818a101a57f995f47 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Fri, 20 Jan 2023 00:24:47 -0800 Subject: [PATCH 10/12] formatting --- precovery/healpix_geom.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/precovery/healpix_geom.py b/precovery/healpix_geom.py index e1849e4..d1e7282 100644 --- a/precovery/healpix_geom.py +++ b/precovery/healpix_geom.py @@ -4,11 +4,13 @@ def radec_to_healpixel( - ra: npt.NDArray[np.float64], dec: 
npt.NDArray[np.float64], nside: int, - include_neighbors: bool = False + ra: npt.NDArray[np.float64], + dec: npt.NDArray[np.float64], + nside: int, + include_neighbors: bool = False, ) -> npt.NDArray[np.int64]: - if (not include_neighbors): + if not include_neighbors: return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) else: healpix = hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) From efde510ae75c9bf4db482b174926fa1d7eeedf02 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Thu, 26 Jan 2023 14:09:57 -0800 Subject: [PATCH 11/12] Refactored brute force to return a list of BruteForceAttributions (extending PrecoveryCandidate, as well as a sorted df of the same. --- precovery/brute_force.py | 280 ++++++++++++++++++++++++++----------- precovery/frame_db.py | 3 +- precovery/healpix_geom.py | 1 - precovery/main.py | 58 +++++--- precovery/precovery_db.py | 58 ++++++-- precovery/residuals.py | 1 - precovery/utils.py | 7 +- tests/get_orbits.py | 1 - tests/make_observations.py | 4 - 9 files changed, 286 insertions(+), 127 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 3e88fd9..6e3bfc5 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -1,3 +1,4 @@ +import dataclasses import logging import multiprocessing as mp import os @@ -11,7 +12,7 @@ from astropy.time import Time from sklearn.neighbors import BallTree -from precovery.precovery_db import PrecoveryDatabase +from precovery.precovery_db import PrecoveryCandidate, PrecoveryDatabase from .healpix_geom import radec_to_healpixel from .orbit import Orbit @@ -36,6 +37,14 @@ ] +@dataclasses.dataclass +class BruteForceAttribution(PrecoveryCandidate): + chi2: float + orbit_id: str + probability: float + mahalanobis_distance: float + + def get_frame_times_by_obscode( mjd_start: float, mjd_end: float, @@ -58,7 +67,6 @@ def ephemerides_from_orbits( orbits: Iterable[Orbit], epochs: Iterable[tuple], ): - ephemeris_dfs = [] for orbit in orbits: for obscode in epochs.keys(): @@ -66,11 +74,15 @@ def ephemerides_from_orbits( mjd = [w.mjd for w in eph] ra = [w.ra for w in eph] dec = [w.dec for w in eph] + vra = [w.ra_velocity for w in eph] + vdec = [w.dec_velocity for w in eph] ephemeris_df = pd.DataFrame.from_dict( { "mjd_utc": mjd, - "RA_deg": ra, - "Dec_deg": dec, + "ra_deg": ra, + "dec_deg": dec, + "vra_degperday": vra, + "vdec_degperday": vdec, } ) ephemeris_df["orbit_id"] = orbit.orbit_id @@ -86,11 +98,10 @@ def intersecting_frames( precovery_db: PrecoveryDatabase, neighbors=False, ): - frame_identifier_dfs = [] - healpixel = radec_to_healpixel( - list(ephemerides["RA_deg"]), - list(ephemerides["Dec_deg"]), + healpix_id = radec_to_healpixel( + list(ephemerides["ra_deg"]), + list(ephemerides["dec_deg"]), precovery_db.frames.healpix_nside, include_neighbors=neighbors, ) @@ -103,13 +114,13 @@ def intersecting_frames( { "mjd_utc": [x[0] for y in range(9)], "obscode": [x[1] for y in range(9)], - "healpixel": list(x[2]), + "healpix_id": list(x[2]), } ) for x in zip( ephemerides["mjd_utc"], ephemerides["observatory_code"], - list(healpixel.transpose()), + list(healpix_id.transpose()), ) ], ignore_index=True, @@ -123,7 +134,7 @@ def intersecting_frames( filtered_frames = [] for fi in unique_frame_identifier_df.itertuples(): for f in precovery_db.frames.idx.frames_by_obscode_mjd_hp( - fi.obscode, fi.mjd_utc, fi.healpixel + fi.obscode, fi.mjd_utc, fi.healpix_id ): filtered_frames.append(f) @@ -133,10 +144,9 @@ def intersecting_frames( def attribution_worker( ephemeris, 
observations, - eps=1 / 3600, + tolerance=1 / 3600, include_probabilistic=True, ): - """ gather attributions for a df of ephemerides, observations @@ -144,6 +154,7 @@ def attribution_worker( """ + attributions = [] # Create observer's dictionary from observations observers = {} for observatory_code in observations["observatory_code"].unique(): @@ -166,7 +177,7 @@ def attribution_worker( ephemeris_pre_grouped = ephemeris.groupby(by=["observatory_code", "mjd_utc"]) obs_group_keys = list(observations_grouped.groups.keys()) - indices_to_drop = pd.Int64Index([]) + indices_to_drop = pd.Index([], dtype="int64") for g_key in list(ephemeris_pre_grouped.groups.keys()): if g_key not in obs_group_keys: indices_to_drop = indices_to_drop.union( @@ -182,39 +193,58 @@ def attribution_worker( ] # Loop through each unique exposure and visit, find the nearest observations within - # eps (haversine metric) + # tolerance (haversine metric) distances = [] orbit_ids_associated = [] obs_ids_associated = [] + mag_associated = [] + mag_sigma_associated = [] obs_times_associated = [] - eps_rad = np.radians(eps) + coords_associated = [] + coords_pred_associated = [] + velocities_associated = [] + coords_sigma_associated = [] + frame_metadata_associated = [] + tolerance_rad = np.radians(tolerance) residuals = [] stats = [] for ephemeris_visit, observations_visit in zip( ephemeris_visits, observations_visits ): - assert len(ephemeris_visit["mjd_utc"].unique()) == 1 assert len(observations_visit["mjd_utc"].unique()) == 1 assert ( observations_visit["mjd_utc"].unique()[0] == ephemeris_visit["mjd_utc"].unique()[0] ) - obs_ids = observations_visit[["obs_id"]].values obs_times = observations_visit[["mjd_utc"]].values + mag = observations_visit[["mag"]].values + mag_sigma = observations_visit[["mag_sigma"]].values orbit_ids = ephemeris_visit[["orbit_id"]].values - coords = observations_visit[["RA_deg", "Dec_deg"]].values - coords_predicted = ephemeris_visit[["RA_deg", "Dec_deg"]].values - coords_sigma = observations_visit[["RA_sigma_deg", "Dec_sigma_deg"]].values - + coords = observations_visit[["ra_deg", "dec_deg"]].values + coords_predicted = ephemeris_visit[["ra_deg", "dec_deg"]].values + velocities = ephemeris_visit[["vra_degperday", "vdec_degperday"]].values + coords_sigma = observations_visit[["ra_sigma_deg", "dec_sigma_deg"]].values + frame_metadata = observations_visit[ + [ + "exposure_mjd_start", + "exposure_mjd_mid", + "filter", + "observatory_code", + "exposure_id", + "exposure_duration", + "healpix_id", + "dataset_id", + ] + ].values # Haversine metric requires latitude first then longitude... 
- coords_latlon = np.radians(observations_visit[["Dec_deg", "RA_deg"]].values) + coords_latlon = np.radians(observations_visit[["dec_deg", "ra_deg"]].values) coords_predicted_latlon = np.radians( - ephemeris_visit[["Dec_deg", "RA_deg"]].values + ephemeris_visit[["dec_deg", "ra_deg"]].values ) - num_obs = len(coords_predicted) + k = np.minimum(3, num_obs) # Build BallTree with a haversine metric on predicted ephemeris @@ -231,12 +261,19 @@ def attribution_worker( # Select all observations with distance smaller or equal # to the maximum given distance - mask = np.where(d <= eps_rad) + mask = np.where(d <= tolerance_rad) if len(d[mask]) > 0: orbit_ids_associated.append(orbit_ids[i[mask]]) obs_ids_associated.append(obs_ids[mask[0]]) obs_times_associated.append(obs_times[mask[0]]) + mag_associated.append(mag[mask[0]]) + mag_sigma_associated.append(mag_sigma[mask[0]]) + coords_associated.append(coords[mask[0]]) + coords_pred_associated.append(coords_predicted[i[mask]]) + velocities_associated.append(velocities[i[mask]]) + coords_sigma_associated.append(coords_sigma[mask[0]]) + frame_metadata_associated.append(frame_metadata[mask[0]]) distances.append(d[mask].reshape(-1, 1)) residuals_visit, stats_visit = calc_residuals( @@ -253,39 +290,76 @@ def attribution_worker( orbit_ids_associated = np.vstack(orbit_ids_associated) obs_ids_associated = np.vstack(obs_ids_associated) obs_times_associated = np.vstack(obs_times_associated) + mag_associated = np.vstack(mag_associated) + mag_sigma_associated = np.vstack(mag_sigma_associated) + coords_associated = np.vstack(coords_associated) + coords_pred_associated = np.vstack(coords_pred_associated) + velocities_associated = np.vstack(velocities_associated) + coords_sigma_associated = np.vstack(coords_sigma_associated) + frame_metadata_associated = np.vstack(frame_metadata_associated) residuals = np.vstack(residuals) stats = np.vstack(stats) - attributions = { - "orbit_id": orbit_ids_associated[:, 0], - "obs_id": obs_ids_associated[:, 0], - "mjd_utc": obs_times_associated[:, 0], - "distance": distances[:, 0], - "residual_ra_arcsec": residuals[:, 0] * 3600, - "residual_dec_arcsec": residuals[:, 1] * 3600, - "chi2": stats[:, 0], - } - if include_probabilistic: - attributions["probability"] = stats[:, 1] - attributions["mahalanobis_distance"] = stats[:, 2] - - attributions = pd.DataFrame(attributions) - - else: - columns = [ - "orbit_id", - "obs_id", - "mjd_utc", - "distance", - "residual_ra_arcsec", - "residual_dec_arcsec", - "chi2", + attributions = [ + BruteForceAttribution( + mjd=mjd_utc[0], + ra_deg=coords[0], + dec_deg=coords[1], + ra_sigma_arcsec=coords_sigma[0] * 3600.0, + dec_sigma_arcsec=coords_sigma[1] * 3600.0, + mag=mag[0], + mag_sigma=mag_sigma[0], + observation_id=obs_id[0], + pred_ra_deg=coords_pred[0], + pred_dec_deg=coords_pred[1], + pred_vra_degpday=velocities[0], + pred_vdec_degpday=velocities[1], + delta_ra_arcsec=residual[0] * 3600.0, + delta_dec_arcsec=residual[1] * 3600.0, + distance_arcsec=distance[0] * 3600.0, + exposure_mjd_start=frame_metadata[0], + exposure_mjd_mid=frame_metadata[1], + filter=frame_metadata[2], + obscode=frame_metadata[3], + exposure_id=frame_metadata[4], + exposure_duration=frame_metadata[5], + healpix_id=frame_metadata[6], + dataset_id=frame_metadata[7], + chi2=stats[0], + orbit_id=orbit_id[0], + probability=stats[1], + mahalanobis_distance=stats[2], + ) + for ( + orbit_id, + obs_id, + mjd_utc, + mag, + mag_sigma, + distance, + residual, + coords, + coords_pred, + velocities, + coords_sigma, + stats, + frame_metadata, 
+ ) in zip( + orbit_ids_associated, + obs_ids_associated, + obs_times_associated, + mag_associated, + mag_sigma_associated, + distances, + residuals, + coords_associated, + coords_pred_associated, + velocities_associated, + coords_sigma_associated, + stats, + frame_metadata_associated, + ) ] - if include_probabilistic: - columns += ["probability", "mahalanobis_distance"] - - attributions = pd.DataFrame(columns=columns) - return attributions @@ -294,22 +368,51 @@ def attribute_observations( mjd_start: float, mjd_end: float, precovery_db: PrecoveryDatabase, - eps=5 / 3600, + tolerance=5 / 3600, include_probabilistic=True, - backend="PYOORB", - backend_kwargs={}, - orbits_chunk_size=10, - observations_chunk_size=100000, + orbits_chunk_size: int = 10, + observations_chunk_size: int = 100000, num_jobs: int = 1, obscodes_specified: Iterable[str] = [], ): + """ + + + Parameters + ---------- + orbits : list + List of Orbit objects to attribute observations to. + mjd_start : float + Start time of observations to attribute. + mjd_end : float + End time of observations to attribute. + precovery_db : PrecoveryDatabase + Precovery database to attribute observations from. + tolerance : float, optional + Tolerance in degrees to attribute observations to orbits, by default 5 / 3600 + include_probabilistic : bool, optional + Whether to include probabilistic statistics in the attribution, by default True + orbits_chunk_size : int, optional + Number of orbits to attribute in each chunk, by default 10 + observations_chunk_size : int, optional + Number of observations to attribute in each chunk, by default 100000 + num_jobs : int, optional + Number of jobs to run in parallel, by default 1 + Note that there is a very rare edge case when multiple versions of the same orbit + are provided - in this case the ballTree used will fail to consider all orbits + when computing nearest neighbors. + obscodes_specified : Iterable[str], optional + List of obscodes to attribute observations from, an empty list means no filter + is applied on the obscodes returned (i.e. every dataset is searched) + """ + logger.info("Running observation attribution...") time_start = time.time() num_orbits = len(orbits) - attribution_dfs = [] - + attribution_lists = [] + observations = [] epochs = get_frame_times_by_obscode( mjd_start, mjd_end, @@ -321,7 +424,9 @@ def attribute_observations( ephemerides, precovery_db=precovery_db, neighbors=True ) if len(frames_to_search) != 0: - observations = precovery_db.extract_observations_by_frames(frames_to_search) + observations = precovery_db.extract_observations_by_frames( + frames_to_search, include_frame_metadata=True + ) else: warning = ( "No intersecting frames were found. 
This is unlikely unless" @@ -330,43 +435,46 @@ def attribute_observations( warnings.warn(warning, UserWarning) return [] num_workers = min(num_jobs, mp.cpu_count() + 1) - if num_workers > 1: + logger.info( + "Splitting {} observations from {} frames over {} workers.".format( + len(observations), + len(frames_to_search), + num_workers, + ) + ) + if num_workers > 1: p = mp.Pool(processes=num_workers) # Send up to orbits_chunk_size orbits to each OD worker for processing chunk_size_ = calcChunkSize( num_orbits, num_workers, orbits_chunk_size, min_chunk_size=1 ) - orbits_split = [ orbits[i : i + chunk_size_] for i in range(0, len(orbits), chunk_size_) ] eph_split = [] - for orbit_c in orbits.split(orbits_chunk_size): + for orbit_c in orbits_split: eph_split.append( ephemerides[ ephemerides["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) ] ) for observations_c in yieldChunks(observations, observations_chunk_size): - obs = [observations_c for i in range(len(orbits_split))] - attribution_dfs_i = p.starmap( + attribution_lists_i = p.starmap( partial( attribution_worker, - eps=eps, + tolerance=tolerance, include_probabilistic=include_probabilistic, - backend=backend, - backend_kwargs=backend_kwargs, ), zip( eph_split, obs, ), ) - attribution_dfs += attribution_dfs_i + attribution_lists += attribution_lists_i p.close() @@ -376,30 +484,36 @@ def attribute_observations( orbits[i : i + orbits_chunk_size] for i in range(0, len(orbits), orbits_chunk_size) ]: - eph_c = ephemerides[ ephemerides["orbit_id"].isin([orbit.orbit_id for orbit in orbit_c]) ] - attribution_df_i = attribution_worker( + attribution_list_i = attribution_worker( eph_c, observations_c, - eps=eps, + tolerance=tolerance, include_probabilistic=include_probabilistic, ) - attribution_dfs.append(attribution_df_i) + attribution_lists.append(attribution_list_i) - attributions = pd.concat(attribution_dfs) - attributions.sort_values( - by=["orbit_id", "mjd_utc", "distance"], inplace=True, ignore_index=True + # Flatten the list of attribution lists + attributions = [ + attribution + for attribution_list in attribution_lists + for attribution in attribution_list + ] + attribution_df = pd.DataFrame(attributions) + attribution_df.sort_values( + by=["orbit_id", "mjd", "distance_arcsec"], inplace=True, ignore_index=True ) - time_end = time.time() logger.info( - "Attributed {} observations to {} orbits.".format( - attributions["obs_id"].nunique(), attributions["orbit_id"].nunique() + "Attributed {} observations ({} unique) to {} orbits.".format( + attribution_df["observation_id"].count(), + attribution_df["observation_id"].nunique(), + attribution_df["orbit_id"].nunique(), ) ) logger.info( "Attribution completed in {:.3f} seconds.".format(time_end - time_start) ) - return attributions + return attributions, attribution_df diff --git a/precovery/frame_db.py b/precovery/frame_db.py index 9861d52..5074a8e 100644 --- a/precovery/frame_db.py +++ b/precovery/frame_db.py @@ -440,7 +440,8 @@ def frames_by_date( self, mjd_start: float, mjd_end: float ) -> Iterator[HealpixFrame]: """ - Returns all frames in the index within a date range, sorted by obscode, exposure_mjd_mid, and healpixel. + Returns all frames in the index within a date range, sorted by obscode, + exposure_mjd_mid, and healpixel. 
""" select_stmt = ( sq.select( diff --git a/precovery/healpix_geom.py b/precovery/healpix_geom.py index d1e7282..3c1717e 100644 --- a/precovery/healpix_geom.py +++ b/precovery/healpix_geom.py @@ -9,7 +9,6 @@ def radec_to_healpixel( nside: int, include_neighbors: bool = False, ) -> npt.NDArray[np.int64]: - if not include_neighbors: return hp.ang2pix(nside, ra, dec, nest=True, lonlat=True).astype(np.int64) else: diff --git a/precovery/main.py b/precovery/main.py index 57e271a..62ab261 100644 --- a/precovery/main.py +++ b/precovery/main.py @@ -52,6 +52,7 @@ def precover( end_mjd: Optional[float] = None, window_size: int = 7, include_frame_candidates: bool = False, + algorithm: Optional[str] = "nelson", ) -> List[Union[PrecoveryCandidate, FrameCandidate]]: """ Connect to database directory and run precovery for the input orbit. @@ -73,13 +74,20 @@ def precover( Limit precovery search to all MJD UTC times beyond this time. end_mjd : float, optional Limit precovery search to all MJD UTC times before this time. + algorithm: str, optional from 'nelson' or 'bruteforce' + Choice among two precovery algorithms. Note that both algorithms use the same indexed + precovery database. + 'nelson': The orbit is propagated with N-body dynamics to the midpoint of each window (see + 'window_size). From the midpoint,the orbit is then propagated using 2-body dynamics to find + which HealpixFrames intersect the trajectory. Once the list of HealpixFrames has been made, + the orbit is then propagated vian-body dynamics to each frame and the angular distance to + each observation in that frame is checked. + 'bruteforce': propagates to each unique (mjd,obscode) among frames, finds intersecting frames + with neighbors, extracts all observations in intersecting frames, builds balltree to query + distance using observed RA/Dec window_size : int, optional To decrease computational cost, the index observations are searched in windows of this size. - The orbit is propagated with N-body dynamics to the midpoint of each window. From the midpoint, - the orbit is then propagated using 2-body dynamics to find which HealpixFrames intersect the - trajectory. Once the list of HealpixFrames has been made, the orbit is then propagated via - n-body dynamics to each frame and the angular distance to each observation in that - frame is checked. + This parameter is ignored if algorithm='bruteforce' is selected include_frame_candidates : bool, optional If no observations are found within the given angular tolerance, return the HealpixFrame where the trajectory intersected the Healpixel but no observations were found. 
This is useful @@ -97,19 +105,33 @@ def precover( precovery_db = PrecoveryDatabase.from_dir( database_directory, create=False, mode="r" ) - - candidates = [ - c - for c in precovery_db.precover( - orbit, - tolerance=tolerance, - max_matches=max_matches, - start_mjd=start_mjd, - end_mjd=end_mjd, - window_size=window_size, - include_frame_candidates=include_frame_candidates, - ) - ] + candidates = [] + if algorithm == "nelson": + candidates = [ + c + for c in precovery_db.precover( + orbit, + tolerance=tolerance, + max_matches=max_matches, + start_mjd=start_mjd, + end_mjd=end_mjd, + window_size=window_size, + include_frame_candidates=include_frame_candidates, + ) + ] + elif algorithm == "bruteforce": + candidates = [ + c + for c in precovery_db.precover( + orbit, + tolerance=tolerance, + max_matches=max_matches, + start_mjd=start_mjd, + end_mjd=end_mjd, + window_size=window_size, + include_frame_candidates=include_frame_candidates, + ) + ] df = pd.DataFrame(_candidates_to_dict(candidates)) df.loc[:, "observation_id"] = df.loc[:, "observation_id"].astype(str) diff --git a/precovery/precovery_db.py b/precovery/precovery_db.py index 1eec6a2..c89c8d2 100644 --- a/precovery/precovery_db.py +++ b/precovery/precovery_db.py @@ -500,17 +500,21 @@ def _check_frames( n_frame += 1 logger.info("checked %d frames", n_frame) - def extract_observations_by_frames(self, frames: Iterable[HealpixFrame]): + def extract_observations_by_frames( + self, frames: Iterable[HealpixFrame], include_frame_metadata=False + ): # consider warnings for available memory obs_out = pd.DataFrame( columns=[ "observatory_code", - "healpixel", + "healpix_id", "obs_id", - "RA_deg", - "Dec_deg", - "RA_sigma_deg", - "Dec_sigma_deg", + "ra_deg", + "dec_deg", + "ra_sigma_deg", + "dec_sigma_deg", + "mag", + "mag_sigma", "mjd_utc", ] ) @@ -519,12 +523,24 @@ def extract_observations_by_frames(self, frames: Iterable[HealpixFrame]): # over loop, build dataframe of observations within each frame with shared frame scalars for frame in frames: # can't mix numpy array types, so two accumulators: one for floats, one for obs_id strings - inc_arr = np.empty((0, 5), float) + inc_arr = np.empty((0, 7), float) obs_ids = np.empty((0, 1), object) for obs in self.frames.iterate_observations(frame): inc_arr = np.append( inc_arr, - np.array([[obs.ra, obs.dec, obs.ra_sigma, obs.dec_sigma, obs.mjd]]), + np.array( + [ + [ + obs.ra, + obs.dec, + obs.ra_sigma, + obs.dec_sigma, + obs.mag, + obs.mag_sigma, + obs.mjd, + ] + ] + ), axis=0, ) obs_ids = np.append( @@ -536,16 +552,28 @@ def extract_observations_by_frames(self, frames: Iterable[HealpixFrame]): frame_obs = pd.DataFrame( inc_arr, columns=[ - "RA_deg", - "Dec_deg", - "RA_sigma_deg", - "Dec_sigma_deg", + "ra_deg", + "dec_deg", + "ra_sigma_deg", + "dec_sigma_deg", + "mag", + "mag_sigma", "mjd_utc", ], ) - frame_obs.insert(0, "observatory_code", frame.obscode) - frame_obs.insert(1, "healpixel", frame.healpixel) - frame_obs.insert(2, "obs_id", obs_ids) + + frame_obs.insert(0, "obs_id", obs_ids) + if include_frame_metadata: + frame_obs.insert(0, "observatory_code", frame.obscode) + frame_obs.insert(1, "healpix_id", frame.healpixel) + frame_obs.insert(2, "frame_id", frame.id) + frame_obs.insert(3, "exposure_id", frame.exposure_id) + frame_obs.insert(4, "exposure_mjd_start", frame.exposure_mjd_start) + frame_obs.insert(5, "exposure_mjd_mid", frame.exposure_mjd_mid) + frame_obs.insert(6, "exposure_duration", frame.exposure_duration) + frame_obs.insert(7, "dataset_id", frame.dataset_id) + frame_obs.insert(7, 
"filter", frame.filter) + frame_dfs.append(frame_obs) obs_out = pd.concat(frame_dfs) return obs_out diff --git a/precovery/residuals.py b/precovery/residuals.py index 01f72a8..3f54d05 100644 --- a/precovery/residuals.py +++ b/precovery/residuals.py @@ -79,7 +79,6 @@ def calc_residuals( ) stats = (chi2, p, d) else: - stats = (chi2,) return residuals, stats diff --git a/precovery/utils.py b/precovery/utils.py index 0bd459a..316b821 100644 --- a/precovery/utils.py +++ b/precovery/utils.py @@ -1,6 +1,4 @@ import logging -import multiprocessing as mp -import signal import numpy as np import pandas as pd @@ -34,7 +32,10 @@ def yieldChunks(indexable, chunk_size): for c in range(0, len(indexable), chunk_size): yield indexable.iloc[c : c + chunk_size] else: - err = "Indexable should be one of {list, `~numpy.ndarray`, `~pandas.DataFrame`, `~pandas.Series`}" + err = ( + "Indexable should be one of {list, `~numpy.ndarray`, `~pandas.DataFrame`," + " `~pandas.Series`}" + ) raise ValueError(err) diff --git a/tests/get_orbits.py b/tests/get_orbits.py index b662c2e..bf99a8f 100644 --- a/tests/get_orbits.py +++ b/tests/get_orbits.py @@ -101,7 +101,6 @@ def get_sample_orbits(targets: list[str]) -> pd.DataFrame: if __name__ == "__main__": - parser = argparse.ArgumentParser( description="Get orbits of sample targets from JPL's Small-Body Database." ) diff --git a/tests/make_observations.py b/tests/make_observations.py index eb3d534..7bad0b5 100644 --- a/tests/make_observations.py +++ b/tests/make_observations.py @@ -38,9 +38,7 @@ def dataframe_to_orbit( """ orbits = [] for i in range(len(orbits_df)): - if orbit_type == "keplerian": - orbit_i = Orbit.keplerian( i, orbits_df["a"].values[i], @@ -56,7 +54,6 @@ def dataframe_to_orbit( ) elif orbit_type == "cometary": - # Extract time of perihelion passage tp = Time( orbits_df["tp"].values[i], @@ -205,7 +202,6 @@ def make_observations( if __name__ == "__main__": - parser = argparse.ArgumentParser( description="Get orbits of sample targets from JPL's Small-Body Database." 
) From c3b0deae51251f7447934c95d44c6bec0abafc02 Mon Sep 17 00:00:00 2001 From: Nate Tellis Date: Thu, 26 Jan 2023 19:21:58 -0800 Subject: [PATCH 12/12] Added brute force to precovery.main.precover with algorithm as a kwarg, extends test_precovery_index to run on both algorithms --- precovery/brute_force.py | 32 +++++++++++++++++--------------- precovery/main.py | 26 ++++++++++++++------------ tests/test_precovery_index.py | 10 ++++++++-- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/precovery/brute_force.py b/precovery/brute_force.py index 6e3bfc5..f610bd9 100644 --- a/precovery/brute_force.py +++ b/precovery/brute_force.py @@ -5,7 +5,7 @@ import time import warnings from functools import partial -from typing import Iterable +from typing import Iterable, Optional import numpy as np import pandas as pd @@ -46,15 +46,18 @@ class BruteForceAttribution(PrecoveryCandidate): def get_frame_times_by_obscode( - mjd_start: float, - mjd_end: float, precovery_db: PrecoveryDatabase, + mjd_start: Optional[float] = None, + mjd_end: Optional[float] = None, obscodes_specified: Iterable[str] = [], ): all_frame_mjd = precovery_db.frames.idx.unique_frame_times() - frame_mjd_within_range = [ - x for x in all_frame_mjd if (x[0] > mjd_start and x[0] < mjd_end) - ] + if (mjd_start is None) and (mjd_end is None): + frame_mjd_within_range = all_frame_mjd + else: + frame_mjd_within_range = [ + x for x in all_frame_mjd if (x[0] > mjd_start and x[0] < mjd_end) + ] frame_mjds_by_obscode = dict() for exposure_mjd_mid, obscode in frame_mjd_within_range: if len(obscodes_specified) == 0 or obscode in obscodes_specified: @@ -280,7 +283,7 @@ def attribution_worker( coords[mask[0]], coords_predicted[i[mask]], sigmas_actual=coords_sigma[mask[0]], - include_probabilistic=True, + include_probabilistic=include_probabilistic, ) residuals.append(residuals_visit) stats.append(np.vstack(stats_visit).T) @@ -299,7 +302,6 @@ def attribution_worker( frame_metadata_associated = np.vstack(frame_metadata_associated) residuals = np.vstack(residuals) stats = np.vstack(stats) - attributions = [ BruteForceAttribution( mjd=mjd_utc[0], @@ -325,10 +327,10 @@ def attribution_worker( exposure_duration=frame_metadata[5], healpix_id=frame_metadata[6], dataset_id=frame_metadata[7], - chi2=stats[0], + chi2=None if not include_probabilistic else stats[0], orbit_id=orbit_id[0], - probability=stats[1], - mahalanobis_distance=stats[2], + probability=None if not include_probabilistic else stats[1], + mahalanobis_distance=None if not include_probabilistic else stats[2], ) for ( orbit_id, @@ -365,9 +367,9 @@ def attribution_worker( def attribute_observations( orbits, - mjd_start: float, - mjd_end: float, precovery_db: PrecoveryDatabase, + mjd_start: Optional[float] = None, + mjd_end: Optional[float] = None, tolerance=5 / 3600, include_probabilistic=True, orbits_chunk_size: int = 10, @@ -414,9 +416,9 @@ def attribute_observations( attribution_lists = [] observations = [] epochs = get_frame_times_by_obscode( - mjd_start, - mjd_end, precovery_db=precovery_db, + mjd_start=mjd_start, + mjd_end=mjd_end, obscodes_specified=obscodes_specified, ) ephemerides = ephemerides_from_orbits(orbits, epochs) diff --git a/precovery/main.py b/precovery/main.py index 62ab261..103c490 100644 --- a/precovery/main.py +++ b/precovery/main.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd +from .brute_force import attribute_observations from .orbit import Orbit from .precovery_db import FrameCandidate, PrecoveryCandidate, PrecoveryDatabase @@ -53,6 +54,7 @@ 
def precover(
     window_size: int = 7,
     include_frame_candidates: bool = False,
     algorithm: Optional[str] = "nelson",
+    include_probabilistic: bool = False,
 ) -> List[Union[PrecoveryCandidate, FrameCandidate]]:
     """
     Connect to database directory and run precovery for the input orbit.
@@ -94,6 +96,10 @@ def precover(
     for negative observation campaigns. Note that camera footprints are not modeled, all datasets
     are mapped onto a Healpixel space and this simply returns the Healpixel equivalent exposure
     information.
+    include_probabilistic : bool, optional, default False
+        In the case of the 'bruteforce' algorithm, include probabilistic information in the matches
+        output. These include the candidate chi2, probability, and mahalanobis distance. Note that
+        the return type will be a BruteForceAttribution, which extends the PrecoveryCandidate class.
 
     Returns
     -------
@@ -120,18 +126,14 @@ def precover(
             )
         ]
     elif algorithm == "bruteforce":
-        candidates = [
-            c
-            for c in precovery_db.precover(
-                orbit,
-                tolerance=tolerance,
-                max_matches=max_matches,
-                start_mjd=start_mjd,
-                end_mjd=end_mjd,
-                window_size=window_size,
-                include_frame_candidates=include_frame_candidates,
-            )
-        ]
+        candidates, attribution_df = attribute_observations(
+            [orbit],
+            precovery_db=precovery_db,
+            mjd_start=start_mjd,
+            mjd_end=end_mjd,
+            tolerance=tolerance,
+            include_probabilistic=include_probabilistic,
+        )
 
     df = pd.DataFrame(_candidates_to_dict(candidates))
     df.loc[:, "observation_id"] = df.loc[:, "observation_id"].astype(str)
diff --git a/tests/test_precovery_index.py b/tests/test_precovery_index.py
index a8e7f50..d323616 100644
--- a/tests/test_precovery_index.py
+++ b/tests/test_precovery_index.py
@@ -26,7 +26,11 @@ def test_db_dir():
     shutil.rmtree(out_dir)
 
 
-def test_precovery(test_db_dir):
+precovery_algorithms = ["nelson", "bruteforce"]
+
+
+@pytest.mark.parametrize("algorithm", precovery_algorithms)
+def test_precovery(test_db_dir, algorithm):
     """
     Given a properly formatted h5 file, ensure the observations are indexed properly
     """
@@ -71,7 +75,9 @@ def test_precovery(test_db_dir):
 
     # For each sample orbit, validate we get all the observations we planted
     for orbit in orbits_keplerian:
-        results = precover(orbit, test_db_dir, tolerance=1 / 3600, window_size=1)
+        results = precover(
+            orbit, test_db_dir, tolerance=1 / 3600, window_size=1, algorithm=algorithm
+        )
         object_observations = observations_df[
             observations_df["object_id"] == orbit_name_mapping[orbit.orbit_id]
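
Taken together, the series leaves two entry points for the brute-force search. A usage sketch under assumed inputs (the database directory is a placeholder and the orbit construction is elided, since neither is fixed by these patches):

    from precovery.brute_force import attribute_observations
    from precovery.main import precover
    from precovery.precovery_db import PrecoveryDatabase

    db_dir = "data/db"  # placeholder: a directory holding an indexed precovery database
    orbit = ...  # placeholder: a precovery.orbit.Orbit instance

    # High-level entry point: algorithm="bruteforce" routes precover() through
    # attribute_observations() instead of the windowed "nelson" search.
    matches = precover(
        orbit, db_dir, tolerance=1 / 3600, algorithm="bruteforce", include_probabilistic=True
    )

    # Lower-level entry point: returns both the list of BruteForceAttribution
    # objects and a DataFrame of the same rows sorted by orbit_id, mjd, and
    # distance_arcsec. Leaving mjd_start and mjd_end at their None defaults
    # searches every frame time in the index.
    db = PrecoveryDatabase.from_dir(db_dir, create=False, mode="r")
    attributions, attribution_df = attribute_observations(
        [orbit],
        precovery_db=db,
        tolerance=5 / 3600,
        include_probabilistic=True,
    )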