112 changes: 44 additions & 68 deletions kowalski/alert_brokers/alert_broker.py
@@ -182,72 +182,26 @@ def on_assign(consumer, partitions, _self=self):
         log("Finished AlertConsumer setup")
 
     @staticmethod
-    def read_schema_data(bytes_io):
-        """Read data that already has an Avro schema.
-
-        :param bytes_io: `_io.BytesIO` Data to be decoded.
-        :return: `dict` Decoded data.
-        """
-        bytes_io.seek(0)
-        message = fastavro.reader(bytes_io)
-        return message
-
-    @classmethod
-    def decode_message(cls, msg):
-        """
-        Decode Avro message according to a schema.
-
-        :param msg: The Kafka message result from consumer.poll()
-        :return:
-        """
-        message = msg.value()
-        decoded_msg = message
-
-        try:
-            bytes_io = io.BytesIO(message)
-            decoded_msg = cls.read_schema_data(bytes_io)
-        except AssertionError:
-            decoded_msg = None
-        except IndexError:
-            literal_msg = literal_eval(
-                str(message, encoding="utf-8")
-            )  # works to give bytes
-            bytes_io = io.BytesIO(literal_msg)  # works to give <class '_io.BytesIO'>
-            decoded_msg = cls.read_schema_data(bytes_io)  # yields reader
-        except Exception:
-            decoded_msg = message
-        finally:
-            return decoded_msg
-
-    @staticmethod
-    def process_alert(alert: Mapping, topic: str):
+    def process_alerts(avro_msg: bytes, topic: str):
         """Alert brokering task run by dask.distributed workers
 
-        :param alert: decoded alert from Kafka stream
+        :param avro_msg: avro message from Kafka stream
         :param topic: Kafka stream topic name for bookkeeping
         :return:
         """
         raise NotImplementedError("Must be implemented in subclass")
 
-    def submit_alert(self, record: Mapping):
-        # we look for objectId and objectid if missing,
-        # to support both ZTF and WNTR alert schemas
-        objectId = record.get("objectId", record.get("objectid", None))
-        if objectId is None:
-            log(
-                f"Failed to get objectId from record {record}, skipping alert submission"
-            )
-            return
+    def submit_alert(self, avro_msg: bytes):
         with timer(
-            f"Submitting alert {objectId} {record['candid']} for processing",
+            "Submitting alert for processing",
             self.verbose > 1,
         ):
             future = self.dask_client.submit(
-                self.process_alert, record, self.topic, pure=True
+                self.process_alerts, avro_msg, self.topic, pure=True
             )
             dask.distributed.fire_and_forget(future)
             future.release()
-            del future, record
+            del future, avro_msg  # clean up after thyself
         return
 
     def poll(self):
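For orientation, here is a minimal runnable sketch (not part of the diff) of the hand-off pattern the new submit_alert relies on: the raw Avro bytes go to a Dask worker via fire-and-forget, and all decoding is deferred to the worker-side process_alerts. The local Client and the dummy payload/topic below are illustrative stand-ins.

import dask.distributed

def process_alerts(avro_msg: bytes, topic: str):
    # Worker-side entry point: the real subclass decodes the Avro packet and
    # ingests each record; this stub only reports what it received.
    print(f"received {len(avro_msg)} bytes from topic {topic}")

if __name__ == "__main__":
    client = dask.distributed.Client(processes=False)  # in-process local cluster
    future = client.submit(process_alerts, b"\x00" * 16, "ztf_demo", pure=True)
    dask.distributed.fire_and_forget(future)  # keep the task alive, collect nothing
    future.release()  # drop our reference immediately, as submit_alert does
    client.close()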
@@ -264,22 +218,7 @@ def poll(self):
 
         elif msg is not None:
             try:
-                # decode avro packet
-                with timer("Decoding alert", self.verbose > 1):
-                    msg_decoded = self.decode_message(msg)
-
-                for record in msg_decoded:
-                    if (
-                        retry(self.mongo.db[self.collection_alerts].count_documents)(
-                            {"candid": record["candid"]}, limit=1
-                        )
-                        == 0
-                    ):
-
-                        self.submit_alert(record)
-
-                # clean up after thyself
-                del msg_decoded
+                self.submit_alert(msg.value())
 
             except Exception as e:
                 print("Error in poll!")
@@ -512,6 +451,43 @@ def __init__(self, **kwargs):
         )
         log("AlertWorker setup complete")
 
+    @staticmethod
+    def read_schema_data(bytes_io):
+        """Read data that already has an Avro schema.
+
+        :param bytes_io: `_io.BytesIO` Data to be decoded.
+        :return: `dict` Decoded data.
+        """
+        bytes_io.seek(0)
+        message = fastavro.reader(bytes_io)
+        return message
+
+    @classmethod
+    def decode_message(cls, msg):
+        """
+        Decode Avro message according to a schema.
+
+        :param msg: The Kafka message.value() from consumer.poll()
+        :return:
+        """
+        decoded_msg = msg
+
+        try:
+            bytes_io = io.BytesIO(msg)
+            decoded_msg = cls.read_schema_data(bytes_io)
+        except AssertionError:
+            decoded_msg = None
+        except IndexError:
+            literal_msg = literal_eval(
+                str(msg, encoding="utf-8")
+            )  # works to give bytes
+            bytes_io = io.BytesIO(literal_msg)  # works to give <class '_io.BytesIO'>
+            decoded_msg = cls.read_schema_data(bytes_io)  # yields reader
+        except Exception:
+            decoded_msg = msg
+        finally:
+            return decoded_msg
+
     def _api_skyportal(
         self,
         session: requests.Session,
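To close the loop on the relocated decode path, here is a minimal fastavro round trip mirroring what AlertWorker.decode_message does with the bytes a worker receives; the two-field schema and record are invented for illustration.

import io

import fastavro

# Invented stand-in for a real alert schema
schema = {
    "type": "record",
    "name": "Alert",
    "fields": [
        {"name": "candid", "type": "long"},
        {"name": "objectId", "type": "string"},
    ],
}

# Build an Avro container file in memory, as a payload would arrive off Kafka
buf = io.BytesIO()
fastavro.writer(buf, schema, [{"candid": 1234567890, "objectId": "ZTF00aaaaaaa"}])
avro_msg = buf.getvalue()  # stand-in for msg.value()

# Decode it the way read_schema_data does: rewind, then hand to fastavro.reader
bytes_io = io.BytesIO(avro_msg)
bytes_io.seek(0)
for record in fastavro.reader(bytes_io):
    print(record["candid"], record["objectId"])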