From 870e753fd630dc9595d12876326be4e33288cd17 Mon Sep 17 00:00:00 2001 From: Theodlz Date: Thu, 7 Aug 2025 10:15:22 +0200 Subject: [PATCH 1/6] decode avros and check if alerts already exist in each worker task, not in the main kafka reading sequence --- kowalski/alert_brokers/alert_broker.py | 37 +- kowalski/alert_brokers/alert_broker_pgir.py | 256 ++++++++------ kowalski/alert_brokers/alert_broker_turbo.py | 218 ++++++------ kowalski/alert_brokers/alert_broker_winter.py | 242 +++++++------ kowalski/alert_brokers/alert_broker_ztf.py | 333 ++++++++++-------- 5 files changed, 574 insertions(+), 512 deletions(-) diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 817b8738..13649a3f 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -220,34 +220,26 @@ def decode_message(cls, msg): return decoded_msg @staticmethod - def process_alert(alert: Mapping, topic: str): + def process_alerts(avro_msg: bytes, topic: str): """Alert brokering task run by dask.distributed workers - :param alert: decoded alert from Kafka stream + :param avro_msg: avro message from Kafka stream :param topic: Kafka stream topic name for bookkeeping :return: """ raise NotImplementedError("Must be implemented in subclass") - def submit_alert(self, record: Mapping): - # we look for objectId and objectid if missing, - # to support both ZTF and WNTR alert schemas - objectId = record.get("objectId", record.get("objectid", None)) - if objectId is None: - log( - f"Failed to get objectId from record {record}, skipping alert submission" - ) - return + def submit_alert(self, avro_msg: bytes): with timer( - f"Submitting alert {objectId} {record['candid']} for processing", + "Submitting alert for processing", self.verbose > 1, ): future = self.dask_client.submit( - self.process_alert, record, self.topic, pure=True + self.process_alerts, avro_msg, self.topic, pure=True ) dask.distributed.fire_and_forget(future) future.release() - del future, record + del future, avro_msg # clean up after thyself return def poll(self): @@ -264,22 +256,7 @@ def poll(self): elif msg is not None: try: - # decode avro packet - with timer("Decoding alert", self.verbose > 1): - msg_decoded = self.decode_message(msg) - - for record in msg_decoded: - if ( - retry(self.mongo.db[self.collection_alerts].count_documents)( - {"candid": record["candid"]}, limit=1 - ) - == 0 - ): - - self.submit_alert(record) - - # clean up after thyself - del msg_decoded + self.submit_alert(msg) except Exception as e: print("Error in poll!") diff --git a/kowalski/alert_brokers/alert_broker_pgir.py b/kowalski/alert_brokers/alert_broker_pgir.py index ce4d2840..f8d171d9 100644 --- a/kowalski/alert_brokers/alert_broker_pgir.py +++ b/kowalski/alert_brokers/alert_broker_pgir.py @@ -14,7 +14,7 @@ import dask.distributed from kowalski.alert_brokers.alert_broker import AlertConsumer, AlertWorker, EopError from bson.json_util import loads as bson_loads -from kowalski.utils import init_db_sync, timer +from kowalski.utils import init_db_sync, timer, retry from kowalski.config import load_config from kowalski.log import log @@ -31,155 +31,177 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alert(alert: Mapping, topic: str): + def process_alerts(avro_msg: bytes, topic: str, worker): """Alert brokering task run by dask.distributed workers - :param alert: decoded alert from Kafka stream + :param 
avro_msg: avro message from Kafka stream :param topic: Kafka stream topic name for bookkeeping :return: """ - candid = alert["candid"] - object_id = alert["objectId"] # get worker running current task worker = dask.distributed.get_worker() alert_worker = worker.plugins["worker-init"].alert_worker - log(f"{topic} {object_id} {candid} {worker.address}") + with timer("Decoding alert", alert_worker.verbose > 1): + msg_decoded = alert_worker.decode_message(avro_msg) - # return if this alert packet has already been processed and ingested into collection_alerts: - if ( - alert_worker.mongo.db[alert_worker.collection_alerts].count_documents( - {"candid": candid}, limit=1 - ) - == 1 - ): - return - - # candid not in db, ingest decoded avro packet into db - with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1): - alert, prv_candidates, _ = alert_worker.alert_mongify(alert) - - # create alert history - all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])] - with timer( - f"Gather all previous candidates for {object_id} {candid}", - alert_worker.verbose > 1, - ): - # get all prv_candidates for this objectId: - existing_aux = alert_worker.mongo.db[ - alert_worker.collection_alerts_aux - ].find_one({"_id": object_id}, {"prv_candidates": 1}) + for alert in msg_decoded: + candid = alert["candid"] + object_id = alert["objectId"] if ( - existing_aux is not None - and len(existing_aux.get("prv_candidates", [])) > 0 + retry( + alert_worker.mongo.db[ + alert_worker.collection_alerts + ].count_documents + )({"candid": candid}, limit=1) + == 1 ): - all_prv_candidates += existing_aux["prv_candidates"] - del existing_aux - - # ML models: - with timer(f"MLing of {object_id} {candid}", alert_worker.verbose > 1): - scores = alert_worker.alert_filter__ml(alert, all_prv_candidates) - alert["classifications"] = scores + # this alert has already been processed, skip it + log(f"Alert {object_id} {candid} already processed, skipping") + continue - with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): - alert_worker.mongo.insert_one( - collection=alert_worker.collection_alerts, document=alert - ) + log(f"pgir: {topic} {object_id} {candid} {worker.address}") - # prv_candidates: pop nulls - save space - prv_candidates = [ - {kk: vv for kk, vv in prv_candidate.items() if vv is not None} - for prv_candidate in prv_candidates - ] - - alert_aux, xmatches, xmatches_ztf, passed_filters = None, None, None, None - # cross-match with external catalogs if objectId not in collection_alerts_aux: - if ( - alert_worker.mongo.db[alert_worker.collection_alerts_aux].count_documents( - {"_id": object_id}, limit=1 - ) - == 0 - ): + # candid not in db, ingest decoded avro packet into db with timer( - f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + f"Mongification of {object_id} {candid}", alert_worker.verbose > 1 ): - xmatches = alert_worker.alert_filter__xmatch(alert) + alert, prv_candidates, _ = alert_worker.alert_mongify(alert) - # Crossmatch new alert with most recent ZTF_alerts and insert + # create alert history + all_prv_candidates = deepcopy(prv_candidates) + [ + deepcopy(alert["candidate"]) + ] with timer( - f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + f"Gather all previous candidates for {object_id} {candid}", + alert_worker.verbose > 1, ): - xmatches = { - **xmatches, - **alert_worker.alert_filter__xmatch_ztf_alerts(alert), - } + # get all prv_candidates for this objectId: + existing_aux = alert_worker.mongo.db[ + 
alert_worker.collection_alerts_aux + ].find_one({"_id": object_id}, {"prv_candidates": 1}) + if ( + existing_aux is not None + and len(existing_aux.get("prv_candidates", [])) > 0 + ): + all_prv_candidates += existing_aux["prv_candidates"] + del existing_aux - alert_aux = { - "_id": object_id, - "cross_matches": xmatches, - "prv_candidates": prv_candidates, - } + # ML models: + with timer(f"MLing of {object_id} {candid}", alert_worker.verbose > 1): + scores = alert_worker.alert_filter__ml(alert, all_prv_candidates) + alert["classifications"] = scores - with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1): + with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): alert_worker.mongo.insert_one( - collection=alert_worker.collection_alerts_aux, document=alert_aux + collection=alert_worker.collection_alerts, document=alert ) - else: - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one( - {"_id": object_id}, - {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, - upsert=True, - ) + # prv_candidates: pop nulls - save space + prv_candidates = [ + {kk: vv for kk, vv in prv_candidate.items() if vv is not None} + for prv_candidate in prv_candidates + ] - # Crossmatch exisiting alert with most recent record in ZTF_alerts and update aux - with timer( - f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + alert_aux, xmatches, xmatches_ztf, passed_filters = None, None, None, None + # cross-match with external catalogs if objectId not in collection_alerts_aux: + if ( + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].count_documents({"_id": object_id}, limit=1) + == 0 ): - xmatches_ztf = alert_worker.alert_filter__xmatch_ztf_alerts(alert) + with timer( + f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = alert_worker.alert_filter__xmatch(alert) - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one( - {"_id": object_id}, - {"$set": {"ZTF_alerts": xmatches_ztf}}, - upsert=True, - ) + # Crossmatch new alert with most recent ZTF_alerts and insert + with timer( + f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = { + **xmatches, + **alert_worker.alert_filter__xmatch_ztf_alerts(alert), + } + + alert_aux = { + "_id": object_id, + "cross_matches": xmatches, + "prv_candidates": prv_candidates, + } - if config["misc"]["broker"]: - # execute user-defined alert filters - with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1): - passed_filters = alert_worker.alert_filter__user_defined( - alert_worker.filter_templates, alert - ) - if alert_worker.verbose > 1: - log( - f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + with timer( + f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.insert_one( + collection=alert_worker.collection_alerts_aux, + document=alert_aux, + ) + + else: + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].update_one( + {"_id": object_id}, + {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, + upsert=True, + ) + + # Crossmatch exisiting alert with most recent record in ZTF_alerts and update aux + with timer( + f"ZTF Cross-match of {object_id} {candid}", 
alert_worker.verbose > 1 + ): + xmatches_ztf = alert_worker.alert_filter__xmatch_ztf_alerts(alert) + + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].update_one( + {"_id": object_id}, + {"$set": {"ZTF_alerts": xmatches_ztf}}, + upsert=True, + ) + + if config["misc"]["broker"]: + # execute user-defined alert filters + with timer( + f"Filtering of {object_id} {candid}", alert_worker.verbose > 1 + ): + passed_filters = alert_worker.alert_filter__user_defined( + alert_worker.filter_templates, alert + ) + if alert_worker.verbose > 1: + log( + f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + ) + + # post to SkyPortal + alert_worker.alert_sentinel_skyportal( + alert, prv_candidates, passed_filters=passed_filters ) - # post to SkyPortal - alert_worker.alert_sentinel_skyportal( - alert, prv_candidates, passed_filters=passed_filters + # clean up after thyself + del ( + alert, + prv_candidates, + all_prv_candidates, + scores, + xmatches, + xmatches_ztf, + alert_aux, + passed_filters, + candid, + object_id, ) - # clean up after thyself - del ( - alert, - prv_candidates, - all_prv_candidates, - scores, - xmatches, - xmatches_ztf, - alert_aux, - passed_filters, - candid, - object_id, - ) + return class PGIRAlertWorker(AlertWorker, ABC): diff --git a/kowalski/alert_brokers/alert_broker_turbo.py b/kowalski/alert_brokers/alert_broker_turbo.py index f2ef5613..f27d265f 100644 --- a/kowalski/alert_brokers/alert_broker_turbo.py +++ b/kowalski/alert_brokers/alert_broker_turbo.py @@ -14,7 +14,7 @@ import dask.distributed from kowalski.alert_brokers.alert_broker import AlertConsumer, AlertWorker, EopError from bson.json_util import loads as bson_loads -from kowalski.utils import init_db_sync, timer +from kowalski.utils import init_db_sync, timer, retry from kowalski.config import load_config from kowalski.log import log @@ -31,131 +31,151 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alert(alert: Mapping, topic: str): + def process_alerts(avro_msg: bytes, topic: str, worker): """Alert brokering task run by dask.distributed workers - :param alert: decoded alert from Kafka stream + :param avro_msg: avro message from Kafka stream :param topic: Kafka stream topic name for bookkeeping :return: """ - candid = alert["candid"] - object_id = alert["objectId"] # get worker running current task worker = dask.distributed.get_worker() alert_worker = worker.plugins["worker-init"].alert_worker - log(f"{topic} {object_id} {candid} {worker.address}") - - # return if this alert packet has already been processed and ingested into collection_alerts: - if ( - alert_worker.mongo.db[alert_worker.collection_alerts].count_documents( - {"candid": candid}, limit=1 - ) - == 1 - ): - return - - # candid not in db, ingest decoded avro packet into db - with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1): - alert, prv_candidates, _ = alert_worker.alert_mongify(alert) - - with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): - alert_worker.mongo.insert_one( - collection=alert_worker.collection_alerts, document=alert - ) + with timer("Decoding alert", alert_worker.verbose > 1): + msg_decoded = alert_worker.decode_message(avro_msg) + + for alert in msg_decoded: + candid = alert["candid"] + object_id = alert["objectid"] + if ( + retry( + alert_worker.mongo.db[ + 
alert_worker.collection_alerts + ].count_documents + )({"candid": candid}, limit=1) + == 1 + ): + # this alert has already been processed, skip it + log(f"Alert {object_id} {candid} already processed, skipping") + continue - # prv_candidates: pop nulls - save space - prv_candidates = [ - {kk: vv for kk, vv in prv_candidate.items() if vv is not None} - for prv_candidate in prv_candidates - ] + log(f"turbo: {topic} {object_id} {candid} {worker.address}") - alert_aux, xmatches, xmatches_ztf, passed_filters = None, None, None, None - # cross-match with external catalogs if objectId not in collection_alerts_aux: - if ( - alert_worker.mongo.db[alert_worker.collection_alerts_aux].count_documents( - {"_id": object_id}, limit=1 - ) - == 0 - ): + # candid not in db, ingest decoded avro packet into db with timer( - f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + f"Mongification of {object_id} {candid}", alert_worker.verbose > 1 ): - xmatches = alert_worker.alert_filter__xmatch(alert) + alert, prv_candidates, _ = alert_worker.alert_mongify(alert) - # Crossmatch new alert with most recent ZTF_alerts and insert - with timer( - f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): + alert_worker.mongo.insert_one( + collection=alert_worker.collection_alerts, document=alert + ) + + # prv_candidates: pop nulls - save space + prv_candidates = [ + {kk: vv for kk, vv in prv_candidate.items() if vv is not None} + for prv_candidate in prv_candidates + ] + + alert_aux, xmatches, xmatches_ztf, passed_filters = None, None, None, None + # cross-match with external catalogs if objectId not in collection_alerts_aux: + if ( + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].count_documents({"_id": object_id}, limit=1) + == 0 ): - xmatches = { - **xmatches, - **alert_worker.alert_filter__xmatch_ztf_alerts(alert), + with timer( + f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = alert_worker.alert_filter__xmatch(alert) + + # Crossmatch new alert with most recent ZTF_alerts and insert + with timer( + f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = { + **xmatches, + **alert_worker.alert_filter__xmatch_ztf_alerts(alert), + } + + alert_aux = { + "_id": object_id, + "cross_matches": xmatches, + "prv_candidates": prv_candidates, } - alert_aux = { - "_id": object_id, - "cross_matches": xmatches, - "prv_candidates": prv_candidates, - } + with timer( + f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.insert_one( + collection=alert_worker.collection_alerts_aux, + document=alert_aux, + ) - with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1): - alert_worker.mongo.insert_one( - collection=alert_worker.collection_alerts_aux, document=alert_aux - ) + else: + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].update_one( + {"_id": object_id}, + {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, + upsert=True, + ) - else: - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one( - {"_id": object_id}, - {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, - upsert=True, - ) + # Crossmatch exisiting alert with most recent record in ZTF_alerts and update aux + with 
timer( + f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches_ztf = alert_worker.alert_filter__xmatch_ztf_alerts(alert) - # Crossmatch exisiting alert with most recent record in ZTF_alerts and update aux - with timer( - f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 - ): - xmatches_ztf = alert_worker.alert_filter__xmatch_ztf_alerts(alert) + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].update_one( + {"_id": object_id}, + {"$set": {"ZTF_alerts": xmatches_ztf}}, + upsert=True, + ) - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one( - {"_id": object_id}, - {"$set": {"ZTF_alerts": xmatches_ztf}}, - upsert=True, - ) + if config["misc"]["broker"]: + # execute user-defined alert filters + with timer( + f"Filtering of {object_id} {candid}", alert_worker.verbose > 1 + ): + passed_filters = alert_worker.alert_filter__user_defined( + alert_worker.filter_templates, alert + ) + if alert_worker.verbose > 1: + log( + f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + ) - if config["misc"]["broker"]: - # execute user-defined alert filters - with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1): - passed_filters = alert_worker.alert_filter__user_defined( - alert_worker.filter_templates, alert - ) - if alert_worker.verbose > 1: - log( - f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + # post to SkyPortal + alert_worker.alert_sentinel_skyportal( + alert, prv_candidates, passed_filters=passed_filters ) - # post to SkyPortal - alert_worker.alert_sentinel_skyportal( - alert, prv_candidates, passed_filters=passed_filters + # clean up after thyself + del ( + alert, + prv_candidates, + xmatches, + xmatches_ztf, + alert_aux, + passed_filters, + candid, + object_id, ) - # clean up after thyself - del ( - alert, - prv_candidates, - xmatches, - xmatches_ztf, - alert_aux, - passed_filters, - candid, - object_id, - ) + return class TURBOAlertWorker(AlertWorker, ABC): diff --git a/kowalski/alert_brokers/alert_broker_winter.py b/kowalski/alert_brokers/alert_broker_winter.py index 538d3eec..4bf17c90 100644 --- a/kowalski/alert_brokers/alert_broker_winter.py +++ b/kowalski/alert_brokers/alert_broker_winter.py @@ -14,7 +14,7 @@ import dask.distributed from kowalski.alert_brokers.alert_broker import AlertConsumer, AlertWorker, EopError from bson.json_util import loads as bson_loads -from kowalski.utils import init_db_sync, timer +from kowalski.utils import init_db_sync, timer, retry from kowalski.config import load_config from kowalski.log import log @@ -47,146 +47,160 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alert(alert: Mapping, topic: str): - """ - Main function that runs on a single alert. 
- -Read top-level packet field - -Separate alert and prv_candidate in MongoDB prep + def process_alerts(avro_msg: bytes, topic: str, worker): + """Alert brokering task run by dask.distributed workers - :param alert: decoded alert from Kafka stream + :param avro_msg: avro message from Kafka stream :param topic: Kafka stream topic name for bookkeeping :return: """ - candid = alert["candid"] - object_id = alert["objectid"] # get worker running current task worker = dask.distributed.get_worker() alert_worker = worker.plugins["worker-init"].alert_worker - log(f"winter: {topic} {object_id} {candid} {worker.address}") + with timer("Decoding alert", alert_worker.verbose > 1): + msg_decoded = alert_worker.decode_message(avro_msg) + + for alert in msg_decoded: + candid = alert["candid"] + object_id = alert["objectid"] + if ( + retry( + alert_worker.mongo.db[ + alert_worker.collection_alerts + ].count_documents + )({"candid": candid}, limit=1) + == 1 + ): + # this alert has already been processed, skip it + log(f"Alert {object_id} {candid} already processed, skipping") + continue - # return if this alert packet has already been processed - # and ingested into collection_alerts: - if ( - alert_worker.mongo.db[alert_worker.collection_alerts].count_documents( - {"candid": candid}, limit=1 - ) - == 1 - ): - return + log(f"winter: {topic} {object_id} {candid} {worker.address}") - # candid not in db, ingest decoded avro packet into db - with timer(f"Mongification of {object_id} {candid}"): - alert, prv_candidates, _ = alert_worker.alert_mongify(alert) + # candid not in db, ingest decoded avro packet into db + with timer(f"Mongification of {object_id} {candid}"): + alert, prv_candidates, _ = alert_worker.alert_mongify(alert) - # future: add ML model filtering here + # future: add ML model filtering here - with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): - alert_worker.mongo.insert_one( - collection=alert_worker.collection_alerts, document=alert - ) - - # prv_candidates: pop nulls - save space - prv_candidates = [ - {kk: vv for kk, vv in prv_candidate.items() if vv is not None} - for prv_candidate in prv_candidates - ] + with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): + alert_worker.mongo.insert_one( + collection=alert_worker.collection_alerts, document=alert + ) - alert_aux, xmatches, xmatches_ztf, passed_filters = None, None, None, None - # cross-match with external catalogs if objectId not in collection_alerts_aux: - if ( - alert_worker.mongo.db[alert_worker.collection_alerts_aux].count_documents( - {"_id": object_id}, limit=1 - ) - == 0 - ): - with timer( - f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + # prv_candidates: pop nulls - save space + prv_candidates = [ + {kk: vv for kk, vv in prv_candidate.items() if vv is not None} + for prv_candidate in prv_candidates + ] + + alert_aux, xmatches, xmatches_ztf, passed_filters = None, None, None, None + # cross-match with external catalogs if objectId not in collection_alerts_aux: + if ( + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].count_documents({"_id": object_id}, limit=1) + == 0 ): - xmatches = alert_worker.alert_filter__xmatch(alert) + with timer( + f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = alert_worker.alert_filter__xmatch(alert) - # Crossmatch new alert with most recent ZTF_alerts and insert - with timer( - f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 - ): - xmatches = { - **xmatches, - 
**alert_worker.alert_filter__xmatch_ztf_alerts(alert), + # Crossmatch new alert with most recent ZTF_alerts and insert + with timer( + f"ZTF Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = { + **xmatches, + **alert_worker.alert_filter__xmatch_ztf_alerts(alert), + } + + alert_aux = { + "_id": object_id, + "cross_matches": xmatches, + "prv_candidates": prv_candidates, } - alert_aux = { - "_id": object_id, - "cross_matches": xmatches, - "prv_candidates": prv_candidates, - } + with timer( + f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.insert_one( + collection=alert_worker.collection_alerts_aux, + document=alert_aux, + ) - with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1): - alert_worker.mongo.insert_one( - collection=alert_worker.collection_alerts_aux, document=alert_aux - ) + else: + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].update_one( + {"_id": object_id}, + {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, + upsert=True, + ) - else: - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one( - {"_id": object_id}, - {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, - upsert=True, - ) + # Crossmatch exisiting alert with most recent record in ZTF_alerts and update aux + with timer( + f"Exists in aux: ZTF Cross-match of {object_id} {candid}", + alert_worker.verbose > 1, + ): + xmatches_ztf = alert_worker.alert_filter__xmatch_ztf_alerts(alert) - # Crossmatch exisiting alert with most recent record in ZTF_alerts and update aux - with timer( - f"Exists in aux: ZTF Cross-match of {object_id} {candid}", - alert_worker.verbose > 1, - ): - xmatches_ztf = alert_worker.alert_filter__xmatch_ztf_alerts(alert) + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].update_one( + {"_id": object_id}, + {"$set": {"cross_matches.ZTF_alerts": xmatches_ztf}}, + upsert=True, + ) - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one( - {"_id": object_id}, - {"$set": {"cross_matches.ZTF_alerts": xmatches_ztf}}, - upsert=True, - ) + if config["misc"]["broker"]: + # winter has a different schema (fields have different names), + # so now that the alert packet has been ingested, we just add some aliases + # to avoid having to make exceptions all the time everywhere in the rest of the code + # not good memory-wise, but not worth adding if statements everywhere just for this... 
+ alert["objectId"] = alert.get("objectid") + alert["cutoutScience"] = alert.get("cutout_science") + alert["cutoutTemplate"] = alert.get("cutout_template") + alert["cutoutDifference"] = alert.get("cutout_difference") + # execute user-defined alert filters + with timer( + f"Filtering of {object_id} {candid}", alert_worker.verbose > 1 + ): + passed_filters = alert_worker.alert_filter__user_defined( + alert_worker.filter_templates, alert + ) + if alert_worker.verbose > 1: + log( + f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + ) - if config["misc"]["broker"]: - # winter has a different schema (fields have different names), - # so now that the alert packet has been ingested, we just add some aliases - # to avoid having to make exceptions all the time everywhere in the rest of the code - # not good memory-wise, but not worth adding if statements everywhere just for this... - alert["objectId"] = alert.get("objectid") - alert["cutoutScience"] = alert.get("cutout_science") - alert["cutoutTemplate"] = alert.get("cutout_template") - alert["cutoutDifference"] = alert.get("cutout_difference") - # execute user-defined alert filters - with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1): - passed_filters = alert_worker.alert_filter__user_defined( - alert_worker.filter_templates, alert - ) - if alert_worker.verbose > 1: - log( - f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + # post to SkyPortal + alert_worker.alert_sentinel_skyportal( + alert, prv_candidates, passed_filters=passed_filters ) - # post to SkyPortal - alert_worker.alert_sentinel_skyportal( - alert, prv_candidates, passed_filters=passed_filters + # clean up after thyself + del ( + alert, + prv_candidates, + xmatches, + xmatches_ztf, + alert_aux, + passed_filters, + candid, + object_id, ) - # clean up after thyself - del ( - alert, - prv_candidates, - xmatches, - xmatches_ztf, - alert_aux, - passed_filters, - candid, - object_id, - ) + return class WNTRAlertWorker(AlertWorker, ABC): diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py index 7191fa80..ef977683 100644 --- a/kowalski/alert_brokers/alert_broker_ztf.py +++ b/kowalski/alert_brokers/alert_broker_ztf.py @@ -11,7 +11,7 @@ import pandas as pd from abc import ABC from copy import deepcopy -from typing import Mapping, Sequence +from typing import Sequence import dask.distributed from kowalski.alert_brokers.alert_broker import AlertConsumer, AlertWorker, EopError @@ -33,195 +33,224 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alert(alert: Mapping, topic: str): + def process_alerts(avro_msg: bytes, topic: str, worker): """Alert brokering task run by dask.distributed workers - :param alert: decoded alert from Kafka stream + :param avro_msg: avro message from Kafka stream :param topic: Kafka stream topic name for bookkeeping :return: """ - candid = alert["candid"] - object_id = alert["objectId"] # get worker running current task worker = dask.distributed.get_worker() alert_worker: ZTFAlertWorker = worker.plugins["worker-init"].alert_worker - log(f"{topic} {object_id} {candid} {worker.address}") + with timer("Decoding alert", alert_worker.verbose > 1): + msg_decoded = alert_worker.decode_message(avro_msg) - # return if this alert packet has already been processed and ingested into collection_alerts: - if ( - retry( - 
alert_worker.mongo.db[alert_worker.collection_alerts].count_documents - )({"candid": candid}, limit=1) - == 1 - ): - return - - # candid not in db, ingest decoded avro packet into db - with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1): - alert, prv_candidates, fp_hists = alert_worker.alert_mongify(alert) - - # create alert history - all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])] - with timer( - f"Gather all previous candidates for {object_id} {candid}", - alert_worker.verbose > 1, - ): - # get all prv_candidates for this objectId: - existing_aux = retry( - alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one - )({"_id": object_id}, {"prv_candidates": 1}) + for alert in msg_decoded: + candid = alert["candid"] + object_id = alert["objectId"] if ( - existing_aux is not None - and len(existing_aux.get("prv_candidates", [])) > 0 + retry( + alert_worker.mongo.db[ + alert_worker.collection_alerts + ].count_documents + )({"candid": candid}, limit=1) + == 1 ): - all_prv_candidates += existing_aux["prv_candidates"] - - # get all alerts for this objectId: - existing_alerts = list( - alert_worker.mongo.db[alert_worker.collection_alerts].find( - {"objectId": object_id}, {"candidate": 1} - ) - ) - if len(existing_alerts) > 0: - all_prv_candidates += [ - existing_alert["candidate"] for existing_alert in existing_alerts - ] - del existing_aux, existing_alerts - - # ML models: - with timer(f"MLing of {object_id} {candid}", alert_worker.verbose > 1): - scores = alert_worker.alert_filter__ml(alert, all_prv_candidates) - alert["classifications"] = scores - - with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): - retry(alert_worker.mongo.insert_one)( - collection=alert_worker.collection_alerts, document=alert - ) - - # prv_candidates: pop nulls - save space - prv_candidates = [ - {kk: vv for kk, vv in prv_candidate.items() if vv is not None} - for prv_candidate in prv_candidates - ] + # this alert has already been processed, skip it + log(f"Alert {object_id} {candid} already processed, skipping") + continue - # fp_hists: pop nulls - save space - fp_hists = [ - {kk: vv for kk, vv in fp_hist.items() if vv not in [None, -99999, -99999.0]} - for fp_hist in fp_hists - ] + log(f"ztf: {topic} {object_id} {candid} {worker.address}") - # format fp_hists, add alert_mag, alert_ra, alert_dec - # and computing the FP's mag, magerr, snr, limmag3sig, limmag5sig - fp_hists = alert_worker.format_fp_hists(alert, fp_hists) + # candid not in db, ingest decoded avro packet into db + with timer( + f"Mongification of {object_id} {candid}", alert_worker.verbose > 1 + ): + alert, prv_candidates, fp_hists = alert_worker.alert_mongify(alert) - alert_aux, xmatches, passed_filters = None, None, None - # cross-match with external catalogs if objectId not in collection_alerts_aux: - if ( - retry( - alert_worker.mongo.db[ - alert_worker.collection_alerts_aux - ].count_documents - )({"_id": object_id}, limit=1) - == 0 - ): + # create alert history + all_prv_candidates = deepcopy(prv_candidates) + [ + deepcopy(alert["candidate"]) + ] with timer( - f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + f"Gather all previous candidates for {object_id} {candid}", + alert_worker.verbose > 1, ): - xmatches = alert_worker.alert_filter__xmatch(alert) + # get all prv_candidates for this objectId: + existing_aux = retry( + alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one + )({"_id": object_id}, {"prv_candidates": 1}) + if ( + 
existing_aux is not None + and len(existing_aux.get("prv_candidates", [])) > 0 + ): + all_prv_candidates += existing_aux["prv_candidates"] - alert_aux = { - "_id": object_id, - "cross_matches": xmatches, - "prv_candidates": prv_candidates, - } + # get all alerts for this objectId: + existing_alerts = list( + alert_worker.mongo.db[alert_worker.collection_alerts].find( + {"objectId": object_id}, {"candidate": 1} + ) + ) + if len(existing_alerts) > 0: + all_prv_candidates += [ + existing_alert["candidate"] + for existing_alert in existing_alerts + ] + del existing_aux, existing_alerts - # only add the fp_hists if its a recent/new object, which we determine based on either: - # - ndethist <= 1, we never detected it before - # - we detected it before (maybe missed a few alerts), but the first detection was - # less than 30 days ago, which is the maximum time window of the incoming data - # which means that we still have a lightcurve that dates back to the first detection - if ( - alert["candidate"]["ndethist"] <= 1 - or (alert["candidate"]["jd"] - alert["candidate"].get("jdstarthist", 0)) - < 30 - ): - alert_aux["fp_hists"] = fp_hists - else: - # if we don't save it, empty the fp_hists array to not send to SkyPortal what is not saved here. - fp_hists = [] + # ML models: + with timer(f"MLing of {object_id} {candid}", alert_worker.verbose > 1): + scores = alert_worker.alert_filter__ml(alert, all_prv_candidates) + alert["classifications"] = scores - with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1): + with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): retry(alert_worker.mongo.insert_one)( - collection=alert_worker.collection_alerts_aux, document=alert_aux + collection=alert_worker.collection_alerts, document=alert ) - else: - with timer( - f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 - ): - # update prv_candidates + # prv_candidates: pop nulls - save space + prv_candidates = [ + {kk: vv for kk, vv in prv_candidate.items() if vv is not None} + for prv_candidate in prv_candidates + ] + + # fp_hists: pop nulls - save space + fp_hists = [ + { + kk: vv + for kk, vv in fp_hist.items() + if vv not in [None, -99999, -99999.0] + } + for fp_hist in fp_hists + ] + + # format fp_hists, add alert_mag, alert_ra, alert_dec + # and computing the FP's mag, magerr, snr, limmag3sig, limmag5sig + fp_hists = alert_worker.format_fp_hists(alert, fp_hists) + + alert_aux, xmatches, passed_filters = None, None, None + # cross-match with external catalogs if objectId not in collection_alerts_aux: + if ( retry( - alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one - )( - {"_id": object_id}, - {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, - upsert=True, - ) + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].count_documents + )({"_id": object_id}, limit=1) + == 0 + ): + with timer( + f"Cross-match of {object_id} {candid}", alert_worker.verbose > 1 + ): + xmatches = alert_worker.alert_filter__xmatch(alert) - # if there is no fp_hists for this object, we don't update anything - # the idea is that we start accumulating FP only for new objects, to avoid - # having some objects with incomplete FP history, which would be confusing for the filters - # either there is full FP, or there isn't any - # we also update the fp_hists array we have here with the updated 30-day window + alert_aux = { + "_id": object_id, + "cross_matches": xmatches, + "prv_candidates": prv_candidates, + } + + # only add the fp_hists if its a 
recent/new object, which we determine based on either: + # - ndethist <= 1, we never detected it before + # - we detected it before (maybe missed a few alerts), but the first detection was + # less than 30 days ago, which is the maximum time window of the incoming data + # which means that we still have a lightcurve that dates back to the first detection if ( + alert["candidate"]["ndethist"] <= 1 + or ( + alert["candidate"]["jd"] + - alert["candidate"].get("jdstarthist", 0) + ) + < 30 + ): + alert_aux["fp_hists"] = fp_hists + else: + # if we don't save it, empty the fp_hists array to not send to SkyPortal what is not saved here. + fp_hists = [] + + with timer( + f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1 + ): + retry(alert_worker.mongo.insert_one)( + collection=alert_worker.collection_alerts_aux, + document=alert_aux, + ) + + else: + with timer( + f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1 + ): + # update prv_candidates retry( alert_worker.mongo.db[ alert_worker.collection_alerts_aux - ].count_documents + ].update_one )( - {"_id": alert["objectId"], "fp_hists": {"$exists": True}}, - limit=1, + {"_id": object_id}, + {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, + upsert=True, ) - == 1 - ): - fp_hists = alert_worker.update_fp_hists(alert, fp_hists) - else: + # if there is no fp_hists for this object, we don't update anything - # and we empty the fp_hists array to not send to SkyPortal what is not saved here. - fp_hists = [] + # the idea is that we start accumulating FP only for new objects, to avoid + # having some objects with incomplete FP history, which would be confusing for the filters + # either there is full FP, or there isn't any + # we also update the fp_hists array we have here with the updated 30-day window + if ( + retry( + alert_worker.mongo.db[ + alert_worker.collection_alerts_aux + ].count_documents + )( + {"_id": alert["objectId"], "fp_hists": {"$exists": True}}, + limit=1, + ) + == 1 + ): + fp_hists = alert_worker.update_fp_hists(alert, fp_hists) + else: + # if there is no fp_hists for this object, we don't update anything + # and we empty the fp_hists array to not send to SkyPortal what is not saved here. 
+ fp_hists = [] + + if config["misc"]["broker"]: + # execute user-defined alert filters + with timer( + f"Filtering of {object_id} {candid}", alert_worker.verbose > 1 + ): + passed_filters = alert_worker.alert_filter__user_defined( + alert_worker.filter_templates, alert + ) + if alert_worker.verbose > 1: + log( + f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + ) - if config["misc"]["broker"]: - # execute user-defined alert filters - with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1): - passed_filters = alert_worker.alert_filter__user_defined( - alert_worker.filter_templates, alert - ) - if alert_worker.verbose > 1: - log( - f"{object_id} {candid} number of filters passed: {len(passed_filters)}" + # post to SkyPortal + alert_worker.alert_sentinel_skyportal( + alert, + prv_candidates, + fp_hists=fp_hists, + passed_filters=passed_filters, ) - # post to SkyPortal - alert_worker.alert_sentinel_skyportal( - alert, prv_candidates, fp_hists=fp_hists, passed_filters=passed_filters + # clean up after thyself + del ( + alert, + prv_candidates, + fp_hists, + all_prv_candidates, + scores, + xmatches, + alert_aux, + passed_filters, + candid, + object_id, ) - # clean up after thyself - del ( - alert, - prv_candidates, - fp_hists, - all_prv_candidates, - scores, - xmatches, - alert_aux, - passed_filters, - candid, - object_id, - ) - return From ec1568f2ac251dd8d48ac99848a9ee15c75df55f Mon Sep 17 00:00:00 2001 From: Theodlz Date: Thu, 7 Aug 2025 10:46:57 +0200 Subject: [PATCH 2/6] submit the msg.value() for processing, not the msg itself which is of type cimpl.Message and can't be serialized by Dask --- kowalski/alert_brokers/alert_broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 13649a3f..2cec7691 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -256,7 +256,7 @@ def poll(self): elif msg is not None: try: - self.submit_alert(msg) + self.submit_alert(msg.value()) except Exception as e: print("Error in poll!") From 801594f009b35425c36e470a7eaeb22f64285afe Mon Sep 17 00:00:00 2001 From: Theodlz Date: Thu, 7 Aug 2025 11:06:32 +0200 Subject: [PATCH 3/6] remove the .value() call in decode_message, since we now do it before we attempt decoding --- kowalski/alert_brokers/alert_broker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 2cec7691..62f8d915 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -200,22 +200,21 @@ def decode_message(cls, msg): :param msg: The Kafka message result from consumer.poll() :return: """ - message = msg.value() - decoded_msg = message + decoded_msg = msg try: - bytes_io = io.BytesIO(message) + bytes_io = io.BytesIO(msg) decoded_msg = cls.read_schema_data(bytes_io) except AssertionError: decoded_msg = None except IndexError: literal_msg = literal_eval( - str(message, encoding="utf-8") + str(msg, encoding="utf-8") ) # works to give bytes bytes_io = io.BytesIO(literal_msg) # works to give decoded_msg = cls.read_schema_data(bytes_io) # yields reader except Exception: - decoded_msg = message + decoded_msg = msg finally: return decoded_msg From aaa367eb823771ae4c015e38f951c76516bdee68 Mon Sep 17 00:00:00 2001 From: Theodlz Date: Thu, 7 Aug 2025 11:07:13 +0200 Subject: [PATCH 4/6] update docstring --- 
kowalski/alert_brokers/alert_broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 62f8d915..10dd4269 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -197,7 +197,7 @@ def decode_message(cls, msg): """ Decode Avro message according to a schema. - :param msg: The Kafka message result from consumer.poll() + :param msg: The Kafka message.value() from consumer.poll() :return: """ decoded_msg = msg From 95d45c3726c40e015c922957a36f1234ba6c5b57 Mon Sep 17 00:00:00 2001 From: Theodlz Date: Thu, 7 Aug 2025 13:46:49 +0200 Subject: [PATCH 5/6] move the decode_message method to the AlertWorker class --- kowalski/alert_brokers/alert_broker.py | 74 +++++++++---------- kowalski/alert_brokers/alert_broker_pgir.py | 2 +- kowalski/alert_brokers/alert_broker_turbo.py | 2 +- kowalski/alert_brokers/alert_broker_winter.py | 2 +- kowalski/alert_brokers/alert_broker_ztf.py | 2 +- 5 files changed, 41 insertions(+), 41 deletions(-) diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 10dd4269..d5266f0a 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -181,43 +181,6 @@ def on_assign(consumer, partitions, _self=self): log("Finished AlertConsumer setup") - @staticmethod - def read_schema_data(bytes_io): - """Read data that already has an Avro schema. - - :param bytes_io: `_io.BytesIO` Data to be decoded. - :return: `dict` Decoded data. - """ - bytes_io.seek(0) - message = fastavro.reader(bytes_io) - return message - - @classmethod - def decode_message(cls, msg): - """ - Decode Avro message according to a schema. - - :param msg: The Kafka message.value() from consumer.poll() - :return: - """ - decoded_msg = msg - - try: - bytes_io = io.BytesIO(msg) - decoded_msg = cls.read_schema_data(bytes_io) - except AssertionError: - decoded_msg = None - except IndexError: - literal_msg = literal_eval( - str(msg, encoding="utf-8") - ) # works to give bytes - bytes_io = io.BytesIO(literal_msg) # works to give - decoded_msg = cls.read_schema_data(bytes_io) # yields reader - except Exception: - decoded_msg = msg - finally: - return decoded_msg - @staticmethod def process_alerts(avro_msg: bytes, topic: str): """Alert brokering task run by dask.distributed workers @@ -488,6 +451,43 @@ def __init__(self, **kwargs): ) log("AlertWorker setup complete") + @staticmethod + def read_schema_data(bytes_io): + """Read data that already has an Avro schema. + + :param bytes_io: `_io.BytesIO` Data to be decoded. + :return: `dict` Decoded data. + """ + bytes_io.seek(0) + message = fastavro.reader(bytes_io) + return message + + @classmethod + def decode_message(cls, msg): + """ + Decode Avro message according to a schema. 
+ + :param msg: The Kafka message.value() from consumer.poll() + :return: + """ + decoded_msg = msg + + try: + bytes_io = io.BytesIO(msg) + decoded_msg = cls.read_schema_data(bytes_io) + except AssertionError: + decoded_msg = None + except IndexError: + literal_msg = literal_eval( + str(msg, encoding="utf-8") + ) # works to give bytes + bytes_io = io.BytesIO(literal_msg) # works to give + decoded_msg = cls.read_schema_data(bytes_io) # yields reader + except Exception: + decoded_msg = msg + finally: + return decoded_msg + def _api_skyportal( self, session: requests.Session, diff --git a/kowalski/alert_brokers/alert_broker_pgir.py b/kowalski/alert_brokers/alert_broker_pgir.py index f8d171d9..501240e5 100644 --- a/kowalski/alert_brokers/alert_broker_pgir.py +++ b/kowalski/alert_brokers/alert_broker_pgir.py @@ -31,7 +31,7 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alerts(avro_msg: bytes, topic: str, worker): + def process_alerts(avro_msg: bytes, topic): """Alert brokering task run by dask.distributed workers :param avro_msg: avro message from Kafka stream diff --git a/kowalski/alert_brokers/alert_broker_turbo.py b/kowalski/alert_brokers/alert_broker_turbo.py index f27d265f..0a4b8894 100644 --- a/kowalski/alert_brokers/alert_broker_turbo.py +++ b/kowalski/alert_brokers/alert_broker_turbo.py @@ -31,7 +31,7 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alerts(avro_msg: bytes, topic: str, worker): + def process_alerts(avro_msg: bytes, topic: str): """Alert brokering task run by dask.distributed workers :param avro_msg: avro message from Kafka stream diff --git a/kowalski/alert_brokers/alert_broker_winter.py b/kowalski/alert_brokers/alert_broker_winter.py index 4bf17c90..9bb96faf 100644 --- a/kowalski/alert_brokers/alert_broker_winter.py +++ b/kowalski/alert_brokers/alert_broker_winter.py @@ -47,7 +47,7 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alerts(avro_msg: bytes, topic: str, worker): + def process_alerts(avro_msg: bytes, topic: str): """Alert brokering task run by dask.distributed workers :param avro_msg: avro message from Kafka stream diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py index ef977683..c8376c20 100644 --- a/kowalski/alert_brokers/alert_broker_ztf.py +++ b/kowalski/alert_brokers/alert_broker_ztf.py @@ -33,7 +33,7 @@ def __init__(self, topic: str, dask_client: dask.distributed.Client, **kwargs): super().__init__(topic, dask_client, **kwargs) @staticmethod - def process_alerts(avro_msg: bytes, topic: str, worker): + def process_alerts(avro_msg: bytes, topic: str): """Alert brokering task run by dask.distributed workers :param avro_msg: avro message from Kafka stream From 41fc87e4bdd682e8eb9360ace73b21a5e11aff28 Mon Sep 17 00:00:00 2001 From: Theodlz Date: Thu, 7 Aug 2025 14:02:11 +0200 Subject: [PATCH 6/6] fix turbo's objectId access --- kowalski/alert_brokers/alert_broker_turbo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kowalski/alert_brokers/alert_broker_turbo.py b/kowalski/alert_brokers/alert_broker_turbo.py index 0a4b8894..d52f95e9 100644 --- a/kowalski/alert_brokers/alert_broker_turbo.py +++ b/kowalski/alert_brokers/alert_broker_turbo.py @@ -48,7 +48,7 @@ def 
process_alerts(avro_msg: bytes, topic: str): for alert in msg_decoded: candid = alert["candid"] - object_id = alert["objectid"] + object_id = alert["objectId"] if ( retry( alert_worker.mongo.db[
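
The series above boils down to one pattern: the consumer forwards the raw Kafka payload (msg.value()) to Dask, since cimpl.Message objects cannot be serialized, and each worker then decodes the Avro bytes and skips candids already present in MongoDB. The sketch below is not part of the patch; it only illustrates that worker-side flow under stated assumptions, and `alerts_collection` and `handle_alert` are placeholder names, not Kowalski APIs.

    # Minimal sketch (assumption: an Avro object-container payload, a pymongo
    # collection, and some per-alert handler) of the decode-then-deduplicate
    # step these commits move into the dask worker task.
    import io
    from typing import Callable, Iterable, Mapping

    import fastavro


    def decode_avro(raw: bytes) -> Iterable[Mapping]:
        """Decode a raw Avro message (schema embedded in the payload) into records."""
        bytes_io = io.BytesIO(raw)
        bytes_io.seek(0)
        return list(fastavro.reader(bytes_io))


    def process_alerts_sketch(
        raw: bytes, alerts_collection, handle_alert: Callable[[Mapping], None]
    ) -> None:
        """Worker-side task: decode the payload, drop already-ingested candids, process the rest."""
        for alert in decode_avro(raw):
            candid = alert["candid"]
            # Same dedup check the patch adds: skip packets already in the alerts collection.
            if alerts_collection.count_documents({"candid": candid}, limit=1) == 1:
                continue
            handle_alert(alert)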